From fb4c11bd38f620d544bfaf6df877f7bdec68058c Mon Sep 17 00:00:00 2001
From: simon
Date: Wed, 23 Mar 2022 17:17:17 +0700
Subject: [PATCH] refactor ElasticIndex and ElasticBackup to use ElasticWrap
 and IndexPaginate

---
 tubearchivist/home/src/es/index_setup.py | 169 ++++++-----------------
 1 file changed, 43 insertions(+), 126 deletions(-)

diff --git a/tubearchivist/home/src/es/index_setup.py b/tubearchivist/home/src/es/index_setup.py
index 8b0ed97..4f20cd3 100644
--- a/tubearchivist/home/src/es/index_setup.py
+++ b/tubearchivist/home/src/es/index_setup.py
@@ -10,7 +10,7 @@ import os
 import zipfile
 from datetime import datetime
 
-import requests
+from home.src.es.connect import ElasticWrap, IndexPaginate
 from home.src.ta.config import AppConfig
 from home.src.ta.helper import ignore_filelist
 
@@ -20,11 +20,6 @@ class ElasticIndex:
     handle mapping and settings on elastic search for a given index
     """
 
-    CONFIG = AppConfig().config
-    ES_URL = CONFIG["application"]["es_url"]
-    ES_AUTH = CONFIG["application"]["es_auth"]
-    HEADERS = {"Content-type": "application/json"}
-
     def __init__(self, index_name, expected_map, expected_set):
         self.index_name = index_name
         self.expected_map = expected_map
@@ -33,15 +28,9 @@ class ElasticIndex:
 
     def index_exists(self):
         """check if index already exists and return mapping if it does"""
-        index_name = self.index_name
-        url = f"{self.ES_URL}/ta_{index_name}"
-        response = requests.get(url, auth=self.ES_AUTH)
-        exists = response.ok
-
-        if exists:
-            details = response.json()[f"ta_{index_name}"]
-        else:
-            details = False
+        response, status_code = ElasticWrap(f"ta_{self.index_name}").get()
+        exists = status_code == 200
+        details = response.get(f"ta_{self.index_name}", False)
 
         return exists, details
 
@@ -110,63 +99,41 @@ class ElasticIndex:
 
     def rebuild_index(self):
         """rebuild with new mapping"""
-        # backup
         self.reindex("backup")
-        # delete original
         self.delete_index(backup=False)
-        # create new
         self.create_blank()
         self.reindex("restore")
-        # delete backup
         self.delete_index()
 
     def reindex(self, method):
         """create on elastic search"""
-        index_name = self.index_name
         if method == "backup":
-            source = f"ta_{index_name}"
-            destination = f"ta_{index_name}_backup"
+            source = f"ta_{self.index_name}"
+            destination = f"ta_{self.index_name}_backup"
         elif method == "restore":
-            source = f"ta_{index_name}_backup"
-            destination = f"ta_{index_name}"
+            source = f"ta_{self.index_name}_backup"
+            destination = f"ta_{self.index_name}"
 
-        query = {"source": {"index": source}, "dest": {"index": destination}}
-        data = json.dumps(query)
-        url = self.ES_URL + "/_reindex?refresh=true"
-        response = requests.post(
-            url=url, data=data, headers=self.HEADERS, auth=self.ES_AUTH
-        )
-        if not response.ok:
-            print(response.text)
+        data = {"source": {"index": source}, "dest": {"index": destination}}
+        _, _ = ElasticWrap("_reindex?refresh=true").post(data=data)
 
     def delete_index(self, backup=True):
         """delete index passed as argument"""
+        path = f"ta_{self.index_name}"
         if backup:
-            url = f"{self.ES_URL}/ta_{self.index_name}_backup"
-        else:
-            url = f"{self.ES_URL}/ta_{self.index_name}"
-        response = requests.delete(url, auth=self.ES_AUTH)
-        if not response.ok:
-            print(response.text)
+            path = path + "_backup"
+
+        _, _ = ElasticWrap(path).delete()
 
     def create_blank(self):
         """apply new mapping and settings for blank new index"""
-        expected_map = self.expected_map
-        expected_set = self.expected_set
-        # stich payload
-        payload = {}
-        if expected_set:
-            payload.update({"settings": expected_set})
-        if expected_map:
-            payload.update({"mappings": {"properties": expected_map}})
-        # create
-        url = f"{self.ES_URL}/ta_{self.index_name}"
-        data = json.dumps(payload)
-        response = requests.put(
-            url=url, data=data, headers=self.HEADERS, auth=self.ES_AUTH
-        )
-        if not response.ok:
-            print(response.text)
+        data = {}
+        if self.expected_set:
+            data.update({"settings": self.expected_set})
+        if self.expected_map:
+            data.update({"mappings": {"properties": self.expected_map}})
+
+        _, _ = ElasticWrap(f"ta_{self.index_name}").put(data)
 
 
 class ElasticBackup:
@@ -174,52 +141,21 @@ class ElasticBackup:
 
     def __init__(self, index_config, reason):
         self.config = AppConfig().config
+        self.cache_dir = self.config["application"]["cache_dir"]
         self.index_config = index_config
         self.reason = reason
         self.timestamp = datetime.now().strftime("%Y%m%d")
         self.backup_files = []
 
-    def get_all_documents(self, index_name):
+    @staticmethod
+    def get_all_documents(index_name):
         """export all documents of a single index"""
-        headers = {"Content-type": "application/json"}
-        es_url = self.config["application"]["es_url"]
-        es_auth = self.config["application"]["es_auth"]
-        # get PIT ID
-        url = f"{es_url}/ta_{index_name}/_pit?keep_alive=1m"
-        response = requests.post(url, auth=es_auth)
-        json_data = json.loads(response.text)
-        pit_id = json_data["id"]
-        # build query
         data = {
             "query": {"match_all": {}},
-            "size": 100,
-            "pit": {"id": pit_id, "keep_alive": "1m"},
-            "sort": [{"_id": {"order": "asc"}}],
+            "sort": [{"_doc": {"order": "desc"}}],
         }
-        query_str = json.dumps(data)
-        url = es_url + "/_search"
-        # loop until nothing left
-        all_results = []
-        while True:
-            response = requests.get(
-                url, data=query_str, headers=headers, auth=es_auth
-            )
-            json_data = json.loads(response.text)
-            all_hits = json_data["hits"]["hits"]
-            if all_hits:
-                for hit in all_hits:
-                    search_after = hit["sort"]
-                    all_results.append(hit)
-                # update search_after with last hit data
-                data["search_after"] = search_after
-                query_str = json.dumps(data)
-            else:
-                break
-        # clean up PIT
-        query_str = json.dumps({"id": pit_id})
-        requests.delete(
-            es_url + "/_pit", data=query_str, headers=headers, auth=es_auth
-        )
+        paginate = IndexPaginate(f"ta_{index_name}", data, keep_source=True)
+        all_results = paginate.get_results()
 
         return all_results
 
@@ -244,9 +180,8 @@ class ElasticBackup:
 
     def write_es_json(self, file_content, index_name):
         """write nd-json file for es _bulk API to disk"""
-        cache_dir = self.config["application"]["cache_dir"]
         file_name = f"es_{index_name}-{self.timestamp}.json"
-        file_path = os.path.join(cache_dir, "backup", file_name)
+        file_path = os.path.join(self.cache_dir, "backup", file_name)
         with open(file_path, "w", encoding="utf-8") as f:
             f.write(file_content)
 
@@ -254,9 +189,8 @@ class ElasticBackup:
 
     def write_ta_json(self, all_results, index_name):
         """write generic json file to disk"""
-        cache_dir = self.config["application"]["cache_dir"]
         file_name = f"ta_{index_name}-{self.timestamp}.json"
-        file_path = os.path.join(cache_dir, "backup", file_name)
+        file_path = os.path.join(self.cache_dir, "backup", file_name)
         to_write = [i["_source"] for i in all_results]
         file_content = json.dumps(to_write)
         with open(file_path, "w", encoding="utf-8") as f:
@@ -266,9 +200,8 @@ class ElasticBackup:
 
     def zip_it(self):
         """pack it up into single zip file"""
-        cache_dir = self.config["application"]["cache_dir"]
         file_name = f"ta_backup-{self.timestamp}-{self.reason}.zip"
-        backup_folder = os.path.join(cache_dir, "backup")
+        backup_folder = os.path.join(self.cache_dir, "backup")
         backup_file = os.path.join(backup_folder, file_name)
 
         with zipfile.ZipFile(
@@ -283,29 +216,18 @@ class ElasticBackup:
 
     def post_bulk_restore(self, file_name):
         """send bulk to es"""
-        cache_dir = self.config["application"]["cache_dir"]
-        es_url = self.config["application"]["es_url"]
-        es_auth = self.config["application"]["es_auth"]
-        headers = {"Content-type": "application/x-ndjson"}
-        file_path = os.path.join(cache_dir, file_name)
-
+        file_path = os.path.join(self.cache_dir, file_name)
         with open(file_path, "r", encoding="utf-8") as f:
-            query_str = f.read()
+            data = f.read()
 
-        if not query_str.strip():
+        if not data.strip():
             return
 
-        url = es_url + "/_bulk"
-        request = requests.post(
-            url, data=query_str, headers=headers, auth=es_auth
-        )
-        if not request.ok:
-            print(request.text)
+        _, _ = ElasticWrap("_bulk").post(data=data, ndjson=True)
 
     def get_all_backup_files(self):
         """build all available backup files for view"""
-        cache_dir = self.config["application"]["cache_dir"]
-        backup_dir = os.path.join(cache_dir, "backup")
+        backup_dir = os.path.join(self.cache_dir, "backup")
         backup_files = os.listdir(backup_dir)
         all_backup_files = ignore_filelist(backup_files)
         all_available_backups = [
@@ -336,8 +258,7 @@ class ElasticBackup:
 
     def unpack_zip_backup(self, filename):
         """extract backup zip and return filelist"""
-        cache_dir = self.config["application"]["cache_dir"]
-        backup_dir = os.path.join(cache_dir, "backup")
+        backup_dir = os.path.join(self.cache_dir, "backup")
         file_path = os.path.join(backup_dir, filename)
 
         with zipfile.ZipFile(file_path, "r") as z:
@@ -348,9 +269,7 @@ class ElasticBackup:
 
     def restore_json_files(self, zip_content):
         """go through the unpacked files and restore"""
-
-        cache_dir = self.config["application"]["cache_dir"]
-        backup_dir = os.path.join(cache_dir, "backup")
+        backup_dir = os.path.join(self.cache_dir, "backup")
 
         for json_f in zip_content:
 
@@ -364,14 +283,13 @@ class ElasticBackup:
             self.post_bulk_restore(file_name)
             os.remove(file_name)
 
-    def index_exists(self, index_name):
+    @staticmethod
+    def index_exists(index_name):
         """check if index already exists to skip"""
-        es_url = self.config["application"]["es_url"]
-        es_auth = self.config["application"]["es_auth"]
-        url = f"{es_url}/ta_{index_name}"
-        response = requests.get(url, auth=es_auth)
+        _, status_code = ElasticWrap(f"ta_{index_name}").get()
+        exists = status_code == 200
 
-        return response.ok
+        return exists
 
     def rotate_backup(self):
         """delete old backups if needed"""
@@ -386,8 +304,7 @@ class ElasticBackup:
             print("no backup files to rotate")
             return
 
-        cache_dir = self.config["application"]["cache_dir"]
-        backup_dir = os.path.join(cache_dir, "backup")
+        backup_dir = os.path.join(self.cache_dir, "backup")
         all_to_delete = auto[rotate:]
 
         for to_delete in all_to_delete:
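
Note for reviewers: the diff builds against ElasticWrap and IndexPaginate
from home/src/es/connect.py, which this patch does not touch. The call sites
above only depend on a small contract: every verb method returns a
(response_json, status_code) tuple, post() takes either a dict or, with
ndjson=True, a prebuilt newline-delimited _bulk string, and
IndexPaginate(index, data, keep_source=True).get_results() replaces the
hand-rolled search_after loop removed from get_all_documents. The sketch
below illustrates that contract only; the method names and return shapes are
taken from the call sites, while the bodies, the AppConfig wiring and the
DEFAULT_SIZE page size are assumptions, not the actual connect.py code.

    import json

    import requests
    from home.src.ta.config import AppConfig


    class ElasticWrap:
        """wrap calls to elastic search, return (json, status_code)"""

        def __init__(self, path):
            config = AppConfig().config
            self.url = f"{config['application']['es_url']}/{path}"
            self.auth = config["application"]["es_auth"]

        def get(self, data=False):
            """get request, with an optional query body"""
            if data:
                response = requests.get(self.url, json=data, auth=self.auth)
            else:
                response = requests.get(self.url, auth=self.auth)
            return response.json(), response.status_code

        def post(self, data=False, ndjson=False):
            """post request; ndjson=True ships a prebuilt _bulk string"""
            if ndjson:
                headers = {"Content-type": "application/x-ndjson"}
                payload = data
            else:
                headers = {"Content-type": "application/json"}
                payload = json.dumps(data)
            response = requests.post(
                self.url, data=payload, headers=headers, auth=self.auth
            )
            return response.json(), response.status_code

        def put(self, data=False):
            """put request with a json body"""
            response = requests.put(self.url, json=data, auth=self.auth)
            return response.json(), response.status_code

        def delete(self):
            """delete request"""
            response = requests.delete(self.url, auth=self.auth)
            return response.json(), response.status_code


    class IndexPaginate:
        """use search_after to loop over all documents of an index"""

        DEFAULT_SIZE = 500  # assumed page size

        def __init__(self, index_name, data, keep_source=False):
            self.index_name = index_name
            self.data = data
            self.keep_source = keep_source

        def get_results(self):
            """page through the index until es returns no more hits"""
            self.data.setdefault("size", self.DEFAULT_SIZE)
            all_results = []
            while True:
                response, _ = ElasticWrap(f"{self.index_name}/_search").get(
                    data=self.data
                )
                all_hits = response["hits"]["hits"]
                if not all_hits:
                    break
                for hit in all_hits:
                    # keep_source=True keeps the full hit for the _bulk
                    # backup file, otherwise only collect the _source body
                    all_results.append(
                        hit if self.keep_source else hit["_source"]
                    )
                # continue after the sort value of the last hit
                self.data["search_after"] = all_hits[-1]["sort"]
            return all_results

Under this contract, index_exists() reduces to a status code check and
get_all_documents() hands the whole pagination loop to get_results(); the
query passed in only needs a deterministic sort (here on _doc) for
search_after to work.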