diff --git a/tubearchivist/home/src/download/queue.py b/tubearchivist/home/src/download/queue.py
index 272c34d..e13119b 100644
--- a/tubearchivist/home/src/download/queue.py
+++ b/tubearchivist/home/src/download/queue.py
@@ -5,36 +5,119 @@ Functionality:
 """
 
 import json
-import os
 from datetime import datetime
 
-import requests
 import yt_dlp
-from home.src.download.subscriptions import ChannelSubscription
-from home.src.es.connect import IndexPaginate
+from home.src.download.subscriptions import (
+    ChannelSubscription,
+    PlaylistSubscription,
+)
+from home.src.download.thumbnails import ThumbManager
+from home.src.es.connect import ElasticWrap, IndexPaginate
 from home.src.index.playlist import YoutubePlaylist
-from home.src.ta.config import AppConfig
-from home.src.ta.helper import DurationConverter, ignore_filelist
+from home.src.ta.helper import DurationConverter
 from home.src.ta.ta_redis import RedisArchivist
 
 
-class PendingList:
-    """manage the pending videos list"""
-
-    CONFIG = AppConfig().config
-    ES_URL = CONFIG["application"]["es_url"]
-    ES_AUTH = CONFIG["application"]["es_auth"]
-    VIDEOS = CONFIG["application"]["videos"]
+class PendingIndex:
+    """base class holding all export methods"""
 
     def __init__(self):
-        self.all_channel_ids = False
-        self.all_downloaded = False
-        self.missing_from_playlists = []
+        self.all_pending = False
+        self.all_ignored = False
+        self.all_videos = False
+        self.all_channels = False
+        self.all_overwrites = False
+        self.to_skip = False
 
-    def parse_url_list(self, youtube_ids):
+    def get_download(self):
+        """get a list of all pending videos in ta_download"""
+        data = {
+            "query": {"match_all": {}},
+            "sort": [{"timestamp": {"order": "asc"}}],
+        }
+        all_results = IndexPaginate("ta_download", data).get_results()
+
+        self.all_pending = []
+        self.all_ignored = []
+        self.to_skip = []
+
+        for result in all_results:
+            self.to_skip.append(result["youtube_id"])
+            if result["status"] == "pending":
+                self.all_pending.append(result)
+            elif result["status"] == "ignore":
+                self.all_ignored.append(result)
+
+    def get_indexed(self):
+        """get a list of all videos indexed"""
+        data = {
+            "query": {"match_all": {}},
+            "sort": [{"published": {"order": "desc"}}],
+        }
+        self.all_videos = IndexPaginate("ta_video", data).get_results()
+        for video in self.all_videos:
+            self.to_skip.append(video["youtube_id"])
+
+    def get_channels(self):
+        """get a list of all channels indexed"""
+        self.all_channels = []
+        self.all_overwrites = {}
+        data = {
+            "query": {"match_all": {}},
+            "sort": [{"channel_id": {"order": "asc"}}],
+        }
+        channels = IndexPaginate("ta_channel", data).get_results()
+
+        for channel in channels:
+            channel_id = channel["channel_id"]
+            self.all_channels.append(channel_id)
+            if channel.get("channel_overwrites"):
+                self.all_overwrites.update(
+                    {channel_id: channel.get("channel_overwrites")}
+                )
+
+
+class PendingInteract:
+    """interact with items in download queue"""
+
+    def __init__(self, video_id=False, status=False):
+        self.video_id = video_id
+        self.status = status
+
+    def delete_item(self):
+        """delete single item from pending"""
+        path = f"ta_download/_doc/{self.video_id}"
+        _, _ = ElasticWrap(path).delete()
+
+    def delete_by_status(self):
+        """delete all matching item by status"""
+        data = {"query": {"term": {"status": {"value": self.status}}}}
+        path = "ta_download/_delete_by_query"
+        _, _ = ElasticWrap(path).post(data=data)
+
+    def update_status(self):
+        """update status field of pending item"""
+        data = {"doc": {"status": self.status}}
+        path = f"ta_download/_update/{self.video_id}"
+        _, _ = ElasticWrap(path).post(data=data)
+
+
+class PendingList(PendingIndex):
+    """manage the pending videos list"""
+
+    def __init__(self, youtube_ids):
+        super().__init__()
+        self.youtube_ids = youtube_ids
+        self.to_skip = False
+        self.missing_videos = False
+
+    def parse_url_list(self):
         """extract youtube ids from list"""
-        missing_videos = []
-        for entry in youtube_ids:
+        self.missing_videos = []
+        self.get_download()
+        self.get_indexed()
+        for entry in self.youtube_ids:
             # notify
             mess_dict = {
                 "status": "message:add",
@@ -43,97 +126,89 @@ class PendingList:
                 "message": "Extracting lists",
             }
             RedisArchivist().set_message("message:add", mess_dict)
-            # extract
-            url = entry["url"]
-            url_type = entry["type"]
-            if url_type == "video":
-                missing_videos.append(url)
-            elif url_type == "channel":
-                video_results = ChannelSubscription().get_last_youtube_videos(
-                    url, limit=False
-                )
-                youtube_ids = [i[0] for i in video_results]
-                missing_videos = missing_videos + youtube_ids
-            elif url_type == "playlist":
-                self.missing_from_playlists.append(entry)
-                playlist = YoutubePlaylist(url)
-                playlist.build_json()
-                video_results = playlist.json_data.get("playlist_entries")
-                youtube_ids = [i["youtube_id"] for i in video_results]
-                missing_videos = missing_videos + youtube_ids
+            self._process_entry(entry)
 
-        return missing_videos
+    def _process_entry(self, entry):
+        """process single entry from url list"""
+        if entry["type"] == "video":
+            self._add_video(entry["url"])
+        elif entry["type"] == "channel":
+            self._parse_channel(entry["url"])
+        elif entry["type"] == "playlist":
+            self._parse_playlist(entry["url"])
+            new_thumbs = PlaylistSubscription().process_url_str(
+                [entry], subscribed=False
+            )
+            ThumbManager().download_playlist(new_thumbs)
+        else:
+            raise ValueError(f"invalid url_type: {entry}")
 
-    def add_to_pending(self, missing_videos, ignore=False):
-        """build the bulk json data from pending"""
-        # check if channel is indexed
-        channel_handler = ChannelSubscription()
-        all_indexed = channel_handler.get_channels(subscribed_only=False)
-        self.all_channel_ids = [i["channel_id"] for i in all_indexed]
-        # check if already there
-        self.all_downloaded = self.get_all_downloaded()
+    def _add_video(self, url):
+        """add video to list"""
+        if url not in self.missing_videos and url not in self.to_skip:
+            self.missing_videos.append(url)
+
+    def _parse_channel(self, url):
+        """add all videos of channel to list"""
+        video_results = ChannelSubscription().get_last_youtube_videos(
+            url, limit=False
+        )
+        youtube_ids = [i[0] for i in video_results]
+        for video_id in youtube_ids:
+            self._add_video(video_id)
+
+    def _parse_playlist(self, url):
+        """add all videos of playlist to list"""
+        playlist = YoutubePlaylist(url)
+        playlist.build_json()
+        video_results = playlist.json_data.get("playlist_entries")
+        youtube_ids = [i["youtube_id"] for i in video_results]
+        for video_id in youtube_ids:
+            self._add_video(video_id)
+
+    def add_to_pending(self, status="pending"):
+        """add missing videos to pending list"""
+        self.get_channels()
+        bulk_list = []
+
+        thumb_handler = ThumbManager()
+        for idx, youtube_id in enumerate(self.missing_videos):
+            video_details = self._get_youtube_details(youtube_id)
+            if not video_details:
+                continue
+
+            video_details["status"] = status
+            action = {"create": {"_id": youtube_id, "_index": "ta_download"}}
+            bulk_list.append(json.dumps(action))
+            bulk_list.append(json.dumps(video_details))
+
+            thumb_needed = [(youtube_id, video_details["vid_thumb_url"])]
+            thumb_handler.download_vid(thumb_needed)
+            self._notify_add(idx)
 
-        bulk_list, all_videos_added = self.build_bulk(missing_videos, ignore)
         # add last newline
         bulk_list.append("\n")
        query_str = "\n".join(bulk_list)
-        headers = {"Content-type": "application/x-ndjson"}
-        url = self.ES_URL + "/_bulk"
-        request = requests.post(
-            url, data=query_str, headers=headers, auth=self.ES_AUTH
-        )
-        if not request.ok:
-            print(request)
-            raise ValueError("failed to add video to download queue")
+        _, _ = ElasticWrap("_bulk").post(query_str, ndjson=True)
 
-        return all_videos_added
+    def _notify_add(self, idx):
+        """send notification for adding videos to download queue"""
+        progress = f"{idx + 1}/{len(self.missing_videos)}"
+        mess_dict = {
+            "status": "message:add",
+            "level": "info",
+            "title": "Adding new videos to download queue.",
+            "message": "Progress: " + progress,
+        }
+        if idx + 1 == len(self.missing_videos):
+            RedisArchivist().set_message("message:add", mess_dict, expire=4)
+        else:
+            RedisArchivist().set_message("message:add", mess_dict)
 
-    def build_bulk(self, missing_videos, ignore=False):
-        """build the bulk lists"""
-        bulk_list = []
-        all_videos_added = []
+        if (idx + 1) % 25 == 0:
+            print("adding to queue progress: " + progress)
 
-        for idx, youtube_id in enumerate(missing_videos):
-            # check if already downloaded
-            if youtube_id in self.all_downloaded:
-                continue
-
-            video = self.get_youtube_details(youtube_id)
-            # skip on download error
-            if not video:
-                continue
-
-            channel_indexed = video["channel_id"] in self.all_channel_ids
-            video["channel_indexed"] = channel_indexed
-            if ignore:
-                video["status"] = "ignore"
-            else:
-                video["status"] = "pending"
-            action = {"create": {"_id": youtube_id, "_index": "ta_download"}}
-            bulk_list.append(json.dumps(action))
-            bulk_list.append(json.dumps(video))
-            all_videos_added.append((youtube_id, video["vid_thumb_url"]))
-            # notify
-            progress = f"{idx + 1}/{len(missing_videos)}"
-            mess_dict = {
-                "status": "message:add",
-                "level": "info",
-                "title": "Adding new videos to download queue.",
-                "message": "Progress: " + progress,
-            }
-            if idx + 1 == len(missing_videos):
-                RedisArchivist().set_message(
-                    "message:add", mess_dict, expire=4
-                )
-            else:
-                RedisArchivist().set_message("message:add", mess_dict)
-            if idx + 1 % 25 == 0:
-                print("adding to queue progress: " + progress)
-
-        return bulk_list, all_videos_added
-
-    @staticmethod
-    def get_youtube_details(youtube_id):
+    def _get_youtube_details(self, youtube_id):
         """get details from youtubedl for single pending video"""
         obs = {
             "default_search": "ytsearch",
@@ -151,113 +226,29 @@ class PendingList:
         # stop if video is streaming live now
         if vid["is_live"]:
             return False
-        # parse response
-        seconds = vid["duration"]
-        duration_str = DurationConverter.get_str(seconds)
+
+        return self._parse_youtube_details(vid)
+
+    def _parse_youtube_details(self, vid):
+        """parse response"""
+        vid_id = vid.get("id")
+        duration_str = DurationConverter.get_str(vid["duration"])
         if duration_str == "NA":
-            print(f"skip extracting duration for: {youtube_id}")
-        upload_date = vid["upload_date"]
-        upload_dt = datetime.strptime(upload_date, "%Y%m%d")
-        published = upload_dt.strftime("%Y-%m-%d")
+            print(f"skip extracting duration for: {vid_id}")
+        published = datetime.strptime(vid["upload_date"], "%Y%m%d").strftime(
+            "%Y-%m-%d"
+        )
+
         # build dict
         youtube_details = {
-            "youtube_id": youtube_id,
+            "youtube_id": vid_id,
             "channel_name": vid["channel"],
             "vid_thumb_url": vid["thumbnail"],
             "title": vid["title"],
             "channel_id": vid["channel_id"],
+            "channel_indexed": vid["channel_id"] in self.all_channels,
             "duration": duration_str,
             "published": published,
             "timestamp": int(datetime.now().strftime("%s")),
         }
         return youtube_details
-
-    @staticmethod
-    def get_all_pending():
-        """get a list of all pending videos in ta_download"""
-        data = {
-            "query": {"match_all": {}},
-            "sort": [{"timestamp": {"order": "asc"}}],
-        }
-        all_results = IndexPaginate("ta_download", data).get_results()
-
-        all_pending = []
-        all_ignore = []
-
-        for result in all_results:
-            if result["status"] == "pending":
-                all_pending.append(result)
-            elif result["status"] == "ignore":
-                all_ignore.append(result)
-
-        return all_pending, all_ignore
-
-    @staticmethod
-    def get_all_indexed():
-        """get a list of all videos indexed"""
-
-        data = {
-            "query": {"match_all": {}},
-            "sort": [{"published": {"order": "desc"}}],
-        }
-        all_indexed = IndexPaginate("ta_video", data).get_results()
-
-        return all_indexed
-
-    def get_all_downloaded(self):
-        """get a list of all videos in archive"""
-        channel_folders = os.listdir(self.VIDEOS)
-        all_channel_folders = ignore_filelist(channel_folders)
-        all_downloaded = []
-        for channel_folder in all_channel_folders:
-            channel_path = os.path.join(self.VIDEOS, channel_folder)
-            videos = os.listdir(channel_path)
-            all_videos = ignore_filelist(videos)
-            youtube_vids = [i[9:20] for i in all_videos]
-            for youtube_id in youtube_vids:
-                all_downloaded.append(youtube_id)
-        return all_downloaded
-
-    def delete_from_pending(self, youtube_id):
-        """delete the youtube_id from ta_download"""
-        url = f"{self.ES_URL}/ta_download/_doc/{youtube_id}"
-        response = requests.delete(url, auth=self.ES_AUTH)
-        if not response.ok:
-            print(response.text)
-
-    def delete_pending(self, status):
-        """delete download queue based on status value"""
-        data = {"query": {"term": {"status": {"value": status}}}}
-        payload = json.dumps(data)
-        url = self.ES_URL + "/ta_download/_delete_by_query"
-        headers = {"Content-type": "application/json"}
-        response = requests.post(
-            url, data=payload, headers=headers, auth=self.ES_AUTH
-        )
-        if not response.ok:
-            print(response.text)
-
-    def ignore_from_pending(self, ignore_list):
-        """build the bulk query string"""
-
-        stamp = int(datetime.now().strftime("%s"))
-        bulk_list = []
-
-        for youtube_id in ignore_list:
-            action = {"update": {"_id": youtube_id, "_index": "ta_download"}}
-            source = {"doc": {"status": "ignore", "timestamp": stamp}}
-            bulk_list.append(json.dumps(action))
-            bulk_list.append(json.dumps(source))
-
-        # add last newline
-        bulk_list.append("\n")
-        query_str = "\n".join(bulk_list)
-
-        headers = {"Content-type": "application/x-ndjson"}
-        url = self.ES_URL + "/_bulk"
-        request = requests.post(
-            url, data=query_str, headers=headers, auth=self.ES_AUTH
-        )
-        if not request.ok:
-            print(request)
-            raise ValueError("failed to set video to ignore")