diff --git a/tubearchivist/home/src/download/yt_dlp_handler.py b/tubearchivist/home/src/download/yt_dlp_handler.py index 529526f..7ed11f9 100644 --- a/tubearchivist/home/src/download/yt_dlp_handler.py +++ b/tubearchivist/home/src/download/yt_dlp_handler.py @@ -6,14 +6,13 @@ functionality: - move to archive """ -import json import os import shutil from datetime import datetime from home.src.download.queue import PendingList from home.src.download.subscriptions import PlaylistSubscription -from home.src.download.yt_dlp_base import CookieHandler, YtWrap +from home.src.download.yt_dlp_base import YtWrap from home.src.es.connect import ElasticWrap, IndexPaginate from home.src.index.channel import YoutubeChannel from home.src.index.comments import CommentList @@ -22,7 +21,6 @@ from home.src.index.video import YoutubeVideo, index_new_video from home.src.index.video_constants import VideoTypeEnum from home.src.ta.config import AppConfig from home.src.ta.helper import clean_string, ignore_filelist -from home.src.ta.ta_redis import RedisQueue class DownloadPostProcess: @@ -159,29 +157,17 @@ class VideoDownloader: self.channels = set() self.videos = set() - def run_queue(self): + def run_queue(self, auto_only=False): """setup download queue in redis loop until no more items""" - self._setup_queue() - queue = RedisQueue(queue_name="dl_queue") - - limit_queue = self.config["downloads"]["limit_count"] - if limit_queue: - queue.trim(limit_queue - 1) - + self._get_overwrites() while True: - youtube_data = queue.get_next() - if self.task.is_stopped() or not youtube_data: - queue.clear() + video_data = self._get_next(auto_only) + if self.task.is_stopped() or not video_data: break - youtube_data = json.loads(youtube_data) - youtube_id = youtube_data.get("youtube_id") - - tmp_vid_type = youtube_data.get( - "vid_type", VideoTypeEnum.VIDEOS.value - ) - video_type = VideoTypeEnum(tmp_vid_type) - print(f"{youtube_id}: Downloading type: {video_type}") + youtube_id = video_data.get("youtube_id") + video_type = VideoTypeEnum(video_data["vid_type"]) + print(f"{youtube_id}: Downloading type: {video_type.value}") success = self._dl_single_vid(youtube_id) if not success: @@ -212,61 +198,39 @@ class VideoDownloader: ) self.move_to_archive(vid_dict) - - if queue.has_item(): - message = "Continue with next video." - else: - message = "Download queue is finished." - - if self.task: - self.task.send_progress([message]) - self._delete_from_pending(youtube_id) # post processing self._add_subscribed_channels() DownloadPostProcess(self).run() - def _setup_queue(self): - """setup required and validate""" - if self.config["downloads"]["cookie_import"]: - valid = CookieHandler(self.config).validate() - if not valid: - return + def _get_next(self, auto_only): + """get next item in queue""" + must_list = [{"term": {"status": {"value": "pending"}}}] + if auto_only: + must_list.append({"term": {"auto_start": {"value": True}}}) + data = { + "size": 1, + "query": {"bool": {"must": must_list}}, + "sort": [ + {"auto_start": {"order": "desc"}}, + {"timestamp": {"order": "asc"}}, + ], + } + path = "ta_download/_search" + response, _ = ElasticWrap(path).get(data=data) + if not response["hits"]["hits"]: + return False + + return response["hits"]["hits"][0]["_source"] + + def _get_overwrites(self): + """get channel overwrites""" pending = PendingList() - pending.get_download() pending.get_channels() self.video_overwrites = pending.video_overwrites - def add_pending(self): - """add pending videos to download queue""" - if self.task: - self.task.send_progress(["Scanning your download queue."]) - - pending = PendingList() - pending.get_download() - to_add = [ - json.dumps( - { - "youtube_id": i["youtube_id"], - # Using .value in default val to match what would be - # decoded when parsing json if not set - "vid_type": i.get("vid_type", VideoTypeEnum.VIDEOS.value), - } - ) - for i in pending.all_pending - ] - if not to_add: - # there is nothing pending - print("download queue is empty") - if self.task: - self.task.send_progress(["Download queue is empty."]) - - return - - RedisQueue(queue_name="dl_queue").add_list(to_add) - def _progress_hook(self, response): """process the progress_hooks from yt_dlp""" progress = False diff --git a/tubearchivist/home/tasks.py b/tubearchivist/home/tasks.py index 2166db7..fee5986 100644 --- a/tubearchivist/home/tasks.py +++ b/tubearchivist/home/tasks.py @@ -180,7 +180,7 @@ def update_subscribed(self): @shared_task(name="download_pending", bind=True, base=BaseTask) -def download_pending(self, from_queue=True): +def download_pending(self, auto_only=False): """download latest pending videos""" manager = TaskManager() if manager.is_pending(self): @@ -189,10 +189,7 @@ def download_pending(self, from_queue=True): return manager.init(self) - downloader = VideoDownloader(task=self) - if from_queue: - downloader.add_pending() - downloader.run_queue() + VideoDownloader(task=self).run_queue(auto_only) @shared_task(name="extract_download", bind=True, base=BaseTask)