implement BaseTask with callbacks

This commit is contained in:
simon 2023-03-13 10:09:48 +07:00
parent 2d2431364d
commit 20f8a5a501
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
1 changed files with 90 additions and 1 deletions

View File

@ -8,7 +8,7 @@ Functionality:
import os
from celery import Celery, shared_task
from celery import Celery, Task, shared_task
from home.src.download.queue import PendingList
from home.src.download.subscriptions import (
SubscriptionHandler,
@ -42,6 +42,95 @@ app.autodiscover_tasks()
app.conf.timezone = os.environ.get("TZ") or "UTC"
class BaseTask(Task):
"""base class to inherit each class from"""
TASK_CONFIG = {
"update_subscribed": {
"title": "Rescan your Subscriptions",
"group": "message:download:scan",
},
"download_pending": {
"title": "Downloading",
"group": "message:download:run",
},
"extract_download": {
"title": "Add to download queue",
"group": "message:download:add",
},
"subscribe_to": {
"title": "Add Subscription",
"group": "message:download:subscribe",
},
"check_reindex": {
"title": "Reindex old documents",
"group": "message:settings:reindex",
},
"manual_import": {
"title": "Manual video import",
"group": "message:settings:import",
},
"run_backup": {
"title": "Index Backup",
"group": "message:settings:backup",
},
"restore_backup": {
"title": "Restore Backup",
"group": "message:settings:restore",
},
"rescan_filesystem": {
"title": "Rescan your Filesystem",
"group": "message:settings:filesystemscan",
},
"thumbnail_check": {
"title": "Check your Thumbnails",
"group": "message:settings:thumbnailcheck",
},
"resync_thumbs": {
"title": "Sync Thumbnails to Media Files",
"group": "message:settings:thumbnailsync",
},
"index_playlists": {
"title": "Index Channel Playlist",
"group": "message:channel:indexplaylist",
},
}
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""callback for task failure"""
print(f"{task_id} Failed callback")
def on_success(self, retval, task_id, args, kwargs):
"""callback task completed successfully"""
print(f"{task_id} success callback")
message, key = self._build_message(task_id)
message.update({"message": "Task completed successfully"})
RedisArchivist().set_message(key, message, expire=5)
def before_start(self, task_id, args, kwargs):
"""callback before initiating task"""
print(f"{self.name} create callback")
message, key = self._build_message(task_id)
message.update({"message": "New task received."})
RedisArchivist().set_message(key, message)
def print_progress(self, task_id, progress):
"""print progress"""
print(f"{task_id}: {progress}")
def _build_message(self, task_id, level="info"):
"""build message dict"""
config = self.TASK_CONFIG.get(self.name)
message = {
"status": config.get("group"),
"title": config.get("title"),
"level": level,
"id": task_id,
}
key = f"{config.get('group')}:{task_id.split('-')[0]}"
return message, key
@shared_task(name="update_subscribed", bind=True)
def update_subscribed(self):
"""look for missing videos and add to pending"""