diff --git a/tubearchivist/home/src/download/yt_dlp_handler.py b/tubearchivist/home/src/download/yt_dlp_handler.py index e540426..4c9e619 100644 --- a/tubearchivist/home/src/download/yt_dlp_handler.py +++ b/tubearchivist/home/src/download/yt_dlp_handler.py @@ -162,7 +162,7 @@ class VideoDownloader: pending.get_channels() self.video_overwrites = pending.video_overwrites - queue = RedisQueue("dl_queue") + queue = RedisQueue() limit_queue = self.config["downloads"]["limit_count"] if limit_queue: @@ -212,8 +212,7 @@ class VideoDownloader: RedisArchivist().set_message("message:download", mess_dict) return - queue = RedisQueue("dl_queue") - queue.add_list(to_add) + RedisQueue().add_list(to_add) @staticmethod def _progress_hook(response): diff --git a/tubearchivist/home/src/frontend/api_calls.py b/tubearchivist/home/src/frontend/api_calls.py index f7f50e7..26b1730 100644 --- a/tubearchivist/home/src/frontend/api_calls.py +++ b/tubearchivist/home/src/frontend/api_calls.py @@ -114,7 +114,7 @@ class PostData: print(f"ignore video {video_id}") PendingInteract(video_id=video_id, status="ignore").update_status() # also clear from redis queue - RedisQueue("dl_queue").clear_item(video_id) + RedisQueue().clear_item(video_id) return {"success": True} @staticmethod @@ -132,7 +132,7 @@ class PostData: to_execute = self.exec_val if to_execute == "stop": print("stopping download queue") - RedisQueue("dl_queue").clear() + RedisQueue().clear() elif to_execute == "kill": task_id = RedisArchivist().get_message("dl_queue_id") if not isinstance(task_id, str): diff --git a/tubearchivist/home/src/ta/ta_redis.py b/tubearchivist/home/src/ta/ta_redis.py index 6a9efea..0e9c23f 100644 --- a/tubearchivist/home/src/ta/ta_redis.py +++ b/tubearchivist/home/src/ta/ta_redis.py @@ -11,12 +11,20 @@ import redis from home.src.ta.helper import ignore_filelist -class RedisArchivist: - """collection of methods to interact with redis""" +class RedisBase: + """connection base for redis""" REDIS_HOST = os.environ.get("REDIS_HOST") REDIS_PORT = os.environ.get("REDIS_PORT") or 6379 NAME_SPACE = "ta:" + + def __init__(self): + self.conn = redis.Redis(host=self.REDIS_HOST, port=self.REDIS_PORT) + + +class RedisArchivist(RedisBase): + """collection of methods to interact with redis""" + CHANNELS = [ "download", "add", @@ -27,14 +35,9 @@ class RedisArchivist: "setting", ] - def __init__(self): - self.redis_connection = redis.Redis( - host=self.REDIS_HOST, port=self.REDIS_PORT - ) - def set_message(self, key, message, expire=True): """write new message to redis""" - self.redis_connection.execute_command( + self.conn.execute_command( "JSON.SET", self.NAME_SPACE + key, ".", json.dumps(message) ) @@ -43,15 +46,11 @@ class RedisArchivist: secs = 20 else: secs = expire - self.redis_connection.execute_command( - "EXPIRE", self.NAME_SPACE + key, secs - ) + self.conn.execute_command("EXPIRE", self.NAME_SPACE + key, secs) def get_message(self, key): """get message dict from redis""" - reply = self.redis_connection.execute_command( - "JSON.GET", self.NAME_SPACE + key - ) + reply = self.conn.execute_command("JSON.GET", self.NAME_SPACE + key) if reply: json_str = json.loads(reply) else: @@ -61,7 +60,7 @@ class RedisArchivist: def list_items(self, query): """list all matches""" - reply = self.redis_connection.execute_command( + reply = self.conn.execute_command( "KEYS", self.NAME_SPACE + query + "*" ) all_matches = [i.decode().lstrip(self.NAME_SPACE) for i in reply] @@ -74,14 +73,12 @@ class RedisArchivist: def del_message(self, key): """delete key from redis""" - response = self.redis_connection.execute_command( - "DEL", self.NAME_SPACE + key - ) + response = self.conn.execute_command("DEL", self.NAME_SPACE + key) return response def get_lock(self, lock_key): """handle lock for task management""" - redis_lock = self.redis_connection.lock(self.NAME_SPACE + lock_key) + redis_lock = self.conn.lock(self.NAME_SPACE + lock_key) return redis_lock def get_progress(self): @@ -89,7 +86,7 @@ class RedisArchivist: all_messages = [] for channel in self.CHANNELS: key = "message:" + channel - reply = self.redis_connection.execute_command( + reply = self.conn.execute_command( "JSON.GET", self.NAME_SPACE + key ) if reply: @@ -120,19 +117,12 @@ class RedisArchivist: return mess_dict -class RedisQueue: +class RedisQueue(RedisBase): """dynamically interact with the download queue in redis""" - REDIS_HOST = os.environ.get("REDIS_HOST") - REDIS_PORT = os.environ.get("REDIS_PORT") - NAME_SPACE = "ta:" - - if not REDIS_PORT: - REDIS_PORT = 6379 - - def __init__(self, key): - self.key = self.NAME_SPACE + key - self.conn = redis.Redis(host=self.REDIS_HOST, port=self.REDIS_PORT) + def __init__(self): + super().__init__() + self.key = self.NAME_SPACE + "dl_queue" def get_all(self): """return all elements in list""" diff --git a/tubearchivist/home/tasks.py b/tubearchivist/home/tasks.py index b0f86fd..b8f419b 100644 --- a/tubearchivist/home/tasks.py +++ b/tubearchivist/home/tasks.py @@ -8,8 +8,8 @@ Functionality: import os -import home.apps as startup_apps from celery import Celery, shared_task +from home.apps import StartupCheck from home.src.download.queue import PendingList from home.src.download.subscriptions import ( ChannelSubscription, @@ -98,7 +98,7 @@ def download_pending(): @shared_task def download_single(youtube_id): """start download single video now""" - queue = RedisQueue("dl_queue") + queue = RedisQueue() queue.add_priority(youtube_id) print("Added to queue with priority: " + youtube_id) # start queue if needed @@ -181,7 +181,7 @@ def kill_dl(task_id): app.control.revoke(task_id, terminate=True) _ = RedisArchivist().del_message("dl_queue_id") - RedisQueue("dl_queue").clear() + RedisQueue().clear() # clear cache cache_dir = os.path.join(CONFIG["application"]["cache_dir"], "download") @@ -274,5 +274,5 @@ try: app.conf.beat_schedule = ScheduleBuilder().build_schedule() except KeyError: # update path from v0.0.8 to v0.0.9 to load new defaults - startup_apps.sync_redis_state() + StartupCheck().sync_redis_state() app.conf.beat_schedule = ScheduleBuilder().build_schedule()