refactor ElasticWrap dynamic kwargs

This commit is contained in:
Simon 2023-09-22 20:35:14 +07:00
parent b2bb7ea28e
commit 892e81c185
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
1 changed files with 79 additions and 53 deletions

View File

@ -7,6 +7,7 @@ functionality:
import json import json
import os import os
from typing import Any
import requests import requests
import urllib3 import urllib3
@ -20,75 +21,93 @@ class ElasticWrap:
ES_URL: str = str(os.environ.get("ES_URL")) ES_URL: str = str(os.environ.get("ES_URL"))
ES_PASS: str = str(os.environ.get("ELASTIC_PASSWORD")) ES_PASS: str = str(os.environ.get("ELASTIC_PASSWORD"))
ES_USER: str = str(os.environ.get("ELASTIC_USER") or "elastic") ES_USER: str = str(os.environ.get("ELASTIC_USER") or "elastic")
ES_VERIFY_SSL: str = str(os.environ.get("ES_VERIFY_SSL") or "true") ES_DISABLE_VERIFY_SSL: bool = bool(os.environ.get("ES_DISABLE_VERIFY_SSL"))
def __init__(self, path): def __init__(self, path: str):
self.url = f"{self.ES_URL}/{path}" self.url: str = f"{self.ES_URL}/{path}"
self.auth = (self.ES_USER, self.ES_PASS) self.auth: tuple[str, str] = (self.ES_USER, self.ES_PASS)
self.verify_ssl = self.ES_VERIFY_SSL != "false"
if not self.verify_ssl: if not self.ES_DISABLE_VERIFY_SSL:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def get(self, data=False, timeout=10, print_error=True): def get(
self,
data: bool | dict = False,
timeout: int = 10,
print_error: bool = True,
) -> tuple[dict, int]:
"""get data from es""" """get data from es"""
kwargs: dict[str, Any] = {
"auth": self.auth,
"timeout": timeout,
}
if self.ES_DISABLE_VERIFY_SSL:
kwargs["verify"] = False
if data: if data:
response = requests.get( kwargs["json"] = data
self.url,
json=data, response = requests.get(self.url, **kwargs)
auth=self.auth,
timeout=timeout,
verify=self.verify_ssl,
)
else:
response = requests.get(
self.url,
auth=self.auth,
timeout=timeout,
verify=self.verify_ssl,
)
if print_error and not response.ok: if print_error and not response.ok:
print(response.text) print(response.text)
return response.json(), response.status_code return response.json(), response.status_code
def post(self, data=False, ndjson=False): def post(
self, data: bool | dict = False, ndjson: bool = False
) -> tuple[dict, int]:
"""post data to es""" """post data to es"""
if ndjson:
headers = {"Content-type": "application/x-ndjson"}
payload = data
else:
headers = {"Content-type": "application/json"}
payload = json.dumps(data)
if data: kwargs: dict[str, Any] = {"auth": self.auth}
response = requests.post(
self.url, if ndjson and data:
data=payload, kwargs.update(
headers=headers, {
auth=self.auth, "headers": {"Content-type": "application/x-ndjson"},
verify=self.verify_ssl, "data": data,
}
) )
else: elif data:
response = requests.post( kwargs.update(
self.url, {
headers=headers, "headers": {"Content-type": "application/json"},
auth=self.auth, "data": json.dumps(data),
verify=self.verify_ssl, }
) )
if self.ES_DISABLE_VERIFY_SSL:
kwargs["verify"] = False
response = requests.post(self.url, **kwargs)
if not response.ok: if not response.ok:
print(response.text) print(response.text)
return response.json(), response.status_code return response.json(), response.status_code
def put(self, data, refresh=False): def put(
self,
data: bool | dict = False,
refresh: bool = False,
) -> tuple[dict, Any]:
"""put data to es""" """put data to es"""
if refresh: if refresh:
self.url = f"{self.url}/?refresh=true" self.url = f"{self.url}/?refresh=true"
response = requests.put(
f"{self.url}", json=data, auth=self.auth, verify=self.verify_ssl kwargs: dict[str, Any] = {
) "json": data,
"auth": self.auth,
}
if self.ES_DISABLE_VERIFY_SSL:
kwargs["verify"] = False
response = requests.put(self.url, **kwargs)
if not response.ok: if not response.ok:
print(response.text) print(response.text)
print(data) print(data)
@ -96,18 +115,25 @@ class ElasticWrap:
return response.json(), response.status_code return response.json(), response.status_code
def delete(self, data=False, refresh=False): def delete(
self,
data: bool | dict = False,
refresh: bool = False,
) -> tuple[dict, Any]:
"""delete document from es""" """delete document from es"""
if refresh: if refresh:
self.url = f"{self.url}/?refresh=true" self.url = f"{self.url}/?refresh=true"
kwargs: dict[str, Any] = {"auth": self.auth}
if data: if data:
response = requests.delete( kwargs["json"] = data
self.url, json=data, auth=self.auth, verify=self.verify_ssl
) if self.ES_DISABLE_VERIFY_SSL:
else: kwargs["verify"] = False
response = requests.delete(
self.url, auth=self.auth, verify=self.verify_ssl response = requests.delete(self.url, **kwargs)
)
if not response.ok: if not response.ok:
print(response.text) print(response.text)