refactor redis connection, fix sync_redis_state task setup issue

This commit is contained in:
simon 2022-03-22 17:50:54 +07:00
parent fa25a56126
commit 9f5c9b17a5
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
4 changed files with 29 additions and 40 deletions

View File

@@ -162,7 +162,7 @@ class VideoDownloader:
pending.get_channels() pending.get_channels()
self.video_overwrites = pending.video_overwrites self.video_overwrites = pending.video_overwrites
queue = RedisQueue("dl_queue") queue = RedisQueue()
limit_queue = self.config["downloads"]["limit_count"] limit_queue = self.config["downloads"]["limit_count"]
if limit_queue: if limit_queue:
@@ -212,8 +212,7 @@ class VideoDownloader:
RedisArchivist().set_message("message:download", mess_dict) RedisArchivist().set_message("message:download", mess_dict)
return return
queue = RedisQueue("dl_queue") RedisQueue().add_list(to_add)
queue.add_list(to_add)
@staticmethod @staticmethod
def _progress_hook(response): def _progress_hook(response):

View File

@@ -114,7 +114,7 @@ class PostData:
print(f"ignore video {video_id}") print(f"ignore video {video_id}")
PendingInteract(video_id=video_id, status="ignore").update_status() PendingInteract(video_id=video_id, status="ignore").update_status()
# also clear from redis queue # also clear from redis queue
RedisQueue("dl_queue").clear_item(video_id) RedisQueue().clear_item(video_id)
return {"success": True} return {"success": True}
@staticmethod @staticmethod
@@ -132,7 +132,7 @@ class PostData:
to_execute = self.exec_val to_execute = self.exec_val
if to_execute == "stop": if to_execute == "stop":
print("stopping download queue") print("stopping download queue")
RedisQueue("dl_queue").clear() RedisQueue().clear()
elif to_execute == "kill": elif to_execute == "kill":
task_id = RedisArchivist().get_message("dl_queue_id") task_id = RedisArchivist().get_message("dl_queue_id")
if not isinstance(task_id, str): if not isinstance(task_id, str):

View File

@@ -11,12 +11,20 @@ import redis
from home.src.ta.helper import ignore_filelist from home.src.ta.helper import ignore_filelist
class RedisArchivist: class RedisBase:
"""collection of methods to interact with redis""" """connection base for redis"""
REDIS_HOST = os.environ.get("REDIS_HOST") REDIS_HOST = os.environ.get("REDIS_HOST")
REDIS_PORT = os.environ.get("REDIS_PORT") or 6379 REDIS_PORT = os.environ.get("REDIS_PORT") or 6379
NAME_SPACE = "ta:" NAME_SPACE = "ta:"
def __init__(self):
self.conn = redis.Redis(host=self.REDIS_HOST, port=self.REDIS_PORT)
class RedisArchivist(RedisBase):
"""collection of methods to interact with redis"""
CHANNELS = [ CHANNELS = [
"download", "download",
"add", "add",
@@ -27,14 +35,9 @@ class RedisArchivist:
"setting", "setting",
] ]
def __init__(self):
self.redis_connection = redis.Redis(
host=self.REDIS_HOST, port=self.REDIS_PORT
)
def set_message(self, key, message, expire=True): def set_message(self, key, message, expire=True):
"""write new message to redis""" """write new message to redis"""
self.redis_connection.execute_command( self.conn.execute_command(
"JSON.SET", self.NAME_SPACE + key, ".", json.dumps(message) "JSON.SET", self.NAME_SPACE + key, ".", json.dumps(message)
) )
@@ -43,15 +46,11 @@ class RedisArchivist:
secs = 20 secs = 20
else: else:
secs = expire secs = expire
self.redis_connection.execute_command( self.conn.execute_command("EXPIRE", self.NAME_SPACE + key, secs)
"EXPIRE", self.NAME_SPACE + key, secs
)
def get_message(self, key): def get_message(self, key):
"""get message dict from redis""" """get message dict from redis"""
reply = self.redis_connection.execute_command( reply = self.conn.execute_command("JSON.GET", self.NAME_SPACE + key)
"JSON.GET", self.NAME_SPACE + key
)
if reply: if reply:
json_str = json.loads(reply) json_str = json.loads(reply)
else: else:
@@ -61,7 +60,7 @@ class RedisArchivist:
def list_items(self, query): def list_items(self, query):
"""list all matches""" """list all matches"""
reply = self.redis_connection.execute_command( reply = self.conn.execute_command(
"KEYS", self.NAME_SPACE + query + "*" "KEYS", self.NAME_SPACE + query + "*"
) )
all_matches = [i.decode().lstrip(self.NAME_SPACE) for i in reply] all_matches = [i.decode().lstrip(self.NAME_SPACE) for i in reply]
@@ -74,14 +73,12 @@ class RedisArchivist:
def del_message(self, key): def del_message(self, key):
"""delete key from redis""" """delete key from redis"""
response = self.redis_connection.execute_command( response = self.conn.execute_command("DEL", self.NAME_SPACE + key)
"DEL", self.NAME_SPACE + key
)
return response return response
def get_lock(self, lock_key): def get_lock(self, lock_key):
"""handle lock for task management""" """handle lock for task management"""
redis_lock = self.redis_connection.lock(self.NAME_SPACE + lock_key) redis_lock = self.conn.lock(self.NAME_SPACE + lock_key)
return redis_lock return redis_lock
def get_progress(self): def get_progress(self):
@@ -89,7 +86,7 @@ class RedisArchivist:
all_messages = [] all_messages = []
for channel in self.CHANNELS: for channel in self.CHANNELS:
key = "message:" + channel key = "message:" + channel
reply = self.redis_connection.execute_command( reply = self.conn.execute_command(
"JSON.GET", self.NAME_SPACE + key "JSON.GET", self.NAME_SPACE + key
) )
if reply: if reply:
@@ -120,19 +117,12 @@ class RedisArchivist:
return mess_dict return mess_dict
class RedisQueue: class RedisQueue(RedisBase):
"""dynamically interact with the download queue in redis""" """dynamically interact with the download queue in redis"""
REDIS_HOST = os.environ.get("REDIS_HOST") def __init__(self):
REDIS_PORT = os.environ.get("REDIS_PORT") super().__init__()
NAME_SPACE = "ta:" self.key = self.NAME_SPACE + "dl_queue"
if not REDIS_PORT:
REDIS_PORT = 6379
def __init__(self, key):
self.key = self.NAME_SPACE + key
self.conn = redis.Redis(host=self.REDIS_HOST, port=self.REDIS_PORT)
def get_all(self): def get_all(self):
"""return all elements in list""" """return all elements in list"""

View File

@@ -8,8 +8,8 @@ Functionality:
import os import os
import home.apps as startup_apps
from celery import Celery, shared_task from celery import Celery, shared_task
from home.apps import StartupCheck
from home.src.download.queue import PendingList from home.src.download.queue import PendingList
from home.src.download.subscriptions import ( from home.src.download.subscriptions import (
ChannelSubscription, ChannelSubscription,
@@ -98,7 +98,7 @@ def download_pending():
@shared_task @shared_task
def download_single(youtube_id): def download_single(youtube_id):
"""start download single video now""" """start download single video now"""
queue = RedisQueue("dl_queue") queue = RedisQueue()
queue.add_priority(youtube_id) queue.add_priority(youtube_id)
print("Added to queue with priority: " + youtube_id) print("Added to queue with priority: " + youtube_id)
# start queue if needed # start queue if needed
@@ -181,7 +181,7 @@ def kill_dl(task_id):
app.control.revoke(task_id, terminate=True) app.control.revoke(task_id, terminate=True)
_ = RedisArchivist().del_message("dl_queue_id") _ = RedisArchivist().del_message("dl_queue_id")
RedisQueue("dl_queue").clear() RedisQueue().clear()
# clear cache # clear cache
cache_dir = os.path.join(CONFIG["application"]["cache_dir"], "download") cache_dir = os.path.join(CONFIG["application"]["cache_dir"], "download")
@@ -274,5 +274,5 @@ try:
app.conf.beat_schedule = ScheduleBuilder().build_schedule() app.conf.beat_schedule = ScheduleBuilder().build_schedule()
except KeyError: except KeyError:
# update path from v0.0.8 to v0.0.9 to load new defaults # update path from v0.0.8 to v0.0.9 to load new defaults
startup_apps.sync_redis_state() StartupCheck().sync_redis_state()
app.conf.beat_schedule = ScheduleBuilder().build_schedule() app.conf.beat_schedule = ScheduleBuilder().build_schedule()