refactor: default set_message in RedisArchivist to True

This commit is contained in:
simon 2022-06-16 10:37:46 +07:00
parent ebfc4a349f
commit 2cf30e1127
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
15 changed files with 101 additions and 86 deletions

View File

@ -50,5 +50,5 @@ class TaskHandler:
print("download pending")
running = download_pending.delay()
print("set task id: " + running.id)
RedisArchivist().set_message("dl_queue_id", running.id, expire=False)
RedisArchivist().set_message("dl_queue_id", running.id)
return {"success": True}

View File

@ -131,7 +131,7 @@ class VideoProgressView(ApiBaseView):
position = request.data.get("position", 0)
key = f"{request.user.id}:progress:{video_id}"
message = {"position": position, "youtube_id": video_id}
RedisArchivist().set_message(key, message, expire=False)
RedisArchivist().set_message(key, message)
self.response = request.data
return Response(self.response)
@ -459,7 +459,7 @@ class TaskApiView(ApiBaseView):
@staticmethod
def get(request):
"""handle get request"""
# pylint: disable=unused-argument
response = {"rescan": False, "downloading": False}
for key in response.keys():
response[key] = RedisArchivist().is_locked(key)

View File

@ -150,7 +150,7 @@ class PendingList(PendingIndex):
"title": "Adding to download queue.",
"message": "Extracting lists",
}
RedisArchivist().set_message("message:add", mess_dict)
RedisArchivist().set_message("message:add", mess_dict, expire=True)
self._process_entry(entry)
def _process_entry(self, entry):
@ -229,10 +229,11 @@ class PendingList(PendingIndex):
"message": "Progress: " + progress,
}
if idx + 1 == len(self.missing_videos):
RedisArchivist().set_message("message:add", mess_dict, expire=4)
expire = 4
else:
RedisArchivist().set_message("message:add", mess_dict)
expire = True
RedisArchivist().set_message("message:add", mess_dict, expire=expire)
if idx + 1 % 25 == 0:
print("adding to queue progress: " + progress)

View File

@ -76,11 +76,13 @@ class ChannelSubscription:
"message": f"Progress: {idx + 1}/{len(all_channels)}",
}
if idx + 1 == len(all_channels):
RedisArchivist().set_message(
"message:rescan", message=message, expire=4
)
expire = 4
else:
RedisArchivist().set_message("message:rescan", message=message)
expire = True
RedisArchivist().set_message(
"message:rescan", message=message, expire=expire
)
return missing_videos
@ -152,7 +154,7 @@ class PlaylistSubscription:
"message": f"Processing {idx + 1} of {len(new_playlists)}",
}
RedisArchivist().set_message(
"message:subplaylist", message=message
"message:subplaylist", message=message, expire=True
)
return new_thumbs
@ -206,7 +208,9 @@ class PlaylistSubscription:
"title": "Scanning playlists: Looking for new videos.",
"message": f"Progress: {idx + 1}/{len(all_playlists)}",
}
RedisArchivist().set_message("message:rescan", message=message)
RedisArchivist().set_message(
"message:rescan", message=message, expire=True
)
for video in all_missing:
youtube_id = video["youtube_id"]

View File

@ -193,11 +193,13 @@ class ThumbManager:
"message": "Downloading Thumbnails, Progress: " + progress,
}
if idx + 1 == len(missing_thumbs):
RedisArchivist().set_message(
"message:add", mess_dict, expire=4
)
expire = 4
else:
RedisArchivist().set_message("message:add", mess_dict)
expire = True
RedisArchivist().set_message(
"message:add", mess_dict, expire=expire
)
if idx + 1 % 25 == 0:
print("thumbnail progress: " + progress)
@ -226,7 +228,8 @@ class ThumbManager:
"title": "Processing Channels",
"message": "Downloading Channel Art.",
}
RedisArchivist().set_message("message:download", mess_dict)
key = "message:download"
RedisArchivist().set_message(key, mess_dict, expire=True)
def download_playlist(self, missing_playlists):
"""download needed artwork for playlists"""
@ -243,7 +246,8 @@ class ThumbManager:
"title": "Processing Playlists",
"message": "Downloading Playlist Art.",
}
RedisArchivist().set_message("message:download", mess_dict)
key = "message:download"
RedisArchivist().set_message(key, mess_dict, expire=True)
def get_base64_blur(self, youtube_id):
"""return base64 encoded placeholder"""

View File

@ -85,7 +85,7 @@ class CookieHandler:
with open(import_path, encoding="utf-8") as cookie_file:
cookie = cookie_file.read()
RedisArchivist().set_message("cookie", cookie, expire=False)
RedisArchivist().set_message("cookie", cookie)
os.remove(import_path)
print("cookie: import successful")
@ -95,7 +95,7 @@ class CookieHandler:
"""revoke cookie"""
RedisArchivist().del_message("cookie")
RedisArchivist().set_message(
"config", False, path=".downloads.cookie_import", expire=False
"config", False, path=".downloads.cookie_import"
)
print("cookie: revoked")
@ -112,7 +112,7 @@ class CookieHandler:
# update in redis to avoid expiring
modified = validator.obs["cookiefile"].getvalue()
if modified:
RedisArchivist().set_message("cookie", modified, expire=False)
RedisArchivist().set_message("cookie", modified)
if not response:
mess_dict = {

View File

@ -125,18 +125,19 @@ class DownloadPostProcess:
+ f"{id_c + 1}/{len(self.download.channels)}"
)
message = f"Progress: {id_p + 1}/{len(all_channel_playlist)}"
key = "message:download"
mess_dict = {
"status": "message:download",
"status": key,
"level": "info",
"title": title,
"message": message,
}
if id_p + 1 == len(all_channel_playlist):
RedisArchivist().set_message(
"message:download", mess_dict, expire=4
)
expire = 4
else:
RedisArchivist().set_message("message:download", mess_dict)
expire = True
RedisArchivist().set_message(key, mess_dict, expire=expire)
class VideoDownloader:
@ -145,6 +146,8 @@ class VideoDownloader:
if not initiated with list, take from queue
"""
MSG = "message:download"
def __init__(self, youtube_id_list=False):
self.obs = False
self.video_overwrites = False
@ -177,23 +180,21 @@ class VideoDownloader:
)
self.channels.add(vid_dict["channel"]["channel_id"])
mess_dict = {
"status": "message:download",
"status": self.MSG,
"level": "info",
"title": "Moving....",
"message": "Moving downloaded file to storage folder",
}
RedisArchivist().set_message(
"message:download", mess_dict, expire=False
)
RedisArchivist().set_message(self.MSG, mess_dict)
self.move_to_archive(vid_dict)
mess_dict = {
"status": "message:download",
"status": self.MSG,
"level": "info",
"title": "Completed",
"message": "",
}
RedisArchivist().set_message("message:download", mess_dict, 10)
RedisArchivist().set_message(self.MSG, mess_dict, expire=10)
self._delete_from_pending(youtube_id)
# post processing
@ -212,16 +213,15 @@ class VideoDownloader:
pending.get_channels()
self.video_overwrites = pending.video_overwrites
@staticmethod
def add_pending():
def add_pending(self):
"""add pending videos to download queue"""
mess_dict = {
"status": "message:download",
"status": self.MSG,
"level": "info",
"title": "Looking for videos to download",
"message": "Scanning your download queue.",
}
RedisArchivist().set_message("message:download", mess_dict)
RedisArchivist().set_message(self.MSG, mess_dict, expire=True)
pending = PendingList()
pending.get_download()
to_add = [i["youtube_id"] for i in pending.all_pending]
@ -229,18 +229,17 @@ class VideoDownloader:
# there is nothing pending
print("download queue is empty")
mess_dict = {
"status": "message:download",
"status": self.MSG,
"level": "error",
"title": "Download queue is empty",
"message": "Add some videos to the queue first.",
}
RedisArchivist().set_message("message:download", mess_dict)
RedisArchivist().set_message(self.MSG, mess_dict, expire=True)
return
RedisQueue().add_list(to_add)
@staticmethod
def _progress_hook(response):
def _progress_hook(self, response):
"""process the progress_hooks from yt_dlp"""
# title
path = os.path.split(response["filename"])[-1][12:]
@ -257,12 +256,12 @@ class VideoDownloader:
except KeyError:
message = "processing"
mess_dict = {
"status": "message:download",
"status": self.MSG,
"level": "info",
"title": title,
"message": message,
}
RedisArchivist().set_message("message:download", mess_dict)
RedisArchivist().set_message(self.MSG, mess_dict, expire=True)
def _build_obs(self):
"""collection to build all obs passed to yt-dlp"""

View File

@ -98,7 +98,7 @@ class PostData:
origin, new_view = self.exec_val.split(":")
key = f"{self.current_user}:view:{origin}"
print(f"change view: {key} to {new_view}")
RedisArchivist().set_message(key, {"status": new_view}, expire=False)
RedisArchivist().set_message(key, {"status": new_view})
return {"success": True}
def _change_grid(self):
@ -109,7 +109,7 @@ class PostData:
key = f"{self.current_user}:grid_items"
print(f"change grid items: {grid_items}")
RedisArchivist().set_message(key, {"status": grid_items}, expire=False)
RedisArchivist().set_message(key, {"status": grid_items})
return {"success": True}
@staticmethod
@ -135,7 +135,7 @@ class PostData:
running = download_pending.delay()
task_id = running.id
print(f"{task_id}: set task id")
RedisArchivist().set_message("dl_queue_id", task_id, expire=False)
RedisArchivist().set_message("dl_queue_id", task_id)
return {"success": True}
def _queue_handler(self):
@ -187,11 +187,11 @@ class PostData:
sort_order = {"status": self.exec_val}
if self.exec_val in ["asc", "desc"]:
RedisArchivist().set_message(
f"{self.current_user}:sort_order", sort_order, expire=False
f"{self.current_user}:sort_order", sort_order
)
else:
RedisArchivist().set_message(
f"{self.current_user}:sort_by", sort_order, expire=False
f"{self.current_user}:sort_by", sort_order
)
return {"success": True}
@ -200,7 +200,7 @@ class PostData:
key = f"{self.current_user}:hide_watched"
message = {"status": bool(int(self.exec_val))}
print(f"toggle {key}: {message}")
RedisArchivist().set_message(key, message, expire=False)
RedisArchivist().set_message(key, message)
return {"success": True}
def _show_subed_only(self):
@ -208,7 +208,7 @@ class PostData:
key = f"{self.current_user}:show_subed_only"
message = {"status": bool(int(self.exec_val))}
print(f"toggle {key}: {message}")
RedisArchivist().set_message(key, message, expire=False)
RedisArchivist().set_message(key, message)
return {"success": True}
def _dlnow(self):
@ -218,7 +218,7 @@ class PostData:
running = download_single.delay(youtube_id=youtube_id)
task_id = running.id
print("set task id: " + task_id)
RedisArchivist().set_message("dl_queue_id", task_id, expire=False)
RedisArchivist().set_message("dl_queue_id", task_id)
return {"success": True}
def _show_ignored_only(self):
@ -227,7 +227,7 @@ class PostData:
key = f"{self.current_user}:show_ignored_only"
value = {"status": show_value}
print(f"Filter download view ignored only: {show_value}")
RedisArchivist().set_message(key, value, expire=False)
RedisArchivist().set_message(key, value)
return {"success": True}
def _forget_ignore(self):

View File

@ -153,6 +153,7 @@ class YoutubeChannel(YouTubeItem):
es_path = False
index_name = "ta_channel"
yt_base = "https://www.youtube.com/channel/"
msg = "message:playlistscan"
def __init__(self, youtube_id):
super().__init__(youtube_id)
@ -252,12 +253,12 @@ class YoutubeChannel(YouTubeItem):
self.get_from_es()
channel_name = self.json_data["channel_name"]
mess_dict = {
"status": "message:playlistscan",
"status": self.msg,
"level": "info",
"title": "Looking for playlists",
"message": f"{channel_name}: Scanning channel in progress",
}
RedisArchivist().set_message("message:playlistscan", mess_dict)
RedisArchivist().set_message(self.msg, mess_dict, expire=True)
self.get_all_playlists()
if not self.all_playlists:
print(f"{self.youtube_id}: no playlists found.")
@ -272,12 +273,12 @@ class YoutubeChannel(YouTubeItem):
"""send notification"""
channel_name = self.json_data["channel_name"]
mess_dict = {
"status": "message:playlistscan",
"status": self.msg,
"level": "info",
"title": f"{channel_name}: Scanning channel for playlists",
"message": f"Progress: {idx + 1}/{len(self.all_playlists)}",
}
RedisArchivist().set_message("message:playlistscan", mess_dict)
RedisArchivist().set_message(self.msg, mess_dict, expire=True)
print("add playlist: " + playlist[1])
@staticmethod

View File

@ -310,4 +310,4 @@ def reindex_old_documents():
handler = Reindex()
handler.check_outdated()
handler.reindex()
RedisArchivist().set_message("last_reindex", handler.now, expire=False)
RedisArchivist().set_message("last_reindex", handler.now)

View File

@ -339,7 +339,7 @@ class SponsorBlock:
sb_id = RedisArchivist().get_message(key)
if not sb_id["status"]:
sb_id = {"status": randomizor(32)}
RedisArchivist().set_message(key, sb_id, expire=False)
RedisArchivist().set_message(key, sb_id)
return sb_id

View File

@ -99,7 +99,7 @@ class AppConfig:
self.config[config_dict][config_value] = to_write
updated.append((config_value, to_write))
RedisArchivist().set_message("config", self.config, expire=False)
RedisArchivist().set_message("config", self.config)
return updated
@staticmethod
@ -111,7 +111,7 @@ class AppConfig:
message = {"status": value}
redis_key = f"{user_id}:{key}"
RedisArchivist().set_message(redis_key, message, expire=False)
RedisArchivist().set_message(redis_key, message)
def get_colors(self):
"""overwrite config if user has set custom values"""
@ -151,7 +151,7 @@ class AppConfig:
needs_update = True
if needs_update:
RedisArchivist().set_message("config", redis_config, expire=False)
RedisArchivist().set_message("config", redis_config)
class ScheduleBuilder:
@ -165,6 +165,7 @@ class ScheduleBuilder:
"run_backup": "0 18 0",
}
CONFIG = ["check_reindex_days", "run_backup_rotate"]
MSG = "message:setting"
def __init__(self):
self.config = AppConfig().config
@ -180,25 +181,27 @@ class ScheduleBuilder:
except ValueError:
print(f"failed: {key} {value}")
mess_dict = {
"status": "message:setting",
"status": self.MSG,
"level": "error",
"title": "Scheduler update failed.",
"message": "Invalid schedule input",
}
RedisArchivist().set_message("message:setting", mess_dict)
RedisArchivist().set_message(
self.MSG, mess_dict, expire=True
)
return
redis_config["scheduler"][key] = to_write
if key in self.CONFIG and value:
redis_config["scheduler"][key] = int(value)
RedisArchivist().set_message("config", redis_config, expire=False)
RedisArchivist().set_message("config", redis_config)
mess_dict = {
"status": "message:setting",
"status": self.MSG,
"level": "info",
"title": "Scheduler changed.",
"message": "Please restart container for changes to take effect",
}
RedisArchivist().set_message("message:setting", mess_dict)
RedisArchivist().set_message(self.MSG, mess_dict, expire=True)
def value_builder(self, key, value):
"""validate single cron form entry and return cron dict"""

View File

@ -34,7 +34,7 @@ class RedisArchivist(RedisBase):
"setting",
]
def set_message(self, key, message, path=".", expire=True):
def set_message(self, key, message, path=".", expire=False):
"""write new message to redis"""
self.conn.execute_command(
"JSON.SET", self.NAME_SPACE + key, path, json.dumps(message)

View File

@ -48,7 +48,7 @@ def update_subscribed():
"title": "Rescanning channels and playlists.",
"message": "Looking for new videos.",
}
RedisArchivist().set_message("message:rescan", message)
RedisArchivist().set_message("message:rescan", message, expire=True)
have_lock = False
my_lock = RedisArchivist().get_lock("rescan")
@ -108,13 +108,14 @@ def download_single(youtube_id):
try:
have_lock = my_lock.acquire(blocking=False)
if have_lock:
key = "message:download"
mess_dict = {
"status": "message:download",
"status": key,
"level": "info",
"title": "Download single video",
"message": "processing",
}
RedisArchivist().set_message("message:download", mess_dict)
RedisArchivist().set_message(key, mess_dict, expire=True)
VideoDownloader().run_queue()
else:
print("Download queue already running.")
@ -196,7 +197,7 @@ def kill_dl(task_id):
"title": "Canceling download process",
"message": "Canceling download queue now.",
}
RedisArchivist().set_message("message:download", mess_dict)
RedisArchivist().set_message("message:download", mess_dict, expire=True)
@shared_task
@ -245,13 +246,14 @@ def subscribe_to(url_str):
channel_id_sub, channel_subscribed=True
)
# notify for channels
key = "message:subchannel"
message = {
"status": "message:subchannel",
"status": key,
"level": "info",
"title": "Subscribing to Channels",
"message": f"Processing {counter} of {len(to_subscribe_list)}",
}
RedisArchivist().set_message("message:subchannel", message=message)
RedisArchivist().set_message(key, message=message, expire=True)
counter = counter + 1
@ -260,13 +262,14 @@ def index_channel_playlists(channel_id):
"""add all playlists of channel to index"""
channel = YoutubeChannel(channel_id)
# notify
key = "message:playlistscan"
mess_dict = {
"status": "message:playlistscan",
"status": key,
"level": "info",
"title": "Looking for playlists",
"message": f'Scanning channel "{channel.youtube_id}" in progress',
}
RedisArchivist().set_message("message:playlistscan", mess_dict)
RedisArchivist().set_message(key, mess_dict, expire=True)
channel.index_channel_playlists()

View File

@ -394,14 +394,15 @@ class DownloadView(ArchivistResultsView):
youtube_ids = UrlListParser(url_str).process_list()
except ValueError:
# failed to process
key = "message:add"
print(f"failed to parse: {url_str}")
mess_dict = {
"status": "message:add",
"status": key,
"level": "error",
"title": "Failed to extract links.",
"message": "Not a video, channel or playlist ID or URL",
}
RedisArchivist().set_message("message:add", mess_dict)
RedisArchivist().set_message(key, mess_dict, expire=True)
return redirect("downloads")
print(youtube_ids)
@ -512,13 +513,14 @@ class ChannelView(ArchivistResultsView):
"""handle http post requests"""
subscribe_form = SubscribeToChannelForm(data=request.POST)
if subscribe_form.is_valid():
key = "message:subchannel"
message = {
"status": "message:subchannel",
"status": key,
"level": "info",
"title": "Subscribing to Channels",
"message": "Parsing form data",
}
RedisArchivist().set_message("message:subchannel", message=message)
RedisArchivist().set_message(key, message=message, expire=True)
url_str = request.POST.get("subscribe")
print(url_str)
subscribe_to.delay(url_str)
@ -659,15 +661,14 @@ class PlaylistView(ArchivistResultsView):
if subscribe_form.is_valid():
url_str = request.POST.get("subscribe")
print(url_str)
key = "message:subplaylist"
message = {
"status": "message:subplaylist",
"status": key,
"level": "info",
"title": "Subscribing to Playlists",
"message": "Parsing form data",
}
RedisArchivist().set_message(
"message:subplaylist", message=message
)
RedisArchivist().set_message(key, message=message, expire=True)
subscribe_to.delay(url_str)
sleep(1)
@ -847,15 +848,14 @@ class SettingsView(View):
valid = handler.validate()
if not valid:
handler.revoke()
key = "message:setting"
message = {
"status": "message:setting",
"status": key,
"level": "error",
"title": "Cookie import failed",
"message": "",
}
RedisArchivist().set_message(
"message:setting", message=message
)
RedisArchivist().set_message(key, message=message, expire=True)
else:
handler.revoke()