diff --git a/tubearchivist/config/management/commands/ta_startup.py b/tubearchivist/config/management/commands/ta_startup.py index 75a5d02..a4e1169 100644 --- a/tubearchivist/config/management/commands/ta_startup.py +++ b/tubearchivist/config/management/commands/ta_startup.py @@ -36,7 +36,7 @@ class Command(BaseCommand): self.stdout.write(TOPIC) self._sync_redis_state() self._make_folders() - self._release_locks() + self._clear_redis_keys() self._clear_tasks() self._clear_dl_cache() self._version_check() @@ -73,10 +73,10 @@ class Command(BaseCommand): self.stdout.write(self.style.SUCCESS(" ✓ expected folders created")) - def _release_locks(self): - """make sure there are no leftover locks set in redis""" - self.stdout.write("[3] clear leftover locks in redis") - all_locks = [ + def _clear_redis_keys(self): + """make sure there are no leftover locks or keys set in redis""" + self.stdout.write("[3] clear leftover keys in redis") + all_keys = [ "dl_queue_id", "dl_queue", "downloading", @@ -85,19 +85,22 @@ class Command(BaseCommand): "rescan", "run_backup", "startup_check", + "reindex:ta_video", + "reindex:ta_channel", + "reindex:ta_playlist", ] redis_con = RedisArchivist() has_changed = False - for lock in all_locks: - if redis_con.del_message(lock): + for key in all_keys: + if redis_con.del_message(key): self.stdout.write( - self.style.SUCCESS(f" ✓ cleared lock {lock}") + self.style.SUCCESS(f" ✓ cleared key {key}") ) has_changed = True if not has_changed: - self.stdout.write(self.style.SUCCESS(" no locks found")) + self.stdout.write(self.style.SUCCESS(" no keys found")) def _clear_tasks(self): """clear tasks and messages""" diff --git a/tubearchivist/home/src/download/yt_dlp_handler.py b/tubearchivist/home/src/download/yt_dlp_handler.py index efcdb9b..6bf2c95 100644 --- a/tubearchivist/home/src/download/yt_dlp_handler.py +++ b/tubearchivist/home/src/download/yt_dlp_handler.py @@ -50,7 +50,7 @@ class DownloadPostProcess: return print(f"auto delete older than {autodelete_days} days") - now_lte = self.now - autodelete_days * 24 * 60 * 60 + now_lte = str(self.now - autodelete_days * 24 * 60 * 60) data = { "query": {"range": {"player.watched_date": {"lte": now_lte}}}, "sort": [{"player.watched_date": {"order": "asc"}}], @@ -63,7 +63,7 @@ class DownloadPostProcess: if "autodelete_days" in value: autodelete_days = value.get("autodelete_days") print(f"{channel_id}: delete older than {autodelete_days}d") - now_lte = self.now - autodelete_days * 24 * 60 * 60 + now_lte = str(self.now - autodelete_days * 24 * 60 * 60) must_list = [ {"range": {"player.watched_date": {"lte": now_lte}}}, {"term": {"channel.channel_id": {"value": channel_id}}}, diff --git a/tubearchivist/home/src/index/channel.py b/tubearchivist/home/src/index/channel.py index 5794ea9..273efff 100644 --- a/tubearchivist/home/src/index/channel.py +++ b/tubearchivist/home/src/index/channel.py @@ -31,8 +31,8 @@ class YoutubeChannel(YouTubeItem): self.task = task def build_yt_url(self): - """build youtube url""" - return f"{self.yt_base}{self.youtube_id}/featured" + """overwrite base to use channel about page""" + return f"{self.yt_base}{self.youtube_id}/about" def build_json(self, upload=False, fallback=False): """get from es or from youtube""" diff --git a/tubearchivist/home/src/index/comments.py b/tubearchivist/home/src/index/comments.py index 50e0f17..84e6cdd 100644 --- a/tubearchivist/home/src/index/comments.py +++ b/tubearchivist/home/src/index/comments.py @@ -126,7 +126,7 @@ class Comments: "comment_author_id": comment["author_id"], "comment_author_thumbnail": comment["author_thumbnail"], "comment_author_is_uploader": comment.get( - "comment_author_is_uploader", False + "author_is_uploader", False ), "comment_parent": comment["parent"], } diff --git a/tubearchivist/home/src/index/reindex.py b/tubearchivist/home/src/index/reindex.py index 53b62d1..224e542 100644 --- a/tubearchivist/home/src/index/reindex.py +++ b/tubearchivist/home/src/index/reindex.py @@ -243,7 +243,7 @@ class Reindex(ReindexBase): return for name, index_config in self.REINDEX_CONFIG.items(): - if not RedisQueue(index_config["queue_name"]).has_item(): + if not RedisQueue(index_config["queue_name"]).length(): continue self.total = RedisQueue(index_config["queue_name"]).length() diff --git a/tubearchivist/home/src/ta/ta_redis.py b/tubearchivist/home/src/ta/ta_redis.py index de821fb..dd9ac51 100644 --- a/tubearchivist/home/src/ta/ta_redis.py +++ b/tubearchivist/home/src/ta/ta_redis.py @@ -20,6 +20,7 @@ class RedisBase: self.conn = redis.Redis( host=EnvironmentSettings.REDIS_HOST, port=EnvironmentSettings.REDIS_PORT, + decode_responses=True, ) @@ -82,7 +83,7 @@ class RedisArchivist(RedisBase): if not reply: return [] - return [i.decode().lstrip(self.NAME_SPACE) for i in reply] + return [i.lstrip(self.NAME_SPACE) for i in reply] def list_items(self, query: str) -> list: """list all matches""" @@ -99,65 +100,49 @@ class RedisArchivist(RedisBase): class RedisQueue(RedisBase): - """dynamically interact with queues in redis""" + """ + dynamically interact with queues in redis using sorted set + - low score number is first in queue + - add new items with high score number + """ def __init__(self, queue_name: str): super().__init__() self.key = f"{self.NAME_SPACE}{queue_name}" - def get_all(self): + def get_all(self) -> list[str]: """return all elements in list""" - result = self.conn.execute_command("LRANGE", self.key, 0, -1) - all_elements = [i.decode() for i in result] - return all_elements + result = self.conn.zrange(self.key, 0, -1) + return result def length(self) -> int: """return total elements in list""" - return self.conn.execute_command("LLEN", self.key) + return self.conn.zcard(self.key) def in_queue(self, element) -> str | bool: """check if element is in list""" - result = self.conn.execute_command("LPOS", self.key, element) + result = self.conn.zrank(self.key, element) if result is not None: return "in_queue" return False - def add_list(self, to_add): + def add_list(self, to_add: list) -> None: """add list to queue""" - self.conn.execute_command("RPUSH", self.key, *to_add) - - def add_priority(self, to_add: str) -> None: - """add single video to front of queue""" - item: str = json.dumps(to_add) - self.clear_item(item) - self.conn.execute_command("LPUSH", self.key, item) + mapping = {i: "+inf" for i in to_add} + self.conn.zadd(self.key, mapping) def get_next(self) -> str | bool: - """return next element in the queue, False if none""" - result = self.conn.execute_command("LPOP", self.key) + """return next element in the queue, if available""" + result = self.conn.zpopmin(self.key) if not result: return False - next_element = result.decode() - return next_element + return result[0][0] def clear(self) -> None: """delete list from redis""" - self.conn.execute_command("DEL", self.key) - - def clear_item(self, to_clear: str) -> None: - """remove single item from list if it's there""" - self.conn.execute_command("LREM", self.key, 0, to_clear) - - def trim(self, size: int) -> None: - """trim the queue based on settings amount""" - self.conn.execute_command("LTRIM", self.key, 0, size) - - def has_item(self) -> bool: - """check if queue as at least one pending item""" - result = self.conn.execute_command("LRANGE", self.key, 0, 0) - return bool(result) + self.conn.delete(self.key) class TaskRedis(RedisBase): @@ -170,7 +155,7 @@ class TaskRedis(RedisBase): def get_all(self) -> list: """return all tasks""" all_keys = self.conn.execute_command("KEYS", f"{self.BASE}*") - return [i.decode().replace(self.BASE, "") for i in all_keys] + return [i.replace(self.BASE, "") for i in all_keys] def get_single(self, task_id: str) -> dict: """return content of single task""" @@ -178,7 +163,7 @@ class TaskRedis(RedisBase): if not result: return {} - return json.loads(result.decode()) + return json.loads(result) def set_key( self, task_id: str, message: dict, expire: bool | int = False diff --git a/tubearchivist/home/templates/home/base.html b/tubearchivist/home/templates/home/base.html index 40211f2..63e2286 100644 --- a/tubearchivist/home/templates/home/base.html +++ b/tubearchivist/home/templates/home/base.html @@ -33,11 +33,9 @@