diff --git a/tubearchivist/home/src/es/backup.py b/tubearchivist/home/src/es/backup.py
index 92ffa10c..d3a76dcf 100644
--- a/tubearchivist/home/src/es/backup.py
+++ b/tubearchivist/home/src/es/backup.py
@@ -18,12 +18,13 @@ from home.src.ta.helper import get_mapping, ignore_filelist
 class ElasticBackup:
     """dump index to nd-json files for later bulk import"""
 
-    def __init__(self, reason=False):
+    def __init__(self, reason=False, task=False):
         self.config = AppConfig().config
         self.cache_dir = self.config["application"]["cache_dir"]
         self.timestamp = datetime.now().strftime("%Y%m%d")
         self.index_config = get_mapping()
         self.reason = reason
+        self.task = task
 
     def backup_all_indexes(self):
         """backup all indexes, add reason to init"""
@@ -44,18 +45,26 @@ class ElasticBackup:
         if self.reason == "auto":
             self.rotate_backup()
 
-    @staticmethod
-    def backup_index(index_name):
+    def backup_index(self, index_name):
         """export all documents of a single index"""
-        data = {
-            "query": {"match_all": {}},
-            "sort": [{"_doc": {"order": "desc"}}],
-        }
         paginate = IndexPaginate(
-            f"ta_{index_name}", data, keep_source=True, callback=BackupCallback
+            f"ta_{index_name}",
+            data={"query": {"match_all": {}}},
+            keep_source=True,
+            callback=BackupCallback,
+            task=self.task,
+            total=self._get_total(index_name),
         )
         _ = paginate.get_results()
 
+    @staticmethod
+    def _get_total(index_name):
+        """get total documents in index"""
+        path = f"ta_{index_name}/_count"
+        response, _ = ElasticWrap(path).get()
+
+        return response.get("count")
+
     def zip_it(self):
         """pack it up into single zip file"""
         file_name = f"ta_backup-{self.timestamp}-{self.reason}.zip"
@@ -142,7 +151,8 @@ class ElasticBackup:
         """go through the unpacked files and restore"""
         backup_dir = os.path.join(self.cache_dir, "backup")
 
-        for json_f in zip_content:
+        for idx, json_f in enumerate(zip_content):
+            self._notify_restore(idx, json_f, len(zip_content))
             file_name = os.path.join(backup_dir, json_f)
 
             if not json_f.startswith("es_") or not json_f.endswith(".json"):
@@ -153,6 +163,12 @@
             self.post_bulk_restore(file_name)
             os.remove(file_name)
 
+    def _notify_restore(self, idx, json_f, total_files):
+        """notify restore progress"""
+        message = [f"Restore index from json backup file {json_f}."]
+        progress = (idx + 1) / total_files
+        self.task.send_progress(message_lines=message, progress=progress)
+
     @staticmethod
     def index_exists(index_name):
         """check if index already exists to skip"""
diff --git a/tubearchivist/home/src/es/connect.py b/tubearchivist/home/src/es/connect.py
index 4b59ace6..0c27ca23 100644
--- a/tubearchivist/home/src/es/connect.py
+++ b/tubearchivist/home/src/es/connect.py
@@ -97,8 +97,10 @@ class IndexPaginate:
     """use search_after to go through whole index
     kwargs:
     - size: int, overwrite DEFAULT_SIZE
-    - keep_source: bool, keep _source key from es resutls
-    - callback: obj, Class with run method collback for every loop
+    - keep_source: bool, keep _source key from es results
+    - callback: obj, Class implementing run method callback for every loop
+    - task: task object to send notification
+    - total: int, total items in index for progress message
     """
 
     DEFAULT_SIZE = 500
@@ -107,12 +109,10 @@
         self.index_name = index_name
         self.data = data
         self.pit_id = False
-        self.size = kwargs.get("size")
-        self.keep_source = kwargs.get("keep_source")
-        self.callback = kwargs.get("callback")
+        self.kwargs = kwargs
 
     def get_results(self):
-        """get all results"""
+        """get all results, add task and total for notifications"""
         self.get_pit()
         self.validate_data()
         all_results = self.run_loop()
@@ -130,7 +130,7 @@ class IndexPaginate:
         if "sort" not in self.data.keys():
             self.data.update({"sort": [{"_doc": {"order": "desc"}}]})
 
-        self.data["size"] = self.size or self.DEFAULT_SIZE
+        self.data["size"] = self.kwargs.get("size") or self.DEFAULT_SIZE
         self.data["pit"] = {"id": self.pit_id, "keep_alive": "10m"}
 
     def run_loop(self):
@@ -140,30 +140,40 @@
         while True:
             response, _ = ElasticWrap("_search").get(data=self.data)
             all_hits = response["hits"]["hits"]
-            if all_hits:
-                for hit in all_hits:
-                    if self.keep_source:
-                        source = hit
-                    else:
-                        source = hit["_source"]
-
-                    if not self.callback:
-                        all_results.append(source)
-
-                if self.callback:
-                    self.callback(all_hits, self.index_name).run()
-                if counter % 10 == 0:
-                    print(f"{self.index_name}: processing page {counter}")
-                counter = counter + 1
-
-                # update search_after with last hit data
-                self.data["search_after"] = all_hits[-1]["sort"]
-            else:
+            if not all_hits:
                 break
 
+            for hit in all_hits:
+                if self.kwargs.get("keep_source"):
+                    all_results.append(hit)
+                else:
+                    all_results.append(hit["_source"])
+
+            if self.kwargs.get("callback"):
+                self.kwargs.get("callback")(all_hits, self.index_name).run()
+
+            if counter % 10 == 0 and self.kwargs.get("task"):
+                print(f"{self.index_name}: processing page {counter}")
+                self._notify(len(all_results))
+
+            counter += 1
+
+            # update search_after with last hit data
+            self.data["search_after"] = all_hits[-1]["sort"]
+
         return all_results
 
+    def _notify(self, processed):
+        """send notification on task"""
+        total = self.kwargs.get("total")
+        progress = (processed + 1) / total
+        message = f"Processing {self.index_name} {processed}/{total}"
+        print(message)
+        self.kwargs.get("task").send_progress(
+            message_lines=[message],
+            progress=progress,
+        )
+
     def clean_pit(self):
         """delete pit from elastic search"""
-        data = {"id": self.pit_id}
-        ElasticWrap("_pit").delete(data=data)
+        ElasticWrap("_pit").delete(data={"id": self.pit_id})
diff --git a/tubearchivist/home/tasks.py b/tubearchivist/home/tasks.py
index 5c62429d..9d9f6c14 100644
--- a/tubearchivist/home/tasks.py
+++ b/tubearchivist/home/tasks.py
@@ -223,7 +223,7 @@ def run_manual_import(self):
     ImportFolderScanner(task=self).scan()
 
 
-@shared_task(bind=True, name="run_backup")
+@shared_task(bind=True, name="run_backup", base=BaseTask)
 def run_backup(self, reason="auto"):
     """called from settings page, dump backup to zip file"""
     manager = TaskManager()
@@ -232,10 +232,10 @@
         return
 
     manager.init(self)
-    ElasticBackup(reason=reason).backup_all_indexes()
+    ElasticBackup(reason=reason, task=self).backup_all_indexes()
 
 
-@shared_task(bind=True, name="restore_backup")
+@shared_task(bind=True, name="restore_backup", base=BaseTask)
 def run_restore_backup(self, filename):
     """called from settings page, dump backup to zip file"""
     manager = TaskManager()
@@ -244,8 +244,9 @@
         return
 
     manager.init(self)
+    self.send_progress(["Reset your Index"])
     ElasitIndexWrap().reset()
-    ElasticBackup().restore(filename)
+    ElasticBackup(task=self).restore(filename)
    print("index restore finished")
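
For reference, a minimal self-contained sketch of the notification pattern this patch threads through IndexPaginate and ElasticBackup: the task handle is passed down as a plain kwarg, the document total is fetched up front, and fractional progress is reported every tenth page. FakeTask and run_loop_with_progress are hypothetical names for illustration only; the real send_progress() lives on the project's Celery BaseTask.

```python
from dataclasses import dataclass, field


@dataclass
class FakeTask:
    """hypothetical stand-in for the project's Celery BaseTask;
    it only mimics the send_progress() interface used by the patch"""

    sent: list = field(default_factory=list)

    def send_progress(self, message_lines, progress=0.0):
        # the real task publishes to the frontend; here we just record it
        self.sent.append((message_lines, progress))
        print(f"[{progress:.0%}] {message_lines[0]}")


def run_loop_with_progress(pages, total, task=None):
    """toy version of IndexPaginate.run_loop(): count processed hits
    and notify on every 10th page, but only if a task was handed in"""
    processed = 0
    for counter, page in enumerate(pages):
        processed += len(page)
        if counter % 10 == 0 and task:
            # same formula as _notify() in the patch: (processed + 1) / total
            task.send_progress(
                message_lines=[f"Processing ta_video {processed}/{total}"],
                progress=(processed + 1) / total,
            )
    return processed


if __name__ == "__main__":
    # 25 fake pages of 500 hits each, matching DEFAULT_SIZE = 500
    pages = [["hit"] * 500 for _ in range(25)]
    run_loop_with_progress(pages, total=25 * 500, task=FakeTask())
```

Because run_loop() only notifies when self.kwargs.get("task") is set, existing callers of IndexPaginate that never pass a task keep working unchanged.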