From 8cca09e0ac83311836d98159e72aded5ba898d28 Mon Sep 17 00:00:00 2001 From: simon Date: Thu, 7 Jul 2022 12:34:57 +0700 Subject: [PATCH 1/4] remove ta_json backup file, keep only es_ndjson --- tubearchivist/home/src/es/index_setup.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/tubearchivist/home/src/es/index_setup.py b/tubearchivist/home/src/es/index_setup.py index 85c4e28..047a635 100644 --- a/tubearchivist/home/src/es/index_setup.py +++ b/tubearchivist/home/src/es/index_setup.py @@ -188,17 +188,6 @@ class ElasticBackup: self.backup_files.append(file_path) - def write_ta_json(self, all_results, index_name): - """write generic json file to disk""" - file_name = f"ta_{index_name}-{self.timestamp}.json" - file_path = os.path.join(self.cache_dir, "backup", file_name) - to_write = [i["_source"] for i in all_results] - file_content = json.dumps(to_write) - with open(file_path, "w", encoding="utf-8") as f: - f.write(file_content) - - self.backup_files.append(file_path) - def zip_it(self): """pack it up into single zip file""" file_name = f"ta_backup-{self.timestamp}-{self.reason}.zip" @@ -383,7 +372,6 @@ def backup_all_indexes(reason): all_results = backup_handler.get_all_documents(index_name) file_content = backup_handler.build_bulk(all_results) backup_handler.write_es_json(file_content, index_name) - backup_handler.write_ta_json(all_results, index_name) backup_handler.zip_it() From 91bccfd05712cf981027052d776b06d73ba4adef Mon Sep 17 00:00:00 2001 From: simon Date: Tue, 12 Jul 2022 11:41:03 +0700 Subject: [PATCH 2/4] deactivate non existing channels on reindex --- tubearchivist/home/src/index/channel.py | 13 +++++++++++++ tubearchivist/home/src/index/reindex.py | 4 ++++ 2 files changed, 17 insertions(+) diff --git a/tubearchivist/home/src/index/channel.py b/tubearchivist/home/src/index/channel.py index eea9852..400c5f5 100644 --- a/tubearchivist/home/src/index/channel.py +++ b/tubearchivist/home/src/index/channel.py @@ -37,6 +37,9 @@ class ChannelScraper: """main method to return channel dict""" self.get_soup() self._extract_yt_json() + if self._is_deactivated(): + return False + self._parse_channel_main() self._parse_channel_meta() return self.json_data @@ -68,6 +71,16 @@ class ChannelScraper: json_raw = script_content.rstrip(";") self.yt_json = json.loads(json_raw) + def _is_deactivated(self): + """check if channel is deactivated""" + alert_text = "This channel does not exist." 
+ alerts = self.yt_json.get("alerts") + if alerts and alert_text in str(alerts): + print(f"{self.channel_id}: {alert_text}") + return True + + return False + def _parse_channel_main(self): """extract maintab values from scraped channel json data""" main_tab = self.yt_json["header"]["c4TabbedHeaderRenderer"] diff --git a/tubearchivist/home/src/index/reindex.py b/tubearchivist/home/src/index/reindex.py index e79ab30..4434f59 100644 --- a/tubearchivist/home/src/index/reindex.py +++ b/tubearchivist/home/src/index/reindex.py @@ -195,6 +195,10 @@ class Reindex: subscribed = channel.json_data["channel_subscribed"] overwrites = channel.json_data.get("channel_overwrites", False) channel.get_from_youtube() + if not channel.json_data: + channel.deactivate() + return + channel.json_data["channel_subscribed"] = subscribed if overwrites: channel.json_data["channel_overwrites"] = overwrites From 3b17c01c6d4611481b4513669e65f58509be905f Mon Sep 17 00:00:00 2001 From: simon Date: Thu, 14 Jul 2022 17:26:10 +0700 Subject: [PATCH 3/4] add TA_HOST env for ALLOWED_HOSTS --- docker_assets/run.sh | 1 + tubearchivist/config/settings.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/docker_assets/run.sh b/docker_assets/run.sh index a3a11b1..effe843 100644 --- a/docker_assets/run.sh +++ b/docker_assets/run.sh @@ -13,6 +13,7 @@ required="Missing required environment variable" [[ -f $lockfile ]] || : "${TA_USERNAME:?$required}" : "${TA_PASSWORD:?$required}" : "${ELASTIC_PASSWORD:?$required}" +: "${TA_HOST:?$required}" # ugly nginx and uwsgi port overwrite with env vars if [[ -n "$TA_PORT" ]]; then diff --git a/tubearchivist/config/settings.py b/tubearchivist/config/settings.py index 321deff..7ed6bfe 100644 --- a/tubearchivist/config/settings.py +++ b/tubearchivist/config/settings.py @@ -30,7 +30,7 @@ SECRET_KEY = PW_HASH.hexdigest() # SECURITY WARNING: don't run with debug turned on in production! 
 DEBUG = bool(environ.get("DJANGO_DEBUG"))
 
-ALLOWED_HOSTS = ["*"]
+ALLOWED_HOSTS = [i.strip() for i in environ.get("TA_HOST").split()]
 
 # Application definition
 

From 6bb7f80ea2e6df4eff8e8bc9221a33f75bc53ba0 Mon Sep 17 00:00:00 2001
From: simon
Date: Sat, 16 Jul 2022 17:55:18 +0700
Subject: [PATCH 4/4] Refactor IndexPaginate

- add callback function
- implement callback for ElasticBackup
---
 tubearchivist/home/src/es/connect.py     | 33 ++++++---
 tubearchivist/home/src/es/index_setup.py | 88 ++++++++++++++----------
 2 files changed, 74 insertions(+), 47 deletions(-)

diff --git a/tubearchivist/home/src/es/connect.py b/tubearchivist/home/src/es/connect.py
index e61ae14..75ab6a9 100644
--- a/tubearchivist/home/src/es/connect.py
+++ b/tubearchivist/home/src/es/connect.py
@@ -91,16 +91,22 @@ class ElasticWrap:
 
 
 class IndexPaginate:
-    """use search_after to go through whole index"""
+    """use search_after to go through whole index
+    kwargs:
+    - size: int, overwrite DEFAULT_SIZE
+    - keep_source: bool, keep _source key from es results
+    - callback: obj, class with a run method, called as callback for every batch
+    """
 
     DEFAULT_SIZE = 500
 
-    def __init__(self, index_name, data, size=False, keep_source=False):
+    def __init__(self, index_name, data, **kwargs):
         self.index_name = index_name
         self.data = data
         self.pit_id = False
-        self.size = size
-        self.keep_source = keep_source
+        self.size = kwargs.get("size")
+        self.keep_source = kwargs.get("keep_source")
+        self.callback = kwargs.get("callback")
 
     def get_results(self):
         """get all results"""
@@ -122,14 +128,13 @@
             print(self.data)
             raise ValueError("missing sort key in data")
 
-        size = self.size or self.DEFAULT_SIZE
-
-        self.data["size"] = size
+        self.data["size"] = self.size or self.DEFAULT_SIZE
         self.data["pit"] = {"id": self.pit_id, "keep_alive": "10m"}
 
     def run_loop(self):
         """loop through results until last hit"""
         all_results = []
+        counter = 0
         while True:
             response, _ = ElasticWrap("_search").get(data=self.data)
             all_hits = response["hits"]["hits"]
@@ -139,10 +144,18 @@
                     source = hit
                 else:
                     source = hit["_source"]
-                    search_after = hit["sort"]
-                    all_results.append(source)
+
+                    if not self.callback:
+                        all_results.append(source)
+
+                if self.callback:
+                    self.callback(all_hits, self.index_name).run()
+                    if counter % 10 == 0:
+                        print(f"{self.index_name}: processing page {counter}")
+                    counter = counter + 1
+
                 # update search_after with last hit data
-                self.data["search_after"] = search_after
+                self.data["search_after"] = all_hits[-1]["sort"]
             else:
                 break
 
diff --git a/tubearchivist/home/src/es/index_setup.py b/tubearchivist/home/src/es/index_setup.py
index 047a635..553e3b9 100644
--- a/tubearchivist/home/src/es/index_setup.py
+++ b/tubearchivist/home/src/es/index_setup.py
@@ -137,35 +137,24 @@ class ElasticIndex:
         _, _ = ElasticWrap(f"ta_{self.index_name}").put(data)
 
 
-class ElasticBackup:
-    """dump index to nd-json files for later bulk import"""
+class BackupCallback:
+    """handle backup ndjson writer as callback for IndexPaginate"""
 
-    def __init__(self, index_config, reason):
-        self.config = AppConfig().config
-        self.cache_dir = self.config["application"]["cache_dir"]
-        self.index_config = index_config
-        self.reason = reason
+    def __init__(self, source, index_name):
+        self.source = source
+        self.index_name = index_name
         self.timestamp = datetime.now().strftime("%Y%m%d")
-        self.backup_files = []
 
-    @staticmethod
-    def get_all_documents(index_name):
-        """export all documents of a single index"""
-        data = {
-            "query": {"match_all": {}},
-            "sort": [{"_doc": {"order": "desc"}}],
{"order": "desc"}}], - } - paginate = IndexPaginate(f"ta_{index_name}", data, keep_source=True) - all_results = paginate.get_results() + def run(self): + """run the junk task""" + file_content = self._build_bulk() + self._write_es_json(file_content) - return all_results - - @staticmethod - def build_bulk(all_results): + def _build_bulk(self): """build bulk query data from all_results""" bulk_list = [] - for document in all_results: + for document in self.source: document_id = document["_id"] es_index = document["_index"] action = {"index": {"_index": es_index, "_id": document_id}} @@ -179,29 +168,56 @@ class ElasticBackup: return file_content - def write_es_json(self, file_content, index_name): + def _write_es_json(self, file_content): """write nd-json file for es _bulk API to disk""" - file_name = f"es_{index_name}-{self.timestamp}.json" - file_path = os.path.join(self.cache_dir, "backup", file_name) - with open(file_path, "w", encoding="utf-8") as f: + cache_dir = AppConfig().config["application"]["cache_dir"] + file_name = f"es_{self.index_name.lstrip('ta_')}-{self.timestamp}.json" + file_path = os.path.join(cache_dir, "backup", file_name) + with open(file_path, "a+", encoding="utf-8") as f: f.write(file_content) - self.backup_files.append(file_path) + +class ElasticBackup: + """dump index to nd-json files for later bulk import""" + + def __init__(self, index_config, reason): + self.config = AppConfig().config + self.cache_dir = self.config["application"]["cache_dir"] + self.timestamp = datetime.now().strftime("%Y%m%d") + self.index_config = index_config + self.reason = reason + + @staticmethod + def backup_index(index_name): + """export all documents of a single index""" + data = { + "query": {"match_all": {}}, + "sort": [{"_doc": {"order": "desc"}}], + } + paginate = IndexPaginate( + f"ta_{index_name}", data, keep_source=True, callback=BackupCallback + ) + _ = paginate.get_results() def zip_it(self): """pack it up into single zip file""" file_name = f"ta_backup-{self.timestamp}-{self.reason}.zip" - backup_folder = os.path.join(self.cache_dir, "backup") - backup_file = os.path.join(backup_folder, file_name) + folder = os.path.join(self.cache_dir, "backup") - with zipfile.ZipFile( - backup_file, "w", compression=zipfile.ZIP_DEFLATED - ) as zip_f: - for backup_file in self.backup_files: + to_backup = [] + for file in os.listdir(folder): + if file.endswith(".json"): + to_backup.append(os.path.join(folder, file)) + + backup_file = os.path.join(folder, file_name) + + comp = zipfile.ZIP_DEFLATED + with zipfile.ZipFile(backup_file, "w", compression=comp) as zip_f: + for backup_file in to_backup: zip_f.write(backup_file, os.path.basename(backup_file)) # cleanup - for backup_file in self.backup_files: + for backup_file in to_backup: os.remove(backup_file) def post_bulk_restore(self, file_name): @@ -369,9 +385,7 @@ def backup_all_indexes(reason): print(f"backup: export in progress for {index_name}") if not backup_handler.index_exists(index_name): continue - all_results = backup_handler.get_all_documents(index_name) - file_content = backup_handler.build_bulk(all_results) - backup_handler.write_es_json(file_content, index_name) + backup_handler.backup_index(index_name) backup_handler.zip_it()