From 47020e0cfa64919d50857fdf6df38b2b32371892 Mon Sep 17 00:00:00 2001 From: simon Date: Fri, 24 Sep 2021 23:37:26 +0700 Subject: [PATCH] implement kill function for dl queue --- tubearchivist/home/src/helper.py | 6 ++++++ tubearchivist/home/tasks.py | 24 +++++++++++++++++++++++- tubearchivist/home/views.py | 15 ++++++++++++++- 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/tubearchivist/home/src/helper.py b/tubearchivist/home/src/helper.py index 5ef1be8..2f27d55 100644 --- a/tubearchivist/home/src/helper.py +++ b/tubearchivist/home/src/helper.py @@ -88,6 +88,12 @@ def get_message(key): return json_str +def del_message(key): + """delete key from redis""" + redis_connection = redis.Redis(host=REDIS_HOST) + redis_connection.execute_command("DEL", key) + + def get_dl_message(cache_dir): """get latest message if available""" redis_connection = redis.Redis(host=REDIS_HOST) diff --git a/tubearchivist/home/tasks.py b/tubearchivist/home/tasks.py index 91b5a26..ce2c863 100644 --- a/tubearchivist/home/tasks.py +++ b/tubearchivist/home/tasks.py @@ -9,7 +9,7 @@ import os from celery import Celery, shared_task from home.src.config import AppConfig from home.src.download import ChannelSubscription, PendingList, VideoDownloader -from home.src.helper import RedisQueue, get_lock +from home.src.helper import RedisQueue, del_message, get_lock, set_message from home.src.index_management import backup_all_indexes, restore_from_backup from home.src.reindex import ManualImport, reindex_old_documents @@ -128,3 +128,25 @@ def run_restore_backup(): """called from settings page, dump backup to zip file""" restore_from_backup() print("index restore finished") + + +def kill_dl(task_id): + """kill download worker task by ID""" + app.control.revoke(task_id, terminate=True) + del_message("dl_queue_id") + RedisQueue("dl_queue").clear() + + # clear cache + cache_dir = os.path.join(CONFIG["application"]["cache_dir"], "download") + for cached in os.listdir(cache_dir): + to_delete = os.path.join(cache_dir, cached) + os.remove(to_delete) + + # notify + mess_dict = { + "status": "downloading", + "level": "error", + "title": "Brutally killing download queue", + "message": "", + } + set_message("progress:download", mess_dict) diff --git a/tubearchivist/home/views.py b/tubearchivist/home/views.py index 11da297..9e061d5 100644 --- a/tubearchivist/home/views.py +++ b/tubearchivist/home/views.py @@ -27,6 +27,7 @@ from home.tasks import ( download_pending, download_single, extrac_dl, + kill_dl, run_backup, run_manual_import, run_restore_backup, @@ -481,6 +482,7 @@ class PostData: "ignore": self.ignore, "dl_pending": self.dl_pending, "queue": self.queue_handler, + "kill_queue": self.kill_queue, "unsubscribe": self.unsubscribe, "sort_order": self.sort_order, "hide_watched": self.hide_watched, @@ -520,7 +522,10 @@ class PostData: def dl_pending(): """start the download queue""" print("download pending") - download_pending.delay() + running = download_pending.delay() + task_id = running.id + print("set task id: " + task_id) + set_message("dl_queue_id", task_id, expire=False) return {"success": True} def queue_handler(self): @@ -532,6 +537,14 @@ class PostData: return {"success": True} + @staticmethod + def kill_queue(): + """brutally murder the celery task""" + task_id = get_message("dl_queue_id") + print("brutally killing " + task_id) + kill_dl(task_id) + return {"success": True} + def unsubscribe(self): """unsubscribe from channel""" channel_id_unsub = self.exec_val