diff --git a/docker-compose.yml b/docker-compose.yml index 65fee96..a3ac505 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -33,7 +33,7 @@ services: depends_on: - archivist-es archivist-es: - image: docker.elastic.co/elasticsearch/elasticsearch:7.17.0 + image: docker.elastic.co/elasticsearch/elasticsearch:7.17.1 container_name: archivist-es restart: always environment: diff --git a/tubearchivist/home/apps.py b/tubearchivist/home/apps.py index f58193f..13e3986 100644 --- a/tubearchivist/home/apps.py +++ b/tubearchivist/home/apps.py @@ -8,46 +8,69 @@ from home.src.ta.config import AppConfig as ArchivistConfig from home.src.ta.ta_redis import RedisArchivist -def sync_redis_state(): - """make sure redis gets new config.json values""" - print("sync redis") - config_handler = ArchivistConfig() - config_handler.load_new_defaults() +class StartupCheck: + """checks to run at application startup""" + def __init__(self): + self.config_handler = ArchivistConfig() + self.redis_con = RedisArchivist() + self.has_run = self.get_has_run() -def make_folders(): - """make needed cache folders here so docker doesn't mess it up""" - folders = [ - "download", - "channels", - "videos", - "playlists", - "import", - "backup", - ] - config = ArchivistConfig().config - cache_dir = config["application"]["cache_dir"] - for folder in folders: - folder_path = os.path.join(cache_dir, folder) - try: - os.makedirs(folder_path) - except FileExistsError: - continue + def run(self): + """run all startup checks""" + print("run startup checks") + self.release_lock() + index_check() + self.sync_redis_state() + self.make_folders() + self.set_has_run() + def get_has_run(self): + """validate if check has already executed""" + return self.redis_con.get_message("startup_check") -def release_lock(): - """make sure there are no leftover locks set in redis on container start""" - all_locks = [ - "manual_import", - "downloading", - "dl_queue", - "dl_queue_id", - "rescan", - ] - for lock in all_locks: - response = RedisArchivist().del_message(lock) - if response: - print("deleted leftover key from redis: " + lock) + def set_has_run(self): + """startup checks run""" + message = {"status": True} + self.redis_con.set_message("startup_check", message, expire=120) + + def sync_redis_state(self): + """make sure redis gets new config.json values""" + print("sync redis") + self.config_handler.load_new_defaults() + + def make_folders(self): + """make needed cache folders here so docker doesn't mess it up""" + folders = [ + "download", + "channels", + "videos", + "playlists", + "import", + "backup", + ] + cache_dir = self.config_handler.config["application"]["cache_dir"] + for folder in folders: + folder_path = os.path.join(cache_dir, folder) + try: + os.makedirs(folder_path) + except FileExistsError: + continue + + def release_lock(self): + """make sure there are no leftover locks set in redis""" + all_locks = [ + "startup_check", + "manual_import", + "downloading", + "dl_queue", + "dl_queue_id", + "rescan", + ] + for lock in all_locks: + response = self.redis_con.del_message(lock) + if response: + print("deleted leftover key from redis: " + lock) class HomeConfig(AppConfig): @@ -57,7 +80,9 @@ class HomeConfig(AppConfig): name = "home" def ready(self): - release_lock() - index_check() - sync_redis_state() - make_folders() + startup = StartupCheck() + if startup.has_run["status"]: + print("startup checks run in other thread") + return + + startup.run() diff --git a/tubearchivist/home/src/download/queue.py 
b/tubearchivist/home/src/download/queue.py index 272c34d..d898b82 100644 --- a/tubearchivist/home/src/download/queue.py +++ b/tubearchivist/home/src/download/queue.py @@ -5,36 +5,132 @@ Functionality: """ import json -import os from datetime import datetime -import requests import yt_dlp -from home.src.download.subscriptions import ChannelSubscription -from home.src.es.connect import IndexPaginate +from home.src.download.subscriptions import ( + ChannelSubscription, + PlaylistSubscription, +) +from home.src.download.thumbnails import ThumbManager +from home.src.es.connect import ElasticWrap, IndexPaginate from home.src.index.playlist import YoutubePlaylist -from home.src.ta.config import AppConfig -from home.src.ta.helper import DurationConverter, ignore_filelist +from home.src.ta.helper import DurationConverter from home.src.ta.ta_redis import RedisArchivist -class PendingList: - """manage the pending videos list""" - - CONFIG = AppConfig().config - ES_URL = CONFIG["application"]["es_url"] - ES_AUTH = CONFIG["application"]["es_auth"] - VIDEOS = CONFIG["application"]["videos"] +class PendingIndex: + """base class holding all export methods""" def __init__(self): - self.all_channel_ids = False - self.all_downloaded = False - self.missing_from_playlists = [] + self.all_pending = False + self.all_ignored = False + self.all_videos = False + self.all_channels = False + self.channel_overwrites = False + self.video_overwrites = False + self.to_skip = False - def parse_url_list(self, youtube_ids): + def get_download(self): + """get a list of all pending videos in ta_download""" + data = { + "query": {"match_all": {}}, + "sort": [{"timestamp": {"order": "asc"}}], + } + all_results = IndexPaginate("ta_download", data).get_results() + + self.all_pending = [] + self.all_ignored = [] + self.to_skip = [] + + for result in all_results: + self.to_skip.append(result["youtube_id"]) + if result["status"] == "pending": + self.all_pending.append(result) + elif result["status"] == "ignore": + self.all_ignored.append(result) + + def get_indexed(self): + """get a list of all videos indexed""" + data = { + "query": {"match_all": {}}, + "sort": [{"published": {"order": "desc"}}], + } + self.all_videos = IndexPaginate("ta_video", data).get_results() + for video in self.all_videos: + self.to_skip.append(video["youtube_id"]) + + def get_channels(self): + """get a list of all channels indexed""" + self.all_channels = [] + self.channel_overwrites = {} + data = { + "query": {"match_all": {}}, + "sort": [{"channel_id": {"order": "asc"}}], + } + channels = IndexPaginate("ta_channel", data).get_results() + + for channel in channels: + channel_id = channel["channel_id"] + self.all_channels.append(channel_id) + if channel.get("channel_overwrites"): + self.channel_overwrites.update( + {channel_id: channel.get("channel_overwrites")} + ) + + self._map_overwrites() + + def _map_overwrites(self): + """map video ids to channel ids overwrites""" + self.video_overwrites = {} + for video in self.all_pending: + video_id = video["youtube_id"] + channel_id = video["channel_id"] + overwrites = self.channel_overwrites.get(channel_id, False) + if overwrites: + self.video_overwrites.update({video_id: overwrites}) + + +class PendingInteract: + """interact with items in download queue""" + + def __init__(self, video_id=False, status=False): + self.video_id = video_id + self.status = status + + def delete_item(self): + """delete single item from pending""" + path = f"ta_download/_doc/{self.video_id}" + _, _ = ElasticWrap(path).delete() + + def 
delete_by_status(self): + """delete all matching item by status""" + data = {"query": {"term": {"status": {"value": self.status}}}} + path = "ta_download/_delete_by_query" + _, _ = ElasticWrap(path).post(data=data) + + def update_status(self): + """update status field of pending item""" + data = {"doc": {"status": self.status}} + path = f"ta_download/_update/{self.video_id}" + _, _ = ElasticWrap(path).post(data=data) + + +class PendingList(PendingIndex): + """manage the pending videos list""" + + def __init__(self, youtube_ids=False): + super().__init__() + self.youtube_ids = youtube_ids + self.to_skip = False + self.missing_videos = False + + def parse_url_list(self): """extract youtube ids from list""" - missing_videos = [] - for entry in youtube_ids: + self.missing_videos = [] + self.get_download() + self.get_indexed() + for entry in self.youtube_ids: # notify mess_dict = { "status": "message:add", @@ -43,97 +139,89 @@ class PendingList: "message": "Extracting lists", } RedisArchivist().set_message("message:add", mess_dict) - # extract - url = entry["url"] - url_type = entry["type"] - if url_type == "video": - missing_videos.append(url) - elif url_type == "channel": - video_results = ChannelSubscription().get_last_youtube_videos( - url, limit=False - ) - youtube_ids = [i[0] for i in video_results] - missing_videos = missing_videos + youtube_ids - elif url_type == "playlist": - self.missing_from_playlists.append(entry) - playlist = YoutubePlaylist(url) - playlist.build_json() - video_results = playlist.json_data.get("playlist_entries") - youtube_ids = [i["youtube_id"] for i in video_results] - missing_videos = missing_videos + youtube_ids + self._process_entry(entry) - return missing_videos + def _process_entry(self, entry): + """process single entry from url list""" + if entry["type"] == "video": + self._add_video(entry["url"]) + elif entry["type"] == "channel": + self._parse_channel(entry["url"]) + elif entry["type"] == "playlist": + self._parse_playlist(entry["url"]) + new_thumbs = PlaylistSubscription().process_url_str( + [entry], subscribed=False + ) + ThumbManager().download_playlist(new_thumbs) + else: + raise ValueError(f"invalid url_type: {entry}") - def add_to_pending(self, missing_videos, ignore=False): - """build the bulk json data from pending""" - # check if channel is indexed - channel_handler = ChannelSubscription() - all_indexed = channel_handler.get_channels(subscribed_only=False) - self.all_channel_ids = [i["channel_id"] for i in all_indexed] - # check if already there - self.all_downloaded = self.get_all_downloaded() + def _add_video(self, url): + """add video to list""" + if url not in self.missing_videos and url not in self.to_skip: + self.missing_videos.append(url) + + def _parse_channel(self, url): + """add all videos of channel to list""" + video_results = ChannelSubscription().get_last_youtube_videos( + url, limit=False + ) + youtube_ids = [i[0] for i in video_results] + for video_id in youtube_ids: + self._add_video(video_id) + + def _parse_playlist(self, url): + """add all videos of playlist to list""" + playlist = YoutubePlaylist(url) + playlist.build_json() + video_results = playlist.json_data.get("playlist_entries") + youtube_ids = [i["youtube_id"] for i in video_results] + for video_id in youtube_ids: + self._add_video(video_id) + + def add_to_pending(self, status="pending"): + """add missing videos to pending list""" + self.get_channels() + bulk_list = [] + + thumb_handler = ThumbManager() + for idx, youtube_id in enumerate(self.missing_videos): + 
video_details = self.get_youtube_details(youtube_id) + if not video_details: + continue + + video_details["status"] = status + action = {"create": {"_id": youtube_id, "_index": "ta_download"}} + bulk_list.append(json.dumps(action)) + bulk_list.append(json.dumps(video_details)) + + thumb_needed = [(youtube_id, video_details["vid_thumb_url"])] + thumb_handler.download_vid(thumb_needed) + self._notify_add(idx) - bulk_list, all_videos_added = self.build_bulk(missing_videos, ignore) # add last newline bulk_list.append("\n") query_str = "\n".join(bulk_list) - headers = {"Content-type": "application/x-ndjson"} - url = self.ES_URL + "/_bulk" - request = requests.post( - url, data=query_str, headers=headers, auth=self.ES_AUTH - ) - if not request.ok: - print(request) - raise ValueError("failed to add video to download queue") + _, _ = ElasticWrap("_bulk").post(query_str, ndjson=True) - return all_videos_added + def _notify_add(self, idx): + """send notification for adding videos to download queue""" + progress = f"{idx + 1}/{len(self.missing_videos)}" + mess_dict = { + "status": "message:add", + "level": "info", + "title": "Adding new videos to download queue.", + "message": "Progress: " + progress, + } + if idx + 1 == len(self.missing_videos): + RedisArchivist().set_message("message:add", mess_dict, expire=4) + else: + RedisArchivist().set_message("message:add", mess_dict) - def build_bulk(self, missing_videos, ignore=False): - """build the bulk lists""" - bulk_list = [] - all_videos_added = [] + if idx + 1 % 25 == 0: + print("adding to queue progress: " + progress) - for idx, youtube_id in enumerate(missing_videos): - # check if already downloaded - if youtube_id in self.all_downloaded: - continue - - video = self.get_youtube_details(youtube_id) - # skip on download error - if not video: - continue - - channel_indexed = video["channel_id"] in self.all_channel_ids - video["channel_indexed"] = channel_indexed - if ignore: - video["status"] = "ignore" - else: - video["status"] = "pending" - action = {"create": {"_id": youtube_id, "_index": "ta_download"}} - bulk_list.append(json.dumps(action)) - bulk_list.append(json.dumps(video)) - all_videos_added.append((youtube_id, video["vid_thumb_url"])) - # notify - progress = f"{idx + 1}/{len(missing_videos)}" - mess_dict = { - "status": "message:add", - "level": "info", - "title": "Adding new videos to download queue.", - "message": "Progress: " + progress, - } - if idx + 1 == len(missing_videos): - RedisArchivist().set_message( - "message:add", mess_dict, expire=4 - ) - else: - RedisArchivist().set_message("message:add", mess_dict) - if idx + 1 % 25 == 0: - print("adding to queue progress: " + progress) - - return bulk_list, all_videos_added - - @staticmethod - def get_youtube_details(youtube_id): + def get_youtube_details(self, youtube_id): """get details from youtubedl for single pending video""" obs = { "default_search": "ytsearch", @@ -151,113 +239,29 @@ class PendingList: # stop if video is streaming live now if vid["is_live"]: return False - # parse response - seconds = vid["duration"] - duration_str = DurationConverter.get_str(seconds) + + return self._parse_youtube_details(vid) + + def _parse_youtube_details(self, vid): + """parse response""" + vid_id = vid.get("id") + duration_str = DurationConverter.get_str(vid["duration"]) if duration_str == "NA": - print(f"skip extracting duration for: {youtube_id}") - upload_date = vid["upload_date"] - upload_dt = datetime.strptime(upload_date, "%Y%m%d") - published = upload_dt.strftime("%Y-%m-%d") + 
print(f"skip extracting duration for: {vid_id}") + published = datetime.strptime(vid["upload_date"], "%Y%m%d").strftime( + "%Y-%m-%d" + ) + # build dict youtube_details = { - "youtube_id": youtube_id, + "youtube_id": vid_id, "channel_name": vid["channel"], "vid_thumb_url": vid["thumbnail"], "title": vid["title"], "channel_id": vid["channel_id"], + "channel_indexed": vid["channel_id"] in self.all_channels, "duration": duration_str, "published": published, "timestamp": int(datetime.now().strftime("%s")), } return youtube_details - - @staticmethod - def get_all_pending(): - """get a list of all pending videos in ta_download""" - data = { - "query": {"match_all": {}}, - "sort": [{"timestamp": {"order": "asc"}}], - } - all_results = IndexPaginate("ta_download", data).get_results() - - all_pending = [] - all_ignore = [] - - for result in all_results: - if result["status"] == "pending": - all_pending.append(result) - elif result["status"] == "ignore": - all_ignore.append(result) - - return all_pending, all_ignore - - @staticmethod - def get_all_indexed(): - """get a list of all videos indexed""" - - data = { - "query": {"match_all": {}}, - "sort": [{"published": {"order": "desc"}}], - } - all_indexed = IndexPaginate("ta_video", data).get_results() - - return all_indexed - - def get_all_downloaded(self): - """get a list of all videos in archive""" - channel_folders = os.listdir(self.VIDEOS) - all_channel_folders = ignore_filelist(channel_folders) - all_downloaded = [] - for channel_folder in all_channel_folders: - channel_path = os.path.join(self.VIDEOS, channel_folder) - videos = os.listdir(channel_path) - all_videos = ignore_filelist(videos) - youtube_vids = [i[9:20] for i in all_videos] - for youtube_id in youtube_vids: - all_downloaded.append(youtube_id) - return all_downloaded - - def delete_from_pending(self, youtube_id): - """delete the youtube_id from ta_download""" - url = f"{self.ES_URL}/ta_download/_doc/{youtube_id}" - response = requests.delete(url, auth=self.ES_AUTH) - if not response.ok: - print(response.text) - - def delete_pending(self, status): - """delete download queue based on status value""" - data = {"query": {"term": {"status": {"value": status}}}} - payload = json.dumps(data) - url = self.ES_URL + "/ta_download/_delete_by_query" - headers = {"Content-type": "application/json"} - response = requests.post( - url, data=payload, headers=headers, auth=self.ES_AUTH - ) - if not response.ok: - print(response.text) - - def ignore_from_pending(self, ignore_list): - """build the bulk query string""" - - stamp = int(datetime.now().strftime("%s")) - bulk_list = [] - - for youtube_id in ignore_list: - action = {"update": {"_id": youtube_id, "_index": "ta_download"}} - source = {"doc": {"status": "ignore", "timestamp": stamp}} - bulk_list.append(json.dumps(action)) - bulk_list.append(json.dumps(source)) - - # add last newline - bulk_list.append("\n") - query_str = "\n".join(bulk_list) - - headers = {"Content-type": "application/x-ndjson"} - url = self.ES_URL + "/_bulk" - request = requests.post( - url, data=query_str, headers=headers, auth=self.ES_AUTH - ) - if not request.ok: - print(request) - raise ValueError("failed to set video to ignore") diff --git a/tubearchivist/home/src/download/subscriptions.py b/tubearchivist/home/src/download/subscriptions.py index 84627b6..a460af9 100644 --- a/tubearchivist/home/src/download/subscriptions.py +++ b/tubearchivist/home/src/download/subscriptions.py @@ -61,11 +61,9 @@ class ChannelSubscription: def find_missing(self): """add missing videos from 
subscribed channels to pending""" all_channels = self.get_channels() - pending_handler = queue.PendingList() - all_pending, all_ignore = pending_handler.get_all_pending() - all_ids = [i["youtube_id"] for i in all_ignore + all_pending] - all_downloaded = pending_handler.get_all_downloaded() - to_ignore = all_ids + all_downloaded + pending = queue.PendingList() + pending.get_download() + pending.get_indexed() missing_videos = [] @@ -75,7 +73,7 @@ class ChannelSubscription: if last_videos: for video in last_videos: - if video[0] not in to_ignore: + if video[0] not in pending.to_skip: missing_videos.append(video[0]) # notify message = { @@ -129,7 +127,11 @@ class PlaylistSubscription: def process_url_str(self, new_playlists, subscribed=True): """process playlist subscribe form url_str""" - all_indexed = queue.PendingList().get_all_indexed() + data = { + "query": {"match_all": {}}, + "sort": [{"published": {"order": "desc"}}], + } + all_indexed = IndexPaginate("ta_video", data).get_results() all_youtube_ids = [i["youtube_id"] for i in all_indexed] new_thumbs = [] @@ -179,12 +181,11 @@ class PlaylistSubscription: @staticmethod def get_to_ignore(): """get all youtube_ids already downloaded or ignored""" - pending_handler = queue.PendingList() - all_pending, all_ignore = pending_handler.get_all_pending() - all_ids = [i["youtube_id"] for i in all_ignore + all_pending] - all_downloaded = pending_handler.get_all_downloaded() - to_ignore = all_ids + all_downloaded - return to_ignore + pending = queue.PendingList() + pending.get_download() + pending.get_indexed() + + return pending.to_skip def find_missing(self): """find videos in subscribed playlists not downloaded yet""" diff --git a/tubearchivist/home/src/download/thumbnails.py b/tubearchivist/home/src/download/thumbnails.py index 2ea8477..0ce492f 100644 --- a/tubearchivist/home/src/download/thumbnails.py +++ b/tubearchivist/home/src/download/thumbnails.py @@ -58,11 +58,13 @@ class ThumbManager: def get_needed_thumbs(self, missing_only=False): """get a list of all missing thumbnails""" all_thumbs = self.get_all_thumbs() - all_indexed = queue.PendingList().get_all_indexed() - all_in_queue, all_ignored = queue.PendingList().get_all_pending() + + pending = queue.PendingList() + pending.get_download() + pending.get_indexed() needed_thumbs = [] - for video in all_indexed: + for video in pending.all_videos: youtube_id = video["youtube_id"] thumb_url = video["vid_thumb_url"] if missing_only: @@ -71,7 +73,7 @@ class ThumbManager: else: needed_thumbs.append((youtube_id, thumb_url)) - for video in all_in_queue + all_ignored: + for video in pending.all_pending + pending.all_ignored: youtube_id = video["youtube_id"] thumb_url = video["vid_thumb_url"] if missing_only: @@ -276,9 +278,11 @@ class ThumbManager: def get_thumb_list(self): """get list of mediafiles and matching thumbnails""" - all_indexed = queue.PendingList().get_all_indexed() + pending = queue.PendingList() + pending.get_indexed() + video_list = [] - for video in all_indexed: + for video in pending.all_videos: youtube_id = video["youtube_id"] media_url = os.path.join(self.MEDIA_DIR, video["media_url"]) thumb_path = os.path.join( diff --git a/tubearchivist/home/src/download/yt_dlp_handler.py b/tubearchivist/home/src/download/yt_dlp_handler.py index be71049..e540426 100644 --- a/tubearchivist/home/src/download/yt_dlp_handler.py +++ b/tubearchivist/home/src/download/yt_dlp_handler.py @@ -24,6 +24,123 @@ from home.src.ta.helper import clean_string, ignore_filelist from home.src.ta.ta_redis import 
RedisArchivist, RedisQueue +class DownloadPostProcess: + """handle task to run after download queue finishes""" + + def __init__(self, download): + self.download = download + self.now = int(datetime.now().strftime("%s")) + self.pending = False + + def run(self): + """run all functions""" + self.pending = PendingList() + self.pending.get_download() + self.pending.get_channels() + self.pending.get_indexed() + self.auto_delete_all() + self.auto_delete_overwrites() + self.validate_playlists() + + def auto_delete_all(self): + """handle auto delete""" + autodelete_days = self.download.config["downloads"]["autodelete_days"] + if not autodelete_days: + return + + print(f"auto delete older than {autodelete_days} days") + now_lte = self.now - autodelete_days * 24 * 60 * 60 + data = { + "query": {"range": {"player.watched_date": {"lte": now_lte}}}, + "sort": [{"player.watched_date": {"order": "asc"}}], + } + self._auto_delete_watched(data) + + def auto_delete_overwrites(self): + """handle per channel auto delete from overwrites""" + for channel_id, value in self.pending.channel_overwrites.items(): + if "autodelete_days" in value: + autodelete_days = value.get("autodelete_days") + print(f"{channel_id}: delete older than {autodelete_days}d") + now_lte = self.now - autodelete_days * 24 * 60 * 60 + must_list = [ + {"range": {"player.watched_date": {"lte": now_lte}}}, + {"term": {"channel.channel_id": {"value": channel_id}}}, + ] + data = { + "query": {"bool": {"must": must_list}}, + "sort": [{"player.watched_date": {"order": "desc"}}], + } + self._auto_delete_watched(data) + + @staticmethod + def _auto_delete_watched(data): + """delete watched videos after x days""" + to_delete = IndexPaginate("ta_video", data).get_results() + if not to_delete: + return + + for video in to_delete: + youtube_id = video["youtube_id"] + print(f"{youtube_id}: auto delete video") + YoutubeVideo(youtube_id).delete_media_file() + + print("add deleted to ignore list") + vids = [{"type": "video", "url": i["youtube_id"]} for i in to_delete] + pending = PendingList(youtube_ids=vids) + pending.parse_url_list() + pending.add_to_pending(status="ignore") + + def validate_playlists(self): + """look for playlist needing to update""" + for id_c, channel_id in enumerate(self.download.channels): + channel = YoutubeChannel(channel_id) + overwrites = self.pending.channel_overwrites.get(channel_id, False) + if overwrites and overwrites.get("index_playlists"): + # validate from remote + channel.index_channel_playlists() + continue + + # validate from local + playlists = channel.get_indexed_playlists() + all_channel_playlist = [i["playlist_id"] for i in playlists] + self._validate_channel_playlist(all_channel_playlist, id_c) + + def _validate_channel_playlist(self, all_channel_playlist, id_c): + """scan channel for playlist needing update""" + all_youtube_ids = [i["youtube_id"] for i in self.pending.all_videos] + for id_p, playlist_id in enumerate(all_channel_playlist): + playlist = YoutubePlaylist(playlist_id) + playlist.all_youtube_ids = all_youtube_ids + playlist.build_json(scrape=True) + if not playlist.json_data: + playlist.deactivate() + + playlist.add_vids_to_playlist() + playlist.upload_to_es() + self._notify_playlist_progress(all_channel_playlist, id_c, id_p) + + def _notify_playlist_progress(self, all_channel_playlist, id_c, id_p): + """notify to UI""" + title = ( + "Processing playlists for channels: " + + f"{id_c + 1}/{len(self.download.channels)}" + ) + message = f"Progress: {id_p + 1}/{len(all_channel_playlist)}" + mess_dict = { + 
"status": "message:download", + "level": "info", + "title": title, + "message": message, + } + if id_p + 1 == len(all_channel_playlist): + RedisArchivist().set_message( + "message:download", mess_dict, expire=4 + ) + else: + RedisArchivist().set_message("message:download", mess_dict) + + class VideoDownloader: """ handle the video download functionality @@ -32,6 +149,7 @@ class VideoDownloader: def __init__(self, youtube_id_list=False): self.obs = False + self.video_overwrites = False self.youtube_id_list = youtube_id_list self.config = AppConfig().config self._build_obs() @@ -39,6 +157,11 @@ class VideoDownloader: def run_queue(self): """setup download queue in redis loop until no more items""" + pending = PendingList() + pending.get_download() + pending.get_channels() + self.video_overwrites = pending.video_overwrites + queue = RedisQueue("dl_queue") limit_queue = self.config["downloads"]["limit_count"] @@ -60,10 +183,9 @@ class VideoDownloader: self.move_to_archive(vid_dict) self._delete_from_pending(youtube_id) - autodelete_days = self.config["downloads"]["autodelete_days"] - if autodelete_days: - print(f"auto delete older than {autodelete_days} days") - self.auto_delete_watched(autodelete_days) + # post processing + self._add_subscribed_channels() + DownloadPostProcess(self).run() @staticmethod def add_pending(): @@ -75,8 +197,9 @@ class VideoDownloader: "message": "Scanning your download queue.", } RedisArchivist().set_message("message:download", mess_dict) - all_pending, _ = PendingList().get_all_pending() - to_add = [i["youtube_id"] for i in all_pending] + pending = PendingList() + pending.get_download() + to_add = [i["youtube_id"] for i in pending.all_pending] if not to_add: # there is nothing pending print("download queue is empty") @@ -181,16 +304,30 @@ class VideoDownloader: self.obs["postprocessors"] = postprocessors + def get_format_overwrites(self, youtube_id): + """get overwrites from single video""" + overwrites = self.video_overwrites.get(youtube_id, False) + if overwrites: + return overwrites.get("download_format", False) + + return False + def _dl_single_vid(self, youtube_id): """download single video""" + obs = self.obs.copy() + format_overwrite = self.get_format_overwrites(youtube_id) + if format_overwrite: + obs["format"] = format_overwrite + dl_cache = self.config["application"]["cache_dir"] + "/download/" # check if already in cache to continue from there all_cached = ignore_filelist(os.listdir(dl_cache)) for file_name in all_cached: if youtube_id in file_name: - self.obs["outtmpl"] = os.path.join(dl_cache, file_name) - with yt_dlp.YoutubeDL(self.obs) as ydl: + obs["outtmpl"] = os.path.join(dl_cache, file_name) + + with yt_dlp.YoutubeDL(obs) as ydl: try: ydl.download([youtube_id]) except yt_dlp.utils.DownloadError: @@ -254,63 +391,3 @@ class VideoDownloader: self.channels.add(channel_id) return - - def validate_playlists(self): - """look for playlist needing to update""" - print("sync playlists") - self._add_subscribed_channels() - all_indexed = PendingList().get_all_indexed() - all_youtube_ids = [i["youtube_id"] for i in all_indexed] - for id_c, channel_id in enumerate(self.channels): - playlists = YoutubeChannel(channel_id).get_indexed_playlists() - all_playlist_ids = [i["playlist_id"] for i in playlists] - for id_p, playlist_id in enumerate(all_playlist_ids): - playlist = YoutubePlaylist(playlist_id) - playlist.all_youtube_ids = all_youtube_ids - playlist.build_json(scrape=True) - if not playlist.json_data: - playlist.deactivate() - - 
playlist.add_vids_to_playlist() - playlist.upload_to_es() - - # notify - title = ( - "Processing playlists for channels: " - + f"{id_c + 1}/{len(self.channels)}" - ) - message = f"Progress: {id_p + 1}/{len(all_playlist_ids)}" - mess_dict = { - "status": "message:download", - "level": "info", - "title": title, - "message": message, - } - if id_p + 1 == len(all_playlist_ids): - RedisArchivist().set_message( - "message:download", mess_dict, expire=4 - ) - else: - RedisArchivist().set_message("message:download", mess_dict) - - @staticmethod - def auto_delete_watched(autodelete_days): - """delete watched videos after x days""" - now = int(datetime.now().strftime("%s")) - now_lte = now - autodelete_days * 24 * 60 * 60 - data = { - "query": {"range": {"player.watched_date": {"lte": now_lte}}}, - "sort": [{"player.watched_date": {"order": "asc"}}], - } - all_to_delete = IndexPaginate("ta_video", data).get_results() - all_youtube_ids = [i["youtube_id"] for i in all_to_delete] - if not all_youtube_ids: - return - - for youtube_id in all_youtube_ids: - print(f"autodelete {youtube_id}") - YoutubeVideo(youtube_id).delete_media_file() - - print("add deleted to ignore list") - pending_handler = PendingList() - pending_handler.add_to_pending(all_youtube_ids, ignore=True) diff --git a/tubearchivist/home/src/es/index_mapping.json b/tubearchivist/home/src/es/index_mapping.json index f30a82d..29f6b7e 100644 --- a/tubearchivist/home/src/es/index_mapping.json +++ b/tubearchivist/home/src/es/index_mapping.json @@ -39,6 +39,19 @@ "channel_last_refresh": { "type": "date", "format": "epoch_second" + }, + "channel_overwrites": { + "properties": { + "download_format": { + "type": "text" + }, + "autodelete_days": { + "type": "long" + }, + "index_playlists": { + "type": "boolean" + } + } } }, "expected_set": { @@ -102,6 +115,19 @@ "channel_last_refresh": { "type": "date", "format": "epoch_second" + }, + "channel_overwrites": { + "properties": { + "download_format": { + "type": "text" + }, + "autodelete_days": { + "type": "long" + }, + "index_playlists": { + "type": "boolean" + } + } } } }, diff --git a/tubearchivist/home/src/frontend/api_calls.py b/tubearchivist/home/src/frontend/api_calls.py index dff7636..f7f50e7 100644 --- a/tubearchivist/home/src/frontend/api_calls.py +++ b/tubearchivist/home/src/frontend/api_calls.py @@ -4,7 +4,7 @@ Functionality: - called via user input """ -from home.src.download.queue import PendingList +from home.src.download.queue import PendingInteract from home.src.download.subscriptions import ( ChannelSubscription, PlaylistSubscription, @@ -110,12 +110,11 @@ class PostData: def _ignore(self): """ignore from download queue""" - id_to_ignore = self.exec_val - print("ignore video " + id_to_ignore) - handler = PendingList() - handler.ignore_from_pending([id_to_ignore]) + video_id = self.exec_val + print(f"ignore video {video_id}") + PendingInteract(video_id=video_id, status="ignore").update_status() # also clear from redis queue - RedisQueue("dl_queue").clear_item(id_to_ignore) + RedisQueue("dl_queue").clear_item(video_id) return {"success": True} @staticmethod @@ -222,28 +221,25 @@ class PostData: def _forget_ignore(self): """delete from ta_download index""" - youtube_id = self.exec_val - print("forgetting from download index: " + youtube_id) - PendingList().delete_from_pending(youtube_id) + video_id = self.exec_val + print(f"forgetting from download index: {video_id}") + PendingInteract(video_id=video_id).delete_item() return {"success": True} def _add_single(self): """add single youtube_id 
to download queue""" - youtube_id = self.exec_val - print("add vid to dl queue: " + youtube_id) - PendingList().delete_from_pending(youtube_id) - youtube_ids = UrlListParser(youtube_id).process_list() - extrac_dl.delay(youtube_ids) + video_id = self.exec_val + print(f"add vid to dl queue: {video_id}") + PendingInteract(video_id=video_id).delete_item() + video_ids = UrlListParser(video_id).process_list() + extrac_dl.delay(video_ids) return {"success": True} def _delete_queue(self): """delete download queue""" status = self.exec_val print("deleting from download queue: " + status) - if status == "pending": - PendingList().delete_pending("pending") - elif status == "ignore": - PendingList().delete_pending("ignore") + PendingInteract(status=status).delete_by_status() return {"success": True} @staticmethod diff --git a/tubearchivist/home/src/frontend/forms.py b/tubearchivist/home/src/frontend/forms.py index 3e1b353..c4ac817 100644 --- a/tubearchivist/home/src/frontend/forms.py +++ b/tubearchivist/home/src/frontend/forms.py @@ -178,3 +178,19 @@ class SubscribeToPlaylistForm(forms.Form): } ), ) + + +class ChannelOverwriteForm(forms.Form): + """custom overwrites for channel settings""" + + PLAYLIST_INDEX = [ + ("", "-- change playlist index --"), + ("0", "Disable playlist index"), + ("1", "Enable playlist index"), + ] + + download_format = forms.CharField(label=False, required=False) + autodelete_days = forms.IntegerField(label=False, required=False) + index_playlists = forms.ChoiceField( + widget=forms.Select, choices=PLAYLIST_INDEX, required=False + ) diff --git a/tubearchivist/home/src/index/channel.py b/tubearchivist/home/src/index/channel.py index 57a1c4e..66a4fb3 100644 --- a/tubearchivist/home/src/index/channel.py +++ b/tubearchivist/home/src/index/channel.py @@ -12,11 +12,13 @@ from datetime import datetime import requests import yt_dlp from bs4 import BeautifulSoup +from home.src.download import queue # partial import from home.src.download.thumbnails import ThumbManager from home.src.es.connect import ElasticWrap, IndexPaginate from home.src.index.generic import YouTubeItem from home.src.index.playlist import YoutubePlaylist from home.src.ta.helper import clean_string +from home.src.ta.ta_redis import RedisArchivist class ChannelScraper: @@ -153,6 +155,7 @@ class YoutubeChannel(YouTubeItem): def __init__(self, youtube_id): super().__init__(youtube_id) self.es_path = f"{self.index_name}/_doc/{youtube_id}" + self.all_playlists = False def build_json(self, upload=False): """get from es or from youtube""" @@ -241,6 +244,68 @@ class YoutubeChannel(YouTubeItem): self.delete_es_videos() self.del_in_es() + def index_channel_playlists(self): + """add all playlists of channel to index""" + print(f"{self.youtube_id}: index all playlists") + self.get_from_es() + channel_name = self.json_data["channel_name"] + mess_dict = { + "status": "message:playlistscan", + "level": "info", + "title": "Looking for playlists", + "message": f"{channel_name}: Scanning channel in progress", + } + RedisArchivist().set_message("message:playlistscan", mess_dict) + self.get_all_playlists() + if not self.all_playlists: + print(f"{self.youtube_id}: no playlists found.") + return + + all_youtube_ids = self.get_all_video_ids() + for idx, playlist in enumerate(self.all_playlists): + self._notify_single_playlist(idx, playlist) + self._index_single_playlist(playlist, all_youtube_ids) + + def _notify_single_playlist(self, idx, playlist): + """send notification""" + channel_name = self.json_data["channel_name"] + mess_dict = { + 
"status": "message:playlistscan", + "level": "info", + "title": f"{channel_name}: Scanning channel for playlists", + "message": f"Progress: {idx + 1}/{len(self.all_playlists)}", + } + RedisArchivist().set_message("message:playlistscan", mess_dict) + print("add playlist: " + playlist[1]) + + @staticmethod + def _index_single_playlist(playlist, all_youtube_ids): + """add single playlist if needed""" + playlist = YoutubePlaylist(playlist[0]) + playlist.all_youtube_ids = all_youtube_ids + playlist.build_json() + if not playlist.json_data: + return + + entries = playlist.json_data["playlist_entries"] + downloaded = [i for i in entries if i["downloaded"]] + if not downloaded: + return + + playlist.upload_to_es() + playlist.add_vids_to_playlist() + playlist.get_playlist_art() + + @staticmethod + def get_all_video_ids(): + """match all playlists with videos""" + handler = queue.PendingList() + handler.get_download() + handler.get_indexed() + all_youtube_ids = [i["youtube_id"] for i in handler.all_videos] + + return all_youtube_ids + def get_all_playlists(self): """get all playlists owned by this channel""" url = ( @@ -254,8 +319,7 @@ class YoutubeChannel(YouTubeItem): } playlists = yt_dlp.YoutubeDL(obs).extract_info(url) all_entries = [(i["id"], i["title"]) for i in playlists["entries"]] - - return all_entries + self.all_playlists = all_entries def get_indexed_playlists(self): """get all indexed playlists from channel""" @@ -267,3 +331,35 @@ class YoutubeChannel(YouTubeItem): } all_playlists = IndexPaginate("ta_playlist", data).get_results() return all_playlists + + def get_overwrites(self): + """get all per channel overwrites""" + return self.json_data.get("channel_overwrites", False) + + def set_overwrites(self, overwrites): + """set per channel overwrites""" + valid_keys = ["download_format", "autodelete_days", "index_playlists"] + + to_write = self.json_data.get("channel_overwrites", {}) + for key, value in overwrites.items(): + if key not in valid_keys: + raise ValueError(f"invalid overwrite key: {key}") + if value in [0, "0"]: + del to_write[key] + continue + if value == "1": + to_write[key] = True + continue + if value: + to_write.update({key: value}) + + self.json_data["channel_overwrites"] = to_write + + +def channel_overwrites(channel_id, overwrites): + """collection to overwrite settings per channel""" + channel = YoutubeChannel(channel_id) + channel.build_json() + channel.set_overwrites(overwrites) + channel.upload_to_es() + channel.sync_to_videos() diff --git a/tubearchivist/home/src/index/filesystem.py b/tubearchivist/home/src/index/filesystem.py index 0354e83..0dc8302 100644 --- a/tubearchivist/home/src/index/filesystem.py +++ b/tubearchivist/home/src/index/filesystem.py @@ -59,9 +59,11 @@ class FilesystemScanner: def get_all_indexed(): """get a list of all indexed videos""" index_handler = PendingList() - all_indexed_raw = index_handler.get_all_indexed() + index_handler.get_download() + index_handler.get_indexed() + all_indexed = [] - for video in all_indexed_raw: + for video in index_handler.all_videos: youtube_id = video["youtube_id"] media_url = video["media_url"] published = video["published"] diff --git a/tubearchivist/home/src/index/reindex.py b/tubearchivist/home/src/index/reindex.py index 6a0dd4f..cf1435f 100644 --- a/tubearchivist/home/src/index/reindex.py +++ b/tubearchivist/home/src/index/reindex.py @@ -11,7 +11,6 @@ from time import sleep import requests from home.src.download.queue import PendingList -from home.src.download.subscriptions import ChannelSubscription from 
home.src.download.thumbnails import ThumbManager from home.src.index.channel import YoutubeChannel from home.src.index.playlist import YoutubePlaylist @@ -170,24 +169,6 @@ class Reindex: if self.integrate_ryd: self.get_unrated_vids() - def rescrape_all_channels(self): - """sync new data from channel to all matching videos""" - sleep_interval = self.sleep_interval - channel_sub_handler = ChannelSubscription() - all_channels = channel_sub_handler.get_channels(subscribed_only=False) - all_channel_ids = [i["channel_id"] for i in all_channels] - - for channel_id in all_channel_ids: - channel = YoutubeChannel(channel_id) - subscribed = channel.json_data["channel_subscribed"] - channel.get_from_youtube() - channel.json_data["channel_subscribed"] = subscribed - channel.upload_to_es() - channel.sync_to_videos() - - if sleep_interval: - sleep(sleep_interval) - @staticmethod def reindex_single_video(youtube_id): """refresh data for single video""" @@ -228,8 +209,10 @@ class Reindex: channel = YoutubeChannel(channel_id) channel.get_from_es() subscribed = channel.json_data["channel_subscribed"] + overwrites = channel.json_data["channel_overwrites"] channel.get_from_youtube() channel.json_data["channel_subscribed"] = subscribed + channel.json_data["channel_overwrites"] = overwrites channel.upload_to_es() channel.sync_to_videos() @@ -266,8 +249,9 @@ class Reindex: # playlist print(f"reindexing {len(self.all_playlist_ids)} playlists") if self.all_playlist_ids: - all_indexed = PendingList().get_all_indexed() - all_indexed_ids = [i["youtube_id"] for i in all_indexed] + handler = PendingList() + handler.get_indexed() + all_indexed_ids = [i["youtube_id"] for i in handler.all_videos] for playlist_id in self.all_playlist_ids: self.reindex_single_playlist(playlist_id, all_indexed_ids) if self.sleep_interval: diff --git a/tubearchivist/home/tasks.py b/tubearchivist/home/tasks.py index 63d84b2..b0f86fd 100644 --- a/tubearchivist/home/tasks.py +++ b/tubearchivist/home/tasks.py @@ -24,7 +24,6 @@ from home.src.index.filesystem import ( reindex_old_documents, scan_filesystem, ) -from home.src.index.playlist import YoutubePlaylist from home.src.ta.config import AppConfig, ScheduleBuilder from home.src.ta.helper import UrlListParser from home.src.ta.ta_redis import RedisArchivist, RedisQueue @@ -63,9 +62,11 @@ def update_subscribed(): missing_from_playlists = playlist_handler.find_missing() missing = missing_from_channels + missing_from_playlists if missing: - pending_handler = PendingList() - all_videos_added = pending_handler.add_to_pending(missing) - ThumbManager().download_vid(all_videos_added) + youtube_ids = [{"type": "video", "url": i} for i in missing] + pending_handler = PendingList(youtube_ids=youtube_ids) + pending_handler.parse_url_list() + pending_handler.add_to_pending() + else: print("Did not acquire rescan lock.") @@ -86,7 +87,6 @@ def download_pending(): downloader = VideoDownloader() downloader.add_pending() downloader.run_queue() - downloader.validate_playlists() else: print("Did not acquire download lock.") @@ -128,19 +128,9 @@ def download_single(youtube_id): @shared_task def extrac_dl(youtube_ids): """parse list passed and add to pending""" - pending_handler = PendingList() - missing_videos = pending_handler.parse_url_list(youtube_ids) - all_videos_added = pending_handler.add_to_pending(missing_videos) - missing_playlists = pending_handler.missing_from_playlists - - thumb_handler = ThumbManager() - if missing_playlists: - new_thumbs = PlaylistSubscription().process_url_str( - missing_playlists, 
subscribed=False - ) - thumb_handler.download_playlist(new_thumbs) - - thumb_handler.download_vid(all_videos_added) + pending_handler = PendingList(youtube_ids=youtube_ids) + pending_handler.parse_url_list() + pending_handler.add_to_pending() @shared_task(name="check_reindex") @@ -277,50 +267,7 @@ def index_channel_playlists(channel_id): "message": f'Scanning channel "{channel.youtube_id}" in progress', } RedisArchivist().set_message("message:playlistscan", mess_dict) - all_playlists = channel.get_all_playlists() - - if not all_playlists: - print(f"no playlists found for channel {channel_id}") - return - - all_indexed = PendingList().get_all_indexed() - all_youtube_ids = [i["youtube_id"] for i in all_indexed] - - for idx, (playlist_id, playlist_title) in enumerate(all_playlists): - # notify - mess_dict = { - "status": "message:playlistscan", - "level": "info", - "title": "Scanning channel for playlists", - "message": f"Progress: {idx + 1}/{len(all_playlists)}", - } - RedisArchivist().set_message("message:playlistscan", mess_dict) - print("add playlist: " + playlist_title) - - playlist = YoutubePlaylist(playlist_id) - playlist.all_youtube_ids = all_youtube_ids - playlist.build_json() - - if not playlist.json_data: - # skip if not available - continue - - # don't add if no videos downloaded - downloaded = [ - i - for i in playlist.json_data["playlist_entries"] - if i["downloaded"] - ] - if not downloaded: - continue - - playlist.upload_to_es() - playlist.add_vids_to_playlist() - - if all_playlists: - playlist.get_playlist_art() - - return + channel.index_channel_playlists() try: diff --git a/tubearchivist/home/templates/home/channel_id.html b/tubearchivist/home/templates/home/channel_id.html index 3a84037..b372237 100644 --- a/tubearchivist/home/templates/home/channel_id.html +++ b/tubearchivist/home/templates/home/channel_id.html @@ -6,6 +6,7 @@
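The queue.py changes above replace the old PendingList helpers (get_all_pending, get_all_indexed, delete_from_pending, ignore_from_pending) with the PendingIndex / PendingList / PendingInteract split. A minimal usage sketch of the new call pattern, written from the updated call sites in tasks.py and api_calls.py, follows; the video id is a placeholder, and the snippet assumes the app's Elasticsearch and Redis services are reachable.

    from home.src.download.queue import PendingInteract, PendingList

    # queue new URLs, mirroring extrac_dl() in tasks.py
    youtube_ids = [{"type": "video", "url": "placeholder_video_id"}]
    pending = PendingList(youtube_ids=youtube_ids)
    pending.parse_url_list()   # expands channel/playlist URLs, skips already known ids
    pending.add_to_pending()   # bulk-indexes the rest into ta_download as "pending"

    # operate on existing queue items, mirroring the handlers in api_calls.py
    PendingInteract(video_id="placeholder_video_id", status="ignore").update_status()
    PendingInteract(video_id="placeholder_video_id").delete_item()
    PendingInteract(status="ignore").delete_by_status()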