dedicated search_after class to scroll through index

This commit is contained in:
simon 2021-11-17 17:54:47 +07:00
parent 0e9c0d9f6b
commit c4b0f900f8
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
1 changed files with 78 additions and 0 deletions

View File

@ -707,6 +707,84 @@ class WatchState:
print(request.text)
class IndexPaginate:
"""use search_after to go through whole index"""
CONFIG = AppConfig().config
ES_URL = CONFIG["application"]["es_url"]
ES_AUTH = CONFIG["application"]["es_auth"]
HEADERS = {"Content-type": "application/json"}
DEFAULT_SIZE = 500
def __init__(self, index_name, data, size=False):
self.index_name = index_name
self.data = data
self.pit_id = False
self.size = size
def get_results(self):
"""get all results"""
self.get_pit()
self.validate_data()
all_results = self.run_loop()
self.clean_pit()
return all_results
def get_pit(self):
"""get pit for index"""
url = f"{self.ES_URL}/{self.index_name}/_pit?keep_alive=10m"
response = requests.post(url, auth=self.ES_AUTH)
json_data = json.loads(response.text)
self.pit_id = json_data["id"]
def validate_data(self):
"""add pit and size to data"""
if "sort" not in self.data.keys():
print(self.data)
raise ValueError("missing sort key in data")
size = self.size or self.DEFAULT_SIZE
self.data["size"] = size
self.data["pit"] = {"id": self.pit_id, "keep_alive": "10m"}
def run_loop(self):
"""loop through results until last hit"""
query_str = json.dumps(self.data)
url = self.ES_URL + "/_search"
all_results = []
while True:
response = requests.get(
url, data=query_str, headers=self.HEADERS, auth=self.ES_AUTH
)
json_data = json.loads(response.text)
all_hits = json_data["hits"]["hits"]
if all_hits:
for hit in all_hits:
source = hit["_source"]
search_after = hit["sort"]
all_results.append(source)
# update search_after with last hit data
self.data["search_after"] = search_after
query_str = json.dumps(self.data)
else:
break
return all_results
def clean_pit(self):
"""delete pit from elastic search"""
query_str = json.dumps({"id": self.pit_id})
requests.delete(
self.ES_URL + "/_pit",
data=query_str,
headers=self.HEADERS,
auth=self.ES_AUTH,
)
def index_new_video(youtube_id, missing_vid=False):
"""combine video and channel classes for new video index"""
vid_handler = YoutubeVideo(youtube_id)