diff --git a/tubearchivist/home/tasks.py b/tubearchivist/home/tasks.py index 87949a8..91b5a26 100644 --- a/tubearchivist/home/tasks.py +++ b/tubearchivist/home/tasks.py @@ -9,7 +9,7 @@ import os from celery import Celery, shared_task from home.src.config import AppConfig from home.src.download import ChannelSubscription, PendingList, VideoDownloader -from home.src.helper import get_lock +from home.src.helper import RedisQueue, get_lock from home.src.index_management import backup_all_indexes, restore_from_backup from home.src.reindex import ManualImport, reindex_old_documents @@ -37,20 +37,47 @@ def update_subscribed(): @shared_task def download_pending(): """download latest pending videos""" - pending_handler = PendingList() - pending_vids = pending_handler.get_all_pending()[0] - to_download = [i["youtube_id"] for i in pending_vids] - to_download.reverse() - if to_download: - download_handler = VideoDownloader(to_download) - download_handler.download_list() + + have_lock = False + my_lock = get_lock("downloading") + + try: + have_lock = my_lock.acquire(blocking=False) + if have_lock: + downloader = VideoDownloader() + downloader.add_pending() + downloader.run_queue() + else: + print("Did not acquire download lock.") + + finally: + if have_lock: + my_lock.release() @shared_task def download_single(youtube_id): """start download single video now""" - download_handler = VideoDownloader([youtube_id]) - download_handler.download_list() + + queue = RedisQueue("dl_queue") + queue.add_priority(youtube_id) + print("Added to queue with priority: " + youtube_id) + + # start queue if needed + have_lock = False + my_lock = get_lock("downloading") + + try: + have_lock = my_lock.acquire(blocking=False) + if have_lock: + VideoDownloader().run_queue() + else: + print("Download queue already running.") + + finally: + # release if only single run + if have_lock and not queue.get_next(): + my_lock.release() @shared_task