class IndexPaginate:
    """use search_after to go through whole index

    Opens a point in time (pit) on index_name, pages through all hits of
    the query in data and deletes the pit again when done.

    index_name: name of the index the pit is opened on
    data: query body as dict, needs a "sort" key; "size" and "pit" get
        added to this dict in place by validate_data
    size: hits per request, falls back to DEFAULT_SIZE when falsy
    """

    CONFIG = AppConfig().config
    ES_URL = CONFIG["application"]["es_url"]
    ES_AUTH = CONFIG["application"]["es_auth"]
    HEADERS = {"Content-type": "application/json"}
    DEFAULT_SIZE = 500

    def __init__(self, index_name, data, size=False):
        self.index_name = index_name
        self.data = data
        self.pit_id = False
        self.size = size

    def get_results(self):
        """get all results

        returns a list with the _source of every hit, raises ValueError
        if data has no sort key
        """
        self.get_pit()
        try:
            self.validate_data()
            return self.run_loop()
        finally:
            # always release the pit, also when validation or a request
            # failed, otherwise it stays open until keep_alive runs out
            self.clean_pit()

    def get_pit(self):
        """get pit for index"""
        url = f"{self.ES_URL}/{self.index_name}/_pit?keep_alive=10m"
        response = requests.post(url, auth=self.ES_AUTH)
        json_data = json.loads(response.text)

        self.pit_id = json_data["id"]

    def validate_data(self):
        """add pit and size to data"""
        if "sort" not in self.data:
            print(self.data)
            raise ValueError("missing sort key in data")

        size = self.size or self.DEFAULT_SIZE

        self.data["size"] = size
        self.data["pit"] = {"id": self.pit_id, "keep_alive": "10m"}

    def run_loop(self):
        """loop through results until last hit"""
        # paginate on a copy: the search_after cursor of this run must not
        # end up in self.data, else a later run would start after the
        # last hit and come back empty
        query = dict(self.data)
        url = self.ES_URL + "/_search"

        all_results = []
        while True:
            response = requests.get(
                url,
                data=json.dumps(query),
                headers=self.HEADERS,
                auth=self.ES_AUTH,
            )
            all_hits = json.loads(response.text)["hits"]["hits"]
            if not all_hits:
                break

            all_results.extend(hit["_source"] for hit in all_hits)
            # next page starts after the sort values of the last hit
            query["search_after"] = all_hits[-1]["sort"]

        return all_results

    def clean_pit(self):
        """delete pit from elastic search"""
        query_str = json.dumps({"id": self.pit_id})
        requests.delete(
            self.ES_URL + "/_pit",
            data=query_str,
            headers=self.HEADERS,
            auth=self.ES_AUTH,
        )