From 6b7354b14f09fe1d549ccf12454b4946be929433 Mon Sep 17 00:00:00 2001 From: simon Date: Wed, 15 Mar 2023 16:59:20 +0700 Subject: [PATCH] refactor reindex task --- tubearchivist/home/src/index/reindex.py | 25 +++++++++++++++++-------- tubearchivist/home/src/ta/ta_redis.py | 4 ++++ tubearchivist/home/tasks.py | 8 +++++--- 3 files changed, 26 insertions(+), 11 deletions(-) diff --git a/tubearchivist/home/src/index/reindex.py b/tubearchivist/home/src/index/reindex.py index afe4f9c3..66d958e7 100644 --- a/tubearchivist/home/src/index/reindex.py +++ b/tubearchivist/home/src/index/reindex.py @@ -21,7 +21,7 @@ from home.src.index.comments import Comments from home.src.index.playlist import YoutubePlaylist from home.src.index.video import YoutubeVideo from home.src.ta.config import AppConfig -from home.src.ta.ta_redis import RedisArchivist, RedisQueue +from home.src.ta.ta_redis import RedisQueue class ReindexBase: @@ -201,8 +201,9 @@ class ReindexManual(ReindexBase): class Reindex(ReindexBase): """reindex all documents from redis queue""" - def __init__(self): + def __init__(self, task=False): super().__init__() + self.task = task self.all_indexed_ids = False def reindex_all(self): @@ -211,22 +212,22 @@ class Reindex(ReindexBase): print("[reindex] cookie invalid, exiting...") return - for index_config in self.REINDEX_CONFIG.values(): + for name, index_config in self.REINDEX_CONFIG.items(): if not RedisQueue(index_config["queue_name"]).has_item(): continue + total = RedisQueue(index_config["queue_name"]).length() while True: - has_next = self.reindex_index(index_config) + has_next = self.reindex_index(name, index_config, total) if not has_next: break - RedisArchivist().set_message("last_reindex", self.now) - - def reindex_index(self, index_config): + def reindex_index(self, name, index_config, total): """reindex all of a single index""" reindex = self.get_reindex_map(index_config["index_name"]) youtube_id = RedisQueue(index_config["queue_name"]).get_next() if youtube_id: + self._notify(name, index_config, total) reindex(youtube_id) sleep_interval = self.config["downloads"].get("sleep_interval", 0) sleep(sleep_interval) @@ -243,6 +244,14 @@ class Reindex(ReindexBase): return def_map.get(index_name) + def _notify(self, name, index_config, total): + """send notification back to task""" + remaining = RedisQueue(index_config["queue_name"]).length() + idx = total - remaining + message = [f"Reindexing {name[0].upper()}{name[1:]}s {idx}/{total}"] + progress = idx / total + self.task.send_progress(message, progress=progress) + def _reindex_single_video(self, youtube_id): """wrapper to handle channel name changes""" try: @@ -511,7 +520,7 @@ class ChannelFullScan: print(f"{video_id}: no remote match found") continue - expected_type = remote_match[0][-1].value + expected_type = remote_match[0][-1] if video["vid_type"] != expected_type: self.to_update.append( { diff --git a/tubearchivist/home/src/ta/ta_redis.py b/tubearchivist/home/src/ta/ta_redis.py index 684c517b..9c04fb0f 100644 --- a/tubearchivist/home/src/ta/ta_redis.py +++ b/tubearchivist/home/src/ta/ta_redis.py @@ -114,6 +114,10 @@ class RedisQueue(RedisBase): all_elements = [i.decode() for i in result] return all_elements + def length(self): + """return total elements in list""" + return self.conn.execute_command("LLEN", self.key) + def in_queue(self, element): """check if element is in list""" result = self.conn.execute_command("LPOS", self.key, element) diff --git a/tubearchivist/home/tasks.py b/tubearchivist/home/tasks.py index d65cddb0..92759e24 100644 --- a/tubearchivist/home/tasks.py +++ b/tubearchivist/home/tasks.py @@ -63,7 +63,7 @@ class BaseTask(Task): "group": "message:download:subscribe", }, "check_reindex": { - "title": "Reindex old documents", + "title": "Reindex Documents", "group": "message:settings:reindex", }, "manual_import": { @@ -187,12 +187,13 @@ def extrac_dl(self, youtube_ids): pending_handler.add_to_pending() -@shared_task(bind=True, name="check_reindex") +@shared_task(bind=True, name="check_reindex", base=BaseTask) def check_reindex(self, data=False, extract_videos=False): """run the reindex main command""" if data: # started from frontend through API print(f"[task][{self.name}] reindex {data}") + self.send_progress("Add items to the reindex Queue.") ReindexManual(extract_videos=extract_videos).extract_data(data) manager = TaskManager() @@ -204,9 +205,10 @@ def check_reindex(self, data=False, extract_videos=False): if not data: # started from scheduler print(f"[task][{self.name}] reindex outdated documents") + self.send_progress("Add outdated documents to the reindex Queue.") ReindexOutdated().add_outdated() - Reindex().reindex_all() + Reindex(task=self).reindex_all() @shared_task(bind=True, name="manual_import")