return task commands to notification backend

This commit is contained in:
simon 2023-03-30 11:39:22 +07:00
parent f0becb750a
commit db0ade203a
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
4 changed files with 19 additions and 8 deletions

View File

@ -711,7 +711,8 @@ class TaskIDView(ApiBaseView):
message = {"message": "task can not be stopped"}
return Response(message, status=400)
TaskCommand().stop(task_id)
message_key = self._build_message_key(task_conf, task_id)
TaskCommand().stop(task_id, message_key)
if command == "kill":
if not task_conf.get("api-stop"):
message = {"message": "task can not be killed"}
@ -721,6 +722,10 @@ class TaskIDView(ApiBaseView):
return Response({"message": "command sent"})
def _build_message_key(self, task_conf, task_id):
"""build message key to forward command to notification"""
return f"message:{task_conf.get('group')}:{task_id.split('-')[0]}"
class RefreshView(ApiBaseView):
"""resolves to /api/refresh/

View File

@ -127,13 +127,14 @@ class ChannelSubscription:
continue
if self.task:
if self.task.is_stopped():
self.task.send_progress(["Received Stop signal."])
break
self.task.send_progress(
message_lines=[f"Scanning Channel {idx + 1}/{total}"],
progress=(idx + 1) / total,
)
if self.task.is_stopped():
self.task.send_progress(["Received Stop signal."])
break
return missing_videos
@ -291,7 +292,8 @@ class SubscriptionScanner:
self.missing_videos = []
self.scan_channels()
self.scan_playlists()
if self.task and not self.task.is_stopped():
self.scan_playlists()
return self.missing_videos

View File

@ -5,7 +5,7 @@ functionality:
"""
from home import tasks as ta_tasks
from home.src.ta.ta_redis import TaskRedis
from home.src.ta.ta_redis import RedisArchivist, TaskRedis
class TaskManager:
@ -92,11 +92,12 @@ class TaskCommand:
return message
def stop(self, task_id):
def stop(self, task_id, message_key):
"""
send stop signal to task_id,
needs to be implemented in task to take effect
"""
print(f"[task][{task_id}]: received STOP signal.")
handler = TaskRedis()
task = handler.get_single(task_id)
@ -104,7 +105,9 @@ class TaskCommand:
raise ValueError
handler.set_command(task_id, "STOP")
RedisArchivist().set_message(message_key, "STOP", path=".command")
def kill(self, task_id):
"""send kill signal to task_id"""
print(f"[task][{task_id}]: received KILL signal.")
ta_tasks.app.control.revoke(task_id, terminate=True)

View File

@ -149,7 +149,8 @@ class BaseTask(Task):
message.update({"level": level, "id": task_id})
task_result = TaskManager().get_task(task_id)
if task_result:
message.update({"command": task_result.get("command", False)})
command = task_result.get("command", False)
message.update({"command": command})
key = f"message:{message.get('group')}:{task_id.split('-')[0]}"
return message, key