mirror of
https://github.com/tubearchivist/tubearchivist.git
synced 2025-02-13 11:30:13 +00:00
implement basics snapshot management
This commit is contained in:
parent
02ed521f21
commit
e7f960bf46
164
tubearchivist/home/src/es/snapshot.py
Normal file
164
tubearchivist/home/src/es/snapshot.py
Normal file
@ -0,0 +1,164 @@
|
||||
"""
|
||||
functionality:
|
||||
- handle snapshots in ES
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from os import environ
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from home.src.es.connect import ElasticWrap
|
||||
from home.src.es.index_setup import get_mapping
|
||||
|
||||
|
||||
class ElasticSnapshot:
    """interact with snapshots on ES

    Registers a filesystem snapshot repository and a snapshot lifecycle
    policy, and offers helpers to trigger, list and restore snapshots.
    All requests go through ElasticWrap, progress is reported with print.
    """

    # name of the snapshot repository registered in ES
    REPO = "ta_snapshot"
    # expected repository settings, compared as-is against the ES response
    REPO_SETTINGS = {
        "compress": "true",
        "chunk_size": "1g",
        "location": "/usr/share/elasticsearch/data/snapshot",
    }
    # name of the snapshot lifecycle policy
    POLICY = "ta_daily"

    def __init__(self):
        self.all_indices = self._get_all_indices()

    def _get_all_indices(self):
        """return all indices names managed by TA"""
        return [f"ta_{i['index_name']}" for i in get_mapping()]

    def setup(self):
        """setup the snapshot in ES, create or update if needed"""
        if not self._check_repo_exists():
            self.create_repo()

        if not self._check_policy_exists():
            self.create_policy()

    def _check_repo_exists(self):
        """check if expected repo already exists

        returns True only if the repo exists and its settings are equal
        to REPO_SETTINGS, False if it is missing or needs an update
        """
        path = f"_snapshot/{self.REPO}"
        response, statuscode = ElasticWrap(path).get()
        if statuscode == 200:
            print(f"snapshot: repo {self.REPO} already created")
            matching = response[self.REPO]["settings"] == self.REPO_SETTINGS
            if not matching:
                print(f"snapshot: update repo settings {self.REPO_SETTINGS}")

            return matching

        print(f"snapshot: setup repo {self.REPO} config {self.REPO_SETTINGS}")
        return False

    def create_repo(self):
        """create filesystem repo"""
        path = f"_snapshot/{self.REPO}"
        data = {
            "type": "fs",
            "settings": self.REPO_SETTINGS,
        }
        response, statuscode = ElasticWrap(path).post(data=data)
        if statuscode == 200:
            print(f"snapshot: repo setup correctly: {response}")
        else:
            # don't fail silently, a missing repo breaks everything else
            print(f"snapshot: failed to setup repo: {statuscode} {response}")

    def _check_policy_exists(self):
        """check if snapshot policy is set correctly

        returns True only if the policy exists and is equal to the
        expected policy, False if it is missing or needs an update
        """
        path = f"_slm/policy/{self.POLICY}"
        response, statuscode = ElasticWrap(path).get()
        expected_policy = self._build_policy_data()
        if statuscode == 200:
            print(f"snapshot: policy {self.POLICY} exists")
            # key by self.POLICY so a renamed policy is still looked up
            matching = response[self.POLICY]["policy"] == expected_policy
            if not matching:
                print(f"snapshot: update policy settings {expected_policy}")

            return matching

        print(f"snapshot: create policy {self.POLICY} {expected_policy}")
        return False

    def create_policy(self):
        """create snapshot lifetime policy"""
        path = f"_slm/policy/{self.POLICY}"
        data = self._build_policy_data()
        response, statuscode = ElasticWrap(path).put(data)
        if statuscode == 200:
            print(f"snapshot: policy setup correctly: {response}")
        else:
            print(f"snapshot: failed to setup policy: {statuscode} {response}")

    def _build_policy_data(self):
        """build policy dict from config"""
        return {
            "schedule": "0 30 1 * * ?",
            # doubled braces render a literal {now/d} in the snapshot name
            "name": f"<{self.POLICY}_{{now/d}}>",
            "repository": self.REPO,
            "config": {
                "indices": self.all_indices,
                "include_global_state": True,
            },
            "retention": {
                "expire_after": "30d",
                "min_count": 5,
                "max_count": 50,
            },
        }

    def take_snapshot_now(self):
        """execute daily snapshot now"""
        path = f"_slm/policy/{self.POLICY}/_execute"
        response, statuscode = ElasticWrap(path).post()
        if statuscode == 200:
            print(f"snapshot: executing now: {response}")
        else:
            print(f"snapshot: failed to execute: {statuscode} {response}")

    def get_all_snapshots(self):
        """get a list of all registered snapshots

        returns a list of dicts with the keys id, start, end and
        duration_s, newest first, or False if the repo is not configured,
        the request failed or there are no snapshots
        """
        path = f"_snapshot/{self.REPO}/*?sort=start_time&order=desc"
        response, statuscode = ElasticWrap(path).get()
        if statuscode == 404:
            print("snapshots: not configured")
            return False

        if statuscode != 200:
            # any other error response has no "snapshots" key to read
            print(f"snapshots: failed to get list: {statuscode} {response}")
            return False

        all_snapshots = response["snapshots"]
        if not all_snapshots:
            print("snapshots: no snapshots found")
            return False

        return [
            {
                "id": snapshot["snapshot"],
                "start": self._date_converter(snapshot["start_time"]),
                "end": self._date_converter(snapshot["end_time"]),
                "duration_s": snapshot["duration_in_millis"] // 1000,
            }
            for snapshot in all_snapshots
        ]

    @staticmethod
    def _date_converter(date_utc):
        """convert datetime string

        takes a UTC timestamp like 2023-01-15T01:30:00.123Z and returns it
        as "YYYY-MM-DD HH:MM" in the timezone named by the TZ environment
        variable, stays in UTC if TZ is not set or empty
        """
        # %z accepts the trailing Z and returns a timezone aware UTC date
        expected_format = "%Y-%m-%dT%H:%M:%S.%f%z"
        converted = datetime.strptime(date_utc, expected_format)

        tz_name = environ.get("TZ")
        if tz_name:
            converted = converted.astimezone(ZoneInfo(tz_name))

        return converted.strftime("%Y-%m-%d %H:%M")

    def restore_all(self, snapshot_name):
        """restore snapshot by name

        deletes all indices managed by TA first, then restores
        everything contained in the snapshot
        """
        for index in self.all_indices:
            # existing indices need to go before they can be restored
            _, _ = ElasticWrap(index).delete()

        path = f"_snapshot/{self.REPO}/{snapshot_name}/_restore"
        data = {"indices": "*"}
        response, statuscode = ElasticWrap(path).post(data=data)
        if statuscode == 200:
            print(f"snapshot: executing now: {response}")
        else:
            # indices are already deleted at this point, make it visible
            print(f"snapshot: failed to restore: {statuscode} {response}")
|
Loading…
Reference in New Issue
Block a user