implement kill function for dl queue

This commit is contained in:
simon 2021-09-24 23:37:26 +07:00
parent f53391c1bb
commit 47020e0cfa
3 changed files with 43 additions and 2 deletions

View File

@ -88,6 +88,12 @@ def get_message(key):
return json_str return json_str
def del_message(key):
"""delete key from redis"""
redis_connection = redis.Redis(host=REDIS_HOST)
redis_connection.execute_command("DEL", key)
def get_dl_message(cache_dir): def get_dl_message(cache_dir):
"""get latest message if available""" """get latest message if available"""
redis_connection = redis.Redis(host=REDIS_HOST) redis_connection = redis.Redis(host=REDIS_HOST)

View File

@ -9,7 +9,7 @@ import os
from celery import Celery, shared_task from celery import Celery, shared_task
from home.src.config import AppConfig from home.src.config import AppConfig
from home.src.download import ChannelSubscription, PendingList, VideoDownloader from home.src.download import ChannelSubscription, PendingList, VideoDownloader
from home.src.helper import RedisQueue, get_lock from home.src.helper import RedisQueue, del_message, get_lock, set_message
from home.src.index_management import backup_all_indexes, restore_from_backup from home.src.index_management import backup_all_indexes, restore_from_backup
from home.src.reindex import ManualImport, reindex_old_documents from home.src.reindex import ManualImport, reindex_old_documents
@ -128,3 +128,25 @@ def run_restore_backup():
"""called from settings page, dump backup to zip file""" """called from settings page, dump backup to zip file"""
restore_from_backup() restore_from_backup()
print("index restore finished") print("index restore finished")
def kill_dl(task_id):
"""kill download worker task by ID"""
app.control.revoke(task_id, terminate=True)
del_message("dl_queue_id")
RedisQueue("dl_queue").clear()
# clear cache
cache_dir = os.path.join(CONFIG["application"]["cache_dir"], "download")
for cached in os.listdir(cache_dir):
to_delete = os.path.join(cache_dir, cached)
os.remove(to_delete)
# notify
mess_dict = {
"status": "downloading",
"level": "error",
"title": "Brutally killing download queue",
"message": "",
}
set_message("progress:download", mess_dict)

View File

@ -27,6 +27,7 @@ from home.tasks import (
download_pending, download_pending,
download_single, download_single,
extrac_dl, extrac_dl,
kill_dl,
run_backup, run_backup,
run_manual_import, run_manual_import,
run_restore_backup, run_restore_backup,
@ -481,6 +482,7 @@ class PostData:
"ignore": self.ignore, "ignore": self.ignore,
"dl_pending": self.dl_pending, "dl_pending": self.dl_pending,
"queue": self.queue_handler, "queue": self.queue_handler,
"kill_queue": self.kill_queue,
"unsubscribe": self.unsubscribe, "unsubscribe": self.unsubscribe,
"sort_order": self.sort_order, "sort_order": self.sort_order,
"hide_watched": self.hide_watched, "hide_watched": self.hide_watched,
@ -520,7 +522,10 @@ class PostData:
def dl_pending(): def dl_pending():
"""start the download queue""" """start the download queue"""
print("download pending") print("download pending")
download_pending.delay() running = download_pending.delay()
task_id = running.id
print("set task id: " + task_id)
set_message("dl_queue_id", task_id, expire=False)
return {"success": True} return {"success": True}
def queue_handler(self): def queue_handler(self):
@ -532,6 +537,14 @@ class PostData:
return {"success": True} return {"success": True}
@staticmethod
def kill_queue():
"""brutally murder the celery task"""
task_id = get_message("dl_queue_id")
print("brutally killing " + task_id)
kill_dl(task_id)
return {"success": True}
def unsubscribe(self): def unsubscribe(self):
"""unsubscribe from channel""" """unsubscribe from channel"""
channel_id_unsub = self.exec_val channel_id_unsub = self.exec_val