refactor RedisQueue to use sorted set

This commit is contained in:
Simon 2023-12-17 11:22:26 +07:00
parent 8870782a6e
commit 789c35e2b5
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
3 changed files with 31 additions and 40 deletions

View File

@ -36,7 +36,7 @@ class Command(BaseCommand):
self.stdout.write(TOPIC)
self._sync_redis_state()
self._make_folders()
self._release_locks()
self._clear_redis_keys()
self._clear_tasks()
self._clear_dl_cache()
self._version_check()
@ -73,10 +73,10 @@ class Command(BaseCommand):
self.stdout.write(self.style.SUCCESS(" ✓ expected folders created"))
def _release_locks(self):
"""make sure there are no leftover locks set in redis"""
self.stdout.write("[3] clear leftover locks in redis")
all_locks = [
def _clear_redis_keys(self):
"""make sure there are no leftover locks or keys set in redis"""
self.stdout.write("[3] clear leftover keys in redis")
all_keys = [
"dl_queue_id",
"dl_queue",
"downloading",
@ -85,19 +85,22 @@ class Command(BaseCommand):
"rescan",
"run_backup",
"startup_check",
"reindex:ta_video",
"reindex:ta_channel",
"reindex:ta_playlist",
]
redis_con = RedisArchivist()
has_changed = False
for lock in all_locks:
if redis_con.del_message(lock):
for key in all_keys:
if redis_con.del_message(key):
self.stdout.write(
self.style.SUCCESS(f" ✓ cleared lock {lock}")
self.style.SUCCESS(f" ✓ cleared key {key}")
)
has_changed = True
if not has_changed:
self.stdout.write(self.style.SUCCESS(" no locks found"))
self.stdout.write(self.style.SUCCESS(" no keys found"))
def _clear_tasks(self):
"""clear tasks and messages"""

View File

@ -243,7 +243,7 @@ class Reindex(ReindexBase):
return
for name, index_config in self.REINDEX_CONFIG.items():
if not RedisQueue(index_config["queue_name"]).has_item():
if not RedisQueue(index_config["queue_name"]).length():
continue
self.total = RedisQueue(index_config["queue_name"]).length()

View File

@ -100,61 +100,49 @@ class RedisArchivist(RedisBase):
class RedisQueue(RedisBase):
"""dynamically interact with queues in redis"""
"""
dynamically interact with queues in redis using sorted set
- low score number is first in queue
- add new items with high score number
"""
def __init__(self, queue_name: str):
super().__init__()
self.key = f"{self.NAME_SPACE}{queue_name}"
def get_all(self):
def get_all(self) -> list[str]:
"""return all elements in list"""
result = self.conn.execute_command("LRANGE", self.key, 0, -1)
result = self.conn.zrange(self.key, 0, -1)
return result
def length(self) -> int:
"""return total elements in list"""
return self.conn.execute_command("LLEN", self.key)
return self.conn.zcard(self.key)
def in_queue(self, element) -> str | bool:
"""check if element is in list"""
result = self.conn.execute_command("LPOS", self.key, element)
result = self.conn.zrank(self.key, element)
if result is not None:
return "in_queue"
return False
def add_list(self, to_add):
def add_list(self, to_add: list) -> None:
"""add list to queue"""
self.conn.execute_command("RPUSH", self.key, *to_add)
def add_priority(self, to_add: str) -> None:
"""add single video to front of queue"""
item: str = json.dumps(to_add)
self.clear_item(item)
self.conn.execute_command("LPUSH", self.key, item)
mapping = {i: "+inf" for i in to_add}
self.conn.zadd(self.key, mapping)
def get_next(self) -> str | bool:
"""return next element in the queue, False if none"""
result = self.conn.execute_command("LPOP", self.key)
"""return next element in the queue, if available"""
result = self.conn.zpopmin(self.key)
if not result:
return False
return result
return result[0][0]
def clear(self) -> None:
"""delete list from redis"""
self.conn.execute_command("DEL", self.key)
def clear_item(self, to_clear: str) -> None:
"""remove single item from list if it's there"""
self.conn.execute_command("LREM", self.key, 0, to_clear)
def trim(self, size: int) -> None:
"""trim the queue based on settings amount"""
self.conn.execute_command("LTRIM", self.key, 0, size)
def has_item(self) -> bool:
"""check if queue as at least one pending item"""
result = self.conn.execute_command("LRANGE", self.key, 0, 0)
return bool(result)
self.conn.delete(self.key)
class TaskRedis(RedisBase):