From fda520ad44803eabc36153fe2745f929d8d97556 Mon Sep 17 00:00:00 2001
From: simon
Date: Wed, 23 Mar 2022 15:48:38 +0700
Subject: [PATCH] refactor and consolidate Reindex class

---
 tubearchivist/home/src/index/filesystem.py |  12 +-
 tubearchivist/home/src/index/reindex.py    | 219 +++++++++------------
 tubearchivist/home/src/ta/helper.py        |  16 --
 3 files changed, 98 insertions(+), 149 deletions(-)

diff --git a/tubearchivist/home/src/index/filesystem.py b/tubearchivist/home/src/index/filesystem.py
index 0dc8302..74a40be 100644
--- a/tubearchivist/home/src/index/filesystem.py
+++ b/tubearchivist/home/src/index/filesystem.py
@@ -10,7 +10,6 @@ import os
 import re
 import shutil
 import subprocess
-from datetime import datetime
 
 import requests
 from home.src.download.queue import PendingList
@@ -319,10 +318,7 @@ def scan_filesystem():
 
 def reindex_old_documents():
     """daily refresh of old documents"""
-    # continue if needed
-    reindex_handler = Reindex()
-    reindex_handler.check_outdated()
-    reindex_handler.reindex()
-    # set timestamp
-    now = int(datetime.now().strftime("%s"))
-    RedisArchivist().set_message("last_reindex", now, expire=False)
+    handler = Reindex()
+    handler.check_outdated()
+    handler.reindex()
+    RedisArchivist().set_message("last_reindex", handler.now, expire=False)
diff --git a/tubearchivist/home/src/index/reindex.py b/tubearchivist/home/src/index/reindex.py
index cf1435f..842e254 100644
--- a/tubearchivist/home/src/index/reindex.py
+++ b/tubearchivist/home/src/index/reindex.py
@@ -4,85 +4,60 @@ functionality:
 - index and update in es
 """
 
-import json
 from datetime import datetime
 from math import ceil
 from time import sleep
 
-import requests
 from home.src.download.queue import PendingList
 from home.src.download.thumbnails import ThumbManager
+from home.src.es.connect import ElasticWrap
 from home.src.index.channel import YoutubeChannel
 from home.src.index.playlist import YoutubePlaylist
 from home.src.index.video import YoutubeVideo
 from home.src.ta.config import AppConfig
-from home.src.ta.helper import get_total_hits
 
 
 class Reindex:
     """check for outdated documents and refresh data from youtube"""
 
+    MATCH_FIELD = {
+        "ta_video": "active",
+        "ta_channel": "channel_active",
+        "ta_playlist": "playlist_active",
+    }
+    MULTIPLY = 1.2
+
     def __init__(self):
         # config
-        config = AppConfig().config
-        self.sleep_interval = config["downloads"]["sleep_interval"]
-        self.es_url = config["application"]["es_url"]
-        self.es_auth = config["application"]["es_auth"]
-        self.refresh_interval = config["scheduler"]["check_reindex_days"]
-        self.integrate_ryd = config["downloads"]["integrate_ryd"]
+        self.now = int(datetime.now().strftime("%s"))
+        self.config = AppConfig().config
+        self.interval = self.config["scheduler"]["check_reindex_days"]
         # scan
         self.all_youtube_ids = False
         self.all_channel_ids = False
         self.all_playlist_ids = False
 
-    def get_daily(self):
+    def _get_daily(self):
         """get daily refresh values"""
-        total_videos = get_total_hits(
-            "ta_video", self.es_url, self.es_auth, "active"
-        )
-        video_daily = ceil(total_videos / self.refresh_interval * 1.2)
-        total_channels = get_total_hits(
-            "ta_channel", self.es_url, self.es_auth, "channel_active"
-        )
-        channel_daily = ceil(total_channels / self.refresh_interval * 1.2)
-        total_playlists = get_total_hits(
-            "ta_playlist", self.es_url, self.es_auth, "playlist_active"
-        )
-        playlist_daily = ceil(total_playlists / self.refresh_interval * 1.2)
+        total_videos = self._get_total_hits("ta_video")
+        video_daily = ceil(total_videos / self.interval * self.MULTIPLY)
+        total_channels = self._get_total_hits("ta_channel")
+        channel_daily = ceil(total_channels / self.interval * self.MULTIPLY)
+        total_playlists = self._get_total_hits("ta_playlist")
+        playlist_daily = ceil(total_playlists / self.interval * self.MULTIPLY)
         return (video_daily, channel_daily, playlist_daily)
 
-    def get_outdated_vids(self, size):
-        """get daily videos to refresh"""
-        headers = {"Content-type": "application/json"}
-        now = int(datetime.now().strftime("%s"))
-        now_lte = now - self.refresh_interval * 24 * 60 * 60
-        data = {
-            "size": size,
-            "query": {
-                "bool": {
-                    "must": [
-                        {"match": {"active": True}},
-                        {"range": {"vid_last_refresh": {"lte": now_lte}}},
-                    ]
-                }
-            },
-            "sort": [{"vid_last_refresh": {"order": "asc"}}],
-            "_source": False,
-        }
-        query_str = json.dumps(data)
-        url = self.es_url + "/ta_video/_search"
-        response = requests.get(
-            url, data=query_str, headers=headers, auth=self.es_auth
-        )
-        if not response.ok:
-            print(response.text)
-        response_dict = json.loads(response.text)
-        all_youtube_ids = [i["_id"] for i in response_dict["hits"]["hits"]]
-        return all_youtube_ids
+    def _get_total_hits(self, index):
+        """get total hits from index"""
+        match_field = self.MATCH_FIELD[index]
+        path = f"{index}/_search?filter_path=hits.total"
+        data = {"query": {"match": {match_field: True}}}
+        response, _ = ElasticWrap(path).post(data=data)
+        total_hits = response["hits"]["total"]["value"]
+        return total_hits
 
-    def get_unrated_vids(self):
-        """get all videos without rating if ryd integration is enabled"""
-        headers = {"Content-type": "application/json"}
+    def _get_unrated_vids(self):
+        """get max 200 videos without rating if ryd integration is enabled"""
         data = {
             "size": 200,
             "query": {
@@ -91,86 +66,78 @@ class Reindex:
                 }
             },
         }
-        query_str = json.dumps(data)
-        url = self.es_url + "/ta_video/_search"
-        response = requests.get(
-            url, data=query_str, headers=headers, auth=self.es_auth
-        )
-        if not response.ok:
-            print(response.text)
-        response_dict = json.loads(response.text)
-        missing_rating = [i["_id"] for i in response_dict["hits"]["hits"]]
+        response, _ = ElasticWrap("ta_video/_search").get(data=data)
+
+        missing_rating = [i["_id"] for i in response["hits"]["hits"]]
         self.all_youtube_ids = self.all_youtube_ids + missing_rating
 
-    def get_outdated_channels(self, size):
-        """get daily channels to refresh"""
-        headers = {"Content-type": "application/json"}
-        now = int(datetime.now().strftime("%s"))
-        now_lte = now - self.refresh_interval * 24 * 60 * 60
+    def _get_outdated_vids(self, size):
+        """get daily videos to refresh"""
+        now_lte = self.now - self.interval * 24 * 60 * 60
+        must_list = [
+            {"match": {"active": True}},
+            {"range": {"vid_last_refresh": {"lte": now_lte}}},
+        ]
         data = {
             "size": size,
-            "query": {
-                "bool": {
-                    "must": [
-                        {"match": {"channel_active": True}},
-                        {"range": {"channel_last_refresh": {"lte": now_lte}}},
-                    ]
-                }
-            },
+            "query": {"bool": {"must": must_list}},
+            "sort": [{"vid_last_refresh": {"order": "asc"}}],
+            "_source": False,
+        }
+        response, _ = ElasticWrap("ta_video/_search").get(data=data)
+
+        all_youtube_ids = [i["_id"] for i in response["hits"]["hits"]]
+        return all_youtube_ids
+
+    def _get_outdated_channels(self, size):
+        """get daily channels to refresh"""
+        now_lte = self.now - self.interval * 24 * 60 * 60
+        must_list = [
+            {"match": {"channel_active": True}},
+            {"range": {"channel_last_refresh": {"lte": now_lte}}},
+        ]
+        data = {
+            "size": size,
+            "query": {"bool": {"must": must_list}},
             "sort": [{"channel_last_refresh": {"order": "asc"}}],
"_source": False, } - query_str = json.dumps(data) - url = self.es_url + "/ta_channel/_search" - response = requests.get( - url, data=query_str, headers=headers, auth=self.es_auth - ) - if not response.ok: - print(response.text) - response_dict = json.loads(response.text) - all_channel_ids = [i["_id"] for i in response_dict["hits"]["hits"]] + response, _ = ElasticWrap("ta_channel/_search").get(data=data) + + all_channel_ids = [i["_id"] for i in response["hits"]["hits"]] return all_channel_ids - def get_outdated_playlists(self, size): + def _get_outdated_playlists(self, size): """get daily outdated playlists to refresh""" - headers = {"Content-type": "application/json"} - now = int(datetime.now().strftime("%s")) - now_lte = now - self.refresh_interval * 24 * 60 * 60 + now_lte = self.now - self.interval * 24 * 60 * 60 + must_list = [ + {"match": {"playlist_active": True}}, + {"range": {"playlist_last_refresh": {"lte": now_lte}}}, + ] data = { "size": size, - "query": { - "bool": { - "must": [ - {"match": {"playlist_active": True}}, - {"range": {"playlist_last_refresh": {"lte": now_lte}}}, - ] - } - }, + "query": {"bool": {"must": must_list}}, "sort": [{"playlist_last_refresh": {"order": "asc"}}], "_source": False, } - query_str = json.dumps(data) - url = self.es_url + "/ta_playlist/_search" - response = requests.get( - url, data=query_str, headers=headers, auth=self.es_auth - ) - if not response.ok: - print(response.text) - response_dict = json.loads(response.text) - all_playlist_ids = [i["_id"] for i in response_dict["hits"]["hits"]] + response, _ = ElasticWrap("ta_playlist/_search").get(data=data) + + all_playlist_ids = [i["_id"] for i in response["hits"]["hits"]] return all_playlist_ids def check_outdated(self): """add missing vids and channels""" - video_daily, channel_daily, playlist_daily = self.get_daily() - self.all_youtube_ids = self.get_outdated_vids(video_daily) - self.all_channel_ids = self.get_outdated_channels(channel_daily) - self.all_playlist_ids = self.get_outdated_playlists(playlist_daily) - if self.integrate_ryd: - self.get_unrated_vids() + video_daily, channel_daily, playlist_daily = self._get_daily() + self.all_youtube_ids = self._get_outdated_vids(video_daily) + self.all_channel_ids = self._get_outdated_channels(channel_daily) + self.all_playlist_ids = self._get_outdated_playlists(playlist_daily) + + integrate_ryd = self.config["downloads"]["integrate_ryd"] + if integrate_ryd: + self._get_unrated_vids() @staticmethod - def reindex_single_video(youtube_id): + def _reindex_single_video(youtube_id): """refresh data for single video""" video = YoutubeVideo(youtube_id) @@ -204,20 +171,21 @@ class Reindex: return @staticmethod - def reindex_single_channel(channel_id): + def _reindex_single_channel(channel_id): """refresh channel data and sync to videos""" channel = YoutubeChannel(channel_id) channel.get_from_es() subscribed = channel.json_data["channel_subscribed"] - overwrites = channel.json_data["channel_overwrites"] + overwrites = channel.json_data.get("channel_overwrites", False) channel.get_from_youtube() channel.json_data["channel_subscribed"] = subscribed - channel.json_data["channel_overwrites"] = overwrites + if overwrites: + channel.json_data["channel_overwrites"] = overwrites channel.upload_to_es() channel.sync_to_videos() @staticmethod - def reindex_single_playlist(playlist_id, all_indexed_ids): + def _reindex_single_playlist(playlist_id, all_indexed_ids): """refresh playlist data""" playlist = YoutubePlaylist(playlist_id) playlist.get_from_es() @@ -234,18 +202,19 
     def reindex(self):
         """reindex what's needed"""
+        sleep_interval = self.config["downloads"]["sleep_interval"]
         # videos
         print(f"reindexing {len(self.all_youtube_ids)} videos")
         for youtube_id in self.all_youtube_ids:
-            self.reindex_single_video(youtube_id)
-            if self.sleep_interval:
-                sleep(self.sleep_interval)
+            self._reindex_single_video(youtube_id)
+            if sleep_interval:
+                sleep(sleep_interval)
         # channels
         print(f"reindexing {len(self.all_channel_ids)} channels")
         for channel_id in self.all_channel_ids:
-            self.reindex_single_channel(channel_id)
-            if self.sleep_interval:
-                sleep(self.sleep_interval)
+            self._reindex_single_channel(channel_id)
+            if sleep_interval:
+                sleep(sleep_interval)
         # playlist
         print(f"reindexing {len(self.all_playlist_ids)} playlists")
         if self.all_playlist_ids:
@@ -253,6 +222,6 @@ class Reindex:
             handler.get_indexed()
             all_indexed_ids = [i["youtube_id"] for i in handler.all_videos]
             for playlist_id in self.all_playlist_ids:
-                self.reindex_single_playlist(playlist_id, all_indexed_ids)
-                if self.sleep_interval:
-                    sleep(self.sleep_interval)
+                self._reindex_single_playlist(playlist_id, all_indexed_ids)
+                if sleep_interval:
+                    sleep(sleep_interval)
diff --git a/tubearchivist/home/src/ta/helper.py b/tubearchivist/home/src/ta/helper.py
index d577dcd..04cea5e 100644
--- a/tubearchivist/home/src/ta/helper.py
+++ b/tubearchivist/home/src/ta/helper.py
@@ -3,31 +3,15 @@ Loose collection of helper functions
 - don't import AppConfig class here to avoid circular imports
 """
 
-import json
 import re
 import string
 import subprocess
 import unicodedata
 from urllib.parse import parse_qs, urlparse
 
-import requests
 import yt_dlp
 
 
-def get_total_hits(index, es_url, es_auth, match_field):
-    """get total hits from index"""
-    headers = {"Content-type": "application/json"}
-    data = {"query": {"match": {match_field: True}}}
-    payload = json.dumps(data)
-    url = f"{es_url}/{index}/_search?filter_path=hits.total"
-    request = requests.post(url, data=payload, headers=headers, auth=es_auth)
-    if not request.ok:
-        print(request.text)
-    total_json = json.loads(request.text)
-    total_hits = total_json["hits"]["total"]["value"]
-    return total_hits
-
-
 def clean_string(file_name):
     """clean string to only asci characters"""
     whitelist = "-_.() " + string.ascii_letters + string.digits
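
Note for review: the new ElasticWrap dependency (home/src/es/connect.py) is
referenced but not included in this patch. For context, below is a minimal
sketch of what such a wrapper could look like, inferred only from the call
sites above: ElasticWrap(path).get(data=...) and ElasticWrap(path).post(data=...)
each return a (parsed json, status code) tuple. The constructor, method
signatures, and config keys in this sketch are assumptions, not the actual
implementation.

import requests
from home.src.ta.config import AppConfig


class ElasticWrap:
    """sketch: bundle es url, auth and json handling for es requests,
    assuming the same application config keys the removed code used"""

    def __init__(self, path):
        config = AppConfig().config
        self.url = f"{config['application']['es_url']}/{path}"
        self.auth = config["application"]["es_auth"]

    def get(self, data=None):
        """GET request against es, with optional query body"""
        response = requests.get(self.url, json=data, auth=self.auth)
        if not response.ok:
            print(response.text)
        return response.json(), response.status_code

    def post(self, data=None):
        """POST request against es with a json body"""
        response = requests.post(self.url, json=data, auth=self.auth)
        if not response.ok:
            print(response.text)
        return response.json(), response.status_code

Matching the call sites in this patch, usage would look like:

    response, _ = ElasticWrap("ta_video/_search").get(data=data)

Centralizing url building, auth and error printing in one place is what lets
every query method in Reindex shrink to a query dict plus a one-line request,
and is also why the now-redundant get_total_hits helper can be dropped.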