diff --git a/tubearchivist/api/views.py b/tubearchivist/api/views.py index 165ca60..c4bcb08 100644 --- a/tubearchivist/api/views.py +++ b/tubearchivist/api/views.py @@ -1,9 +1,9 @@ """all API views""" import requests -from home.src.config import AppConfig -from home.src.helper import UrlListParser -from home.src.thumbnails import ThumbManager +from home.src.ta.config import AppConfig +from home.src.ta.helper import UrlListParser +from home.src.download.thumbnails import ThumbManager from home.tasks import extrac_dl, subscribe_to from rest_framework.authentication import ( SessionAuthentication, diff --git a/tubearchivist/config/settings.py b/tubearchivist/config/settings.py index c1eb432..5447788 100644 --- a/tubearchivist/config/settings.py +++ b/tubearchivist/config/settings.py @@ -15,7 +15,7 @@ from os import environ, path from pathlib import Path from corsheaders.defaults import default_headers -from home.src.config import AppConfig +from home.src.ta.config import AppConfig # Build paths inside the project like this: BASE_DIR / 'subdir'. BASE_DIR = Path(__file__).resolve().parent.parent diff --git a/tubearchivist/home/apps.py b/tubearchivist/home/apps.py index 0bdf819..f58193f 100644 --- a/tubearchivist/home/apps.py +++ b/tubearchivist/home/apps.py @@ -3,9 +3,9 @@ import os from django.apps import AppConfig -from home.src.config import AppConfig as ArchivistConfig -from home.src.helper import RedisArchivist -from home.src.index_management import index_check +from home.src.es.index_setup import index_check +from home.src.ta.config import AppConfig as ArchivistConfig +from home.src.ta.ta_redis import RedisArchivist def sync_redis_state(): diff --git a/tubearchivist/home/src/download.py b/tubearchivist/home/src/download.py deleted file mode 100644 index 0dbd0be..0000000 --- a/tubearchivist/home/src/download.py +++ /dev/null @@ -1,754 +0,0 @@ -""" -Functionality: -- handele the download queue -- manage subscriptions to channels -- manage subscriptions to playlists -- downloading videos -""" - -import json -import os -import shutil -from datetime import datetime -from time import sleep - -import requests -import yt_dlp -from home.src.config import AppConfig -from home.src.es import IndexPaginate -from home.src.helper import ( - DurationConverter, - RedisArchivist, - RedisQueue, - clean_string, - ignore_filelist, -) -from home.src.index import ( - YoutubeChannel, - YoutubePlaylist, - YoutubeVideo, - index_new_video, -) - - -class PendingList: - """manage the pending videos list""" - - CONFIG = AppConfig().config - ES_URL = CONFIG["application"]["es_url"] - ES_AUTH = CONFIG["application"]["es_auth"] - VIDEOS = CONFIG["application"]["videos"] - - def __init__(self): - self.all_channel_ids = False - self.all_downloaded = False - self.missing_from_playlists = [] - - def parse_url_list(self, youtube_ids): - """extract youtube ids from list""" - missing_videos = [] - for entry in youtube_ids: - # notify - mess_dict = { - "status": "message:add", - "level": "info", - "title": "Adding to download queue.", - "message": "Extracting lists", - } - RedisArchivist().set_message("message:add", mess_dict) - # extract - url = entry["url"] - url_type = entry["type"] - if url_type == "video": - missing_videos.append(url) - elif url_type == "channel": - video_results = ChannelSubscription().get_last_youtube_videos( - url, limit=False - ) - youtube_ids = [i[0] for i in video_results] - missing_videos = missing_videos + youtube_ids - elif url_type == "playlist": - self.missing_from_playlists.append(entry) - 
playlist = YoutubePlaylist(url) - playlist.build_json() - video_results = playlist.json_data.get("playlist_entries") - youtube_ids = [i["youtube_id"] for i in video_results] - missing_videos = missing_videos + youtube_ids - - return missing_videos - - def add_to_pending(self, missing_videos, ignore=False): - """build the bulk json data from pending""" - # check if channel is indexed - channel_handler = ChannelSubscription() - all_indexed = channel_handler.get_channels(subscribed_only=False) - self.all_channel_ids = [i["channel_id"] for i in all_indexed] - # check if already there - self.all_downloaded = self.get_all_downloaded() - - bulk_list, all_videos_added = self.build_bulk(missing_videos, ignore) - # add last newline - bulk_list.append("\n") - query_str = "\n".join(bulk_list) - headers = {"Content-type": "application/x-ndjson"} - url = self.ES_URL + "/_bulk" - request = requests.post( - url, data=query_str, headers=headers, auth=self.ES_AUTH - ) - if not request.ok: - print(request) - raise ValueError("failed to add video to download queue") - - return all_videos_added - - def build_bulk(self, missing_videos, ignore=False): - """build the bulk lists""" - bulk_list = [] - all_videos_added = [] - - for idx, youtube_id in enumerate(missing_videos): - # check if already downloaded - if youtube_id in self.all_downloaded: - continue - - video = self.get_youtube_details(youtube_id) - # skip on download error - if not video: - continue - - channel_indexed = video["channel_id"] in self.all_channel_ids - video["channel_indexed"] = channel_indexed - if ignore: - video["status"] = "ignore" - else: - video["status"] = "pending" - action = {"create": {"_id": youtube_id, "_index": "ta_download"}} - bulk_list.append(json.dumps(action)) - bulk_list.append(json.dumps(video)) - all_videos_added.append((youtube_id, video["vid_thumb_url"])) - # notify - progress = f"{idx + 1}/{len(missing_videos)}" - mess_dict = { - "status": "message:add", - "level": "info", - "title": "Adding new videos to download queue.", - "message": "Progress: " + progress, - } - if idx + 1 == len(missing_videos): - RedisArchivist().set_message( - "message:add", mess_dict, expire=4 - ) - else: - RedisArchivist().set_message("message:add", mess_dict) - if idx + 1 % 25 == 0: - print("adding to queue progress: " + progress) - - return bulk_list, all_videos_added - - @staticmethod - def get_youtube_details(youtube_id): - """get details from youtubedl for single pending video""" - obs = { - "default_search": "ytsearch", - "quiet": True, - "check_formats": "selected", - "noplaylist": True, - "writethumbnail": True, - "simulate": True, - } - try: - vid = yt_dlp.YoutubeDL(obs).extract_info(youtube_id) - except yt_dlp.utils.DownloadError: - print("failed to extract info for: " + youtube_id) - return False - # stop if video is streaming live now - if vid["is_live"]: - return False - # parse response - seconds = vid["duration"] - duration_str = DurationConverter.get_str(seconds) - if duration_str == "NA": - print(f"skip extracting duration for: {youtube_id}") - upload_date = vid["upload_date"] - upload_dt = datetime.strptime(upload_date, "%Y%m%d") - published = upload_dt.strftime("%Y-%m-%d") - # build dict - youtube_details = { - "youtube_id": youtube_id, - "channel_name": vid["channel"], - "vid_thumb_url": vid["thumbnail"], - "title": vid["title"], - "channel_id": vid["channel_id"], - "duration": duration_str, - "published": published, - "timestamp": int(datetime.now().strftime("%s")), - } - return youtube_details - - @staticmethod - def 
get_all_pending(): - """get a list of all pending videos in ta_download""" - data = { - "query": {"match_all": {}}, - "sort": [{"timestamp": {"order": "asc"}}], - } - all_results = IndexPaginate("ta_download", data).get_results() - - all_pending = [] - all_ignore = [] - - for result in all_results: - if result["status"] == "pending": - all_pending.append(result) - elif result["status"] == "ignore": - all_ignore.append(result) - - return all_pending, all_ignore - - @staticmethod - def get_all_indexed(): - """get a list of all videos indexed""" - - data = { - "query": {"match_all": {}}, - "sort": [{"published": {"order": "desc"}}], - } - all_indexed = IndexPaginate("ta_video", data).get_results() - - return all_indexed - - def get_all_downloaded(self): - """get a list of all videos in archive""" - channel_folders = os.listdir(self.VIDEOS) - all_channel_folders = ignore_filelist(channel_folders) - all_downloaded = [] - for channel_folder in all_channel_folders: - channel_path = os.path.join(self.VIDEOS, channel_folder) - videos = os.listdir(channel_path) - all_videos = ignore_filelist(videos) - youtube_vids = [i[9:20] for i in all_videos] - for youtube_id in youtube_vids: - all_downloaded.append(youtube_id) - return all_downloaded - - def delete_from_pending(self, youtube_id): - """delete the youtube_id from ta_download""" - url = f"{self.ES_URL}/ta_download/_doc/{youtube_id}" - response = requests.delete(url, auth=self.ES_AUTH) - if not response.ok: - print(response.text) - - def delete_pending(self, status): - """delete download queue based on status value""" - data = {"query": {"term": {"status": {"value": status}}}} - payload = json.dumps(data) - url = self.ES_URL + "/ta_download/_delete_by_query" - headers = {"Content-type": "application/json"} - response = requests.post( - url, data=payload, headers=headers, auth=self.ES_AUTH - ) - if not response.ok: - print(response.text) - - def ignore_from_pending(self, ignore_list): - """build the bulk query string""" - - stamp = int(datetime.now().strftime("%s")) - bulk_list = [] - - for youtube_id in ignore_list: - action = {"update": {"_id": youtube_id, "_index": "ta_download"}} - source = {"doc": {"status": "ignore", "timestamp": stamp}} - bulk_list.append(json.dumps(action)) - bulk_list.append(json.dumps(source)) - - # add last newline - bulk_list.append("\n") - query_str = "\n".join(bulk_list) - - headers = {"Content-type": "application/x-ndjson"} - url = self.ES_URL + "/_bulk" - request = requests.post( - url, data=query_str, headers=headers, auth=self.ES_AUTH - ) - if not request.ok: - print(request) - raise ValueError("failed to set video to ignore") - - -class ChannelSubscription: - """manage the list of channels subscribed""" - - def __init__(self): - config = AppConfig().config - self.es_url = config["application"]["es_url"] - self.es_auth = config["application"]["es_auth"] - self.channel_size = config["subscriptions"]["channel_size"] - - @staticmethod - def get_channels(subscribed_only=True): - """get a list of all channels subscribed to""" - data = { - "sort": [{"channel_name.keyword": {"order": "asc"}}], - } - if subscribed_only: - data["query"] = {"term": {"channel_subscribed": {"value": True}}} - else: - data["query"] = {"match_all": {}} - - all_channels = IndexPaginate("ta_channel", data).get_results() - - return all_channels - - def get_last_youtube_videos(self, channel_id, limit=True): - """get a list of last videos from channel""" - url = f"https://www.youtube.com/channel/{channel_id}/videos" - obs = { - "default_search": 
"ytsearch", - "quiet": True, - "skip_download": True, - "extract_flat": True, - } - if limit: - obs["playlistend"] = self.channel_size - chan = yt_dlp.YoutubeDL(obs).extract_info(url, download=False) - last_videos = [(i["id"], i["title"]) for i in chan["entries"]] - return last_videos - - def find_missing(self): - """add missing videos from subscribed channels to pending""" - all_channels = self.get_channels() - pending_handler = PendingList() - all_pending, all_ignore = pending_handler.get_all_pending() - all_ids = [i["youtube_id"] for i in all_ignore + all_pending] - all_downloaded = pending_handler.get_all_downloaded() - to_ignore = all_ids + all_downloaded - - missing_videos = [] - - for idx, channel in enumerate(all_channels): - channel_id = channel["channel_id"] - last_videos = self.get_last_youtube_videos(channel_id) - for video in last_videos: - if video[0] not in to_ignore: - missing_videos.append(video[0]) - # notify - message = { - "status": "message:rescan", - "level": "info", - "title": "Scanning channels: Looking for new videos.", - "message": f"Progress: {idx + 1}/{len(all_channels)}", - } - if idx + 1 == len(all_channels): - RedisArchivist().set_message( - "message:rescan", message=message, expire=4 - ) - else: - RedisArchivist().set_message("message:rescan", message=message) - - return missing_videos - - @staticmethod - def change_subscribe(channel_id, channel_subscribed): - """subscribe or unsubscribe from channel and update""" - channel = YoutubeChannel(channel_id) - channel.build_json() - channel.json_data["channel_subscribed"] = channel_subscribed - channel.upload_to_es() - channel.sync_to_videos() - - -class PlaylistSubscription: - """manage the playlist download functionality""" - - def __init__(self): - self.config = AppConfig().config - - @staticmethod - def get_playlists(subscribed_only=True): - """get a list of all active playlists""" - data = { - "sort": [{"playlist_channel.keyword": {"order": "desc"}}], - } - data["query"] = { - "bool": {"must": [{"term": {"playlist_active": {"value": True}}}]} - } - if subscribed_only: - data["query"]["bool"]["must"].append( - {"term": {"playlist_subscribed": {"value": True}}} - ) - - all_playlists = IndexPaginate("ta_playlist", data).get_results() - - return all_playlists - - def process_url_str(self, new_playlists, subscribed=True): - """process playlist subscribe form url_str""" - all_indexed = PendingList().get_all_indexed() - all_youtube_ids = [i["youtube_id"] for i in all_indexed] - - new_thumbs = [] - for idx, playlist in enumerate(new_playlists): - url_type = playlist["type"] - playlist_id = playlist["url"] - if not url_type == "playlist": - print(f"{playlist_id} not a playlist, skipping...") - continue - - playlist_h = YoutubePlaylist(playlist_id) - playlist_h.all_youtube_ids = all_youtube_ids - playlist_h.build_json() - playlist_h.json_data["playlist_subscribed"] = subscribed - playlist_h.upload_to_es() - playlist_h.add_vids_to_playlist() - self.channel_validate(playlist_h.json_data["playlist_channel_id"]) - thumb = playlist_h.json_data["playlist_thumbnail"] - new_thumbs.append((playlist_id, thumb)) - # notify - message = { - "status": "message:subplaylist", - "level": "info", - "title": "Subscribing to Playlists", - "message": f"Processing {idx + 1} of {len(new_playlists)}", - } - RedisArchivist().set_message( - "message:subplaylist", message=message - ) - - return new_thumbs - - @staticmethod - def channel_validate(channel_id): - """make sure channel of playlist is there""" - channel = YoutubeChannel(channel_id) - 
channel.build_json() - - @staticmethod - def change_subscribe(playlist_id, subscribe_status): - """change the subscribe status of a playlist""" - playlist = YoutubePlaylist(playlist_id) - playlist.build_json() - playlist.json_data["playlist_subscribed"] = subscribe_status - playlist.upload_to_es() - - @staticmethod - def get_to_ignore(): - """get all youtube_ids already downloaded or ignored""" - pending_handler = PendingList() - all_pending, all_ignore = pending_handler.get_all_pending() - all_ids = [i["youtube_id"] for i in all_ignore + all_pending] - all_downloaded = pending_handler.get_all_downloaded() - to_ignore = all_ids + all_downloaded - return to_ignore - - def find_missing(self): - """find videos in subscribed playlists not downloaded yet""" - all_playlists = [i["playlist_id"] for i in self.get_playlists()] - to_ignore = self.get_to_ignore() - - missing_videos = [] - for idx, playlist_id in enumerate(all_playlists): - size_limit = self.config["subscriptions"]["channel_size"] - playlist = YoutubePlaylist(playlist_id) - playlist.update_playlist() - if not playlist: - playlist.deactivate() - continue - - playlist_entries = playlist.json_data["playlist_entries"] - if size_limit: - del playlist_entries[size_limit:] - - all_missing = [i for i in playlist_entries if not i["downloaded"]] - - message = { - "status": "message:rescan", - "level": "info", - "title": "Scanning playlists: Looking for new videos.", - "message": f"Progress: {idx + 1}/{len(all_playlists)}", - } - RedisArchivist().set_message("message:rescan", message=message) - - for video in all_missing: - youtube_id = video["youtube_id"] - if youtube_id not in to_ignore: - missing_videos.append(youtube_id) - - return missing_videos - - -class VideoDownloader: - """ - handle the video download functionality - if not initiated with list, take from queue - """ - - def __init__(self, youtube_id_list=False): - self.youtube_id_list = youtube_id_list - self.config = AppConfig().config - self.channels = set() - - def run_queue(self): - """setup download queue in redis loop until no more items""" - queue = RedisQueue("dl_queue") - - limit_queue = self.config["downloads"]["limit_count"] - if limit_queue: - queue.trim(limit_queue - 1) - - while True: - youtube_id = queue.get_next() - if not youtube_id: - break - - try: - self.dl_single_vid(youtube_id) - except yt_dlp.utils.DownloadError: - print("failed to download " + youtube_id) - continue - vid_dict = index_new_video(youtube_id) - self.channels.add(vid_dict["channel"]["channel_id"]) - self.move_to_archive(vid_dict) - self.delete_from_pending(youtube_id) - - autodelete_days = self.config["downloads"]["autodelete_days"] - if autodelete_days: - print(f"auto delete older than {autodelete_days} days") - self.auto_delete_watched(autodelete_days) - - @staticmethod - def add_pending(): - """add pending videos to download queue""" - mess_dict = { - "status": "message:download", - "level": "info", - "title": "Looking for videos to download", - "message": "Scanning your download queue.", - } - RedisArchivist().set_message("message:download", mess_dict) - all_pending, _ = PendingList().get_all_pending() - to_add = [i["youtube_id"] for i in all_pending] - if not to_add: - # there is nothing pending - print("download queue is empty") - mess_dict = { - "status": "message:download", - "level": "error", - "title": "Download queue is empty", - "message": "Add some videos to the queue first.", - } - RedisArchivist().set_message("message:download", mess_dict) - return - - queue = RedisQueue("dl_queue") - 
queue.add_list(to_add) - - @staticmethod - def progress_hook(response): - """process the progress_hooks from yt_dlp""" - # title - path = os.path.split(response["filename"])[-1][12:] - filename = os.path.splitext(os.path.splitext(path)[0])[0] - filename_clean = filename.replace("_", " ") - title = "Downloading: " + filename_clean - # message - try: - percent = response["_percent_str"] - size = response["_total_bytes_str"] - speed = response["_speed_str"] - eta = response["_eta_str"] - message = f"{percent} of {size} at {speed} - time left: {eta}" - except KeyError: - message = "processing" - mess_dict = { - "status": "message:download", - "level": "info", - "title": title, - "message": message, - } - RedisArchivist().set_message("message:download", mess_dict) - - def build_obs(self): - """build obs dictionary for yt-dlp""" - obs = { - "default_search": "ytsearch", - "merge_output_format": "mp4", - "restrictfilenames": True, - "outtmpl": ( - self.config["application"]["cache_dir"] - + "/download/" - + self.config["application"]["file_template"] - ), - "progress_hooks": [self.progress_hook], - "noprogress": True, - "quiet": True, - "continuedl": True, - "retries": 3, - "writethumbnail": False, - "noplaylist": True, - "check_formats": "selected", - } - if self.config["downloads"]["format"]: - obs["format"] = self.config["downloads"]["format"] - if self.config["downloads"]["limit_speed"]: - obs["ratelimit"] = self.config["downloads"]["limit_speed"] * 1024 - - throttle = self.config["downloads"]["throttledratelimit"] - if throttle: - obs["throttledratelimit"] = throttle * 1024 - - postprocessors = [] - - if self.config["downloads"]["add_metadata"]: - postprocessors.append( - { - "key": "FFmpegMetadata", - "add_chapters": True, - "add_metadata": True, - } - ) - - if self.config["downloads"]["add_thumbnail"]: - postprocessors.append( - { - "key": "EmbedThumbnail", - "already_have_thumbnail": True, - } - ) - obs["writethumbnail"] = True - - obs["postprocessors"] = postprocessors - - return obs - - def dl_single_vid(self, youtube_id): - """download single video""" - dl_cache = self.config["application"]["cache_dir"] + "/download/" - obs = self.build_obs() - - # check if already in cache to continue from there - all_cached = ignore_filelist(os.listdir(dl_cache)) - for file_name in all_cached: - if youtube_id in file_name: - obs["outtmpl"] = os.path.join(dl_cache, file_name) - with yt_dlp.YoutubeDL(obs) as ydl: - try: - ydl.download([youtube_id]) - except yt_dlp.utils.DownloadError: - print("retry failed download: " + youtube_id) - sleep(10) - ydl.download([youtube_id]) - - if obs["writethumbnail"]: - # webp files don't get cleaned up automatically - all_cached = ignore_filelist(os.listdir(dl_cache)) - to_clean = [i for i in all_cached if not i.endswith(".mp4")] - for file_name in to_clean: - file_path = os.path.join(dl_cache, file_name) - os.remove(file_path) - - def move_to_archive(self, vid_dict): - """move downloaded video from cache to archive""" - videos = self.config["application"]["videos"] - host_uid = self.config["application"]["HOST_UID"] - host_gid = self.config["application"]["HOST_GID"] - channel_name = clean_string(vid_dict["channel"]["channel_name"]) - # make archive folder with correct permissions - new_folder = os.path.join(videos, channel_name) - if not os.path.exists(new_folder): - os.makedirs(new_folder) - if host_uid and host_gid: - os.chown(new_folder, host_uid, host_gid) - # find real filename - cache_dir = self.config["application"]["cache_dir"] - all_cached = 
ignore_filelist(os.listdir(cache_dir + "/download/")) - for file_str in all_cached: - if vid_dict["youtube_id"] in file_str: - old_file = file_str - old_file_path = os.path.join(cache_dir, "download", old_file) - new_file_path = os.path.join(videos, vid_dict["media_url"]) - # move media file and fix permission - shutil.move(old_file_path, new_file_path) - if host_uid and host_gid: - os.chown(new_file_path, host_uid, host_gid) - - def delete_from_pending(self, youtube_id): - """delete downloaded video from pending index if its there""" - es_url = self.config["application"]["es_url"] - es_auth = self.config["application"]["es_auth"] - url = f"{es_url}/ta_download/_doc/{youtube_id}" - response = requests.delete(url, auth=es_auth) - if not response.ok and not response.status_code == 404: - print(response.text) - - def add_subscribed_channels(self): - """add all channels subscribed to refresh""" - all_subscribed = PlaylistSubscription().get_playlists() - if not all_subscribed: - return - - channel_ids = [i["playlist_channel_id"] for i in all_subscribed] - for channel_id in channel_ids: - self.channels.add(channel_id) - - return - - def validate_playlists(self): - """look for playlist needing to update""" - print("sync playlists") - self.add_subscribed_channels() - all_indexed = PendingList().get_all_indexed() - all_youtube_ids = [i["youtube_id"] for i in all_indexed] - for id_c, channel_id in enumerate(self.channels): - playlists = YoutubeChannel(channel_id).get_indexed_playlists() - all_playlist_ids = [i["playlist_id"] for i in playlists] - for id_p, playlist_id in enumerate(all_playlist_ids): - playlist = YoutubePlaylist(playlist_id) - playlist.all_youtube_ids = all_youtube_ids - playlist.build_json(scrape=True) - if not playlist.json_data: - playlist.deactivate() - - playlist.add_vids_to_playlist() - playlist.upload_to_es() - - # notify - title = ( - "Processing playlists for channels: " - + f"{id_c + 1}/{len(self.channels)}" - ) - message = f"Progress: {id_p + 1}/{len(all_playlist_ids)}" - mess_dict = { - "status": "message:download", - "level": "info", - "title": title, - "message": message, - } - if id_p + 1 == len(all_playlist_ids): - RedisArchivist().set_message( - "message:download", mess_dict, expire=4 - ) - else: - RedisArchivist().set_message("message:download", mess_dict) - - @staticmethod - def auto_delete_watched(autodelete_days): - """delete watched videos after x days""" - now = int(datetime.now().strftime("%s")) - now_lte = now - autodelete_days * 24 * 60 * 60 - data = { - "query": {"range": {"player.watched_date": {"lte": now_lte}}}, - "sort": [{"player.watched_date": {"order": "asc"}}], - } - all_to_delete = IndexPaginate("ta_video", data).get_results() - all_youtube_ids = [i["youtube_id"] for i in all_to_delete] - if not all_youtube_ids: - return - - for youtube_id in all_youtube_ids: - print(f"autodelete {youtube_id}") - YoutubeVideo(youtube_id).delete_media_file() - - print("add deleted to ignore list") - pending_handler = PendingList() - pending_handler.add_to_pending(all_youtube_ids, ignore=True) diff --git a/tubearchivist/home/src/download/__init__.py b/tubearchivist/home/src/download/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tubearchivist/home/src/download/queue.py b/tubearchivist/home/src/download/queue.py new file mode 100644 index 0000000..c8748e6 --- /dev/null +++ b/tubearchivist/home/src/download/queue.py @@ -0,0 +1,259 @@ +"""handle download queue""" + +import json +import os +from datetime import datetime + +import requests +import 
yt_dlp +from home.src.download.subscriptions import ChannelSubscription +from home.src.es.connect import IndexPaginate +from home.src.index.playlist import YoutubePlaylist +from home.src.ta.config import AppConfig +from home.src.ta.helper import DurationConverter, ignore_filelist +from home.src.ta.ta_redis import RedisArchivist + + +class PendingList: + """manage the pending videos list""" + + CONFIG = AppConfig().config + ES_URL = CONFIG["application"]["es_url"] + ES_AUTH = CONFIG["application"]["es_auth"] + VIDEOS = CONFIG["application"]["videos"] + + def __init__(self): + self.all_channel_ids = False + self.all_downloaded = False + self.missing_from_playlists = [] + + def parse_url_list(self, youtube_ids): + """extract youtube ids from list""" + missing_videos = [] + for entry in youtube_ids: + # notify + mess_dict = { + "status": "message:add", + "level": "info", + "title": "Adding to download queue.", + "message": "Extracting lists", + } + RedisArchivist().set_message("message:add", mess_dict) + # extract + url = entry["url"] + url_type = entry["type"] + if url_type == "video": + missing_videos.append(url) + elif url_type == "channel": + video_results = ChannelSubscription().get_last_youtube_videos( + url, limit=False + ) + youtube_ids = [i[0] for i in video_results] + missing_videos = missing_videos + youtube_ids + elif url_type == "playlist": + self.missing_from_playlists.append(entry) + playlist = YoutubePlaylist(url) + playlist.build_json() + video_results = playlist.json_data.get("playlist_entries") + youtube_ids = [i["youtube_id"] for i in video_results] + missing_videos = missing_videos + youtube_ids + + return missing_videos + + def add_to_pending(self, missing_videos, ignore=False): + """build the bulk json data from pending""" + # check if channel is indexed + channel_handler = ChannelSubscription() + all_indexed = channel_handler.get_channels(subscribed_only=False) + self.all_channel_ids = [i["channel_id"] for i in all_indexed] + # check if already there + self.all_downloaded = self.get_all_downloaded() + + bulk_list, all_videos_added = self.build_bulk(missing_videos, ignore) + # add last newline + bulk_list.append("\n") + query_str = "\n".join(bulk_list) + headers = {"Content-type": "application/x-ndjson"} + url = self.ES_URL + "/_bulk" + request = requests.post( + url, data=query_str, headers=headers, auth=self.ES_AUTH + ) + if not request.ok: + print(request) + raise ValueError("failed to add video to download queue") + + return all_videos_added + + def build_bulk(self, missing_videos, ignore=False): + """build the bulk lists""" + bulk_list = [] + all_videos_added = [] + + for idx, youtube_id in enumerate(missing_videos): + # check if already downloaded + if youtube_id in self.all_downloaded: + continue + + video = self.get_youtube_details(youtube_id) + # skip on download error + if not video: + continue + + channel_indexed = video["channel_id"] in self.all_channel_ids + video["channel_indexed"] = channel_indexed + if ignore: + video["status"] = "ignore" + else: + video["status"] = "pending" + action = {"create": {"_id": youtube_id, "_index": "ta_download"}} + bulk_list.append(json.dumps(action)) + bulk_list.append(json.dumps(video)) + all_videos_added.append((youtube_id, video["vid_thumb_url"])) + # notify + progress = f"{idx + 1}/{len(missing_videos)}" + mess_dict = { + "status": "message:add", + "level": "info", + "title": "Adding new videos to download queue.", + "message": "Progress: " + progress, + } + if idx + 1 == len(missing_videos): + 
RedisArchivist().set_message( + "message:add", mess_dict, expire=4 + ) + else: + RedisArchivist().set_message("message:add", mess_dict) + if idx + 1 % 25 == 0: + print("adding to queue progress: " + progress) + + return bulk_list, all_videos_added + + @staticmethod + def get_youtube_details(youtube_id): + """get details from youtubedl for single pending video""" + obs = { + "default_search": "ytsearch", + "quiet": True, + "check_formats": "selected", + "noplaylist": True, + "writethumbnail": True, + "simulate": True, + } + try: + vid = yt_dlp.YoutubeDL(obs).extract_info(youtube_id) + except yt_dlp.utils.DownloadError: + print("failed to extract info for: " + youtube_id) + return False + # stop if video is streaming live now + if vid["is_live"]: + return False + # parse response + seconds = vid["duration"] + duration_str = DurationConverter.get_str(seconds) + if duration_str == "NA": + print(f"skip extracting duration for: {youtube_id}") + upload_date = vid["upload_date"] + upload_dt = datetime.strptime(upload_date, "%Y%m%d") + published = upload_dt.strftime("%Y-%m-%d") + # build dict + youtube_details = { + "youtube_id": youtube_id, + "channel_name": vid["channel"], + "vid_thumb_url": vid["thumbnail"], + "title": vid["title"], + "channel_id": vid["channel_id"], + "duration": duration_str, + "published": published, + "timestamp": int(datetime.now().strftime("%s")), + } + return youtube_details + + @staticmethod + def get_all_pending(): + """get a list of all pending videos in ta_download""" + data = { + "query": {"match_all": {}}, + "sort": [{"timestamp": {"order": "asc"}}], + } + all_results = IndexPaginate("ta_download", data).get_results() + + all_pending = [] + all_ignore = [] + + for result in all_results: + if result["status"] == "pending": + all_pending.append(result) + elif result["status"] == "ignore": + all_ignore.append(result) + + return all_pending, all_ignore + + @staticmethod + def get_all_indexed(): + """get a list of all videos indexed""" + + data = { + "query": {"match_all": {}}, + "sort": [{"published": {"order": "desc"}}], + } + all_indexed = IndexPaginate("ta_video", data).get_results() + + return all_indexed + + def get_all_downloaded(self): + """get a list of all videos in archive""" + channel_folders = os.listdir(self.VIDEOS) + all_channel_folders = ignore_filelist(channel_folders) + all_downloaded = [] + for channel_folder in all_channel_folders: + channel_path = os.path.join(self.VIDEOS, channel_folder) + videos = os.listdir(channel_path) + all_videos = ignore_filelist(videos) + youtube_vids = [i[9:20] for i in all_videos] + for youtube_id in youtube_vids: + all_downloaded.append(youtube_id) + return all_downloaded + + def delete_from_pending(self, youtube_id): + """delete the youtube_id from ta_download""" + url = f"{self.ES_URL}/ta_download/_doc/{youtube_id}" + response = requests.delete(url, auth=self.ES_AUTH) + if not response.ok: + print(response.text) + + def delete_pending(self, status): + """delete download queue based on status value""" + data = {"query": {"term": {"status": {"value": status}}}} + payload = json.dumps(data) + url = self.ES_URL + "/ta_download/_delete_by_query" + headers = {"Content-type": "application/json"} + response = requests.post( + url, data=payload, headers=headers, auth=self.ES_AUTH + ) + if not response.ok: + print(response.text) + + def ignore_from_pending(self, ignore_list): + """build the bulk query string""" + + stamp = int(datetime.now().strftime("%s")) + bulk_list = [] + + for youtube_id in ignore_list: + action = {"update": 
{"_id": youtube_id, "_index": "ta_download"}} + source = {"doc": {"status": "ignore", "timestamp": stamp}} + bulk_list.append(json.dumps(action)) + bulk_list.append(json.dumps(source)) + + # add last newline + bulk_list.append("\n") + query_str = "\n".join(bulk_list) + + headers = {"Content-type": "application/x-ndjson"} + url = self.ES_URL + "/_bulk" + request = requests.post( + url, data=query_str, headers=headers, auth=self.ES_AUTH + ) + if not request.ok: + print(request) + raise ValueError("failed to set video to ignore") diff --git a/tubearchivist/home/src/download/subscriptions.py b/tubearchivist/home/src/download/subscriptions.py new file mode 100644 index 0000000..d610137 --- /dev/null +++ b/tubearchivist/home/src/download/subscriptions.py @@ -0,0 +1,210 @@ +"""handle subscriptions""" + +import yt_dlp +from home.src.download import queue # partial import +from home.src.es.connect import IndexPaginate +from home.src.index.channel import YoutubeChannel +from home.src.index.playlist import YoutubePlaylist +from home.src.ta.config import AppConfig +from home.src.ta.ta_redis import RedisArchivist + + +class ChannelSubscription: + """manage the list of channels subscribed""" + + def __init__(self): + config = AppConfig().config + self.es_url = config["application"]["es_url"] + self.es_auth = config["application"]["es_auth"] + self.channel_size = config["subscriptions"]["channel_size"] + + @staticmethod + def get_channels(subscribed_only=True): + """get a list of all channels subscribed to""" + data = { + "sort": [{"channel_name.keyword": {"order": "asc"}}], + } + if subscribed_only: + data["query"] = {"term": {"channel_subscribed": {"value": True}}} + else: + data["query"] = {"match_all": {}} + + all_channels = IndexPaginate("ta_channel", data).get_results() + + return all_channels + + def get_last_youtube_videos(self, channel_id, limit=True): + """get a list of last videos from channel""" + url = f"https://www.youtube.com/channel/{channel_id}/videos" + obs = { + "default_search": "ytsearch", + "quiet": True, + "skip_download": True, + "extract_flat": True, + } + if limit: + obs["playlistend"] = self.channel_size + chan = yt_dlp.YoutubeDL(obs).extract_info(url, download=False) + last_videos = [(i["id"], i["title"]) for i in chan["entries"]] + return last_videos + + def find_missing(self): + """add missing videos from subscribed channels to pending""" + all_channels = self.get_channels() + pending_handler = queue.PendingList() + all_pending, all_ignore = pending_handler.get_all_pending() + all_ids = [i["youtube_id"] for i in all_ignore + all_pending] + all_downloaded = pending_handler.get_all_downloaded() + to_ignore = all_ids + all_downloaded + + missing_videos = [] + + for idx, channel in enumerate(all_channels): + channel_id = channel["channel_id"] + last_videos = self.get_last_youtube_videos(channel_id) + for video in last_videos: + if video[0] not in to_ignore: + missing_videos.append(video[0]) + # notify + message = { + "status": "message:rescan", + "level": "info", + "title": "Scanning channels: Looking for new videos.", + "message": f"Progress: {idx + 1}/{len(all_channels)}", + } + if idx + 1 == len(all_channels): + RedisArchivist().set_message( + "message:rescan", message=message, expire=4 + ) + else: + RedisArchivist().set_message("message:rescan", message=message) + + return missing_videos + + @staticmethod + def change_subscribe(channel_id, channel_subscribed): + """subscribe or unsubscribe from channel and update""" + channel = YoutubeChannel(channel_id) + 
channel.build_json() + channel.json_data["channel_subscribed"] = channel_subscribed + channel.upload_to_es() + channel.sync_to_videos() + + +class PlaylistSubscription: + """manage the playlist download functionality""" + + def __init__(self): + self.config = AppConfig().config + + @staticmethod + def get_playlists(subscribed_only=True): + """get a list of all active playlists""" + data = { + "sort": [{"playlist_channel.keyword": {"order": "desc"}}], + } + data["query"] = { + "bool": {"must": [{"term": {"playlist_active": {"value": True}}}]} + } + if subscribed_only: + data["query"]["bool"]["must"].append( + {"term": {"playlist_subscribed": {"value": True}}} + ) + + all_playlists = IndexPaginate("ta_playlist", data).get_results() + + return all_playlists + + def process_url_str(self, new_playlists, subscribed=True): + """process playlist subscribe form url_str""" + all_indexed = queue.PendingList().get_all_indexed() + all_youtube_ids = [i["youtube_id"] for i in all_indexed] + + new_thumbs = [] + for idx, playlist in enumerate(new_playlists): + url_type = playlist["type"] + playlist_id = playlist["url"] + if not url_type == "playlist": + print(f"{playlist_id} not a playlist, skipping...") + continue + + playlist_h = YoutubePlaylist(playlist_id) + playlist_h.all_youtube_ids = all_youtube_ids + playlist_h.build_json() + playlist_h.json_data["playlist_subscribed"] = subscribed + playlist_h.upload_to_es() + playlist_h.add_vids_to_playlist() + self.channel_validate(playlist_h.json_data["playlist_channel_id"]) + thumb = playlist_h.json_data["playlist_thumbnail"] + new_thumbs.append((playlist_id, thumb)) + # notify + message = { + "status": "message:subplaylist", + "level": "info", + "title": "Subscribing to Playlists", + "message": f"Processing {idx + 1} of {len(new_playlists)}", + } + RedisArchivist().set_message( + "message:subplaylist", message=message + ) + + return new_thumbs + + @staticmethod + def channel_validate(channel_id): + """make sure channel of playlist is there""" + channel = YoutubeChannel(channel_id) + channel.build_json() + + @staticmethod + def change_subscribe(playlist_id, subscribe_status): + """change the subscribe status of a playlist""" + playlist = YoutubePlaylist(playlist_id) + playlist.build_json() + playlist.json_data["playlist_subscribed"] = subscribe_status + playlist.upload_to_es() + + @staticmethod + def get_to_ignore(): + """get all youtube_ids already downloaded or ignored""" + pending_handler = queue.PendingList() + all_pending, all_ignore = pending_handler.get_all_pending() + all_ids = [i["youtube_id"] for i in all_ignore + all_pending] + all_downloaded = pending_handler.get_all_downloaded() + to_ignore = all_ids + all_downloaded + return to_ignore + + def find_missing(self): + """find videos in subscribed playlists not downloaded yet""" + all_playlists = [i["playlist_id"] for i in self.get_playlists()] + to_ignore = self.get_to_ignore() + + missing_videos = [] + for idx, playlist_id in enumerate(all_playlists): + size_limit = self.config["subscriptions"]["channel_size"] + playlist = YoutubePlaylist(playlist_id) + playlist.update_playlist() + if not playlist: + playlist.deactivate() + continue + + playlist_entries = playlist.json_data["playlist_entries"] + if size_limit: + del playlist_entries[size_limit:] + + all_missing = [i for i in playlist_entries if not i["downloaded"]] + + message = { + "status": "message:rescan", + "level": "info", + "title": "Scanning playlists: Looking for new videos.", + "message": f"Progress: {idx + 1}/{len(all_playlists)}", + } + 
RedisArchivist().set_message("message:rescan", message=message) + + for video in all_missing: + youtube_id = video["youtube_id"] + if youtube_id not in to_ignore: + missing_videos.append(youtube_id) + + return missing_videos diff --git a/tubearchivist/home/src/thumbnails.py b/tubearchivist/home/src/download/thumbnails.py similarity index 94% rename from tubearchivist/home/src/thumbnails.py rename to tubearchivist/home/src/download/thumbnails.py index d9f6549..305bbd8 100644 --- a/tubearchivist/home/src/thumbnails.py +++ b/tubearchivist/home/src/download/thumbnails.py @@ -7,10 +7,12 @@ import os from collections import Counter from time import sleep -import home.src.download as download import requests -from home.src.config import AppConfig -from home.src.helper import RedisArchivist, ignore_filelist +from home.src.download import queue # partial import +from home.src.download import subscriptions # partial import +from home.src.ta.config import AppConfig +from home.src.ta.helper import ignore_filelist +from home.src.ta.ta_redis import RedisArchivist from mutagen.mp4 import MP4, MP4Cover from PIL import Image @@ -55,8 +57,8 @@ class ThumbManager: def get_needed_thumbs(self, missing_only=False): """get a list of all missing thumbnails""" all_thumbs = self.get_all_thumbs() - all_indexed = download.PendingList().get_all_indexed() - all_in_queue, all_ignored = download.PendingList().get_all_pending() + all_indexed = queue.PendingList().get_all_indexed() + all_in_queue, all_ignored = queue.PendingList().get_all_pending() needed_thumbs = [] for video in all_indexed: @@ -84,9 +86,8 @@ class ThumbManager: all_channel_art = os.listdir(self.CHANNEL_DIR) files = [i[0:24] for i in all_channel_art] cached_channel_ids = [k for (k, v) in Counter(files).items() if v > 1] - channels = download.ChannelSubscription().get_channels( - subscribed_only=False - ) + channel_sub = subscriptions.ChannelSubscription() + channels = channel_sub.get_channels(subscribed_only=False) missing_channels = [] for channel in channels: @@ -104,10 +105,8 @@ class ThumbManager: """get all missing playlist artwork""" all_downloaded = ignore_filelist(os.listdir(self.PLAYLIST_DIR)) all_ids_downloaded = [i.replace(".jpg", "") for i in all_downloaded] - - playlists = download.PlaylistSubscription().get_playlists( - subscribed_only=False - ) + playlist_sub = subscriptions.PlaylistSubscription() + playlists = playlist_sub.get_playlists(subscribed_only=False) missing_playlists = [] for playlist in playlists: @@ -276,7 +275,7 @@ class ThumbManager: def get_thumb_list(self): """get list of mediafiles and matching thumbnails""" - all_indexed = download.PendingList().get_all_indexed() + all_indexed = queue.PendingList().get_all_indexed() video_list = [] for video in all_indexed: youtube_id = video["youtube_id"] diff --git a/tubearchivist/home/src/download/yt_dlp_handler.py b/tubearchivist/home/src/download/yt_dlp_handler.py new file mode 100644 index 0000000..80c8b86 --- /dev/null +++ b/tubearchivist/home/src/download/yt_dlp_handler.py @@ -0,0 +1,295 @@ +"""handle yt_dlp downloads""" + +import os +import shutil +from datetime import datetime +from time import sleep + +import requests +import yt_dlp +from home.src.download.queue import PendingList +from home.src.download.subscriptions import PlaylistSubscription +from home.src.es.connect import IndexPaginate +from home.src.index.channel import YoutubeChannel +from home.src.index.playlist import YoutubePlaylist +from home.src.index.video import YoutubeVideo, index_new_video +from 
home.src.ta.config import AppConfig +from home.src.ta.helper import clean_string, ignore_filelist +from home.src.ta.ta_redis import RedisArchivist, RedisQueue + + +class VideoDownloader: + """ + handle the video download functionality + if not initiated with list, take from queue + """ + + def __init__(self, youtube_id_list=False): + self.youtube_id_list = youtube_id_list + self.config = AppConfig().config + self.channels = set() + + def run_queue(self): + """setup download queue in redis loop until no more items""" + queue = RedisQueue("dl_queue") + + limit_queue = self.config["downloads"]["limit_count"] + if limit_queue: + queue.trim(limit_queue - 1) + + while True: + youtube_id = queue.get_next() + if not youtube_id: + break + + try: + self.dl_single_vid(youtube_id) + except yt_dlp.utils.DownloadError: + print("failed to download " + youtube_id) + continue + vid_dict = index_new_video(youtube_id) + self.channels.add(vid_dict["channel"]["channel_id"]) + self.move_to_archive(vid_dict) + self.delete_from_pending(youtube_id) + + autodelete_days = self.config["downloads"]["autodelete_days"] + if autodelete_days: + print(f"auto delete older than {autodelete_days} days") + self.auto_delete_watched(autodelete_days) + + @staticmethod + def add_pending(): + """add pending videos to download queue""" + mess_dict = { + "status": "message:download", + "level": "info", + "title": "Looking for videos to download", + "message": "Scanning your download queue.", + } + RedisArchivist().set_message("message:download", mess_dict) + all_pending, _ = PendingList().get_all_pending() + to_add = [i["youtube_id"] for i in all_pending] + if not to_add: + # there is nothing pending + print("download queue is empty") + mess_dict = { + "status": "message:download", + "level": "error", + "title": "Download queue is empty", + "message": "Add some videos to the queue first.", + } + RedisArchivist().set_message("message:download", mess_dict) + return + + queue = RedisQueue("dl_queue") + queue.add_list(to_add) + + @staticmethod + def progress_hook(response): + """process the progress_hooks from yt_dlp""" + # title + path = os.path.split(response["filename"])[-1][12:] + filename = os.path.splitext(os.path.splitext(path)[0])[0] + filename_clean = filename.replace("_", " ") + title = "Downloading: " + filename_clean + # message + try: + percent = response["_percent_str"] + size = response["_total_bytes_str"] + speed = response["_speed_str"] + eta = response["_eta_str"] + message = f"{percent} of {size} at {speed} - time left: {eta}" + except KeyError: + message = "processing" + mess_dict = { + "status": "message:download", + "level": "info", + "title": title, + "message": message, + } + RedisArchivist().set_message("message:download", mess_dict) + + def build_obs(self): + """build obs dictionary for yt-dlp""" + obs = { + "default_search": "ytsearch", + "merge_output_format": "mp4", + "restrictfilenames": True, + "outtmpl": ( + self.config["application"]["cache_dir"] + + "/download/" + + self.config["application"]["file_template"] + ), + "progress_hooks": [self.progress_hook], + "noprogress": True, + "quiet": True, + "continuedl": True, + "retries": 3, + "writethumbnail": False, + "noplaylist": True, + "check_formats": "selected", + } + if self.config["downloads"]["format"]: + obs["format"] = self.config["downloads"]["format"] + if self.config["downloads"]["limit_speed"]: + obs["ratelimit"] = self.config["downloads"]["limit_speed"] * 1024 + + throttle = self.config["downloads"]["throttledratelimit"] + if throttle: + 
obs["throttledratelimit"] = throttle * 1024 + + postprocessors = [] + + if self.config["downloads"]["add_metadata"]: + postprocessors.append( + { + "key": "FFmpegMetadata", + "add_chapters": True, + "add_metadata": True, + } + ) + + if self.config["downloads"]["add_thumbnail"]: + postprocessors.append( + { + "key": "EmbedThumbnail", + "already_have_thumbnail": True, + } + ) + obs["writethumbnail"] = True + + obs["postprocessors"] = postprocessors + + return obs + + def dl_single_vid(self, youtube_id): + """download single video""" + dl_cache = self.config["application"]["cache_dir"] + "/download/" + obs = self.build_obs() + + # check if already in cache to continue from there + all_cached = ignore_filelist(os.listdir(dl_cache)) + for file_name in all_cached: + if youtube_id in file_name: + obs["outtmpl"] = os.path.join(dl_cache, file_name) + with yt_dlp.YoutubeDL(obs) as ydl: + try: + ydl.download([youtube_id]) + except yt_dlp.utils.DownloadError: + print("retry failed download: " + youtube_id) + sleep(10) + ydl.download([youtube_id]) + + if obs["writethumbnail"]: + # webp files don't get cleaned up automatically + all_cached = ignore_filelist(os.listdir(dl_cache)) + to_clean = [i for i in all_cached if not i.endswith(".mp4")] + for file_name in to_clean: + file_path = os.path.join(dl_cache, file_name) + os.remove(file_path) + + def move_to_archive(self, vid_dict): + """move downloaded video from cache to archive""" + videos = self.config["application"]["videos"] + host_uid = self.config["application"]["HOST_UID"] + host_gid = self.config["application"]["HOST_GID"] + channel_name = clean_string(vid_dict["channel"]["channel_name"]) + # make archive folder with correct permissions + new_folder = os.path.join(videos, channel_name) + if not os.path.exists(new_folder): + os.makedirs(new_folder) + if host_uid and host_gid: + os.chown(new_folder, host_uid, host_gid) + # find real filename + cache_dir = self.config["application"]["cache_dir"] + all_cached = ignore_filelist(os.listdir(cache_dir + "/download/")) + for file_str in all_cached: + if vid_dict["youtube_id"] in file_str: + old_file = file_str + old_file_path = os.path.join(cache_dir, "download", old_file) + new_file_path = os.path.join(videos, vid_dict["media_url"]) + # move media file and fix permission + shutil.move(old_file_path, new_file_path) + if host_uid and host_gid: + os.chown(new_file_path, host_uid, host_gid) + + def delete_from_pending(self, youtube_id): + """delete downloaded video from pending index if its there""" + es_url = self.config["application"]["es_url"] + es_auth = self.config["application"]["es_auth"] + url = f"{es_url}/ta_download/_doc/{youtube_id}" + response = requests.delete(url, auth=es_auth) + if not response.ok and not response.status_code == 404: + print(response.text) + + def add_subscribed_channels(self): + """add all channels subscribed to refresh""" + all_subscribed = PlaylistSubscription().get_playlists() + if not all_subscribed: + return + + channel_ids = [i["playlist_channel_id"] for i in all_subscribed] + for channel_id in channel_ids: + self.channels.add(channel_id) + + return + + def validate_playlists(self): + """look for playlist needing to update""" + print("sync playlists") + self.add_subscribed_channels() + all_indexed = PendingList().get_all_indexed() + all_youtube_ids = [i["youtube_id"] for i in all_indexed] + for id_c, channel_id in enumerate(self.channels): + playlists = YoutubeChannel(channel_id).get_indexed_playlists() + all_playlist_ids = [i["playlist_id"] for i in playlists] + for 
id_p, playlist_id in enumerate(all_playlist_ids): + playlist = YoutubePlaylist(playlist_id) + playlist.all_youtube_ids = all_youtube_ids + playlist.build_json(scrape=True) + if not playlist.json_data: + playlist.deactivate() + + playlist.add_vids_to_playlist() + playlist.upload_to_es() + + # notify + title = ( + "Processing playlists for channels: " + + f"{id_c + 1}/{len(self.channels)}" + ) + message = f"Progress: {id_p + 1}/{len(all_playlist_ids)}" + mess_dict = { + "status": "message:download", + "level": "info", + "title": title, + "message": message, + } + if id_p + 1 == len(all_playlist_ids): + RedisArchivist().set_message( + "message:download", mess_dict, expire=4 + ) + else: + RedisArchivist().set_message("message:download", mess_dict) + + @staticmethod + def auto_delete_watched(autodelete_days): + """delete watched videos after x days""" + now = int(datetime.now().strftime("%s")) + now_lte = now - autodelete_days * 24 * 60 * 60 + data = { + "query": {"range": {"player.watched_date": {"lte": now_lte}}}, + "sort": [{"player.watched_date": {"order": "asc"}}], + } + all_to_delete = IndexPaginate("ta_video", data).get_results() + all_youtube_ids = [i["youtube_id"] for i in all_to_delete] + if not all_youtube_ids: + return + + for youtube_id in all_youtube_ids: + print(f"autodelete {youtube_id}") + YoutubeVideo(youtube_id).delete_media_file() + + print("add deleted to ignore list") + pending_handler = PendingList() + pending_handler.add_to_pending(all_youtube_ids, ignore=True) diff --git a/tubearchivist/home/src/es/__init__.py b/tubearchivist/home/src/es/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tubearchivist/home/src/es.py b/tubearchivist/home/src/es/connect.py similarity index 99% rename from tubearchivist/home/src/es.py rename to tubearchivist/home/src/es/connect.py index ce72863..7cf7d8c 100644 --- a/tubearchivist/home/src/es.py +++ b/tubearchivist/home/src/es/connect.py @@ -3,7 +3,7 @@ import json import requests -from home.src.config import AppConfig +from home.src.ta.config import AppConfig class ElasticWrap: diff --git a/tubearchivist/home/src/es/index_mapping.json b/tubearchivist/home/src/es/index_mapping.json new file mode 100644 index 0000000..db413fb --- /dev/null +++ b/tubearchivist/home/src/es/index_mapping.json @@ -0,0 +1,274 @@ +{ + "index_config": [{ + "index_name": "channel", + "expected_map": { + "channel_id": { + "type": "keyword" + }, + "channel_name": { + "type": "text", + "analyzer": "english", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256, + "normalizer": "to_lower" + }, + "search_as_you_type": { + "type": "search_as_you_type", + "doc_values": false, + "max_shingle_size": 3 + } + } + }, + "channel_banner_url": { + "type": "keyword", + "index": false + }, + "channel_thumb_url": { + "type": "keyword", + "index": false + }, + "channel_description": { + "type": "text" + }, + "channel_last_refresh": { + "type": "date", + "format": "epoch_second" + } + }, + "expected_set": { + "analysis": { + "normalizer": { + "to_lower": { + "type": "custom", + "filter": ["lowercase"] + } + } + }, + "number_of_replicas": "0" + } + }, + { + "index_name": "video", + "expected_map": { + "vid_thumb_url": { + "type": "text", + "index": false + }, + "date_downloaded": { + "type": "date" + }, + "channel": { + "properties": { + "channel_id": { + "type": "keyword" + }, + "channel_name": { + "type": "text", + "analyzer": "english", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256, + "normalizer": "to_lower" + }, + 
"search_as_you_type": { + "type": "search_as_you_type", + "doc_values": false, + "max_shingle_size": 3 + } + } + }, + "channel_banner_url": { + "type": "keyword", + "index": false + }, + "channel_thumb_url": { + "type": "keyword", + "index": false + }, + "channel_description": { + "type": "text" + }, + "channel_last_refresh": { + "type": "date", + "format": "epoch_second" + } + } + }, + "description": { + "type": "text" + }, + "media_url": { + "type": "keyword", + "index": false + }, + "tags": { + "type": "text", + "analyzer": "english", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "title": { + "type": "text", + "analyzer": "english", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256, + "normalizer": "to_lower" + }, + "search_as_you_type": { + "type": "search_as_you_type", + "doc_values": false, + "max_shingle_size": 3 + } + } + }, + "vid_last_refresh": { + "type": "date" + }, + "youtube_id": { + "type": "keyword" + }, + "published": { + "type": "date" + }, + "playlist": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256, + "normalizer": "to_lower" + } + } + } + }, + "expected_set": { + "analysis": { + "normalizer": { + "to_lower": { + "type": "custom", + "filter": ["lowercase"] + } + } + }, + "number_of_replicas": "0" + } + }, + { + "index_name": "download", + "expected_map": { + "timestamp": { + "type": "date" + }, + "channel_id": { + "type": "keyword" + }, + "channel_name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256, + "normalizer": "to_lower" + } + } + }, + "status": { + "type": "keyword" + }, + "title": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256, + "normalizer": "to_lower" + } + } + }, + "vid_thumb_url": { + "type": "keyword" + }, + "youtube_id": { + "type": "keyword" + } + }, + "expected_set": { + "analysis": { + "normalizer": { + "to_lower": { + "type": "custom", + "filter": ["lowercase"] + } + } + }, + "number_of_replicas": "0" + } + }, + { + "index_name": "playlist", + "expected_map": { + "playlist_id": { + "type": "keyword" + }, + "playlist_description": { + "type": "text" + }, + "playlist_name": { + "type": "text", + "analyzer": "english", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256, + "normalizer": "to_lower" + }, + "search_as_you_type": { + "type": "search_as_you_type", + "doc_values": false, + "max_shingle_size": 3 + } + } + }, + "playlist_channel": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256, + "normalizer": "to_lower" + } + } + }, + "playlist_channel_id": { + "type": "keyword" + }, + "playlist_thumbnail": { + "type": "keyword" + }, + "playlist_last_refresh": { + "type": "date" + } + }, + "expected_set": { + "analysis": { + "normalizer": { + "to_lower": { + "type": "custom", + "filter": ["lowercase"] + } + } + }, + "number_of_replicas": "0" + } + } + ] +} \ No newline at end of file diff --git a/tubearchivist/home/src/index_management.py b/tubearchivist/home/src/es/index_setup.py similarity index 66% rename from tubearchivist/home/src/index_management.py rename to tubearchivist/home/src/es/index_setup.py index 63f0a10..6b12a30 100644 --- a/tubearchivist/home/src/index_management.py +++ b/tubearchivist/home/src/es/index_setup.py @@ -1,10 +1,4 @@ -""" -Functionality: -- initial elastic search setup -- index configuration is represented in INDEX_CONFIG -- index mapping and settings validation -- backup 
and restore -""" +"""setup and verify needed elastic indexes""" import json import os @@ -12,213 +6,8 @@ import zipfile from datetime import datetime import requests -from home.src.config import AppConfig -from home.src.helper import ignore_filelist - -# expected mapping and settings -INDEX_CONFIG = [ - { - "index_name": "channel", - "expected_map": { - "channel_id": { - "type": "keyword", - }, - "channel_name": { - "type": "text", - "analyzer": "english", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256, - "normalizer": "to_lower", - }, - "search_as_you_type": { - "type": "search_as_you_type", - "doc_values": False, - "max_shingle_size": 3, - }, - }, - }, - "channel_banner_url": {"type": "keyword", "index": False}, - "channel_thumb_url": {"type": "keyword", "index": False}, - "channel_description": {"type": "text"}, - "channel_last_refresh": {"type": "date", "format": "epoch_second"}, - }, - "expected_set": { - "analysis": { - "normalizer": { - "to_lower": {"type": "custom", "filter": ["lowercase"]} - } - }, - "number_of_replicas": "0", - }, - }, - { - "index_name": "video", - "expected_map": { - "vid_thumb_url": {"type": "text", "index": False}, - "date_downloaded": {"type": "date"}, - "channel": { - "properties": { - "channel_id": { - "type": "keyword", - }, - "channel_name": { - "type": "text", - "analyzer": "english", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256, - "normalizer": "to_lower", - }, - "search_as_you_type": { - "type": "search_as_you_type", - "doc_values": False, - "max_shingle_size": 3, - }, - }, - }, - "channel_banner_url": {"type": "keyword", "index": False}, - "channel_thumb_url": {"type": "keyword", "index": False}, - "channel_description": {"type": "text"}, - "channel_last_refresh": { - "type": "date", - "format": "epoch_second", - }, - } - }, - "description": {"type": "text"}, - "media_url": {"type": "keyword", "index": False}, - "tags": { - "type": "text", - "analyzer": "english", - "fields": { - "keyword": {"type": "keyword", "ignore_above": 256} - }, - }, - "title": { - "type": "text", - "analyzer": "english", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256, - "normalizer": "to_lower", - }, - "search_as_you_type": { - "type": "search_as_you_type", - "doc_values": False, - "max_shingle_size": 3, - }, - }, - }, - "vid_last_refresh": {"type": "date"}, - "youtube_id": {"type": "keyword"}, - "published": {"type": "date"}, - "playlist": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256, - "normalizer": "to_lower", - } - }, - }, - }, - "expected_set": { - "analysis": { - "normalizer": { - "to_lower": {"type": "custom", "filter": ["lowercase"]} - } - }, - "number_of_replicas": "0", - }, - }, - { - "index_name": "download", - "expected_map": { - "timestamp": {"type": "date"}, - "channel_id": {"type": "keyword"}, - "channel_name": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256, - "normalizer": "to_lower", - } - }, - }, - "status": {"type": "keyword"}, - "title": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256, - "normalizer": "to_lower", - } - }, - }, - "vid_thumb_url": {"type": "keyword"}, - "youtube_id": {"type": "keyword"}, - }, - "expected_set": { - "analysis": { - "normalizer": { - "to_lower": {"type": "custom", "filter": ["lowercase"]} - } - }, - "number_of_replicas": "0", - }, - }, - { - "index_name": "playlist", - "expected_map": { - "playlist_id": {"type": 
"keyword"}, - "playlist_description": {"type": "text"}, - "playlist_name": { - "type": "text", - "analyzer": "english", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256, - "normalizer": "to_lower", - }, - "search_as_you_type": { - "type": "search_as_you_type", - "doc_values": False, - "max_shingle_size": 3, - }, - }, - }, - "playlist_channel": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256, - "normalizer": "to_lower", - } - }, - }, - "playlist_channel_id": {"type": "keyword"}, - "playlist_thumbnail": {"type": "keyword"}, - "playlist_last_refresh": {"type": "date"}, - }, - "expected_set": { - "analysis": { - "normalizer": { - "to_lower": {"type": "custom", "filter": ["lowercase"]} - } - }, - "number_of_replicas": "0", - }, - }, -] +from home.src.ta.config import AppConfig +from home.src.ta.helper import ignore_filelist class ElasticIndex: @@ -602,48 +391,22 @@ class ElasticBackup: os.remove(file_path) -def get_available_backups(): - """return dict of available backups for settings view""" - backup_handler = ElasticBackup(INDEX_CONFIG, reason=False) - all_backup_files = backup_handler.get_all_backup_files() - return all_backup_files +def get_mapping(): + """read index_mapping.json and get expected mapping and settings""" + with open("home/src/es/index_mapping.json", "r", encoding="utf-8") as f: + config_str = f.read() + index_config = json.loads(config_str).get("index_config") - -def backup_all_indexes(reason): - """backup all es indexes to disk""" - backup_handler = ElasticBackup(INDEX_CONFIG, reason) - - for index in backup_handler.index_config: - index_name = index["index_name"] - if not backup_handler.index_exists(index_name): - continue - all_results = backup_handler.get_all_documents(index_name) - file_content = backup_handler.build_bulk(all_results) - backup_handler.write_es_json(file_content, index_name) - backup_handler.write_ta_json(all_results, index_name) - - backup_handler.zip_it() - - if reason == "auto": - backup_handler.rotate_backup() - - -def restore_from_backup(filename): - """restore indexes from backup file""" - # delete - index_check(force_restore=True) - # recreate - backup_handler = ElasticBackup(INDEX_CONFIG, reason=False) - zip_content = backup_handler.unpack_zip_backup(filename) - backup_handler.restore_json_files(zip_content) + return index_config def index_check(force_restore=False): """check if all indexes are created and have correct mapping""" backed_up = False + index_config = get_mapping() - for index in INDEX_CONFIG: + for index in index_config: index_name = index["index_name"] expected_map = index["expected_map"] expected_set = index["expected_set"] @@ -675,3 +438,42 @@ def index_check(force_restore=False): # else all good print(f"ta_{index_name} index is created and up to date...") + + +def get_available_backups(): + """return dict of available backups for settings view""" + index_config = get_mapping() + backup_handler = ElasticBackup(index_config, reason=False) + all_backup_files = backup_handler.get_all_backup_files() + return all_backup_files + + +def backup_all_indexes(reason): + """backup all es indexes to disk""" + index_config = get_mapping() + backup_handler = ElasticBackup(index_config, reason) + + for index in backup_handler.index_config: + index_name = index["index_name"] + if not backup_handler.index_exists(index_name): + continue + all_results = backup_handler.get_all_documents(index_name) + file_content = backup_handler.build_bulk(all_results) + 
backup_handler.write_es_json(file_content, index_name) + backup_handler.write_ta_json(all_results, index_name) + + backup_handler.zip_it() + + if reason == "auto": + backup_handler.rotate_backup() + + +def restore_from_backup(filename): + """restore indexes from backup file""" + # delete + index_check(force_restore=True) + # recreate + index_config = get_mapping() + backup_handler = ElasticBackup(index_config, reason=False) + zip_content = backup_handler.unpack_zip_backup(filename) + backup_handler.restore_json_files(zip_content) diff --git a/tubearchivist/home/src/frontend/__init__.py b/tubearchivist/home/src/frontend/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tubearchivist/home/src/frontend.py b/tubearchivist/home/src/frontend/api_calls.py similarity index 96% rename from tubearchivist/home/src/frontend.py rename to tubearchivist/home/src/frontend/api_calls.py index b6451e6..dff7636 100644 --- a/tubearchivist/home/src/frontend.py +++ b/tubearchivist/home/src/frontend/api_calls.py @@ -4,19 +4,18 @@ Functionality: - called via user input """ -from home.src.download import ( +from home.src.download.queue import PendingList +from home.src.download.subscriptions import ( ChannelSubscription, - PendingList, PlaylistSubscription, ) -from home.src.helper import RedisArchivist, RedisQueue, UrlListParser -from home.src.index import ( - WatchState, - YoutubeChannel, - YoutubePlaylist, - YoutubeVideo, -) -from home.src.searching import SearchForm +from home.src.frontend.searching import SearchForm +from home.src.frontend.watched import WatchState +from home.src.index.channel import YoutubeChannel +from home.src.index.playlist import YoutubePlaylist +from home.src.index.video import YoutubeVideo +from home.src.ta.helper import UrlListParser +from home.src.ta.ta_redis import RedisArchivist, RedisQueue from home.tasks import ( download_pending, download_single, diff --git a/tubearchivist/home/forms.py b/tubearchivist/home/src/frontend/forms.py similarity index 100% rename from tubearchivist/home/forms.py rename to tubearchivist/home/src/frontend/forms.py diff --git a/tubearchivist/home/src/searching.py b/tubearchivist/home/src/frontend/searching.py similarity index 77% rename from tubearchivist/home/src/searching.py rename to tubearchivist/home/src/frontend/searching.py index 23c3ddd..bca2742 100644 --- a/tubearchivist/home/src/searching.py +++ b/tubearchivist/home/src/frontend/searching.py @@ -6,14 +6,12 @@ Functionality: - calculate pagination values """ -import math import urllib.parse from datetime import datetime -from home.src.config import AppConfig -from home.src.es import ElasticWrap -from home.src.helper import RedisArchivist -from home.src.thumbnails import ThumbManager +from home.src.download.thumbnails import ThumbManager +from home.src.es.connect import ElasticWrap +from home.src.ta.config import AppConfig class SearchHandler: @@ -203,62 +201,3 @@ class SearchForm: } return all_results - - -class Pagination: - """ - figure out the pagination based on page size and total_hits - """ - - def __init__(self, page_get, user_id, search_get=False): - self.user_id = user_id - self.page_size = self.get_page_size() - self.page_get = page_get - self.search_get = search_get - self.pagination = self.first_guess() - - def get_page_size(self): - """get default or user modified page_size""" - key = f"{self.user_id}:page_size" - page_size = RedisArchivist().get_message(key)["status"] - if not page_size: - config = AppConfig().config - page_size = config["archive"]["page_size"] - - 
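# [editor's sketch, not part of the patch] After the rename to
# home/src/es/index_setup.py the module-level entry points keep their names, so
# callers (startup check, settings view, scheduled tasks) are assumed to only need
# the new import path. The call sites and the backup filename below are invented
# for illustration:
from home.src.es.index_setup import (
    backup_all_indexes,
    get_available_backups,
    index_check,
    restore_from_backup,
)

index_check()                         # verify indexes exist with expected mapping
backup_all_indexes(reason="auto")     # write zip backup, rotate old "auto" backups
print(get_available_backups())        # list shown in the settings view
restore_from_backup("ta_backup.zip")  # hypothetical filename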
return page_size - - def first_guess(self): - """build first guess before api call""" - page_get = self.page_get - if page_get in [0, 1]: - page_from = 0 - prev_pages = False - elif page_get > 1: - page_from = (page_get - 1) * self.page_size - prev_pages = [ - i for i in range(page_get - 1, page_get - 6, -1) if i > 1 - ] - prev_pages.reverse() - pagination = { - "page_size": self.page_size, - "page_from": page_from, - "prev_pages": prev_pages, - "current_page": page_get, - } - if self.search_get: - pagination.update({"search_get": self.search_get}) - return pagination - - def validate(self, total_hits): - """validate pagination with total_hits after making api call""" - page_get = self.page_get - max_pages = math.ceil(total_hits / self.page_size) - if page_get < max_pages and max_pages > 1: - self.pagination["last_page"] = max_pages - else: - self.pagination["last_page"] = False - next_pages = [ - i for i in range(page_get + 1, page_get + 6) if 1 < i < max_pages - ] - - self.pagination["next_pages"] = next_pages diff --git a/tubearchivist/home/src/frontend/watched.py b/tubearchivist/home/src/frontend/watched.py new file mode 100644 index 0000000..0769232 --- /dev/null +++ b/tubearchivist/home/src/frontend/watched.py @@ -0,0 +1,125 @@ +"""handle watch state""" + +import json +from datetime import datetime + +import requests +from home.src.ta.config import AppConfig +from home.src.ta.helper import UrlListParser + + +class WatchState: + """handle watched checkbox for videos and channels""" + + CONFIG = AppConfig().config + ES_URL = CONFIG["application"]["es_url"] + ES_AUTH = CONFIG["application"]["es_auth"] + HEADERS = {"Content-type": "application/json"} + + def __init__(self, youtube_id): + self.youtube_id = youtube_id + self.stamp = int(datetime.now().strftime("%s")) + + def mark_as_watched(self): + """update es with new watched value""" + url_type = self.dedect_type() + if url_type == "video": + self.mark_vid_watched() + elif url_type == "channel": + self.mark_channel_watched() + elif url_type == "playlist": + self.mark_playlist_watched() + + print(f"marked {self.youtube_id} as watched") + + def mark_as_unwatched(self): + """revert watched state to false""" + url_type = self.dedect_type() + if url_type == "video": + self.mark_vid_watched(revert=True) + + print(f"revert {self.youtube_id} as unwatched") + + def dedect_type(self): + """find youtube id type""" + print(self.youtube_id) + url_process = UrlListParser(self.youtube_id).process_list() + url_type = url_process[0]["type"] + return url_type + + def mark_vid_watched(self, revert=False): + """change watched status of single video""" + url = self.ES_URL + "/ta_video/_update/" + self.youtube_id + data = { + "doc": {"player": {"watched": True, "watched_date": self.stamp}} + } + if revert: + data["doc"]["player"]["watched"] = False + + payload = json.dumps(data) + request = requests.post( + url, data=payload, headers=self.HEADERS, auth=self.ES_AUTH + ) + if not request.ok: + print(request.text) + raise ValueError("failed to mark video as watched") + + def mark_channel_watched(self): + """change watched status of every video in channel""" + data = { + "query": { + "bool": { + "must": [ + { + "term": { + "channel.channel_id": { + "value": self.youtube_id + } + } + }, + {"term": {"player.watched": {"value": False}}}, + ] + } + }, + "script": { + "source": "ctx._source.player['watched'] = true", + "lang": "painless", + }, + } + payload = json.dumps(data) + url = f"{self.ES_URL}/ta_video/_update_by_query" + request = requests.post( + url, 
data=payload, headers=self.HEADERS, auth=self.ES_AUTH + ) + if not request.ok: + print(request.text) + raise ValueError("failed mark channel as watched") + + def mark_playlist_watched(self): + """change watched state of all videos in playlist""" + data = { + "query": { + "bool": { + "must": [ + { + "term": { + "playlist.keyword": {"value": self.youtube_id} + } + }, + {"term": {"player.watched": {"value": False}}}, + ] + } + }, + "script": { + "source": "ctx._source.player['watched'] = true", + "lang": "painless", + }, + } + payload = json.dumps(data) + url = f"{self.ES_URL}/ta_video/_update_by_query" + request = requests.post( + url, data=payload, headers=self.HEADERS, auth=self.ES_AUTH + ) + if not request.ok: + print(request.text) + raise ValueError("failed mark playlist as watched") diff --git a/tubearchivist/home/src/index.py b/tubearchivist/home/src/index.py deleted file mode 100644 index fd22f17..0000000 --- a/tubearchivist/home/src/index.py +++ /dev/null @@ -1,809 +0,0 @@ -""" -Functionality: -- index new videos into elastisearch -- extract video info with yt_dlp -- scrape youtube channel page if needed -""" - -import json -import os -import re -from datetime import datetime - -import requests -import yt_dlp -from bs4 import BeautifulSoup -from home.src.config import AppConfig -from home.src.es import ElasticWrap, IndexPaginate -from home.src.helper import DurationConverter, UrlListParser, clean_string -from home.src.thumbnails import ThumbManager -from ryd_client import ryd_client - - -class YouTubeItem: - """base class for youtube""" - - es_path = False - index_name = False - yt_base = False - yt_obs = { - "quiet": True, - "default_search": "ytsearch", - "skip_download": True, - "check_formats": "selected", - "noplaylist": True, - } - - def __init__(self, youtube_id): - self.youtube_id = youtube_id - self.config = False - self.app_conf = False - self.youtube_meta = False - self.json_data = False - self._get_conf() - - def _get_conf(self): - """read user conf""" - self.config = AppConfig().config - self.app_conf = self.config["application"] - - def get_from_youtube(self): - """use yt-dlp to get meta data from youtube""" - print(f"{self.youtube_id}: get metadata from youtube") - try: - yt_item = yt_dlp.YoutubeDL(self.yt_obs) - response = yt_item.extract_info(self.yt_base + self.youtube_id) - except ( - yt_dlp.utils.ExtractorError, - yt_dlp.utils.DownloadError, - ): - print(f"{self.youtube_id}: failed to get info from youtube") - self.youtube_meta = False - - self.youtube_meta = response - - def get_from_es(self): - """get indexed data from elastic search""" - print(f"{self.youtube_id}: get metadata from es") - response, _ = ElasticWrap(f"{self.es_path}").get() - source = response.get("_source") - self.json_data = source - - def upload_to_es(self): - """add json_data to elastic""" - _, _ = ElasticWrap(self.es_path).put(self.json_data, refresh=True) - - def deactivate(self): - """deactivate document in es""" - key_match = { - "video": "active", - "channel": "channel_active", - "playlist": "playlist_active", - } - update_path = f"{self.index_name}/_update/{self.youtube_id}" - data = { - "script": f"ctx._source.{key_match.get(self.index_name)} = false" - } - _, _ = ElasticWrap(update_path).post(data) - - def del_in_es(self): - """delete item from elastic search""" - print(f"{self.youtube_id}: delete from es") - _, _ = ElasticWrap(self.es_path).delete() - - -class YoutubeVideo(YouTubeItem): - """represents a single youtube video""" - - es_path = False - index_name = "ta_video" - yt_base 
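# [editor's sketch, not part of the patch] The WatchState class now living in
# home/src/frontend/watched.py keeps its public interface, so a caller such as a
# frontend API handler is assumed to use it unchanged; the ids below are
# placeholders:
from home.src.frontend.watched import WatchState

WatchState("video-id").mark_as_watched()     # single video
WatchState("channel-id").mark_as_watched()   # every unwatched video of a channel
WatchState("video-id").mark_as_unwatched()   # revert is only implemented for videos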
= "https://www.youtube.com/watch?v=" - - def __init__(self, youtube_id): - super().__init__(youtube_id) - self.channel_id = False - self.es_path = f"{self.index_name}/_doc/{youtube_id}" - - def build_json(self): - """build json dict of video""" - self.get_from_youtube() - if not self.youtube_meta: - return - - self._process_youtube_meta() - self._add_channel() - self._add_stats() - self.add_file_path() - self.add_player() - if self.config["downloads"]["integrate_ryd"]: - self._get_ryd_stats() - - return - - def _process_youtube_meta(self): - """extract relevant fields from youtube""" - # extract - self.channel_id = self.youtube_meta["channel_id"] - upload_date = self.youtube_meta["upload_date"] - upload_date_time = datetime.strptime(upload_date, "%Y%m%d") - published = upload_date_time.strftime("%Y-%m-%d") - last_refresh = int(datetime.now().strftime("%s")) - # build json_data basics - self.json_data = { - "title": self.youtube_meta["title"], - "description": self.youtube_meta["description"], - "category": self.youtube_meta["categories"], - "vid_thumb_url": self.youtube_meta["thumbnail"], - "tags": self.youtube_meta["tags"], - "published": published, - "vid_last_refresh": last_refresh, - "date_downloaded": last_refresh, - "youtube_id": self.youtube_id, - "active": True, - } - - def _add_channel(self): - """add channel dict to video json_data""" - channel = YoutubeChannel(self.channel_id) - channel.build_json(upload=True) - self.json_data.update({"channel": channel.json_data}) - - def _add_stats(self): - """add stats dicst to json_data""" - # likes - like_count = self.youtube_meta.get("like_count", 0) - dislike_count = self.youtube_meta.get("dislike_count", 0) - self.json_data.update( - { - "stats": { - "view_count": self.youtube_meta["view_count"], - "like_count": like_count, - "dislike_count": dislike_count, - "average_rating": self.youtube_meta["average_rating"], - } - } - ) - - def build_dl_cache_path(self): - """find video path in dl cache""" - cache_dir = self.app_conf["cache_dir"] - cache_path = f"{cache_dir}/download/" - all_cached = os.listdir(cache_path) - for file_cached in all_cached: - if self.youtube_id in file_cached: - vid_path = os.path.join(cache_path, file_cached) - return vid_path - - return False - - def add_player(self): - """add player information for new videos""" - try: - # when indexing from download task - vid_path = self.build_dl_cache_path() - except FileNotFoundError: - # when reindexing - base = self.app_conf["videos"] - vid_path = os.path.join(base, self.json_data["media_url"]) - - duration_handler = DurationConverter() - duration = duration_handler.get_sec(vid_path) - duration_str = duration_handler.get_str(duration) - self.json_data.update( - { - "player": { - "watched": False, - "duration": duration, - "duration_str": duration_str, - } - } - ) - - def add_file_path(self): - """build media_url for where file will be located""" - channel_name = self.json_data["channel"]["channel_name"] - clean_channel_name = clean_string(channel_name) - timestamp = self.json_data["published"].replace("-", "") - youtube_id = self.json_data["youtube_id"] - title = self.json_data["title"] - clean_title = clean_string(title) - filename = f"{timestamp}_{youtube_id}_{clean_title}.mp4" - media_url = os.path.join(clean_channel_name, filename) - self.json_data["media_url"] = media_url - - def delete_media_file(self): - """delete video file, meta data""" - self.get_from_es() - video_base = self.app_conf["videos"] - media_url = self.json_data["media_url"] - print(f"{self.youtube_id}: 
delete {media_url} from file system") - to_delete = os.path.join(video_base, media_url) - os.remove(to_delete) - self.del_in_es() - - def _get_ryd_stats(self): - """get optional stats from returnyoutubedislikeapi.com""" - try: - print(f"{self.youtube_id}: get ryd stats") - result = ryd_client.get(self.youtube_id) - except requests.exceptions.ConnectionError: - print(f"{self.youtube_id}: failed to query ryd api, skipping") - return False - - if result["status"] == 404: - return False - - dislikes = { - "dislike_count": result["dislikes"], - "average_rating": result["rating"], - } - self.json_data["stats"].update(dislikes) - - return True - - -class ChannelScraper: - """custom scraper using bs4 to scrape channel about page - will be able to be integrated into yt-dlp - once #2237 and #2350 are merged upstream - """ - - def __init__(self, channel_id): - self.channel_id = channel_id - self.soup = False - self.yt_json = False - self.json_data = False - - def get_json(self): - """main method to return channel dict""" - self.get_soup() - self._extract_yt_json() - self._parse_channel_main() - self._parse_channel_meta() - return self.json_data - - def get_soup(self): - """return soup from youtube""" - print(f"{self.channel_id}: scrape channel data from youtube") - url = f"https://www.youtube.com/channel/{self.channel_id}/about?hl=en" - cookies = {"CONSENT": "YES+xxxxxxxxxxxxxxxxxxxxxxxxxxx"} - response = requests.get(url, cookies=cookies) - if response.ok: - channel_page = response.text - else: - print(f"{self.channel_id}: failed to extract channel info") - raise ConnectionError - self.soup = BeautifulSoup(channel_page, "html.parser") - - def _extract_yt_json(self): - """parse soup and get ytInitialData json""" - all_scripts = self.soup.find("body").find_all("script") - for script in all_scripts: - if "var ytInitialData = " in str(script): - script_content = str(script) - break - # extract payload - script_content = script_content.split("var ytInitialData = ")[1] - json_raw = script_content.rstrip(";") - self.yt_json = json.loads(json_raw) - - def _parse_channel_main(self): - """extract maintab values from scraped channel json data""" - main_tab = self.yt_json["header"]["c4TabbedHeaderRenderer"] - # build and return dict - self.json_data = { - "channel_active": True, - "channel_last_refresh": int(datetime.now().strftime("%s")), - "channel_subs": self._get_channel_subs(main_tab), - "channel_name": main_tab["title"], - "channel_banner_url": self._get_thumbnails(main_tab, "banner"), - "channel_tvart_url": self._get_thumbnails(main_tab, "tvBanner"), - "channel_id": self.channel_id, - "channel_subscribed": False, - } - - @staticmethod - def _get_thumbnails(main_tab, thumb_name): - """extract banner url from main_tab""" - try: - all_banners = main_tab[thumb_name]["thumbnails"] - banner = sorted(all_banners, key=lambda k: k["width"])[-1]["url"] - except KeyError: - banner = False - - return banner - - @staticmethod - def _get_channel_subs(main_tab): - """process main_tab to get channel subs as int""" - try: - sub_text_simple = main_tab["subscriberCountText"]["simpleText"] - sub_text = sub_text_simple.split(" ")[0] - if sub_text[-1] == "K": - channel_subs = int(float(sub_text.replace("K", "")) * 1000) - elif sub_text[-1] == "M": - channel_subs = int(float(sub_text.replace("M", "")) * 1000000) - elif int(sub_text) >= 0: - channel_subs = int(sub_text) - else: - message = f"{sub_text} not dealt with" - print(message) - except KeyError: - channel_subs = 0 - - return channel_subs - - def 
_parse_channel_meta(self): - """extract meta tab values from channel payload""" - # meta tab - meta_tab = self.yt_json["metadata"]["channelMetadataRenderer"] - all_thumbs = meta_tab["avatar"]["thumbnails"] - thumb_url = sorted(all_thumbs, key=lambda k: k["width"])[-1]["url"] - # stats tab - renderer = "twoColumnBrowseResultsRenderer" - all_tabs = self.yt_json["contents"][renderer]["tabs"] - for tab in all_tabs: - if "tabRenderer" in tab.keys(): - if tab["tabRenderer"]["title"] == "About": - about_tab = tab["tabRenderer"]["content"][ - "sectionListRenderer" - ]["contents"][0]["itemSectionRenderer"]["contents"][0][ - "channelAboutFullMetadataRenderer" - ] - break - try: - channel_views_text = about_tab["viewCountText"]["simpleText"] - channel_views = int(re.sub(r"\D", "", channel_views_text)) - except KeyError: - channel_views = 0 - - self.json_data.update( - { - "channel_description": meta_tab["description"], - "channel_thumb_url": thumb_url, - "channel_views": channel_views, - } - ) - - -class YoutubeChannel(YouTubeItem): - """represents a single youtube channel""" - - es_path = False - index_name = "ta_channel" - yt_base = "https://www.youtube.com/channel/" - - def __init__(self, youtube_id): - super().__init__(youtube_id) - self.es_path = f"{self.index_name}/_doc/{youtube_id}" - - def build_json(self, upload=False): - """get from es or from youtube""" - self.get_from_es() - if self.json_data: - return - - self.get_from_youtube() - if upload: - self.upload_to_es() - return - - def get_from_youtube(self): - """use bs4 to scrape channel about page""" - self.json_data = ChannelScraper(self.youtube_id).get_json() - self.get_channel_art() - - def get_channel_art(self): - """download channel art for new channels""" - channel_id = self.youtube_id - channel_thumb = self.json_data["channel_thumb_url"] - channel_banner = self.json_data["channel_banner_url"] - ThumbManager().download_chan( - [(channel_id, channel_thumb, channel_banner)] - ) - - def sync_to_videos(self): - """sync new channel_dict to all videos of channel""" - # add ingest pipeline - processors = [] - for field, value in self.json_data.items(): - line = {"set": {"field": "channel." 
+ field, "value": value}} - processors.append(line) - data = {"description": self.youtube_id, "processors": processors} - ingest_path = f"_ingest/pipeline/{self.youtube_id}" - _, _ = ElasticWrap(ingest_path).put(data) - # apply pipeline - data = {"query": {"match": {"channel.channel_id": self.youtube_id}}} - update_path = f"ta_video/_update_by_query?pipeline={self.youtube_id}" - _, _ = ElasticWrap(update_path).post(data) - - def get_folder_path(self): - """get folder where media files get stored""" - channel_name = self.json_data["channel_name"] - folder_name = clean_string(channel_name) - folder_path = os.path.join(self.app_conf["videos"], folder_name) - return folder_path - - def delete_es_videos(self): - """delete all channel documents from elasticsearch""" - data = { - "query": { - "term": {"channel.channel_id": {"value": self.youtube_id}} - } - } - _, _ = ElasticWrap("ta_video/_delete_by_query").post(data) - - def delete_playlists(self): - """delete all indexed playlist from es""" - all_playlists = self.get_indexed_playlists() - for playlist in all_playlists: - playlist_id = playlist["playlist_id"] - YoutubePlaylist(playlist_id).delete_metadata() - - def delete_channel(self): - """delete channel and all videos""" - print(f"{self.youtube_id}: delete channel") - self.get_from_es() - folder_path = self.get_folder_path() - print(f"{self.youtube_id}: delete all media files") - try: - all_videos = os.listdir(folder_path) - for video in all_videos: - video_path = os.path.join(folder_path, video) - os.remove(video_path) - os.rmdir(folder_path) - except FileNotFoundError: - print(f"no videos found for {folder_path}") - - print(f"{self.youtube_id}: delete indexed playlists") - self.delete_playlists() - print(f"{self.youtube_id}: delete indexed videos") - self.delete_es_videos() - self.del_in_es() - - def get_all_playlists(self): - """get all playlists owned by this channel""" - url = ( - f"https://www.youtube.com/channel/{self.youtube_id}" - + "/playlists?view=1&sort=dd&shelf_id=0" - ) - obs = { - "quiet": True, - "skip_download": True, - "extract_flat": True, - } - playlists = yt_dlp.YoutubeDL(obs).extract_info(url) - all_entries = [(i["id"], i["title"]) for i in playlists["entries"]] - - return all_entries - - def get_indexed_playlists(self): - """get all indexed playlists from channel""" - data = { - "query": { - "term": {"playlist_channel_id": {"value": self.youtube_id}} - }, - "sort": [{"playlist_channel.keyword": {"order": "desc"}}], - } - all_playlists = IndexPaginate("ta_playlist", data).get_results() - return all_playlists - - -class YoutubePlaylist(YouTubeItem): - """represents a single youtube playlist""" - - es_path = False - index_name = "ta_playlist" - yt_obs = { - "default_search": "ytsearch", - "quiet": True, - "skip_download": True, - "extract_flat": True, - } - yt_base = "https://www.youtube.com/playlist?list=" - - def __init__(self, youtube_id): - super().__init__(youtube_id) - self.es_path = f"{self.index_name}/_doc/{youtube_id}" - self.all_members = False - self.nav = False - self.all_youtube_ids = [] - - def build_json(self, scrape=False): - """collection to create json_data""" - if not scrape: - self.get_from_es() - - if scrape or not self.json_data: - self.get_from_youtube() - self.process_youtube_meta() - self.get_entries() - self.json_data["playlist_entries"] = self.all_members - self.get_playlist_art() - - def process_youtube_meta(self): - """extract relevant fields from youtube""" - self.json_data = { - "playlist_id": self.youtube_id, - "playlist_active": True, - 
"playlist_subscribed": False, - "playlist_name": self.youtube_meta["title"], - "playlist_channel": self.youtube_meta["channel"], - "playlist_channel_id": self.youtube_meta["channel_id"], - "playlist_thumbnail": self.youtube_meta["thumbnails"][-1]["url"], - "playlist_description": self.youtube_meta["description"] or False, - "playlist_last_refresh": int(datetime.now().strftime("%s")), - } - - def get_entries(self, playlistend=False): - """get all videos in playlist""" - if playlistend: - # implement playlist end - print(playlistend) - all_members = [] - for idx, entry in enumerate(self.youtube_meta["entries"]): - if self.all_youtube_ids: - downloaded = entry["id"] in self.all_youtube_ids - else: - downloaded = False - if not entry["uploader"]: - continue - to_append = { - "youtube_id": entry["id"], - "title": entry["title"], - "uploader": entry["uploader"], - "idx": idx, - "downloaded": downloaded, - } - all_members.append(to_append) - - self.all_members = all_members - - @staticmethod - def get_playlist_art(): - """download artwork of playlist""" - thumbnails = ThumbManager() - missing_playlists = thumbnails.get_missing_playlists() - thumbnails.download_playlist(missing_playlists) - - def add_vids_to_playlist(self): - """sync the playlist id to videos""" - script = ( - 'if (!ctx._source.containsKey("playlist")) ' - + "{ctx._source.playlist = [params.playlist]} " - + "else if (!ctx._source.playlist.contains(params.playlist)) " - + "{ctx._source.playlist.add(params.playlist)} " - + "else {ctx.op = 'none'}" - ) - - bulk_list = [] - for entry in self.json_data["playlist_entries"]: - video_id = entry["youtube_id"] - action = {"update": {"_id": video_id, "_index": "ta_video"}} - source = { - "script": { - "source": script, - "lang": "painless", - "params": {"playlist": self.youtube_id}, - } - } - bulk_list.append(json.dumps(action)) - bulk_list.append(json.dumps(source)) - - # add last newline - bulk_list.append("\n") - query_str = "\n".join(bulk_list) - - ElasticWrap("_bulk").post(query_str, ndjson=True) - - def update_playlist(self): - """update metadata for playlist with data from YouTube""" - self.get_from_es() - subscribed = self.json_data["playlist_subscribed"] - self.get_from_youtube() - if not self.json_data: - # return false to deactivate - return False - - self.json_data["playlist_subscribed"] = subscribed - self.upload_to_es() - return True - - def build_nav(self, youtube_id): - """find next and previous in playlist of a given youtube_id""" - all_entries_available = self.json_data["playlist_entries"] - all_entries = [i for i in all_entries_available if i["downloaded"]] - current = [i for i in all_entries if i["youtube_id"] == youtube_id] - # stop if not found or playlist of 1 - if not current or not len(all_entries) > 1: - return - - current_idx = all_entries.index(current[0]) - if current_idx == 0: - previous_item = False - else: - previous_item = all_entries[current_idx - 1] - prev_thumb = ThumbManager().vid_thumb_path( - previous_item["youtube_id"] - ) - previous_item["vid_thumb"] = prev_thumb - - if current_idx == len(all_entries) - 1: - next_item = False - else: - next_item = all_entries[current_idx + 1] - next_thumb = ThumbManager().vid_thumb_path(next_item["youtube_id"]) - next_item["vid_thumb"] = next_thumb - - self.nav = { - "playlist_meta": { - "current_idx": current[0]["idx"], - "playlist_id": self.youtube_id, - "playlist_name": self.json_data["playlist_name"], - "playlist_channel": self.json_data["playlist_channel"], - }, - "playlist_previous": previous_item, - 
"playlist_next": next_item, - } - return - - def delete_metadata(self): - """delete metadata for playlist""" - script = ( - "ctx._source.playlist.removeAll(" - + "Collections.singleton(params.playlist)) " - ) - data = { - "query": { - "term": {"playlist.keyword": {"value": self.youtube_id}} - }, - "script": { - "source": script, - "lang": "painless", - "params": {"playlist": self.youtube_id}, - }, - } - _, _ = ElasticWrap("ta_video/_update_by_query").post(data) - self.del_in_es() - - def delete_videos_playlist(self): - """delete playlist with all videos""" - print(f"{self.youtube_id}: delete playlist") - self.get_from_es() - all_youtube_id = [ - i["youtube_id"] - for i in self.json_data["playlist_entries"] - if i["downloaded"] - ] - for youtube_id in all_youtube_id: - YoutubeVideo(youtube_id).delete_media_file() - - self.delete_metadata() - - -class WatchState: - """handle watched checkbox for videos and channels""" - - CONFIG = AppConfig().config - ES_URL = CONFIG["application"]["es_url"] - ES_AUTH = CONFIG["application"]["es_auth"] - HEADERS = {"Content-type": "application/json"} - - def __init__(self, youtube_id): - self.youtube_id = youtube_id - self.stamp = int(datetime.now().strftime("%s")) - - def mark_as_watched(self): - """update es with new watched value""" - url_type = self.dedect_type() - if url_type == "video": - self.mark_vid_watched() - elif url_type == "channel": - self.mark_channel_watched() - elif url_type == "playlist": - self.mark_playlist_watched() - - print(f"marked {self.youtube_id} as watched") - - def mark_as_unwatched(self): - """revert watched state to false""" - url_type = self.dedect_type() - if url_type == "video": - self.mark_vid_watched(revert=True) - - print(f"revert {self.youtube_id} as unwatched") - - def dedect_type(self): - """find youtube id type""" - print(self.youtube_id) - url_process = UrlListParser(self.youtube_id).process_list() - url_type = url_process[0]["type"] - return url_type - - def mark_vid_watched(self, revert=False): - """change watched status of single video""" - url = self.ES_URL + "/ta_video/_update/" + self.youtube_id - data = { - "doc": {"player": {"watched": True, "watched_date": self.stamp}} - } - if revert: - data["doc"]["player"]["watched"] = False - - payload = json.dumps(data) - request = requests.post( - url, data=payload, headers=self.HEADERS, auth=self.ES_AUTH - ) - if not request.ok: - print(request.text) - raise ValueError("failed to mark video as watched") - - def mark_channel_watched(self): - """change watched status of every video in channel""" - data = { - "query": { - "bool": { - "must": [ - { - "term": { - "channel.channel_id": { - "value": self.youtube_id - } - } - }, - {"term": {"player.watched": {"value": False}}}, - ] - } - }, - "script": { - "source": "ctx._source.player['watched'] = true", - "lang": "painless", - }, - } - payload = json.dumps(data) - url = f"{self.ES_URL}/ta_video/_update_by_query" - request = requests.post( - url, data=payload, headers=self.HEADERS, auth=self.ES_AUTH - ) - if not request.ok: - print(request.text) - raise ValueError("failed mark channel as watched") - - def mark_playlist_watched(self): - """change watched state of all videos in playlist""" - data = { - "query": { - "bool": { - "must": [ - { - "term": { - "playlist.keyword": {"value": self.youtube_id} - } - }, - {"term": {"player.watched": {"value": False}}}, - ] - } - }, - "script": { - "source": "ctx._source.player['watched'] = true", - "lang": "painless", - }, - } - payload = json.dumps(data) - url = 
f"{self.ES_URL}/ta_video/_update_by_query" - request = requests.post( - url, data=payload, headers=self.HEADERS, auth=self.ES_AUTH - ) - if not request.ok: - print(request.text) - raise ValueError("failed mark playlist as watched") - - -def index_new_video(youtube_id): - """combined classes to create new video in index""" - video = YoutubeVideo(youtube_id) - video.build_json() - if not video.json_data: - raise ValueError("failed to get metadata for " + youtube_id) - - video.upload_to_es() - return video.json_data diff --git a/tubearchivist/home/src/index/__init__.py b/tubearchivist/home/src/index/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tubearchivist/home/src/index/channel.py b/tubearchivist/home/src/index/channel.py new file mode 100644 index 0000000..a8b1525 --- /dev/null +++ b/tubearchivist/home/src/index/channel.py @@ -0,0 +1,262 @@ +"""handle single channel in index""" + +import json +import os +import re +from datetime import datetime + +import requests +import yt_dlp +from bs4 import BeautifulSoup +from home.src.download.thumbnails import ThumbManager +from home.src.es.connect import ElasticWrap, IndexPaginate +from home.src.index.generic import YouTubeItem +from home.src.index.playlist import YoutubePlaylist +from home.src.ta.helper import clean_string + + +class ChannelScraper: + """custom scraper using bs4 to scrape channel about page + will be able to be integrated into yt-dlp + once #2237 and #2350 are merged upstream + """ + + def __init__(self, channel_id): + self.channel_id = channel_id + self.soup = False + self.yt_json = False + self.json_data = False + + def get_json(self): + """main method to return channel dict""" + self.get_soup() + self._extract_yt_json() + self._parse_channel_main() + self._parse_channel_meta() + return self.json_data + + def get_soup(self): + """return soup from youtube""" + print(f"{self.channel_id}: scrape channel data from youtube") + url = f"https://www.youtube.com/channel/{self.channel_id}/about?hl=en" + cookies = {"CONSENT": "YES+xxxxxxxxxxxxxxxxxxxxxxxxxxx"} + response = requests.get(url, cookies=cookies) + if response.ok: + channel_page = response.text + else: + print(f"{self.channel_id}: failed to extract channel info") + raise ConnectionError + self.soup = BeautifulSoup(channel_page, "html.parser") + + def _extract_yt_json(self): + """parse soup and get ytInitialData json""" + all_scripts = self.soup.find("body").find_all("script") + for script in all_scripts: + if "var ytInitialData = " in str(script): + script_content = str(script) + break + # extract payload + script_content = script_content.split("var ytInitialData = ")[1] + json_raw = script_content.rstrip(";") + self.yt_json = json.loads(json_raw) + + def _parse_channel_main(self): + """extract maintab values from scraped channel json data""" + main_tab = self.yt_json["header"]["c4TabbedHeaderRenderer"] + # build and return dict + self.json_data = { + "channel_active": True, + "channel_last_refresh": int(datetime.now().strftime("%s")), + "channel_subs": self._get_channel_subs(main_tab), + "channel_name": main_tab["title"], + "channel_banner_url": self._get_thumbnails(main_tab, "banner"), + "channel_tvart_url": self._get_thumbnails(main_tab, "tvBanner"), + "channel_id": self.channel_id, + "channel_subscribed": False, + } + + @staticmethod + def _get_thumbnails(main_tab, thumb_name): + """extract banner url from main_tab""" + try: + all_banners = main_tab[thumb_name]["thumbnails"] + banner = sorted(all_banners, key=lambda k: k["width"])[-1]["url"] + except 
KeyError: + banner = False + + return banner + + @staticmethod + def _get_channel_subs(main_tab): + """process main_tab to get channel subs as int""" + try: + sub_text_simple = main_tab["subscriberCountText"]["simpleText"] + sub_text = sub_text_simple.split(" ")[0] + if sub_text[-1] == "K": + channel_subs = int(float(sub_text.replace("K", "")) * 1000) + elif sub_text[-1] == "M": + channel_subs = int(float(sub_text.replace("M", "")) * 1000000) + elif int(sub_text) >= 0: + channel_subs = int(sub_text) + else: + message = f"{sub_text} not dealt with" + print(message) + except KeyError: + channel_subs = 0 + + return channel_subs + + def _parse_channel_meta(self): + """extract meta tab values from channel payload""" + # meta tab + meta_tab = self.yt_json["metadata"]["channelMetadataRenderer"] + all_thumbs = meta_tab["avatar"]["thumbnails"] + thumb_url = sorted(all_thumbs, key=lambda k: k["width"])[-1]["url"] + # stats tab + renderer = "twoColumnBrowseResultsRenderer" + all_tabs = self.yt_json["contents"][renderer]["tabs"] + for tab in all_tabs: + if "tabRenderer" in tab.keys(): + if tab["tabRenderer"]["title"] == "About": + about_tab = tab["tabRenderer"]["content"][ + "sectionListRenderer" + ]["contents"][0]["itemSectionRenderer"]["contents"][0][ + "channelAboutFullMetadataRenderer" + ] + break + try: + channel_views_text = about_tab["viewCountText"]["simpleText"] + channel_views = int(re.sub(r"\D", "", channel_views_text)) + except KeyError: + channel_views = 0 + + self.json_data.update( + { + "channel_description": meta_tab["description"], + "channel_thumb_url": thumb_url, + "channel_views": channel_views, + } + ) + + +class YoutubeChannel(YouTubeItem): + """represents a single youtube channel""" + + es_path = False + index_name = "ta_channel" + yt_base = "https://www.youtube.com/channel/" + + def __init__(self, youtube_id): + super().__init__(youtube_id) + self.es_path = f"{self.index_name}/_doc/{youtube_id}" + + def build_json(self, upload=False): + """get from es or from youtube""" + self.get_from_es() + if self.json_data: + return + + self.get_from_youtube() + if upload: + self.upload_to_es() + return + + def get_from_youtube(self): + """use bs4 to scrape channel about page""" + self.json_data = ChannelScraper(self.youtube_id).get_json() + self.get_channel_art() + + def get_channel_art(self): + """download channel art for new channels""" + channel_id = self.youtube_id + channel_thumb = self.json_data["channel_thumb_url"] + channel_banner = self.json_data["channel_banner_url"] + ThumbManager().download_chan( + [(channel_id, channel_thumb, channel_banner)] + ) + + def sync_to_videos(self): + """sync new channel_dict to all videos of channel""" + # add ingest pipeline + processors = [] + for field, value in self.json_data.items(): + line = {"set": {"field": "channel." 
+ field, "value": value}} + processors.append(line) + data = {"description": self.youtube_id, "processors": processors} + ingest_path = f"_ingest/pipeline/{self.youtube_id}" + _, _ = ElasticWrap(ingest_path).put(data) + # apply pipeline + data = {"query": {"match": {"channel.channel_id": self.youtube_id}}} + update_path = f"ta_video/_update_by_query?pipeline={self.youtube_id}" + _, _ = ElasticWrap(update_path).post(data) + + def get_folder_path(self): + """get folder where media files get stored""" + channel_name = self.json_data["channel_name"] + folder_name = clean_string(channel_name) + folder_path = os.path.join(self.app_conf["videos"], folder_name) + return folder_path + + def delete_es_videos(self): + """delete all channel documents from elasticsearch""" + data = { + "query": { + "term": {"channel.channel_id": {"value": self.youtube_id}} + } + } + _, _ = ElasticWrap("ta_video/_delete_by_query").post(data) + + def delete_playlists(self): + """delete all indexed playlist from es""" + all_playlists = self.get_indexed_playlists() + for playlist in all_playlists: + playlist_id = playlist["playlist_id"] + YoutubePlaylist(playlist_id).delete_metadata() + + def delete_channel(self): + """delete channel and all videos""" + print(f"{self.youtube_id}: delete channel") + self.get_from_es() + folder_path = self.get_folder_path() + print(f"{self.youtube_id}: delete all media files") + try: + all_videos = os.listdir(folder_path) + for video in all_videos: + video_path = os.path.join(folder_path, video) + os.remove(video_path) + os.rmdir(folder_path) + except FileNotFoundError: + print(f"no videos found for {folder_path}") + + print(f"{self.youtube_id}: delete indexed playlists") + self.delete_playlists() + print(f"{self.youtube_id}: delete indexed videos") + self.delete_es_videos() + self.del_in_es() + + def get_all_playlists(self): + """get all playlists owned by this channel""" + url = ( + f"https://www.youtube.com/channel/{self.youtube_id}" + + "/playlists?view=1&sort=dd&shelf_id=0" + ) + obs = { + "quiet": True, + "skip_download": True, + "extract_flat": True, + } + playlists = yt_dlp.YoutubeDL(obs).extract_info(url) + all_entries = [(i["id"], i["title"]) for i in playlists["entries"]] + + return all_entries + + def get_indexed_playlists(self): + """get all indexed playlists from channel""" + data = { + "query": { + "term": {"playlist_channel_id": {"value": self.youtube_id}} + }, + "sort": [{"playlist_channel.keyword": {"order": "desc"}}], + } + all_playlists = IndexPaginate("ta_playlist", data).get_results() + return all_playlists diff --git a/tubearchivist/home/src/reindex.py b/tubearchivist/home/src/index/filesystem.py similarity index 52% rename from tubearchivist/home/src/reindex.py rename to tubearchivist/home/src/index/filesystem.py index bb93cae..5a33501 100644 --- a/tubearchivist/home/src/reindex.py +++ b/tubearchivist/home/src/index/filesystem.py @@ -11,276 +11,15 @@ import re import shutil import subprocess from datetime import datetime -from math import ceil -from time import sleep import requests -from home.src.config import AppConfig -from home.src.download import ChannelSubscription, PendingList, VideoDownloader -from home.src.helper import ( - RedisArchivist, - clean_string, - get_total_hits, - ignore_filelist, -) -from home.src.index import ( - YoutubeChannel, - YoutubePlaylist, - YoutubeVideo, - index_new_video, -) -from home.src.thumbnails import ThumbManager - - -class Reindex: - """check for outdated documents and refresh data from youtube""" - - def __init__(self): - # 
config - config = AppConfig().config - self.sleep_interval = config["downloads"]["sleep_interval"] - self.es_url = config["application"]["es_url"] - self.es_auth = config["application"]["es_auth"] - self.refresh_interval = config["scheduler"]["check_reindex_days"] - self.integrate_ryd = config["downloads"]["integrate_ryd"] - # scan - self.all_youtube_ids = False - self.all_channel_ids = False - self.all_playlist_ids = False - - def get_daily(self): - """get daily refresh values""" - total_videos = get_total_hits( - "ta_video", self.es_url, self.es_auth, "active" - ) - video_daily = ceil(total_videos / self.refresh_interval * 1.2) - total_channels = get_total_hits( - "ta_channel", self.es_url, self.es_auth, "channel_active" - ) - channel_daily = ceil(total_channels / self.refresh_interval * 1.2) - total_playlists = get_total_hits( - "ta_playlist", self.es_url, self.es_auth, "playlist_active" - ) - playlist_daily = ceil(total_playlists / self.refresh_interval * 1.2) - return (video_daily, channel_daily, playlist_daily) - - def get_outdated_vids(self, size): - """get daily videos to refresh""" - headers = {"Content-type": "application/json"} - now = int(datetime.now().strftime("%s")) - now_lte = now - self.refresh_interval * 24 * 60 * 60 - data = { - "size": size, - "query": { - "bool": { - "must": [ - {"match": {"active": True}}, - {"range": {"vid_last_refresh": {"lte": now_lte}}}, - ] - } - }, - "sort": [{"vid_last_refresh": {"order": "asc"}}], - "_source": False, - } - query_str = json.dumps(data) - url = self.es_url + "/ta_video/_search" - response = requests.get( - url, data=query_str, headers=headers, auth=self.es_auth - ) - if not response.ok: - print(response.text) - response_dict = json.loads(response.text) - all_youtube_ids = [i["_id"] for i in response_dict["hits"]["hits"]] - return all_youtube_ids - - def get_unrated_vids(self): - """get all videos without rating if ryd integration is enabled""" - headers = {"Content-type": "application/json"} - data = { - "size": 200, - "query": { - "bool": { - "must_not": [{"exists": {"field": "stats.average_rating"}}] - } - }, - } - query_str = json.dumps(data) - url = self.es_url + "/ta_video/_search" - response = requests.get( - url, data=query_str, headers=headers, auth=self.es_auth - ) - if not response.ok: - print(response.text) - response_dict = json.loads(response.text) - missing_rating = [i["_id"] for i in response_dict["hits"]["hits"]] - self.all_youtube_ids = self.all_youtube_ids + missing_rating - - def get_outdated_channels(self, size): - """get daily channels to refresh""" - headers = {"Content-type": "application/json"} - now = int(datetime.now().strftime("%s")) - now_lte = now - self.refresh_interval * 24 * 60 * 60 - data = { - "size": size, - "query": { - "bool": { - "must": [ - {"match": {"channel_active": True}}, - {"range": {"channel_last_refresh": {"lte": now_lte}}}, - ] - } - }, - "sort": [{"channel_last_refresh": {"order": "asc"}}], - "_source": False, - } - query_str = json.dumps(data) - url = self.es_url + "/ta_channel/_search" - response = requests.get( - url, data=query_str, headers=headers, auth=self.es_auth - ) - if not response.ok: - print(response.text) - response_dict = json.loads(response.text) - all_channel_ids = [i["_id"] for i in response_dict["hits"]["hits"]] - return all_channel_ids - - def get_outdated_playlists(self, size): - """get daily outdated playlists to refresh""" - headers = {"Content-type": "application/json"} - now = int(datetime.now().strftime("%s")) - now_lte = now - self.refresh_interval * 24 
* 60 * 60 - data = { - "size": size, - "query": { - "bool": { - "must": [ - {"match": {"playlist_active": True}}, - {"range": {"playlist_last_refresh": {"lte": now_lte}}}, - ] - } - }, - "sort": [{"playlist_last_refresh": {"order": "asc"}}], - "_source": False, - } - query_str = json.dumps(data) - url = self.es_url + "/ta_playlist/_search" - response = requests.get( - url, data=query_str, headers=headers, auth=self.es_auth - ) - if not response.ok: - print(response.text) - response_dict = json.loads(response.text) - all_playlist_ids = [i["_id"] for i in response_dict["hits"]["hits"]] - return all_playlist_ids - - def check_outdated(self): - """add missing vids and channels""" - video_daily, channel_daily, playlist_daily = self.get_daily() - self.all_youtube_ids = self.get_outdated_vids(video_daily) - self.all_channel_ids = self.get_outdated_channels(channel_daily) - self.all_playlist_ids = self.get_outdated_playlists(playlist_daily) - if self.integrate_ryd: - self.get_unrated_vids() - - def rescrape_all_channels(self): - """sync new data from channel to all matching videos""" - sleep_interval = self.sleep_interval - channel_sub_handler = ChannelSubscription() - all_channels = channel_sub_handler.get_channels(subscribed_only=False) - all_channel_ids = [i["channel_id"] for i in all_channels] - - for channel_id in all_channel_ids: - channel = YoutubeChannel(channel_id) - subscribed = channel.json_data["channel_subscribed"] - channel.get_from_youtube() - channel.json_data["channel_subscribed"] = subscribed - channel.upload_to_es() - channel.sync_to_videos() - - if sleep_interval: - sleep(sleep_interval) - - @staticmethod - def reindex_single_video(youtube_id): - """refresh data for single video""" - video = YoutubeVideo(youtube_id) - - # read current state - video.get_from_es() - player = video.json_data["player"] - date_downloaded = video.json_data["date_downloaded"] - channel_dict = video.json_data["channel"] - playlist = video.json_data.get("playlist") - - # get new - video.build_json() - if not video.json_data: - video.deactivate() - - # add back - video.json_data["player"] = player - video.json_data["date_downloaded"] = date_downloaded - video.json_data["channel"] = channel_dict - if playlist: - video.json_data["playlist"] = playlist - - video.upload_to_es() - - thumb_handler = ThumbManager() - thumb_handler.delete_vid_thumb(youtube_id) - to_download = (youtube_id, video.json_data["vid_thumb_url"]) - thumb_handler.download_vid([to_download], notify=False) - - @staticmethod - def reindex_single_channel(channel_id): - """refresh channel data and sync to videos""" - channel = YoutubeChannel(channel_id) - channel.get_from_es() - subscribed = channel.json_data["channel_subscribed"] - channel.get_from_youtube() - channel.json_data["channel_subscribed"] = subscribed - channel.upload_to_es() - channel.sync_to_videos() - - @staticmethod - def reindex_single_playlist(playlist_id, all_indexed_ids): - """refresh playlist data""" - playlist = YoutubePlaylist(playlist_id) - playlist.get_from_es() - subscribed = playlist.json_data["playlist_subscribed"] - playlist.all_youtube_ids = all_indexed_ids - playlist.build_json(scrape=True) - if not playlist.json_data: - playlist.deactivate() - return - - playlist.json_data["playlist_subscribed"] = subscribed - playlist.upload_to_es() - return - - def reindex(self): - """reindex what's needed""" - # videos - print(f"reindexing {len(self.all_youtube_ids)} videos") - for youtube_id in self.all_youtube_ids: - self.reindex_single_video(youtube_id) - if 
self.sleep_interval: - sleep(self.sleep_interval) - # channels - print(f"reindexing {len(self.all_channel_ids)} channels") - for channel_id in self.all_channel_ids: - self.reindex_single_channel(channel_id) - if self.sleep_interval: - sleep(self.sleep_interval) - # playlist - print(f"reindexing {len(self.all_playlist_ids)} playlists") - if self.all_playlist_ids: - all_indexed = PendingList().get_all_indexed() - all_indexed_ids = [i["youtube_id"] for i in all_indexed] - for playlist_id in self.all_playlist_ids: - self.reindex_single_playlist(playlist_id, all_indexed_ids) - if self.sleep_interval: - sleep(self.sleep_interval) +from home.src.download.queue import PendingList +from home.src.download.yt_dlp_handler import VideoDownloader +from home.src.index.reindex import Reindex +from home.src.index.video import index_new_video +from home.src.ta.config import AppConfig +from home.src.ta.helper import clean_string, ignore_filelist +from home.src.ta.ta_redis import RedisArchivist class FilesystemScanner: diff --git a/tubearchivist/home/src/index/generic.py b/tubearchivist/home/src/index/generic.py new file mode 100644 index 0000000..6f88e37 --- /dev/null +++ b/tubearchivist/home/src/index/generic.py @@ -0,0 +1,139 @@ +"""generic base class for indexing documents""" + +import math + +import yt_dlp +from home.src.es.connect import ElasticWrap +from home.src.ta.config import AppConfig +from home.src.ta.ta_redis import RedisArchivist + + +class YouTubeItem: + """base class for youtube""" + + es_path = False + index_name = False + yt_base = False + yt_obs = { + "quiet": True, + "default_search": "ytsearch", + "skip_download": True, + "check_formats": "selected", + "noplaylist": True, + } + + def __init__(self, youtube_id): + self.youtube_id = youtube_id + self.config = False + self.app_conf = False + self.youtube_meta = False + self.json_data = False + self._get_conf() + + def _get_conf(self): + """read user conf""" + self.config = AppConfig().config + self.app_conf = self.config["application"] + + def get_from_youtube(self): + """use yt-dlp to get meta data from youtube""" + print(f"{self.youtube_id}: get metadata from youtube") + try: + yt_item = yt_dlp.YoutubeDL(self.yt_obs) + response = yt_item.extract_info(self.yt_base + self.youtube_id) + except ( + yt_dlp.utils.ExtractorError, + yt_dlp.utils.DownloadError, + ): + print(f"{self.youtube_id}: failed to get info from youtube") + self.youtube_meta = False + + self.youtube_meta = response + + def get_from_es(self): + """get indexed data from elastic search""" + print(f"{self.youtube_id}: get metadata from es") + response, _ = ElasticWrap(f"{self.es_path}").get() + source = response.get("_source") + self.json_data = source + + def upload_to_es(self): + """add json_data to elastic""" + _, _ = ElasticWrap(self.es_path).put(self.json_data, refresh=True) + + def deactivate(self): + """deactivate document in es""" + key_match = { + "video": "active", + "channel": "channel_active", + "playlist": "playlist_active", + } + update_path = f"{self.index_name}/_update/{self.youtube_id}" + data = { + "script": f"ctx._source.{key_match.get(self.index_name)} = false" + } + _, _ = ElasticWrap(update_path).post(data) + + def del_in_es(self): + """delete item from elastic search""" + print(f"{self.youtube_id}: delete from es") + _, _ = ElasticWrap(self.es_path).delete() + + +class Pagination: + """ + figure out the pagination based on page size and total_hits + """ + + def __init__(self, page_get, user_id, search_get=False): + self.user_id = user_id + self.page_size = 
self.get_page_size() + self.page_get = page_get + self.search_get = search_get + self.pagination = self.first_guess() + + def get_page_size(self): + """get default or user modified page_size""" + key = f"{self.user_id}:page_size" + page_size = RedisArchivist().get_message(key)["status"] + if not page_size: + config = AppConfig().config + page_size = config["archive"]["page_size"] + + return page_size + + def first_guess(self): + """build first guess before api call""" + page_get = self.page_get + if page_get in [0, 1]: + page_from = 0 + prev_pages = False + elif page_get > 1: + page_from = (page_get - 1) * self.page_size + prev_pages = [ + i for i in range(page_get - 1, page_get - 6, -1) if i > 1 + ] + prev_pages.reverse() + pagination = { + "page_size": self.page_size, + "page_from": page_from, + "prev_pages": prev_pages, + "current_page": page_get, + } + if self.search_get: + pagination.update({"search_get": self.search_get}) + return pagination + + def validate(self, total_hits): + """validate pagination with total_hits after making api call""" + page_get = self.page_get + max_pages = math.ceil(total_hits / self.page_size) + if page_get < max_pages and max_pages > 1: + self.pagination["last_page"] = max_pages + else: + self.pagination["last_page"] = False + next_pages = [ + i for i in range(page_get + 1, page_get + 6) if 1 < i < max_pages + ] + + self.pagination["next_pages"] = next_pages diff --git a/tubearchivist/home/src/index/playlist.py b/tubearchivist/home/src/index/playlist.py new file mode 100644 index 0000000..2bd2d3a --- /dev/null +++ b/tubearchivist/home/src/index/playlist.py @@ -0,0 +1,201 @@ +"""handle playlist""" + +import json +from datetime import datetime + +from home.src.download.thumbnails import ThumbManager +from home.src.es.connect import ElasticWrap +from home.src.index.generic import YouTubeItem +from home.src.index.video import YoutubeVideo + + +class YoutubePlaylist(YouTubeItem): + """represents a single youtube playlist""" + + es_path = False + index_name = "ta_playlist" + yt_obs = { + "default_search": "ytsearch", + "quiet": True, + "skip_download": True, + "extract_flat": True, + } + yt_base = "https://www.youtube.com/playlist?list=" + + def __init__(self, youtube_id): + super().__init__(youtube_id) + self.es_path = f"{self.index_name}/_doc/{youtube_id}" + self.all_members = False + self.nav = False + self.all_youtube_ids = [] + + def build_json(self, scrape=False): + """collection to create json_data""" + if not scrape: + self.get_from_es() + + if scrape or not self.json_data: + self.get_from_youtube() + self.process_youtube_meta() + self.get_entries() + self.json_data["playlist_entries"] = self.all_members + self.get_playlist_art() + + def process_youtube_meta(self): + """extract relevant fields from youtube""" + self.json_data = { + "playlist_id": self.youtube_id, + "playlist_active": True, + "playlist_subscribed": False, + "playlist_name": self.youtube_meta["title"], + "playlist_channel": self.youtube_meta["channel"], + "playlist_channel_id": self.youtube_meta["channel_id"], + "playlist_thumbnail": self.youtube_meta["thumbnails"][-1]["url"], + "playlist_description": self.youtube_meta["description"] or False, + "playlist_last_refresh": int(datetime.now().strftime("%s")), + } + + def get_entries(self, playlistend=False): + """get all videos in playlist""" + if playlistend: + # implement playlist end + print(playlistend) + all_members = [] + for idx, entry in enumerate(self.youtube_meta["entries"]): + if self.all_youtube_ids: + downloaded = entry["id"] in 
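# [editor's sketch, not part of the patch] Worked example for the Pagination class
# moved into home/src/index/generic.py above, with made-up numbers: for page 4 and
# a page size of 12, first_guess() yields page_from 36 and prev_pages [2, 3], and
# validate() later caps next_pages by the real number of hits.
from math import ceil

page_get, page_size = 4, 12
page_from = (page_get - 1) * page_size    # 36, offset passed to elasticsearch
prev_pages = [i for i in range(page_get - 1, page_get - 6, -1) if i > 1]
prev_pages.reverse()                      # [2, 3]
max_pages = ceil(150 / page_size)         # 13 pages for 150 total hits
next_pages = [i for i in range(page_get + 1, page_get + 6) if 1 < i < max_pages]
# next_pages -> [5, 6, 7, 8, 9]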
self.all_youtube_ids + else: + downloaded = False + if not entry["uploader"]: + continue + to_append = { + "youtube_id": entry["id"], + "title": entry["title"], + "uploader": entry["uploader"], + "idx": idx, + "downloaded": downloaded, + } + all_members.append(to_append) + + self.all_members = all_members + + @staticmethod + def get_playlist_art(): + """download artwork of playlist""" + thumbnails = ThumbManager() + missing_playlists = thumbnails.get_missing_playlists() + thumbnails.download_playlist(missing_playlists) + + def add_vids_to_playlist(self): + """sync the playlist id to videos""" + script = ( + 'if (!ctx._source.containsKey("playlist")) ' + + "{ctx._source.playlist = [params.playlist]} " + + "else if (!ctx._source.playlist.contains(params.playlist)) " + + "{ctx._source.playlist.add(params.playlist)} " + + "else {ctx.op = 'none'}" + ) + + bulk_list = [] + for entry in self.json_data["playlist_entries"]: + video_id = entry["youtube_id"] + action = {"update": {"_id": video_id, "_index": "ta_video"}} + source = { + "script": { + "source": script, + "lang": "painless", + "params": {"playlist": self.youtube_id}, + } + } + bulk_list.append(json.dumps(action)) + bulk_list.append(json.dumps(source)) + + # add last newline + bulk_list.append("\n") + query_str = "\n".join(bulk_list) + + ElasticWrap("_bulk").post(query_str, ndjson=True) + + def update_playlist(self): + """update metadata for playlist with data from YouTube""" + self.get_from_es() + subscribed = self.json_data["playlist_subscribed"] + self.get_from_youtube() + if not self.json_data: + # return false to deactivate + return False + + self.json_data["playlist_subscribed"] = subscribed + self.upload_to_es() + return True + + def build_nav(self, youtube_id): + """find next and previous in playlist of a given youtube_id""" + all_entries_available = self.json_data["playlist_entries"] + all_entries = [i for i in all_entries_available if i["downloaded"]] + current = [i for i in all_entries if i["youtube_id"] == youtube_id] + # stop if not found or playlist of 1 + if not current or not len(all_entries) > 1: + return + + current_idx = all_entries.index(current[0]) + if current_idx == 0: + previous_item = False + else: + previous_item = all_entries[current_idx - 1] + prev_thumb = ThumbManager().vid_thumb_path( + previous_item["youtube_id"] + ) + previous_item["vid_thumb"] = prev_thumb + + if current_idx == len(all_entries) - 1: + next_item = False + else: + next_item = all_entries[current_idx + 1] + next_thumb = ThumbManager().vid_thumb_path(next_item["youtube_id"]) + next_item["vid_thumb"] = next_thumb + + self.nav = { + "playlist_meta": { + "current_idx": current[0]["idx"], + "playlist_id": self.youtube_id, + "playlist_name": self.json_data["playlist_name"], + "playlist_channel": self.json_data["playlist_channel"], + }, + "playlist_previous": previous_item, + "playlist_next": next_item, + } + return + + def delete_metadata(self): + """delete metadata for playlist""" + script = ( + "ctx._source.playlist.removeAll(" + + "Collections.singleton(params.playlist)) " + ) + data = { + "query": { + "term": {"playlist.keyword": {"value": self.youtube_id}} + }, + "script": { + "source": script, + "lang": "painless", + "params": {"playlist": self.youtube_id}, + }, + } + _, _ = ElasticWrap("ta_video/_update_by_query").post(data) + self.del_in_es() + + def delete_videos_playlist(self): + """delete playlist with all videos""" + print(f"{self.youtube_id}: delete playlist") + self.get_from_es() + all_youtube_id = [ + i["youtube_id"] + for i in 
self.json_data["playlist_entries"] + if i["downloaded"] + ] + for youtube_id in all_youtube_id: + YoutubeVideo(youtube_id).delete_media_file() + + self.delete_metadata() diff --git a/tubearchivist/home/src/index/reindex.py b/tubearchivist/home/src/index/reindex.py new file mode 100644 index 0000000..b59d5a5 --- /dev/null +++ b/tubearchivist/home/src/index/reindex.py @@ -0,0 +1,267 @@ +"""periodically refresh documents""" + +import json +from datetime import datetime +from math import ceil +from time import sleep + +import requests +from home.src.download.queue import PendingList +from home.src.download.subscriptions import ChannelSubscription +from home.src.download.thumbnails import ThumbManager +from home.src.index.channel import YoutubeChannel +from home.src.index.playlist import YoutubePlaylist +from home.src.index.video import YoutubeVideo +from home.src.ta.config import AppConfig +from home.src.ta.helper import get_total_hits + + +class Reindex: + """check for outdated documents and refresh data from youtube""" + + def __init__(self): + # config + config = AppConfig().config + self.sleep_interval = config["downloads"]["sleep_interval"] + self.es_url = config["application"]["es_url"] + self.es_auth = config["application"]["es_auth"] + self.refresh_interval = config["scheduler"]["check_reindex_days"] + self.integrate_ryd = config["downloads"]["integrate_ryd"] + # scan + self.all_youtube_ids = False + self.all_channel_ids = False + self.all_playlist_ids = False + + def get_daily(self): + """get daily refresh values""" + total_videos = get_total_hits( + "ta_video", self.es_url, self.es_auth, "active" + ) + video_daily = ceil(total_videos / self.refresh_interval * 1.2) + total_channels = get_total_hits( + "ta_channel", self.es_url, self.es_auth, "channel_active" + ) + channel_daily = ceil(total_channels / self.refresh_interval * 1.2) + total_playlists = get_total_hits( + "ta_playlist", self.es_url, self.es_auth, "playlist_active" + ) + playlist_daily = ceil(total_playlists / self.refresh_interval * 1.2) + return (video_daily, channel_daily, playlist_daily) + + def get_outdated_vids(self, size): + """get daily videos to refresh""" + headers = {"Content-type": "application/json"} + now = int(datetime.now().strftime("%s")) + now_lte = now - self.refresh_interval * 24 * 60 * 60 + data = { + "size": size, + "query": { + "bool": { + "must": [ + {"match": {"active": True}}, + {"range": {"vid_last_refresh": {"lte": now_lte}}}, + ] + } + }, + "sort": [{"vid_last_refresh": {"order": "asc"}}], + "_source": False, + } + query_str = json.dumps(data) + url = self.es_url + "/ta_video/_search" + response = requests.get( + url, data=query_str, headers=headers, auth=self.es_auth + ) + if not response.ok: + print(response.text) + response_dict = json.loads(response.text) + all_youtube_ids = [i["_id"] for i in response_dict["hits"]["hits"]] + return all_youtube_ids + + def get_unrated_vids(self): + """get all videos without rating if ryd integration is enabled""" + headers = {"Content-type": "application/json"} + data = { + "size": 200, + "query": { + "bool": { + "must_not": [{"exists": {"field": "stats.average_rating"}}] + } + }, + } + query_str = json.dumps(data) + url = self.es_url + "/ta_video/_search" + response = requests.get( + url, data=query_str, headers=headers, auth=self.es_auth + ) + if not response.ok: + print(response.text) + response_dict = json.loads(response.text) + missing_rating = [i["_id"] for i in response_dict["hits"]["hits"]] + self.all_youtube_ids = self.all_youtube_ids + 
missing_rating + + def get_outdated_channels(self, size): + """get daily channels to refresh""" + headers = {"Content-type": "application/json"} + now = int(datetime.now().strftime("%s")) + now_lte = now - self.refresh_interval * 24 * 60 * 60 + data = { + "size": size, + "query": { + "bool": { + "must": [ + {"match": {"channel_active": True}}, + {"range": {"channel_last_refresh": {"lte": now_lte}}}, + ] + } + }, + "sort": [{"channel_last_refresh": {"order": "asc"}}], + "_source": False, + } + query_str = json.dumps(data) + url = self.es_url + "/ta_channel/_search" + response = requests.get( + url, data=query_str, headers=headers, auth=self.es_auth + ) + if not response.ok: + print(response.text) + response_dict = json.loads(response.text) + all_channel_ids = [i["_id"] for i in response_dict["hits"]["hits"]] + return all_channel_ids + + def get_outdated_playlists(self, size): + """get daily outdated playlists to refresh""" + headers = {"Content-type": "application/json"} + now = int(datetime.now().strftime("%s")) + now_lte = now - self.refresh_interval * 24 * 60 * 60 + data = { + "size": size, + "query": { + "bool": { + "must": [ + {"match": {"playlist_active": True}}, + {"range": {"playlist_last_refresh": {"lte": now_lte}}}, + ] + } + }, + "sort": [{"playlist_last_refresh": {"order": "asc"}}], + "_source": False, + } + query_str = json.dumps(data) + url = self.es_url + "/ta_playlist/_search" + response = requests.get( + url, data=query_str, headers=headers, auth=self.es_auth + ) + if not response.ok: + print(response.text) + response_dict = json.loads(response.text) + all_playlist_ids = [i["_id"] for i in response_dict["hits"]["hits"]] + return all_playlist_ids + + def check_outdated(self): + """add missing vids and channels""" + video_daily, channel_daily, playlist_daily = self.get_daily() + self.all_youtube_ids = self.get_outdated_vids(video_daily) + self.all_channel_ids = self.get_outdated_channels(channel_daily) + self.all_playlist_ids = self.get_outdated_playlists(playlist_daily) + if self.integrate_ryd: + self.get_unrated_vids() + + def rescrape_all_channels(self): + """sync new data from channel to all matching videos""" + sleep_interval = self.sleep_interval + channel_sub_handler = ChannelSubscription() + all_channels = channel_sub_handler.get_channels(subscribed_only=False) + all_channel_ids = [i["channel_id"] for i in all_channels] + + for channel_id in all_channel_ids: + channel = YoutubeChannel(channel_id) + channel.get_from_es() + subscribed = channel.json_data["channel_subscribed"] + channel.get_from_youtube() + channel.json_data["channel_subscribed"] = subscribed + channel.upload_to_es() + channel.sync_to_videos() + + if sleep_interval: + sleep(sleep_interval) + + @staticmethod + def reindex_single_video(youtube_id): + """refresh data for single video""" + video = YoutubeVideo(youtube_id) + + # read current state + video.get_from_es() + player = video.json_data["player"] + date_downloaded = video.json_data["date_downloaded"] + channel_dict = video.json_data["channel"] + playlist = video.json_data.get("playlist") + + # get new + video.build_json() + if not video.json_data: + video.deactivate() + return + + # add back + video.json_data["player"] = player + video.json_data["date_downloaded"] = date_downloaded + video.json_data["channel"] = channel_dict + if playlist: + video.json_data["playlist"] = playlist + + video.upload_to_es() + + thumb_handler = ThumbManager() + thumb_handler.delete_vid_thumb(youtube_id) + to_download = (youtube_id, video.json_data["vid_thumb_url"]) +
thumb_handler.download_vid([to_download], notify=False) + + @staticmethod + def reindex_single_channel(channel_id): + """refresh channel data and sync to videos""" + channel = YoutubeChannel(channel_id) + channel.get_from_es() + subscribed = channel.json_data["channel_subscribed"] + channel.get_from_youtube() + channel.json_data["channel_subscribed"] = subscribed + channel.upload_to_es() + channel.sync_to_videos() + + @staticmethod + def reindex_single_playlist(playlist_id, all_indexed_ids): + """refresh playlist data""" + playlist = YoutubePlaylist(playlist_id) + playlist.get_from_es() + subscribed = playlist.json_data["playlist_subscribed"] + playlist.all_youtube_ids = all_indexed_ids + playlist.build_json(scrape=True) + if not playlist.json_data: + playlist.deactivate() + return + + playlist.json_data["playlist_subscribed"] = subscribed + playlist.upload_to_es() + return + + def reindex(self): + """reindex what's needed""" + # videos + print(f"reindexing {len(self.all_youtube_ids)} videos") + for youtube_id in self.all_youtube_ids: + self.reindex_single_video(youtube_id) + if self.sleep_interval: + sleep(self.sleep_interval) + # channels + print(f"reindexing {len(self.all_channel_ids)} channels") + for channel_id in self.all_channel_ids: + self.reindex_single_channel(channel_id) + if self.sleep_interval: + sleep(self.sleep_interval) + # playlist + print(f"reindexing {len(self.all_playlist_ids)} playlists") + if self.all_playlist_ids: + all_indexed = PendingList().get_all_indexed() + all_indexed_ids = [i["youtube_id"] for i in all_indexed] + for playlist_id in self.all_playlist_ids: + self.reindex_single_playlist(playlist_id, all_indexed_ids) + if self.sleep_interval: + sleep(self.sleep_interval) diff --git a/tubearchivist/home/src/index/video.py b/tubearchivist/home/src/index/video.py new file mode 100644 index 0000000..7b71e36 --- /dev/null +++ b/tubearchivist/home/src/index/video.py @@ -0,0 +1,171 @@ +"""handle single video index""" + +import os +from datetime import datetime + +import requests +from home.src.index import channel as ta_channel +from home.src.index.generic import YouTubeItem +from home.src.ta.helper import DurationConverter, clean_string +from ryd_client import ryd_client + + +class YoutubeVideo(YouTubeItem): + """represents a single youtube video""" + + es_path = False + index_name = "ta_video" + yt_base = "https://www.youtube.com/watch?v=" + + def __init__(self, youtube_id): + super().__init__(youtube_id) + self.channel_id = False + self.es_path = f"{self.index_name}/_doc/{youtube_id}" + + def build_json(self): + """build json dict of video""" + self.get_from_youtube() + if not self.youtube_meta: + return + + self._process_youtube_meta() + self._add_channel() + self._add_stats() + self.add_file_path() + self.add_player() + if self.config["downloads"]["integrate_ryd"]: + self._get_ryd_stats() + + return + + def _process_youtube_meta(self): + """extract relevant fields from youtube""" + # extract + self.channel_id = self.youtube_meta["channel_id"] + upload_date = self.youtube_meta["upload_date"] + upload_date_time = datetime.strptime(upload_date, "%Y%m%d") + published = upload_date_time.strftime("%Y-%m-%d") + last_refresh = int(datetime.now().strftime("%s")) + # build json_data basics + self.json_data = { + "title": self.youtube_meta["title"], + "description": self.youtube_meta["description"], + "category": self.youtube_meta["categories"], + "vid_thumb_url": self.youtube_meta["thumbnail"], + "tags": self.youtube_meta["tags"], + "published": published, + 
"vid_last_refresh": last_refresh, + "date_downloaded": last_refresh, + "youtube_id": self.youtube_id, + "active": True, + } + + def _add_channel(self): + """add channel dict to video json_data""" + channel = ta_channel.YoutubeChannel(self.channel_id) + channel.build_json(upload=True) + self.json_data.update({"channel": channel.json_data}) + + def _add_stats(self): + """add stats dicst to json_data""" + # likes + like_count = self.youtube_meta.get("like_count", 0) + dislike_count = self.youtube_meta.get("dislike_count", 0) + self.json_data.update( + { + "stats": { + "view_count": self.youtube_meta["view_count"], + "like_count": like_count, + "dislike_count": dislike_count, + "average_rating": self.youtube_meta["average_rating"], + } + } + ) + + def build_dl_cache_path(self): + """find video path in dl cache""" + cache_dir = self.app_conf["cache_dir"] + cache_path = f"{cache_dir}/download/" + all_cached = os.listdir(cache_path) + for file_cached in all_cached: + if self.youtube_id in file_cached: + vid_path = os.path.join(cache_path, file_cached) + return vid_path + + return False + + def add_player(self): + """add player information for new videos""" + try: + # when indexing from download task + vid_path = self.build_dl_cache_path() + except FileNotFoundError: + # when reindexing + base = self.app_conf["videos"] + vid_path = os.path.join(base, self.json_data["media_url"]) + + duration_handler = DurationConverter() + duration = duration_handler.get_sec(vid_path) + duration_str = duration_handler.get_str(duration) + self.json_data.update( + { + "player": { + "watched": False, + "duration": duration, + "duration_str": duration_str, + } + } + ) + + def add_file_path(self): + """build media_url for where file will be located""" + channel_name = self.json_data["channel"]["channel_name"] + clean_channel_name = clean_string(channel_name) + timestamp = self.json_data["published"].replace("-", "") + youtube_id = self.json_data["youtube_id"] + title = self.json_data["title"] + clean_title = clean_string(title) + filename = f"{timestamp}_{youtube_id}_{clean_title}.mp4" + media_url = os.path.join(clean_channel_name, filename) + self.json_data["media_url"] = media_url + + def delete_media_file(self): + """delete video file, meta data""" + self.get_from_es() + video_base = self.app_conf["videos"] + media_url = self.json_data["media_url"] + print(f"{self.youtube_id}: delete {media_url} from file system") + to_delete = os.path.join(video_base, media_url) + os.remove(to_delete) + self.del_in_es() + + def _get_ryd_stats(self): + """get optional stats from returnyoutubedislikeapi.com""" + try: + print(f"{self.youtube_id}: get ryd stats") + result = ryd_client.get(self.youtube_id) + except requests.exceptions.ConnectionError: + print(f"{self.youtube_id}: failed to query ryd api, skipping") + return False + + if result["status"] == 404: + return False + + dislikes = { + "dislike_count": result["dislikes"], + "average_rating": result["rating"], + } + self.json_data["stats"].update(dislikes) + + return True + + +def index_new_video(youtube_id): + """combined classes to create new video in index""" + video = YoutubeVideo(youtube_id) + video.build_json() + if not video.json_data: + raise ValueError("failed to get metadata for " + youtube_id) + + video.upload_to_es() + return video.json_data diff --git a/tubearchivist/home/src/ta/__init__.py b/tubearchivist/home/src/ta/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tubearchivist/home/src/config.py b/tubearchivist/home/src/ta/config.py similarity 
index 99% rename from tubearchivist/home/src/config.py rename to tubearchivist/home/src/ta/config.py index 5690716..569b230 100644 --- a/tubearchivist/home/src/config.py +++ b/tubearchivist/home/src/ta/config.py @@ -10,7 +10,7 @@ import os import re from celery.schedules import crontab -from home.src.helper import RedisArchivist +from home.src.ta.ta_redis import RedisArchivist class AppConfig: diff --git a/tubearchivist/home/src/helper.py b/tubearchivist/home/src/ta/helper.py similarity index 57% rename from tubearchivist/home/src/helper.py rename to tubearchivist/home/src/ta/helper.py index 61f698e..4788636 100644 --- a/tubearchivist/home/src/helper.py +++ b/tubearchivist/home/src/ta/helper.py @@ -4,14 +4,12 @@ Loose collection of helper functions """ import json -import os import re import string import subprocess import unicodedata from urllib.parse import parse_qs, urlparse -import redis import requests import yt_dlp @@ -149,153 +147,6 @@ class UrlListParser: return channel_id -class RedisArchivist: - """collection of methods to interact with redis""" - - REDIS_HOST = os.environ.get("REDIS_HOST") - REDIS_PORT = os.environ.get("REDIS_PORT") or 6379 - NAME_SPACE = "ta:" - CHANNELS = [ - "download", - "add", - "rescan", - "subchannel", - "subplaylist", - "playlistscan", - "setting", - ] - - def __init__(self): - self.redis_connection = redis.Redis( - host=self.REDIS_HOST, port=self.REDIS_PORT - ) - - def set_message(self, key, message, expire=True): - """write new message to redis""" - self.redis_connection.execute_command( - "JSON.SET", self.NAME_SPACE + key, ".", json.dumps(message) - ) - - if expire: - if isinstance(expire, bool): - secs = 20 - else: - secs = expire - self.redis_connection.execute_command( - "EXPIRE", self.NAME_SPACE + key, secs - ) - - def get_message(self, key): - """get message dict from redis""" - reply = self.redis_connection.execute_command( - "JSON.GET", self.NAME_SPACE + key - ) - if reply: - json_str = json.loads(reply) - else: - json_str = {"status": False} - - return json_str - - def del_message(self, key): - """delete key from redis""" - response = self.redis_connection.execute_command( - "DEL", self.NAME_SPACE + key - ) - return response - - def get_lock(self, lock_key): - """handle lock for task management""" - redis_lock = self.redis_connection.lock(self.NAME_SPACE + lock_key) - return redis_lock - - def get_progress(self): - """get a list of all progress messages""" - all_messages = [] - for channel in self.CHANNELS: - key = "message:" + channel - reply = self.redis_connection.execute_command( - "JSON.GET", self.NAME_SPACE + key - ) - if reply: - json_str = json.loads(reply) - all_messages.append(json_str) - - return all_messages - - @staticmethod - def monitor_cache_dir(cache_dir): - """ - look at download cache dir directly as alternative progress info - """ - dl_cache = os.path.join(cache_dir, "download") - all_cache_file = os.listdir(dl_cache) - cache_file = ignore_filelist(all_cache_file) - if cache_file: - filename = cache_file[0][12:].replace("_", " ").split(".")[0] - mess_dict = { - "status": "message:download", - "level": "info", - "title": "Downloading: " + filename, - "message": "", - } - else: - return False - - return mess_dict - - -class RedisQueue: - """dynamically interact with the download queue in redis""" - - REDIS_HOST = os.environ.get("REDIS_HOST") - REDIS_PORT = os.environ.get("REDIS_PORT") - NAME_SPACE = "ta:" - - if not REDIS_PORT: - REDIS_PORT = 6379 - - def __init__(self, key): - self.key = self.NAME_SPACE + key - self.conn = 
redis.Redis(host=self.REDIS_HOST, port=self.REDIS_PORT) - - def get_all(self): - """return all elements in list""" - result = self.conn.execute_command("LRANGE", self.key, 0, -1) - all_elements = [i.decode() for i in result] - return all_elements - - def add_list(self, to_add): - """add list to queue""" - self.conn.execute_command("RPUSH", self.key, *to_add) - - def add_priority(self, to_add): - """add single video to front of queue""" - self.clear_item(to_add) - self.conn.execute_command("LPUSH", self.key, to_add) - - def get_next(self): - """return next element in the queue, False if none""" - result = self.conn.execute_command("LPOP", self.key) - if not result: - return False - - next_element = result.decode() - return next_element - - def clear(self): - """delete list from redis""" - self.conn.execute_command("DEL", self.key) - - def clear_item(self, to_clear): - """remove single item from list if it's there""" - self.conn.execute_command("LREM", self.key, 0, to_clear) - - def trim(self, size): - """trim the queue based on settings amount""" - self.conn.execute_command("LTRIM", self.key, 0, size) - - class DurationConverter: """ using ffmpeg to get and parse duration from filepath diff --git a/tubearchivist/home/src/ta/ta_redis.py b/tubearchivist/home/src/ta/ta_redis.py new file mode 100644 index 0000000..60b8e0e --- /dev/null +++ b/tubearchivist/home/src/ta/ta_redis.py @@ -0,0 +1,154 @@ +"""interact with redis""" + +import json +import os + +import redis +from home.src.ta.helper import ignore_filelist + + +class RedisArchivist: + """collection of methods to interact with redis""" + + REDIS_HOST = os.environ.get("REDIS_HOST") + REDIS_PORT = os.environ.get("REDIS_PORT") or 6379 + NAME_SPACE = "ta:" + CHANNELS = [ + "download", + "add", + "rescan", + "subchannel", + "subplaylist", + "playlistscan", + "setting", + ] + + def __init__(self): + self.redis_connection = redis.Redis( + host=self.REDIS_HOST, port=self.REDIS_PORT + ) + + def set_message(self, key, message, expire=True): + """write new message to redis""" + self.redis_connection.execute_command( + "JSON.SET", self.NAME_SPACE + key, ".", json.dumps(message) + ) + + if expire: + if isinstance(expire, bool): + secs = 20 + else: + secs = expire + self.redis_connection.execute_command( + "EXPIRE", self.NAME_SPACE + key, secs + ) + + def get_message(self, key): + """get message dict from redis""" + reply = self.redis_connection.execute_command( + "JSON.GET", self.NAME_SPACE + key + ) + if reply: + json_str = json.loads(reply) + else: + json_str = {"status": False} + + return json_str + + def del_message(self, key): + """delete key from redis""" + response = self.redis_connection.execute_command( + "DEL", self.NAME_SPACE + key + ) + return response + + def get_lock(self, lock_key): + """handle lock for task management""" + redis_lock = self.redis_connection.lock(self.NAME_SPACE + lock_key) + return redis_lock + + def get_progress(self): + """get a list of all progress messages""" + all_messages = [] + for channel in self.CHANNELS: + key = "message:" + channel + reply = self.redis_connection.execute_command( + "JSON.GET", self.NAME_SPACE + key + ) + if reply: + json_str = json.loads(reply) + all_messages.append(json_str) + + return all_messages + + @staticmethod + def monitor_cache_dir(cache_dir): + """ + look at download cache dir directly as alternative progress info + """ + dl_cache = os.path.join(cache_dir, "download") + all_cache_file = os.listdir(dl_cache) + cache_file = ignore_filelist(all_cache_file) + if cache_file: + filename = 
cache_file[0][12:].replace("_", " ").split(".")[0] + mess_dict = { + "status": "message:download", + "level": "info", + "title": "Downloading: " + filename, + "message": "", + } + else: + return False + + return mess_dict + + +class RedisQueue: + """dynamically interact with the download queue in redis""" + + REDIS_HOST = os.environ.get("REDIS_HOST") + REDIS_PORT = os.environ.get("REDIS_PORT") + NAME_SPACE = "ta:" + + if not REDIS_PORT: + REDIS_PORT = 6379 + + def __init__(self, key): + self.key = self.NAME_SPACE + key + self.conn = redis.Redis(host=self.REDIS_HOST, port=self.REDIS_PORT) + + def get_all(self): + """return all elements in list""" + result = self.conn.execute_command("LRANGE", self.key, 0, -1) + all_elements = [i.decode() for i in result] + return all_elements + + def add_list(self, to_add): + """add list to queue""" + self.conn.execute_command("RPUSH", self.key, *to_add) + + def add_priority(self, to_add): + """add single video to front of queue""" + self.clear_item(to_add) + self.conn.execute_command("LPUSH", self.key, to_add) + + def get_next(self): + """return next element in the queue, False if none""" + result = self.conn.execute_command("LPOP", self.key) + if not result: + return False + + next_element = result.decode() + return next_element + + def clear(self): + """delete list from redis""" + self.conn.execute_command("DEL", self.key) + + def clear_item(self, to_clear): + """remove single item from list if it's there""" + self.conn.execute_command("LREM", self.key, 0, to_clear) + + def trim(self, size): + """trim the queue based on settings amount""" + self.conn.execute_command("LTRIM", self.key, 0, size) diff --git a/tubearchivist/home/tasks.py b/tubearchivist/home/tasks.py index 995cf8f..63d84b2 100644 --- a/tubearchivist/home/tasks.py +++ b/tubearchivist/home/tasks.py @@ -10,22 +10,24 @@ import os import home.apps as startup_apps from celery import Celery, shared_task -from home.src.config import AppConfig, ScheduleBuilder -from home.src.download import ( +from home.src.download.queue import PendingList +from home.src.download.subscriptions import ( ChannelSubscription, - PendingList, PlaylistSubscription, - VideoDownloader, ) -from home.src.helper import RedisArchivist, RedisQueue, UrlListParser -from home.src.index import YoutubeChannel, YoutubePlaylist -from home.src.index_management import backup_all_indexes, restore_from_backup -from home.src.reindex import ( +from home.src.download.thumbnails import ThumbManager, validate_thumbnails +from home.src.download.yt_dlp_handler import VideoDownloader +from home.src.es.index_setup import backup_all_indexes, restore_from_backup +from home.src.index.channel import YoutubeChannel +from home.src.index.filesystem import ( ManualImport, reindex_old_documents, scan_filesystem, ) -from home.src.thumbnails import ThumbManager, validate_thumbnails +from home.src.index.playlist import YoutubePlaylist +from home.src.ta.config import AppConfig, ScheduleBuilder +from home.src.ta.helper import UrlListParser +from home.src.ta.ta_redis import RedisArchivist, RedisQueue CONFIG = AppConfig().config REDIS_HOST = os.environ.get("REDIS_HOST") diff --git a/tubearchivist/home/views.py b/tubearchivist/home/views.py index 687c0aa..006a944 100644 --- a/tubearchivist/home/views.py +++ b/tubearchivist/home/views.py @@ -14,7 +14,9 @@ from django.contrib.auth.forms import AuthenticationForm from django.http import JsonResponse from django.shortcuts import redirect, render from django.views import View -from home.forms import ( +from 
home.src.es.index_setup import get_available_backups +from home.src.frontend.api_calls import PostData +from home.src.frontend.forms import ( AddToQueueForm, ApplicationSettingsForm, CustomAuthForm, @@ -24,12 +26,12 @@ from home.forms import ( SubscribeToPlaylistForm, UserSettingsForm, ) -from home.src.config import AppConfig, ScheduleBuilder -from home.src.frontend import PostData -from home.src.helper import RedisArchivist, UrlListParser -from home.src.index import YoutubePlaylist -from home.src.index_management import get_available_backups -from home.src.searching import Pagination, SearchHandler +from home.src.frontend.searching import SearchHandler +from home.src.index.generic import Pagination +from home.src.index.playlist import YoutubePlaylist +from home.src.ta.config import AppConfig, ScheduleBuilder +from home.src.ta.helper import UrlListParser +from home.src.ta.ta_redis import RedisArchivist from home.tasks import extrac_dl, subscribe_to from rest_framework.authtoken.models import Token
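
For orientation, below is a minimal usage sketch of the relocated modules, not part of the patch itself. It assumes a configured TubeArchivist environment (Elasticsearch and Redis reachable through the app's settings), and the video and playlist IDs are placeholders; only the import paths and methods shown in the diff above are used.

# minimal sketch: placeholder IDs, assumes ES and Redis are reachable via the app environment
from home.src.index.playlist import YoutubePlaylist
from home.src.index.video import index_new_video
from home.src.ta.config import AppConfig
from home.src.ta.ta_redis import RedisArchivist

# config now lives in home.src.ta.config instead of home.src.config
page_size = AppConfig().config["archive"]["page_size"]
print(f"default page size: {page_size}")

# index a single video through home.src.index.video
video_data = index_new_video("xxxxxxxxxxx")  # placeholder youtube_id
print(video_data["title"])

# scrape a playlist, store it, and stamp its id onto already indexed videos
playlist = YoutubePlaylist("PLxxxxxxxxxxxxxxxx")  # placeholder playlist_id
playlist.build_json(scrape=True)
playlist.upload_to_es()
playlist.add_vids_to_playlist()

# progress messages are read through home.src.ta.ta_redis instead of home.src.helper
for message in RedisArchivist().get_progress():
    print(message)

The sketch mirrors the new package layout introduced by the patch: download handling under home.src.download, index documents under home.src.index, Elasticsearch plumbing under home.src.es, and shared utilities (config, helpers, redis) under home.src.ta.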