consolidate dl queue notification to method

This commit is contained in:
simon 2023-04-22 13:42:54 +07:00
parent 76535c6304
commit bc39561606
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
1 changed files with 16 additions and 19 deletions

View File

@ -166,37 +166,24 @@ class VideoDownloader:
break break
youtube_id = video_data.get("youtube_id") youtube_id = video_data.get("youtube_id")
video_type = VideoTypeEnum(video_data["vid_type"]) print(f"{youtube_id}: Downloading video")
print(f"{youtube_id}: Downloading type: {video_type.value}") self._notify(video_data, "Validate download format")
success = self._dl_single_vid(youtube_id) success = self._dl_single_vid(youtube_id)
if not success: if not success:
continue continue
if self.task: self._notify(video_data, "Add video metadata to index")
self.task.send_progress(
[
f"Processing video {youtube_id}",
"Add video metadata to index.",
]
)
vid_dict = index_new_video( vid_dict = index_new_video(
youtube_id, youtube_id,
video_overwrites=self.video_overwrites, video_overwrites=self.video_overwrites,
video_type=video_type, video_type=VideoTypeEnum(video_data["vid_type"]),
) )
self.channels.add(vid_dict["channel"]["channel_id"]) self.channels.add(vid_dict["channel"]["channel_id"])
self.videos.add(vid_dict["youtube_id"]) self.videos.add(vid_dict["youtube_id"])
if self.task: self._notify(video_data, "Move downloaded file to archive")
self.task.send_progress(
[
f"Processing video {youtube_id}",
"Move downloaded file to archive.",
]
)
self.move_to_archive(vid_dict) self.move_to_archive(vid_dict)
self._delete_from_pending(youtube_id) self._delete_from_pending(youtube_id)
@ -204,6 +191,15 @@ class VideoDownloader:
self._add_subscribed_channels() self._add_subscribed_channels()
DownloadPostProcess(self).run() DownloadPostProcess(self).run()
def _notify(self, video_data, message):
"""send progress notification to task"""
if not self.task:
return
typ = VideoTypeEnum(video_data["vid_type"]).value.rstrip("s").title()
title = video_data.get("title")
self.task.send_progress([f"Processing {typ}: {title}", message])
def _get_next(self, auto_only): def _get_next(self, auto_only):
"""get next item in queue""" """get next item in queue"""
must_list = [{"term": {"status": {"value": "pending"}}}] must_list = [{"term": {"status": {"value": "pending"}}}]
@ -228,6 +224,7 @@ class VideoDownloader:
def _get_overwrites(self): def _get_overwrites(self):
"""get channel overwrites""" """get channel overwrites"""
pending = PendingList() pending = PendingList()
pending.get_download()
pending.get_channels() pending.get_channels()
self.video_overwrites = pending.video_overwrites self.video_overwrites = pending.video_overwrites
@ -386,7 +383,7 @@ class VideoDownloader:
@staticmethod @staticmethod
def _delete_from_pending(youtube_id): def _delete_from_pending(youtube_id):
"""delete downloaded video from pending index if its there""" """delete downloaded video from pending index if its there"""
path = f"ta_download/_doc/{youtube_id}" path = f"ta_download/_doc/{youtube_id}?refresh=true"
_, _ = ElasticWrap(path).delete() _, _ = ElasticWrap(path).delete()
def _add_subscribed_channels(self): def _add_subscribed_channels(self):