313 lines
10 KiB
Python

"""
Functionality:
- collect tasks
- handle task callbacks
- handle task notifications
- handle task locking
"""
from appsettings.src.backup import ElasticBackup
from appsettings.src.config import ReleaseVersion
from appsettings.src.filesystem import Scanner
from appsettings.src.index_setup import ElasitIndexWrap
from appsettings.src.manual import ImportFolderScanner
from appsettings.src.reindex import Reindex, ReindexManual, ReindexPopulate
from celery import Task, shared_task
from celery.exceptions import Retry
from channel.src.index import YoutubeChannel
from common.src.ta_redis import RedisArchivist
from common.src.urlparser import Parser
from download.src.queue import PendingList
from download.src.subscriptions import SubscriptionHandler, SubscriptionScanner
from download.src.thumbnails import ThumbFilesystem, ThumbValidator
from download.src.yt_dlp_handler import VideoDownloader
from task.src.notify import Notifications
from task.src.task_config import TASK_CONFIG
from task.src.task_manager import TaskManager
class BaseTask(Task):
"""base class to inherit each class from"""
# pylint: disable=abstract-method
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""callback for task failure"""
print(f"{task_id} Failed callback")
message, key = self._build_message(level="error")
message.update({"messages": [f"Task failed: {exc}"]})
RedisArchivist().set_message(key, message, expire=20)
def on_success(self, retval, task_id, args, kwargs):
"""callback task completed successfully"""
print(f"{task_id} success callback")
message, key = self._build_message()
message.update({"messages": ["Task completed successfully"]})
RedisArchivist().set_message(key, message, expire=5)
def before_start(self, task_id, args, kwargs):
"""callback before initiating task"""
print(f"{self.name} create callback")
message, key = self._build_message()
message.update({"messages": ["New task received."]})
RedisArchivist().set_message(key, message)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
"""callback after task returns"""
print(f"{task_id} return callback")
task_title = TASK_CONFIG.get(self.name).get("title")
Notifications(self.name).send(task_id, task_title)
def send_progress(self, message_lines, progress=False, title=False):
"""send progress message"""
message, key = self._build_message()
message.update(
{
"messages": message_lines,
"progress": progress,
}
)
if title:
message["title"] = title
RedisArchivist().set_message(key, message)
def _build_message(self, level="info"):
"""build message dict"""
task_id = self.request.id
message = TASK_CONFIG.get(self.name).copy()
message.update({"level": level, "id": task_id})
task_result = TaskManager().get_task(task_id)
if task_result:
command = task_result.get("command", False)
message.update({"command": command})
key = f"message:{message.get('group')}:{task_id.split('-')[0]}"
return message, key
def is_stopped(self):
"""check if task is stopped"""
return TaskManager().is_stopped(self.request.id)
@shared_task(name="update_subscribed", bind=True, base=BaseTask)
def update_subscribed(self):
"""look for missing videos and add to pending"""
manager = TaskManager()
if manager.is_pending(self):
print(f"[task][{self.name}] rescan already running")
self.send_progress("Rescan already in progress.")
return None
manager.init(self)
handler = SubscriptionScanner(task=self)
missing_videos = handler.scan()
auto_start = handler.auto_start
if missing_videos:
print(missing_videos)
extrac_dl.delay(missing_videos, auto_start=auto_start)
message = f"Found {len(missing_videos)} videos to add to the queue."
return message
return None
@shared_task(
name="download_pending",
bind=True,
base=BaseTask,
max_retries=3,
default_retry_delay=10,
)
def download_pending(self, auto_only=False):
"""download latest pending videos"""
manager = TaskManager()
if manager.is_pending(self):
print(f"[task][{self.name}] download queue already running")
self.send_progress("Download Queue is already running.")
return None
manager.init(self)
try:
downloader = VideoDownloader(task=self)
downloaded, failed = downloader.run_queue(auto_only=auto_only)
if failed:
print(f"[task][{self.name}] Videos failed, retry.")
self.send_progress("Videos failed, retry.")
raise self.retry()
except Retry as exc:
raise exc
if downloaded:
return f"downloaded {downloaded} video(s)."
return None
@shared_task(name="extract_download", bind=True, base=BaseTask)
def extrac_dl(self, youtube_ids, auto_start=False, status="pending"):
"""parse list passed and add to pending"""
TaskManager().init(self)
if isinstance(youtube_ids, str):
to_add = Parser(youtube_ids).parse()
else:
to_add = youtube_ids
pending_handler = PendingList(youtube_ids=to_add, task=self)
pending_handler.parse_url_list()
videos_added = pending_handler.add_to_pending(
status=status, auto_start=auto_start
)
if auto_start:
download_pending.delay(auto_only=True)
if videos_added:
return f"added {len(videos_added)} Videos to Queue"
return None
@shared_task(bind=True, name="check_reindex", base=BaseTask)
def check_reindex(self, data=False, extract_videos=False):
"""run the reindex main command"""
if data:
# started from frontend through API
print(f"[task][{self.name}] reindex {data}")
self.send_progress("Add items to the reindex Queue.")
ReindexManual(extract_videos=extract_videos).extract_data(data)
manager = TaskManager()
if manager.is_pending(self):
print(f"[task][{self.name}] reindex queue is already running")
self.send_progress("Reindex Queue is already running.")
return
manager.init(self)
if not data:
# started from scheduler
populate = ReindexPopulate()
print(f"[task][{self.name}] reindex outdated documents")
self.send_progress("Add recent documents to the reindex Queue.")
populate.get_interval()
populate.add_recent()
self.send_progress("Add outdated documents to the reindex Queue.")
populate.add_outdated()
handler = Reindex(task=self)
handler.reindex_all()
return handler.build_message()
@shared_task(bind=True, name="manual_import", base=BaseTask)
def run_manual_import(self):
"""called from settings page, to go through import folder"""
manager = TaskManager()
if manager.is_pending(self):
print(f"[task][{self.name}] manual import is already running")
self.send_progress("Manual import is already running.")
return
manager.init(self)
ImportFolderScanner(task=self).scan()
@shared_task(bind=True, name="run_backup", base=BaseTask)
def run_backup(self, reason="auto"):
"""called from settings page, dump backup to zip file"""
manager = TaskManager()
if manager.is_pending(self):
print(f"[task][{self.name}] backup is already running")
self.send_progress("Backup is already running.")
return
manager.init(self)
ElasticBackup(reason=reason, task=self).backup_all_indexes()
@shared_task(bind=True, name="restore_backup", base=BaseTask)
def run_restore_backup(self, filename):
"""called from settings page, dump backup to zip file"""
manager = TaskManager()
if manager.is_pending(self):
print(f"[task][{self.name}] restore is already running")
self.send_progress("Restore is already running.")
return None
manager.init(self)
self.send_progress(["Reset your Index"])
ElasitIndexWrap().reset()
ElasticBackup(task=self).restore(filename)
print("index restore finished")
return f"backup restore completed: {filename}"
@shared_task(bind=True, name="rescan_filesystem", base=BaseTask)
def rescan_filesystem(self):
"""check the media folder for mismatches"""
manager = TaskManager()
if manager.is_pending(self):
print(f"[task][{self.name}] filesystem rescan already running")
self.send_progress("Filesystem Rescan is already running.")
return
manager.init(self)
handler = Scanner(task=self)
handler.scan()
handler.apply()
ThumbValidator(task=self).validate()
@shared_task(bind=True, name="thumbnail_check", base=BaseTask)
def thumbnail_check(self):
"""validate thumbnails"""
manager = TaskManager()
if manager.is_pending(self):
print(f"[task][{self.name}] thumbnail check is already running")
self.send_progress("Thumbnail check is already running.")
return
manager.init(self)
thumnail = ThumbValidator(task=self)
thumnail.validate()
thumnail.clean_up()
@shared_task(bind=True, name="resync_thumbs", base=BaseTask)
def re_sync_thumbs(self):
"""sync thumbnails to mediafiles"""
manager = TaskManager()
if manager.is_pending(self):
print(f"[task][{self.name}] thumb re-embed is already running")
self.send_progress("Thumbnail re-embed is already running.")
return
manager.init(self)
ThumbFilesystem(task=self).embed()
@shared_task(bind=True, name="subscribe_to", base=BaseTask)
def subscribe_to(self, url_str: str, expected_type: str | bool = False):
"""
take a list of urls to subscribe to
optionally validate expected_type channel / playlist
"""
SubscriptionHandler(url_str, task=self).subscribe(expected_type)
@shared_task(bind=True, name="index_playlists", base=BaseTask)
def index_channel_playlists(self, channel_id):
"""add all playlists of channel to index"""
channel = YoutubeChannel(channel_id, task=self)
channel.index_channel_playlists()
@shared_task(name="version_check")
def version_check():
"""check for new updates"""
ReleaseVersion().check()