Feature beat model (#713)

* add django-celery-beat

* implement schedule migration

* fix version_check migration

* remove old schedule init

* better schedule migration

* fix task_config migration

* show task config on settings page

* fix notify url builder

* refactor celery initiation

* fix get task

* fix scheduler mig

* fix linter

* better task_config store on periodic task

* save new schedules

* fix task_config extraction from custom model

* implement auto schedule

* implement schedule delete

* refactor notifications to ES config storage

* downgrade redis

* better notification migration to ES

* add notification url handling

* fix worker start

* fix docs spelling

* don't resend form data on notification refresh

* fix type hints

* move TASK_CONFIG to separate module

* fix partial task config imports

* fix yt_obs typing

* delete schedule

* remove outdated instructions

* create initial schedules

* fix reindex days config key

* fix doc string

* unregister BeatModels
This commit is contained in:
Simon 2024-05-09 20:22:36 +07:00 committed by GitHub
parent 011073617d
commit 9366b8eab9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 1051 additions and 520 deletions

View File

@ -17,7 +17,7 @@ python manage.py ta_startup
# start all tasks
nginx &
celery -A home.tasks worker --loglevel=INFO --max-tasks-per-child 10 &
celery -A home.celery worker --loglevel=INFO --max-tasks-per-child 10 &
celery -A home beat --loglevel=INFO \
-s "${BEAT_SCHEDULE_PATH:-${cachedir}/celerybeat-schedule}" &
--scheduler django_celery_beat.schedulers:DatabaseScheduler &
uwsgi --ini uwsgi.ini

View File

@ -121,6 +121,16 @@ urlpatterns = [
views.TaskIDView.as_view(),
name="api-task-id",
),
path(
"schedule/",
views.ScheduleView.as_view(),
name="api-schedule",
),
path(
"schedule/notification/",
views.ScheduleNotification.as_view(),
name="api-schedule-notification",
),
path(
"config/user/",
views.UserConfigView.as_view(),

View File

@ -10,6 +10,7 @@ from api.src.aggs import (
WatchProgress,
)
from api.src.search_processor import SearchProcess
from home.models import CustomPeriodicTask
from home.src.download.queue import PendingInteract
from home.src.download.subscriptions import (
ChannelSubscription,
@ -27,13 +28,14 @@ from home.src.index.playlist import YoutubePlaylist
from home.src.index.reindex import ReindexProgress
from home.src.index.video import SponsorBlock, YoutubeVideo
from home.src.ta.config import AppConfig, ReleaseVersion
from home.src.ta.notify import Notifications, get_all_notifications
from home.src.ta.settings import EnvironmentSettings
from home.src.ta.ta_redis import RedisArchivist
from home.src.ta.task_config import TASK_CONFIG
from home.src.ta.task_manager import TaskCommand, TaskManager
from home.src.ta.urlparser import Parser
from home.src.ta.users import UserConfig
from home.tasks import (
BaseTask,
check_reindex,
download_pending,
extrac_dl,
@ -911,7 +913,7 @@ class TaskNameListView(ApiBaseView):
def get(self, request, task_name):
"""handle get request"""
# pylint: disable=unused-argument
if task_name not in BaseTask.TASK_CONFIG:
if task_name not in TASK_CONFIG:
message = {"message": "invalid task name"}
return Response(message, status=404)
@ -926,12 +928,12 @@ class TaskNameListView(ApiBaseView):
400 if task can't be started here without argument
"""
# pylint: disable=unused-argument
task_config = BaseTask.TASK_CONFIG.get(task_name)
task_config = TASK_CONFIG.get(task_name)
if not task_config:
message = {"message": "invalid task name"}
return Response(message, status=404)
if not task_config.get("api-start"):
if not task_config.get("api_start"):
message = {"message": "can not start task through this endpoint"}
return Response(message, status=400)
@ -970,16 +972,16 @@ class TaskIDView(ApiBaseView):
message = {"message": "task id not found"}
return Response(message, status=404)
task_conf = BaseTask.TASK_CONFIG.get(task_result.get("name"))
task_conf = TASK_CONFIG.get(task_result.get("name"))
if command == "stop":
if not task_conf.get("api-stop"):
if not task_conf.get("api_stop"):
message = {"message": "task can not be stopped"}
return Response(message, status=400)
message_key = self._build_message_key(task_conf, task_id)
TaskCommand().stop(task_id, message_key)
if command == "kill":
if not task_conf.get("api-stop"):
if not task_conf.get("api_stop"):
message = {"message": "task can not be killed"}
return Response(message, status=400)
@ -992,6 +994,56 @@ class TaskIDView(ApiBaseView):
return f"message:{task_conf.get('group')}:{task_id.split('-')[0]}"
class ScheduleView(ApiBaseView):
"""resolves to /api/schedule/
DEL: delete schedule for task
"""
permission_classes = [AdminOnly]
def delete(self, request):
"""delete schedule by task_name query"""
task_name = request.data.get("task_name")
try:
task = CustomPeriodicTask.objects.get(name=task_name)
except CustomPeriodicTask.DoesNotExist:
message = {"message": "task_name not found"}
return Response(message, status=404)
_ = task.delete()
return Response({"success": True})
class ScheduleNotification(ApiBaseView):
"""resolves to /api/schedule/notification/
GET: get all schedule notifications
DEL: delete notification
"""
def get(self, request):
"""handle get request"""
return Response(get_all_notifications())
def delete(self, request):
"""handle delete"""
task_name = request.data.get("task_name")
url = request.data.get("url")
if not TASK_CONFIG.get(task_name):
message = {"message": "task_name not found"}
return Response(message, status=404)
if url:
response, status_code = Notifications(task_name).remove_url(url)
else:
response, status_code = Notifications(task_name).remove_task()
return Response({"response": response, "status_code": status_code})
class RefreshView(ApiBaseView):
"""resolves to /api/refresh/
GET: get refresh progress

View File

@ -5,16 +5,23 @@ Functionality:
"""
import os
from random import randint
from time import sleep
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django_celery_beat.models import CrontabSchedule
from home.models import CustomPeriodicTask
from home.src.es.connect import ElasticWrap
from home.src.es.index_setup import ElasitIndexWrap
from home.src.es.snapshot import ElasticSnapshot
from home.src.ta.config import AppConfig, ReleaseVersion
from home.src.ta.config_schedule import ScheduleBuilder
from home.src.ta.helper import clear_dl_cache
from home.src.ta.notify import Notifications
from home.src.ta.settings import EnvironmentSettings
from home.src.ta.ta_redis import RedisArchivist
from home.src.ta.task_config import TASK_CONFIG
from home.src.ta.task_manager import TaskManager
from home.src.ta.users import UserConfig
@ -45,7 +52,9 @@ class Command(BaseCommand):
self._mig_index_setup()
self._mig_snapshot_check()
self._mig_move_users_to_es()
self._mig_schedule_store()
self._mig_custom_playlist()
self._create_default_schedules()
def _sync_redis_state(self):
"""make sure redis gets new config.json values"""
@ -245,8 +254,134 @@ class Command(BaseCommand):
)
)
def _mig_schedule_store(self):
"""
update from 0.4.4 to 0.4.5
migrate schedule task store to CustomCronSchedule
"""
self.stdout.write("[MIGRATION] migrate schedule store")
config = AppConfig().config
current_schedules = config.get("scheduler")
if not current_schedules:
self.stdout.write(
self.style.SUCCESS(" no schedules to migrate")
)
return
self._mig_update_subscribed(current_schedules)
self._mig_download_pending(current_schedules)
self._mig_check_reindex(current_schedules)
self._mig_thumbnail_check(current_schedules)
self._mig_run_backup(current_schedules)
self._mig_version_check()
del config["scheduler"]
RedisArchivist().set_message("config", config, save=True)
def _mig_update_subscribed(self, current_schedules):
"""create update_subscribed schedule"""
task_name = "update_subscribed"
update_subscribed_schedule = current_schedules.get(task_name)
if update_subscribed_schedule:
self._create_task(task_name, update_subscribed_schedule)
self._create_notifications(task_name, current_schedules)
def _mig_download_pending(self, current_schedules):
"""create download_pending schedule"""
task_name = "download_pending"
download_pending_schedule = current_schedules.get(task_name)
if download_pending_schedule:
self._create_task(task_name, download_pending_schedule)
self._create_notifications(task_name, current_schedules)
def _mig_check_reindex(self, current_schedules):
"""create check_reindex schedule"""
task_name = "check_reindex"
check_reindex_schedule = current_schedules.get(task_name)
if check_reindex_schedule:
task_config = {}
days = current_schedules.get("check_reindex_days")
if days:
task_config.update({"days": days})
self._create_task(
task_name,
check_reindex_schedule,
task_config=task_config,
)
self._create_notifications(task_name, current_schedules)
def _mig_thumbnail_check(self, current_schedules):
"""create thumbnail_check schedule"""
thumbnail_check_schedule = current_schedules.get("thumbnail_check")
if thumbnail_check_schedule:
self._create_task("thumbnail_check", thumbnail_check_schedule)
def _mig_run_backup(self, current_schedules):
"""create run_backup schedule"""
run_backup_schedule = current_schedules.get("run_backup")
if run_backup_schedule:
task_config = False
rotate = current_schedules.get("run_backup_rotate")
if rotate:
task_config = {"rotate": rotate}
self._create_task(
"run_backup", run_backup_schedule, task_config=task_config
)
def _mig_version_check(self):
"""create version_check schedule"""
version_check_schedule = {
"minute": randint(0, 59),
"hour": randint(0, 23),
"day_of_week": "*",
}
self._create_task("version_check", version_check_schedule)
def _create_task(self, task_name, schedule, task_config=False):
"""create task"""
description = TASK_CONFIG[task_name].get("title")
schedule, _ = CrontabSchedule.objects.get_or_create(**schedule)
schedule.timezone = settings.TIME_ZONE
schedule.save()
task, _ = CustomPeriodicTask.objects.get_or_create(
crontab=schedule,
name=task_name,
description=description,
task=task_name,
)
if task_config:
task.task_config = task_config
task.save()
self.stdout.write(
self.style.SUCCESS(f" ✓ new task created: '{task}'")
)
def _create_notifications(self, task_name, current_schedules):
"""migrate notifications of task"""
notifications = current_schedules.get(f"{task_name}_notify")
if not notifications:
return
urls = [i.strip() for i in notifications.split()]
if not urls:
return
self.stdout.write(
self.style.SUCCESS(f" ✓ migrate notifications: '{urls}'")
)
handler = Notifications(task_name)
for url in urls:
handler.add_url(url)
def _mig_custom_playlist(self):
"""migration for custom playlist"""
"""add playlist_type for migration t0 v0.4.7"""
self.stdout.write("[MIGRATION] custom playlist")
data = {
"query": {
@ -277,3 +412,54 @@ class Command(BaseCommand):
self.stdout.write(response)
sleep(60)
raise CommandError(message)
def _create_default_schedules(self) -> None:
"""
create default schedules for new installations
needs to be called after _mig_schedule_store
"""
self.stdout.write("[7] create initial schedules")
init_has_run = CustomPeriodicTask.objects.filter(
name="version_check"
).exists()
if init_has_run:
self.stdout.write(
self.style.SUCCESS(
" schedule init already done, skipping..."
)
)
return
builder = ScheduleBuilder()
check_reindex = builder.get_set_task(
"check_reindex", schedule=builder.SCHEDULES["check_reindex"]
)
check_reindex.task_config.update({"days": 90})
check_reindex.save()
self.stdout.write(
self.style.SUCCESS(
f" ✓ created new default schedule: {check_reindex}"
)
)
thumbnail_check = builder.get_set_task(
"thumbnail_check", schedule=builder.SCHEDULES["thumbnail_check"]
)
self.stdout.write(
self.style.SUCCESS(
f" ✓ created new default schedule: {thumbnail_check}"
)
)
daily_random = f"{randint(0, 59)} {randint(0, 23)} *"
version_check = builder.get_set_task(
"version_check", schedule=daily_random
)
self.stdout.write(
self.style.SUCCESS(
f" ✓ created new default schedule: {version_check}"
)
)
self.stdout.write(
self.style.SUCCESS(" ✓ all default schedules created")
)

View File

@ -38,6 +38,7 @@ ALLOWED_HOSTS, CSRF_TRUSTED_ORIGINS = ta_host_parser(environ["TA_HOST"])
# Application definition
INSTALLED_APPS = [
"django_celery_beat",
"home.apps.HomeConfig",
"django.contrib.admin",
"django.contrib.auth",

View File

@ -1,5 +1,7 @@
""" handle celery startup """
"""start celery app"""
from .tasks import app as celery_app
from __future__ import absolute_import, unicode_literals
from home.celery import app as celery_app
__all__ = ("celery_app",)

View File

@ -2,6 +2,7 @@
from django.contrib import admin
from django.contrib.auth.admin import UserAdmin as BaseUserAdmin
from django_celery_beat import models as BeatModels
from .models import Account
@ -34,3 +35,12 @@ class HomeAdmin(BaseUserAdmin):
admin.site.register(Account, HomeAdmin)
admin.site.unregister(
[
BeatModels.ClockedSchedule,
BeatModels.CrontabSchedule,
BeatModels.IntervalSchedule,
BeatModels.PeriodicTask,
BeatModels.SolarSchedule,
]
)

View File

@ -0,0 +1,24 @@
"""initiate celery"""
import os
from celery import Celery
from home.src.ta.config import AppConfig
from home.src.ta.settings import EnvironmentSettings
CONFIG = AppConfig().config
REDIS_HOST = EnvironmentSettings.REDIS_HOST
REDIS_PORT = EnvironmentSettings.REDIS_PORT
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
app = Celery(
"tasks",
broker=f"redis://{REDIS_HOST}:{REDIS_PORT}",
backend=f"redis://{REDIS_HOST}:{REDIS_PORT}",
result_extended=True,
)
app.config_from_object(
"django.conf:settings", namespace=EnvironmentSettings.REDIS_NAME_SPACE
)
app.autodiscover_tasks()
app.conf.timezone = EnvironmentSettings.TZ

View File

@ -26,18 +26,5 @@
},
"application": {
"enable_snapshot": true
},
"scheduler": {
"update_subscribed": false,
"update_subscribed_notify": false,
"download_pending": false,
"download_pending_notify": false,
"check_reindex": {"minute": "0", "hour": "12", "day_of_week": "*"},
"check_reindex_notify": false,
"check_reindex_days": 90,
"thumbnail_check": {"minute": "0", "hour": "17", "day_of_week": "*"},
"run_backup": false,
"run_backup_rotate": 5,
"version_check": "rand-d"
}
}

View File

@ -0,0 +1,23 @@
# Generated by Django 4.2.7 on 2023-12-05 13:47
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('django_celery_beat', '0018_improve_crontab_helptext'),
('home', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='CustomPeriodicTask',
fields=[
('periodictask_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='django_celery_beat.periodictask')),
('task_config', models.JSONField(default=dict)),
],
bases=('django_celery_beat.periodictask',),
),
]

View File

@ -6,6 +6,7 @@ from django.contrib.auth.models import (
PermissionsMixin,
)
from django.db import models
from django_celery_beat.models import PeriodicTask
class AccountManager(BaseUserManager):
@ -52,3 +53,9 @@ class Account(AbstractBaseUser, PermissionsMixin):
USERNAME_FIELD = "name"
REQUIRED_FIELDS = ["password"]
class CustomPeriodicTask(PeriodicTask):
"""add custom metadata to to task"""
task_config = models.JSONField(default=dict)

View File

@ -10,6 +10,7 @@ import os
import zipfile
from datetime import datetime
from home.models import CustomPeriodicTask
from home.src.es.connect import ElasticWrap, IndexPaginate
from home.src.ta.config import AppConfig
from home.src.ta.helper import get_mapping, ignore_filelist
@ -197,7 +198,12 @@ class ElasticBackup:
def rotate_backup(self):
"""delete old backups if needed"""
rotate = self.config["scheduler"]["run_backup_rotate"]
try:
task = CustomPeriodicTask.objects.get(name="run_backup")
except CustomPeriodicTask.DoesNotExist:
return
rotate = task.task_config.get("rotate")
if not rotate:
return

View File

@ -159,50 +159,6 @@ class ApplicationSettingsForm(forms.Form):
)
class SchedulerSettingsForm(forms.Form):
"""handle scheduler settings"""
HELP_TEXT = "Add Apprise notification URLs, one per line"
update_subscribed = forms.CharField(required=False)
update_subscribed_notify = forms.CharField(
label=False,
widget=forms.Textarea(
attrs={
"rows": 2,
"placeholder": HELP_TEXT,
}
),
required=False,
)
download_pending = forms.CharField(required=False)
download_pending_notify = forms.CharField(
label=False,
widget=forms.Textarea(
attrs={
"rows": 2,
"placeholder": HELP_TEXT,
}
),
required=False,
)
check_reindex = forms.CharField(required=False)
check_reindex_notify = forms.CharField(
label=False,
widget=forms.Textarea(
attrs={
"rows": 2,
"placeholder": HELP_TEXT,
}
),
required=False,
)
check_reindex_days = forms.IntegerField(required=False)
thumbnail_check = forms.CharField(required=False)
run_backup = forms.CharField(required=False)
run_backup_rotate = forms.IntegerField(required=False)
class MultiSearchForm(forms.Form):
"""multi search form for /search/"""

View File

@ -0,0 +1,95 @@
"""
Functionality:
- handle schedule forms
- implement form validation
"""
from celery.schedules import crontab
from django import forms
class CrontabValidator:
"""validate crontab"""
@staticmethod
def validate_fields(cron_fields):
"""expect 3 cron fields"""
if not len(cron_fields) == 3:
raise forms.ValidationError("expected three cron schedule fields")
@staticmethod
def validate_minute(minute_field):
"""expect minute int"""
try:
minute_value = int(minute_field)
if not 0 <= minute_value <= 59:
raise forms.ValidationError(
"Invalid value for minutes. Must be between 0 and 59."
)
except ValueError as err:
raise forms.ValidationError(
"Invalid value for minutes. Must be an integer."
) from err
@staticmethod
def validate_cron_tab(minute, hour, day_of_week):
"""check if crontab can be created"""
try:
crontab(minute=minute, hour=hour, day_of_week=day_of_week)
except ValueError as err:
raise forms.ValidationError(f"invalid crontab: {err}") from err
def validate(self, cron_expression):
"""create crontab schedule"""
if cron_expression == "auto":
return
cron_fields = cron_expression.split()
self.validate_fields(cron_fields)
minute, hour, day_of_week = cron_fields
self.validate_minute(minute)
self.validate_cron_tab(minute, hour, day_of_week)
def validate_cron(cron_expression):
"""callable for field"""
CrontabValidator().validate(cron_expression)
class SchedulerSettingsForm(forms.Form):
"""handle scheduler settings"""
update_subscribed = forms.CharField(
required=False, validators=[validate_cron]
)
download_pending = forms.CharField(
required=False, validators=[validate_cron]
)
check_reindex = forms.CharField(required=False, validators=[validate_cron])
check_reindex_days = forms.IntegerField(required=False)
thumbnail_check = forms.CharField(
required=False, validators=[validate_cron]
)
run_backup = forms.CharField(required=False, validators=[validate_cron])
run_backup_rotate = forms.IntegerField(required=False)
class NotificationSettingsForm(forms.Form):
"""add notification URL"""
TASK_CHOICES = [
("", "-- select task --"),
("update_subscribed", "Rescan your Subscriptions"),
("download_pending", "Downloading"),
("check_reindex", "Reindex Documents"),
]
PLACEHOLDER = "Apprise notification URL"
task = forms.ChoiceField(
widget=forms.Select, choices=TASK_CHOICES, required=False
)
notification_url = forms.CharField(
required=False,
widget=forms.TextInput(attrs={"placeholder": PLACEHOLDER}),
)

View File

@ -17,7 +17,7 @@ class YouTubeItem:
es_path = False
index_name = ""
yt_base = ""
yt_obs = {
yt_obs: dict[str, bool | str] = {
"skip_download": True,
"noplaylist": True,
}

View File

@ -9,6 +9,7 @@ import os
from datetime import datetime
from time import sleep
from home.models import CustomPeriodicTask
from home.src.download.queue import PendingList
from home.src.download.subscriptions import ChannelSubscription
from home.src.download.thumbnails import ThumbManager
@ -67,9 +68,22 @@ class ReindexBase:
class ReindexPopulate(ReindexBase):
"""add outdated and recent documents to reindex queue"""
INTERVAL_DEFAIULT = 90
def __init__(self):
super().__init__()
self.interval = self.config["scheduler"]["check_reindex_days"]
self.interval = self.INTERVAL_DEFAIULT
def get_interval(self):
"""get reindex days interval from task"""
try:
task = CustomPeriodicTask.objects.get(name="check_reindex")
except CustomPeriodicTask.DoesNotExist:
return
task_config = task.task_config
if task_config.get("days"):
self.interval = task_config.get("days")
def add_recent(self):
"""add recent videos to refresh"""

View File

@ -5,12 +5,10 @@ Functionality:
"""
import json
import re
from random import randint
from time import sleep
import requests
from celery.schedules import crontab
from django.conf import settings
from home.src.ta.ta_redis import RedisArchivist
@ -74,15 +72,6 @@ class AppConfig:
RedisArchivist().set_message("config", self.config, save=True)
return updated
@staticmethod
def _build_rand_daily():
"""build random daily schedule per installation"""
return {
"minute": randint(0, 59),
"hour": randint(0, 23),
"day_of_week": "*",
}
def load_new_defaults(self):
"""check config.json for missing defaults"""
default_config = self.get_config_file()
@ -91,7 +80,6 @@ class AppConfig:
# check for customizations
if not redis_config:
config = self.get_config()
config["scheduler"]["version_check"] = self._build_rand_daily()
RedisArchivist().set_message("config", config)
return False
@ -106,13 +94,7 @@ class AppConfig:
# missing nested values
for sub_key, sub_value in value.items():
if (
sub_key not in redis_config[key].keys()
or sub_value == "rand-d"
):
if sub_value == "rand-d":
sub_value = self._build_rand_daily()
if sub_key not in redis_config[key].keys():
redis_config[key].update({sub_key: sub_value})
needs_update = True
@ -122,147 +104,6 @@ class AppConfig:
return needs_update
class ScheduleBuilder:
"""build schedule dicts for beat"""
SCHEDULES = {
"update_subscribed": "0 8 *",
"download_pending": "0 16 *",
"check_reindex": "0 12 *",
"thumbnail_check": "0 17 *",
"run_backup": "0 18 0",
"version_check": "0 11 *",
}
CONFIG = ["check_reindex_days", "run_backup_rotate"]
NOTIFY = [
"update_subscribed_notify",
"download_pending_notify",
"check_reindex_notify",
]
MSG = "message:setting"
def __init__(self):
self.config = AppConfig().config
def update_schedule_conf(self, form_post):
"""process form post"""
print("processing form, restart container for changes to take effect")
redis_config = self.config
for key, value in form_post.items():
if key in self.SCHEDULES and value:
try:
to_write = self.value_builder(key, value)
except ValueError:
print(f"failed: {key} {value}")
mess_dict = {
"group": "setting:schedule",
"level": "error",
"title": "Scheduler update failed.",
"messages": ["Invalid schedule input"],
"id": "0000",
}
RedisArchivist().set_message(
self.MSG, mess_dict, expire=True
)
return
redis_config["scheduler"][key] = to_write
if key in self.CONFIG and value:
redis_config["scheduler"][key] = int(value)
if key in self.NOTIFY and value:
if value == "0":
to_write = False
else:
to_write = value
redis_config["scheduler"][key] = to_write
RedisArchivist().set_message("config", redis_config, save=True)
mess_dict = {
"group": "setting:schedule",
"level": "info",
"title": "Scheduler changed.",
"messages": ["Restart container for changes to take effect"],
"id": "0000",
}
RedisArchivist().set_message(self.MSG, mess_dict, expire=True)
def value_builder(self, key, value):
"""validate single cron form entry and return cron dict"""
print(f"change schedule for {key} to {value}")
if value == "0":
# deactivate this schedule
return False
if re.search(r"[\d]{1,2}\/[\d]{1,2}", value):
# number/number cron format will fail in celery
print("number/number schedule formatting not supported")
raise ValueError
keys = ["minute", "hour", "day_of_week"]
if value == "auto":
# set to sensible default
values = self.SCHEDULES[key].split()
else:
values = value.split()
if len(keys) != len(values):
print(f"failed to parse {value} for {key}")
raise ValueError("invalid input")
to_write = dict(zip(keys, values))
self._validate_cron(to_write)
return to_write
@staticmethod
def _validate_cron(to_write):
"""validate all fields, raise value error for impossible schedule"""
all_hours = list(re.split(r"\D+", to_write["hour"]))
for hour in all_hours:
if hour.isdigit() and int(hour) > 23:
print("hour can not be greater than 23")
raise ValueError("invalid input")
all_days = list(re.split(r"\D+", to_write["day_of_week"]))
for day in all_days:
if day.isdigit() and int(day) > 6:
print("day can not be greater than 6")
raise ValueError("invalid input")
if not to_write["minute"].isdigit():
print("too frequent: only number in minutes are supported")
raise ValueError("invalid input")
if int(to_write["minute"]) > 59:
print("minutes can not be greater than 59")
raise ValueError("invalid input")
def build_schedule(self):
"""build schedule dict as expected by app.conf.beat_schedule"""
AppConfig().load_new_defaults()
self.config = AppConfig().config
schedule_dict = {}
for schedule_item in self.SCHEDULES:
item_conf = self.config["scheduler"][schedule_item]
if not item_conf:
continue
schedule_dict.update(
{
f"schedule_{schedule_item}": {
"task": schedule_item,
"schedule": crontab(
minute=item_conf["minute"],
hour=item_conf["hour"],
day_of_week=item_conf["day_of_week"],
),
}
}
)
return schedule_dict
class ReleaseVersion:
"""compare local version with remote version"""

View File

@ -0,0 +1,89 @@
"""
Functionality:
- Handle scheduler config update
"""
from django_celery_beat.models import CrontabSchedule
from home.models import CustomPeriodicTask
from home.src.ta.config import AppConfig
from home.src.ta.settings import EnvironmentSettings
from home.src.ta.task_config import TASK_CONFIG
class ScheduleBuilder:
"""build schedule dicts for beat"""
SCHEDULES = {
"update_subscribed": "0 8 *",
"download_pending": "0 16 *",
"check_reindex": "0 12 *",
"thumbnail_check": "0 17 *",
"run_backup": "0 18 0",
"version_check": "0 11 *",
}
CONFIG = {
"check_reindex_days": "check_reindex",
"run_backup_rotate": "run_backup",
"update_subscribed_notify": "update_subscribed",
"download_pending_notify": "download_pending",
"check_reindex_notify": "check_reindex",
}
MSG = "message:setting"
def __init__(self):
self.config = AppConfig().config
def update_schedule_conf(self, form_post):
"""process form post, schedules need to be validated before"""
for key, value in form_post.items():
if not value:
continue
if key in self.SCHEDULES:
if value == "auto":
value = self.SCHEDULES.get(key)
_ = self.get_set_task(key, value)
continue
if key in self.CONFIG:
self.set_config(key, value)
def get_set_task(self, task_name, schedule=False):
"""get task"""
try:
task = CustomPeriodicTask.objects.get(name=task_name)
except CustomPeriodicTask.DoesNotExist:
description = TASK_CONFIG[task_name].get("title")
task = CustomPeriodicTask(
name=task_name,
task=task_name,
description=description,
)
if schedule:
task_crontab = self.get_set_cron_tab(schedule)
task.crontab = task_crontab
task.save()
return task
@staticmethod
def get_set_cron_tab(schedule):
"""needs to be validated before"""
kwargs = dict(zip(["minute", "hour", "day_of_week"], schedule.split()))
kwargs.update({"timezone": EnvironmentSettings.TZ})
crontab, _ = CrontabSchedule.objects.get_or_create(**kwargs)
return crontab
def set_config(self, key, value):
"""set task_config"""
task_name = self.CONFIG.get(key)
if not task_name:
raise ValueError("invalid config key")
task = CustomPeriodicTask.objects.get(name=task_name)
config_key = key.split(f"{task_name}_")[-1]
task.task_config.update({config_key: value})
task.save()

View File

@ -1,55 +1,141 @@
"""send notifications using apprise"""
import apprise
from home.src.ta.config import AppConfig
from home.src.es.connect import ElasticWrap
from home.src.ta.task_config import TASK_CONFIG
from home.src.ta.task_manager import TaskManager
class Notifications:
"""notification handler"""
"""store notifications in ES"""
def __init__(self, name: str, task_id: str, task_title: str):
self.name: str = name
self.task_id: str = task_id
self.task_title: str = task_title
GET_PATH = "ta_config/_doc/notify"
UPDATE_PATH = "ta_config/_update/notify/"
def send(self) -> None:
def __init__(self, task_name: str):
self.task_name = task_name
def send(self, task_id: str, task_title: str) -> None:
"""send notifications"""
apobj = apprise.Apprise()
hooks: str | None = self.get_url()
if not hooks:
urls: list[str] = self.get_urls()
if not urls:
return
hook_list: list[str] = self.parse_hooks(hooks=hooks)
title, body = self.build_message()
title, body = self._build_message(task_id, task_title)
if not body:
return
for hook in hook_list:
apobj.add(hook)
for url in urls:
apobj.add(url)
apobj.notify(body=body, title=title)
def get_url(self) -> str | None:
"""get apprise urls for task"""
config = AppConfig().config
hooks: str = config["scheduler"].get(f"{self.name}_notify")
return hooks
def parse_hooks(self, hooks: str) -> list[str]:
"""create list of hooks"""
hook_list: list[str] = [i.strip() for i in hooks.split()]
return hook_list
def build_message(self) -> tuple[str, str | None]:
def _build_message(
self, task_id: str, task_title: str
) -> tuple[str, str | None]:
"""build message to send notification"""
task = TaskManager().get_task(self.task_id)
task = TaskManager().get_task(task_id)
status = task.get("status")
title: str = f"[TA] {self.task_title} process ended with {status}"
title: str = f"[TA] {task_title} process ended with {status}"
body: str | None = task.get("result")
return title, body
def get_urls(self) -> list[str]:
"""get stored urls for task"""
response, code = ElasticWrap(self.GET_PATH).get(print_error=False)
if not code == 200:
return []
urls = response["_source"].get(self.task_name, [])
return urls
def add_url(self, url: str) -> None:
"""add url to task notification"""
source = (
"if (!ctx._source.containsKey(params.task_name)) "
+ "{ctx._source[params.task_name] = [params.url]} "
+ "else if (!ctx._source[params.task_name].contains(params.url)) "
+ "{ctx._source[params.task_name].add(params.url)} "
+ "else {ctx.op = 'none'}"
)
data = {
"script": {
"source": source,
"lang": "painless",
"params": {"url": url, "task_name": self.task_name},
},
"upsert": {self.task_name: [url]},
}
_, _ = ElasticWrap(self.UPDATE_PATH).post(data)
def remove_url(self, url: str) -> tuple[dict, int]:
"""remove url from task"""
source = (
"if (ctx._source.containsKey(params.task_name) "
+ "&& ctx._source[params.task_name].contains(params.url)) "
+ "{ctx._source[params.task_name]."
+ "remove(ctx._source[params.task_name].indexOf(params.url))}"
)
data = {
"script": {
"source": source,
"lang": "painless",
"params": {"url": url, "task_name": self.task_name},
}
}
response, status_code = ElasticWrap(self.UPDATE_PATH).post(data)
if not self.get_urls():
_, _ = self.remove_task()
return response, status_code
def remove_task(self) -> tuple[dict, int]:
"""remove all notifications from task"""
source = (
"if (ctx._source.containsKey(params.task_name)) "
+ "{ctx._source.remove(params.task_name)}"
)
data = {
"script": {
"source": source,
"lang": "painless",
"params": {"task_name": self.task_name},
}
}
response, status_code = ElasticWrap(self.UPDATE_PATH).post(data)
return response, status_code
def get_all_notifications() -> dict[str, list[str]]:
"""get all notifications stored"""
path = "ta_config/_doc/notify"
response, status_code = ElasticWrap(path).get(print_error=False)
if not status_code == 200:
return {}
notifications: dict = {}
source = response.get("_source")
if not source:
return notifications
for task_id, urls in source.items():
notifications.update(
{
task_id: {
"urls": urls,
"title": TASK_CONFIG[task_id]["title"],
}
}
)
return notifications

View File

@ -0,0 +1,125 @@
"""
Functionality:
- Static Task config values
- Type definitions
- separate to avoid circular imports
"""
from typing import TypedDict
class TaskItemConfig(TypedDict):
"""describes a task item config"""
title: str
group: str
api_start: bool
api_stop: bool
UPDATE_SUBSCRIBED: TaskItemConfig = {
"title": "Rescan your Subscriptions",
"group": "download:scan",
"api_start": True,
"api_stop": True,
}
DOWNLOAD_PENDING: TaskItemConfig = {
"title": "Downloading",
"group": "download:run",
"api_start": True,
"api_stop": True,
}
EXTRACT_DOWNLOAD: TaskItemConfig = {
"title": "Add to download queue",
"group": "download:add",
"api_start": False,
"api_stop": True,
}
CHECK_REINDEX: TaskItemConfig = {
"title": "Reindex Documents",
"group": "reindex:run",
"api_start": False,
"api_stop": False,
}
MANUAL_IMPORT: TaskItemConfig = {
"title": "Manual video import",
"group": "setting:import",
"api_start": True,
"api_stop": False,
}
RUN_BACKUP: TaskItemConfig = {
"title": "Index Backup",
"group": "setting:backup",
"api_start": True,
"api_stop": False,
}
RESTORE_BACKUP: TaskItemConfig = {
"title": "Restore Backup",
"group": "setting:restore",
"api_start": False,
"api_stop": False,
}
RESCAN_FILESYSTEM: TaskItemConfig = {
"title": "Rescan your Filesystem",
"group": "setting:filesystemscan",
"api_start": True,
"api_stop": False,
}
THUMBNAIL_CHECK: TaskItemConfig = {
"title": "Check your Thumbnails",
"group": "setting:thumbnailcheck",
"api_start": True,
"api_stop": False,
}
RESYNC_THUMBS: TaskItemConfig = {
"title": "Sync Thumbnails to Media Files",
"group": "setting:thumbnailsync",
"api_start": True,
"api_stop": False,
}
INDEX_PLAYLISTS: TaskItemConfig = {
"title": "Index Channel Playlist",
"group": "channel:indexplaylist",
"api_start": False,
"api_stop": False,
}
SUBSCRIBE_TO: TaskItemConfig = {
"title": "Add Subscription",
"group": "subscription:add",
"api_start": False,
"api_stop": False,
}
VERSION_CHECK: TaskItemConfig = {
"title": "Look for new Version",
"group": "",
"api_start": False,
"api_stop": False,
}
TASK_CONFIG: dict[str, TaskItemConfig] = {
"update_subscribed": UPDATE_SUBSCRIBED,
"download_pending": DOWNLOAD_PENDING,
"extract_download": EXTRACT_DOWNLOAD,
"check_reindex": CHECK_REINDEX,
"manual_import": MANUAL_IMPORT,
"run_backup": RUN_BACKUP,
"restore_backup": RESTORE_BACKUP,
"rescan_filesystem": RESCAN_FILESYSTEM,
"thumbnail_check": THUMBNAIL_CHECK,
"resync_thumbs": RESYNC_THUMBS,
"index_playlists": INDEX_PLAYLISTS,
"subscribe_to": SUBSCRIBE_TO,
"version_check": VERSION_CHECK,
}

View File

@ -4,8 +4,9 @@ functionality:
- handle threads and locks
"""
from home import tasks as ta_tasks
from home.celery import app as celery_app
from home.src.ta.ta_redis import RedisArchivist, TaskRedis
from home.src.ta.task_config import TASK_CONFIG
class TaskManager: