refactor backup-restore tasks
This commit is contained in:
parent
04a559e471
commit
6b2bd06393
|
@ -18,12 +18,13 @@ from home.src.ta.helper import get_mapping, ignore_filelist
|
|||
class ElasticBackup:
|
||||
"""dump index to nd-json files for later bulk import"""
|
||||
|
||||
def __init__(self, reason=False):
|
||||
def __init__(self, reason=False, task=False):
|
||||
self.config = AppConfig().config
|
||||
self.cache_dir = self.config["application"]["cache_dir"]
|
||||
self.timestamp = datetime.now().strftime("%Y%m%d")
|
||||
self.index_config = get_mapping()
|
||||
self.reason = reason
|
||||
self.task = task
|
||||
|
||||
def backup_all_indexes(self):
|
||||
"""backup all indexes, add reason to init"""
|
||||
|
@ -44,18 +45,26 @@ class ElasticBackup:
|
|||
if self.reason == "auto":
|
||||
self.rotate_backup()
|
||||
|
||||
@staticmethod
|
||||
def backup_index(index_name):
|
||||
def backup_index(self, index_name):
|
||||
"""export all documents of a single index"""
|
||||
data = {
|
||||
"query": {"match_all": {}},
|
||||
"sort": [{"_doc": {"order": "desc"}}],
|
||||
}
|
||||
paginate = IndexPaginate(
|
||||
f"ta_{index_name}", data, keep_source=True, callback=BackupCallback
|
||||
f"ta_{index_name}",
|
||||
data={"query": {"match_all": {}}},
|
||||
keep_source=True,
|
||||
callback=BackupCallback,
|
||||
task=self.task,
|
||||
total=self._get_total(index_name),
|
||||
)
|
||||
_ = paginate.get_results()
|
||||
|
||||
@staticmethod
def _get_total(index_name):
    """return the document count reported for the ta_ prefixed index"""
    # the _count endpoint answers with a dict, only "count" is used here
    count_response, _ = ElasticWrap(f"ta_{index_name}/_count").get()
    return count_response.get("count")
|
||||
|
||||
def zip_it(self):
|
||||
"""pack it up into single zip file"""
|
||||
file_name = f"ta_backup-{self.timestamp}-{self.reason}.zip"
|
||||
|
@ -142,7 +151,8 @@ class ElasticBackup:
|
|||
"""go through the unpacked files and restore"""
|
||||
backup_dir = os.path.join(self.cache_dir, "backup")
|
||||
|
||||
for json_f in zip_content:
|
||||
for idx, json_f in enumerate(zip_content):
|
||||
self._notify_restore(idx, json_f, len(zip_content))
|
||||
file_name = os.path.join(backup_dir, json_f)
|
||||
|
||||
if not json_f.startswith("es_") or not json_f.endswith(".json"):
|
||||
|
@ -153,6 +163,12 @@ class ElasticBackup:
|
|||
self.post_bulk_restore(file_name)
|
||||
os.remove(file_name)
|
||||
|
||||
def _notify_restore(self, idx, json_f, total_files):
|
||||
"""notify restore progress"""
|
||||
message = [f"Restore index from json backup file {json_f}."]
|
||||
progress = (idx + 1) / total_files
|
||||
self.task.send_progress(message_lines=message, progress=progress)
|
||||
|
||||
@staticmethod
|
||||
def index_exists(index_name):
|
||||
"""check if index already exists to skip"""
|
||||
|
|
|
@ -97,8 +97,10 @@ class IndexPaginate:
|
|||
"""use search_after to go through whole index
|
||||
kwargs:
|
||||
- size: int, overwrite DEFAULT_SIZE
|
||||
- keep_source: bool, keep _source key from es resutls
|
||||
- callback: obj, Class with run method collback for every loop
|
||||
- keep_source: bool, keep _source key from es results
|
||||
- callback: obj, Class implementing run method callback for every loop
|
||||
- task: task object to send notification
|
||||
- total: int, total items in index for progress message
|
||||
"""
|
||||
|
||||
DEFAULT_SIZE = 500
|
||||
|
@ -107,12 +109,10 @@ class IndexPaginate:
|
|||
self.index_name = index_name
|
||||
self.data = data
|
||||
self.pit_id = False
|
||||
self.size = kwargs.get("size")
|
||||
self.keep_source = kwargs.get("keep_source")
|
||||
self.callback = kwargs.get("callback")
|
||||
self.kwargs = kwargs
|
||||
|
||||
def get_results(self):
|
||||
"""get all results"""
|
||||
"""get all results, add task and total for notifications"""
|
||||
self.get_pit()
|
||||
self.validate_data()
|
||||
all_results = self.run_loop()
|
||||
|
@ -130,7 +130,7 @@ class IndexPaginate:
|
|||
if "sort" not in self.data.keys():
|
||||
self.data.update({"sort": [{"_doc": {"order": "desc"}}]})
|
||||
|
||||
self.data["size"] = self.size or self.DEFAULT_SIZE
|
||||
self.data["size"] = self.kwargs.get("size") or self.DEFAULT_SIZE
|
||||
self.data["pit"] = {"id": self.pit_id, "keep_alive": "10m"}
|
||||
|
||||
def run_loop(self):
|
||||
|
@ -140,30 +140,40 @@ class IndexPaginate:
|
|||
while True:
|
||||
response, _ = ElasticWrap("_search").get(data=self.data)
|
||||
all_hits = response["hits"]["hits"]
|
||||
if all_hits:
|
||||
for hit in all_hits:
|
||||
if self.keep_source:
|
||||
source = hit
|
||||
else:
|
||||
source = hit["_source"]
|
||||
|
||||
if not self.callback:
|
||||
all_results.append(source)
|
||||
|
||||
if self.callback:
|
||||
self.callback(all_hits, self.index_name).run()
|
||||
if counter % 10 == 0:
|
||||
print(f"{self.index_name}: processing page {counter}")
|
||||
counter = counter + 1
|
||||
|
||||
# update search_after with last hit data
|
||||
self.data["search_after"] = all_hits[-1]["sort"]
|
||||
else:
|
||||
if not all_hits:
|
||||
break
|
||||
|
||||
for hit in all_hits:
|
||||
if self.kwargs.get("keep_source"):
|
||||
all_results.append(hit)
|
||||
else:
|
||||
all_results.append(hit["_source"])
|
||||
|
||||
if self.kwargs.get("callback"):
|
||||
self.kwargs.get("callback")(all_hits, self.index_name).run()
|
||||
|
||||
if counter % 10 == 0 and self.kwargs.get("task"):
|
||||
print(f"{self.index_name}: processing page {counter}")
|
||||
self._notify(len(all_results))
|
||||
|
||||
counter += 1
|
||||
|
||||
# update search_after with last hit data
|
||||
self.data["search_after"] = all_hits[-1]["sort"]
|
||||
|
||||
return all_results
|
||||
|
||||
def _notify(self, processed):
|
||||
"""send notification on task"""
|
||||
total = self.kwargs.get("total")
|
||||
progress = (processed + 1) / total
|
||||
message = f"Processing {self.index_name} {processed}/{total}"
|
||||
print(message)
|
||||
self.kwargs.get("task").send_progress(
|
||||
message_lines=[message],
|
||||
progress=progress,
|
||||
)
|
||||
|
||||
def clean_pit(self):
|
||||
"""delete pit from elastic search"""
|
||||
data = {"id": self.pit_id}
|
||||
ElasticWrap("_pit").delete(data=data)
|
||||
ElasticWrap("_pit").delete(data={"id": self.pit_id})
|
||||
|
|
|
@ -223,7 +223,7 @@ def run_manual_import(self):
|
|||
ImportFolderScanner(task=self).scan()
|
||||
|
||||
|
||||
@shared_task(bind=True, name="run_backup")
|
||||
@shared_task(bind=True, name="run_backup", base=BaseTask)
|
||||
def run_backup(self, reason="auto"):
|
||||
"""called from settings page, dump backup to zip file"""
|
||||
manager = TaskManager()
|
||||
|
@ -232,10 +232,10 @@ def run_backup(self, reason="auto"):
|
|||
return
|
||||
|
||||
manager.init(self)
|
||||
ElasticBackup(reason=reason).backup_all_indexes()
|
||||
ElasticBackup(reason=reason, task=self).backup_all_indexes()
|
||||
|
||||
|
||||
@shared_task(bind=True, name="restore_backup")
|
||||
@shared_task(bind=True, name="restore_backup", base=BaseTask)
|
||||
def run_restore_backup(self, filename):
|
||||
"""called from settings page, dump backup to zip file"""
|
||||
manager = TaskManager()
|
||||
|
@ -244,8 +244,9 @@ def run_restore_backup(self, filename):
|
|||
return
|
||||
|
||||
manager.init(self)
|
||||
self.send_progress(["Reset your Index"])
|
||||
ElasitIndexWrap().reset()
|
||||
ElasticBackup().restore(filename)
|
||||
ElasticBackup(task=self).restore(filename)
|
||||
print("index restore finished")
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue