Refactor IndexPaginate

- add callback function
- implement callback for ElasticBackup
simon 2022-07-16 17:55:18 +07:00
parent 3b17c01c6d
commit 6bb7f80ea2
2 changed files with 74 additions and 47 deletions

@@ -91,16 +91,22 @@ class ElasticWrap:

 class IndexPaginate:
-    """use search_after to go through whole index"""
+    """use search_after to go through whole index
+    kwargs:
+    - size: int, overwrite DEFAULT_SIZE
+    - keep_source: bool, keep _source key from es results
+    - callback: obj, class with run method, called for every loop
+    """

     DEFAULT_SIZE = 500

-    def __init__(self, index_name, data, size=False, keep_source=False):
+    def __init__(self, index_name, data, **kwargs):
         self.index_name = index_name
         self.data = data
         self.pit_id = False
-        self.size = size
-        self.keep_source = keep_source
+        self.size = kwargs.get("size")
+        self.keep_source = kwargs.get("keep_source")
+        self.callback = kwargs.get("callback")

     def get_results(self):
         """get all results"""
@@ -122,14 +128,13 @@ class IndexPaginate:
             print(self.data)
             raise ValueError("missing sort key in data")

-        size = self.size or self.DEFAULT_SIZE
-        self.data["size"] = size
+        self.data["size"] = self.size or self.DEFAULT_SIZE
         self.data["pit"] = {"id": self.pit_id, "keep_alive": "10m"}

     def run_loop(self):
         """loop through results until last hit"""
         all_results = []
+        counter = 0
         while True:
             response, _ = ElasticWrap("_search").get(data=self.data)
             all_hits = response["hits"]["hits"]
@@ -139,10 +144,18 @@ class IndexPaginate:
                         source = hit
                     else:
                         source = hit["_source"]
-                    search_after = hit["sort"]
-                    all_results.append(source)
+
+                    if not self.callback:
+                        all_results.append(source)
+
+                if self.callback:
+                    self.callback(all_hits, self.index_name).run()
+                    if counter % 10 == 0:
+                        print(f"{self.index_name}: processing page {counter}")
+                    counter = counter + 1
+
                 # update search_after with last hit data
-                self.data["search_after"] = search_after
+                self.data["search_after"] = all_hits[-1]["sort"]
             else:
                 break
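
As the new run_loop shows, a callback is instantiated once per page with that page's raw hits and the index name, then its run method is invoked. A minimal sketch of a conforming callback class follows; the class name and its body are illustrative, not part of this commit:

    class PrintIdsCallback:
        """illustrative callback: log the _id of every hit in the current page"""

        def __init__(self, source, index_name):
            self.source = source          # list of ES hits for this page
            self.index_name = index_name  # index the paginator is walking

        def run(self):
            for hit in self.source:
                print(f"{self.index_name}: {hit['_id']}")

    # with a callback set, hits are streamed page by page and not accumulated,
    # so get_results() returns an empty list
    data = {"query": {"match_all": {}}, "sort": [{"_doc": {"order": "desc"}}]}
    IndexPaginate("ta_video", data, callback=PrintIdsCallback).get_results()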

@@ -137,35 +137,24 @@ class ElasticIndex:
         _, _ = ElasticWrap(f"ta_{self.index_name}").put(data)


-class ElasticBackup:
-    """dump index to nd-json files for later bulk import"""
+class BackupCallback:
+    """handle backup ndjson writer as callback for IndexPaginate"""

-    def __init__(self, index_config, reason):
-        self.config = AppConfig().config
-        self.cache_dir = self.config["application"]["cache_dir"]
-        self.index_config = index_config
-        self.reason = reason
+    def __init__(self, source, index_name):
+        self.source = source
+        self.index_name = index_name
         self.timestamp = datetime.now().strftime("%Y%m%d")
-        self.backup_files = []

-    @staticmethod
-    def get_all_documents(index_name):
-        """export all documents of a single index"""
-        data = {
-            "query": {"match_all": {}},
-            "sort": [{"_doc": {"order": "desc"}}],
-        }
-        paginate = IndexPaginate(f"ta_{index_name}", data, keep_source=True)
-        all_results = paginate.get_results()
+    def run(self):
+        """run the backup task"""
+        file_content = self._build_bulk()
+        self._write_es_json(file_content)

-        return all_results
-
-    @staticmethod
-    def build_bulk(all_results):
+    def _build_bulk(self):
         """build bulk query data from all_results"""
         bulk_list = []

-        for document in all_results:
+        for document in self.source:
             document_id = document["_id"]
             es_index = document["_index"]
             action = {"index": {"_index": es_index, "_id": document_id}}
@@ -179,29 +168,56 @@ class ElasticBackup:
         return file_content

-    def write_es_json(self, file_content, index_name):
+    def _write_es_json(self, file_content):
         """write nd-json file for es _bulk API to disk"""
-        file_name = f"es_{index_name}-{self.timestamp}.json"
-        file_path = os.path.join(self.cache_dir, "backup", file_name)
-        with open(file_path, "w", encoding="utf-8") as f:
+        cache_dir = AppConfig().config["application"]["cache_dir"]
+        file_name = f"es_{self.index_name.lstrip('ta_')}-{self.timestamp}.json"
+        file_path = os.path.join(cache_dir, "backup", file_name)
+        with open(file_path, "a+", encoding="utf-8") as f:
             f.write(file_content)

-        self.backup_files.append(file_path)
+
+class ElasticBackup:
+    """dump index to nd-json files for later bulk import"""
+
+    def __init__(self, index_config, reason):
+        self.config = AppConfig().config
+        self.cache_dir = self.config["application"]["cache_dir"]
+        self.timestamp = datetime.now().strftime("%Y%m%d")
+        self.index_config = index_config
+        self.reason = reason
+
+    @staticmethod
+    def backup_index(index_name):
+        """export all documents of a single index"""
+        data = {
+            "query": {"match_all": {}},
+            "sort": [{"_doc": {"order": "desc"}}],
+        }
+        paginate = IndexPaginate(
+            f"ta_{index_name}", data, keep_source=True, callback=BackupCallback
+        )
+        _ = paginate.get_results()

     def zip_it(self):
         """pack it up into single zip file"""
         file_name = f"ta_backup-{self.timestamp}-{self.reason}.zip"
-        backup_folder = os.path.join(self.cache_dir, "backup")
-        backup_file = os.path.join(backup_folder, file_name)
+        folder = os.path.join(self.cache_dir, "backup")

-        with zipfile.ZipFile(
-            backup_file, "w", compression=zipfile.ZIP_DEFLATED
-        ) as zip_f:
-            for backup_file in self.backup_files:
+        to_backup = []
+        for file in os.listdir(folder):
+            if file.endswith(".json"):
+                to_backup.append(os.path.join(folder, file))
+
+        backup_file = os.path.join(folder, file_name)
+
+        comp = zipfile.ZIP_DEFLATED
+        with zipfile.ZipFile(backup_file, "w", compression=comp) as zip_f:
+            for backup_file in to_backup:
                 zip_f.write(backup_file, os.path.basename(backup_file))

         # cleanup
-        for backup_file in self.backup_files:
+        for backup_file in to_backup:
             os.remove(backup_file)

     def post_bulk_restore(self, file_name):
@@ -369,9 +385,7 @@ def backup_all_indexes(reason):
         print(f"backup: export in progress for {index_name}")
         if not backup_handler.index_exists(index_name):
             continue
-        all_results = backup_handler.get_all_documents(index_name)
-        file_content = backup_handler.build_bulk(all_results)
-        backup_handler.write_es_json(file_content, index_name)
+        backup_handler.backup_index(index_name)

     backup_handler.zip_it()
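
Taken together, a backup run now streams each page through BackupCallback instead of holding every document in memory first. A rough sketch of the calling pattern, assuming the project classes are importable; the index list, index_config value, and reason are placeholders mirroring backup_all_indexes above:

    # hedged sketch; values are invented
    backup_handler = ElasticBackup(index_config=[], reason="manual")
    for index_name in ["video", "channel"]:
        if not backup_handler.index_exists(index_name):
            continue
        backup_handler.backup_index(index_name)  # pages appended to nd-json via BackupCallback

    backup_handler.zip_it()  # bundle all backup/*.json files into one zip, then delete them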