From 254c518505f720ba6bafd84b89e7722208f2555b Mon Sep 17 00:00:00 2001 From: simon Date: Thu, 28 Oct 2021 15:49:58 +0700 Subject: [PATCH] basic auth for elasticsearch --- docker-compose.yml | 3 ++ run.sh | 6 ++- tubearchivist/home/src/config.py | 4 ++ tubearchivist/home/src/download.py | 58 ++++++++++++++++------ tubearchivist/home/src/helper.py | 4 +- tubearchivist/home/src/index.py | 45 +++++++++++------ tubearchivist/home/src/index_management.py | 29 ++++++++--- tubearchivist/home/src/reindex.py | 22 +++++--- tubearchivist/home/src/searching.py | 7 ++- tubearchivist/home/views.py | 2 - 10 files changed, 131 insertions(+), 49 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index e52a366..7867d8f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -17,6 +17,7 @@ services: - HOST_GID=1000 - TA_USERNAME=tubearchivist - TA_PASSWORD=verysecret + - ELASTIC_PASSWORD=verysecret depends_on: - archivist-es - archivist-redis @@ -35,6 +36,8 @@ services: container_name: archivist-es restart: always environment: + - "xpack.security.enabled=true" + - "ELASTIC_PASSWORD=verysecret" - "discovery.type=single-node" - "ES_JAVA_OPTS=-Xms512m -Xmx512m" ulimits: diff --git a/run.sh b/run.sh index 27b1399..26a45cb 100644 --- a/run.sh +++ b/run.sh @@ -1,8 +1,12 @@ #!/bin/bash # startup script inside the container for tubearchivist +if [[ -z "$ELASTIC_USER" ]]; then + export ELASTIC_USER=elastic +fi + counter=0 -until curl "$ES_URL" -fs; do +until curl -u "$ELASTIC_USER":"$ELASTIC_PASSWORD" "$ES_URL" -fs; do echo "waiting for elastic search to start" counter=$((counter+1)) if [[ $counter -eq 12 ]]; then diff --git a/tubearchivist/home/src/config.py b/tubearchivist/home/src/config.py index a01f2d3..6e304f5 100644 --- a/tubearchivist/home/src/config.py +++ b/tubearchivist/home/src/config.py @@ -51,9 +51,13 @@ class AppConfig: else: host_gid = False + es_pass = os.environ.get("ELASTIC_PASSWORD") + es_user = os.environ.get("ELASTIC_USER", default="elastic") + 
application = { "REDIS_HOST": os.environ.get("REDIS_HOST"), "es_url": os.environ.get("ES_URL"), + "es_auth": (es_user, es_pass), "HOST_UID": host_uid, "HOST_GID": host_gid, } diff --git a/tubearchivist/home/src/download.py b/tubearchivist/home/src/download.py index 3f03a85..12502c5 100644 --- a/tubearchivist/home/src/download.py +++ b/tubearchivist/home/src/download.py @@ -29,6 +29,7 @@ class PendingList: CONFIG = AppConfig().config ES_URL = CONFIG["application"]["es_url"] + ES_AUTH = CONFIG["application"]["es_auth"] VIDEOS = CONFIG["application"]["videos"] @staticmethod @@ -107,7 +108,9 @@ class PendingList: query_str = "\n".join(bulk_list) headers = {"Content-type": "application/x-ndjson"} url = self.ES_URL + "/_bulk" - request = requests.post(url, data=query_str, headers=headers) + request = requests.post( + url, data=query_str, headers=headers, auth=self.ES_AUTH + ) if not request.ok: print(request) @@ -155,7 +158,7 @@ class PendingList: headers = {"Content-type": "application/json"} # get PIT ID url = self.ES_URL + "/ta_download/_pit?keep_alive=1m" - response = requests.post(url) + response = requests.post(url, auth=self.ES_AUTH) json_data = json.loads(response.text) pit_id = json_data["id"] # query @@ -170,7 +173,9 @@ class PendingList: all_pending = [] all_ignore = [] while True: - response = requests.get(url, data=query_str, headers=headers) + response = requests.get( + url, data=query_str, headers=headers, auth=self.ES_AUTH + ) json_data = json.loads(response.text) all_hits = json_data["hits"]["hits"] if all_hits: @@ -188,7 +193,12 @@ class PendingList: break # clean up PIT query_str = json.dumps({"id": pit_id}) - requests.delete(self.ES_URL + "/_pit", data=query_str, headers=headers) + requests.delete( + self.ES_URL + "/_pit", + data=query_str, + headers=headers, + auth=self.ES_AUTH, + ) return all_pending, all_ignore def get_all_indexed(self): @@ -196,7 +206,7 @@ class PendingList: headers = {"Content-type": "application/json"} # get PIT ID url = 
self.ES_URL + "/ta_video/_pit?keep_alive=1m" - response = requests.post(url) + response = requests.post(url, auth=self.ES_AUTH) json_data = json.loads(response.text) pit_id = json_data["id"] # query @@ -210,7 +220,9 @@ class PendingList: url = self.ES_URL + "/_search" all_indexed = [] while True: - response = requests.get(url, data=query_str, headers=headers) + response = requests.get( + url, data=query_str, headers=headers, auth=self.ES_AUTH + ) json_data = json.loads(response.text) all_hits = json_data["hits"]["hits"] if all_hits: @@ -224,7 +236,12 @@ class PendingList: break # clean up PIT query_str = json.dumps({"id": pit_id}) - requests.delete(self.ES_URL + "/_pit", data=query_str, headers=headers) + requests.delete( + self.ES_URL + "/_pit", + data=query_str, + headers=headers, + auth=self.ES_AUTH, + ) return all_indexed def get_all_downloaded(self): @@ -244,7 +261,7 @@ class PendingList: def delete_from_pending(self, youtube_id): """delete the youtube_id from ta_download""" url = f"{self.ES_URL}/ta_download/_doc/{youtube_id}" - response = requests.delete(url) + response = requests.delete(url, auth=self.ES_AUTH) if not response.ok: print(response.text) @@ -266,7 +283,9 @@ class PendingList: headers = {"Content-type": "application/x-ndjson"} url = self.ES_URL + "/_bulk" - request = requests.post(url, data=query_str, headers=headers) + request = requests.post( + url, data=query_str, headers=headers, auth=self.ES_AUTH + ) mess_dict = { "status": "ignore", "level": "info", @@ -284,6 +303,7 @@ class ChannelSubscription: def __init__(self): config = AppConfig().config self.es_url = config["application"]["es_url"] + self.es_auth = config["application"]["es_auth"] self.channel_size = config["subscriptions"]["channel_size"] def get_channels(self, subscribed_only=True): @@ -291,7 +311,7 @@ class ChannelSubscription: headers = {"Content-type": "application/json"} # get PIT ID url = self.es_url + "/ta_channel/_pit?keep_alive=1m" - response = requests.post(url) + response 
= requests.post(url, auth=self.es_auth) json_data = json.loads(response.text) pit_id = json_data["id"] # query @@ -313,7 +333,9 @@ class ChannelSubscription: url = self.es_url + "/_search" all_channels = [] while True: - response = requests.get(url, data=query_str, headers=headers) + response = requests.get( + url, data=query_str, headers=headers, auth=self.es_auth + ) json_data = json.loads(response.text) all_hits = json_data["hits"]["hits"] if all_hits: @@ -328,7 +350,12 @@ class ChannelSubscription: break # clean up PIT query_str = json.dumps({"id": pit_id}) - requests.delete(self.es_url + "/_pit", data=query_str, headers=headers) + requests.delete( + self.es_url + "/_pit", + data=query_str, + headers=headers, + auth=self.es_auth, + ) return all_channels def get_last_youtube_videos(self, channel_id, limit=True): @@ -394,7 +421,9 @@ class ChannelSubscription: url = self.es_url + "/ta_channel/_update/" + channel_id payload = json.dumps({"doc": channel_dict}) # update channel - request = requests.post(url, data=payload, headers=headers) + request = requests.post( + url, data=payload, headers=headers, auth=self.es_auth + ) if not request.ok: print(request.text) # sync to videos @@ -602,7 +631,8 @@ class VideoDownloader: def delete_from_pending(self, youtube_id): """delete downloaded video from pending index if its there""" es_url = self.config["application"]["es_url"] + es_auth = self.config["application"]["es_auth"] url = f"{es_url}/ta_download/_doc/{youtube_id}" - response = requests.delete(url) + response = requests.delete(url, auth=es_auth) if not response.ok and not response.status_code == 404: print(response.text) diff --git a/tubearchivist/home/src/helper.py b/tubearchivist/home/src/helper.py index 5ea1710..81ea81c 100644 --- a/tubearchivist/home/src/helper.py +++ b/tubearchivist/home/src/helper.py @@ -14,13 +14,13 @@ import redis import requests -def get_total_hits(index, es_url, match_field): +def get_total_hits(index, es_url, es_auth, match_field): """get 
total hits from index""" headers = {"Content-type": "application/json"} data = {"query": {"match": {match_field: True}}} payload = json.dumps(data) url = f"{es_url}/{index}/_search?filter_path=hits.total" - request = requests.post(url, data=payload, headers=headers) + request = requests.post(url, data=payload, headers=headers, auth=es_auth) if not request.ok: print(request.text) total_json = json.loads(request.text) diff --git a/tubearchivist/home/src/index.py b/tubearchivist/home/src/index.py index 8b1fe1d..8a0e11d 100644 --- a/tubearchivist/home/src/index.py +++ b/tubearchivist/home/src/index.py @@ -24,6 +24,7 @@ class YoutubeChannel: CONFIG = AppConfig().config ES_URL = CONFIG["application"]["es_url"] + ES_AUTH = CONFIG["application"]["es_auth"] CACHE_DIR = CONFIG["application"]["cache_dir"] VIDEOS = CONFIG["application"]["videos"] @@ -51,7 +52,7 @@ class YoutubeChannel: """get from elastic search first if possible""" channel_id = self.channel_id url = f"{self.ES_URL}/ta_channel/_doc/{channel_id}" - response = requests.get(url) + response = requests.get(url, auth=self.ES_AUTH) if response.ok: channel_source = response.json()["_source"] self.source = "elastic" @@ -63,7 +64,7 @@ class YoutubeChannel: channel_id = self.channel_id url = f"https://www.youtube.com/channel/{channel_id}/about?hl=en" cookies = {"CONSENT": "YES+xxxxxxxxxxxxxxxxxxxxxxxxxxx"} - response = requests.get(url, cookies=cookies) + response = requests.get(url, cookies=cookies) if response.ok: channel_page = response.text else: @@ -166,7 +167,7 @@ class YoutubeChannel: def upload_to_es(self): """upload channel data to elastic search""" url = f"{self.ES_URL}/ta_channel/_doc/{self.channel_id}" - response = requests.put(url, json=self.channel_dict) + response = requests.put(url, json=self.channel_dict, auth=self.ES_AUTH) print(f"added {self.channel_id} to es") if not response.ok: print(response.text) @@ -183,14 +184,18 @@ class YoutubeChannel: data = {"description": channel_id,
"processors": processors} payload = json.dumps(data) url = self.ES_URL + "/_ingest/pipeline/" + channel_id - request = requests.put(url, data=payload, headers=headers) + request = requests.put( + url, data=payload, headers=headers, auth=self.ES_AUTH + ) if not request.ok: print(request.text) # apply pipeline data = {"query": {"match": {"channel.channel_id": channel_id}}} payload = json.dumps(data) url = self.ES_URL + "/ta_video/_update_by_query?pipeline=" + channel_id - request = requests.post(url, data=payload, headers=headers) + request = requests.post( + url, data=payload, headers=headers, auth=self.ES_AUTH + ) if not request.ok: print(request.text) @@ -211,7 +216,9 @@ class YoutubeChannel: } payload = json.dumps(data) url = self.ES_URL + "/ta_video/_delete_by_query" - response = requests.post(url, data=payload, headers=headers) + response = requests.post( + url, data=payload, headers=headers, auth=self.ES_AUTH + ) if not response.ok: print(response.text) @@ -230,7 +237,7 @@ class YoutubeChannel: print("delete indexed videos") self.delete_es_videos() url = self.ES_URL + "/ta_channel/_doc/" + self.channel_id - response = requests.delete(url) + response = requests.delete(url, auth=self.ES_AUTH) if not response.ok: print(response.text) @@ -240,6 +247,7 @@ class YoutubeVideo: CONFIG = AppConfig().config ES_URL = CONFIG["application"]["es_url"] + ES_AUTH = CONFIG["application"]["es_auth"] CACHE_DIR = CONFIG["application"]["cache_dir"] VIDEOS = CONFIG["application"]["videos"] @@ -360,7 +368,7 @@ class YoutubeVideo: def get_es_data(self): """get current data from elastic search""" url = self.ES_URL + "/ta_video/_doc/" + self.youtube_id - response = requests.get(url) + response = requests.get(url, auth=self.ES_AUTH) if not response.ok: print(response.text) es_vid_dict = json.loads(response.text) @@ -369,7 +377,7 @@ class YoutubeVideo: def upload_to_es(self): """upload channel data to elastic search""" url = f"{self.ES_URL}/ta_video/_doc/{self.youtube_id}" - response = 
requests.put(url, json=self.vid_dict) + response = requests.put(url, json=self.vid_dict, auth=self.ES_AUTH) if not response.ok: print(response.text) @@ -380,7 +388,9 @@ class YoutubeVideo: url = f"{self.ES_URL}/ta_video/_update/{youtube_id}" data = {"script": "ctx._source.active = false"} json_str = json.dumps(data) - response = requests.post(url, data=json_str, headers=headers) + response = requests.post( + url, data=json_str, headers=headers, auth=self.ES_AUTH + ) print(f"deactivated {youtube_id}") if not response.ok: print(response.text) @@ -395,7 +405,7 @@ class YoutubeVideo: os.remove(to_delete) # delete from index url = f"{self.ES_URL}/ta_video/_doc/{self.youtube_id}" - response = requests.delete(url) + response = requests.delete(url, auth=self.ES_AUTH) if not response.ok: print(response.text) # delete thumbs from cache @@ -407,6 +417,7 @@ class WatchState: CONFIG = AppConfig().config ES_URL = CONFIG["application"]["es_url"] + ES_AUTH = CONFIG["application"]["es_auth"] HEADERS = {"Content-type": "application/json"} def __init__(self, youtube_id): @@ -450,7 +461,9 @@ class WatchState: data["doc"]["player"]["watched"] = False payload = json.dumps(data) - request = requests.post(url, data=payload, headers=self.HEADERS) + request = requests.post( + url, data=payload, headers=self.HEADERS, auth=self.ES_AUTH + ) if not request.ok: print(request.text) @@ -472,7 +485,9 @@ class WatchState: payload = json.dumps(data) url = f"{es_url}/_ingest/pipeline/{youtube_id}" - request = requests.put(url, data=payload, headers=headers) + request = requests.put( + url, data=payload, headers=headers, auth=self.ES_AUTH + ) if not request.ok: print(request.text) raise ValueError("failed to post ingest pipeline") @@ -485,7 +500,9 @@ class WatchState: data = {"query": {"bool": {"must": must_list}}} payload = json.dumps(data) url = f"{es_url}/ta_video/_update_by_query?pipeline={youtube_id}" - request = requests.post(url, data=payload, headers=headers) + request = requests.post( + url, 
data=payload, headers=headers, auth=self.ES_AUTH + ) if not request.ok: print(request.text) diff --git a/tubearchivist/home/src/index_management.py b/tubearchivist/home/src/index_management.py index 9ebd59e..b2286d8 100644 --- a/tubearchivist/home/src/index_management.py +++ b/tubearchivist/home/src/index_management.py @@ -164,6 +164,7 @@ class ElasticIndex: CONFIG = AppConfig().config ES_URL = CONFIG["application"]["es_url"] + ES_AUTH = CONFIG["application"]["es_auth"] HEADERS = {"Content-type": "application/json"} def __init__(self, index_name, expected_map, expected_set): @@ -176,7 +177,7 @@ class ElasticIndex: """check if index already exists and return mapping if it does""" index_name = self.index_name url = f"{self.ES_URL}/ta_{index_name}" - response = requests.get(url) + response = requests.get(url, auth=self.ES_AUTH) exists = response.ok if exists: @@ -274,7 +275,9 @@ class ElasticIndex: query = {"source": {"index": source}, "dest": {"index": destination}} data = json.dumps(query) url = self.ES_URL + "/_reindex?refresh=true" - response = requests.post(url=url, data=data, headers=self.HEADERS) + response = requests.post( + url=url, data=data, headers=self.HEADERS, auth=self.ES_AUTH + ) if not response.ok: print(response.text) @@ -284,7 +287,7 @@ class ElasticIndex: url = f"{self.ES_URL}/ta_{self.index_name}_backup" else: url = f"{self.ES_URL}/ta_{self.index_name}" - response = requests.delete(url) + response = requests.delete(url, auth=self.ES_AUTH) if not response.ok: print(response.text) @@ -301,7 +304,9 @@ class ElasticIndex: # create url = f"{self.ES_URL}/ta_{self.index_name}" data = json.dumps(payload) - response = requests.put(url=url, data=data, headers=self.HEADERS) + response = requests.put( + url=url, data=data, headers=self.HEADERS, auth=self.ES_AUTH + ) if not response.ok: print(response.text) @@ -319,9 +324,10 @@ class ElasticBackup: """export all documents of a single index""" headers = {"Content-type": "application/json"} es_url = 
self.config["application"]["es_url"] + es_auth = self.config["application"]["es_auth"] # get PIT ID url = f"{es_url}/ta_{index_name}/_pit?keep_alive=1m" - response = requests.post(url) + response = requests.post(url, auth=es_auth) json_data = json.loads(response.text) pit_id = json_data["id"] # build query @@ -336,7 +342,9 @@ # loop until nothing left all_results = [] while True: - response = requests.get(url, data=query_str, headers=headers) + response = requests.get( + url, data=query_str, headers=headers, auth=es_auth + ) json_data = json.loads(response.text) all_hits = json_data["hits"]["hits"] if all_hits: @@ -350,7 +358,9 @@ break # clean up PIT query_str = json.dumps({"id": pit_id}) - requests.delete(es_url + "/_pit", data=query_str, headers=headers) + requests.delete( + es_url + "/_pit", data=query_str, headers=headers, auth=es_auth + ) return all_results @@ -416,6 +426,7 @@ """send bulk to es""" cache_dir = self.config["application"]["cache_dir"] es_url = self.config["application"]["es_url"] + es_auth = self.config["application"]["es_auth"] headers = {"Content-type": "application/x-ndjson"} file_path = os.path.join(cache_dir, file_name) @@ -426,7 +437,9 @@ return url = es_url + "/_bulk" - request = requests.post(url, data=query_str, headers=headers) + request = requests.post( + url, data=query_str, headers=headers, auth=es_auth + ) if not request.ok: print(request.text) diff --git a/tubearchivist/home/src/reindex.py b/tubearchivist/home/src/reindex.py index da279da..a215dec 100644 --- a/tubearchivist/home/src/reindex.py +++ b/tubearchivist/home/src/reindex.py @@ -35,6 +35,7 @@ class Reindex: config = AppConfig().config self.sleep_interval = config["downloads"]["sleep_interval"] self.es_url = config["application"]["es_url"] + self.es_auth = config["application"]["es_auth"] self.refresh_interval = 90 # scan self.video_daily, self.channel_daily = self.get_daily() @@ -43,10 +44,12
@@ class Reindex: def get_daily(self): """get daily refresh values""" - total_videos = get_total_hits("ta_video", self.es_url, "active") + total_videos = get_total_hits( + "ta_video", self.es_url, self.es_auth, "active" + ) video_daily = ceil(total_videos / self.refresh_interval * 1.2) total_channels = get_total_hits( - "ta_channel", self.es_url, "channel_active" + "ta_channel", self.es_url, self.es_auth, "channel_active" ) channel_daily = ceil(total_channels / self.refresh_interval * 1.2) return (video_daily, channel_daily) @@ -72,7 +75,9 @@ class Reindex: } query_str = json.dumps(data) url = self.es_url + "/ta_video/_search" - response = requests.get(url, data=query_str, headers=headers) + response = requests.get( + url, data=query_str, headers=headers, auth=self.es_auth + ) if not response.ok: print(response.text) response_dict = json.loads(response.text) @@ -100,7 +105,9 @@ class Reindex: } query_str = json.dumps(data) url = self.es_url + "/ta_channel/_search" - response = requests.get(url, data=query_str, headers=headers) + response = requests.get( + url, data=query_str, headers=headers, auth=self.es_auth + ) if not response.ok: print(response.text) response_dict = json.loads(response.text) @@ -208,6 +215,7 @@ class FilesystemScanner: CONFIG = AppConfig().config ES_URL = CONFIG["application"]["es_url"] + ES_AUTH = CONFIG["application"]["es_auth"] VIDEOS = CONFIG["application"]["videos"] def __init__(self): @@ -329,7 +337,9 @@ class FilesystemScanner: # make the call headers = {"Content-type": "application/x-ndjson"} url = self.ES_URL + "/_bulk" - request = requests.post(url, data=query_str, headers=headers) + request = requests.post( + url, data=query_str, headers=headers, auth=self.ES_AUTH + ) if not request.ok: print(request.text) @@ -339,7 +349,7 @@ class FilesystemScanner: youtube_id = indexed[0] print(f"deleting {youtube_id} from index") url = self.ES_URL + "/ta_video/_doc/" + youtube_id - request = requests.delete(url) + request = requests.delete(url, 
auth=self.ES_AUTH) if not request.ok: print(request.text) diff --git a/tubearchivist/home/src/searching.py b/tubearchivist/home/src/searching.py index 0d5b22a..2d8dee4 100644 --- a/tubearchivist/home/src/searching.py +++ b/tubearchivist/home/src/searching.py @@ -20,6 +20,7 @@ class SearchHandler: CONFIG = AppConfig().config CACHE_DIR = CONFIG["application"]["cache_dir"] + ES_AUTH = CONFIG["application"]["es_auth"] def __init__(self, url, data): self.max_hits = None @@ -29,9 +30,11 @@ class SearchHandler: def get_data(self): """get the data""" if self.data: - response = requests.get(self.url, json=self.data).json() + response = requests.get( + self.url, json=self.data, auth=self.ES_AUTH + ).json() else: - response = requests.get(self.url).json() + response = requests.get(self.url, auth=self.ES_AUTH).json() if "hits" in response.keys(): self.max_hits = response["hits"]["total"]["value"] diff --git a/tubearchivist/home/views.py b/tubearchivist/home/views.py index 34337d6..ecf7e9b 100644 --- a/tubearchivist/home/views.py +++ b/tubearchivist/home/views.py @@ -115,8 +115,6 @@ class HomeView(View): } data["query"] = query - print(data) - return data @staticmethod