modify _get_next for auto_only attr

simon 2023-04-21 16:11:37 +07:00
parent 5cd845e55d
commit 1b6b219e02
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
2 changed files with 32 additions and 71 deletions


@@ -6,14 +6,13 @@ functionality:
 - move to archive
 """
 
-import json
 import os
 import shutil
 from datetime import datetime
 
 from home.src.download.queue import PendingList
 from home.src.download.subscriptions import PlaylistSubscription
-from home.src.download.yt_dlp_base import CookieHandler, YtWrap
+from home.src.download.yt_dlp_base import YtWrap
 from home.src.es.connect import ElasticWrap, IndexPaginate
 from home.src.index.channel import YoutubeChannel
 from home.src.index.comments import CommentList
@@ -22,7 +21,6 @@ from home.src.index.video import YoutubeVideo, index_new_video
 from home.src.index.video_constants import VideoTypeEnum
 from home.src.ta.config import AppConfig
 from home.src.ta.helper import clean_string, ignore_filelist
-from home.src.ta.ta_redis import RedisQueue
 
 
 class DownloadPostProcess:
@@ -159,29 +157,17 @@ class VideoDownloader:
         self.channels = set()
         self.videos = set()
 
-    def run_queue(self):
+    def run_queue(self, auto_only=False):
         """setup download queue in redis loop until no more items"""
-        self._setup_queue()
-        queue = RedisQueue(queue_name="dl_queue")
-        limit_queue = self.config["downloads"]["limit_count"]
-        if limit_queue:
-            queue.trim(limit_queue - 1)
-
+        self._get_overwrites()
         while True:
-            youtube_data = queue.get_next()
-            if self.task.is_stopped() or not youtube_data:
-                queue.clear()
+            video_data = self._get_next(auto_only)
+            if self.task.is_stopped() or not video_data:
                 break
 
-            youtube_data = json.loads(youtube_data)
-            youtube_id = youtube_data.get("youtube_id")
-
-            tmp_vid_type = youtube_data.get(
-                "vid_type", VideoTypeEnum.VIDEOS.value
-            )
-            video_type = VideoTypeEnum(tmp_vid_type)
-            print(f"{youtube_id}: Downloading type: {video_type}")
+            youtube_id = video_data.get("youtube_id")
+            video_type = VideoTypeEnum(video_data["vid_type"])
+            print(f"{youtube_id}: Downloading type: {video_type.value}")
 
             success = self._dl_single_vid(youtube_id)
             if not success:
@@ -212,61 +198,39 @@ class VideoDownloader:
                 )
             self.move_to_archive(vid_dict)
 
-            if queue.has_item():
-                message = "Continue with next video."
-            else:
-                message = "Download queue is finished."
-
-            if self.task:
-                self.task.send_progress([message])
-
             self._delete_from_pending(youtube_id)
 
         # post processing
         self._add_subscribed_channels()
         DownloadPostProcess(self).run()
 
-    def _setup_queue(self):
-        """setup required and validate"""
-        if self.config["downloads"]["cookie_import"]:
-            valid = CookieHandler(self.config).validate()
-            if not valid:
-                return
-
+    def _get_next(self, auto_only):
+        """get next item in queue"""
+        must_list = [{"term": {"status": {"value": "pending"}}}]
+        if auto_only:
+            must_list.append({"term": {"auto_start": {"value": True}}})
+
+        data = {
+            "size": 1,
+            "query": {"bool": {"must": must_list}},
+            "sort": [
+                {"auto_start": {"order": "desc"}},
+                {"timestamp": {"order": "asc"}},
+            ],
+        }
+        path = "ta_download/_search"
+        response, _ = ElasticWrap(path).get(data=data)
+        if not response["hits"]["hits"]:
+            return False
+
+        return response["hits"]["hits"][0]["_source"]
+
+    def _get_overwrites(self):
+        """get channel overwrites"""
         pending = PendingList()
-        pending.get_download()
         pending.get_channels()
         self.video_overwrites = pending.video_overwrites
 
-    def add_pending(self):
-        """add pending videos to download queue"""
-        if self.task:
-            self.task.send_progress(["Scanning your download queue."])
-
-        pending = PendingList()
-        pending.get_download()
-        to_add = [
-            json.dumps(
-                {
-                    "youtube_id": i["youtube_id"],
-                    # Using .value in default val to match what would be
-                    # decoded when parsing json if not set
-                    "vid_type": i.get("vid_type", VideoTypeEnum.VIDEOS.value),
-                }
-            )
-            for i in pending.all_pending
-        ]
-        if not to_add:
-            # there is nothing pending
-            print("download queue is empty")
-            if self.task:
-                self.task.send_progress(["Download queue is empty."])
-
-            return
-
-        RedisQueue(queue_name="dl_queue").add_list(to_add)
-
     def _progress_hook(self, response):
         """process the progress_hooks from yt_dlp"""
         progress = False
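
For orientation, a minimal sketch of the data flow the reworked loop assumes: _get_next asks Elasticsearch for the single best pending item, and run_queue reads youtube_id and vid_type straight out of the returned _source. The field names below come from the diff; the values, and the "videos" enum value, are illustrative assumptions rather than part of this commit.

    # Illustrative only -- field names mirror _get_next above, values are made up.
    example_source = {
        "youtube_id": "dQw4w9WgXcQ",  # hypothetical video id
        "vid_type": "videos",         # assumed to be a valid VideoTypeEnum value
        "status": "pending",          # matched by the first term filter
        "auto_start": True,           # required only when auto_only=True
        "timestamp": 1682066400,      # oldest pending item wins the ascending sort
    }

    # What run_queue does with the returned document, per the diff:
    youtube_id = example_source.get("youtube_id")
    vid_type_value = example_source["vid_type"]  # fed to VideoTypeEnum(...)
    print(f"{youtube_id}: Downloading type: {vid_type_value}")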


@@ -180,7 +180,7 @@ def update_subscribed(self):
 
 
 @shared_task(name="download_pending", bind=True, base=BaseTask)
-def download_pending(self, from_queue=True):
+def download_pending(self, auto_only=False):
     """download latest pending videos"""
     manager = TaskManager()
     if manager.is_pending(self):
@@ -189,10 +189,7 @@ def download_pending(self, from_queue=True):
         return
 
     manager.init(self)
-    downloader = VideoDownloader(task=self)
-    if from_queue:
-        downloader.add_pending()
-    downloader.run_queue()
+    VideoDownloader(task=self).run_queue(auto_only)
 
 
 @shared_task(name="extract_download", bind=True, base=BaseTask)