From 9366b8eab9c32e07061466b15591b379b60d5975 Mon Sep 17 00:00:00 2001 From: Simon <35427372+bbilly1@users.noreply.github.com> Date: Thu, 9 May 2024 20:22:36 +0700 Subject: [PATCH] Feature beat model (#713) * add django-celery-beat * implement schedule migration * fix version_check migration * remove old schedule init * better schedule migration * fix task_config migration * show task config on settings page * fix notify url builder * refactor celery initiation * fix get task * fix scheduler mig * fix linter * better task_config store on periodic task * save new schedules * fix task_config extraction from custom model * implement auto schedule * implement schedule delete * refactor notifications to ES config storage * downgrade redis * better notification migration to ES * add notification url handling * fix worker start * fix docs spelling * don't resend form data on notification refresh * fix type hints * move TASK_CONFIG to separate module * fix partial task config imports * fix yt_obs typing * delete schedule * remove outdated instructions * create initial schedules * fix reindex days config key * fix doc string * unregister BeatModels --- docker_assets/run.sh | 4 +- tubearchivist/api/urls.py | 10 + tubearchivist/api/views.py | 66 +++++- .../config/management/commands/ta_startup.py | 188 ++++++++++++++- tubearchivist/config/settings.py | 1 + tubearchivist/home/__init__.py | 6 +- tubearchivist/home/admin.py | 10 + tubearchivist/home/celery.py | 24 ++ tubearchivist/home/config.json | 13 - .../migrations/0002_customperiodictask.py | 23 ++ tubearchivist/home/models.py | 7 + tubearchivist/home/src/es/backup.py | 8 +- tubearchivist/home/src/frontend/forms.py | 44 ---- .../home/src/frontend/forms_schedule.py | 95 ++++++++ tubearchivist/home/src/index/generic.py | 2 +- tubearchivist/home/src/index/reindex.py | 16 +- tubearchivist/home/src/ta/config.py | 161 +------------ tubearchivist/home/src/ta/config_schedule.py | 89 +++++++ tubearchivist/home/src/ta/notify.py | 146 +++++++++--- tubearchivist/home/src/ta/task_config.py | 125 ++++++++++ tubearchivist/home/src/ta/task_manager.py | 9 +- tubearchivist/home/tasks.py | 103 +------- .../templates/home/settings_scheduling.html | 120 +++++----- tubearchivist/home/views.py | 62 ++++- tubearchivist/requirements.txt | 9 +- tubearchivist/static/progress.js | 6 +- tubearchivist/static/script.js | 224 +++++++++++------- 27 files changed, 1051 insertions(+), 520 deletions(-) create mode 100644 tubearchivist/home/celery.py create mode 100644 tubearchivist/home/migrations/0002_customperiodictask.py create mode 100644 tubearchivist/home/src/frontend/forms_schedule.py create mode 100644 tubearchivist/home/src/ta/config_schedule.py create mode 100644 tubearchivist/home/src/ta/task_config.py diff --git a/docker_assets/run.sh b/docker_assets/run.sh index 4b60e585..8cda03c0 100644 --- a/docker_assets/run.sh +++ b/docker_assets/run.sh @@ -17,7 +17,7 @@ python manage.py ta_startup # start all tasks nginx & -celery -A home.tasks worker --loglevel=INFO --max-tasks-per-child 10 & +celery -A home.celery worker --loglevel=INFO --max-tasks-per-child 10 & celery -A home beat --loglevel=INFO \ - -s "${BEAT_SCHEDULE_PATH:-${cachedir}/celerybeat-schedule}" & + --scheduler django_celery_beat.schedulers:DatabaseScheduler & uwsgi --ini uwsgi.ini diff --git a/tubearchivist/api/urls.py b/tubearchivist/api/urls.py index b03547cd..4ad57a73 100644 --- a/tubearchivist/api/urls.py +++ b/tubearchivist/api/urls.py @@ -121,6 +121,16 @@ urlpatterns = [ views.TaskIDView.as_view(), name="api-task-id", ), + path( + "schedule/", + views.ScheduleView.as_view(), + name="api-schedule", + ), + path( + "schedule/notification/", + views.ScheduleNotification.as_view(), + name="api-schedule-notification", + ), path( "config/user/", views.UserConfigView.as_view(), diff --git a/tubearchivist/api/views.py b/tubearchivist/api/views.py index 77c345ab..a9359874 100644 --- a/tubearchivist/api/views.py +++ b/tubearchivist/api/views.py @@ -10,6 +10,7 @@ from api.src.aggs import ( WatchProgress, ) from api.src.search_processor import SearchProcess +from home.models import CustomPeriodicTask from home.src.download.queue import PendingInteract from home.src.download.subscriptions import ( ChannelSubscription, @@ -27,13 +28,14 @@ from home.src.index.playlist import YoutubePlaylist from home.src.index.reindex import ReindexProgress from home.src.index.video import SponsorBlock, YoutubeVideo from home.src.ta.config import AppConfig, ReleaseVersion +from home.src.ta.notify import Notifications, get_all_notifications from home.src.ta.settings import EnvironmentSettings from home.src.ta.ta_redis import RedisArchivist +from home.src.ta.task_config import TASK_CONFIG from home.src.ta.task_manager import TaskCommand, TaskManager from home.src.ta.urlparser import Parser from home.src.ta.users import UserConfig from home.tasks import ( - BaseTask, check_reindex, download_pending, extrac_dl, @@ -911,7 +913,7 @@ class TaskNameListView(ApiBaseView): def get(self, request, task_name): """handle get request""" # pylint: disable=unused-argument - if task_name not in BaseTask.TASK_CONFIG: + if task_name not in TASK_CONFIG: message = {"message": "invalid task name"} return Response(message, status=404) @@ -926,12 +928,12 @@ class TaskNameListView(ApiBaseView): 400 if task can't be started here without argument """ # pylint: disable=unused-argument - task_config = BaseTask.TASK_CONFIG.get(task_name) + task_config = TASK_CONFIG.get(task_name) if not task_config: message = {"message": "invalid task name"} return Response(message, status=404) - if not task_config.get("api-start"): + if not task_config.get("api_start"): message = {"message": "can not start task through this endpoint"} return Response(message, status=400) @@ -970,16 +972,16 @@ class TaskIDView(ApiBaseView): message = {"message": "task id not found"} return Response(message, status=404) - task_conf = BaseTask.TASK_CONFIG.get(task_result.get("name")) + task_conf = TASK_CONFIG.get(task_result.get("name")) if command == "stop": - if not task_conf.get("api-stop"): + if not task_conf.get("api_stop"): message = {"message": "task can not be stopped"} return Response(message, status=400) message_key = self._build_message_key(task_conf, task_id) TaskCommand().stop(task_id, message_key) if command == "kill": - if not task_conf.get("api-stop"): + if not task_conf.get("api_stop"): message = {"message": "task can not be killed"} return Response(message, status=400) @@ -992,6 +994,56 @@ class TaskIDView(ApiBaseView): return f"message:{task_conf.get('group')}:{task_id.split('-')[0]}" +class ScheduleView(ApiBaseView): + """resolves to /api/schedule/ + DEL: delete schedule for task + """ + + permission_classes = [AdminOnly] + + def delete(self, request): + """delete schedule by task_name query""" + task_name = request.data.get("task_name") + try: + task = CustomPeriodicTask.objects.get(name=task_name) + except CustomPeriodicTask.DoesNotExist: + message = {"message": "task_name not found"} + return Response(message, status=404) + + _ = task.delete() + + return Response({"success": True}) + + +class ScheduleNotification(ApiBaseView): + """resolves to /api/schedule/notification/ + GET: get all schedule notifications + DEL: delete notification + """ + + def get(self, request): + """handle get request""" + + return Response(get_all_notifications()) + + def delete(self, request): + """handle delete""" + + task_name = request.data.get("task_name") + url = request.data.get("url") + + if not TASK_CONFIG.get(task_name): + message = {"message": "task_name not found"} + return Response(message, status=404) + + if url: + response, status_code = Notifications(task_name).remove_url(url) + else: + response, status_code = Notifications(task_name).remove_task() + + return Response({"response": response, "status_code": status_code}) + + class RefreshView(ApiBaseView): """resolves to /api/refresh/ GET: get refresh progress diff --git a/tubearchivist/config/management/commands/ta_startup.py b/tubearchivist/config/management/commands/ta_startup.py index 16d43d2f..bb2ced8f 100644 --- a/tubearchivist/config/management/commands/ta_startup.py +++ b/tubearchivist/config/management/commands/ta_startup.py @@ -5,16 +5,23 @@ Functionality: """ import os +from random import randint from time import sleep +from django.conf import settings from django.core.management.base import BaseCommand, CommandError +from django_celery_beat.models import CrontabSchedule +from home.models import CustomPeriodicTask from home.src.es.connect import ElasticWrap from home.src.es.index_setup import ElasitIndexWrap from home.src.es.snapshot import ElasticSnapshot from home.src.ta.config import AppConfig, ReleaseVersion +from home.src.ta.config_schedule import ScheduleBuilder from home.src.ta.helper import clear_dl_cache +from home.src.ta.notify import Notifications from home.src.ta.settings import EnvironmentSettings from home.src.ta.ta_redis import RedisArchivist +from home.src.ta.task_config import TASK_CONFIG from home.src.ta.task_manager import TaskManager from home.src.ta.users import UserConfig @@ -45,7 +52,9 @@ class Command(BaseCommand): self._mig_index_setup() self._mig_snapshot_check() self._mig_move_users_to_es() + self._mig_schedule_store() self._mig_custom_playlist() + self._create_default_schedules() def _sync_redis_state(self): """make sure redis gets new config.json values""" @@ -245,8 +254,134 @@ class Command(BaseCommand): ) ) + def _mig_schedule_store(self): + """ + update from 0.4.4 to 0.4.5 + migrate schedule task store to CustomCronSchedule + """ + self.stdout.write("[MIGRATION] migrate schedule store") + config = AppConfig().config + current_schedules = config.get("scheduler") + if not current_schedules: + self.stdout.write( + self.style.SUCCESS(" no schedules to migrate") + ) + return + + self._mig_update_subscribed(current_schedules) + self._mig_download_pending(current_schedules) + self._mig_check_reindex(current_schedules) + self._mig_thumbnail_check(current_schedules) + self._mig_run_backup(current_schedules) + self._mig_version_check() + + del config["scheduler"] + RedisArchivist().set_message("config", config, save=True) + + def _mig_update_subscribed(self, current_schedules): + """create update_subscribed schedule""" + task_name = "update_subscribed" + update_subscribed_schedule = current_schedules.get(task_name) + if update_subscribed_schedule: + self._create_task(task_name, update_subscribed_schedule) + + self._create_notifications(task_name, current_schedules) + + def _mig_download_pending(self, current_schedules): + """create download_pending schedule""" + task_name = "download_pending" + download_pending_schedule = current_schedules.get(task_name) + if download_pending_schedule: + self._create_task(task_name, download_pending_schedule) + + self._create_notifications(task_name, current_schedules) + + def _mig_check_reindex(self, current_schedules): + """create check_reindex schedule""" + task_name = "check_reindex" + check_reindex_schedule = current_schedules.get(task_name) + if check_reindex_schedule: + task_config = {} + days = current_schedules.get("check_reindex_days") + if days: + task_config.update({"days": days}) + + self._create_task( + task_name, + check_reindex_schedule, + task_config=task_config, + ) + + self._create_notifications(task_name, current_schedules) + + def _mig_thumbnail_check(self, current_schedules): + """create thumbnail_check schedule""" + thumbnail_check_schedule = current_schedules.get("thumbnail_check") + if thumbnail_check_schedule: + self._create_task("thumbnail_check", thumbnail_check_schedule) + + def _mig_run_backup(self, current_schedules): + """create run_backup schedule""" + run_backup_schedule = current_schedules.get("run_backup") + if run_backup_schedule: + task_config = False + rotate = current_schedules.get("run_backup_rotate") + if rotate: + task_config = {"rotate": rotate} + + self._create_task( + "run_backup", run_backup_schedule, task_config=task_config + ) + + def _mig_version_check(self): + """create version_check schedule""" + version_check_schedule = { + "minute": randint(0, 59), + "hour": randint(0, 23), + "day_of_week": "*", + } + self._create_task("version_check", version_check_schedule) + + def _create_task(self, task_name, schedule, task_config=False): + """create task""" + description = TASK_CONFIG[task_name].get("title") + schedule, _ = CrontabSchedule.objects.get_or_create(**schedule) + schedule.timezone = settings.TIME_ZONE + schedule.save() + + task, _ = CustomPeriodicTask.objects.get_or_create( + crontab=schedule, + name=task_name, + description=description, + task=task_name, + ) + if task_config: + task.task_config = task_config + task.save() + + self.stdout.write( + self.style.SUCCESS(f" ✓ new task created: '{task}'") + ) + + def _create_notifications(self, task_name, current_schedules): + """migrate notifications of task""" + notifications = current_schedules.get(f"{task_name}_notify") + if not notifications: + return + + urls = [i.strip() for i in notifications.split()] + if not urls: + return + + self.stdout.write( + self.style.SUCCESS(f" ✓ migrate notifications: '{urls}'") + ) + handler = Notifications(task_name) + for url in urls: + handler.add_url(url) + def _mig_custom_playlist(self): - """migration for custom playlist""" + """add playlist_type for migration t0 v0.4.7""" self.stdout.write("[MIGRATION] custom playlist") data = { "query": { @@ -277,3 +412,54 @@ class Command(BaseCommand): self.stdout.write(response) sleep(60) raise CommandError(message) + + def _create_default_schedules(self) -> None: + """ + create default schedules for new installations + needs to be called after _mig_schedule_store + """ + self.stdout.write("[7] create initial schedules") + init_has_run = CustomPeriodicTask.objects.filter( + name="version_check" + ).exists() + + if init_has_run: + self.stdout.write( + self.style.SUCCESS( + " schedule init already done, skipping..." + ) + ) + return + + builder = ScheduleBuilder() + check_reindex = builder.get_set_task( + "check_reindex", schedule=builder.SCHEDULES["check_reindex"] + ) + check_reindex.task_config.update({"days": 90}) + check_reindex.save() + self.stdout.write( + self.style.SUCCESS( + f" ✓ created new default schedule: {check_reindex}" + ) + ) + + thumbnail_check = builder.get_set_task( + "thumbnail_check", schedule=builder.SCHEDULES["thumbnail_check"] + ) + self.stdout.write( + self.style.SUCCESS( + f" ✓ created new default schedule: {thumbnail_check}" + ) + ) + daily_random = f"{randint(0, 59)} {randint(0, 23)} *" + version_check = builder.get_set_task( + "version_check", schedule=daily_random + ) + self.stdout.write( + self.style.SUCCESS( + f" ✓ created new default schedule: {version_check}" + ) + ) + self.stdout.write( + self.style.SUCCESS(" ✓ all default schedules created") + ) diff --git a/tubearchivist/config/settings.py b/tubearchivist/config/settings.py index 3be6177b..ac41eaa0 100644 --- a/tubearchivist/config/settings.py +++ b/tubearchivist/config/settings.py @@ -38,6 +38,7 @@ ALLOWED_HOSTS, CSRF_TRUSTED_ORIGINS = ta_host_parser(environ["TA_HOST"]) # Application definition INSTALLED_APPS = [ + "django_celery_beat", "home.apps.HomeConfig", "django.contrib.admin", "django.contrib.auth", diff --git a/tubearchivist/home/__init__.py b/tubearchivist/home/__init__.py index 736385d6..2e00ac76 100644 --- a/tubearchivist/home/__init__.py +++ b/tubearchivist/home/__init__.py @@ -1,5 +1,7 @@ -""" handle celery startup """ +"""start celery app""" -from .tasks import app as celery_app +from __future__ import absolute_import, unicode_literals + +from home.celery import app as celery_app __all__ = ("celery_app",) diff --git a/tubearchivist/home/admin.py b/tubearchivist/home/admin.py index 2bcb7011..3c6e83c5 100644 --- a/tubearchivist/home/admin.py +++ b/tubearchivist/home/admin.py @@ -2,6 +2,7 @@ from django.contrib import admin from django.contrib.auth.admin import UserAdmin as BaseUserAdmin +from django_celery_beat import models as BeatModels from .models import Account @@ -34,3 +35,12 @@ class HomeAdmin(BaseUserAdmin): admin.site.register(Account, HomeAdmin) +admin.site.unregister( + [ + BeatModels.ClockedSchedule, + BeatModels.CrontabSchedule, + BeatModels.IntervalSchedule, + BeatModels.PeriodicTask, + BeatModels.SolarSchedule, + ] +) diff --git a/tubearchivist/home/celery.py b/tubearchivist/home/celery.py new file mode 100644 index 00000000..9f65795b --- /dev/null +++ b/tubearchivist/home/celery.py @@ -0,0 +1,24 @@ +"""initiate celery""" + +import os + +from celery import Celery +from home.src.ta.config import AppConfig +from home.src.ta.settings import EnvironmentSettings + +CONFIG = AppConfig().config +REDIS_HOST = EnvironmentSettings.REDIS_HOST +REDIS_PORT = EnvironmentSettings.REDIS_PORT + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings") +app = Celery( + "tasks", + broker=f"redis://{REDIS_HOST}:{REDIS_PORT}", + backend=f"redis://{REDIS_HOST}:{REDIS_PORT}", + result_extended=True, +) +app.config_from_object( + "django.conf:settings", namespace=EnvironmentSettings.REDIS_NAME_SPACE +) +app.autodiscover_tasks() +app.conf.timezone = EnvironmentSettings.TZ diff --git a/tubearchivist/home/config.json b/tubearchivist/home/config.json index 1b655f47..184721b4 100644 --- a/tubearchivist/home/config.json +++ b/tubearchivist/home/config.json @@ -26,18 +26,5 @@ }, "application": { "enable_snapshot": true - }, - "scheduler": { - "update_subscribed": false, - "update_subscribed_notify": false, - "download_pending": false, - "download_pending_notify": false, - "check_reindex": {"minute": "0", "hour": "12", "day_of_week": "*"}, - "check_reindex_notify": false, - "check_reindex_days": 90, - "thumbnail_check": {"minute": "0", "hour": "17", "day_of_week": "*"}, - "run_backup": false, - "run_backup_rotate": 5, - "version_check": "rand-d" } } diff --git a/tubearchivist/home/migrations/0002_customperiodictask.py b/tubearchivist/home/migrations/0002_customperiodictask.py new file mode 100644 index 00000000..cc584980 --- /dev/null +++ b/tubearchivist/home/migrations/0002_customperiodictask.py @@ -0,0 +1,23 @@ +# Generated by Django 4.2.7 on 2023-12-05 13:47 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('django_celery_beat', '0018_improve_crontab_helptext'), + ('home', '0001_initial'), + ] + + operations = [ + migrations.CreateModel( + name='CustomPeriodicTask', + fields=[ + ('periodictask_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='django_celery_beat.periodictask')), + ('task_config', models.JSONField(default=dict)), + ], + bases=('django_celery_beat.periodictask',), + ), + ] diff --git a/tubearchivist/home/models.py b/tubearchivist/home/models.py index 2dacd0c7..3f0c376b 100644 --- a/tubearchivist/home/models.py +++ b/tubearchivist/home/models.py @@ -6,6 +6,7 @@ from django.contrib.auth.models import ( PermissionsMixin, ) from django.db import models +from django_celery_beat.models import PeriodicTask class AccountManager(BaseUserManager): @@ -52,3 +53,9 @@ class Account(AbstractBaseUser, PermissionsMixin): USERNAME_FIELD = "name" REQUIRED_FIELDS = ["password"] + + +class CustomPeriodicTask(PeriodicTask): + """add custom metadata to to task""" + + task_config = models.JSONField(default=dict) diff --git a/tubearchivist/home/src/es/backup.py b/tubearchivist/home/src/es/backup.py index 1c3778bb..3f134472 100644 --- a/tubearchivist/home/src/es/backup.py +++ b/tubearchivist/home/src/es/backup.py @@ -10,6 +10,7 @@ import os import zipfile from datetime import datetime +from home.models import CustomPeriodicTask from home.src.es.connect import ElasticWrap, IndexPaginate from home.src.ta.config import AppConfig from home.src.ta.helper import get_mapping, ignore_filelist @@ -197,7 +198,12 @@ class ElasticBackup: def rotate_backup(self): """delete old backups if needed""" - rotate = self.config["scheduler"]["run_backup_rotate"] + try: + task = CustomPeriodicTask.objects.get(name="run_backup") + except CustomPeriodicTask.DoesNotExist: + return + + rotate = task.task_config.get("rotate") if not rotate: return diff --git a/tubearchivist/home/src/frontend/forms.py b/tubearchivist/home/src/frontend/forms.py index f8fcb798..7f42797c 100644 --- a/tubearchivist/home/src/frontend/forms.py +++ b/tubearchivist/home/src/frontend/forms.py @@ -159,50 +159,6 @@ class ApplicationSettingsForm(forms.Form): ) -class SchedulerSettingsForm(forms.Form): - """handle scheduler settings""" - - HELP_TEXT = "Add Apprise notification URLs, one per line" - - update_subscribed = forms.CharField(required=False) - update_subscribed_notify = forms.CharField( - label=False, - widget=forms.Textarea( - attrs={ - "rows": 2, - "placeholder": HELP_TEXT, - } - ), - required=False, - ) - download_pending = forms.CharField(required=False) - download_pending_notify = forms.CharField( - label=False, - widget=forms.Textarea( - attrs={ - "rows": 2, - "placeholder": HELP_TEXT, - } - ), - required=False, - ) - check_reindex = forms.CharField(required=False) - check_reindex_notify = forms.CharField( - label=False, - widget=forms.Textarea( - attrs={ - "rows": 2, - "placeholder": HELP_TEXT, - } - ), - required=False, - ) - check_reindex_days = forms.IntegerField(required=False) - thumbnail_check = forms.CharField(required=False) - run_backup = forms.CharField(required=False) - run_backup_rotate = forms.IntegerField(required=False) - - class MultiSearchForm(forms.Form): """multi search form for /search/""" diff --git a/tubearchivist/home/src/frontend/forms_schedule.py b/tubearchivist/home/src/frontend/forms_schedule.py new file mode 100644 index 00000000..fc0e7eed --- /dev/null +++ b/tubearchivist/home/src/frontend/forms_schedule.py @@ -0,0 +1,95 @@ +""" +Functionality: +- handle schedule forms +- implement form validation +""" + +from celery.schedules import crontab +from django import forms + + +class CrontabValidator: + """validate crontab""" + + @staticmethod + def validate_fields(cron_fields): + """expect 3 cron fields""" + if not len(cron_fields) == 3: + raise forms.ValidationError("expected three cron schedule fields") + + @staticmethod + def validate_minute(minute_field): + """expect minute int""" + try: + minute_value = int(minute_field) + if not 0 <= minute_value <= 59: + raise forms.ValidationError( + "Invalid value for minutes. Must be between 0 and 59." + ) + except ValueError as err: + raise forms.ValidationError( + "Invalid value for minutes. Must be an integer." + ) from err + + @staticmethod + def validate_cron_tab(minute, hour, day_of_week): + """check if crontab can be created""" + try: + crontab(minute=minute, hour=hour, day_of_week=day_of_week) + except ValueError as err: + raise forms.ValidationError(f"invalid crontab: {err}") from err + + def validate(self, cron_expression): + """create crontab schedule""" + if cron_expression == "auto": + return + + cron_fields = cron_expression.split() + self.validate_fields(cron_fields) + + minute, hour, day_of_week = cron_fields + self.validate_minute(minute) + self.validate_cron_tab(minute, hour, day_of_week) + + +def validate_cron(cron_expression): + """callable for field""" + CrontabValidator().validate(cron_expression) + + +class SchedulerSettingsForm(forms.Form): + """handle scheduler settings""" + + update_subscribed = forms.CharField( + required=False, validators=[validate_cron] + ) + download_pending = forms.CharField( + required=False, validators=[validate_cron] + ) + check_reindex = forms.CharField(required=False, validators=[validate_cron]) + check_reindex_days = forms.IntegerField(required=False) + thumbnail_check = forms.CharField( + required=False, validators=[validate_cron] + ) + run_backup = forms.CharField(required=False, validators=[validate_cron]) + run_backup_rotate = forms.IntegerField(required=False) + + +class NotificationSettingsForm(forms.Form): + """add notification URL""" + + TASK_CHOICES = [ + ("", "-- select task --"), + ("update_subscribed", "Rescan your Subscriptions"), + ("download_pending", "Downloading"), + ("check_reindex", "Reindex Documents"), + ] + PLACEHOLDER = "Apprise notification URL" + + task = forms.ChoiceField( + widget=forms.Select, choices=TASK_CHOICES, required=False + ) + notification_url = forms.CharField( + required=False, + widget=forms.TextInput(attrs={"placeholder": PLACEHOLDER}), + ) diff --git a/tubearchivist/home/src/index/generic.py b/tubearchivist/home/src/index/generic.py index e18623ee..8a502bb5 100644 --- a/tubearchivist/home/src/index/generic.py +++ b/tubearchivist/home/src/index/generic.py @@ -17,7 +17,7 @@ class YouTubeItem: es_path = False index_name = "" yt_base = "" - yt_obs = { + yt_obs: dict[str, bool | str] = { "skip_download": True, "noplaylist": True, } diff --git a/tubearchivist/home/src/index/reindex.py b/tubearchivist/home/src/index/reindex.py index 10a5596d..ca2bf37a 100644 --- a/tubearchivist/home/src/index/reindex.py +++ b/tubearchivist/home/src/index/reindex.py @@ -9,6 +9,7 @@ import os from datetime import datetime from time import sleep +from home.models import CustomPeriodicTask from home.src.download.queue import PendingList from home.src.download.subscriptions import ChannelSubscription from home.src.download.thumbnails import ThumbManager @@ -67,9 +68,22 @@ class ReindexBase: class ReindexPopulate(ReindexBase): """add outdated and recent documents to reindex queue""" + INTERVAL_DEFAIULT = 90 + def __init__(self): super().__init__() - self.interval = self.config["scheduler"]["check_reindex_days"] + self.interval = self.INTERVAL_DEFAIULT + + def get_interval(self): + """get reindex days interval from task""" + try: + task = CustomPeriodicTask.objects.get(name="check_reindex") + except CustomPeriodicTask.DoesNotExist: + return + + task_config = task.task_config + if task_config.get("days"): + self.interval = task_config.get("days") def add_recent(self): """add recent videos to refresh""" diff --git a/tubearchivist/home/src/ta/config.py b/tubearchivist/home/src/ta/config.py index 39819322..195e6527 100644 --- a/tubearchivist/home/src/ta/config.py +++ b/tubearchivist/home/src/ta/config.py @@ -5,12 +5,10 @@ Functionality: """ import json -import re from random import randint from time import sleep import requests -from celery.schedules import crontab from django.conf import settings from home.src.ta.ta_redis import RedisArchivist @@ -74,15 +72,6 @@ class AppConfig: RedisArchivist().set_message("config", self.config, save=True) return updated - @staticmethod - def _build_rand_daily(): - """build random daily schedule per installation""" - return { - "minute": randint(0, 59), - "hour": randint(0, 23), - "day_of_week": "*", - } - def load_new_defaults(self): """check config.json for missing defaults""" default_config = self.get_config_file() @@ -91,7 +80,6 @@ class AppConfig: # check for customizations if not redis_config: config = self.get_config() - config["scheduler"]["version_check"] = self._build_rand_daily() RedisArchivist().set_message("config", config) return False @@ -106,13 +94,7 @@ class AppConfig: # missing nested values for sub_key, sub_value in value.items(): - if ( - sub_key not in redis_config[key].keys() - or sub_value == "rand-d" - ): - if sub_value == "rand-d": - sub_value = self._build_rand_daily() - + if sub_key not in redis_config[key].keys(): redis_config[key].update({sub_key: sub_value}) needs_update = True @@ -122,147 +104,6 @@ class AppConfig: return needs_update -class ScheduleBuilder: - """build schedule dicts for beat""" - - SCHEDULES = { - "update_subscribed": "0 8 *", - "download_pending": "0 16 *", - "check_reindex": "0 12 *", - "thumbnail_check": "0 17 *", - "run_backup": "0 18 0", - "version_check": "0 11 *", - } - CONFIG = ["check_reindex_days", "run_backup_rotate"] - NOTIFY = [ - "update_subscribed_notify", - "download_pending_notify", - "check_reindex_notify", - ] - MSG = "message:setting" - - def __init__(self): - self.config = AppConfig().config - - def update_schedule_conf(self, form_post): - """process form post""" - print("processing form, restart container for changes to take effect") - redis_config = self.config - for key, value in form_post.items(): - if key in self.SCHEDULES and value: - try: - to_write = self.value_builder(key, value) - except ValueError: - print(f"failed: {key} {value}") - mess_dict = { - "group": "setting:schedule", - "level": "error", - "title": "Scheduler update failed.", - "messages": ["Invalid schedule input"], - "id": "0000", - } - RedisArchivist().set_message( - self.MSG, mess_dict, expire=True - ) - return - - redis_config["scheduler"][key] = to_write - if key in self.CONFIG and value: - redis_config["scheduler"][key] = int(value) - if key in self.NOTIFY and value: - if value == "0": - to_write = False - else: - to_write = value - redis_config["scheduler"][key] = to_write - - RedisArchivist().set_message("config", redis_config, save=True) - mess_dict = { - "group": "setting:schedule", - "level": "info", - "title": "Scheduler changed.", - "messages": ["Restart container for changes to take effect"], - "id": "0000", - } - RedisArchivist().set_message(self.MSG, mess_dict, expire=True) - - def value_builder(self, key, value): - """validate single cron form entry and return cron dict""" - print(f"change schedule for {key} to {value}") - if value == "0": - # deactivate this schedule - return False - if re.search(r"[\d]{1,2}\/[\d]{1,2}", value): - # number/number cron format will fail in celery - print("number/number schedule formatting not supported") - raise ValueError - - keys = ["minute", "hour", "day_of_week"] - if value == "auto": - # set to sensible default - values = self.SCHEDULES[key].split() - else: - values = value.split() - - if len(keys) != len(values): - print(f"failed to parse {value} for {key}") - raise ValueError("invalid input") - - to_write = dict(zip(keys, values)) - self._validate_cron(to_write) - - return to_write - - @staticmethod - def _validate_cron(to_write): - """validate all fields, raise value error for impossible schedule""" - all_hours = list(re.split(r"\D+", to_write["hour"])) - for hour in all_hours: - if hour.isdigit() and int(hour) > 23: - print("hour can not be greater than 23") - raise ValueError("invalid input") - - all_days = list(re.split(r"\D+", to_write["day_of_week"])) - for day in all_days: - if day.isdigit() and int(day) > 6: - print("day can not be greater than 6") - raise ValueError("invalid input") - - if not to_write["minute"].isdigit(): - print("too frequent: only number in minutes are supported") - raise ValueError("invalid input") - - if int(to_write["minute"]) > 59: - print("minutes can not be greater than 59") - raise ValueError("invalid input") - - def build_schedule(self): - """build schedule dict as expected by app.conf.beat_schedule""" - AppConfig().load_new_defaults() - self.config = AppConfig().config - schedule_dict = {} - - for schedule_item in self.SCHEDULES: - item_conf = self.config["scheduler"][schedule_item] - if not item_conf: - continue - - schedule_dict.update( - { - f"schedule_{schedule_item}": { - "task": schedule_item, - "schedule": crontab( - minute=item_conf["minute"], - hour=item_conf["hour"], - day_of_week=item_conf["day_of_week"], - ), - } - } - ) - - return schedule_dict - - class ReleaseVersion: """compare local version with remote version""" diff --git a/tubearchivist/home/src/ta/config_schedule.py b/tubearchivist/home/src/ta/config_schedule.py new file mode 100644 index 00000000..24b81ca9 --- /dev/null +++ b/tubearchivist/home/src/ta/config_schedule.py @@ -0,0 +1,89 @@ +""" +Functionality: +- Handle scheduler config update +""" + +from django_celery_beat.models import CrontabSchedule +from home.models import CustomPeriodicTask +from home.src.ta.config import AppConfig +from home.src.ta.settings import EnvironmentSettings +from home.src.ta.task_config import TASK_CONFIG + + +class ScheduleBuilder: + """build schedule dicts for beat""" + + SCHEDULES = { + "update_subscribed": "0 8 *", + "download_pending": "0 16 *", + "check_reindex": "0 12 *", + "thumbnail_check": "0 17 *", + "run_backup": "0 18 0", + "version_check": "0 11 *", + } + CONFIG = { + "check_reindex_days": "check_reindex", + "run_backup_rotate": "run_backup", + "update_subscribed_notify": "update_subscribed", + "download_pending_notify": "download_pending", + "check_reindex_notify": "check_reindex", + } + MSG = "message:setting" + + def __init__(self): + self.config = AppConfig().config + + def update_schedule_conf(self, form_post): + """process form post, schedules need to be validated before""" + for key, value in form_post.items(): + if not value: + continue + + if key in self.SCHEDULES: + if value == "auto": + value = self.SCHEDULES.get(key) + + _ = self.get_set_task(key, value) + continue + + if key in self.CONFIG: + self.set_config(key, value) + + def get_set_task(self, task_name, schedule=False): + """get task""" + try: + task = CustomPeriodicTask.objects.get(name=task_name) + except CustomPeriodicTask.DoesNotExist: + description = TASK_CONFIG[task_name].get("title") + task = CustomPeriodicTask( + name=task_name, + task=task_name, + description=description, + ) + + if schedule: + task_crontab = self.get_set_cron_tab(schedule) + task.crontab = task_crontab + task.save() + + return task + + @staticmethod + def get_set_cron_tab(schedule): + """needs to be validated before""" + kwargs = dict(zip(["minute", "hour", "day_of_week"], schedule.split())) + kwargs.update({"timezone": EnvironmentSettings.TZ}) + crontab, _ = CrontabSchedule.objects.get_or_create(**kwargs) + + return crontab + + def set_config(self, key, value): + """set task_config""" + task_name = self.CONFIG.get(key) + if not task_name: + raise ValueError("invalid config key") + + task = CustomPeriodicTask.objects.get(name=task_name) + config_key = key.split(f"{task_name}_")[-1] + task.task_config.update({config_key: value}) + task.save() diff --git a/tubearchivist/home/src/ta/notify.py b/tubearchivist/home/src/ta/notify.py index 5b1c9c7a..63140775 100644 --- a/tubearchivist/home/src/ta/notify.py +++ b/tubearchivist/home/src/ta/notify.py @@ -1,55 +1,141 @@ """send notifications using apprise""" import apprise -from home.src.ta.config import AppConfig +from home.src.es.connect import ElasticWrap +from home.src.ta.task_config import TASK_CONFIG from home.src.ta.task_manager import TaskManager class Notifications: - """notification handler""" + """store notifications in ES""" - def __init__(self, name: str, task_id: str, task_title: str): - self.name: str = name - self.task_id: str = task_id - self.task_title: str = task_title + GET_PATH = "ta_config/_doc/notify" + UPDATE_PATH = "ta_config/_update/notify/" - def send(self) -> None: + def __init__(self, task_name: str): + self.task_name = task_name + + def send(self, task_id: str, task_title: str) -> None: """send notifications""" apobj = apprise.Apprise() - hooks: str | None = self.get_url() - if not hooks: + urls: list[str] = self.get_urls() + if not urls: return - hook_list: list[str] = self.parse_hooks(hooks=hooks) - title, body = self.build_message() + title, body = self._build_message(task_id, task_title) if not body: return - for hook in hook_list: - apobj.add(hook) + for url in urls: + apobj.add(url) apobj.notify(body=body, title=title) - def get_url(self) -> str | None: - """get apprise urls for task""" - config = AppConfig().config - hooks: str = config["scheduler"].get(f"{self.name}_notify") - - return hooks - - def parse_hooks(self, hooks: str) -> list[str]: - """create list of hooks""" - - hook_list: list[str] = [i.strip() for i in hooks.split()] - - return hook_list - - def build_message(self) -> tuple[str, str | None]: + def _build_message( + self, task_id: str, task_title: str + ) -> tuple[str, str | None]: """build message to send notification""" - task = TaskManager().get_task(self.task_id) + task = TaskManager().get_task(task_id) status = task.get("status") - title: str = f"[TA] {self.task_title} process ended with {status}" + title: str = f"[TA] {task_title} process ended with {status}" body: str | None = task.get("result") return title, body + + def get_urls(self) -> list[str]: + """get stored urls for task""" + response, code = ElasticWrap(self.GET_PATH).get(print_error=False) + if not code == 200: + return [] + + urls = response["_source"].get(self.task_name, []) + + return urls + + def add_url(self, url: str) -> None: + """add url to task notification""" + source = ( + "if (!ctx._source.containsKey(params.task_name)) " + + "{ctx._source[params.task_name] = [params.url]} " + + "else if (!ctx._source[params.task_name].contains(params.url)) " + + "{ctx._source[params.task_name].add(params.url)} " + + "else {ctx.op = 'none'}" + ) + + data = { + "script": { + "source": source, + "lang": "painless", + "params": {"url": url, "task_name": self.task_name}, + }, + "upsert": {self.task_name: [url]}, + } + + _, _ = ElasticWrap(self.UPDATE_PATH).post(data) + + def remove_url(self, url: str) -> tuple[dict, int]: + """remove url from task""" + source = ( + "if (ctx._source.containsKey(params.task_name) " + + "&& ctx._source[params.task_name].contains(params.url)) " + + "{ctx._source[params.task_name]." + + "remove(ctx._source[params.task_name].indexOf(params.url))}" + ) + + data = { + "script": { + "source": source, + "lang": "painless", + "params": {"url": url, "task_name": self.task_name}, + } + } + + response, status_code = ElasticWrap(self.UPDATE_PATH).post(data) + if not self.get_urls(): + _, _ = self.remove_task() + + return response, status_code + + def remove_task(self) -> tuple[dict, int]: + """remove all notifications from task""" + source = ( + "if (ctx._source.containsKey(params.task_name)) " + + "{ctx._source.remove(params.task_name)}" + ) + data = { + "script": { + "source": source, + "lang": "painless", + "params": {"task_name": self.task_name}, + } + } + + response, status_code = ElasticWrap(self.UPDATE_PATH).post(data) + + return response, status_code + + +def get_all_notifications() -> dict[str, list[str]]: + """get all notifications stored""" + path = "ta_config/_doc/notify" + response, status_code = ElasticWrap(path).get(print_error=False) + if not status_code == 200: + return {} + + notifications: dict = {} + source = response.get("_source") + if not source: + return notifications + + for task_id, urls in source.items(): + notifications.update( + { + task_id: { + "urls": urls, + "title": TASK_CONFIG[task_id]["title"], + } + } + ) + + return notifications diff --git a/tubearchivist/home/src/ta/task_config.py b/tubearchivist/home/src/ta/task_config.py new file mode 100644 index 00000000..5c3dc433 --- /dev/null +++ b/tubearchivist/home/src/ta/task_config.py @@ -0,0 +1,125 @@ +""" +Functionality: +- Static Task config values +- Type definitions +- separate to avoid circular imports +""" + +from typing import TypedDict + + +class TaskItemConfig(TypedDict): + """describes a task item config""" + + title: str + group: str + api_start: bool + api_stop: bool + + +UPDATE_SUBSCRIBED: TaskItemConfig = { + "title": "Rescan your Subscriptions", + "group": "download:scan", + "api_start": True, + "api_stop": True, +} + +DOWNLOAD_PENDING: TaskItemConfig = { + "title": "Downloading", + "group": "download:run", + "api_start": True, + "api_stop": True, +} + +EXTRACT_DOWNLOAD: TaskItemConfig = { + "title": "Add to download queue", + "group": "download:add", + "api_start": False, + "api_stop": True, +} + +CHECK_REINDEX: TaskItemConfig = { + "title": "Reindex Documents", + "group": "reindex:run", + "api_start": False, + "api_stop": False, +} + +MANUAL_IMPORT: TaskItemConfig = { + "title": "Manual video import", + "group": "setting:import", + "api_start": True, + "api_stop": False, +} + +RUN_BACKUP: TaskItemConfig = { + "title": "Index Backup", + "group": "setting:backup", + "api_start": True, + "api_stop": False, +} + +RESTORE_BACKUP: TaskItemConfig = { + "title": "Restore Backup", + "group": "setting:restore", + "api_start": False, + "api_stop": False, +} + +RESCAN_FILESYSTEM: TaskItemConfig = { + "title": "Rescan your Filesystem", + "group": "setting:filesystemscan", + "api_start": True, + "api_stop": False, +} + +THUMBNAIL_CHECK: TaskItemConfig = { + "title": "Check your Thumbnails", + "group": "setting:thumbnailcheck", + "api_start": True, + "api_stop": False, +} + +RESYNC_THUMBS: TaskItemConfig = { + "title": "Sync Thumbnails to Media Files", + "group": "setting:thumbnailsync", + "api_start": True, + "api_stop": False, +} + +INDEX_PLAYLISTS: TaskItemConfig = { + "title": "Index Channel Playlist", + "group": "channel:indexplaylist", + "api_start": False, + "api_stop": False, +} + +SUBSCRIBE_TO: TaskItemConfig = { + "title": "Add Subscription", + "group": "subscription:add", + "api_start": False, + "api_stop": False, +} + +VERSION_CHECK: TaskItemConfig = { + "title": "Look for new Version", + "group": "", + "api_start": False, + "api_stop": False, +} + +TASK_CONFIG: dict[str, TaskItemConfig] = { + "update_subscribed": UPDATE_SUBSCRIBED, + "download_pending": DOWNLOAD_PENDING, + "extract_download": EXTRACT_DOWNLOAD, + "check_reindex": CHECK_REINDEX, + "manual_import": MANUAL_IMPORT, + "run_backup": RUN_BACKUP, + "restore_backup": RESTORE_BACKUP, + "rescan_filesystem": RESCAN_FILESYSTEM, + "thumbnail_check": THUMBNAIL_CHECK, + "resync_thumbs": RESYNC_THUMBS, + "index_playlists": INDEX_PLAYLISTS, + "subscribe_to": SUBSCRIBE_TO, + "version_check": VERSION_CHECK, +} diff --git a/tubearchivist/home/src/ta/task_manager.py b/tubearchivist/home/src/ta/task_manager.py index 3771fdd6..0813ccb5 100644 --- a/tubearchivist/home/src/ta/task_manager.py +++ b/tubearchivist/home/src/ta/task_manager.py @@ -4,8 +4,9 @@ functionality: - handle threads and locks """ -from home import tasks as ta_tasks +from home.celery import app as celery_app from home.src.ta.ta_redis import RedisArchivist, TaskRedis +from home.src.ta.task_config import TASK_CONFIG class TaskManager: @@ -86,7 +87,7 @@ class TaskCommand: def start(self, task_name): """start task by task_name, only pass task that don't take args""" - task = ta_tasks.app.tasks.get(task_name).delay() + task = celery_app.tasks.get(task_name).delay() message = { "task_id": task.id, "status": task.status, @@ -104,7 +105,7 @@ class TaskCommand: handler = TaskRedis() task = handler.get_single(task_id) - if not task["name"] in ta_tasks.BaseTask.TASK_CONFIG: + if not task["name"] in TASK_CONFIG: raise ValueError handler.set_command(task_id, "STOP") @@ -113,4 +114,4 @@ class TaskCommand: def kill(self, task_id): """send kill signal to task_id""" print(f"[task][{task_id}]: received KILL signal.") - ta_tasks.app.control.revoke(task_id, terminate=True) + celery_app.control.revoke(task_id, terminate=True) diff --git a/tubearchivist/home/tasks.py b/tubearchivist/home/tasks.py index e0ab8e7c..31c73789 100644 --- a/tubearchivist/home/tasks.py +++ b/tubearchivist/home/tasks.py @@ -1,14 +1,12 @@ """ Functionality: -- initiate celery app - collect tasks -- user config changes won't get applied here - because tasks are initiated at application start +- handle task callbacks +- handle task notifications +- handle task locking """ -import os - -from celery import Celery, Task, shared_task +from celery import Task, shared_task from home.src.download.queue import PendingList from home.src.download.subscriptions import ( SubscriptionHandler, @@ -22,97 +20,19 @@ from home.src.index.channel import YoutubeChannel from home.src.index.filesystem import Scanner from home.src.index.manual import ImportFolderScanner from home.src.index.reindex import Reindex, ReindexManual, ReindexPopulate -from home.src.ta.config import AppConfig, ReleaseVersion, ScheduleBuilder +from home.src.ta.config import ReleaseVersion from home.src.ta.notify import Notifications -from home.src.ta.settings import EnvironmentSettings from home.src.ta.ta_redis import RedisArchivist +from home.src.ta.task_config import TASK_CONFIG from home.src.ta.task_manager import TaskManager from home.src.ta.urlparser import Parser -CONFIG = AppConfig().config -REDIS_HOST = EnvironmentSettings.REDIS_HOST -REDIS_PORT = EnvironmentSettings.REDIS_PORT - -os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings") -app = Celery( - "tasks", - broker=f"redis://{REDIS_HOST}:{REDIS_PORT}", - backend=f"redis://{REDIS_HOST}:{REDIS_PORT}", - result_extended=True, -) -app.config_from_object( - "django.conf:settings", namespace=EnvironmentSettings.REDIS_NAME_SPACE -) -app.autodiscover_tasks() -app.conf.timezone = EnvironmentSettings.TZ - class BaseTask(Task): """base class to inherit each class from""" # pylint: disable=abstract-method - TASK_CONFIG = { - "update_subscribed": { - "title": "Rescan your Subscriptions", - "group": "download:scan", - "api-start": True, - "api-stop": True, - }, - "download_pending": { - "title": "Downloading", - "group": "download:run", - "api-start": True, - "api-stop": True, - }, - "extract_download": { - "title": "Add to download queue", - "group": "download:add", - "api-stop": True, - }, - "check_reindex": { - "title": "Reindex Documents", - "group": "reindex:run", - }, - "manual_import": { - "title": "Manual video import", - "group": "setting:import", - "api-start": True, - }, - "run_backup": { - "title": "Index Backup", - "group": "setting:backup", - "api-start": True, - }, - "restore_backup": { - "title": "Restore Backup", - "group": "setting:restore", - }, - "rescan_filesystem": { - "title": "Rescan your Filesystem", - "group": "setting:filesystemscan", - "api-start": True, - }, - "thumbnail_check": { - "title": "Check your Thumbnails", - "group": "setting:thumbnailcheck", - "api-start": True, - }, - "resync_thumbs": { - "title": "Sync Thumbnails to Media Files", - "group": "setting:thumbnailsync", - "api-start": True, - }, - "index_playlists": { - "title": "Index Channel Playlist", - "group": "channel:indexplaylist", - }, - "subscribe_to": { - "title": "Add Subscription", - "group": "subscription:add", - }, - } - def on_failure(self, exc, task_id, args, kwargs, einfo): """callback for task failure""" print(f"{task_id} Failed callback") @@ -137,8 +57,8 @@ class BaseTask(Task): def after_return(self, status, retval, task_id, args, kwargs, einfo): """callback after task returns""" print(f"{task_id} return callback") - task_title = self.TASK_CONFIG.get(self.name).get("title") - Notifications(self.name, task_id, task_title).send() + task_title = TASK_CONFIG.get(self.name).get("title") + Notifications(self.name).send(task_id, task_title) def send_progress(self, message_lines, progress=False, title=False): """send progress message""" @@ -157,7 +77,7 @@ class BaseTask(Task): def _build_message(self, level="info"): """build message dict""" task_id = self.request.id - message = self.TASK_CONFIG.get(self.name).copy() + message = TASK_CONFIG.get(self.name).copy() message.update({"level": level, "id": task_id}) task_result = TaskManager().get_task(task_id) if task_result: @@ -251,6 +171,7 @@ def check_reindex(self, data=False, extract_videos=False): populate = ReindexPopulate() print(f"[task][{self.name}] reindex outdated documents") self.send_progress("Add recent documents to the reindex Queue.") + populate.get_interval() populate.add_recent() self.send_progress("Add outdated documents to the reindex Queue.") populate.add_outdated() @@ -367,7 +288,3 @@ def index_channel_playlists(self, channel_id): def version_check(): """check for new updates""" ReleaseVersion().check() - - -# start schedule here -app.conf.beat_schedule = ScheduleBuilder().build_schedule() diff --git a/tubearchivist/home/templates/home/settings_scheduling.html b/tubearchivist/home/templates/home/settings_scheduling.html index 1b011c1c..b01f4644 100644 --- a/tubearchivist/home/templates/home/settings_scheduling.html +++ b/tubearchivist/home/templates/home/settings_scheduling.html @@ -10,11 +10,9 @@
  • 0 15 *: Run task every day at 15:00 in the afternoon.
  • 30 8 */2: Run task every second day of the week (Sun, Tue, Thu, Sat) at 08:30 in the morning.
  • auto: Sensible default.
  • -
  • 0: (zero), deactivate that task.
  • Note:

    @@ -24,68 +22,47 @@

    Rescan Subscriptions

    +

    Become a sponsor and join members.tubearchivist.com to get access to real time notifications for new videos uploaded by your favorite channels.

    Current rescan schedule: - {% if config.scheduler.update_subscribed %} - {% for key, value in config.scheduler.update_subscribed.items %} - {{ value }} - {% endfor %} + {% if update_subscribed %} + {{ update_subscribed.crontab.minute }} {{ update_subscribed.crontab.hour }} {{ update_subscribed.crontab.day_of_week }} + {% else %} False {% endif %}

    -

    Become a sponsor and join members.tubearchivist.com to get access to real time notifications for new videos uploaded by your favorite channels.

    Periodically rescan your subscriptions:

    + {% for error in scheduler_form.update_subscribed.errors %} +

    {{ error }}

    + {% endfor %} {{ scheduler_form.update_subscribed }}
    -
    -

    Send notification on task completed:

    - {% if config.scheduler.update_subscribed_notify %} -

    stored notification links

    -
    -

    {{ config.scheduler.update_subscribed_notify|linebreaks }}

    -
    - {% else %} -

    Current notification urls: {{ config.scheduler.update_subscribed_notify }}

    - {% endif %} - {{ scheduler_form.update_subscribed_notify }} -

    Start Download

    Current Download schedule: - {% if config.scheduler.download_pending %} - {% for key, value in config.scheduler.download_pending.items %} - {{ value }} - {% endfor %} + {% if download_pending %} + {{ download_pending.crontab.minute }} {{ download_pending.crontab.hour }} {{ download_pending.crontab.day_of_week }} + {% else %} False {% endif %}

    Automatic video download schedule:

    + {% for error in scheduler_form.download_pending.errors %} +

    {{ error }}

    + {% endfor %} {{ scheduler_form.download_pending }}
    -
    -

    Send notification on task completed:

    - {% if config.scheduler.download_pending_notify %} -

    stored notification links

    -
    -

    {{ config.scheduler.download_pending_notify|linebreaks }}

    -
    - {% else %} -

    Current notification urls: {{ config.scheduler.download_pending_notify }}

    - {% endif %} - {{ scheduler_form.download_pending_notify }} -

    Refresh Metadata

    Current Metadata refresh schedule: - {% if config.scheduler.check_reindex %} - {% for key, value in config.scheduler.check_reindex.items %} - {{ value }} - {% endfor %} + {% if check_reindex %} + {{ check_reindex.crontab.minute }} {{ check_reindex.crontab.hour }} {{ check_reindex.crontab.day_of_week }} + {% else %} False {% endif %} @@ -94,36 +71,29 @@ {{ scheduler_form.check_reindex }}

    -

    Current refresh for metadata older than x days: {{ config.scheduler.check_reindex_days }}

    +

    Current refresh for metadata older than x days: {{ check_reindex.task_config.days }}

    Refresh older than x days, recommended 90:

    + {% for error in scheduler_form.check_reindex.errors %} +

    {{ error }}

    + {% endfor %} {{ scheduler_form.check_reindex_days }}
    -
    -

    Send notification on task completed:

    - {% if config.scheduler.check_reindex_notify %} -

    stored notification links

    -
    -

    {{ config.scheduler.check_reindex_notify|linebreaks }}

    -
    - {% else %} -

    Current notification urls: {{ config.scheduler.check_reindex_notify }}

    - {% endif %} - {{ scheduler_form.check_reindex_notify }} -

    Thumbnail Check

    Current thumbnail check schedule: - {% if config.scheduler.thumbnail_check %} - {% for key, value in config.scheduler.thumbnail_check.items %} - {{ value }} - {% endfor %} + {% if thumbnail_check %} + {{ thumbnail_check.crontab.minute }} {{ thumbnail_check.crontab.hour }} {{ thumbnail_check.crontab.day_of_week }} + {% else %} False {% endif %}

    Periodically check and cleanup thumbnails:

    + {% for error in scheduler_form.thumbnail_check.errors %} +

    {{ error }}

    + {% endfor %} {{ scheduler_form.thumbnail_check }}
    @@ -132,23 +102,51 @@

    Zip file backups are very slow for large archives and consistency is not guaranteed, use snapshots instead. Make sure no other tasks are running when creating a Zip file backup.

    Current index backup schedule: - {% if config.scheduler.run_backup %} - {% for key, value in config.scheduler.run_backup.items %} - {{ value }} - {% endfor %} + {% if run_backup %} + {{ run_backup.crontab.minute }} {{ run_backup.crontab.hour }} {{ run_backup.crontab.day_of_week }} + {% else %} False {% endif %}

    Automatically backup metadata to a zip file:

    + {% for error in scheduler_form.run_backup.errors %} +

    {{ error }}

    + {% endfor %} {{ scheduler_form.run_backup }}
    -

    Current backup files to keep: {{ config.scheduler.run_backup_rotate }}

    +

    Current backup files to keep: {{ run_backup.task_config.rotate }}

    Max auto backups to keep:

    {{ scheduler_form.run_backup_rotate }}
    +
    +

    Add Notification URL

    +
    + {% if notifications %} +

    stored notification links

    +
    + {% for task, notifications in notifications.items %} +

    {{ notifications.title }}

    + {% for url in notifications.urls %} +

    + + {{ url }} +

    + {% endfor %} + {% endfor %} +
    + {% else %} +

    No notifications stored

    + {% endif %} +
    +
    +

    Send notification on completed tasks with the help of the Apprise library.

    + {{ notification_form.task }} + {{ notification_form.notification_url }} +
    +
    {% endblock settings_content %} diff --git a/tubearchivist/home/views.py b/tubearchivist/home/views.py index 69c16adb..e8903d75 100644 --- a/tubearchivist/home/views.py +++ b/tubearchivist/home/views.py @@ -19,6 +19,7 @@ from django.http import Http404 from django.shortcuts import redirect, render from django.utils.decorators import method_decorator from django.views import View +from home.models import CustomPeriodicTask from home.src.download.queue import PendingInteract from home.src.download.yt_dlp_base import CookieHandler from home.src.es.backup import ElasticBackup @@ -31,18 +32,23 @@ from home.src.frontend.forms import ( CreatePlaylistForm, CustomAuthForm, MultiSearchForm, - SchedulerSettingsForm, SubscribeToChannelForm, SubscribeToPlaylistForm, UserSettingsForm, ) +from home.src.frontend.forms_schedule import ( + NotificationSettingsForm, + SchedulerSettingsForm, +) from home.src.index.channel import channel_overwrites from home.src.index.generic import Pagination from home.src.index.playlist import YoutubePlaylist from home.src.index.reindex import ReindexProgress from home.src.index.video_constants import VideoTypeEnum -from home.src.ta.config import AppConfig, ReleaseVersion, ScheduleBuilder +from home.src.ta.config import AppConfig, ReleaseVersion +from home.src.ta.config_schedule import ScheduleBuilder from home.src.ta.helper import check_stylesheet, time_parser +from home.src.ta.notify import Notifications, get_all_notifications from home.src.ta.settings import EnvironmentSettings from home.src.ta.ta_redis import RedisArchivist from home.src.ta.users import UserConfig @@ -1107,29 +1113,65 @@ class SettingsSchedulingView(MinView): def get(self, request): """read and display current settings""" - context = self.get_min_context(request) - context.update( - { - "title": "Scheduling Settings", - "config": AppConfig().config, - "scheduler_form": SchedulerSettingsForm(), - } - ) + context = self.get_context(request, SchedulerSettingsForm()) return render(request, "home/settings_scheduling.html", context) def post(self, request): """handle form post to update settings""" scheduler_form = SchedulerSettingsForm(request.POST) + notification_form = NotificationSettingsForm(request.POST) + + if notification_form.is_valid(): + notification_form_post = notification_form.cleaned_data + print(notification_form_post) + if any(notification_form_post.values()): + task_name = notification_form_post.get("task") + url = notification_form_post.get("notification_url") + Notifications(task_name).add_url(url) + if scheduler_form.is_valid(): scheduler_form_post = scheduler_form.cleaned_data if any(scheduler_form_post.values()): print(scheduler_form_post) ScheduleBuilder().update_schedule_conf(scheduler_form_post) + else: + self.fail_message() + context = self.get_context(request, scheduler_form) + return render(request, "home/settings_scheduling.html", context) sleep(1) return redirect("settings_scheduling", permanent=True) + def get_context(self, request, scheduler_form): + """get context""" + context = self.get_min_context(request) + all_tasks = CustomPeriodicTask.objects.all() + context.update( + { + "title": "Scheduling Settings", + "scheduler_form": scheduler_form, + "notification_form": NotificationSettingsForm(), + "notifications": get_all_notifications(), + } + ) + for task in all_tasks: + context.update({task.name: task}) + + return context + + @staticmethod + def fail_message(): + """send failure message""" + mess_dict = { + "group": "setting:schedule", + "level": "error", + "title": "Scheduler update failed.", + "messages": ["Invalid schedule input"], + "id": "0000", + } + RedisArchivist().set_message("message:setting", mess_dict, expire=True) + @method_decorator(user_passes_test(check_admin), name="dispatch") class SettingsActionsView(MinView): diff --git a/tubearchivist/requirements.txt b/tubearchivist/requirements.txt index 23d9b466..fdb20a30 100644 --- a/tubearchivist/requirements.txt +++ b/tubearchivist/requirements.txt @@ -1,13 +1,14 @@ -apprise==1.7.5 -celery==5.3.6 +apprise==1.7.6 +celery==5.4.0 Django==5.0.4 django-auth-ldap==4.8.0 +django-celery-beat==2.6.0 django-cors-headers==4.3.1 djangorestframework==3.15.1 Pillow==10.3.0 -redis==5.0.0 +redis==5.0.3 requests==2.31.0 ryd-client==0.0.6 -uWSGI==2.0.24 +uWSGI==2.0.25.1 whitenoise==6.6.0 yt-dlp @ git+https://github.com/bbilly1/yt-dlp@4935eec0b4f4dffbd86d998a2d3a706875e9d761 diff --git a/tubearchivist/static/progress.js b/tubearchivist/static/progress.js index 3127214e..25de8c76 100644 --- a/tubearchivist/static/progress.js +++ b/tubearchivist/static/progress.js @@ -12,7 +12,7 @@ checkMessages(); // start to look for messages function checkMessages() { let notifications = document.getElementById('notifications'); - if (notifications && notifications.childNodes.length === 0 ) { + if (notifications && notifications.childNodes.length === 0) { let dataOrigin = notifications.getAttribute('data'); getMessages(dataOrigin); } @@ -55,7 +55,7 @@ function buildMessage(responseData, dataOrigin) { } clearNotifications(responseData); if (currentNotifications > 0 && messages.length === 0) { - location.reload(); + location.replace(location.href); } return messages; } @@ -79,7 +79,7 @@ function updateMessageBox(messageData) { children[1].innerHTML = messageData.messages.join('
    '); if ( !messageBox.querySelector('#stop-icon') && - messageData['api-stop'] && + messageData['api_stop'] && messageData.command !== 'STOP' ) { children[2].appendChild(buildStopIcon(messageData.id)); diff --git a/tubearchivist/static/script.js b/tubearchivist/static/script.js index 8bf726c2..31751158 100644 --- a/tubearchivist/static/script.js +++ b/tubearchivist/static/script.js @@ -203,18 +203,28 @@ function showAddToPlaylistMenu(input1) { dataId = input1.getAttribute('data-id'); buttonId = input1.getAttribute('id'); playlists = getCustomPlaylists(); - + //hide the invoking button - input1.style.visibility = "hidden"; - + input1.style.visibility = 'hidden'; + //show the form - form_code = '

    Add video to...

    '; - - for(let i = 0; i < playlists.length; i++) { - let obj = playlists[i]; - form_code += '

    '+obj.playlist_name+'

    '; + form_code = + '

    Add video to...

    '; + + for (let i = 0; i < playlists.length; i++) { + let obj = playlists[i]; + form_code += + '

    ' + + obj.playlist_name + + '

    '; } - + form_code += '

    Create playlist

    '; input1.parentNode.parentNode.innerHTML += form_code; } @@ -222,18 +232,17 @@ function showAddToPlaylistMenu(input1) { //handles user action of adding a video to a custom playlist function addToCustomPlaylist(input, video_id, playlist_id) { let apiEndpoint = '/api/playlist/' + playlist_id + '/'; - let data = { "action": "create", "video_id": video_id }; + let data = { action: 'create', video_id: video_id }; apiRequest(apiEndpoint, 'POST', data); - + //mark the item added in the ui - input.firstChild.src='/static/img/icon-seen.svg'; + input.firstChild.src = '/static/img/icon-seen.svg'; } function removeDotMenu(input1, button_id) { - //show the menu button - document.getElementById(button_id).style.visibility = "visible"; - + document.getElementById(button_id).style.visibility = 'visible'; + //remove the form input1.parentNode.remove(); } @@ -243,20 +252,67 @@ function showCustomPlaylistMenu(input1, playlist_id, current_page, last_page) { let dataId, form_code, buttonId; dataId = input1.getAttribute('data-id'); buttonId = input1.getAttribute('id'); - + //hide the invoking button - input1.style.visibility = "hidden"; - + input1.style.visibility = 'hidden'; + //show the form - form_code = '

    Move Video

    '; - - form_code += ''; - form_code += ''; - form_code += ''; - form_code += ''; - form_code += ''; - - + form_code = + '

    Move Video

    '; + + form_code += + ''; + form_code += + ''; + form_code += + ''; + form_code += + ''; + form_code += + ''; + form_code += '
    '; input1.parentNode.parentNode.innerHTML += form_code; } @@ -266,65 +322,46 @@ function moveCustomPlaylistVideo(input1, playlist_id, current_page, last_page) { let dataId, dataContext; dataId = input1.getAttribute('data-id'); dataContext = input1.getAttribute('data-context'); - + let apiEndpoint = '/api/playlist/' + playlist_id + '/'; - let data = { "action": dataContext, "video_id": dataId }; + let data = { action: dataContext, video_id: dataId }; apiRequest(apiEndpoint, 'POST', data); - + let itemDom = input1.parentElement.parentElement.parentElement; let listDom = itemDom.parentElement; - - if (dataContext === "up") - { - let sibling = itemDom.previousElementSibling; - if (sibling !== null) - { - sibling.before(itemDom); - } - else if (current_page > 1) - { - itemDom.remove(); - } - } - else if (dataContext === "down") - { - let sibling = itemDom.nextElementSibling; - if (sibling !== null) - { - sibling.after(itemDom); - } - else if (current_page !== last_page) - { - itemDom.remove(); - } - } - else if (dataContext === "top") - { - let sibling = listDom.firstElementChild; - if (sibling !== null) - { - sibling.before(itemDom); - } - if (current_page > 1) - { - itemDom.remove(); - } - } - else if (dataContext === "bottom") - { - let sibling = listDom.lastElementChild; - if (sibling !== null) - { - sibling.after(itemDom); - } - if (current_page !== last_page) - { - itemDom.remove(); - } - } - else if (dataContext === "remove") - { - itemDom.remove(); + + if (dataContext === 'up') { + let sibling = itemDom.previousElementSibling; + if (sibling !== null) { + sibling.before(itemDom); + } else if (current_page > 1) { + itemDom.remove(); + } + } else if (dataContext === 'down') { + let sibling = itemDom.nextElementSibling; + if (sibling !== null) { + sibling.after(itemDom); + } else if (current_page !== last_page) { + itemDom.remove(); + } + } else if (dataContext === 'top') { + let sibling = listDom.firstElementChild; + if (sibling !== null) { + sibling.before(itemDom); + } + if (current_page > 1) { + itemDom.remove(); + } + } else if (dataContext === 'bottom') { + let sibling = listDom.lastElementChild; + if (sibling !== null) { + sibling.after(itemDom); + } + if (current_page !== last_page) { + itemDom.remove(); + } + } else if (dataContext === 'remove') { + itemDom.remove(); } } @@ -494,6 +531,28 @@ function createSnapshot() { document.getElementById('createButton').replaceWith(message); } +function deleteNotificationUrl(button) { + console.log('delete notification url'); + let apiEndpoint = '/api/schedule/notification/'; + let data = { + task_name: button.dataset.task, + url: button.dataset.url, + }; + apiRequest(apiEndpoint, 'DELETE', data); + button.parentElement.remove(); +} + +function deleteSchedule(button) { + console.log('delete schedule'); + let apiEndpoint = '/api/schedule/'; + let data = { task_name: button.dataset.schedule }; + apiRequest(apiEndpoint, 'DELETE', data); + let message = document.createElement('span'); + message.innerText = 'False'; + message.classList.add('settings-current'); + button.parentElement.replaceWith(message); +} + // delete from file system function deleteConfirm() { let to_show = document.getElementById('delete-button'); @@ -1522,7 +1581,6 @@ document.addEventListener('readystatechange', textExpandButtonVisibilityUpdate); window.addEventListener('resize', textExpandButtonVisibilityUpdate); function showForm(id) { - let id2 = id === undefined ? 'hidden-form' : id; let formElement = document.getElementById(id2); let displayStyle = formElement.style.display;