"""
|
|
|
|
functionality:
|
|
|
|
- setup elastic index at first start
|
|
|
|
- verify and update index mapping and settings if needed
|
|
|
|
- backup and restore metadata
|
|
|
|
"""
|
2021-09-05 17:10:14 +00:00
|
|
|
|
|
|
|
import json
|
|
|
|
import os
|
2021-09-16 10:34:20 +00:00
|
|
|
import zipfile
|
2021-09-05 17:10:14 +00:00
|
|
|
from datetime import datetime
|
|
|
|
|
2022-03-23 10:17:17 +00:00
|
|
|
from home.src.es.connect import ElasticWrap, IndexPaginate
|
2022-01-22 15:13:37 +00:00
|
|
|
from home.src.ta.config import AppConfig
|
|
|
|
from home.src.ta.helper import ignore_filelist
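
# Typical entry points (illustrative sketch, not executed on import):
#
#   index_check()                       # at first start: create/migrate indexes
#   backup_all_indexes(reason="auto")   # scheduled run: snapshot and rotate
#   restore_from_backup("ta_backup-20220101-auto.zip")  # hypothetical filename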


class ElasticIndex:
    """
    handle mapping and settings on elastic search for a given index
    """

    def __init__(self, index_name, expected_map, expected_set):
        self.index_name = index_name
        self.expected_map = expected_map
        self.expected_set = expected_set
        self.exists, self.details = self.index_exists()

    def index_exists(self):
        """check if index already exists and return mapping if it does"""
        response, status_code = ElasticWrap(f"ta_{self.index_name}").get()
        exists = status_code == 200
        details = response.get(f"ta_{self.index_name}", False)

        return exists, details

    def validate(self):
        """
        check if all expected mappings and settings match
        returns True when rebuild is needed
        """
        if self.expected_map:
            rebuild = self.validate_mappings()
            if rebuild:
                return rebuild

        if self.expected_set:
            rebuild = self.validate_settings()
            if rebuild:
                return rebuild

        return False

    def validate_mappings(self):
        """check if all mappings are as expected"""
        now_map = self.details["mappings"]["properties"]

        for key, value in self.expected_map.items():
            # nested property: compare every sub-field
            if list(value.keys()) == ["properties"]:
                if key not in now_map:
                    print(key, value)
                    return True

                for key_n, value_n in value["properties"].items():
                    if key_n not in now_map[key]["properties"]:
                        print(key_n, value_n)
                        return True

                    if value_n != now_map[key]["properties"][key_n]:
                        print(key_n, value_n)
                        return True

                continue

            # not nested
            if key not in now_map:
                print(key, value)
                return True

            if value != now_map[key]:
                print(key, value)
                return True

        return False

    def validate_settings(self):
        """check if all settings are as expected"""
        now_set = self.details["settings"]["index"]

        for key, value in self.expected_set.items():
            if key not in now_set:
                print(key, value)
                return True

            if value != now_set[key]:
                print(key, value)
                return True

        return False

    def rebuild_index(self):
        """rebuild with new mapping"""
        self.reindex("backup")
        self.delete_index(backup=False)
        self.create_blank()
        self.reindex("restore")
        self.delete_index()

    def reindex(self, method):
        """reindex between the live index and its _backup copy"""
        if method == "backup":
            source = f"ta_{self.index_name}"
            destination = f"ta_{self.index_name}_backup"
        elif method == "restore":
            source = f"ta_{self.index_name}_backup"
            destination = f"ta_{self.index_name}"
        else:
            # fail early instead of raising a NameError further down
            raise ValueError(f"invalid reindex method: {method}")

        data = {"source": {"index": source}, "dest": {"index": destination}}
        _, _ = ElasticWrap("_reindex?refresh=true").post(data=data)

    def delete_index(self, backup=True):
        """delete the backup index, or the main index if backup=False"""
        path = f"ta_{self.index_name}"
        if backup:
            path = path + "_backup"

        _, _ = ElasticWrap(path).delete()

    def create_blank(self):
        """apply new mapping and settings for blank new index"""
        data = {}
        if self.expected_set:
            data.update({"settings": self.expected_set})
        if self.expected_map:
            data.update({"mappings": {"properties": self.expected_map}})

        _, _ = ElasticWrap(f"ta_{self.index_name}").put(data)


class ElasticBackup:
    """dump index to nd-json files for later bulk import"""

    def __init__(self, index_config, reason):
        self.config = AppConfig().config
        self.cache_dir = self.config["application"]["cache_dir"]
        self.index_config = index_config
        self.reason = reason
        self.timestamp = datetime.now().strftime("%Y%m%d")
        self.backup_files = []

    @staticmethod
    def get_all_documents(index_name):
        """export all documents of a single index"""
        data = {
            "query": {"match_all": {}},
            "sort": [{"_doc": {"order": "desc"}}],
        }
        paginate = IndexPaginate(f"ta_{index_name}", data, keep_source=True)
        all_results = paginate.get_results()

        return all_results

    @staticmethod
    def build_bulk(all_results):
        """build bulk query data from all_results"""
        bulk_list = []

        for document in all_results:
            document_id = document["_id"]
            es_index = document["_index"]
            action = {"index": {"_index": es_index, "_id": document_id}}
            source = document["_source"]
            bulk_list.append(json.dumps(action))
            bulk_list.append(json.dumps(source))

        # the es _bulk body must end with a newline
        bulk_list.append("\n")
        file_content = "\n".join(bulk_list)

        return file_content
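
    # The payload alternates action and source lines, e.g. for one
    # hypothetical document:
    #   {"index": {"_index": "ta_video", "_id": "abc123"}}
    #   {"title": "some video", "youtube_id": "abc123"}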

    def write_es_json(self, file_content, index_name):
        """write nd-json file for es _bulk API to disk"""
        file_name = f"es_{index_name}-{self.timestamp}.json"
        file_path = os.path.join(self.cache_dir, "backup", file_name)
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(file_content)

        self.backup_files.append(file_path)

    def write_ta_json(self, all_results, index_name):
        """write generic json file to disk"""
        file_name = f"ta_{index_name}-{self.timestamp}.json"
        file_path = os.path.join(self.cache_dir, "backup", file_name)
        to_write = [i["_source"] for i in all_results]
        file_content = json.dumps(to_write)
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(file_content)

        self.backup_files.append(file_path)

    def zip_it(self):
        """pack the backup files into a single zip archive"""
        file_name = f"ta_backup-{self.timestamp}-{self.reason}.zip"
        backup_folder = os.path.join(self.cache_dir, "backup")
        backup_file = os.path.join(backup_folder, file_name)

        with zipfile.ZipFile(
            backup_file, "w", compression=zipfile.ZIP_DEFLATED
        ) as zip_f:
            # separate loop variable to avoid shadowing backup_file
            for to_zip in self.backup_files:
                zip_f.write(to_zip, os.path.basename(to_zip))

        # cleanup the now-archived json files
        for to_zip in self.backup_files:
            os.remove(to_zip)

    def post_bulk_restore(self, file_path):
        """send nd-json file content to the es _bulk API"""
        # file_path is the full path built by restore_json_files
        with open(file_path, "r", encoding="utf-8") as f:
            data = f.read()

        if not data.strip():
            return

        _, _ = ElasticWrap("_bulk").post(data=data, ndjson=True)

    def get_all_backup_files(self):
        """build all available backup files for view"""
        backup_dir = os.path.join(self.cache_dir, "backup")
        backup_files = os.listdir(backup_dir)
        all_backup_files = ignore_filelist(backup_files)
        all_available_backups = [
            i
            for i in all_backup_files
            if i.startswith("ta_") and i.endswith(".zip")
        ]
        all_available_backups.sort(reverse=True)

        backup_dicts = []
        for backup_file in all_available_backups:
            file_split = backup_file.split("-")
            if len(file_split) == 2:
                # strip(".zip") would strip characters, not the suffix
                timestamp = file_split[1].replace(".zip", "")
                reason = False
            elif len(file_split) == 3:
                timestamp = file_split[1]
                reason = file_split[2].replace(".zip", "")
            else:
                # unexpected filename, skip instead of raising NameError
                continue

            to_add = {
                "filename": backup_file,
                "timestamp": timestamp,
                "reason": reason,
            }
            backup_dicts.append(to_add)

        return backup_dicts
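
    # Each returned entry looks like (hypothetical filename):
    #   {"filename": "ta_backup-20220101-auto.zip",
    #    "timestamp": "20220101", "reason": "auto"}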

    def unpack_zip_backup(self, filename):
        """extract backup zip and return filelist"""
        backup_dir = os.path.join(self.cache_dir, "backup")
        file_path = os.path.join(backup_dir, filename)

        with zipfile.ZipFile(file_path, "r") as z:
            zip_content = z.namelist()
            z.extractall(backup_dir)

        return zip_content

    def restore_json_files(self, zip_content):
        """go through the unpacked files and restore"""
        backup_dir = os.path.join(self.cache_dir, "backup")

        for json_f in zip_content:
            file_name = os.path.join(backup_dir, json_f)

            if not json_f.startswith("es_") or not json_f.endswith(".json"):
                os.remove(file_name)
                continue

            print(f"restoring: {json_f}")
            self.post_bulk_restore(file_name)
            os.remove(file_name)

    @staticmethod
    def index_exists(index_name):
        """check if index already exists to skip"""
        _, status_code = ElasticWrap(f"ta_{index_name}").get()
        exists = status_code == 200

        return exists

    def rotate_backup(self):
        """delete old backups if needed"""
        rotate = self.config["scheduler"]["run_backup_rotate"]
        if not rotate:
            return

        all_backup_files = self.get_all_backup_files()
        auto = [i for i in all_backup_files if i["reason"] == "auto"]

        if len(auto) <= rotate:
            print("no backup files to rotate")
            return

        backup_dir = os.path.join(self.cache_dir, "backup")

        # files are sorted newest first, keep the first <rotate> entries
        all_to_delete = auto[rotate:]
        for to_delete in all_to_delete:
            file_path = os.path.join(backup_dir, to_delete["filename"])
            print(f"remove old backup file: {file_path}")
            os.remove(file_path)


def get_mapping():
    """read index_mapping.json and get expected mapping and settings"""
    with open("home/src/es/index_mapping.json", "r", encoding="utf-8") as f:
        index_config = json.load(f).get("index_config")

    return index_config
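
# Sketch of the expected index_mapping.json layout, inferred from the keys
# accessed in index_check below (values are hypothetical):
#   {"index_config": [
#       {"index_name": "video", "expected_map": {...}, "expected_set": {...}}
#   ]}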


def index_check(force_restore=False):
    """check if all indexes are created and have correct mapping"""
    backed_up = False
    index_config = get_mapping()

    for index in index_config:
        index_name = index["index_name"]
        expected_map = index["expected_map"]
        expected_set = index["expected_set"]
        handler = ElasticIndex(index_name, expected_map, expected_set)

        # force restore
        if force_restore:
            handler.delete_index(backup=False)
            handler.create_blank()
            continue

        # create new
        if not handler.exists:
            print(f"create new blank index with name ta_{index_name}...")
            handler.create_blank()
            continue

        # validate index
        rebuild = handler.validate()
        if rebuild:
            # make backup before rebuild
            if not backed_up:
                print("running backup first")
                backup_all_indexes(reason="update")
                backed_up = True

            print(f"applying new mappings to index ta_{index_name}...")
            handler.rebuild_index()
            continue

        # else all good
        print(f"ta_{index_name} index is created and up to date...")


def get_available_backups():
    """return list of available backups for settings view"""
    index_config = get_mapping()
    backup_handler = ElasticBackup(index_config, reason=False)
    all_backup_files = backup_handler.get_all_backup_files()

    return all_backup_files


def backup_all_indexes(reason):
    """backup all es indexes to disk"""
    index_config = get_mapping()
    backup_handler = ElasticBackup(index_config, reason)

    for index in backup_handler.index_config:
        index_name = index["index_name"]
        # skip indexes that do not exist yet
        if not backup_handler.index_exists(index_name):
            continue

        print(f"backup: export in progress for {index_name}")
        all_results = backup_handler.get_all_documents(index_name)
        file_content = backup_handler.build_bulk(all_results)
        backup_handler.write_es_json(file_content, index_name)
        backup_handler.write_ta_json(all_results, index_name)

    backup_handler.zip_it()

    if reason == "auto":
        backup_handler.rotate_backup()


def restore_from_backup(filename):
    """restore indexes from backup file"""
    # delete
    index_check(force_restore=True)
    # recreate
    index_config = get_mapping()
    backup_handler = ElasticBackup(index_config, reason=False)
    zip_content = backup_handler.unpack_zip_backup(filename)
    backup_handler.restore_json_files(zip_content)