From 6bb7f80ea2e6df4eff8e8bc9221a33f75bc53ba0 Mon Sep 17 00:00:00 2001
From: simon
Date: Sat, 16 Jul 2022 17:55:18 +0700
Subject: [PATCH] Refactor IndexPaginate

- add callback function
- implement callback for ElasticBackup

---
 tubearchivist/home/src/es/connect.py     | 33 ++++++---
 tubearchivist/home/src/es/index_setup.py | 88 ++++++++++++++----------
 2 files changed, 74 insertions(+), 47 deletions(-)

diff --git a/tubearchivist/home/src/es/connect.py b/tubearchivist/home/src/es/connect.py
index e61ae14..75ab6a9 100644
--- a/tubearchivist/home/src/es/connect.py
+++ b/tubearchivist/home/src/es/connect.py
@@ -91,16 +91,22 @@ class ElasticWrap:
 
 
 class IndexPaginate:
-    """use search_after to go through whole index"""
+    """use search_after to go through whole index
+    kwargs:
+    - size: int, overwrite DEFAULT_SIZE
+    - keep_source: bool, keep _source key from es results
+    - callback: obj, class with run method as callback for every loop
+    """
 
     DEFAULT_SIZE = 500
 
-    def __init__(self, index_name, data, size=False, keep_source=False):
+    def __init__(self, index_name, data, **kwargs):
         self.index_name = index_name
         self.data = data
         self.pit_id = False
-        self.size = size
-        self.keep_source = keep_source
+        self.size = kwargs.get("size")
+        self.keep_source = kwargs.get("keep_source")
+        self.callback = kwargs.get("callback")
 
     def get_results(self):
         """get all results"""
@@ -122,14 +128,13 @@ class IndexPaginate:
             print(self.data)
             raise ValueError("missing sort key in data")
 
-        size = self.size or self.DEFAULT_SIZE
-
-        self.data["size"] = size
+        self.data["size"] = self.size or self.DEFAULT_SIZE
         self.data["pit"] = {"id": self.pit_id, "keep_alive": "10m"}
 
     def run_loop(self):
         """loop through results until last hit"""
         all_results = []
+        counter = 0
         while True:
             response, _ = ElasticWrap("_search").get(data=self.data)
             all_hits = response["hits"]["hits"]
@@ -139,10 +144,18 @@ class IndexPaginate:
                     if self.keep_source:
                         source = hit
                     else:
                         source = hit["_source"]
-                    search_after = hit["sort"]
-                    all_results.append(source)
+
+                    if not self.callback:
+                        all_results.append(source)
+
+                if self.callback:
+                    self.callback(all_hits, self.index_name).run()
+                    if counter % 10 == 0:
+                        print(f"{self.index_name}: processing page {counter}")
+                    counter = counter + 1
+
                 # update search_after with last hit data
-                self.data["search_after"] = search_after
+                self.data["search_after"] = all_hits[-1]["sort"]
             else:
                 break
diff --git a/tubearchivist/home/src/es/index_setup.py b/tubearchivist/home/src/es/index_setup.py
index 047a635..553e3b9 100644
--- a/tubearchivist/home/src/es/index_setup.py
+++ b/tubearchivist/home/src/es/index_setup.py
@@ -137,35 +137,24 @@ class ElasticIndex:
         _, _ = ElasticWrap(f"ta_{self.index_name}").put(data)
 
 
-class ElasticBackup:
-    """dump index to nd-json files for later bulk import"""
+class BackupCallback:
+    """handle backup ndjson writer as callback for IndexPaginate"""
 
-    def __init__(self, index_config, reason):
-        self.config = AppConfig().config
-        self.cache_dir = self.config["application"]["cache_dir"]
-        self.index_config = index_config
-        self.reason = reason
+    def __init__(self, source, index_name):
+        self.source = source
+        self.index_name = index_name
         self.timestamp = datetime.now().strftime("%Y%m%d")
-        self.backup_files = []
 
-    @staticmethod
-    def get_all_documents(index_name):
-        """export all documents of a single index"""
-        data = {
-            "query": {"match_all": {}},
-            "sort": [{"_doc": {"order": "desc"}}],
-        }
-        paginate = IndexPaginate(f"ta_{index_name}", data, keep_source=True)
-        all_results = paginate.get_results()
+    def run(self):
+        """run the junk task"""
+        file_content = self._build_bulk()
+        self._write_es_json(file_content)
 
-        return all_results
-
-    @staticmethod
-    def build_bulk(all_results):
+    def _build_bulk(self):
         """build bulk query data from all_results"""
         bulk_list = []
-        for document in all_results:
+        for document in self.source:
             document_id = document["_id"]
             es_index = document["_index"]
             action = {"index": {"_index": es_index, "_id": document_id}}
@@ -179,29 +168,56 @@ class ElasticBackup:
 
         return file_content
 
-    def write_es_json(self, file_content, index_name):
+    def _write_es_json(self, file_content):
         """write nd-json file for es _bulk API to disk"""
-        file_name = f"es_{index_name}-{self.timestamp}.json"
-        file_path = os.path.join(self.cache_dir, "backup", file_name)
-        with open(file_path, "w", encoding="utf-8") as f:
+        cache_dir = AppConfig().config["application"]["cache_dir"]
+        file_name = f"es_{self.index_name.lstrip('ta_')}-{self.timestamp}.json"
+        file_path = os.path.join(cache_dir, "backup", file_name)
+        with open(file_path, "a+", encoding="utf-8") as f:
             f.write(file_content)
 
-        self.backup_files.append(file_path)
+
+class ElasticBackup:
+    """dump index to nd-json files for later bulk import"""
+
+    def __init__(self, index_config, reason):
+        self.config = AppConfig().config
+        self.cache_dir = self.config["application"]["cache_dir"]
+        self.timestamp = datetime.now().strftime("%Y%m%d")
+        self.index_config = index_config
+        self.reason = reason
+
+    @staticmethod
+    def backup_index(index_name):
+        """export all documents of a single index"""
+        data = {
+            "query": {"match_all": {}},
+            "sort": [{"_doc": {"order": "desc"}}],
+        }
+        paginate = IndexPaginate(
+            f"ta_{index_name}", data, keep_source=True, callback=BackupCallback
+        )
+        _ = paginate.get_results()
 
     def zip_it(self):
         """pack it up into single zip file"""
         file_name = f"ta_backup-{self.timestamp}-{self.reason}.zip"
-        backup_folder = os.path.join(self.cache_dir, "backup")
-        backup_file = os.path.join(backup_folder, file_name)
+        folder = os.path.join(self.cache_dir, "backup")
 
-        with zipfile.ZipFile(
-            backup_file, "w", compression=zipfile.ZIP_DEFLATED
-        ) as zip_f:
-            for backup_file in self.backup_files:
+        to_backup = []
+        for file in os.listdir(folder):
+            if file.endswith(".json"):
+                to_backup.append(os.path.join(folder, file))
+
+        backup_file = os.path.join(folder, file_name)
+
+        comp = zipfile.ZIP_DEFLATED
+        with zipfile.ZipFile(backup_file, "w", compression=comp) as zip_f:
+            for backup_file in to_backup:
                 zip_f.write(backup_file, os.path.basename(backup_file))
 
         # cleanup
-        for backup_file in self.backup_files:
+        for backup_file in to_backup:
             os.remove(backup_file)
 
     def post_bulk_restore(self, file_name):
@@ -369,9 +385,7 @@ def backup_all_indexes(reason):
         print(f"backup: export in progress for {index_name}")
         if not backup_handler.index_exists(index_name):
             continue
-        all_results = backup_handler.get_all_documents(index_name)
-        file_content = backup_handler.build_bulk(all_results)
-        backup_handler.write_es_json(file_content, index_name)
+        backup_handler.backup_index(index_name)
 
     backup_handler.zip_it()