refactor ElasticIndex and ElasticBackup to use ElasticWrap and IndexPaginate

simon 2022-03-23 17:17:17 +07:00
parent bfcc538ed1
commit fb4c11bd38
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
1 changed file with 43 additions and 126 deletions


@@ -10,7 +10,7 @@ import os
 import zipfile
 from datetime import datetime
 
-import requests
+from home.src.es.connect import ElasticWrap, IndexPaginate
 from home.src.ta.config import AppConfig
 from home.src.ta.helper import ignore_filelist
@@ -20,11 +20,6 @@ class ElasticIndex:
     handle mapping and settings on elastic search for a given index
     """
 
-    CONFIG = AppConfig().config
-    ES_URL = CONFIG["application"]["es_url"]
-    ES_AUTH = CONFIG["application"]["es_auth"]
-    HEADERS = {"Content-type": "application/json"}
-
     def __init__(self, index_name, expected_map, expected_set):
         self.index_name = index_name
         self.expected_map = expected_map
@@ -33,15 +28,9 @@ class ElasticIndex:
 
     def index_exists(self):
         """check if index already exists and return mapping if it does"""
-        index_name = self.index_name
-        url = f"{self.ES_URL}/ta_{index_name}"
-        response = requests.get(url, auth=self.ES_AUTH)
-        exists = response.ok
-
-        if exists:
-            details = response.json()[f"ta_{index_name}"]
-        else:
-            details = False
+        response, status_code = ElasticWrap(f"ta_{self.index_name}").get()
+        exists = status_code == 200
+        details = response.get(f"ta_{self.index_name}", False)
 
         return exists, details
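Every call in this refactor follows the same pattern: ElasticWrap takes the path relative to the Elasticsearch host, and each HTTP helper returns a (response_json, status_code) tuple, which is why index_exists can unpack the result directly. Below is a minimal sketch of that pattern, assuming a local unauthenticated cluster; the real class lives in home/src/es/connect.py and this is not its actual implementation.

import requests


class ElasticWrapSketch:
    """wrap a single Elasticsearch path; every verb returns (json, status)"""

    def __init__(self, path, base_url="http://localhost:9200", auth=None):
        self.url = f"{base_url}/{path}"
        self.auth = auth

    def get(self):
        # e.g. ElasticWrapSketch("ta_video").get() -> (index details, 200)
        response = requests.get(self.url, auth=self.auth, timeout=10)
        return response.json(), response.status_code

    def put(self, data):
        # create_blank-style call: send mappings/settings as the JSON body
        response = requests.put(self.url, json=data, auth=self.auth, timeout=10)
        return response.json(), response.status_code

    def delete(self):
        response = requests.delete(self.url, auth=self.auth, timeout=10)
        return response.json(), response.status_code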
@@ -110,63 +99,41 @@ class ElasticIndex:
 
     def rebuild_index(self):
         """rebuild with new mapping"""
-        # backup
         self.reindex("backup")
-        # delete original
         self.delete_index(backup=False)
-        # create new
         self.create_blank()
         self.reindex("restore")
-        # delete backup
         self.delete_index()
 
     def reindex(self, method):
         """create on elastic search"""
-        index_name = self.index_name
         if method == "backup":
-            source = f"ta_{index_name}"
-            destination = f"ta_{index_name}_backup"
+            source = f"ta_{self.index_name}"
+            destination = f"ta_{self.index_name}_backup"
         elif method == "restore":
-            source = f"ta_{index_name}_backup"
-            destination = f"ta_{index_name}"
+            source = f"ta_{self.index_name}_backup"
+            destination = f"ta_{self.index_name}"
 
-        query = {"source": {"index": source}, "dest": {"index": destination}}
-        data = json.dumps(query)
-        url = self.ES_URL + "/_reindex?refresh=true"
-        response = requests.post(
-            url=url, data=data, headers=self.HEADERS, auth=self.ES_AUTH
-        )
-        if not response.ok:
-            print(response.text)
+        data = {"source": {"index": source}, "dest": {"index": destination}}
+        _, _ = ElasticWrap("_reindex?refresh=true").post(data=data)
 
     def delete_index(self, backup=True):
         """delete index passed as argument"""
+        path = f"ta_{self.index_name}"
         if backup:
-            url = f"{self.ES_URL}/ta_{self.index_name}_backup"
-        else:
-            url = f"{self.ES_URL}/ta_{self.index_name}"
-        response = requests.delete(url, auth=self.ES_AUTH)
-        if not response.ok:
-            print(response.text)
+            path = path + "_backup"
+
+        _, _ = ElasticWrap(path).delete()
 
     def create_blank(self):
         """apply new mapping and settings for blank new index"""
-        expected_map = self.expected_map
-        expected_set = self.expected_set
-        # stich payload
-        payload = {}
-        if expected_set:
-            payload.update({"settings": expected_set})
-        if expected_map:
-            payload.update({"mappings": {"properties": expected_map}})
-        # create
-        url = f"{self.ES_URL}/ta_{self.index_name}"
-        data = json.dumps(payload)
-        response = requests.put(
-            url=url, data=data, headers=self.HEADERS, auth=self.ES_AUTH
-        )
-        if not response.ok:
-            print(response.text)
+        data = {}
+        if self.expected_set:
+            data.update({"settings": self.expected_set})
+        if self.expected_map:
+            data.update({"mappings": {"properties": self.expected_map}})
+
+        _, _ = ElasticWrap(f"ta_{self.index_name}").put(data)
 
 
 class ElasticBackup:
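For reference, the body that create_blank now hands to ElasticWrap(...).put(data) is a standard Elasticsearch create-index payload. A small illustrative example with placeholder settings and mapping fields, not the project's real index config:

# hypothetical payload; field names are placeholders
data = {
    "settings": {"number_of_replicas": "0"},
    "mappings": {"properties": {"title": {"type": "text"}}},
}
# ElasticWrap(f"ta_{index_name}").put(data) then issues PUT /ta_<index_name>
# with this JSON body, creating the empty index with the new mapping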
@@ -174,52 +141,21 @@ class ElasticBackup:
 
     def __init__(self, index_config, reason):
         self.config = AppConfig().config
+        self.cache_dir = self.config["application"]["cache_dir"]
         self.index_config = index_config
         self.reason = reason
         self.timestamp = datetime.now().strftime("%Y%m%d")
         self.backup_files = []
 
-    def get_all_documents(self, index_name):
+    @staticmethod
+    def get_all_documents(index_name):
         """export all documents of a single index"""
-        headers = {"Content-type": "application/json"}
-        es_url = self.config["application"]["es_url"]
-        es_auth = self.config["application"]["es_auth"]
-        # get PIT ID
-        url = f"{es_url}/ta_{index_name}/_pit?keep_alive=1m"
-        response = requests.post(url, auth=es_auth)
-        json_data = json.loads(response.text)
-        pit_id = json_data["id"]
-        # build query
         data = {
             "query": {"match_all": {}},
-            "size": 100,
-            "pit": {"id": pit_id, "keep_alive": "1m"},
-            "sort": [{"_id": {"order": "asc"}}],
+            "sort": [{"_doc": {"order": "desc"}}],
         }
-        query_str = json.dumps(data)
-        url = es_url + "/_search"
-        # loop until nothing left
-        all_results = []
-        while True:
-            response = requests.get(
-                url, data=query_str, headers=headers, auth=es_auth
-            )
-            json_data = json.loads(response.text)
-            all_hits = json_data["hits"]["hits"]
-            if all_hits:
-                for hit in all_hits:
-                    search_after = hit["sort"]
-                    all_results.append(hit)
-                # update search_after with last hit data
-                data["search_after"] = search_after
-                query_str = json.dumps(data)
-            else:
-                break
-        # clean up PIT
-        query_str = json.dumps({"id": pit_id})
-        requests.delete(
-            es_url + "/_pit", data=query_str, headers=headers, auth=es_auth
-        )
+        paginate = IndexPaginate(f"ta_{index_name}", data, keep_source=True)
+        all_results = paginate.get_results()
 
         return all_results
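The deleted loop paginated by hand with a point-in-time context and search_after; IndexPaginate presumably wraps the same idea behind get_results(), and keep_source=True appears to keep the full hits rather than just their bodies, which lines up with write_ta_json below still pulling _source out of each result. A rough sketch of search_after pagination under those assumptions; the helper name, endpoint, and page size are illustrative, not the actual home/src/es/connect.py code.

import requests


def paginate_index(es_url, index_name, data, auth=None, size=100):
    """yield every hit of an index, one search_after page at a time"""
    data = dict(data, size=size)
    url = f"{es_url}/{index_name}/_search"
    while True:
        response = requests.get(url, json=data, auth=auth, timeout=10)
        hits = response.json()["hits"]["hits"]
        if not hits:
            break
        yield from hits
        # continue after the sort value of the last hit on this page
        data["search_after"] = hits[-1]["sort"]


# usage sketch:
# all_results = list(paginate_index("http://localhost:9200", "ta_video", data))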
@@ -244,9 +180,8 @@ class ElasticBackup:
 
     def write_es_json(self, file_content, index_name):
         """write nd-json file for es _bulk API to disk"""
-        cache_dir = self.config["application"]["cache_dir"]
         file_name = f"es_{index_name}-{self.timestamp}.json"
-        file_path = os.path.join(cache_dir, "backup", file_name)
+        file_path = os.path.join(self.cache_dir, "backup", file_name)
 
         with open(file_path, "w", encoding="utf-8") as f:
             f.write(file_content)
@@ -254,9 +189,8 @@ class ElasticBackup:
 
     def write_ta_json(self, all_results, index_name):
         """write generic json file to disk"""
-        cache_dir = self.config["application"]["cache_dir"]
         file_name = f"ta_{index_name}-{self.timestamp}.json"
-        file_path = os.path.join(cache_dir, "backup", file_name)
+        file_path = os.path.join(self.cache_dir, "backup", file_name)
         to_write = [i["_source"] for i in all_results]
         file_content = json.dumps(to_write)
         with open(file_path, "w", encoding="utf-8") as f:
@@ -266,9 +200,8 @@ class ElasticBackup:
 
     def zip_it(self):
         """pack it up into single zip file"""
-        cache_dir = self.config["application"]["cache_dir"]
         file_name = f"ta_backup-{self.timestamp}-{self.reason}.zip"
-        backup_folder = os.path.join(cache_dir, "backup")
+        backup_folder = os.path.join(self.cache_dir, "backup")
         backup_file = os.path.join(backup_folder, file_name)
 
         with zipfile.ZipFile(
@@ -283,29 +216,18 @@ class ElasticBackup:
 
     def post_bulk_restore(self, file_name):
         """send bulk to es"""
-        cache_dir = self.config["application"]["cache_dir"]
-        es_url = self.config["application"]["es_url"]
-        es_auth = self.config["application"]["es_auth"]
-        headers = {"Content-type": "application/x-ndjson"}
-        file_path = os.path.join(cache_dir, file_name)
-
+        file_path = os.path.join(self.cache_dir, file_name)
         with open(file_path, "r", encoding="utf-8") as f:
-            query_str = f.read()
+            data = f.read()
 
-        if not query_str.strip():
+        if not data.strip():
             return
 
-        url = es_url + "/_bulk"
-        request = requests.post(
-            url, data=query_str, headers=headers, auth=es_auth
-        )
-        if not request.ok:
-            print(request.text)
+        _, _ = ElasticWrap("_bulk").post(data=data, ndjson=True)
 
     def get_all_backup_files(self):
         """build all available backup files for view"""
-        cache_dir = self.config["application"]["cache_dir"]
-        backup_dir = os.path.join(cache_dir, "backup")
+        backup_dir = os.path.join(self.cache_dir, "backup")
         backup_files = os.listdir(backup_dir)
         all_backup_files = ignore_filelist(backup_files)
         all_available_backups = [
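The removed code set Content-Type: application/x-ndjson by hand; the ndjson=True flag on the new ElasticWrap post presumably does the same, since the _bulk endpoint only accepts newline-delimited JSON. A tiny hand-built example of such a body, with hypothetical index and document values:

import json

# one action line followed by one document line per operation
actions = [
    ({"index": {"_index": "ta_video", "_id": "abc123"}}, {"title": "example"}),
]
bulk_body = "\n".join(
    json.dumps(line) for pair in actions for line in pair
) + "\n"  # _bulk requires a trailing newline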
@@ -336,8 +258,7 @@ class ElasticBackup:
 
     def unpack_zip_backup(self, filename):
         """extract backup zip and return filelist"""
-        cache_dir = self.config["application"]["cache_dir"]
-        backup_dir = os.path.join(cache_dir, "backup")
+        backup_dir = os.path.join(self.cache_dir, "backup")
         file_path = os.path.join(backup_dir, filename)
 
         with zipfile.ZipFile(file_path, "r") as z:
@@ -348,9 +269,7 @@ class ElasticBackup:
 
     def restore_json_files(self, zip_content):
         """go through the unpacked files and restore"""
-
-        cache_dir = self.config["application"]["cache_dir"]
-        backup_dir = os.path.join(cache_dir, "backup")
+        backup_dir = os.path.join(self.cache_dir, "backup")
 
         for json_f in zip_content:
@@ -364,14 +283,13 @@ class ElasticBackup:
             self.post_bulk_restore(file_name)
             os.remove(file_name)
 
-    def index_exists(self, index_name):
+    @staticmethod
+    def index_exists(index_name):
         """check if index already exists to skip"""
-        es_url = self.config["application"]["es_url"]
-        es_auth = self.config["application"]["es_auth"]
-        url = f"{es_url}/ta_{index_name}"
-        response = requests.get(url, auth=es_auth)
+        _, status_code = ElasticWrap(f"ta_{index_name}").get()
+        exists = status_code == 200
 
-        return response.ok
+        return exists
 
     def rotate_backup(self):
         """delete old backups if needed"""
@@ -386,8 +304,7 @@ class ElasticBackup:
             print("no backup files to rotate")
             return
 
-        cache_dir = self.config["application"]["cache_dir"]
-        backup_dir = os.path.join(cache_dir, "backup")
+        backup_dir = os.path.join(self.cache_dir, "backup")
         all_to_delete = auto[rotate:]
         for to_delete in all_to_delete: