refactor subs rescan to new task manager

This commit is contained in:
simon 2023-03-14 15:42:42 +07:00
parent 58530563ce
commit 78f04a2ffc
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
2 changed files with 52 additions and 47 deletions

View File

@ -19,8 +19,9 @@ from home.src.ta.urlparser import Parser
class ChannelSubscription:
"""manage the list of channels subscribed"""
def __init__(self):
def __init__(self, task=False):
self.config = AppConfig().config
self.task = task
@staticmethod
def get_channels(subscribed_only=True):
@ -102,12 +103,16 @@ class ChannelSubscription:
def find_missing(self):
"""add missing videos from subscribed channels to pending"""
all_channels = self.get_channels()
if not all_channels:
return False
pending = queue.PendingList()
pending.get_download()
pending.get_indexed()
missing_videos = []
total = len(all_channels)
for idx, channel in enumerate(all_channels):
channel_id = channel["channel_id"]
print(f"{channel_id}: find missing videos.")
@ -117,20 +122,13 @@ class ChannelSubscription:
for video_id, _, vid_type in last_videos:
if video_id not in pending.to_skip:
missing_videos.append((video_id, vid_type))
# notify
message = {
"status": "message:rescan",
"level": "info",
"title": "Scanning channels: Looking for new videos.",
"message": f"Progress: {idx + 1}/{len(all_channels)}",
}
if idx + 1 == len(all_channels):
expire = 4
else:
expire = True
RedisArchivist().set_message(
"message:rescan", message=message, expire=expire
if not self.task:
continue
self.task.send_progress(
message_lines=[f"Scanning Channel {idx + 1}/{total}"],
progress=(idx + 1) / total,
)
return missing_videos
@ -148,8 +146,9 @@ class ChannelSubscription:
class PlaylistSubscription:
"""manage the playlist download functionality"""
def __init__(self):
def __init__(self, task=False):
self.config = AppConfig().config
self.task = task
@staticmethod
def get_playlists(subscribed_only=True):
@ -233,14 +232,18 @@ class PlaylistSubscription:
def find_missing(self):
"""find videos in subscribed playlists not downloaded yet"""
all_playlists = [i["playlist_id"] for i in self.get_playlists()]
if not all_playlists:
return False
to_ignore = self.get_to_ignore()
missing_videos = []
total = len(all_playlists)
for idx, playlist_id in enumerate(all_playlists):
size_limit = self.config["subscriptions"]["channel_size"]
playlist = YoutubePlaylist(playlist_id)
playlist.update_playlist()
if not playlist:
is_active = playlist.update_playlist()
if not is_active:
playlist.deactivate()
continue
@ -250,28 +253,27 @@ class PlaylistSubscription:
all_missing = [i for i in playlist_entries if not i["downloaded"]]
message = {
"status": "message:rescan",
"level": "info",
"title": "Scanning playlists: Looking for new videos.",
"message": f"Progress: {idx + 1}/{len(all_playlists)}",
}
RedisArchivist().set_message(
"message:rescan", message=message, expire=True
)
for video in all_missing:
youtube_id = video["youtube_id"]
if youtube_id not in to_ignore:
missing_videos.append(youtube_id)
if not self.task:
continue
self.task.send_progress(
message_lines=[f"Scanning Playlists {idx + 1}/{total}"],
progress=(idx + 1) / total,
)
return missing_videos
class SubscriptionScanner:
"""add missing videos to queue"""
def __init__(self):
def __init__(self, task=False):
self.task = task
self.missing_videos = False
def scan(self):
@ -287,17 +289,11 @@ class SubscriptionScanner:
def _notify(self):
"""set redis notification"""
message = {
"status": "message:rescan",
"level": "info",
"title": "Rescanning channels and playlists.",
"message": "Looking for new videos.",
}
RedisArchivist().set_message("message:rescan", message, expire=True)
self.task.send_progress(["Rescanning channels and playlists."])
def _scan_channels(self):
"""get missing from channels"""
channel_handler = ChannelSubscription()
channel_handler = ChannelSubscription(task=self.task)
missing = channel_handler.find_missing()
if not missing:
return
@ -309,7 +305,7 @@ class SubscriptionScanner:
def _scan_playlists(self):
"""get missing from playlists"""
playlist_handler = PlaylistSubscription()
playlist_handler = PlaylistSubscription(task=self.task)
missing = playlist_handler.find_missing()
if not missing:
return

View File

@ -103,23 +103,32 @@ class BaseTask(Task):
def on_success(self, retval, task_id, args, kwargs):
"""callback task completed successfully"""
print(f"{task_id} success callback")
message, key = self._build_message(task_id)
message.update({"message": "Task completed successfully"})
message, key = self._build_message()
message.update({"messages": ["Task completed successfully"]})
RedisArchivist().set_message(key, message, expire=5)
def before_start(self, task_id, args, kwargs):
"""callback before initiating task"""
print(f"{self.name} create callback")
message, key = self._build_message(task_id)
message.update({"message": "New task received."})
message, key = self._build_message()
message.update({"messages": ["New task received."]})
RedisArchivist().set_message(key, message)
def print_progress(self, task_id, progress):
"""print progress"""
print(f"{task_id}: {progress}")
def send_progress(self, message_lines, progress=False):
"""send progress message"""
print(f"{self.request.id}: {progress}")
message, key = self._build_message()
message.update(
{
"messages": message_lines,
"progress": progress,
}
)
RedisArchivist().set_message(key, message)
def _build_message(self, task_id, level="info"):
def _build_message(self, level="info"):
"""build message dict"""
task_id = self.request.id
config = self.TASK_CONFIG.get(self.name)
message = {
"status": config.get("group"),
@ -131,7 +140,7 @@ class BaseTask(Task):
return message, key
@shared_task(name="update_subscribed", bind=True)
@shared_task(name="update_subscribed", bind=True, base=BaseTask)
def update_subscribed(self):
"""look for missing videos and add to pending"""
manager = TaskManager()
@ -147,7 +156,7 @@ def update_subscribed(self):
return
manager.init(self)
SubscriptionScanner().scan()
SubscriptionScanner(task=self).scan()
@shared_task(name="download_pending", bind=True)