refactor extract dl task

This commit is contained in:
simon 2023-03-14 16:40:05 +07:00
parent 78f04a2ffc
commit 488711ee8f
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
3 changed files with 63 additions and 59 deletions

View File

@ -18,7 +18,7 @@ from home.src.index.playlist import YoutubePlaylist
from home.src.index.video_constants import VideoTypeEnum
from home.src.ta.config import AppConfig
from home.src.ta.helper import DurationConverter, is_shorts
from home.src.ta.ta_redis import RedisArchivist, RedisQueue
from home.src.ta.ta_redis import RedisQueue
class PendingIndex:
@ -163,10 +163,11 @@ class PendingList(PendingIndex):
"simulate": True,
}
def __init__(self, youtube_ids=False):
def __init__(self, youtube_ids=False, task=False):
super().__init__()
self.config = AppConfig().config
self.youtube_ids = youtube_ids
self.task = task
self.to_skip = False
self.missing_videos = False
@ -175,16 +176,16 @@ class PendingList(PendingIndex):
self.missing_videos = []
self.get_download()
self.get_indexed()
for entry in self.youtube_ids:
# notify
mess_dict = {
"status": "message:add",
"level": "info",
"title": "Adding to download queue.",
"message": "Extracting lists",
}
RedisArchivist().set_message("message:add", mess_dict, expire=True)
total = len(self.youtube_ids)
for idx, entry in enumerate(self.youtube_ids):
self._process_entry(entry)
if not self.task:
continue
self.task.send_progress(
message_lines=[f"Extracting items {idx + 1}/{total}"],
progress=(idx + 1) / total,
)
def _process_entry(self, entry):
"""process single entry from url list"""
@ -238,9 +239,10 @@ class PendingList(PendingIndex):
self.get_channels()
bulk_list = []
total = len(self.missing_videos)
for idx, (youtube_id, vid_type) in enumerate(self.missing_videos):
print(f"{youtube_id} ({vid_type}): add to download queue")
self._notify_add(idx)
print(f"{youtube_id}: [{idx + 1}/{total}]: add to queue")
self._notify_add(idx, total)
video_details = self.get_youtube_details(youtube_id, vid_type)
if not video_details:
continue
@ -253,29 +255,34 @@ class PendingList(PendingIndex):
url = video_details["vid_thumb_url"]
ThumbManager(youtube_id).download_video_thumb(url)
if bulk_list:
# add last newline
bulk_list.append("\n")
query_str = "\n".join(bulk_list)
_, _ = ElasticWrap("_bulk").post(query_str, ndjson=True)
if len(bulk_list) >= 20:
self._ingest_bulk(bulk_list)
bulk_list = []
def _notify_add(self, idx):
self._ingest_bulk(bulk_list)
def _ingest_bulk(self, bulk_list):
"""add items to queue in bulk"""
if not bulk_list:
return
# add last newline
bulk_list.append("\n")
query_str = "\n".join(bulk_list)
_, _ = ElasticWrap("_bulk").post(query_str, ndjson=True)
def _notify_add(self, idx, total):
"""send notification for adding videos to download queue"""
progress = f"{idx + 1}/{len(self.missing_videos)}"
mess_dict = {
"status": "message:add",
"level": "info",
"title": "Adding new videos to download queue.",
"message": "Progress: " + progress,
}
if idx + 1 == len(self.missing_videos):
expire = 4
else:
expire = True
if not self.task:
return
RedisArchivist().set_message("message:add", mess_dict, expire=expire)
if idx + 1 % 25 == 0:
print("adding to queue progress: " + progress)
self.task.send_progress(
message_lines=[
"Adding new videos to download queue.",
f"Extracting items {idx + 1}/{total}",
],
progress=(idx + 1) / total,
)
def get_youtube_details(self, youtube_id, vid_type=VideoTypeEnum.VIDEOS):
"""get details from youtubedl for single pending video"""

View File

@ -46,7 +46,7 @@ class ChannelSubscription:
last_videos = []
for vid_type, limit_amount in queries:
for vid_type_enum, limit_amount in queries:
obs = {
"skip_download": True,
"extract_flat": True,
@ -54,9 +54,9 @@ class ChannelSubscription:
if limit:
obs["playlistend"] = limit_amount
path = vid_type.value
vid_type = vid_type_enum.value
channel = YtWrap(obs, self.config).extract(
f"https://www.youtube.com/channel/{channel_id}/{path}"
f"https://www.youtube.com/channel/{channel_id}/{vid_type}"
)
if not channel:
continue
@ -278,20 +278,16 @@ class SubscriptionScanner:
def scan(self):
"""scan channels and playlists"""
if self.task:
self.task.send_progress(["Rescanning channels and playlists."])
self.missing_videos = []
self._notify()
self._scan_channels()
self._scan_playlists()
if not self.missing_videos:
return
self.scan_channels()
self.scan_playlists()
self.add_to_pending()
return self.missing_videos
def _notify(self):
"""set redis notification"""
self.task.send_progress(["Rescanning channels and playlists."])
def _scan_channels(self):
def scan_channels(self):
"""get missing from channels"""
channel_handler = ChannelSubscription(task=self.task)
missing = channel_handler.find_missing()
@ -303,7 +299,7 @@ class SubscriptionScanner:
{"type": "video", "vid_type": vid_type, "url": vid_id}
)
def _scan_playlists(self):
def scan_playlists(self):
"""get missing from playlists"""
playlist_handler = PlaylistSubscription(task=self.task)
missing = playlist_handler.find_missing()
@ -312,15 +308,13 @@ class SubscriptionScanner:
for i in missing:
self.missing_videos.append(
{"type": "video", "vid_type": VideoTypeEnum.VIDEOS, "url": i}
{
"type": "video",
"vid_type": VideoTypeEnum.VIDEOS.value,
"url": i,
}
)
def add_to_pending(self):
"""add missing videos to pending queue"""
pending_handler = queue.PendingList(youtube_ids=self.missing_videos)
pending_handler.parse_url_list()
pending_handler.add_to_pending()
class SubscriptionHandler:
"""subscribe to channels and playlists from url_str"""

View File

@ -156,7 +156,10 @@ def update_subscribed(self):
return
manager.init(self)
SubscriptionScanner(task=self).scan()
missing_videos = SubscriptionScanner(task=self).scan()
if missing_videos:
print(missing_videos)
extrac_dl.delay(missing_videos)
@shared_task(name="download_pending", bind=True)
@ -174,10 +177,10 @@ def download_pending(self, from_queue=True):
downloader.run_queue()
@shared_task(name="extract_download")
def extrac_dl(youtube_ids):
@shared_task(name="extract_download", bind=True, base=BaseTask)
def extrac_dl(self, youtube_ids):
"""parse list passed and add to pending"""
pending_handler = PendingList(youtube_ids=youtube_ids)
pending_handler = PendingList(youtube_ids=youtube_ids, task=self)
pending_handler.parse_url_list()
pending_handler.add_to_pending()