tubearchivist/tubearchivist/home/src/ta/ta_redis.py

172 lines
4.9 KiB
Python
Raw Normal View History

"""
functionality:
- interact with redis
- hold temporary download queue in redis
"""
2022-01-22 15:13:37 +00:00
import json
import os
import redis
from home.src.ta.helper import ignore_filelist
class RedisArchivist:
"""collection of methods to interact with redis"""
REDIS_HOST = os.environ.get("REDIS_HOST")
REDIS_PORT = os.environ.get("REDIS_PORT") or 6379
NAME_SPACE = "ta:"
CHANNELS = [
"download",
"add",
"rescan",
"subchannel",
"subplaylist",
"playlistscan",
"setting",
]
def __init__(self):
self.redis_connection = redis.Redis(
host=self.REDIS_HOST, port=self.REDIS_PORT
)
def set_message(self, key, message, expire=True):
"""write new message to redis"""
self.redis_connection.execute_command(
"JSON.SET", self.NAME_SPACE + key, ".", json.dumps(message)
)
if expire:
if isinstance(expire, bool):
secs = 20
else:
secs = expire
self.redis_connection.execute_command(
"EXPIRE", self.NAME_SPACE + key, secs
)
def get_message(self, key):
"""get message dict from redis"""
reply = self.redis_connection.execute_command(
"JSON.GET", self.NAME_SPACE + key
)
if reply:
json_str = json.loads(reply)
else:
json_str = {"status": False}
return json_str
2022-02-24 11:55:18 +00:00
def list_items(self, query):
"""list all matches"""
reply = self.redis_connection.execute_command(
"KEYS", self.NAME_SPACE + query + "*"
)
all_matches = [i.decode().lstrip(self.NAME_SPACE) for i in reply]
all_results = []
for match in all_matches:
json_str = self.get_message(match)
all_results.append(json_str)
return all_results
2022-01-22 15:13:37 +00:00
def del_message(self, key):
"""delete key from redis"""
response = self.redis_connection.execute_command(
"DEL", self.NAME_SPACE + key
)
return response
def get_lock(self, lock_key):
"""handle lock for task management"""
redis_lock = self.redis_connection.lock(self.NAME_SPACE + lock_key)
return redis_lock
def get_progress(self):
"""get a list of all progress messages"""
all_messages = []
for channel in self.CHANNELS:
key = "message:" + channel
reply = self.redis_connection.execute_command(
"JSON.GET", self.NAME_SPACE + key
)
if reply:
json_str = json.loads(reply)
all_messages.append(json_str)
return all_messages
@staticmethod
def monitor_cache_dir(cache_dir):
"""
look at download cache dir directly as alternative progress info
"""
dl_cache = os.path.join(cache_dir, "download")
all_cache_file = os.listdir(dl_cache)
cache_file = ignore_filelist(all_cache_file)
if cache_file:
filename = cache_file[0][12:].replace("_", " ").split(".")[0]
mess_dict = {
"status": "message:download",
"level": "info",
"title": "Downloading: " + filename,
"message": "",
}
else:
return False
return mess_dict
class RedisQueue:
"""dynamically interact with the download queue in redis"""
REDIS_HOST = os.environ.get("REDIS_HOST")
REDIS_PORT = os.environ.get("REDIS_PORT")
NAME_SPACE = "ta:"
if not REDIS_PORT:
REDIS_PORT = 6379
def __init__(self, key):
self.key = self.NAME_SPACE + key
self.conn = redis.Redis(host=self.REDIS_HOST, port=self.REDIS_PORT)
def get_all(self):
"""return all elements in list"""
result = self.conn.execute_command("LRANGE", self.key, 0, -1)
all_elements = [i.decode() for i in result]
return all_elements
def add_list(self, to_add):
"""add list to queue"""
self.conn.execute_command("RPUSH", self.key, *to_add)
def add_priority(self, to_add):
"""add single video to front of queue"""
self.clear_item(to_add)
self.conn.execute_command("LPUSH", self.key, to_add)
def get_next(self):
"""return next element in the queue, False if none"""
result = self.conn.execute_command("LPOP", self.key)
if not result:
return False
next_element = result.decode()
return next_element
def clear(self):
"""delete list from redis"""
self.conn.execute_command("DEL", self.key)
def clear_item(self, to_clear):
"""remove single item from list if it's there"""
self.conn.execute_command("LREM", self.key, 0, to_clear)
def trim(self, size):
"""trim the queue based on settings amount"""
self.conn.execute_command("LTRIM", self.key, 0, size)