[API] remove old task endpoint add task-name and task-id

This commit is contained in:
simon 2023-03-22 16:35:25 +07:00
parent 11a61a06a6
commit 3bd6075a9b
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
6 changed files with 186 additions and 145 deletions

View File

@ -36,9 +36,13 @@ Note:
- [Snapshot List](#snapshot-list-view)
- [Snapshot Single](#snapshot-item-view)
**Task management**
- [Task Name List](#task-name-list-view)
- [Task Name Single](#task-name-item-view)
- [Task ID](#task-id-view)
**Additional**
- [Login](#login-view)
- [Task](#task-view) WIP
- [Refresh](#refresh-view)
- [Cookie](#cookie-view)
- [Search](#search-view)
@ -252,7 +256,7 @@ POST /api/snapshot/
Create new snapshot now, will return immediately, task will run async in the background, will return snapshot name:
```json
{
"snapshot_name": "ta_daily_<random-id>
"snapshot_name": "ta_daily_<random-id>"
}
```
@ -261,7 +265,7 @@ GET /api/snapshot/\<snapshot-id>/
Return metadata of a single snapshot
```json
{
"id": "ta_daily_<random-id>,
"id": "ta_daily_<random-id>",
"state": "SUCCESS",
"es_version": "0.0.0",
"start_date": "date_str",
@ -277,6 +281,29 @@ Restore this snapshot
DELETE /api/snapshot/\<snapshot-id>/
Remove this snapshot from index
## Task Name List View
GET /api/task-name/
Return all task results
## Task Name Item View
GET /api/task-name/\<task-name>/
Return all ask results by task name
POST /api/task-name/\<task-name>/
Start a new task by task name, only tasks without arguments can be started like that, see `home.tasks.BaseTask.TASK_CONFIG` for more info.
## Task ID view
GET /api/task-id/\<task-id>/
Return task status by task ID
POST /api/task-id/\<task-id>/
```json
{
"command": "stop|kill"
}
```
Send command to a task, valid commands: `stop` and `kill`.
## Login View
Return token and user ID for username and password:
POST /api/login/
@ -295,33 +322,6 @@ after successful login returns
}
```
## Task View
GET /api/task/
POST /api/task/
Check if there is an ongoing task:
GET /api/task/
Returns:
```json
{
"rescan": false,
"downloading": false
}
```
Start a background task
POST /api/task/
```json
{
"run": "task_name"
}
```
List of valid task names:
- **download_pending**: Start the download queue
- **rescan_pending**: Rescan your subscriptions
## Refresh View
GET /api/refresh/
parameters:

View File

@ -1,54 +0,0 @@
"""
Functionality:
- process tasks from API
- validate
- handover to celery
"""
from home.src.ta.ta_redis import RedisArchivist
from home.tasks import download_pending, update_subscribed
class TaskHandler:
"""handle tasks from api"""
def __init__(self, data):
self.data = data
def run_task(self):
"""map data and run"""
task_name = self.data["run"]
try:
to_run = self.exec_map(task_name)
except KeyError as err:
print(f"invalid task name {task_name}")
raise ValueError from err
response = to_run()
response.update({"task": task_name})
return response
def exec_map(self, task_name):
"""map dict key and return function to execute"""
exec_map = {
"download_pending": self._download_pending,
"rescan_pending": self._rescan_pending,
}
return exec_map[task_name]
@staticmethod
def _rescan_pending():
"""look for new items in subscribed channels"""
print("rescan subscribed channels")
update_subscribed.delay()
return {"success": True}
@staticmethod
def _download_pending():
"""start the download queue"""
print("download pending")
running = download_pending.delay()
print("set task id: " + running.id)
RedisArchivist().set_message("dl_queue_id", running.id)
return {"success": True}

View File

@ -81,11 +81,6 @@ urlpatterns = [
views.RefreshView.as_view(),
name="api-refresh",
),
path(
"task/",
views.TaskApiView.as_view(),
name="api-task",
),
path(
"snapshot/",
views.SnapshotApiListView.as_view(),
@ -96,6 +91,21 @@ urlpatterns = [
views.SnapshotApiView.as_view(),
name="api-snapshot",
),
path(
"task-name/",
views.TaskListView.as_view(),
name="api-task-list",
),
path(
"task-name/<slug:task_name>/",
views.TaskNameListView.as_view(),
name="api-task-name-list",
),
path(
"task-id/<slug:task_id>/",
views.TaskIDView.as_view(),
name="api-task-id",
),
path(
"cookie/",
views.CookieView.as_view(),

View File

@ -1,7 +1,6 @@
"""all API views"""
from api.src.search_processor import SearchProcess
from api.src.task_processor import TaskHandler
from home.src.download.queue import PendingInteract
from home.src.download.yt_dlp_base import CookieHandler
from home.src.es.connect import ElasticWrap
@ -14,8 +13,15 @@ from home.src.index.reindex import ReindexProgress
from home.src.index.video import SponsorBlock, YoutubeVideo
from home.src.ta.config import AppConfig
from home.src.ta.ta_redis import RedisArchivist, RedisQueue
from home.src.ta.task_manager import TaskCommand, TaskManager
from home.src.ta.urlparser import Parser
from home.tasks import check_reindex, download_pending, extrac_dl, subscribe_to
from home.tasks import (
BaseTask,
check_reindex,
download_pending,
extrac_dl,
subscribe_to,
)
from rest_framework.authentication import (
SessionAuthentication,
TokenAuthentication,
@ -555,29 +561,6 @@ class LoginApiView(ObtainAuthToken):
return Response({"token": token.key, "user_id": user.pk})
class TaskApiView(ApiBaseView):
"""resolves to /api/task/
GET: check if ongoing background task
POST: start a new background task
"""
@staticmethod
def get(request):
"""handle get request"""
# pylint: disable=unused-argument
response = {"rescan": False, "downloading": False}
for key in response.keys():
response[key] = RedisArchivist().is_locked(key)
return Response(response)
def post(self, request):
"""handle post request"""
response = TaskHandler(request.data).run_task()
return Response(response)
class SnapshotApiListView(ApiBaseView):
"""resolves to /api/snapshot/
GET: returns snashot config plus list of existing snapshots
@ -642,6 +625,103 @@ class SnapshotApiView(ApiBaseView):
return Response(response)
class TaskListView(ApiBaseView):
"""resolves to /api/task-name/
GET: return a list of all stored task results
"""
def get(self, request):
"""handle get request"""
# pylint: disable=unused-argument
all_results = TaskManager().get_all_results()
return Response(all_results)
class TaskNameListView(ApiBaseView):
"""resolves to /api/task-name/<task-name>/
GET: return a list of stored results of task
POST: start new background process
"""
def get(self, request, task_name):
"""handle get request"""
# pylint: disable=unused-argument
if task_name not in BaseTask.TASK_CONFIG:
message = {"message": "invalid task name"}
return Response(message, status=404)
all_results = TaskManager().get_tasks_by_name(task_name)
return Response(all_results)
def post(self, request, task_name):
"""
handle post request
404 for invalid task_name
400 if task can't be started here without argument
"""
# pylint: disable=unused-argument
task_config = BaseTask.TASK_CONFIG.get(task_name)
if not task_config:
message = {"message": "invalid task name"}
return Response(message, status=404)
if not task_config.get("api-start"):
message = {"message": "can not start task through this endpoint"}
return Response(message, status=400)
message = TaskCommand().start(task_name)
return Response({"message": message})
class TaskIDView(ApiBaseView):
"""resolves to /api/task-id/<task-id>/
GET: return details of task id
"""
valid_commands = ["stop", "kill"]
def get(self, request, task_id):
"""handle get request"""
# pylint: disable=unused-argument
task_result = TaskManager().get_task(task_id)
if not task_result:
message = {"message": "task id not found"}
return Response(message, status=404)
return Response(task_result)
def post(self, request, task_id):
"""post command to task"""
command = request.data.get("command")
if not command or command not in self.valid_commands:
message = {"message": "no valid command found"}
return Response(message, status=400)
task_result = TaskManager().get_task(task_id)
if not task_result:
message = {"message": "task id not found"}
return Response(message, status=404)
task_conf = BaseTask.TASK_CONFIG.get(task_result.get("name"))
if command == "stop":
if not task_conf.get("api-stop"):
message = {"message": "task can not be stopped"}
return Response(message, status=400)
TaskCommand().stop(task_id)
if command == "kill":
if not task_conf.get("api-stop"):
message = {"message": "task can not be killed"}
return Response(message, status=400)
TaskCommand().kill(task_id)
return Response({"message": "command sent"})
class RefreshView(ApiBaseView):
"""resolves to /api/refresh/
GET: get refresh progress

View File

@ -4,6 +4,7 @@ functionality:
- handle threads and locks
"""
from home import tasks as ta_tasks
from home.src.ta.ta_redis import TaskRedis
@ -61,32 +62,32 @@ class TaskManager:
class TaskCommand:
"""send command pending task"""
"""run commands on task"""
def __init__(self, command="STOP"):
self.command = command
def start(self, task_name):
"""start task by task_name, only pass task that don't take args"""
task = ta_tasks.app.tasks.get(task_name).delay()
message = {
"task_id": task.id,
"status": task.status,
"task_name": task.name,
}
def by_id(self, task_id):
"""run command on single task id"""
self._set_command(task_id)
return message
def by_name(self, task_name):
"""run command on all tasks by name"""
pending = TaskManager().get_pending(task_name)
if not pending:
return
def stop(self, task_id):
"""
send stop signal to task_id,
needs to be implemented in task to take effect
"""
handler = TaskRedis()
for task in pending:
self._set_command(task.get("task_id"))
task = handler.get_single(task_id)
if not task["name"] in ta_tasks.BaseTask.TASK_CONFIG:
raise ValueError
def _set_command(self, task_id):
"""stop single task by id"""
TaskRedis().set_command(task_id, self.command)
if self.command == "KILL":
self._kill(task_id)
handler.set_command(task_id, "STOP")
def _kill(self, task_id):
"""kill task by id"""
from home.tasks import app as CeleryApp
CeleryApp.control.revoke(task_id, terminate=True)
def kill(self, task_id):
"""send kill signal to task_id"""
ta_tasks.app.control.revoke(task_id, terminate=True)

View File

@ -46,14 +46,18 @@ app.conf.timezone = os.environ.get("TZ") or "UTC"
class BaseTask(Task):
"""base class to inherit each class from"""
# pylint: disable=abstract-method
TASK_CONFIG = {
"update_subscribed": {
"title": "Rescan your Subscriptions",
"group": "download:scan",
"api-start": True,
},
"download_pending": {
"title": "Downloading",
"group": "download:run",
"api-start": True,
},
"extract_download": {
"title": "Add to download queue",
@ -66,10 +70,12 @@ class BaseTask(Task):
"manual_import": {
"title": "Manual video import",
"group": "setting:import",
"api-start": True,
},
"run_backup": {
"title": "Index Backup",
"group": "setting:backup",
"api-start": True,
},
"restore_backup": {
"title": "Restore Backup",
@ -78,14 +84,17 @@ class BaseTask(Task):
"rescan_filesystem": {
"title": "Rescan your Filesystem",
"group": "setting:filesystemscan",
"api-start": True,
},
"thumbnail_check": {
"title": "Check your Thumbnails",
"group": "setting:thumbnailcheck",
"api-start": True,
},
"resync_thumbs": {
"title": "Sync Thumbnails to Media Files",
"group": "setting:thumbnailsync",
"api-start": True,
},
"index_playlists": {
"title": "Index Channel Playlist",
@ -132,14 +141,9 @@ class BaseTask(Task):
def _build_message(self, level="info"):
"""build message dict"""
task_id = self.request.id
config = self.TASK_CONFIG.get(self.name)
message = {
"status": config.get("group"),
"title": config.get("title"),
"level": level,
"id": task_id,
}
key = f"message:{config.get('group')}:{task_id.split('-')[0]}"
message = self.TASK_CONFIG.get(self.name)
message.update({"level": level, "id": task_id})
key = f"message:{message.get('group')}:{task_id.split('-')[0]}"
return message, key