namespace redis keys to ta:

This commit is contained in:
simon 2021-10-27 18:07:35 +07:00
parent 78561981ad
commit ebd8368856
2 changed files with 16 additions and 8 deletions

View File

@ -85,6 +85,7 @@ class RedisArchivist:
REDIS_HOST = os.environ.get("REDIS_HOST") REDIS_HOST = os.environ.get("REDIS_HOST")
REDIS_PORT = os.environ.get("REDIS_PORT") REDIS_PORT = os.environ.get("REDIS_PORT")
NAME_SPACE = "ta:"
if not REDIS_PORT: if not REDIS_PORT:
REDIS_PORT = 6379 REDIS_PORT = 6379
@ -97,15 +98,19 @@ class RedisArchivist:
def set_message(self, key, message, expire=True): def set_message(self, key, message, expire=True):
"""write new message to redis""" """write new message to redis"""
self.redis_connection.execute_command( self.redis_connection.execute_command(
"JSON.SET", key, ".", json.dumps(message) "JSON.SET", self.NAME_SPACE + key, ".", json.dumps(message)
) )
if expire: if expire:
self.redis_connection.execute_command("EXPIRE", key, 20) self.redis_connection.execute_command(
"EXPIRE", self.NAME_SPACE + key, 20
)
def get_message(self, key): def get_message(self, key):
"""get message dict from redis""" """get message dict from redis"""
reply = self.redis_connection.execute_command("JSON.GET", key) reply = self.redis_connection.execute_command(
"JSON.GET", self.NAME_SPACE + key
)
if reply: if reply:
json_str = json.loads(reply) json_str = json.loads(reply)
else: else:
@ -115,18 +120,20 @@ class RedisArchivist:
def del_message(self, key): def del_message(self, key):
"""delete key from redis""" """delete key from redis"""
response = self.redis_connection.execute_command("DEL", key) response = self.redis_connection.execute_command(
"DEL", self.NAME_SPACE + key
)
return response return response
def get_lock(self, lock_key): def get_lock(self, lock_key):
"""handle lock for task management""" """handle lock for task management"""
redis_lock = self.redis_connection.lock(lock_key) redis_lock = self.redis_connection.lock(self.NAME_SPACE + lock_key)
return redis_lock return redis_lock
def get_dl_message(self, cache_dir): def get_dl_message(self, cache_dir):
"""get latest download progress message if available""" """get latest download progress message if available"""
reply = self.redis_connection.execute_command( reply = self.redis_connection.execute_command(
"JSON.GET", "progress:download" "JSON.GET", self.NAME_SPACE + "progress:download"
) )
if reply: if reply:
json_str = json.loads(reply) json_str = json.loads(reply)
@ -164,12 +171,13 @@ class RedisQueue:
REDIS_HOST = os.environ.get("REDIS_HOST") REDIS_HOST = os.environ.get("REDIS_HOST")
REDIS_PORT = os.environ.get("REDIS_PORT") REDIS_PORT = os.environ.get("REDIS_PORT")
NAME_SPACE = "ta:"
if not REDIS_PORT: if not REDIS_PORT:
REDIS_PORT = 6379 REDIS_PORT = 6379
def __init__(self, key): def __init__(self, key):
self.key = key self.key = self.NAME_SPACE + key
self.conn = redis.Redis(host=self.REDIS_HOST, port=self.REDIS_PORT) self.conn = redis.Redis(host=self.REDIS_HOST, port=self.REDIS_PORT)
def get_all(self): def get_all(self):

View File

@ -27,7 +27,7 @@ if not REDIS_PORT:
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "home.settings") os.environ.setdefault("DJANGO_SETTINGS_MODULE", "home.settings")
app = Celery("tasks", broker=f"redis://{REDIS_HOST}:{REDIS_PORT}") app = Celery("tasks", broker=f"redis://{REDIS_HOST}:{REDIS_PORT}")
app.config_from_object("django.conf:settings", namespace="CELERY") app.config_from_object("django.conf:settings", namespace="ta:")
app.autodiscover_tasks() app.autodiscover_tasks()