rewrote download_single and download_pending tasks for redis queue

This commit is contained in:
simon 2021-09-23 18:10:45 +07:00
parent 78d8bc9d24
commit 8fc5926ce7

View File

@ -9,7 +9,7 @@ import os
from celery import Celery, shared_task from celery import Celery, shared_task
from home.src.config import AppConfig from home.src.config import AppConfig
from home.src.download import ChannelSubscription, PendingList, VideoDownloader from home.src.download import ChannelSubscription, PendingList, VideoDownloader
from home.src.helper import get_lock from home.src.helper import RedisQueue, get_lock
from home.src.index_management import backup_all_indexes, restore_from_backup from home.src.index_management import backup_all_indexes, restore_from_backup
from home.src.reindex import ManualImport, reindex_old_documents from home.src.reindex import ManualImport, reindex_old_documents
@ -37,20 +37,47 @@ def update_subscribed():
@shared_task @shared_task
def download_pending(): def download_pending():
"""download latest pending videos""" """download latest pending videos"""
pending_handler = PendingList()
pending_vids = pending_handler.get_all_pending()[0] have_lock = False
to_download = [i["youtube_id"] for i in pending_vids] my_lock = get_lock("downloading")
to_download.reverse()
if to_download: try:
download_handler = VideoDownloader(to_download) have_lock = my_lock.acquire(blocking=False)
download_handler.download_list() if have_lock:
downloader = VideoDownloader()
downloader.add_pending()
downloader.run_queue()
else:
print("Did not acquire download lock.")
finally:
if have_lock:
my_lock.release()
@shared_task @shared_task
def download_single(youtube_id): def download_single(youtube_id):
"""start download single video now""" """start download single video now"""
download_handler = VideoDownloader([youtube_id])
download_handler.download_list() queue = RedisQueue("dl_queue")
queue.add_priority(youtube_id)
print("Added to queue with priority: " + youtube_id)
# start queue if needed
have_lock = False
my_lock = get_lock("downloading")
try:
have_lock = my_lock.acquire(blocking=False)
if have_lock:
VideoDownloader().run_queue()
else:
print("Download queue already running.")
finally:
# release if only single run
if have_lock and not queue.get_next():
my_lock.release()
@shared_task @shared_task