refactor download_pending task

This commit is contained in:
simon 2023-03-15 13:28:19 +07:00
parent 488711ee8f
commit 2c719ae1ae
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
3 changed files with 60 additions and 120 deletions

View File

@ -22,7 +22,7 @@ from home.src.index.video import YoutubeVideo, index_new_video
from home.src.index.video_constants import VideoTypeEnum
from home.src.ta.config import AppConfig
from home.src.ta.helper import clean_string, ignore_filelist
from home.src.ta.ta_redis import RedisArchivist, RedisQueue
from home.src.ta.ta_redis import RedisQueue
class DownloadPostProcess:
@ -125,28 +125,23 @@ class DownloadPostProcess:
def _notify_playlist_progress(self, all_channel_playlist, id_c, id_p):
"""notify to UI"""
title = (
"Processing playlists for channels: "
+ f"{id_c + 1}/{len(self.download.channels)}"
)
message = f"Progress: {id_p + 1}/{len(all_channel_playlist)}"
key = "message:download"
mess_dict = {
"status": key,
"level": "info",
"title": title,
"message": message,
}
if id_p + 1 == len(all_channel_playlist):
expire = 4
else:
expire = True
if not self.download.task:
return
RedisArchivist().set_message(key, mess_dict, expire=expire)
total_channel = len(self.download.channels)
total_playlist = len(all_channel_playlist)
message = [f"Validate Playlists {id_p + 1}/{total_playlist}"]
title = f"Post Processing Channels: {id_c + 1}/{total_channel}"
progress = (id_c + 1) / total_channel
self.download.task.send_progress(
message, progress=progress, title=title
)
def get_comments(self):
"""get comments from youtube"""
CommentList(self.download.videos).index(notify=True)
CommentList(self.download.videos, task=self.download.task).index()
class VideoDownloader:
@ -157,10 +152,11 @@ class VideoDownloader:
MSG = "message:download"
def __init__(self, youtube_id_list=False):
def __init__(self, youtube_id_list=False, task=False):
self.obs = False
self.video_overwrites = False
self.youtube_id_list = youtube_id_list
self.task = task
self.config = AppConfig().config
self._build_obs()
self.channels = set()
@ -181,11 +177,7 @@ class VideoDownloader:
if not youtube_data:
break
try:
youtube_data = json.loads(youtube_data)
except json.JSONDecodeError: # This many not be necessary
continue
youtube_data = json.loads(youtube_data)
youtube_id = youtube_data.get("youtube_id")
tmp_vid_type = youtube_data.get(
@ -198,13 +190,8 @@ class VideoDownloader:
if not success:
continue
mess_dict = {
"status": self.MSG,
"level": "info",
"title": "Indexing....",
"message": "Add video metadata to index.",
}
RedisArchivist().set_message(self.MSG, mess_dict, expire=120)
if self.task:
self.task.send_progress(["Add video metadata to index."])
vid_dict = index_new_video(
youtube_id,
@ -213,29 +200,20 @@ class VideoDownloader:
)
self.channels.add(vid_dict["channel"]["channel_id"])
self.videos.add(vid_dict["youtube_id"])
mess_dict = {
"status": self.MSG,
"level": "info",
"title": "Moving....",
"message": "Moving downloaded file to storage folder",
}
RedisArchivist().set_message(self.MSG, mess_dict)
if self.task:
self.task.send_progress(["Move downloaded file to archive."])
self.move_to_archive(vid_dict)
if queue.has_item():
message = "Continue with next video."
expire = False
else:
message = "Download queue is finished."
expire = 10
self.move_to_archive(vid_dict)
mess_dict = {
"status": self.MSG,
"level": "info",
"title": "Completed",
"message": message,
}
RedisArchivist().set_message(self.MSG, mess_dict, expire=expire)
if self.task:
self.task.send_progress([message])
self._delete_from_pending(youtube_id)
# post processing
@ -256,13 +234,9 @@ class VideoDownloader:
def add_pending(self):
"""add pending videos to download queue"""
mess_dict = {
"status": self.MSG,
"level": "info",
"title": "Looking for videos to download",
"message": "Scanning your download queue.",
}
RedisArchivist().set_message(self.MSG, mess_dict)
if self.task:
self.task.send_progress(["Scanning your download queue."])
pending = PendingList()
pending.get_download()
to_add = [
@ -279,40 +253,32 @@ class VideoDownloader:
if not to_add:
# there is nothing pending
print("download queue is empty")
mess_dict = {
"status": self.MSG,
"level": "error",
"title": "Download queue is empty",
"message": "Add some videos to the queue first.",
}
RedisArchivist().set_message(self.MSG, mess_dict, expire=True)
if self.task:
self.task.send_progress(["Download queue is empty."])
return
RedisQueue(queue_name="dl_queue").add_list(to_add)
def _progress_hook(self, response):
"""process the progress_hooks from yt_dlp"""
title = "Downloading: " + response["info_dict"]["title"]
progress = False
try:
size = response.get("_total_bytes_str")
if size.strip() == "N/A":
size = response.get("_total_bytes_estimate_str", "N/A")
percent = response["_percent_str"]
progress = float(percent.strip("%")) / 100
speed = response["_speed_str"]
eta = response["_eta_str"]
message = f"{percent} of {size} at {speed} - time left: {eta}"
except KeyError:
message = "processing"
mess_dict = {
"status": self.MSG,
"level": "info",
"title": title,
"message": message,
}
RedisArchivist().set_message(self.MSG, mess_dict, expire=True)
if self.task:
title = response["info_dict"]["title"]
self.task.send_progress([title, message], progress=progress)
def _build_obs(self):
"""collection to build all obs passed to yt-dlp"""

View File

@ -10,7 +10,6 @@ from datetime import datetime
from home.src.download.yt_dlp_base import YtWrap
from home.src.es.connect import ElasticWrap
from home.src.ta.config import AppConfig
from home.src.ta.ta_redis import RedisArchivist
class Comments:
@ -24,14 +23,13 @@ class Comments:
self.is_activated = False
self.comments_format = False
def build_json(self, notify=False):
def build_json(self):
"""build json document for es"""
print(f"{self.youtube_id}: get comments")
self.check_config()
if not self.is_activated:
return
self._send_notification(notify)
comments_raw, channel_id = self.get_yt_comments()
if not comments_raw and not channel_id:
return
@ -52,23 +50,6 @@ class Comments:
self.is_activated = bool(self.config["downloads"]["comment_max"])
@staticmethod
def _send_notification(notify):
"""send notification for download post process message"""
if not notify:
return
key = "message:download"
idx, total_videos = notify
message = {
"status": key,
"level": "info",
"title": "Download and index comments",
"message": f"Progress: {idx + 1}/{total_videos}",
}
RedisArchivist().set_message(key, message)
def build_yt_obs(self):
"""
get extractor config
@ -200,38 +181,29 @@ class Comments:
class CommentList:
"""interact with comments in group"""
def __init__(self, video_ids):
def __init__(self, video_ids, task=False):
self.video_ids = video_ids
self.task = task
self.config = AppConfig().config
def index(self, notify=False):
"""index group of videos"""
def index(self):
"""index comments for list, init with task object to notify"""
if not self.config["downloads"].get("comment_max"):
return
total_videos = len(self.video_ids)
if notify:
self._notify(f"add comments for {total_videos} videos", False)
for idx, youtube_id in enumerate(self.video_ids):
if self.task:
self.notify(idx, total_videos)
for idx, video_id in enumerate(self.video_ids):
comment = Comments(video_id, config=self.config)
if notify:
notify = (idx, total_videos)
comment.build_json(notify=notify)
comment = Comments(youtube_id, config=self.config)
comment.build_json()
if comment.json_data:
comment.upload_comments()
if notify:
self._notify(f"added comments for {total_videos} videos", 5)
@staticmethod
def _notify(message, expire):
"""send notification"""
key = "message:download"
message = {
"status": key,
"level": "info",
"title": "Download and index comments finished",
"message": message,
}
RedisArchivist().set_message(key, message, expire=expire)
def notify(self, idx, total_videos):
"""send notification on task"""
message = [f"Add comments for new videos {idx + 1}/{total_videos}"]
progress = (idx + 1) / total_videos
title = "Index Comments"
self.task.send_progress(message, progress=progress, title=title)

View File

@ -114,9 +114,8 @@ class BaseTask(Task):
message.update({"messages": ["New task received."]})
RedisArchivist().set_message(key, message)
def send_progress(self, message_lines, progress=False):
def send_progress(self, message_lines, progress=False, title=False):
"""send progress message"""
print(f"{self.request.id}: {progress}")
message, key = self._build_message()
message.update(
{
@ -124,6 +123,9 @@ class BaseTask(Task):
"progress": progress,
}
)
if title:
message["title"] = title
RedisArchivist().set_message(key, message)
def _build_message(self, level="info"):
@ -162,7 +164,7 @@ def update_subscribed(self):
extrac_dl.delay(missing_videos)
@shared_task(name="download_pending", bind=True)
@shared_task(name="download_pending", bind=True, base=BaseTask)
def download_pending(self, from_queue=True):
"""download latest pending videos"""
manager = TaskManager()
@ -171,7 +173,7 @@ def download_pending(self, from_queue=True):
return
manager.init(self)
downloader = VideoDownloader()
downloader = VideoDownloader(task=self)
if from_queue:
downloader.add_pending()
downloader.run_queue()