split json backup files, #406

This commit is contained in:
Simon 2023-09-25 15:59:31 +07:00
parent 77fef5de57
commit 515b724047
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
3 changed files with 25 additions and 13 deletions

View File

@ -246,9 +246,10 @@ class ThumbManager(ThumbManagerBase):
class ValidatorCallback:
"""handle callback validate thumbnails page by page"""
def __init__(self, source, index_name):
def __init__(self, source, index_name, counter=0):
self.source = source
self.index_name = index_name
self.counter = counter
def run(self):
"""run the task for page"""
@ -384,9 +385,10 @@ class EmbedCallback:
MEDIA_DIR = CONFIG["application"]["videos"]
FORMAT = MP4Cover.FORMAT_JPEG
def __init__(self, source, index_name):
def __init__(self, source, index_name, counter=0):
self.source = source
self.index_name = index_name
self.counter = counter
def run(self):
"""run embed"""

View File

@ -18,6 +18,8 @@ from home.src.ta.helper import get_mapping, ignore_filelist
class ElasticBackup:
"""dump index to nd-json files for later bulk import"""
INDEX_SPLIT = ["comment"]
def __init__(self, reason=False, task=False):
self.config = AppConfig().config
self.cache_dir = self.config["application"]["cache_dir"]
@ -51,14 +53,18 @@ class ElasticBackup:
def backup_index(self, index_name):
"""export all documents of a single index"""
paginate = IndexPaginate(
f"ta_{index_name}",
data={"query": {"match_all": {}}},
keep_source=True,
callback=BackupCallback,
task=self.task,
total=self._get_total(index_name),
)
paginate_kwargs = {
"data": {"query": {"match_all": {}}},
"keep_source": True,
"callback": BackupCallback,
"task": self.task,
"total": self._get_total(index_name),
}
if index_name in self.INDEX_SPLIT:
paginate_kwargs.update({"size": 200})
paginate = IndexPaginate(f"ta_{index_name}", **paginate_kwargs)
_ = paginate.get_results()
@staticmethod
@ -206,9 +212,10 @@ class ElasticBackup:
class BackupCallback:
"""handle backup ndjson writer as callback for IndexPaginate"""
def __init__(self, source, index_name):
def __init__(self, source, index_name, counter=0):
self.source = source
self.index_name = index_name
self.counter = counter
self.timestamp = datetime.now().strftime("%Y%m%d")
def run(self):
@ -237,7 +244,8 @@ class BackupCallback:
def _write_es_json(self, file_content):
"""write nd-json file for es _bulk API to disk"""
cache_dir = AppConfig().config["application"]["cache_dir"]
file_name = f"es_{self.index_name.lstrip('ta_')}-{self.timestamp}.json"
index = self.index_name.lstrip("ta_")
file_name = f"es_{index}-{self.timestamp}-{self.counter}.json"
file_path = os.path.join(cache_dir, "backup", file_name)
with open(file_path, "a+", encoding="utf-8") as f:
f.write(file_content)

View File

@ -204,7 +204,9 @@ class IndexPaginate:
all_results.append(hit["_source"])
if self.kwargs.get("callback"):
self.kwargs.get("callback")(all_hits, self.index_name).run()
self.kwargs.get("callback")(
all_hits, self.index_name, counter=counter
).run()
if self.kwargs.get("task"):
print(f"{self.index_name}: processing page {counter}")