diff --git a/tubearchivist/home/src/download/thumbnails.py b/tubearchivist/home/src/download/thumbnails.py index 7041ec2..ca498c5 100644 --- a/tubearchivist/home/src/download/thumbnails.py +++ b/tubearchivist/home/src/download/thumbnails.py @@ -246,9 +246,10 @@ class ThumbManager(ThumbManagerBase): class ValidatorCallback: """handle callback validate thumbnails page by page""" - def __init__(self, source, index_name): + def __init__(self, source, index_name, counter=0): self.source = source self.index_name = index_name + self.counter = counter def run(self): """run the task for page""" @@ -384,9 +385,10 @@ class EmbedCallback: MEDIA_DIR = CONFIG["application"]["videos"] FORMAT = MP4Cover.FORMAT_JPEG - def __init__(self, source, index_name): + def __init__(self, source, index_name, counter=0): self.source = source self.index_name = index_name + self.counter = counter def run(self): """run embed""" diff --git a/tubearchivist/home/src/es/backup.py b/tubearchivist/home/src/es/backup.py index df1a481..3dc1cf5 100644 --- a/tubearchivist/home/src/es/backup.py +++ b/tubearchivist/home/src/es/backup.py @@ -18,6 +18,8 @@ from home.src.ta.helper import get_mapping, ignore_filelist class ElasticBackup: """dump index to nd-json files for later bulk import""" + INDEX_SPLIT = ["comment"] + def __init__(self, reason=False, task=False): self.config = AppConfig().config self.cache_dir = self.config["application"]["cache_dir"] @@ -51,14 +53,18 @@ class ElasticBackup: def backup_index(self, index_name): """export all documents of a single index""" - paginate = IndexPaginate( - f"ta_{index_name}", - data={"query": {"match_all": {}}}, - keep_source=True, - callback=BackupCallback, - task=self.task, - total=self._get_total(index_name), - ) + paginate_kwargs = { + "data": {"query": {"match_all": {}}}, + "keep_source": True, + "callback": BackupCallback, + "task": self.task, + "total": self._get_total(index_name), + } + + if index_name in self.INDEX_SPLIT: + paginate_kwargs.update({"size": 200}) + + paginate = IndexPaginate(f"ta_{index_name}", **paginate_kwargs) _ = paginate.get_results() @staticmethod @@ -206,9 +212,10 @@ class ElasticBackup: class BackupCallback: """handle backup ndjson writer as callback for IndexPaginate""" - def __init__(self, source, index_name): + def __init__(self, source, index_name, counter=0): self.source = source self.index_name = index_name + self.counter = counter self.timestamp = datetime.now().strftime("%Y%m%d") def run(self): @@ -237,7 +244,8 @@ class BackupCallback: def _write_es_json(self, file_content): """write nd-json file for es _bulk API to disk""" cache_dir = AppConfig().config["application"]["cache_dir"] - file_name = f"es_{self.index_name.lstrip('ta_')}-{self.timestamp}.json" + index = self.index_name.lstrip("ta_") + file_name = f"es_{index}-{self.timestamp}-{self.counter}.json" file_path = os.path.join(cache_dir, "backup", file_name) with open(file_path, "a+", encoding="utf-8") as f: f.write(file_content) diff --git a/tubearchivist/home/src/es/connect.py b/tubearchivist/home/src/es/connect.py index 43e2f6e..a7c3ff5 100644 --- a/tubearchivist/home/src/es/connect.py +++ b/tubearchivist/home/src/es/connect.py @@ -204,7 +204,9 @@ class IndexPaginate: all_results.append(hit["_source"]) if self.kwargs.get("callback"): - self.kwargs.get("callback")(all_hits, self.index_name).run() + self.kwargs.get("callback")( + all_hits, self.index_name, counter=counter + ).run() if self.kwargs.get("task"): print(f"{self.index_name}: processing page {counter}")