From b64a2f81951add4204ca6c48d1409a2bb7663a07 Mon Sep 17 00:00:00 2001 From: simon Date: Sat, 22 Jan 2022 17:48:54 +0700 Subject: [PATCH] squash index.py refactor commits --- tubearchivist/home/src/download.py | 138 ++--- tubearchivist/home/src/es.py | 88 ++- tubearchivist/home/src/frontend.py | 2 +- tubearchivist/home/src/index.py | 898 +++++++++-------------------- tubearchivist/home/src/reindex.py | 104 ++-- tubearchivist/home/tasks.py | 30 +- tubearchivist/home/views.py | 10 +- 7 files changed, 464 insertions(+), 806 deletions(-) diff --git a/tubearchivist/home/src/download.py b/tubearchivist/home/src/download.py index 429c90d..0dbd0be 100644 --- a/tubearchivist/home/src/download.py +++ b/tubearchivist/home/src/download.py @@ -15,6 +15,7 @@ from time import sleep import requests import yt_dlp from home.src.config import AppConfig +from home.src.es import IndexPaginate from home.src.helper import ( DurationConverter, RedisArchivist, @@ -23,7 +24,6 @@ from home.src.helper import ( ignore_filelist, ) from home.src.index import ( - IndexPaginate, YoutubeChannel, YoutubePlaylist, YoutubeVideo, @@ -69,7 +69,9 @@ class PendingList: missing_videos = missing_videos + youtube_ids elif url_type == "playlist": self.missing_from_playlists.append(entry) - video_results = YoutubePlaylist(url).get_entries() + playlist = YoutubePlaylist(url) + playlist.build_json() + video_results = playlist.json_data.get("playlist_entries") youtube_ids = [i["youtube_id"] for i in video_results] missing_videos = missing_videos + youtube_ids @@ -346,34 +348,14 @@ class ChannelSubscription: return missing_videos - def change_subscribe(self, channel_id, channel_subscribed): + @staticmethod + def change_subscribe(channel_id, channel_subscribed): """subscribe or unsubscribe from channel and update""" - if not isinstance(channel_subscribed, bool): - print("invalid status, should be bool") - return - headers = {"Content-type": "application/json"} - channel_handler = YoutubeChannel(channel_id) - channel_dict = channel_handler.channel_dict - channel_dict["channel_subscribed"] = channel_subscribed - if channel_subscribed: - # handle subscribe - url = self.es_url + "/ta_channel/_doc/" + channel_id - payload = json.dumps(channel_dict) - print(channel_dict) - else: - url = self.es_url + "/ta_channel/_update/" + channel_id - payload = json.dumps({"doc": channel_dict}) - # update channel - request = requests.post( - url, data=payload, headers=headers, auth=self.es_auth - ) - if not request.ok: - print(request.text) - raise ValueError("failed change subscribe status") - # sync to videos - channel_handler.sync_to_videos() - if channel_handler.source == "scraped": - channel_handler.get_channel_art() + channel = YoutubeChannel(channel_id) + channel.build_json() + channel.json_data["channel_subscribed"] = channel_subscribed + channel.upload_to_es() + channel.sync_to_videos() class PlaylistSubscription: @@ -413,20 +395,15 @@ class PlaylistSubscription: print(f"{playlist_id} not a playlist, skipping...") continue - playlist_h = YoutubePlaylist( - playlist_id, all_youtube_ids=all_youtube_ids - ) - if not playlist_h.get_es_playlist(): - playlist_h.get_playlist_dict() - playlist_h.playlist_dict["playlist_subscribed"] = subscribed - playlist_h.upload_to_es() - playlist_h.add_vids_to_playlist() - thumb = playlist_h.playlist_dict["playlist_thumbnail"] - new_thumbs.append((playlist_id, thumb)) - self.channel_validate(playlist_h) - else: - self.change_subscribe(playlist_id, subscribe_status=True) - + playlist_h = YoutubePlaylist(playlist_id) + 
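
The rewritten change_subscribe above shows the call pattern this refactor standardizes on: build_json() fills json_data from Elasticsearch when the document is already indexed and falls back to scraping YouTube otherwise, the caller mutates the dict in place, and upload_to_es() persists it. A minimal sketch of that read-modify-write flow (the channel ID is a placeholder, not from the patch):

    from home.src.index import YoutubeChannel

    channel = YoutubeChannel("UCxxxxxxxxxxxxxxxxxxxxxx")  # placeholder ID
    channel.build_json()  # ES first, scrape from youtube on miss
    channel.json_data["channel_subscribed"] = True
    channel.upload_to_es()  # write the mutated document back
    channel.sync_to_videos()  # denormalize channel data onto its videos
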
playlist_h.all_youtube_ids = all_youtube_ids + playlist_h.build_json() + playlist_h.json_data["playlist_subscribed"] = subscribed + playlist_h.upload_to_es() + playlist_h.add_vids_to_playlist() + self.channel_validate(playlist_h.json_data["playlist_channel_id"]) + thumb = playlist_h.json_data["playlist_thumbnail"] + new_thumbs.append((playlist_id, thumb)) # notify message = { "status": "message:subplaylist", @@ -441,41 +418,18 @@ class PlaylistSubscription: return new_thumbs @staticmethod - def channel_validate(playlist_handler): + def channel_validate(channel_id): """make sure channel of playlist is there""" - channel_id = playlist_handler.playlist_dict["playlist_channel_id"] - channel_handler = YoutubeChannel(channel_id) - if channel_handler.source == "scraped": - channel_handler.channel_dict["channel_subscribed"] = False - channel_handler.upload_to_es() - channel_handler.get_channel_art() + channel = YoutubeChannel(channel_id) + channel.build_json() - def change_subscribe(self, playlist_id, subscribe_status): + @staticmethod + def change_subscribe(playlist_id, subscribe_status): """change the subscribe status of a playlist""" - es_url = self.config["application"]["es_url"] - es_auth = self.config["application"]["es_auth"] - playlist_handler = YoutubePlaylist(playlist_id) - playlist_handler.get_playlist_dict() - subed_now = playlist_handler.playlist_dict["playlist_subscribed"] - - if subed_now == subscribe_status: - # status already as expected, do nothing - return False - - # update subscribed status - headers = {"Content-type": "application/json"} - url = f"{es_url}/ta_playlist/_update/{playlist_id}" - payload = json.dumps( - {"doc": {"playlist_subscribed": subscribe_status}} - ) - response = requests.post( - url, data=payload, headers=headers, auth=es_auth - ) - if not response.ok: - print(response.text) - raise ValueError("failed to change subscribe status") - - return True + playlist = YoutubePlaylist(playlist_id) + playlist.build_json() + playlist.json_data["playlist_subscribed"] = subscribe_status + playlist.upload_to_es() @staticmethod def get_to_ignore(): @@ -493,26 +447,25 @@ class PlaylistSubscription: to_ignore = self.get_to_ignore() missing_videos = [] - counter = 1 - for playlist_id in all_playlists: + for idx, playlist_id in enumerate(all_playlists): size_limit = self.config["subscriptions"]["channel_size"] - playlist_handler = YoutubePlaylist(playlist_id) - playlist = playlist_handler.update_playlist() + playlist = YoutubePlaylist(playlist_id) + playlist.update_playlist() if not playlist: - playlist_handler.deactivate() + playlist.deactivate() continue + playlist_entries = playlist.json_data["playlist_entries"] if size_limit: - playlist_entries = playlist["playlist_entries"][:size_limit] - else: - playlist_entries = playlist["playlist_entries"] + del playlist_entries[size_limit:] + all_missing = [i for i in playlist_entries if not i["downloaded"]] message = { "status": "message:rescan", "level": "info", "title": "Scanning playlists: Looking for new videos.", - "message": f"Progress: {counter}/{len(all_playlists)}", + "message": f"Progress: {idx + 1}/{len(all_playlists)}", } RedisArchivist().set_message("message:rescan", message=message) @@ -520,7 +473,6 @@ class PlaylistSubscription: youtube_id = video["youtube_id"] if youtube_id not in to_ignore: missing_videos.append(youtube_id) - counter = counter + 1 return missing_videos @@ -751,15 +703,15 @@ class VideoDownloader: playlists = YoutubeChannel(channel_id).get_indexed_playlists() all_playlist_ids = [i["playlist_id"] for 
i in playlists]
         for id_p, playlist_id in enumerate(all_playlist_ids):
-            playlist_handler = YoutubePlaylist(
-                playlist_id, all_youtube_ids=all_youtube_ids
-            )
-            playlist_dict = playlist_handler.update_playlist()
-            if not playlist_dict:
-                playlist_handler.deactivate()
-                continue
+            playlist = YoutubePlaylist(playlist_id)
+            playlist.all_youtube_ids = all_youtube_ids
+            playlist.build_json(scrape=True)
+            if not playlist.json_data:
+                playlist.deactivate()
+                continue
+
+            playlist.add_vids_to_playlist()
+            playlist.upload_to_es()
 
-            playlist_handler.add_vids_to_playlist()
         # notify
         title = (
             "Processing playlists for channels: "
diff --git a/tubearchivist/home/src/es.py b/tubearchivist/home/src/es.py
index ea44a24..ce72863 100644
--- a/tubearchivist/home/src/es.py
+++ b/tubearchivist/home/src/es.py
@@ -29,7 +29,6 @@ class ElasticWrap:
 
     def get(self, data=False):
         """get data from es"""
-
         if data:
             response = requests.get(self.url, json=data, auth=self.auth)
         else:
@@ -39,7 +38,7 @@
 
         return response.json(), response.status_code
 
-    def post(self, data, ndjson=False):
+    def post(self, data=False, ndjson=False):
         """post data to es"""
         if ndjson:
             headers = {"Content-type": "application/x-ndjson"}
@@ -48,20 +47,23 @@
             headers = {"Content-type": "application/json"}
             payload = json.dumps(data)
 
-        response = requests.post(
-            self.url, data=payload, header=headers, auth=self.auth
-        )
+        if data:
+            response = requests.post(
+                self.url, data=payload, headers=headers, auth=self.auth
+            )
+        else:
+            response = requests.post(self.url, headers=headers, auth=self.auth)
 
         if not response.ok:
             print(response.text)
 
         return response.json(), response.status_code
 
-    def put(self, data):
+    def put(self, data, refresh=False):
         """put data to es"""
-        response = requests.put(
-            f"{self.url}/?refresh=true", json=data, auth=self.auth
-        )
+        if refresh:
+            self.url = f"{self.url}/?refresh=true"
+        response = requests.put(f"{self.url}", json=data, auth=self.auth)
         if not response.ok:
             print(response.text)
             print(data)
@@ -69,10 +71,74 @@
 
         return response.json(), response.status_code
 
-    def delete(self):
+    def delete(self, data=False):
         """delete document from es"""
-        response = requests.delete(self.url, auth=self.auth)
+        if data:
+            response = requests.delete(self.url, json=data, auth=self.auth)
+        else:
+            response = requests.delete(self.url, auth=self.auth)
+
         if not response.ok:
             print(response.text)
 
         return response.json(), response.status_code
+
+
+class IndexPaginate:
+    """use search_after to go through whole index"""
+
+    DEFAULT_SIZE = 500
+
+    def __init__(self, index_name, data, size=False):
+        self.index_name = index_name
+        self.data = data
+        self.pit_id = False
+        self.size = size
+
+    def get_results(self):
+        """get all results"""
+        self.get_pit()
+        self.validate_data()
+        all_results = self.run_loop()
+        self.clean_pit()
+        return all_results
+
+    def get_pit(self):
+        """get pit for index"""
+        path = f"{self.index_name}/_pit?keep_alive=10m"
+        response, _ = ElasticWrap(path).post()
+        self.pit_id = response["id"]
+
+    def validate_data(self):
+        """add pit and size to data"""
+        if "sort" not in self.data.keys():
+            print(self.data)
+            raise ValueError("missing sort key in data")
+
+        size = self.size or self.DEFAULT_SIZE
+
+        self.data["size"] = size
+        self.data["pit"] = {"id": self.pit_id, "keep_alive": "10m"}
+
+    def run_loop(self):
+        """loop through results until last hit"""
+        all_results = []
+        while True:
+            response, _ = ElasticWrap("_search").get(data=self.data)
+            all_hits = response["hits"]["hits"]
+            if all_hits:
+
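
Every verb on the reworked ElasticWrap above returns a parsed (json, status_code) tuple, and post() and delete() now take optional bodies, which is exactly what get_pit() and clean_pit() rely on. A minimal usage sketch, with placeholder index paths and IDs:

    from home.src.es import ElasticWrap

    # read a single document; the body comes back already parsed
    response, status_code = ElasticWrap("ta_video/_doc/placeholder-id").get()
    if status_code == 200:
        print(response["_source"]["title"])

    # body-less post, the pattern get_pit() uses to open a point in time
    response, _ = ElasticWrap("ta_video/_pit?keep_alive=10m").post()

    # delete with a request body, the pattern clean_pit() uses
    ElasticWrap("_pit").delete(data={"id": response["id"]})
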
for hit in all_hits:
+                    source = hit["_source"]
+                    search_after = hit["sort"]
+                    all_results.append(source)
+                # update search_after with last hit data
+                self.data["search_after"] = search_after
+            else:
+                break
+
+        return all_results
+
+    def clean_pit(self):
+        """delete pit from elastic search"""
+        data = {"id": self.pit_id}
+        ElasticWrap("_pit").delete(data=data)
diff --git a/tubearchivist/home/src/frontend.py b/tubearchivist/home/src/frontend.py
index c81283e..b6451e6 100644
--- a/tubearchivist/home/src/frontend.py
+++ b/tubearchivist/home/src/frontend.py
@@ -306,7 +306,7 @@ class PostData:
         playlist_dict = self.exec_val
         playlist_id = playlist_dict["playlist-id"]
         playlist_action = playlist_dict["playlist-action"]
-        print(f"delete {playlist_action} from playlist {playlist_id}")
+        print(f"{playlist_id}: delete playlist {playlist_action}")
         if playlist_action == "metadata":
             YoutubePlaylist(playlist_id).delete_metadata()
         elif playlist_action == "all":
diff --git a/tubearchivist/home/src/index.py b/tubearchivist/home/src/index.py
index c7deda7..fd22f17 100644
--- a/tubearchivist/home/src/index.py
+++ b/tubearchivist/home/src/index.py
@@ -9,13 +9,12 @@ import json
 import os
 import re
 from datetime import datetime
-from time import sleep
 
 import requests
 import yt_dlp
 from bs4 import BeautifulSoup
 from home.src.config import AppConfig
-from home.src.es import ElasticWrap
+from home.src.es import ElasticWrap, IndexPaginate
 from home.src.helper import DurationConverter, UrlListParser, clean_string
 from home.src.thumbnails import ThumbManager
 from ryd_client import ryd_client
@@ -26,50 +25,53 @@ class YouTubeItem:
 
     es_path = False
     index_name = False
+    yt_base = False
+    yt_obs = {
+        "quiet": True,
+        "default_search": "ytsearch",
+        "skip_download": True,
+        "check_formats": "selected",
+        "noplaylist": True,
+    }
 
     def __init__(self, youtube_id):
-        self.app_conf = self._get_conf()
         self.youtube_id = youtube_id
+        self.config = False
+        self.app_conf = False
         self.youtube_meta = False
         self.json_data = False
+        self._get_conf()
 
-    @staticmethod
-    def _get_conf():
+    def _get_conf(self):
         """read user conf"""
-        config = AppConfig().config
-        return config["application"]
+        self.config = AppConfig().config
+        self.app_conf = self.config["application"]
 
     def get_from_youtube(self):
         """use yt-dlp to get meta data from youtube"""
-        print("getting from youtube")
-        obs = {
-            "quiet": True,
-            "default_search": "ytsearch",
-            "skip_download": True,
-            "check_formats": "selected",
-            "noplaylist": True,
-        }
-
+        print(f"{self.youtube_id}: get metadata from youtube")
         try:
-            response = yt_dlp.YoutubeDL(obs).extract_info(self.youtube_id)
+            yt_item = yt_dlp.YoutubeDL(self.yt_obs)
+            response = yt_item.extract_info(self.yt_base + self.youtube_id)
         except (
            yt_dlp.utils.ExtractorError,
            yt_dlp.utils.DownloadError,
        ):
-            print("failed to get info for " + self.youtube_id)
+            print(f"{self.youtube_id}: failed to get info from youtube")
             self.youtube_meta = False
+            return
 
         self.youtube_meta = response
 
     def get_from_es(self):
         """get indexed data from elastic search"""
-        print("get from es")
-        response, _ = ElasticWrap(self.es_path).get()
-        self.json_data = response
+        print(f"{self.youtube_id}: get metadata from es")
+        response, _ = ElasticWrap(f"{self.es_path}").get()
+        source = response.get("_source")
+        self.json_data = source
 
     def upload_to_es(self):
         """add json_data to elastic"""
-        _, _ = ElasticWrap(self.es_path).put(self.json_data)
+        _, _ = ElasticWrap(self.es_path).put(self.json_data, refresh=True)
 
     def deactivate(self):
         """deactivate document in es"""
@@ -78,7
+80,7 @@ class YouTubeItem: "channel": "channel_active", "playlist": "playlist_active", } - update_path = f"ta_{self.index_name}/_update/{self.youtube_id}" + update_path = f"{self.index_name}/_update/{self.youtube_id}" data = { "script": f"ctx._source.{key_match.get(self.index_name)} = false" } @@ -86,28 +88,39 @@ class YouTubeItem: def del_in_es(self): """delete item from elastic search""" - print("delete from es") + print(f"{self.youtube_id}: delete from es") _, _ = ElasticWrap(self.es_path).delete() -class YoutubeVideoNew(YouTubeItem): +class YoutubeVideo(YouTubeItem): """represents a single youtube video""" es_path = False - index_name = "video" + index_name = "ta_video" + yt_base = "https://www.youtube.com/watch?v=" def __init__(self, youtube_id): super().__init__(youtube_id) self.channel_id = False - self.es_path = f"ta_video/_doc/{youtube_id}" + self.es_path = f"{self.index_name}/_doc/{youtube_id}" def build_json(self): """build json dict of video""" self.get_from_youtube() - self.process_youtube_meta() - self.add_stats() + if not self.youtube_meta: + return - def process_youtube_meta(self): + self._process_youtube_meta() + self._add_channel() + self._add_stats() + self.add_file_path() + self.add_player() + if self.config["downloads"]["integrate_ryd"]: + self._get_ryd_stats() + + return + + def _process_youtube_meta(self): """extract relevant fields from youtube""" # extract self.channel_id = self.youtube_meta["channel_id"] @@ -129,7 +142,13 @@ class YoutubeVideoNew(YouTubeItem): "active": True, } - def add_stats(self): + def _add_channel(self): + """add channel dict to video json_data""" + channel = YoutubeChannel(self.channel_id) + channel.build_json(upload=True) + self.json_data.update({"channel": channel.json_data}) + + def _add_stats(self): """add stats dicst to json_data""" # likes like_count = self.youtube_meta.get("like_count", 0) @@ -145,8 +164,28 @@ class YoutubeVideoNew(YouTubeItem): } ) - def add_player(self, vid_path): + def build_dl_cache_path(self): + """find video path in dl cache""" + cache_dir = self.app_conf["cache_dir"] + cache_path = f"{cache_dir}/download/" + all_cached = os.listdir(cache_path) + for file_cached in all_cached: + if self.youtube_id in file_cached: + vid_path = os.path.join(cache_path, file_cached) + return vid_path + + return False + + def add_player(self): """add player information for new videos""" + try: + # when indexing from download task + vid_path = self.build_dl_cache_path() + except FileNotFoundError: + # when reindexing + base = self.app_conf["videos"] + vid_path = os.path.join(base, self.json_data["media_url"]) + duration_handler = DurationConverter() duration = duration_handler.get_sec(vid_path) duration_str = duration_handler.get_str(duration) @@ -173,23 +212,22 @@ class YoutubeVideoNew(YouTubeItem): self.json_data["media_url"] = media_url def delete_media_file(self): - """delete video file, meta data, thumbnails""" - # delete media file + """delete video file, meta data""" + self.get_from_es() video_base = self.app_conf["videos"] media_url = self.json_data["media_url"] - print(f"delete {media_url} from file system") + print(f"{self.youtube_id}: delete {media_url} from file system") to_delete = os.path.join(video_base, media_url) os.remove(to_delete) - # delete from index self.del_in_es() - def get_ryd_stats(self): + def _get_ryd_stats(self): """get optional stats from returnyoutubedislikeapi.com""" try: - print(f"get ryd stats for: {self.youtube_id}") + print(f"{self.youtube_id}: get ryd stats") result = 
ryd_client.get(self.youtube_id) except requests.exceptions.ConnectionError: - print(f"failed to query ryd api, skipping {self.youtube_id}") + print(f"{self.youtube_id}: failed to query ryd api, skipping") return False if result["status"] == 404: @@ -204,61 +242,42 @@ class YoutubeVideoNew(YouTubeItem): return True - -class YoutubeChannel: - """represents a single youtube channel""" - - CONFIG = AppConfig().config - ES_URL = CONFIG["application"]["es_url"] - ES_AUTH = CONFIG["application"]["es_auth"] - CACHE_DIR = CONFIG["application"]["cache_dir"] - VIDEOS = CONFIG["application"]["videos"] +class ChannelScraper: + """custom scraper using bs4 to scrape channel about page + will be able to be integrated into yt-dlp + once #2237 and #2350 are merged upstream + """ def __init__(self, channel_id): self.channel_id = channel_id - self.json_data = None - self.source = None - self.channel_dict = self.build_channel_dict() + self.soup = False + self.yt_json = False + self.json_data = False - def build_channel_dict(self, scrape=False): - """combine the dicts build from extracted json payload""" - if scrape: - channel_dict = False - else: - channel_dict = self.get_es_channel() - if not channel_dict: - print("scrape data from youtube") - self.scrape_channel() - channel_dict = self.parse_channel_main() - channel_dict.update(self.parse_channel_meta()) - self.source = "scraped" - return channel_dict + def get_json(self): + """main method to return channel dict""" + self.get_soup() + self._extract_yt_json() + self._parse_channel_main() + self._parse_channel_meta() + return self.json_data - def get_es_channel(self): - """get from elastic search first if possible""" - channel_id = self.channel_id - url = f"{self.ES_URL}/ta_channel/_doc/{channel_id}" - response = requests.get(url, auth=self.ES_AUTH) - if response.ok: - channel_source = response.json()["_source"] - self.source = "elastic" - return channel_source - return False - - def scrape_channel(self): - """scrape channel page for additional infos""" - channel_id = self.channel_id - url = f"https://www.youtube.com/channel/{channel_id}/about?hl=en" + def get_soup(self): + """return soup from youtube""" + print(f"{self.channel_id}: scrape channel data from youtube") + url = f"https://www.youtube.com/channel/{self.channel_id}/about?hl=en" cookies = {"CONSENT": "YES+xxxxxxxxxxxxxxxxxxxxxxxxxxx"} response = requests.get(url, cookies=cookies) if response.ok: channel_page = response.text else: - print(f"failed to extract channel info for: {channel_id}") + print(f"{self.channel_id}: failed to extract channel info") raise ConnectionError - soup = BeautifulSoup(channel_page, "html.parser") - # load script into json - all_scripts = soup.find("body").find_all("script") + self.soup = BeautifulSoup(channel_page, "html.parser") + + def _extract_yt_json(self): + """parse soup and get ytInitialData json""" + all_scripts = self.soup.find("body").find_all("script") for script in all_scripts: if "var ytInitialData = " in str(script): script_content = str(script) @@ -266,16 +285,37 @@ class YoutubeChannel: # extract payload script_content = script_content.split("var ytInitialData = ")[1] json_raw = script_content.rstrip(";") - json_data = json.loads(json_raw) - # add to self - self.json_data = json_data + self.yt_json = json.loads(json_raw) - def parse_channel_main(self): + def _parse_channel_main(self): """extract maintab values from scraped channel json data""" - main_tab = self.json_data["header"]["c4TabbedHeaderRenderer"] - channel_name = main_tab["title"] - last_refresh = 
int(datetime.now().strftime("%s")) - # channel_subs + main_tab = self.yt_json["header"]["c4TabbedHeaderRenderer"] + # build and return dict + self.json_data = { + "channel_active": True, + "channel_last_refresh": int(datetime.now().strftime("%s")), + "channel_subs": self._get_channel_subs(main_tab), + "channel_name": main_tab["title"], + "channel_banner_url": self._get_thumbnails(main_tab, "banner"), + "channel_tvart_url": self._get_thumbnails(main_tab, "tvBanner"), + "channel_id": self.channel_id, + "channel_subscribed": False, + } + + @staticmethod + def _get_thumbnails(main_tab, thumb_name): + """extract banner url from main_tab""" + try: + all_banners = main_tab[thumb_name]["thumbnails"] + banner = sorted(all_banners, key=lambda k: k["width"])[-1]["url"] + except KeyError: + banner = False + + return banner + + @staticmethod + def _get_channel_subs(main_tab): + """process main_tab to get channel subs as int""" try: sub_text_simple = main_tab["subscriberCountText"]["simpleText"] sub_text = sub_text_simple.split(" ")[0] @@ -290,34 +330,18 @@ class YoutubeChannel: print(message) except KeyError: channel_subs = 0 - # banner - try: - all_banners = main_tab["banner"]["thumbnails"] - banner = sorted(all_banners, key=lambda k: k["width"])[-1]["url"] - except KeyError: - banner = False - # build and return dict - main_channel_dict = { - "channel_active": True, - "channel_last_refresh": last_refresh, - "channel_subs": channel_subs, - "channel_banner_url": banner, - "channel_name": channel_name, - "channel_id": self.channel_id, - } - return main_channel_dict - def parse_channel_meta(self): + return channel_subs + + def _parse_channel_meta(self): """extract meta tab values from channel payload""" # meta tab - json_data = self.json_data - meta_tab = json_data["metadata"]["channelMetadataRenderer"] - description = meta_tab["description"] + meta_tab = self.yt_json["metadata"]["channelMetadataRenderer"] all_thumbs = meta_tab["avatar"]["thumbnails"] thumb_url = sorted(all_thumbs, key=lambda k: k["width"])[-1]["url"] # stats tab renderer = "twoColumnBrowseResultsRenderer" - all_tabs = json_data["contents"][renderer]["tabs"] + all_tabs = self.yt_json["contents"][renderer]["tabs"] for tab in all_tabs: if "tabRenderer" in tab.keys(): if tab["tabRenderer"]["title"] == "About": @@ -333,81 +357,81 @@ class YoutubeChannel: except KeyError: channel_views = 0 - meta_channel_dict = { - "channel_description": description, - "channel_thumb_url": thumb_url, - "channel_views": channel_views, - } + self.json_data.update( + { + "channel_description": meta_tab["description"], + "channel_thumb_url": thumb_url, + "channel_views": channel_views, + } + ) - return meta_channel_dict + +class YoutubeChannel(YouTubeItem): + """represents a single youtube channel""" + + es_path = False + index_name = "ta_channel" + yt_base = "https://www.youtube.com/channel/" + + def __init__(self, youtube_id): + super().__init__(youtube_id) + self.es_path = f"{self.index_name}/_doc/{youtube_id}" + + def build_json(self, upload=False): + """get from es or from youtube""" + self.get_from_es() + if self.json_data: + return + + self.get_from_youtube() + if upload: + self.upload_to_es() + return + + def get_from_youtube(self): + """use bs4 to scrape channel about page""" + self.json_data = ChannelScraper(self.youtube_id).get_json() + self.get_channel_art() def get_channel_art(self): """download channel art for new channels""" - channel_id = self.channel_id - channel_thumb = self.channel_dict["channel_thumb_url"] - channel_banner = 
self.channel_dict["channel_banner_url"] + channel_id = self.youtube_id + channel_thumb = self.json_data["channel_thumb_url"] + channel_banner = self.json_data["channel_banner_url"] ThumbManager().download_chan( [(channel_id, channel_thumb, channel_banner)] ) - def upload_to_es(self): - """upload channel data to elastic search""" - url = f"{self.ES_URL}/ta_channel/_doc/{self.channel_id}" - response = requests.put(url, json=self.channel_dict, auth=self.ES_AUTH) - print(f"added {self.channel_id} to es") - if not response.ok: - print(response.text) - raise ValueError("failed to add channel to index") - def sync_to_videos(self): """sync new channel_dict to all videos of channel""" - headers = {"Content-type": "application/json"} - channel_id = self.channel_id # add ingest pipeline processors = [] - for field, value in self.channel_dict.items(): + for field, value in self.json_data.items(): line = {"set": {"field": "channel." + field, "value": value}} processors.append(line) - data = {"description": channel_id, "processors": processors} - payload = json.dumps(data) - url = self.ES_URL + "/_ingest/pipeline/" + channel_id - request = requests.put( - url, data=payload, headers=headers, auth=self.ES_AUTH - ) - if not request.ok: - print(request.text) + data = {"description": self.youtube_id, "processors": processors} + ingest_path = f"_ingest/pipeline/{self.youtube_id}" + _, _ = ElasticWrap(ingest_path).put(data) # apply pipeline - data = {"query": {"match": {"channel.channel_id": channel_id}}} - payload = json.dumps(data) - url = self.ES_URL + "/ta_video/_update_by_query?pipeline=" + channel_id - request = requests.post( - url, data=payload, headers=headers, auth=self.ES_AUTH - ) - if not request.ok: - print(request.text) + data = {"query": {"match": {"channel.channel_id": self.youtube_id}}} + update_path = f"ta_video/_update_by_query?pipeline={self.youtube_id}" + _, _ = ElasticWrap(update_path).post(data) def get_folder_path(self): """get folder where media files get stored""" - channel_name = self.channel_dict["channel_name"] + channel_name = self.json_data["channel_name"] folder_name = clean_string(channel_name) - folder_path = os.path.join(self.VIDEOS, folder_name) + folder_path = os.path.join(self.app_conf["videos"], folder_name) return folder_path def delete_es_videos(self): """delete all channel documents from elasticsearch""" - headers = {"Content-type": "application/json"} data = { "query": { - "term": {"channel.channel_id": {"value": self.channel_id}} + "term": {"channel.channel_id": {"value": self.youtube_id}} } } - payload = json.dumps(data) - url = self.ES_URL + "/ta_video/_delete_by_query" - response = requests.post( - url, data=payload, headers=headers, auth=self.ES_AUTH - ) - if not response.ok: - print(response.text) + _, _ = ElasticWrap("ta_video/_delete_by_query").post(data) def delete_playlists(self): """delete all indexed playlist from es""" @@ -418,9 +442,10 @@ class YoutubeChannel: def delete_channel(self): """delete channel and all videos""" - print(f"deleting {self.channel_id} and all matching media files") + print(f"{self.youtube_id}: delete channel") + self.get_from_es() folder_path = self.get_folder_path() - print("delete all media files") + print(f"{self.youtube_id}: delete all media files") try: all_videos = os.listdir(folder_path) for video in all_videos: @@ -430,20 +455,16 @@ class YoutubeChannel: except FileNotFoundError: print(f"no videos found for {folder_path}") - ThumbManager().delete_chan_thumb(self.channel_id) - print("delete indexed playlists") + 
print(f"{self.youtube_id}: delete indexed playlists") self.delete_playlists() - print("delete indexed videos") + print(f"{self.youtube_id}: delete indexed videos") self.delete_es_videos() - url = self.ES_URL + "/ta_channel/_doc/" + self.channel_id - response = requests.delete(url, auth=self.ES_AUTH) - if not response.ok: - print(response.text) + self.del_in_es() def get_all_playlists(self): """get all playlists owned by this channel""" url = ( - f"https://www.youtube.com/channel/{self.channel_id}" + f"https://www.youtube.com/channel/{self.youtube_id}" + "/playlists?view=1&sort=dd&shelf_id=0" ) obs = { @@ -460,7 +481,7 @@ class YoutubeChannel: """get all indexed playlists from channel""" data = { "query": { - "term": {"playlist_channel_id": {"value": self.channel_id}} + "term": {"playlist_channel_id": {"value": self.youtube_id}} }, "sort": [{"playlist_channel.keyword": {"order": "desc"}}], } @@ -468,328 +489,85 @@ class YoutubeChannel: return all_playlists -class YoutubeVideo: - """represents a single youtube video""" +class YoutubePlaylist(YouTubeItem): + """represents a single youtube playlist""" - CONFIG = AppConfig().config - ES_URL = CONFIG["application"]["es_url"] - ES_AUTH = CONFIG["application"]["es_auth"] - CACHE_DIR = CONFIG["application"]["cache_dir"] - VIDEOS = CONFIG["application"]["videos"] + es_path = False + index_name = "ta_playlist" + yt_obs = { + "default_search": "ytsearch", + "quiet": True, + "skip_download": True, + "extract_flat": True, + } + yt_base = "https://www.youtube.com/playlist?list=" def __init__(self, youtube_id): - self.youtube_id = youtube_id - self.channel_id = None - self.vid_dict = None + super().__init__(youtube_id) + self.es_path = f"{self.index_name}/_doc/{youtube_id}" + self.all_members = False + self.nav = False + self.all_youtube_ids = [] - def get_vid_dict(self): - """wrapper to loop around yt_dlp to retry on failure""" - print(f"get video data for {self.youtube_id}") - vid_dict = False - for i in range(3): - try: - vid_dict = self.get_youtubedl_vid_data() - except KeyError as e: - print(e) - sleep((i + 1) ** 2) - continue - else: - break + def build_json(self, scrape=False): + """collection to create json_data""" + if not scrape: + self.get_from_es() - self.vid_dict = vid_dict - if self.CONFIG["downloads"]["integrate_ryd"]: - self.get_ryd_stats() + if scrape or not self.json_data: + self.get_from_youtube() + self.process_youtube_meta() + self.get_entries() + self.json_data["playlist_entries"] = self.all_members + self.get_playlist_art() - def get_youtubedl_vid_data(self): - """parse youtubedl extract info""" - youtube_id = self.youtube_id - obs = { - "quiet": True, - "default_search": "ytsearch", - "skip_download": True, - "check_formats": "selected", - "noplaylist": True, - } - try: - vid = yt_dlp.YoutubeDL(obs).extract_info(youtube_id) - except ( - yt_dlp.utils.ExtractorError, - yt_dlp.utils.DownloadError, - ): - print("failed to get info for " + youtube_id) - return False - # extract - self.channel_id = vid["channel_id"] - upload_date = vid["upload_date"] - upload_date_time = datetime.strptime(upload_date, "%Y%m%d") - published = upload_date_time.strftime("%Y-%m-%d") - last_refresh = int(datetime.now().strftime("%s")) - # likes - try: - like_count = vid["like_count"] - except KeyError: - like_count = 0 - try: - dislike_count = vid["dislike_count"] - except KeyError: - dislike_count = 0 - # build dicts - stats = { - "view_count": vid["view_count"], - "like_count": like_count, - "dislike_count": dislike_count, - "average_rating": 
vid["average_rating"], - } - vid_basic = { - "title": vid["title"], - "description": vid["description"], - "category": vid["categories"], - "vid_thumb_url": vid["thumbnail"], - "tags": vid["tags"], - "published": published, - "stats": stats, - "vid_last_refresh": last_refresh, - "date_downloaded": last_refresh, - "youtube_id": youtube_id, - "active": True, - "channel": False, - } - - return vid_basic - - def add_player(self, missing_vid): - """add player information for new videos""" - cache_path = self.CACHE_DIR + "/download/" - videos = self.VIDEOS - - if missing_vid: - # coming from scan_filesystem - channel_name, file_name, _ = missing_vid - vid_path = os.path.join(videos, channel_name, file_name) - else: - # coming from VideoDownload - all_cached = os.listdir(cache_path) - for file_cached in all_cached: - if self.youtube_id in file_cached: - vid_path = os.path.join(cache_path, file_cached) - break - - duration_handler = DurationConverter() - duration = duration_handler.get_sec(vid_path) - duration_str = duration_handler.get_str(duration) - player = { - "watched": False, - "duration": duration, - "duration_str": duration_str, - } - self.vid_dict["player"] = player - - def build_file_path(self, channel_name): - """build media_url from where file will be located""" - clean_channel_name = clean_string(channel_name) - timestamp = self.vid_dict["published"].replace("-", "") - youtube_id = self.vid_dict["youtube_id"] - title = self.vid_dict["title"] - clean_title = clean_string(title) - filename = f"{timestamp}_{youtube_id}_{clean_title}.mp4" - media_url = os.path.join(clean_channel_name, filename) - self.vid_dict["media_url"] = media_url - - def get_es_data(self): - """get current data from elastic search""" - url = self.ES_URL + "/ta_video/_doc/" + self.youtube_id - response = requests.get(url, auth=self.ES_AUTH) - if not response.ok: - print(response.text) - es_vid_dict = json.loads(response.text) - return es_vid_dict - - def upload_to_es(self): - """upload video data to elastic search""" - url = f"{self.ES_URL}/ta_video/_doc/{self.youtube_id}/?refresh=true" - response = requests.put(url, json=self.vid_dict, auth=self.ES_AUTH) - if not response.ok: - print(response.text) - raise ValueError("failed to add video to index") - - def deactivate(self): - """deactivate document on extractor error""" - youtube_id = self.youtube_id - headers = {"Content-type": "application/json"} - url = f"{self.ES_URL}/ta_video/_update/{youtube_id}" - data = {"script": "ctx._source.active = false"} - json_str = json.dumps(data) - response = requests.post( - url, data=json_str, headers=headers, auth=self.ES_AUTH - ) - print(f"deactivated {youtube_id}") - if not response.ok: - print(response.text) - - def delete_media_file(self): - """delete video file, meta data, thumbnails""" - # delete media file - es_vid_dict = self.get_es_data() - media_url = es_vid_dict["_source"]["media_url"] - print(f"delete {media_url} from file system") - to_delete = os.path.join(self.VIDEOS, media_url) - os.remove(to_delete) - # delete from index - url = f"{self.ES_URL}/ta_video/_doc/{self.youtube_id}" - response = requests.delete(url, auth=self.ES_AUTH) - if not response.ok: - print(response.text) - # delete thumbs from cache - ThumbManager().delete_vid_thumb(self.youtube_id) - - def get_ryd_stats(self): - """get optional stats from returnyoutubedislikeapi.com""" - try: - print(f"get ryd stats for: {self.youtube_id}") - result = ryd_client.get(self.youtube_id) - except requests.exceptions.ConnectionError: - print(f"failed to query ryd 
api, skipping {self.youtube_id}") - return False - - if result["status"] == 404: - return False - - dislikes = { - "dislike_count": result["dislikes"], - "average_rating": result["rating"], - } - self.vid_dict["stats"].update(dislikes) - - return True - - -class YoutubePlaylist: - """represent a single playlist on YouTube""" - - CONFIG = AppConfig().config - ES_URL = CONFIG["application"]["es_url"] - ES_AUTH = CONFIG["application"]["es_auth"] - - def __init__(self, playlist_id, all_youtube_ids=False): - self.playlist_id = playlist_id - self.stamp = int(datetime.now().strftime("%s")) - self.all_youtube_ids = all_youtube_ids - self.playlist_dict = False - - def get_playlist_dict(self, scrape=False): - """get data from es or youtube""" - print(f"get playlist with id {self.playlist_id}") - - if scrape: - playlist_dict = self.get_youtube_playlist() - if not playlist_dict: - return False - playlist_dict["playlist_entries"] = self.get_entries() - else: - playlist_dict = self.get_es_playlist() - if not playlist_dict: - playlist_dict = self.get_youtube_playlist() - playlist_dict["playlist_entries"] = self.get_entries() - - self.playlist_dict = playlist_dict - return True - - def get_youtube_playlist(self): - """get meta data dict from youtube""" - url = "https://www.youtube.com/playlist?list=" + self.playlist_id - obs = { - "default_search": "ytsearch", - "quiet": True, - "skip_download": True, - "extract_flat": True, - "playlistend": 0, - } - try: - playlist = yt_dlp.YoutubeDL(obs).extract_info(url, download=False) - except ( - yt_dlp.utils.ExtractorError, - yt_dlp.utils.DownloadError, - ): - print("failed to get info for " + self.playlist_id) - return False - - playlist_es = { - "playlist_id": self.playlist_id, + def process_youtube_meta(self): + """extract relevant fields from youtube""" + self.json_data = { + "playlist_id": self.youtube_id, "playlist_active": True, "playlist_subscribed": False, - "playlist_name": playlist["title"], - "playlist_channel": playlist["channel"], - "playlist_channel_id": playlist["channel_id"], - "playlist_thumbnail": playlist["thumbnails"][-1]["url"], - "playlist_description": playlist["description"] or False, - "playlist_last_refresh": self.stamp, + "playlist_name": self.youtube_meta["title"], + "playlist_channel": self.youtube_meta["channel"], + "playlist_channel_id": self.youtube_meta["channel_id"], + "playlist_thumbnail": self.youtube_meta["thumbnails"][-1]["url"], + "playlist_description": self.youtube_meta["description"] or False, + "playlist_last_refresh": int(datetime.now().strftime("%s")), } - return playlist_es - - def get_es_playlist(self): - """get indexed data from es""" - url = f"{self.ES_URL}/ta_playlist/_doc/{self.playlist_id}" - response = requests.get(url, auth=self.ES_AUTH) - if response.ok: - return json.loads(response.text)["_source"] - - return False - def get_entries(self, playlistend=False): """get all videos in playlist""" - url = "https://www.youtube.com/playlist?list=" + self.playlist_id - obs = { - "default_search": "ytsearch", - "quiet": True, - "skip_download": True, - "extract_flat": True, - } if playlistend: - obs["playlistend"] = playlistend - - try: - playlist = yt_dlp.YoutubeDL(obs).extract_info(url, download=False) - except ( - yt_dlp.utils.ExtractorError, - yt_dlp.utils.DownloadError, - ): - print("failed to get plealist entries for " + self.playlist_id) - return False - + # implement playlist end + print(playlistend) all_members = [] - for idx, entry in enumerate(playlist["entries"]): - uploader = entry["uploader"] - youtube_id 
= entry["id"] + for idx, entry in enumerate(self.youtube_meta["entries"]): if self.all_youtube_ids: - downloaded = youtube_id in self.all_youtube_ids + downloaded = entry["id"] in self.all_youtube_ids else: downloaded = False - if not uploader: + if not entry["uploader"]: continue to_append = { - "youtube_id": youtube_id, + "youtube_id": entry["id"], "title": entry["title"], - "uploader": uploader, + "uploader": entry["uploader"], "idx": idx, "downloaded": downloaded, } all_members.append(to_append) - return all_members + self.all_members = all_members - def upload_to_es(self): - """add playlist to es with its entries""" - playlist = self.playlist_dict - url = f"{self.ES_URL}/ta_playlist/_doc/{self.playlist_id}" - response = requests.put(url, json=playlist, auth=self.ES_AUTH) - if not response.ok: - print(response.text) - raise ValueError("failed to add playlist to index") + @staticmethod + def get_playlist_art(): + """download artwork of playlist""" + thumbnails = ThumbManager() + missing_playlists = thumbnails.get_missing_playlists() + thumbnails.download_playlist(missing_playlists) def add_vids_to_playlist(self): """sync the playlist id to videos""" - playlist_dict = self.playlist_dict script = ( 'if (!ctx._source.containsKey("playlist")) ' + "{ctx._source.playlist = [params.playlist]} " @@ -799,14 +577,14 @@ class YoutubePlaylist: ) bulk_list = [] - for entry in playlist_dict["playlist_entries"]: - youtube_id = entry["youtube_id"] - action = {"update": {"_id": youtube_id, "_index": "ta_video"}} + for entry in self.json_data["playlist_entries"]: + video_id = entry["youtube_id"] + action = {"update": {"_id": video_id, "_index": "ta_video"}} source = { "script": { "source": script, "lang": "painless", - "params": {"playlist": self.playlist_id}, + "params": {"playlist": self.youtube_id}, } } bulk_list.append(json.dumps(action)) @@ -815,34 +593,30 @@ class YoutubePlaylist: # add last newline bulk_list.append("\n") query_str = "\n".join(bulk_list) - headers = {"Content-type": "application/x-ndjson"} - url = self.ES_URL + "/_bulk" - response = requests.post( - url, data=query_str, headers=headers, auth=self.ES_AUTH - ) - if not response.ok: - print(response.text) + + ElasticWrap("_bulk").post(query_str, ndjson=True) def update_playlist(self): """update metadata for playlist with data from YouTube""" - subscribed = self.get_es_playlist()["playlist_subscribed"] - self.get_playlist_dict(scrape=True) - if not self.playlist_dict: + self.get_from_es() + subscribed = self.json_data["playlist_subscribed"] + self.get_from_youtube() + if not self.json_data: # return false to deactivate return False - self.playlist_dict["playlist_subscribed"] = subscribed + self.json_data["playlist_subscribed"] = subscribed self.upload_to_es() - return self.playlist_dict + return True def build_nav(self, youtube_id): """find next and previous in playlist of a given youtube_id""" - all_entries_available = self.playlist_dict["playlist_entries"] + all_entries_available = self.json_data["playlist_entries"] all_entries = [i for i in all_entries_available if i["downloaded"]] current = [i for i in all_entries if i["youtube_id"] == youtube_id] # stop if not found or playlist of 1 if not current or not len(all_entries) > 1: - return False + return current_idx = all_entries.index(current[0]) if current_idx == 0: @@ -861,17 +635,17 @@ class YoutubePlaylist: next_thumb = ThumbManager().vid_thumb_path(next_item["youtube_id"]) next_item["vid_thumb"] = next_thumb - nav = { + self.nav = { "playlist_meta": { "current_idx": 
current[0]["idx"], - "playlist_id": self.playlist_id, - "playlist_name": self.playlist_dict["playlist_name"], - "playlist_channel": self.playlist_dict["playlist_channel"], + "playlist_id": self.youtube_id, + "playlist_name": self.json_data["playlist_name"], + "playlist_channel": self.json_data["playlist_channel"], }, "playlist_previous": previous_item, "playlist_next": next_item, } - return nav + return def delete_metadata(self): """delete metadata for playlist""" @@ -881,58 +655,30 @@ class YoutubePlaylist: ) data = { "query": { - "term": {"playlist.keyword": {"value": self.playlist_id}} + "term": {"playlist.keyword": {"value": self.youtube_id}} }, "script": { "source": script, "lang": "painless", - "params": {"playlist": self.playlist_id}, + "params": {"playlist": self.youtube_id}, }, } - payload = json.dumps(data) - url = f"{self.ES_URL}/ta_video/_update_by_query" - headers = {"Content-type": "application/json"} - response = requests.post( - url, data=payload, headers=headers, auth=self.ES_AUTH - ) - if not response.ok: - print(response.text) - - self.delete_playlist() + _, _ = ElasticWrap("ta_video/_update_by_query").post(data) + self.del_in_es() def delete_videos_playlist(self): """delete playlist with all videos""" - print(f"delete playlist {self.playlist_id} with all videos") - self.get_playlist_dict() + print(f"{self.youtube_id}: delete playlist") + self.get_from_es() all_youtube_id = [ i["youtube_id"] - for i in self.playlist_dict["playlist_entries"] + for i in self.json_data["playlist_entries"] if i["downloaded"] ] for youtube_id in all_youtube_id: YoutubeVideo(youtube_id).delete_media_file() - self.delete_playlist() - - def delete_playlist(self): - """delete only playlist document""" - url = f"{self.ES_URL}/ta_playlist/_doc/{self.playlist_id}" - response = requests.delete(url, auth=self.ES_AUTH) - if not response.ok: - print(response.text) - - def deactivate(self): - """deactivate document on extractor error""" - headers = {"Content-type": "application/json"} - url = f"{self.ES_URL}/ta_playlist/_update/{self.playlist_id}" - data = {"script": "ctx._source.playlist_active = false"} - json_str = json.dumps(data) - response = requests.post( - url, data=json_str, headers=headers, auth=self.ES_AUTH - ) - print(f"deactivated {self.playlist_id}") - if not response.ok: - print(response.text) + self.delete_metadata() class WatchState: @@ -1052,104 +798,12 @@ class WatchState: raise ValueError("failed mark playlist as watched") -class IndexPaginate: - """use search_after to go through whole index""" - - CONFIG = AppConfig().config - ES_URL = CONFIG["application"]["es_url"] - ES_AUTH = CONFIG["application"]["es_auth"] - HEADERS = {"Content-type": "application/json"} - DEFAULT_SIZE = 500 - - def __init__(self, index_name, data, size=False): - self.index_name = index_name - self.data = data - self.pit_id = False - self.size = size - - def get_results(self): - """get all results""" - self.get_pit() - self.validate_data() - all_results = self.run_loop() - self.clean_pit() - return all_results - - def get_pit(self): - """get pit for index""" - url = f"{self.ES_URL}/{self.index_name}/_pit?keep_alive=10m" - response = requests.post(url, auth=self.ES_AUTH) - json_data = json.loads(response.text) - - self.pit_id = json_data["id"] - - def validate_data(self): - """add pit and size to data""" - if "sort" not in self.data.keys(): - print(self.data) - raise ValueError("missing sort key in data") - - size = self.size or self.DEFAULT_SIZE - - self.data["size"] = size - self.data["pit"] = {"id": 
self.pit_id, "keep_alive": "10m"} - - def run_loop(self): - """loop through results until last hit""" - query_str = json.dumps(self.data) - url = self.ES_URL + "/_search" - - all_results = [] - while True: - response = requests.get( - url, data=query_str, headers=self.HEADERS, auth=self.ES_AUTH - ) - json_data = json.loads(response.text) - all_hits = json_data["hits"]["hits"] - if all_hits: - for hit in all_hits: - source = hit["_source"] - search_after = hit["sort"] - all_results.append(source) - # update search_after with last hit data - self.data["search_after"] = search_after - query_str = json.dumps(self.data) - else: - break - - return all_results - - def clean_pit(self): - """delete pit from elastic search""" - query_str = json.dumps({"id": self.pit_id}) - requests.delete( - self.ES_URL + "/_pit", - data=query_str, - headers=self.HEADERS, - auth=self.ES_AUTH, - ) - - -def index_new_video(youtube_id, missing_vid=False): - """combine video and channel classes for new video index""" - vid_handler = YoutubeVideo(youtube_id) - vid_handler.get_vid_dict() - if not vid_handler.vid_dict: +def index_new_video(youtube_id): + """combined classes to create new video in index""" + video = YoutubeVideo(youtube_id) + video.build_json() + if not video.json_data: raise ValueError("failed to get metadata for " + youtube_id) - channel_handler = YoutubeChannel(vid_handler.channel_id) - # add filepath to vid_dict - channel_name = channel_handler.channel_dict["channel_name"] - vid_handler.build_file_path(channel_name) - # add channel and player to video - vid_handler.add_player(missing_vid) - vid_handler.vid_dict["channel"] = channel_handler.channel_dict - # add new channel to es - if channel_handler.source == "scraped": - channel_handler.channel_dict["channel_subscribed"] = False - channel_handler.upload_to_es() - channel_handler.get_channel_art() - # upload video to es - vid_handler.upload_to_es() - # return vid_dict for further processing - return vid_handler.vid_dict + video.upload_to_es() + return video.json_data diff --git a/tubearchivist/home/src/reindex.py b/tubearchivist/home/src/reindex.py index ea9900a..bb93cae 100644 --- a/tubearchivist/home/src/reindex.py +++ b/tubearchivist/home/src/reindex.py @@ -189,87 +189,73 @@ class Reindex: all_channels = channel_sub_handler.get_channels(subscribed_only=False) all_channel_ids = [i["channel_id"] for i in all_channels] - counter = 1 for channel_id in all_channel_ids: - channel_index = YoutubeChannel(channel_id) - subscribed = channel_index.channel_dict["channel_subscribed"] - channel_index.channel_dict = channel_index.build_channel_dict( - scrape=True - ) - channel_index.channel_dict["channel_subscribed"] = subscribed - channel_index.upload_to_es() - channel_index.sync_to_videos() - counter = counter + 1 + channel = YoutubeChannel(channel_id) + subscribed = channel.json_data["channel_subscribed"] + channel.get_from_youtube() + channel.json_data["channel_subscribed"] = subscribed + channel.upload_to_es() + channel.sync_to_videos() + if sleep_interval: sleep(sleep_interval) @staticmethod def reindex_single_video(youtube_id): """refresh data for single video""" - vid_handler = YoutubeVideo(youtube_id) - vid_handler.get_vid_dict() - if not vid_handler.vid_dict: - # stop if deactivated - vid_handler.deactivate() - return + video = YoutubeVideo(youtube_id) - es_vid_dict = vid_handler.get_es_data() - player = es_vid_dict["_source"]["player"] - date_downloaded = es_vid_dict["_source"]["date_downloaded"] - channel_dict = es_vid_dict["_source"]["channel"] - 
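
The block removed above is the old inline IndexPaginate; the class now lives in es.py (re-added earlier in this patch) with the same contract: the query body must include a sort key, since validate_data() raises ValueError without one, and get_results() pages through the whole index via a point in time plus search_after. A usage sketch with a placeholder query:

    from home.src.es import IndexPaginate

    data = {
        "query": {"match_all": {}},
        "sort": [{"youtube_id": {"order": "asc"}}],  # "sort" is required
    }
    all_videos = IndexPaginate("ta_video", data).get_results()
    print(f"paged through {len(all_videos)} documents")
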
-        channel_name = channel_dict["channel_name"]
-        try:
-            playlist = es_vid_dict["_source"]["playlist"]
-        except KeyError:
-            playlist = False
+        # read current state
+        video.get_from_es()
+        player = video.json_data["player"]
+        date_downloaded = video.json_data["date_downloaded"]
+        channel_dict = video.json_data["channel"]
+        playlist = video.json_data.get("playlist")
 
-        vid_handler.build_file_path(channel_name)
-        # add to vid_dict
-        vid_handler.vid_dict["player"] = player
-        vid_handler.vid_dict["date_downloaded"] = date_downloaded
-        vid_handler.vid_dict["channel"] = channel_dict
+        # get new
+        video.build_json()
+        if not video.json_data:
+            video.deactivate()
+            return
+
+        # add back
+        video.json_data["player"] = player
+        video.json_data["date_downloaded"] = date_downloaded
+        video.json_data["channel"] = channel_dict
         if playlist:
-            vid_handler.vid_dict["playlist"] = playlist
-        # update
-        vid_handler.upload_to_es()
+            video.json_data["playlist"] = playlist
+
+        video.upload_to_es()
+
         thumb_handler = ThumbManager()
         thumb_handler.delete_vid_thumb(youtube_id)
-        to_download = (youtube_id, vid_handler.vid_dict["vid_thumb_url"])
+        to_download = (youtube_id, video.json_data["vid_thumb_url"])
         thumb_handler.download_vid([to_download], notify=False)
 
     @staticmethod
     def reindex_single_channel(channel_id):
         """refresh channel data and sync to videos"""
-        channel_handler = YoutubeChannel(channel_id)
-        subscribed = channel_handler.channel_dict["channel_subscribed"]
-        channel_handler.channel_dict = channel_handler.build_channel_dict(
-            scrape=True
-        )
-        channel_handler.channel_dict["channel_subscribed"] = subscribed
-        # update
-        channel_handler.upload_to_es()
-        channel_handler.sync_to_videos()
-        thumb_handler = ThumbManager()
-        thumb_handler.delete_chan_thumb(channel_id)
-        channel_thumb = channel_handler.channel_dict["channel_thumb_url"]
-        channel_banner = channel_handler.channel_dict["channel_banner_url"]
-        to_download = (channel_id, channel_thumb, channel_banner)
-        thumb_handler.download_chan([to_download])
+        channel = YoutubeChannel(channel_id)
+        channel.get_from_es()
+        subscribed = channel.json_data["channel_subscribed"]
+        channel.get_from_youtube()
+        channel.json_data["channel_subscribed"] = subscribed
+        channel.upload_to_es()
+        channel.sync_to_videos()
 
     @staticmethod
     def reindex_single_playlist(playlist_id, all_indexed_ids):
         """refresh playlist data"""
-        playlist_handler = YoutubePlaylist(
-            playlist_id, all_youtube_ids=all_indexed_ids
-        )
-        playlist = playlist_handler.update_playlist()
-        if not playlist:
-            playlist_handler.deactivate()
+        playlist = YoutubePlaylist(playlist_id)
+        playlist.get_from_es()
+        subscribed = playlist.json_data["playlist_subscribed"]
+        playlist.all_youtube_ids = all_indexed_ids
+        playlist.build_json(scrape=True)
+        if not playlist.json_data:
+            playlist.deactivate()
             return
 
-        playlist_thumbnail = (playlist_id, playlist["playlist_thumbnail"])
-        thumb_handler = ThumbManager()
-        thumb_handler.download_playlist([playlist_thumbnail])
+        playlist.json_data["playlist_subscribed"] = subscribed
+        playlist.upload_to_es()
 
         return
 
     def reindex(self):
@@ -586,7 +572,7 @@ def scan_filesystem():
     print("index new videos")
     for missing_vid in filesystem_handler.to_index:
         youtube_id = missing_vid[2]
-        index_new_video(youtube_id, missing_vid=missing_vid)
+        index_new_video(youtube_id)
 
 
 def reindex_old_documents():
diff --git a/tubearchivist/home/tasks.py b/tubearchivist/home/tasks.py
index ad43d67..995cf8f 100644
--- a/tubearchivist/home/tasks.py
+++ b/tubearchivist/home/tasks.py
@@ -266,17 +266,16 @@ def subscribe_to(url_str):
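
The three reindex helpers above share one shape: read the indexed document to save user state, force a rescrape, then restore that state before uploading. A condensed sketch of the round-trip for a playlist, with a placeholder ID:

    from home.src.index import YoutubePlaylist

    playlist = YoutubePlaylist("PLxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
    playlist.get_from_es()  # read current, indexed state
    subscribed = playlist.json_data["playlist_subscribed"]  # keep user setting
    playlist.build_json(scrape=True)  # force a fresh scrape from youtube
    if not playlist.json_data:
        playlist.deactivate()  # source is gone, flag the document inactive
    else:
        playlist.json_data["playlist_subscribed"] = subscribed
        playlist.upload_to_es()
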
@shared_task def index_channel_playlists(channel_id): """add all playlists of channel to index""" - channel_handler = YoutubeChannel(channel_id) - channel_name = channel_handler.channel_dict["channel_name"] + channel = YoutubeChannel(channel_id) # notify mess_dict = { "status": "message:playlistscan", "level": "info", "title": "Looking for playlists", - "message": f'Scanning channel "{channel_name}" in progress', + "message": f'Scanning channel "{channel.youtube_id}" in progress', } RedisArchivist().set_message("message:playlistscan", mess_dict) - all_playlists = channel_handler.get_all_playlists() + all_playlists = channel.get_all_playlists() if not all_playlists: print(f"no playlists found for channel {channel_id}") @@ -295,28 +294,29 @@ def index_channel_playlists(channel_id): } RedisArchivist().set_message("message:playlistscan", mess_dict) print("add playlist: " + playlist_title) - playlist_handler = YoutubePlaylist( - playlist_id, all_youtube_ids=all_youtube_ids - ) - playlist_handler.get_playlist_dict() - if not playlist_handler.playlist_dict: + + playlist = YoutubePlaylist(playlist_id) + playlist.all_youtube_ids = all_youtube_ids + playlist.build_json() + + if not playlist.json_data: # skip if not available continue + # don't add if no videos downloaded downloaded = [ i - for i in playlist_handler.playlist_dict["playlist_entries"] + for i in playlist.json_data["playlist_entries"] if i["downloaded"] ] if not downloaded: continue - playlist_handler.upload_to_es() - playlist_handler.add_vids_to_playlist() + + playlist.upload_to_es() + playlist.add_vids_to_playlist() if all_playlists: - handler = ThumbManager() - missing_playlists = handler.get_missing_playlists() - handler.download_playlist(missing_playlists) + playlist.get_playlist_art() return diff --git a/tubearchivist/home/views.py b/tubearchivist/home/views.py index b3de6c4..687c0aa 100644 --- a/tubearchivist/home/views.py +++ b/tubearchivist/home/views.py @@ -624,11 +624,11 @@ class VideoView(View): """build playlist nav if available""" all_navs = [] for playlist_id in playlists: - handler = YoutubePlaylist(playlist_id) - handler.get_playlist_dict() - nav = handler.build_nav(video_id) - if nav: - all_navs.append(nav) + playlist = YoutubePlaylist(playlist_id) + playlist.get_from_es() + playlist.build_nav(video_id) + if playlist.nav: + all_navs.append(playlist.nav) return all_navs
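
The views.py hunk above tracks the last interface change in this patch: build_nav() no longer returns the nav dict, it stores it on the instance as playlist.nav. A sketch of the new flow, with placeholder IDs:

    from home.src.index import YoutubePlaylist

    playlist = YoutubePlaylist("PLxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
    playlist.get_from_es()  # nav is built from indexed playlist_entries
    playlist.build_nav("placeholder-video-id")
    if playlist.nav:  # stays False when the video is not in the playlist
        print(playlist.nav["playlist_meta"]["playlist_name"])
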