refactor download task, consolidate tasks

This commit is contained in:
simon 2023-03-02 14:17:03 +07:00
parent 4c0de78fb4
commit 666074ed49
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
4 changed files with 21 additions and 46 deletions

View File

@ -15,7 +15,7 @@ from home.src.index.video import SponsorBlock, YoutubeVideo
from home.src.ta.config import AppConfig
from home.src.ta.ta_redis import RedisArchivist, RedisQueue
from home.src.ta.urlparser import Parser
from home.tasks import check_reindex, download_single, extrac_dl, subscribe_to
from home.tasks import check_reindex, download_pending, extrac_dl, subscribe_to
from rest_framework.authentication import (
SessionAuthentication,
TokenAuthentication,
@ -424,14 +424,15 @@ class DownloadApiView(ApiBaseView):
print(message)
return Response({"message": message}, status=400)
pending_video, status_code = PendingInteract(video_id).get_item()
_, status_code = PendingInteract(video_id).get_item()
if status_code == 404:
message = f"{video_id}: item not found {status_code}"
return Response({"message": message}, status=404)
print(f"{video_id}: change status to {item_status}")
if item_status == "priority":
download_single.delay(pending_video)
PendingInteract(youtube_id=video_id).prioritize()
download_pending.delay(from_queue=False)
else:
PendingInteract(video_id, item_status).update_status()
RedisQueue(queue_name="dl_queue").clear_item(video_id)

View File

@ -18,7 +18,7 @@ from home.src.index.playlist import YoutubePlaylist
from home.src.index.video_constants import VideoTypeEnum
from home.src.ta.config import AppConfig
from home.src.ta.helper import DurationConverter, is_shorts
from home.src.ta.ta_redis import RedisArchivist
from home.src.ta.ta_redis import RedisArchivist, RedisQueue
class PendingIndex:
@ -117,6 +117,16 @@ class PendingInteract:
path = f"ta_download/_update/{self.youtube_id}"
_, _ = ElasticWrap(path).post(data=data)
def prioritize(self):
"""prioritize pending item in redis queue"""
pending_video, _ = self.get_item()
vid_type = pending_video.get("vid_type", VideoTypeEnum.VIDEOS.value)
to_add = {
"youtube_id": pending_video["youtube_id"],
"vid_type": vid_type,
}
RedisQueue(queue_name="dl_queue").add_priority(to_add)
def get_item(self):
"""return pending item dict"""
path = f"ta_download/_doc/{self.youtube_id}"

View File

@ -128,8 +128,9 @@ class RedisQueue(RedisBase):
def add_priority(self, to_add):
"""add single video to front of queue"""
self.clear_item(to_add)
self.conn.execute_command("LPUSH", self.key, to_add)
item = json.dumps(to_add)
self.clear_item(item)
self.conn.execute_command("LPUSH", self.key, item)
def get_next(self):
"""return next element in the queue, False if none"""

View File

@ -6,7 +6,6 @@ Functionality:
because tasks are initiated at application start
"""
import json
import os
from celery import Celery, shared_task
@ -22,7 +21,6 @@ from home.src.es.index_setup import ElasitIndexWrap
from home.src.index.channel import YoutubeChannel
from home.src.index.filesystem import ImportFolderScanner, scan_filesystem
from home.src.index.reindex import Reindex, ReindexManual, ReindexOutdated
from home.src.index.video_constants import VideoTypeEnum
from home.src.ta.config import AppConfig, ReleaseVersion, ScheduleBuilder
from home.src.ta.helper import clear_dl_cache
from home.src.ta.ta_redis import RedisArchivist, RedisQueue
@ -64,7 +62,7 @@ def update_subscribed(self):
@shared_task(name="download_pending", bind=True)
def download_pending(self):
def download_pending(self, from_queue=True):
"""download latest pending videos"""
manager = TaskManager()
if manager.is_pending(self):
@ -73,46 +71,11 @@ def download_pending(self):
manager.init(self)
downloader = VideoDownloader()
downloader.add_pending()
if from_queue:
downloader.add_pending()
downloader.run_queue()
@shared_task(name="download_single")
def download_single(pending_video):
"""start download single video now"""
queue = RedisQueue(queue_name="dl_queue")
to_add = {
"youtube_id": pending_video["youtube_id"],
"vid_type": pending_video.get("vid_type", VideoTypeEnum.VIDEOS.value),
}
queue.add_priority(json.dumps(to_add))
print(f"Added to queue with priority: {to_add}")
# start queue if needed
have_lock = False
my_lock = RedisArchivist().get_lock("downloading")
try:
have_lock = my_lock.acquire(blocking=False)
if have_lock:
key = "message:download"
mess_dict = {
"status": key,
"level": "info",
"title": "Download single video",
"message": "processing",
}
RedisArchivist().set_message(key, mess_dict, expire=True)
VideoDownloader().run_queue()
else:
print("Download queue already running.")
finally:
# release if only single run
if have_lock and not queue.get_next():
my_lock.release()
@shared_task(name="extract_download")
def extrac_dl(youtube_ids):
"""parse list passed and add to pending"""