From 892e81c185020e24402ffdb1ec21b524e0e8f7ec Mon Sep 17 00:00:00 2001 From: Simon Date: Fri, 22 Sep 2023 20:35:14 +0700 Subject: [PATCH] refactor ElasticWrap dynamic kwargs --- tubearchivist/home/src/es/connect.py | 132 ++++++++++++++++----------- 1 file changed, 79 insertions(+), 53 deletions(-) diff --git a/tubearchivist/home/src/es/connect.py b/tubearchivist/home/src/es/connect.py index 7d3b79a..afbdc61 100644 --- a/tubearchivist/home/src/es/connect.py +++ b/tubearchivist/home/src/es/connect.py @@ -7,6 +7,7 @@ functionality: import json import os +from typing import Any import requests import urllib3 @@ -20,75 +21,93 @@ class ElasticWrap: ES_URL: str = str(os.environ.get("ES_URL")) ES_PASS: str = str(os.environ.get("ELASTIC_PASSWORD")) ES_USER: str = str(os.environ.get("ELASTIC_USER") or "elastic") - ES_VERIFY_SSL: str = str(os.environ.get("ES_VERIFY_SSL") or "true") + ES_DISABLE_VERIFY_SSL: bool = bool(os.environ.get("ES_DISABLE_VERIFY_SSL")) - def __init__(self, path): - self.url = f"{self.ES_URL}/{path}" - self.auth = (self.ES_USER, self.ES_PASS) - self.verify_ssl = self.ES_VERIFY_SSL != "false" + def __init__(self, path: str): + self.url: str = f"{self.ES_URL}/{path}" + self.auth: tuple[str, str] = (self.ES_USER, self.ES_PASS) - if not self.verify_ssl: + if not self.ES_DISABLE_VERIFY_SSL: urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - def get(self, data=False, timeout=10, print_error=True): + def get( + self, + data: bool | dict = False, + timeout: int = 10, + print_error: bool = True, + ) -> tuple[dict, int]: """get data from es""" + + kwargs: dict[str, Any] = { + "auth": self.auth, + "timeout": timeout, + } + + if self.ES_DISABLE_VERIFY_SSL: + kwargs["verify"] = False + if data: - response = requests.get( - self.url, - json=data, - auth=self.auth, - timeout=timeout, - verify=self.verify_ssl, - ) - else: - response = requests.get( - self.url, - auth=self.auth, - timeout=timeout, - verify=self.verify_ssl, - ) + kwargs["json"] = data + + response = requests.get(self.url, **kwargs) + if print_error and not response.ok: print(response.text) return response.json(), response.status_code - def post(self, data=False, ndjson=False): + def post( + self, data: bool | dict = False, ndjson: bool = False + ) -> tuple[dict, int]: """post data to es""" - if ndjson: - headers = {"Content-type": "application/x-ndjson"} - payload = data - else: - headers = {"Content-type": "application/json"} - payload = json.dumps(data) - if data: - response = requests.post( - self.url, - data=payload, - headers=headers, - auth=self.auth, - verify=self.verify_ssl, + kwargs: dict[str, Any] = {"auth": self.auth} + + if ndjson and data: + kwargs.update( + { + "headers": {"Content-type": "application/x-ndjson"}, + "data": data, + } ) - else: - response = requests.post( - self.url, - headers=headers, - auth=self.auth, - verify=self.verify_ssl, + elif data: + kwargs.update( + { + "headers": {"Content-type": "application/json"}, + "data": json.dumps(data), + } ) + if self.ES_DISABLE_VERIFY_SSL: + kwargs["verify"] = False + + response = requests.post(self.url, **kwargs) + if not response.ok: print(response.text) return response.json(), response.status_code - def put(self, data, refresh=False): + def put( + self, + data: bool | dict = False, + refresh: bool = False, + ) -> tuple[dict, Any]: """put data to es""" + if refresh: self.url = f"{self.url}/?refresh=true" - response = requests.put( - f"{self.url}", json=data, auth=self.auth, verify=self.verify_ssl - ) + + kwargs: dict[str, Any] = { + "json": data, + "auth": self.auth, + } + + if self.ES_DISABLE_VERIFY_SSL: + kwargs["verify"] = False + + response = requests.put(self.url, **kwargs) + if not response.ok: print(response.text) print(data) @@ -96,18 +115,25 @@ class ElasticWrap: return response.json(), response.status_code - def delete(self, data=False, refresh=False): + def delete( + self, + data: bool | dict = False, + refresh: bool = False, + ) -> tuple[dict, Any]: """delete document from es""" + if refresh: self.url = f"{self.url}/?refresh=true" + + kwargs: dict[str, Any] = {"auth": self.auth} + if data: - response = requests.delete( - self.url, json=data, auth=self.auth, verify=self.verify_ssl - ) - else: - response = requests.delete( - self.url, auth=self.auth, verify=self.verify_ssl - ) + kwargs["json"] = data + + if self.ES_DISABLE_VERIFY_SSL: + kwargs["verify"] = False + + response = requests.delete(self.url, **kwargs) if not response.ok: print(response.text)