From f5a56fca8637f40adbf18ad02230a49a6556dadd Mon Sep 17 00:00:00 2001 From: simon Date: Tue, 28 Sep 2021 10:33:00 +0700 Subject: [PATCH] refactored redis functions into dedicated RedisArchivist class --- tubearchivist/home/apps.py | 13 ++-- tubearchivist/home/src/config.py | 8 +- tubearchivist/home/src/download.py | 12 +-- tubearchivist/home/src/helper.py | 119 +++++++++++++++-------------- tubearchivist/home/src/reindex.py | 9 +-- tubearchivist/home/tasks.py | 12 +-- tubearchivist/home/views.py | 34 ++++----- 7 files changed, 107 insertions(+), 100 deletions(-) diff --git a/tubearchivist/home/apps.py b/tubearchivist/home/apps.py index 28c504d..2e527fe 100644 --- a/tubearchivist/home/apps.py +++ b/tubearchivist/home/apps.py @@ -4,7 +4,7 @@ import os from django.apps import AppConfig from home.src.config import AppConfig as ArchivistConfig -from home.src.helper import del_message, set_message +from home.src.helper import RedisArchivist from home.src.index_management import index_check @@ -15,11 +15,14 @@ def sync_redis_state(): config_handler.load_new_defaults() config = config_handler.config sort_order = config["archive"]["sort"] - set_message("sort_order", sort_order, expire=False) + redis_archivist = RedisArchivist() + redis_archivist.set_message("sort_order", sort_order, expire=False) hide_watched = bool(int(config["archive"]["hide_watched"])) - set_message("hide_watched", hide_watched, expire=False) + redis_archivist.set_message("hide_watched", hide_watched, expire=False) show_subed_only = bool(int(config["archive"]["show_subed_only"])) - set_message("show_subed_only", show_subed_only, expire=False) + redis_archivist.set_message( + "show_subed_only", show_subed_only, expire=False + ) def make_folders(): @@ -39,7 +42,7 @@ def release_lock(): """make sure there are no leftover locks set in redis on container start""" all_locks = ["manual_import", "downloading", "dl_queue", "dl_queue_id"] for lock in all_locks: - response = del_message(lock) + response = RedisArchivist().del_message(lock) if response: print("deleted leftover key from redis: " + lock) diff --git a/tubearchivist/home/src/config.py b/tubearchivist/home/src/config.py index 4485882..4c078e4 100644 --- a/tubearchivist/home/src/config.py +++ b/tubearchivist/home/src/config.py @@ -8,7 +8,7 @@ Functionality: import json import os -from home.src.helper import get_message, set_message +from home.src.helper import RedisArchivist class AppConfig: @@ -51,7 +51,7 @@ class AppConfig: @staticmethod def get_config_redis(): """read config json set from redis to overwrite defaults""" - config = get_message("config") + config = RedisArchivist().get_message("config") if not list(config.values())[0]: return False @@ -73,7 +73,7 @@ class AppConfig: config_dict, config_value = key.split(".") config[config_dict][config_value] = to_write - set_message("config", config, expire=False) + RedisArchivist().set_message("config", config, expire=False) def load_new_defaults(self): """check config.json for missing defaults""" @@ -100,4 +100,4 @@ class AppConfig: needs_update = True if needs_update: - set_message("config", redis_config, expire=False) + RedisArchivist().set_message("config", redis_config, expire=False) diff --git a/tubearchivist/home/src/download.py b/tubearchivist/home/src/download.py index 134d891..6b217f5 100644 --- a/tubearchivist/home/src/download.py +++ b/tubearchivist/home/src/download.py @@ -16,10 +16,10 @@ import yt_dlp as youtube_dl from home.src.config import AppConfig from home.src.helper import ( DurationConverter, + RedisArchivist, RedisQueue, clean_string, ignore_filelist, - set_message, ) from home.src.index import YoutubeChannel, index_new_video @@ -43,7 +43,7 @@ class PendingList: "title": "Adding to download queue.", "message": "Extracting lists", } - set_message("progress:download", mess_dict) + RedisArchivist().set_message("progress:download", mess_dict) # extract url = entry["url"] url_type = entry["type"] @@ -98,7 +98,7 @@ class PendingList: "title": "Adding to download queue.", "message": "Processing IDs...", } - set_message("progress:download", mess_dict) + RedisArchivist().set_message("progress:download", mess_dict) # add last newline bulk_list.append("\n") query_str = "\n".join(bulk_list) @@ -264,7 +264,7 @@ class PendingList: "title": "Added to ignore list", "message": "", } - set_message("progress:download", mess_dict) + RedisArchivist().set_message("progress:download", mess_dict) if not request.ok: print(request) @@ -350,7 +350,7 @@ class ChannelSubscription: for channel in all_channels: channel_id = channel["channel_id"] last_videos = self.get_last_youtube_videos(channel_id) - set_message( + RedisArchivist().set_message( "progress:download", { "status": "rescan", @@ -468,7 +468,7 @@ class VideoDownloader: "title": title, "message": message, } - set_message("progress:download", mess_dict) + RedisArchivist().set_message("progress:download", mess_dict) def dl_single_vid(self, youtube_id): """download single video""" diff --git a/tubearchivist/home/src/helper.py b/tubearchivist/home/src/helper.py index 77c676d..17b39ee 100644 --- a/tubearchivist/home/src/helper.py +++ b/tubearchivist/home/src/helper.py @@ -13,8 +13,6 @@ import unicodedata import redis import requests -REDIS_HOST = os.environ.get("REDIS_HOST") - def get_total_hits(index, es_url, match_field): """get total hits from index""" @@ -82,78 +80,87 @@ def process_url_list(url_str): return youtube_ids -def set_message(key, message, expire=True): - """write new message to redis""" - redis_connection = redis.Redis(host=REDIS_HOST) - redis_connection.execute_command("JSON.SET", key, ".", json.dumps(message)) - if expire: - redis_connection.execute_command("EXPIRE", key, 20) +class RedisArchivist: + """collection of methods to interact with redis""" + REDIS_HOST = os.environ.get("REDIS_HOST") -def get_message(key): - """get any message from JSON key""" - redis_connection = redis.Redis(host=REDIS_HOST) - reply = redis_connection.execute_command("JSON.GET", key) - if reply: - json_str = json.loads(reply) - else: - json_str = {"status": False} - return json_str + def __init__(self): + self.redis_connection = redis.Redis(host=self.REDIS_HOST) + def set_message(self, key, message, expire=True): + """write new message to redis""" + self.redis_connection.execute_command( + "JSON.SET", key, ".", json.dumps(message) + ) -def del_message(key): - """delete key from redis""" - redis_connection = redis.Redis(host=REDIS_HOST) - response = redis_connection.execute_command("DEL", key) - return response + if expire: + self.redis_connection.execute_command("EXPIRE", key, 20) + def get_message(self, key): + """get message dict from redis""" + reply = self.redis_connection.execute_command("JSON.GET", key) + if reply: + json_str = json.loads(reply) + else: + json_str = {"status": False} -def get_dl_message(cache_dir): - """get latest message if available""" - redis_connection = redis.Redis(host=REDIS_HOST) - reply = redis_connection.execute_command("JSON.GET", "progress:download") - if reply: - json_str = json.loads(reply) - elif json_str := monitor_cache_dir(cache_dir): - json_str = monitor_cache_dir(cache_dir) - else: - json_str = {"status": False} - return json_str + return json_str + def del_message(self, key): + """delete key from redis""" + response = self.redis_connection.execute_command("DEL", key) + return response -def get_lock(lock_key): - """handle lock for task management""" - redis_lock = redis.Redis(host=REDIS_HOST).lock(lock_key) - return redis_lock + def get_lock(self, lock_key): + """handle lock for task management""" + redis_lock = self.redis_connection.lock(lock_key) + return redis_lock + def get_dl_message(self, cache_dir): + """get latest download progress message if available""" + reply = self.redis_connection.execute_command( + "JSON.GET", "progress:download" + ) + if reply: + json_str = json.loads(reply) + elif json_str := self.monitor_cache_dir(cache_dir): + json_str = self.monitor_cache_dir(cache_dir) + else: + json_str = {"status": False} -def monitor_cache_dir(cache_dir): - """ - look at download cache dir directly as alternative progress info - """ - dl_cache = os.path.join(cache_dir, "download") - all_cache_file = os.listdir(dl_cache) - cache_file = ignore_filelist(all_cache_file) - if cache_file: - filename = cache_file[0][12:].replace("_", " ").split(".")[0] - mess_dict = { - "status": "downloading", - "level": "info", - "title": "Downloading: " + filename, - "message": "", - } - else: - return False + return json_str - return mess_dict + @staticmethod + def monitor_cache_dir(cache_dir): + """ + look at download cache dir directly as alternative progress info + """ + dl_cache = os.path.join(cache_dir, "download") + all_cache_file = os.listdir(dl_cache) + cache_file = ignore_filelist(all_cache_file) + if cache_file: + filename = cache_file[0][12:].replace("_", " ").split(".")[0] + mess_dict = { + "status": "downloading", + "level": "info", + "title": "Downloading: " + filename, + "message": "", + } + else: + return False + + return mess_dict class RedisQueue: """dynamically interact with the download queue in redis""" + REDIS_HOST = os.environ.get("REDIS_HOST") + def __init__(self, key): self.key = key - self.conn = redis.Redis(host=REDIS_HOST) + self.conn = redis.Redis(host=self.REDIS_HOST) def get_all(self): """return all elements in list""" diff --git a/tubearchivist/home/src/reindex.py b/tubearchivist/home/src/reindex.py index edc0a9d..ebaf3c5 100644 --- a/tubearchivist/home/src/reindex.py +++ b/tubearchivist/home/src/reindex.py @@ -18,11 +18,10 @@ import requests from home.src.config import AppConfig from home.src.download import ChannelSubscription, PendingList, VideoDownloader from home.src.helper import ( + RedisArchivist, clean_string, - get_message, get_total_hits, ignore_filelist, - set_message, ) from home.src.index import YoutubeChannel, YoutubeVideo, index_new_video @@ -128,7 +127,7 @@ class Reindex: "title": "Scraping all youtube channels", "message": message, } - set_message("progress:download", mess_dict) + RedisArchivist().set_message("progress:download", mess_dict) channel_index = YoutubeChannel(channel_id) subscribed = channel_index.channel_dict["channel_subscribed"] channel_index.channel_dict = channel_index.build_channel_dict( @@ -472,7 +471,7 @@ def reindex_old_documents(): """daily refresh of old documents""" # check needed last run now = int(datetime.now().strftime("%s")) - last_reindex = get_message("last_reindex") + last_reindex = RedisArchivist().get_message("last_reindex") if isinstance(last_reindex, int) and now - last_reindex < 60 * 60 * 24: return # continue if needed @@ -480,4 +479,4 @@ def reindex_old_documents(): reindex_handler.check_outdated() reindex_handler.reindex() # set timestamp - set_message("last_reindex", now, expire=False) + RedisArchivist().set_message("last_reindex", now, expire=False) diff --git a/tubearchivist/home/tasks.py b/tubearchivist/home/tasks.py index 3a331f5..9cdb07b 100644 --- a/tubearchivist/home/tasks.py +++ b/tubearchivist/home/tasks.py @@ -9,7 +9,7 @@ import os from celery import Celery, shared_task from home.src.config import AppConfig from home.src.download import ChannelSubscription, PendingList, VideoDownloader -from home.src.helper import RedisQueue, del_message, get_lock, set_message +from home.src.helper import RedisArchivist, RedisQueue from home.src.index_management import backup_all_indexes, restore_from_backup from home.src.reindex import ManualImport, reindex_old_documents @@ -39,7 +39,7 @@ def download_pending(): """download latest pending videos""" have_lock = False - my_lock = get_lock("downloading") + my_lock = RedisArchivist().get_lock("downloading") try: have_lock = my_lock.acquire(blocking=False) @@ -65,7 +65,7 @@ def download_single(youtube_id): # start queue if needed have_lock = False - my_lock = get_lock("downloading") + my_lock = RedisArchivist().get_lock("downloading") try: have_lock = my_lock.acquire(blocking=False) @@ -100,7 +100,7 @@ def run_manual_import(): print("starting media file import") have_lock = False - my_lock = get_lock("manual_import") + my_lock = RedisArchivist().get_lock("manual_import") try: have_lock = my_lock.acquire(blocking=False) @@ -133,7 +133,7 @@ def run_restore_backup(): def kill_dl(task_id): """kill download worker task by ID""" app.control.revoke(task_id, terminate=True) - _ = del_message("dl_queue_id") + _ = RedisArchivist().del_message("dl_queue_id") RedisQueue("dl_queue").clear() # clear cache @@ -149,4 +149,4 @@ def kill_dl(task_id): "title": "Brutally killing download queue", "message": "", } - set_message("progress:download", mess_dict) + RedisArchivist().set_message("progress:download", mess_dict) diff --git a/tubearchivist/home/views.py b/tubearchivist/home/views.py index 44c8324..2275f18 100644 --- a/tubearchivist/home/views.py +++ b/tubearchivist/home/views.py @@ -14,13 +14,7 @@ from django.utils.http import urlencode from django.views import View from home.src.config import AppConfig from home.src.download import ChannelSubscription, PendingList -from home.src.helper import ( - RedisQueue, - get_dl_message, - get_message, - process_url_list, - set_message, -) +from home.src.helper import RedisArchivist, RedisQueue, process_url_list from home.src.index import WatchState from home.src.searching import Pagination, SearchForm, SearchHandler from home.tasks import ( @@ -114,8 +108,8 @@ class HomeView(View): """read needed values from redis""" config_handler = AppConfig().config colors = config_handler["application"]["colors"] - sort_order = get_message("sort_order") - hide_watched = get_message("hide_watched") + sort_order = RedisArchivist().get_message("sort_order") + hide_watched = RedisArchivist().get_message("hide_watched") return colors, sort_order, hide_watched @staticmethod @@ -208,7 +202,7 @@ class DownloadView(View): "title": "Failed to extract links.", "message": "Not a video, channel or playlist ID or URL", } - set_message("progress:download", mess_dict) + RedisArchivist().set_message("progress:download", mess_dict) return redirect("downloads") print(youtube_ids) @@ -322,7 +316,7 @@ class ChannelView(View): "query": {"match_all": {}}, "sort": [{"channel_name.keyword": {"order": "asc"}}], } - show_subed_only = get_message("show_subed_only") + show_subed_only = RedisArchivist().get_message("show_subed_only") if show_subed_only: data["query"] = {"term": {"channel_subscribed": {"value": True}}} search = SearchHandler(url, data) @@ -442,7 +436,7 @@ def progress(request): """endpoint for download progress ajax calls""" config = AppConfig().config cache_dir = config["application"]["cache_dir"] - json_data = get_dl_message(cache_dir) + json_data = RedisArchivist().get_dl_message(cache_dir) return JsonResponse(json_data) @@ -524,7 +518,7 @@ class PostData: running = download_pending.delay() task_id = running.id print("set task id: " + task_id) - set_message("dl_queue_id", task_id, expire=False) + RedisArchivist().set_message("dl_queue_id", task_id, expire=False) return {"success": True} def queue_handler(self): @@ -534,7 +528,7 @@ class PostData: print("stopping download queue") RedisQueue("dl_queue").clear() elif to_execute == "kill": - task_id = get_message("dl_queue_id") + task_id = RedisArchivist().get_message("dl_queue_id") print("brutally killing " + task_id) kill_dl(task_id) @@ -552,21 +546,25 @@ class PostData: def sort_order(self): """change the sort between published to downloaded""" sort_order = self.exec_val - set_message("sort_order", sort_order, expire=False) + RedisArchivist().set_message("sort_order", sort_order, expire=False) return {"success": True} def hide_watched(self): """toggle if to show watched vids or not""" hide_watched = bool(int(self.exec_val)) print(f"hide watched: {hide_watched}") - set_message("hide_watched", hide_watched, expire=False) + RedisArchivist().set_message( + "hide_watched", hide_watched, expire=False + ) return {"success": True} def show_subed_only(self): """show or hide subscribed channels only on channels page""" show_subed_only = bool(int(self.exec_val)) print(f"show subed only: {show_subed_only}") - set_message("show_subed_only", show_subed_only, expire=False) + RedisArchivist().set_message( + "show_subed_only", show_subed_only, expire=False + ) return {"success": True} def dlnow(self): @@ -576,7 +574,7 @@ class PostData: running = download_single.delay(youtube_id=youtube_id) task_id = running.id print("set task id: " + task_id) - set_message("dl_queue_id", task_id, expire=False) + RedisArchivist().set_message("dl_queue_id", task_id, expire=False) return {"success": True} @staticmethod