refactor rescan and add to pending

This commit is contained in:
simon 2023-03-01 19:12:27 +07:00
parent 6328e316f4
commit ea002ad45d
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
2 changed files with 85 additions and 60 deletions

View File

@ -265,3 +265,61 @@ class PlaylistSubscription:
missing_videos.append(youtube_id)
return missing_videos
class SubscriptionScanner:
"""add missing videos to queue"""
def __init__(self):
self.missing_videos = False
def scan(self):
"""scan channels and playlists"""
self.missing_videos = []
self._notify()
self._scan_channels()
self._scan_playlists()
if not self.missing_videos:
return
self.add_to_pending()
def _notify(self):
"""set redis notification"""
message = {
"status": "message:rescan",
"level": "info",
"title": "Rescanning channels and playlists.",
"message": "Looking for new videos.",
}
RedisArchivist().set_message("message:rescan", message, expire=True)
def _scan_channels(self):
"""get missing from channels"""
channel_handler = ChannelSubscription()
missing = channel_handler.find_missing()
if not missing:
return
for vid_id, vid_type in missing:
self.missing_videos.append(
{"type": "video", "vid_type": vid_type, "url": vid_id}
)
def _scan_playlists(self):
"""get missing from playlists"""
playlist_handler = PlaylistSubscription()
missing = playlist_handler.find_missing()
if not missing:
return
for i in missing:
self.missing_videos.append(
{"type": "video", "vid_type": VideoTypeEnum.VIDEOS, "url": i}
)
def add_to_pending(self):
"""add missing videos to pending queue"""
pending_handler = queue.PendingList(youtube_ids=self.missing_videos)
pending_handler.parse_url_list()
pending_handler.add_to_pending()

View File

@ -14,6 +14,7 @@ from home.src.download.queue import PendingList
from home.src.download.subscriptions import (
ChannelSubscription,
PlaylistSubscription,
SubscriptionScanner,
)
from home.src.download.thumbnails import ThumbFilesystem, ThumbValidator
from home.src.download.yt_dlp_handler import VideoDownloader
@ -26,6 +27,7 @@ from home.src.index.video_constants import VideoTypeEnum
from home.src.ta.config import AppConfig, ReleaseVersion, ScheduleBuilder
from home.src.ta.helper import clear_dl_cache
from home.src.ta.ta_redis import RedisArchivist, RedisQueue
from home.src.ta.task_manager import TaskManager
from home.src.ta.urlparser import Parser
CONFIG = AppConfig().config
@ -44,72 +46,37 @@ app.autodiscover_tasks()
app.conf.timezone = os.environ.get("TZ") or "UTC"
@shared_task(name="update_subscribed")
def update_subscribed():
@shared_task(name="update_subscribed", bind=True)
def update_subscribed(self):
"""look for missing videos and add to pending"""
message = {
"status": "message:rescan",
"level": "info",
"title": "Rescanning channels and playlists.",
"message": "Looking for new videos.",
}
RedisArchivist().set_message("message:rescan", message, expire=True)
manager = TaskManager()
if manager.is_pending(self):
print(f"[task][{self.name}] rescan already running")
message = {
"status": "message:rescan",
"level": "error",
"title": "Rescanning channels and playlists.",
"message": "Rescan already in progress.",
}
RedisArchivist().set_message("message:rescan", message, expire=True)
return
have_lock = False
my_lock = RedisArchivist().get_lock("rescan")
try:
have_lock = my_lock.acquire(blocking=False)
if have_lock:
channel_handler = ChannelSubscription()
missing_from_channels = channel_handler.find_missing()
playlist_handler = PlaylistSubscription()
missing_from_playlists = playlist_handler.find_missing()
if missing_from_channels or missing_from_playlists:
channel_videos = [
{"type": "video", "vid_type": vid_type, "url": vid_id}
for vid_id, vid_type in missing_from_channels
]
playlist_videos = [
{
"type": "video",
"vid_type": VideoTypeEnum.VIDEOS,
"url": i,
}
for i in missing_from_playlists
]
pending_handler = PendingList(
youtube_ids=channel_videos + playlist_videos
)
pending_handler.parse_url_list()
pending_handler.add_to_pending()
else:
print("Did not acquire rescan lock.")
finally:
if have_lock:
my_lock.release()
manager.init(self)
SubscriptionScanner().scan()
@shared_task(name="download_pending")
def download_pending():
@shared_task(name="download_pending", bind=True)
def download_pending(self):
"""download latest pending videos"""
have_lock = False
my_lock = RedisArchivist().get_lock("downloading")
manager = TaskManager()
if manager.is_pending(self):
print(f"[task][{self.name}] download queue already running")
return
try:
have_lock = my_lock.acquire(blocking=False)
if have_lock:
downloader = VideoDownloader()
downloader.add_pending()
downloader.run_queue()
else:
print("Did not acquire download lock.")
finally:
if have_lock:
my_lock.release()
manager.init(self)
downloader = VideoDownloader()
downloader.add_pending()
downloader.run_queue()
@shared_task(name="download_single")