diff --git a/README.md b/README.md
index 1db9dfe..58179ba 100644
--- a/README.md
+++ b/README.md
@@ -11,7 +11,7 @@
 ![home screenshot](assets/tube-archivist-screenshot-home.png?raw=true "Tube Archivist Home")
 
 ## Table of contents:
-* [Wiki](https://github.com/tubearchivist/tubearchivist/wiki) with [FAQ](https://github.com/tubearchivist/tubearchivist/wiki/FAQ)
+* Docs: [Wiki](https://github.com/tubearchivist/tubearchivist/wiki), [FAQ](https://github.com/tubearchivist/tubearchivist/wiki/FAQ) and [API](https://github.com/tubearchivist/tubearchivist/tree/master/tubearchivist/api)
 * [Core functionality](#core-functionality)
 * [Resources](#resources)
 * [Installing and updating](#installing-and-updating)
diff --git a/deploy.sh b/deploy.sh
index b76a68c..3f258b7 100755
--- a/deploy.sh
+++ b/deploy.sh
@@ -87,7 +87,7 @@ function validate {
    echo "running black"
    black --exclude "migrations/*" --diff --color --check -l 79 "$check_path"
    echo "running codespell"
-    codespell --skip="./.git,./package.json,./package-lock.json,./node_modules" "$check_path"
+    codespell --skip="./.git,./package.json,./package-lock.json,./node_modules,./.mypy_cache" "$check_path"
    echo "running flake8"
    flake8 "$check_path" --exclude "migrations" --count --max-complexity=10 \
        --max-line-length=79 --show-source --statistics
diff --git a/tubearchivist/api/README.md b/tubearchivist/api/README.md
index 00e929f..b87980f 100644
--- a/tubearchivist/api/README.md
+++ b/tubearchivist/api/README.md
@@ -2,8 +2,9 @@
 Documentation of available API endpoints.
 
 Note:
-- This is very early stages and will change!
-- Check the commit history to see if a documented feature is already in your release
+- The API has changed in the past and will change again while additional integrations and functionality are built out.
+- All changes to the API are marked with an `[API]` keyword for easy searching; for example, search for [commits](https://github.com/tubearchivist/tubearchivist/search?o=desc&q=%5Bapi%5D&s=committer-date&type=commits). You'll find the same keyword in the [release notes](https://github.com/tubearchivist/tubearchivist/releases).
+- Check the commit history and release notes to see if a documented feature is already in your release.
 
 ## Table of contents
 - [Authentication](#authentication)
@@ -35,9 +36,13 @@ Note:
 - [Snapshot List](#snapshot-list-view)
 - [Snapshot Single](#snapshot-item-view)
 
+**Task management**
+- [Task Name List](#task-name-list-view)
+- [Task Name Single](#task-name-item-view)
+- [Task ID](#task-id-view)
+
 **Additional**
 - [Login](#login-view)
-- [Task](#task-view) WIP
 - [Refresh](#refresh-view)
 - [Cookie](#cookie-view)
 - [Search](#search-view)
@@ -251,7 +256,7 @@ POST /api/snapshot/
 Create new snapshot now, will return immediately, task will run async in the background, will return snapshot name:
 ```json
 {
-    "snapshot_name": "ta_daily_
+    "snapshot_name": "ta_daily_"
 }
 ```
 
@@ -260,7 +265,7 @@ GET /api/snapshot/\<snapshot-id\>/
 Return metadata of a single snapshot
 ```json
 {
-    "id": "ta_daily_,
+    "id": "ta_daily_",
     "state": "SUCCESS",
     "es_version": "0.0.0",
     "start_date": "date_str",
@@ -276,6 +281,29 @@ Restore this snapshot
 DELETE /api/snapshot/\<snapshot-id\>/
 Remove this snapshot from index
 
+## Task Name List View
+GET /api/task-name/
+Return all task results
+
+## Task Name Item View
+GET /api/task-name/\<task-name\>/
+Return all task results by task name
+
+POST /api/task-name/\<task-name\>/
+Start a new task by task name. Only tasks that don't take arguments can be started this way; see `home.tasks.BaseTask.TASK_CONFIG` for more info.
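+
+For example, a minimal sketch of starting a task by name, assuming the token authentication described in [Authentication](#authentication), a server on `localhost:8000`, and that `update_subscribed` is one of the task names defined in `home.tasks.BaseTask.TASK_CONFIG`:
+```shell
+# hypothetical token, host and task name, adjust to your setup
+curl -X POST \
+  -H "Authorization: Token xxxxxxxxxx" \
+  http://localhost:8000/api/task-name/update_subscribed/
+```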
+ +## Task ID view +GET /api/task-id/\/ +Return task status by task ID + +POST /api/task-id/\/ +```json +{ + "command": "stop|kill" +} +``` +Send command to a task, valid commands: `stop` and `kill`. + ## Login View Return token and user ID for username and password: POST /api/login/ @@ -294,33 +322,6 @@ after successful login returns } ``` -## Task View -GET /api/task/ -POST /api/task/ - -Check if there is an ongoing task: -GET /api/task/ - -Returns: -```json -{ - "rescan": false, - "downloading": false -} -``` - -Start a background task -POST /api/task/ -```json -{ - "run": "task_name" -} -``` - -List of valid task names: -- **download_pending**: Start the download queue -- **rescan_pending**: Rescan your subscriptions - ## Refresh View GET /api/refresh/ parameters: diff --git a/tubearchivist/api/src/task_processor.py b/tubearchivist/api/src/task_processor.py deleted file mode 100644 index dd42ee0..0000000 --- a/tubearchivist/api/src/task_processor.py +++ /dev/null @@ -1,54 +0,0 @@ -""" -Functionality: -- process tasks from API -- validate -- handover to celery -""" - -from home.src.ta.ta_redis import RedisArchivist -from home.tasks import download_pending, update_subscribed - - -class TaskHandler: - """handle tasks from api""" - - def __init__(self, data): - self.data = data - - def run_task(self): - """map data and run""" - task_name = self.data["run"] - try: - to_run = self.exec_map(task_name) - except KeyError as err: - print(f"invalid task name {task_name}") - raise ValueError from err - - response = to_run() - response.update({"task": task_name}) - return response - - def exec_map(self, task_name): - """map dict key and return function to execute""" - exec_map = { - "download_pending": self._download_pending, - "rescan_pending": self._rescan_pending, - } - - return exec_map[task_name] - - @staticmethod - def _rescan_pending(): - """look for new items in subscribed channels""" - print("rescan subscribed channels") - update_subscribed.delay() - return {"success": True} - - @staticmethod - def _download_pending(): - """start the download queue""" - print("download pending") - running = download_pending.delay() - print("set task id: " + running.id) - RedisArchivist().set_message("dl_queue_id", running.id) - return {"success": True} diff --git a/tubearchivist/api/urls.py b/tubearchivist/api/urls.py index 7cf5368..19c6e19 100644 --- a/tubearchivist/api/urls.py +++ b/tubearchivist/api/urls.py @@ -1,138 +1,134 @@ """all api urls""" -from api.views import ( - ChannelApiListView, - ChannelApiVideoView, - ChannelApiView, - CookieView, - DownloadApiListView, - DownloadApiView, - LoginApiView, - PingView, - PlaylistApiListView, - PlaylistApiVideoView, - PlaylistApiView, - RefreshView, - SearchView, - SnapshotApiListView, - SnapshotApiView, - TaskApiView, - VideoApiListView, - VideoApiView, - VideoCommentView, - VideoProgressView, - VideoSimilarView, - VideoSponsorView, - WatchedView, -) +from api import views from django.urls import path urlpatterns = [ - path("ping/", PingView.as_view(), name="ping"), - path("login/", LoginApiView.as_view(), name="api-login"), + path("ping/", views.PingView.as_view(), name="ping"), + path("login/", views.LoginApiView.as_view(), name="api-login"), path( "video/", - VideoApiListView.as_view(), + views.VideoApiListView.as_view(), name="api-video-list", ), path( "video//", - VideoApiView.as_view(), + views.VideoApiView.as_view(), name="api-video", ), path( "video//progress/", - VideoProgressView.as_view(), + views.VideoProgressView.as_view(), 
name="api-video-progress", ), path( "video//comment/", - VideoCommentView.as_view(), + views.VideoCommentView.as_view(), name="api-video-comment", ), path( "video//similar/", - VideoSimilarView.as_view(), + views.VideoSimilarView.as_view(), name="api-video-similar", ), path( "video//sponsor/", - VideoSponsorView.as_view(), + views.VideoSponsorView.as_view(), name="api-video-sponsor", ), path( "channel/", - ChannelApiListView.as_view(), + views.ChannelApiListView.as_view(), name="api-channel-list", ), path( "channel//", - ChannelApiView.as_view(), + views.ChannelApiView.as_view(), name="api-channel", ), path( "channel//video/", - ChannelApiVideoView.as_view(), + views.ChannelApiVideoView.as_view(), name="api-channel-video", ), path( "playlist/", - PlaylistApiListView.as_view(), + views.PlaylistApiListView.as_view(), name="api-playlist-list", ), path( "playlist//", - PlaylistApiView.as_view(), + views.PlaylistApiView.as_view(), name="api-playlist", ), path( "playlist//video/", - PlaylistApiVideoView.as_view(), + views.PlaylistApiVideoView.as_view(), name="api-playlist-video", ), path( "download/", - DownloadApiListView.as_view(), + views.DownloadApiListView.as_view(), name="api-download-list", ), path( "download//", - DownloadApiView.as_view(), + views.DownloadApiView.as_view(), name="api-download", ), path( "refresh/", - RefreshView.as_view(), + views.RefreshView.as_view(), name="api-refresh", ), - path( - "task/", - TaskApiView.as_view(), - name="api-task", - ), path( "snapshot/", - SnapshotApiListView.as_view(), + views.SnapshotApiListView.as_view(), name="api-snapshot-list", ), path( "snapshot//", - SnapshotApiView.as_view(), + views.SnapshotApiView.as_view(), name="api-snapshot", ), + path( + "task-name/", + views.TaskListView.as_view(), + name="api-task-list", + ), + path( + "task-name//", + views.TaskNameListView.as_view(), + name="api-task-name-list", + ), + path( + "task-id//", + views.TaskIDView.as_view(), + name="api-task-id", + ), path( "cookie/", - CookieView.as_view(), + views.CookieView.as_view(), name="api-cookie", ), path( "watched/", - WatchedView.as_view(), + views.WatchedView.as_view(), name="api-watched", ), path( "search/", - SearchView.as_view(), + views.SearchView.as_view(), name="api-search", ), + path( + "token/", + views.TokenView.as_view(), + name="api-token", + ), + path( + "notification/", + views.NotificationView.as_view(), + name="api-notification", + ), ] diff --git a/tubearchivist/api/views.py b/tubearchivist/api/views.py index 14f547f..85b44b0 100644 --- a/tubearchivist/api/views.py +++ b/tubearchivist/api/views.py @@ -1,7 +1,6 @@ """all API views""" from api.src.search_processor import SearchProcess -from api.src.task_processor import TaskHandler from home.src.download.queue import PendingInteract from home.src.download.yt_dlp_base import CookieHandler from home.src.es.connect import ElasticWrap @@ -14,8 +13,15 @@ from home.src.index.reindex import ReindexProgress from home.src.index.video import SponsorBlock, YoutubeVideo from home.src.ta.config import AppConfig from home.src.ta.ta_redis import RedisArchivist, RedisQueue +from home.src.ta.task_manager import TaskCommand, TaskManager from home.src.ta.urlparser import Parser -from home.tasks import check_reindex, download_single, extrac_dl, subscribe_to +from home.tasks import ( + BaseTask, + check_reindex, + download_pending, + extrac_dl, + subscribe_to, +) from rest_framework.authentication import ( SessionAuthentication, TokenAuthentication, @@ -424,14 +430,15 @@ class DownloadApiView(ApiBaseView): 
print(message) return Response({"message": message}, status=400) - pending_video, status_code = PendingInteract(video_id).get_item() + _, status_code = PendingInteract(video_id).get_item() if status_code == 404: message = f"{video_id}: item not found {status_code}" return Response({"message": message}, status=404) print(f"{video_id}: change status to {item_status}") if item_status == "priority": - download_single.delay(pending_video) + PendingInteract(youtube_id=video_id).prioritize() + download_pending.delay(from_queue=False) else: PendingInteract(video_id, item_status).update_status() RedisQueue(queue_name="dl_queue").clear_item(video_id) @@ -554,29 +561,6 @@ class LoginApiView(ObtainAuthToken): return Response({"token": token.key, "user_id": user.pk}) -class TaskApiView(ApiBaseView): - """resolves to /api/task/ - GET: check if ongoing background task - POST: start a new background task - """ - - @staticmethod - def get(request): - """handle get request""" - # pylint: disable=unused-argument - response = {"rescan": False, "downloading": False} - for key in response.keys(): - response[key] = RedisArchivist().is_locked(key) - - return Response(response) - - def post(self, request): - """handle post request""" - response = TaskHandler(request.data).run_task() - - return Response(response) - - class SnapshotApiListView(ApiBaseView): """resolves to /api/snapshot/ GET: returns snashot config plus list of existing snapshots @@ -641,6 +625,108 @@ class SnapshotApiView(ApiBaseView): return Response(response) +class TaskListView(ApiBaseView): + """resolves to /api/task-name/ + GET: return a list of all stored task results + """ + + def get(self, request): + """handle get request""" + # pylint: disable=unused-argument + all_results = TaskManager().get_all_results() + + return Response(all_results) + + +class TaskNameListView(ApiBaseView): + """resolves to /api/task-name// + GET: return a list of stored results of task + POST: start new background process + """ + + def get(self, request, task_name): + """handle get request""" + # pylint: disable=unused-argument + if task_name not in BaseTask.TASK_CONFIG: + message = {"message": "invalid task name"} + return Response(message, status=404) + + all_results = TaskManager().get_tasks_by_name(task_name) + + return Response(all_results) + + def post(self, request, task_name): + """ + handle post request + 404 for invalid task_name + 400 if task can't be started here without argument + """ + # pylint: disable=unused-argument + task_config = BaseTask.TASK_CONFIG.get(task_name) + if not task_config: + message = {"message": "invalid task name"} + return Response(message, status=404) + + if not task_config.get("api-start"): + message = {"message": "can not start task through this endpoint"} + return Response(message, status=400) + + message = TaskCommand().start(task_name) + + return Response({"message": message}) + + +class TaskIDView(ApiBaseView): + """resolves to /api/task-id// + GET: return details of task id + """ + + valid_commands = ["stop", "kill"] + + def get(self, request, task_id): + """handle get request""" + # pylint: disable=unused-argument + task_result = TaskManager().get_task(task_id) + if not task_result: + message = {"message": "task id not found"} + return Response(message, status=404) + + return Response(task_result) + + def post(self, request, task_id): + """post command to task""" + command = request.data.get("command") + if not command or command not in self.valid_commands: + message = {"message": "no valid command found"} + return 
Response(message, status=400) + + task_result = TaskManager().get_task(task_id) + if not task_result: + message = {"message": "task id not found"} + return Response(message, status=404) + + task_conf = BaseTask.TASK_CONFIG.get(task_result.get("name")) + if command == "stop": + if not task_conf.get("api-stop"): + message = {"message": "task can not be stopped"} + return Response(message, status=400) + + message_key = self._build_message_key(task_conf, task_id) + TaskCommand().stop(task_id, message_key) + if command == "kill": + if not task_conf.get("api-stop"): + message = {"message": "task can not be killed"} + return Response(message, status=400) + + TaskCommand().kill(task_id) + + return Response({"message": "command sent"}) + + def _build_message_key(self, task_conf, task_id): + """build message key to forward command to notification""" + return f"message:{task_conf.get('group')}:{task_id.split('-')[0]}" + + class RefreshView(ApiBaseView): """resolves to /api/refresh/ GET: get refresh progress @@ -760,3 +846,33 @@ class SearchView(ApiBaseView): search_results = SearchForm().multi_search(search_query) return Response(search_results) + + +class TokenView(ApiBaseView): + """resolves to /api/token/ + DELETE: revoke the token + """ + + @staticmethod + def delete(request): + print("revoke API token") + request.user.auth_token.delete() + return Response({"success": True}) + + +class NotificationView(ApiBaseView): + """resolves to /api/notification/ + GET: returns a list of notifications + filter query to filter messages by group + """ + + valid_filters = ["download", "settings", "channel"] + + def get(self, request): + """get all notifications""" + query = "message" + filter_by = request.GET.get("filter", None) + if filter_by in self.valid_filters: + query = f"{query}:{filter_by}" + + return Response(RedisArchivist().list_items(query)) diff --git a/tubearchivist/config/management/commands/ta_connection.py b/tubearchivist/config/management/commands/ta_connection.py index 4f6b8c3..a6159b3 100644 --- a/tubearchivist/config/management/commands/ta_connection.py +++ b/tubearchivist/config/management/commands/ta_connection.py @@ -43,7 +43,6 @@ class Command(BaseCommand): self.stdout.write("[1] connect to Redis") redis_conn = RedisArchivist().conn for _ in range(5): - sleep(2) try: pong = redis_conn.execute_command("PING") if pong: @@ -54,10 +53,12 @@ class Command(BaseCommand): except Exception: # pylint: disable=broad-except self.stdout.write(" ... retry Redis connection") + sleep(2) message = " 🗙 Redis connection failed" self.stdout.write(self.style.ERROR(f"{message}")) RedisArchivist().exec("PING") + sleep(60) raise CommandError(message) def _redis_config_set(self): @@ -75,13 +76,13 @@ class Command(BaseCommand): self.stdout.write("[3] connect to Elastic Search") total = self.TIMEOUT // 5 for i in range(total): - sleep(5) self.stdout.write(f" ... 
waiting for ES [{i}/{total}]") try: _, status_code = ElasticWrap("/").get( timeout=1, print_error=False ) except requests.exceptions.ConnectionError: + sleep(5) continue if status_code and status_code == 200: @@ -98,6 +99,7 @@ class Command(BaseCommand): self.stdout.write(self.style.ERROR(f"{message}")) self.stdout.write(f" error message: {response}") self.stdout.write(f" status code: {status_code}") + sleep(60) raise CommandError(message) def _es_version_check(self): @@ -118,6 +120,7 @@ class Command(BaseCommand): + f"Expected {self.MIN_MAJOR}.{self.MIN_MINOR} but got {version}" ) self.stdout.write(self.style.ERROR(f"{message}")) + sleep(60) raise CommandError(message) def _es_path_check(self): @@ -137,4 +140,5 @@ class Command(BaseCommand): + " path.repo=/usr/share/elasticsearch/data/snapshot" ) self.stdout.write(self.style.ERROR(f"{message}")) + sleep(60) raise CommandError(message) diff --git a/tubearchivist/config/management/commands/ta_startup.py b/tubearchivist/config/management/commands/ta_startup.py index a74358d..bba4eff 100644 --- a/tubearchivist/config/management/commands/ta_startup.py +++ b/tubearchivist/config/management/commands/ta_startup.py @@ -14,6 +14,7 @@ from home.src.es.snapshot import ElasticSnapshot from home.src.ta.config import AppConfig, ReleaseVersion from home.src.ta.helper import clear_dl_cache from home.src.ta.ta_redis import RedisArchivist +from home.src.ta.task_manager import TaskManager TOPIC = """ @@ -35,6 +36,7 @@ class Command(BaseCommand): self._sync_redis_state() self._make_folders() self._release_locks() + self._clear_tasks() self._clear_dl_cache() self._version_check() self._mig_index_setup() @@ -96,9 +98,23 @@ class Command(BaseCommand): if not has_changed: self.stdout.write(self.style.SUCCESS(" no locks found")) + def _clear_tasks(self): + """clear tasks and messages""" + self.stdout.write("[4] clear task leftovers") + TaskManager().fail_pending() + redis_con = RedisArchivist() + to_delete = redis_con.list_keys("message:") + if to_delete: + for key in to_delete: + redis_con.del_message(key) + + self.stdout.write( + self.style.SUCCESS(f" ✓ cleared {len(to_delete)} messages") + ) + def _clear_dl_cache(self): """clear leftover files from dl cache""" - self.stdout.write("[4] clear leftover files from dl cache") + self.stdout.write("[5] clear leftover files from dl cache") config = AppConfig().config leftover_files = clear_dl_cache(config) if leftover_files: @@ -110,7 +126,7 @@ class Command(BaseCommand): def _version_check(self): """remove new release key if updated now""" - self.stdout.write("[5] check for first run after update") + self.stdout.write("[6] check for first run after update") new_version = ReleaseVersion().is_updated() if new_version: self.stdout.write( @@ -175,4 +191,5 @@ class Command(BaseCommand): message = f" 🗙 {index_name} vid_type update failed" self.stdout.write(self.style.ERROR(message)) self.stdout.write(response) + sleep(60) raise CommandError(message) diff --git a/tubearchivist/config/settings.py b/tubearchivist/config/settings.py index 865586d..73aa701 100644 --- a/tubearchivist/config/settings.py +++ b/tubearchivist/config/settings.py @@ -265,4 +265,4 @@ CORS_ALLOW_HEADERS = list(default_headers) + [ # TA application settings TA_UPSTREAM = "https://github.com/tubearchivist/tubearchivist" -TA_VERSION = "v0.3.4" +TA_VERSION = "v0.3.5-unstable" diff --git a/tubearchivist/home/src/download/queue.py b/tubearchivist/home/src/download/queue.py index 2175823..5a453ec 100644 --- a/tubearchivist/home/src/download/queue.py +++ 
b/tubearchivist/home/src/download/queue.py @@ -18,7 +18,7 @@ from home.src.index.playlist import YoutubePlaylist from home.src.index.video_constants import VideoTypeEnum from home.src.ta.config import AppConfig from home.src.ta.helper import DurationConverter, is_shorts -from home.src.ta.ta_redis import RedisArchivist +from home.src.ta.ta_redis import RedisQueue class PendingIndex: @@ -117,6 +117,16 @@ class PendingInteract: path = f"ta_download/_update/{self.youtube_id}" _, _ = ElasticWrap(path).post(data=data) + def prioritize(self): + """prioritize pending item in redis queue""" + pending_video, _ = self.get_item() + vid_type = pending_video.get("vid_type", VideoTypeEnum.VIDEOS.value) + to_add = { + "youtube_id": pending_video["youtube_id"], + "vid_type": vid_type, + } + RedisQueue(queue_name="dl_queue").add_priority(to_add) + def get_item(self): """return pending item dict""" path = f"ta_download/_doc/{self.youtube_id}" @@ -151,12 +161,14 @@ class PendingList(PendingIndex): "noplaylist": True, "writethumbnail": True, "simulate": True, + "check_formats": None, } - def __init__(self, youtube_ids=False): + def __init__(self, youtube_ids=False, task=False): super().__init__() self.config = AppConfig().config self.youtube_ids = youtube_ids + self.task = task self.to_skip = False self.missing_videos = False @@ -165,16 +177,16 @@ class PendingList(PendingIndex): self.missing_videos = [] self.get_download() self.get_indexed() - for entry in self.youtube_ids: - # notify - mess_dict = { - "status": "message:add", - "level": "info", - "title": "Adding to download queue.", - "message": "Extracting lists", - } - RedisArchivist().set_message("message:add", mess_dict, expire=True) + total = len(self.youtube_ids) + for idx, entry in enumerate(self.youtube_ids): self._process_entry(entry) + if not self.task: + continue + + self.task.send_progress( + message_lines=[f"Extracting items {idx + 1}/{total}"], + progress=(idx + 1) / total, + ) def _process_entry(self, entry): """process single entry from url list""" @@ -228,9 +240,13 @@ class PendingList(PendingIndex): self.get_channels() bulk_list = [] + total = len(self.missing_videos) for idx, (youtube_id, vid_type) in enumerate(self.missing_videos): - print(f"{youtube_id} ({vid_type}): add to download queue") - self._notify_add(idx) + if self.task and self.task.is_stopped(): + break + + print(f"{youtube_id}: [{idx + 1}/{total}]: add to queue") + self._notify_add(idx, total) video_details = self.get_youtube_details(youtube_id, vid_type) if not video_details: continue @@ -243,29 +259,34 @@ class PendingList(PendingIndex): url = video_details["vid_thumb_url"] ThumbManager(youtube_id).download_video_thumb(url) - if bulk_list: - # add last newline - bulk_list.append("\n") - query_str = "\n".join(bulk_list) - _, _ = ElasticWrap("_bulk").post(query_str, ndjson=True) + if len(bulk_list) >= 20: + self._ingest_bulk(bulk_list) + bulk_list = [] - def _notify_add(self, idx): + self._ingest_bulk(bulk_list) + + def _ingest_bulk(self, bulk_list): + """add items to queue in bulk""" + if not bulk_list: + return + + # add last newline + bulk_list.append("\n") + query_str = "\n".join(bulk_list) + _, _ = ElasticWrap("_bulk").post(query_str, ndjson=True) + + def _notify_add(self, idx, total): """send notification for adding videos to download queue""" - progress = f"{idx + 1}/{len(self.missing_videos)}" - mess_dict = { - "status": "message:add", - "level": "info", - "title": "Adding new videos to download queue.", - "message": "Progress: " + progress, - } - if idx + 1 == 
len(self.missing_videos): - expire = 4 - else: - expire = True + if not self.task: + return - RedisArchivist().set_message("message:add", mess_dict, expire=expire) - if idx + 1 % 25 == 0: - print("adding to queue progress: " + progress) + self.task.send_progress( + message_lines=[ + "Adding new videos to download queue.", + f"Extracting items {idx + 1}/{total}", + ], + progress=(idx + 1) / total, + ) def get_youtube_details(self, youtube_id, vid_type=VideoTypeEnum.VIDEOS): """get details from youtubedl for single pending video""" diff --git a/tubearchivist/home/src/download/subscriptions.py b/tubearchivist/home/src/download/subscriptions.py index 67818b8..b006f84 100644 --- a/tubearchivist/home/src/download/subscriptions.py +++ b/tubearchivist/home/src/download/subscriptions.py @@ -13,13 +13,15 @@ from home.src.index.playlist import YoutubePlaylist from home.src.index.video_constants import VideoTypeEnum from home.src.ta.config import AppConfig from home.src.ta.ta_redis import RedisArchivist +from home.src.ta.urlparser import Parser class ChannelSubscription: """manage the list of channels subscribed""" - def __init__(self): + def __init__(self, task=False): self.config = AppConfig().config + self.task = task @staticmethod def get_channels(subscribed_only=True): @@ -44,7 +46,7 @@ class ChannelSubscription: last_videos = [] - for vid_type, limit_amount in queries: + for vid_type_enum, limit_amount in queries: obs = { "skip_download": True, "extract_flat": True, @@ -52,9 +54,9 @@ class ChannelSubscription: if limit: obs["playlistend"] = limit_amount - path = vid_type.value + vid_type = vid_type_enum.value channel = YtWrap(obs, self.config).extract( - f"https://www.youtube.com/channel/{channel_id}/{path}" + f"https://www.youtube.com/channel/{channel_id}/{vid_type}" ) if not channel: continue @@ -101,12 +103,16 @@ class ChannelSubscription: def find_missing(self): """add missing videos from subscribed channels to pending""" all_channels = self.get_channels() + if not all_channels: + return False + pending = queue.PendingList() pending.get_download() pending.get_indexed() missing_videos = [] + total = len(all_channels) for idx, channel in enumerate(all_channels): channel_id = channel["channel_id"] print(f"{channel_id}: find missing videos.") @@ -116,21 +122,19 @@ class ChannelSubscription: for video_id, _, vid_type in last_videos: if video_id not in pending.to_skip: missing_videos.append((video_id, vid_type)) - # notify - message = { - "status": "message:rescan", - "level": "info", - "title": "Scanning channels: Looking for new videos.", - "message": f"Progress: {idx + 1}/{len(all_channels)}", - } - if idx + 1 == len(all_channels): - expire = 4 - else: - expire = True - RedisArchivist().set_message( - "message:rescan", message=message, expire=expire - ) + if not self.task: + continue + + if self.task: + if self.task.is_stopped(): + self.task.send_progress(["Received Stop signal."]) + break + + self.task.send_progress( + message_lines=[f"Scanning Channel {idx + 1}/{total}"], + progress=(idx + 1) / total, + ) return missing_videos @@ -147,8 +151,9 @@ class ChannelSubscription: class PlaylistSubscription: """manage the playlist download functionality""" - def __init__(self): + def __init__(self, task=False): self.config = AppConfig().config + self.task = task @staticmethod def get_playlists(subscribed_only=True): @@ -232,14 +237,18 @@ class PlaylistSubscription: def find_missing(self): """find videos in subscribed playlists not downloaded yet""" all_playlists = [i["playlist_id"] for i in 
self.get_playlists()] + if not all_playlists: + return False + to_ignore = self.get_to_ignore() missing_videos = [] + total = len(all_playlists) for idx, playlist_id in enumerate(all_playlists): size_limit = self.config["subscriptions"]["channel_size"] playlist = YoutubePlaylist(playlist_id) - playlist.update_playlist() - if not playlist: + is_active = playlist.update_playlist() + if not is_active: playlist.deactivate() continue @@ -249,19 +258,123 @@ class PlaylistSubscription: all_missing = [i for i in playlist_entries if not i["downloaded"]] - message = { - "status": "message:rescan", - "level": "info", - "title": "Scanning playlists: Looking for new videos.", - "message": f"Progress: {idx + 1}/{len(all_playlists)}", - } - RedisArchivist().set_message( - "message:rescan", message=message, expire=True - ) - for video in all_missing: youtube_id = video["youtube_id"] if youtube_id not in to_ignore: missing_videos.append(youtube_id) + if not self.task: + continue + + if self.task: + self.task.send_progress( + message_lines=[f"Scanning Playlists {idx + 1}/{total}"], + progress=(idx + 1) / total, + ) + if self.task.is_stopped(): + self.task.send_progress(["Received Stop signal."]) + break + return missing_videos + + +class SubscriptionScanner: + """add missing videos to queue""" + + def __init__(self, task=False): + self.task = task + self.missing_videos = False + + def scan(self): + """scan channels and playlists""" + if self.task: + self.task.send_progress(["Rescanning channels and playlists."]) + + self.missing_videos = [] + self.scan_channels() + if self.task and not self.task.is_stopped(): + self.scan_playlists() + + return self.missing_videos + + def scan_channels(self): + """get missing from channels""" + channel_handler = ChannelSubscription(task=self.task) + missing = channel_handler.find_missing() + if not missing: + return + + for vid_id, vid_type in missing: + self.missing_videos.append( + {"type": "video", "vid_type": vid_type, "url": vid_id} + ) + + def scan_playlists(self): + """get missing from playlists""" + playlist_handler = PlaylistSubscription(task=self.task) + missing = playlist_handler.find_missing() + if not missing: + return + + for i in missing: + self.missing_videos.append( + { + "type": "video", + "vid_type": VideoTypeEnum.VIDEOS.value, + "url": i, + } + ) + + +class SubscriptionHandler: + """subscribe to channels and playlists from url_str""" + + def __init__(self, url_str, task=False): + self.url_str = url_str + self.task = task + self.to_subscribe = False + + def subscribe(self): + """subscribe to url_str items""" + if self.task: + self.task.send_progress(["Processing form content."]) + self.to_subscribe = Parser(self.url_str).parse() + + total = len(self.to_subscribe) + for idx, item in enumerate(self.to_subscribe): + if self.task: + self._notify(idx, item, total) + + self.subscribe_type(item) + + def subscribe_type(self, item): + """process single item""" + if item["type"] == "playlist": + PlaylistSubscription().process_url_str([item]) + return + + if item["type"] == "video": + # extract channel id from video + vid = queue.PendingList().get_youtube_details(item["url"]) + channel_id = vid["channel_id"] + elif item["type"] == "channel": + channel_id = item["url"] + else: + raise ValueError("failed to subscribe to: " + item["url"]) + + self._subscribe(channel_id) + + def _subscribe(self, channel_id): + """subscribe to channel""" + ChannelSubscription().change_subscribe( + channel_id, channel_subscribed=True + ) + + def _notify(self, idx, item, total): + """send 
notification message to redis""" + subscribe_type = item["type"].title() + message_lines = [ + f"Subscribe to {subscribe_type}", + f"Progress: {idx + 1}/{total}", + ] + self.task.send_progress(message_lines, progress=(idx + 1) / total) diff --git a/tubearchivist/home/src/download/thumbnails.py b/tubearchivist/home/src/download/thumbnails.py index 49082aa..ab4743e 100644 --- a/tubearchivist/home/src/download/thumbnails.py +++ b/tubearchivist/home/src/download/thumbnails.py @@ -10,8 +10,7 @@ from io import BytesIO from time import sleep import requests -from home.src.download import queue # partial import -from home.src.es.connect import IndexPaginate +from home.src.es.connect import ElasticWrap, IndexPaginate from home.src.ta.config import AppConfig from mutagen.mp4 import MP4, MP4Cover from PIL import Image, ImageFile, ImageFilter, UnidentifiedImageError @@ -272,102 +271,121 @@ class ValidatorCallback: class ThumbValidator: """validate thumbnails""" - def download_missing(self): - """download all missing artwork""" - self.download_missing_videos() - self.download_missing_channels() - self.download_missing_playlists() - - def download_missing_videos(self): - """get all missing video thumbnails""" - data = { - "query": {"term": {"active": {"value": True}}}, - "sort": [{"youtube_id": {"order": "asc"}}], - "_source": ["vid_thumb_url", "youtube_id"], - } - paginate = IndexPaginate( - "ta_video", data, size=5000, callback=ValidatorCallback - ) - _ = paginate.get_results() - - def download_missing_channels(self): - """get all missing channel thumbnails""" - data = { - "query": {"term": {"channel_active": {"value": True}}}, - "sort": [{"channel_id": {"order": "asc"}}], - "_source": { - "excludes": ["channel_description", "channel_overwrites"] + INDEX = [ + { + "data": { + "query": {"term": {"active": {"value": True}}}, + "_source": ["vid_thumb_url", "youtube_id"], }, - } - paginate = IndexPaginate( - "ta_channel", data, callback=ValidatorCallback - ) - _ = paginate.get_results() + "name": "ta_video", + }, + { + "data": { + "query": {"term": {"channel_active": {"value": True}}}, + "_source": { + "excludes": ["channel_description", "channel_overwrites"] + }, + }, + "name": "ta_channel", + }, + { + "data": { + "query": {"term": {"playlist_active": {"value": True}}}, + "_source": ["playlist_id", "playlist_thumbnail"], + }, + "name": "ta_playlist", + }, + ] - def download_missing_playlists(self): - """get all missing playlist artwork""" - data = { - "query": {"term": {"playlist_active": {"value": True}}}, - "sort": [{"playlist_id": {"order": "asc"}}], - "_source": ["playlist_id", "playlist_thumbnail"], - } - paginate = IndexPaginate( - "ta_playlist", data, callback=ValidatorCallback - ) - _ = paginate.get_results() + def __init__(self, task): + self.task = task + + def validate(self): + """validate all indexes""" + for index in self.INDEX: + total = self._get_total(index["name"]) + if not total: + continue + + paginate = IndexPaginate( + index_name=index["name"], + data=index["data"], + size=1000, + callback=ValidatorCallback, + task=self.task, + total=total, + ) + _ = paginate.get_results() + + @staticmethod + def _get_total(index_name): + """get total documents in index""" + path = f"{index_name}/_count" + response, _ = ElasticWrap(path).get() + + return response.get("count") class ThumbFilesystem: - """filesystem tasks for thumbnails""" + """sync thumbnail files to media files""" + + INDEX_NAME = "ta_video" + + def __init__(self, task=False): + self.task = task + + def embed(self): + """entry 
point""" + data = { + "query": {"match_all": {}}, + "_source": ["media_url", "youtube_id"], + } + paginate = IndexPaginate( + index_name=self.INDEX_NAME, + data=data, + callback=EmbedCallback, + task=self.task, + total=self._get_total(), + ) + _ = paginate.get_results() + + def _get_total(self): + """get total documents in index""" + path = f"{self.INDEX_NAME}/_count" + response, _ = ElasticWrap(path).get() + + return response.get("count") + + +class EmbedCallback: + """callback class to embed thumbnails""" CONFIG = AppConfig().config CACHE_DIR = CONFIG["application"]["cache_dir"] MEDIA_DIR = CONFIG["application"]["videos"] - VIDEO_DIR = os.path.join(CACHE_DIR, "videos") + FORMAT = MP4Cover.FORMAT_JPEG - def sync(self): - """embed thumbnails to mediafiles""" - video_list = self.get_thumb_list() - self._embed_thumbs(video_list) + def __init__(self, source, index_name): + self.source = source + self.index_name = index_name - def get_thumb_list(self): - """get list of mediafiles and matching thumbnails""" - pending = queue.PendingList() - pending.get_download() - pending.get_indexed() - - video_list = [] - for video in pending.all_videos: - video_id = video["youtube_id"] - media_url = os.path.join(self.MEDIA_DIR, video["media_url"]) + def run(self): + """run embed""" + for video in self.source: + video_id = video["_source"]["youtube_id"] + media_url = os.path.join( + self.MEDIA_DIR, video["_source"]["media_url"] + ) thumb_path = os.path.join( self.CACHE_DIR, ThumbManager(video_id).vid_thumb_path() ) - video_list.append( - { - "media_url": media_url, - "thumb_path": thumb_path, - } - ) + if os.path.exists(thumb_path): + self.embed(media_url, thumb_path) - return video_list + def embed(self, media_url, thumb_path): + """embed thumb in single media file""" + video = MP4(media_url) + with open(thumb_path, "rb") as f: + video["covr"] = [MP4Cover(f.read(), imageformat=self.FORMAT)] - @staticmethod - def _embed_thumbs(video_list): - """rewrite the thumbnail into media file""" - - counter = 1 - for video in video_list: - # loop through all videos - media_url = video["media_url"] - thumb_path = video["thumb_path"] - - mutagen_vid = MP4(media_url) - with open(thumb_path, "rb") as f: - mutagen_vid["covr"] = [ - MP4Cover(f.read(), imageformat=MP4Cover.FORMAT_JPEG) - ] - mutagen_vid.save() - if counter % 50 == 0: - print(f"thumbnail write progress {counter}/{len(video_list)}") - counter = counter + 1 + video.save() diff --git a/tubearchivist/home/src/download/yt_dlp_handler.py b/tubearchivist/home/src/download/yt_dlp_handler.py index be7c71a..5db44bf 100644 --- a/tubearchivist/home/src/download/yt_dlp_handler.py +++ b/tubearchivist/home/src/download/yt_dlp_handler.py @@ -22,7 +22,7 @@ from home.src.index.video import YoutubeVideo, index_new_video from home.src.index.video_constants import VideoTypeEnum from home.src.ta.config import AppConfig from home.src.ta.helper import clean_string, ignore_filelist -from home.src.ta.ta_redis import RedisArchivist, RedisQueue +from home.src.ta.ta_redis import RedisQueue class DownloadPostProcess: @@ -96,7 +96,7 @@ class DownloadPostProcess: def validate_playlists(self): """look for playlist needing to update""" for id_c, channel_id in enumerate(self.download.channels): - channel = YoutubeChannel(channel_id) + channel = YoutubeChannel(channel_id, task=self.download.task) overwrites = self.pending.channel_overwrites.get(channel_id, False) if overwrites and overwrites.get("index_playlists"): # validate from remote @@ -125,28 +125,23 @@ class DownloadPostProcess: def 
_notify_playlist_progress(self, all_channel_playlist, id_c, id_p): """notify to UI""" - title = ( - "Processing playlists for channels: " - + f"{id_c + 1}/{len(self.download.channels)}" - ) - message = f"Progress: {id_p + 1}/{len(all_channel_playlist)}" - key = "message:download" - mess_dict = { - "status": key, - "level": "info", - "title": title, - "message": message, - } - if id_p + 1 == len(all_channel_playlist): - expire = 4 - else: - expire = True + if not self.download.task: + return - RedisArchivist().set_message(key, mess_dict, expire=expire) + total_channel = len(self.download.channels) + total_playlist = len(all_channel_playlist) + + message = [f"Validate Playlists {id_p + 1}/{total_playlist}"] + title = f"Post Processing Channels: {id_c + 1}/{total_channel}" + progress = (id_c + 1) / total_channel + + self.download.task.send_progress( + message, progress=progress, title=title + ) def get_comments(self): """get comments from youtube""" - CommentList(self.download.videos).index(notify=True) + CommentList(self.download.videos, task=self.download.task).index() class VideoDownloader: @@ -155,12 +150,11 @@ class VideoDownloader: if not initiated with list, take from queue """ - MSG = "message:download" - - def __init__(self, youtube_id_list=False): + def __init__(self, youtube_id_list=False, task=False): self.obs = False self.video_overwrites = False self.youtube_id_list = youtube_id_list + self.task = task self.config = AppConfig().config self._build_obs() self.channels = set() @@ -169,7 +163,6 @@ class VideoDownloader: def run_queue(self): """setup download queue in redis loop until no more items""" self._setup_queue() - queue = RedisQueue(queue_name="dl_queue") limit_queue = self.config["downloads"]["limit_count"] @@ -178,14 +171,11 @@ class VideoDownloader: while True: youtube_data = queue.get_next() - if not youtube_data: + if self.task.is_stopped() or not youtube_data: + queue.clear() break - try: - youtube_data = json.loads(youtube_data) - except json.JSONDecodeError: # This many not be necessary - continue - + youtube_data = json.loads(youtube_data) youtube_id = youtube_data.get("youtube_id") tmp_vid_type = youtube_data.get( @@ -198,13 +188,8 @@ class VideoDownloader: if not success: continue - mess_dict = { - "status": self.MSG, - "level": "info", - "title": "Indexing....", - "message": "Add video metadata to index.", - } - RedisArchivist().set_message(self.MSG, mess_dict, expire=120) + if self.task: + self.task.send_progress(["Add video metadata to index."]) vid_dict = index_new_video( youtube_id, @@ -213,29 +198,20 @@ class VideoDownloader: ) self.channels.add(vid_dict["channel"]["channel_id"]) self.videos.add(vid_dict["youtube_id"]) - mess_dict = { - "status": self.MSG, - "level": "info", - "title": "Moving....", - "message": "Moving downloaded file to storage folder", - } - RedisArchivist().set_message(self.MSG, mess_dict) + + if self.task: + self.task.send_progress(["Move downloaded file to archive."]) + + self.move_to_archive(vid_dict) if queue.has_item(): message = "Continue with next video." - expire = False else: message = "Download queue is finished." 
- expire = 10 - self.move_to_archive(vid_dict) - mess_dict = { - "status": self.MSG, - "level": "info", - "title": "Completed", - "message": message, - } - RedisArchivist().set_message(self.MSG, mess_dict, expire=expire) + if self.task: + self.task.send_progress([message]) + self._delete_from_pending(youtube_id) # post processing @@ -256,13 +232,9 @@ class VideoDownloader: def add_pending(self): """add pending videos to download queue""" - mess_dict = { - "status": self.MSG, - "level": "info", - "title": "Looking for videos to download", - "message": "Scanning your download queue.", - } - RedisArchivist().set_message(self.MSG, mess_dict) + if self.task: + self.task.send_progress(["Scanning your download queue."]) + pending = PendingList() pending.get_download() to_add = [ @@ -279,40 +251,32 @@ class VideoDownloader: if not to_add: # there is nothing pending print("download queue is empty") - mess_dict = { - "status": self.MSG, - "level": "error", - "title": "Download queue is empty", - "message": "Add some videos to the queue first.", - } - RedisArchivist().set_message(self.MSG, mess_dict, expire=True) + if self.task: + self.task.send_progress(["Download queue is empty."]) + return RedisQueue(queue_name="dl_queue").add_list(to_add) def _progress_hook(self, response): """process the progress_hooks from yt_dlp""" - title = "Downloading: " + response["info_dict"]["title"] - + progress = False try: size = response.get("_total_bytes_str") if size.strip() == "N/A": size = response.get("_total_bytes_estimate_str", "N/A") percent = response["_percent_str"] + progress = float(percent.strip("%")) / 100 speed = response["_speed_str"] eta = response["_eta_str"] message = f"{percent} of {size} at {speed} - time left: {eta}" except KeyError: message = "processing" - mess_dict = { - "status": self.MSG, - "level": "info", - "title": title, - "message": message, - } - RedisArchivist().set_message(self.MSG, mess_dict, expire=True) + if self.task: + title = response["info_dict"]["title"] + self.task.send_progress([title, message], progress=progress) def _build_obs(self): """collection to build all obs passed to yt-dlp""" diff --git a/tubearchivist/home/src/es/backup.py b/tubearchivist/home/src/es/backup.py index 92ffa10..d3a76dc 100644 --- a/tubearchivist/home/src/es/backup.py +++ b/tubearchivist/home/src/es/backup.py @@ -18,12 +18,13 @@ from home.src.ta.helper import get_mapping, ignore_filelist class ElasticBackup: """dump index to nd-json files for later bulk import""" - def __init__(self, reason=False): + def __init__(self, reason=False, task=False): self.config = AppConfig().config self.cache_dir = self.config["application"]["cache_dir"] self.timestamp = datetime.now().strftime("%Y%m%d") self.index_config = get_mapping() self.reason = reason + self.task = task def backup_all_indexes(self): """backup all indexes, add reason to init""" @@ -44,18 +45,26 @@ class ElasticBackup: if self.reason == "auto": self.rotate_backup() - @staticmethod - def backup_index(index_name): + def backup_index(self, index_name): """export all documents of a single index""" - data = { - "query": {"match_all": {}}, - "sort": [{"_doc": {"order": "desc"}}], - } paginate = IndexPaginate( - f"ta_{index_name}", data, keep_source=True, callback=BackupCallback + f"ta_{index_name}", + data={"query": {"match_all": {}}}, + keep_source=True, + callback=BackupCallback, + task=self.task, + total=self._get_total(index_name), ) _ = paginate.get_results() + @staticmethod + def _get_total(index_name): + """get total documents in index""" + path = 
f"ta_{index_name}/_count" + response, _ = ElasticWrap(path).get() + + return response.get("count") + def zip_it(self): """pack it up into single zip file""" file_name = f"ta_backup-{self.timestamp}-{self.reason}.zip" @@ -142,7 +151,8 @@ class ElasticBackup: """go through the unpacked files and restore""" backup_dir = os.path.join(self.cache_dir, "backup") - for json_f in zip_content: + for idx, json_f in enumerate(zip_content): + self._notify_restore(idx, json_f, len(zip_content)) file_name = os.path.join(backup_dir, json_f) if not json_f.startswith("es_") or not json_f.endswith(".json"): @@ -153,6 +163,12 @@ class ElasticBackup: self.post_bulk_restore(file_name) os.remove(file_name) + def _notify_restore(self, idx, json_f, total_files): + """notify restore progress""" + message = [f"Restore index from json backup file {json_f}."] + progress = (idx + 1) / total_files + self.task.send_progress(message_lines=message, progress=progress) + @staticmethod def index_exists(index_name): """check if index already exists to skip""" diff --git a/tubearchivist/home/src/es/connect.py b/tubearchivist/home/src/es/connect.py index 4b59ace..fd6e5ab 100644 --- a/tubearchivist/home/src/es/connect.py +++ b/tubearchivist/home/src/es/connect.py @@ -97,8 +97,10 @@ class IndexPaginate: """use search_after to go through whole index kwargs: - size: int, overwrite DEFAULT_SIZE - - keep_source: bool, keep _source key from es resutls - - callback: obj, Class with run method collback for every loop + - keep_source: bool, keep _source key from es results + - callback: obj, Class implementing run method callback for every loop + - task: task object to send notification + - total: int, total items in index for progress message """ DEFAULT_SIZE = 500 @@ -107,12 +109,10 @@ class IndexPaginate: self.index_name = index_name self.data = data self.pit_id = False - self.size = kwargs.get("size") - self.keep_source = kwargs.get("keep_source") - self.callback = kwargs.get("callback") + self.kwargs = kwargs def get_results(self): - """get all results""" + """get all results, add task and total for notifications""" self.get_pit() self.validate_data() all_results = self.run_loop() @@ -130,7 +130,7 @@ class IndexPaginate: if "sort" not in self.data.keys(): self.data.update({"sort": [{"_doc": {"order": "desc"}}]}) - self.data["size"] = self.size or self.DEFAULT_SIZE + self.data["size"] = self.kwargs.get("size") or self.DEFAULT_SIZE self.data["pit"] = {"id": self.pit_id, "keep_alive": "10m"} def run_loop(self): @@ -140,30 +140,37 @@ class IndexPaginate: while True: response, _ = ElasticWrap("_search").get(data=self.data) all_hits = response["hits"]["hits"] - if all_hits: - for hit in all_hits: - if self.keep_source: - source = hit - else: - source = hit["_source"] - - if not self.callback: - all_results.append(source) - - if self.callback: - self.callback(all_hits, self.index_name).run() - if counter % 10 == 0: - print(f"{self.index_name}: processing page {counter}") - counter = counter + 1 - - # update search_after with last hit data - self.data["search_after"] = all_hits[-1]["sort"] - else: + if not all_hits: break + for hit in all_hits: + if self.kwargs.get("keep_source"): + all_results.append(hit) + else: + all_results.append(hit["_source"]) + + if self.kwargs.get("callback"): + self.kwargs.get("callback")(all_hits, self.index_name).run() + + if self.kwargs.get("task"): + print(f"{self.index_name}: processing page {counter}") + self._notify(len(all_results)) + + counter += 1 + + # update search_after with last hit data + 
self.data["search_after"] = all_hits[-1]["sort"] + return all_results + def _notify(self, processed): + """send notification on task""" + total = self.kwargs.get("total") + progress = (processed + 1) / total + index_clean = self.index_name.lstrip("ta_").title() + message = [f"Processing {index_clean}s {processed}/{total}"] + self.kwargs.get("task").send_progress(message, progress=progress) + def clean_pit(self): """delete pit from elastic search""" - data = {"id": self.pit_id} - ElasticWrap("_pit").delete(data=data) + ElasticWrap("_pit").delete(data={"id": self.pit_id}) diff --git a/tubearchivist/home/src/frontend/api_calls.py b/tubearchivist/home/src/frontend/api_calls.py index a7d3890..f2d6231 100644 --- a/tubearchivist/home/src/frontend/api_calls.py +++ b/tubearchivist/home/src/frontend/api_calls.py @@ -9,20 +9,9 @@ from home.src.download.subscriptions import ( PlaylistSubscription, ) from home.src.index.playlist import YoutubePlaylist -from home.src.ta.ta_redis import RedisArchivist, RedisQueue +from home.src.ta.ta_redis import RedisArchivist from home.src.ta.urlparser import Parser -from home.tasks import ( - download_pending, - index_channel_playlists, - kill_dl, - re_sync_thumbs, - rescan_filesystem, - run_backup, - run_manual_import, - run_restore_backup, - subscribe_to, - update_subscribed, -) +from home.tasks import run_restore_backup, subscribe_to class PostData: @@ -47,22 +36,14 @@ class PostData: exec_map = { "change_view": self._change_view, "change_grid": self._change_grid, - "rescan_pending": self._rescan_pending, - "dl_pending": self._dl_pending, - "queue": self._queue_handler, "unsubscribe": self._unsubscribe, "subscribe": self._subscribe, "sort_order": self._sort_order, "hide_watched": self._hide_watched, "show_subed_only": self._show_subed_only, "show_ignored_only": self._show_ignored_only, - "manual-import": self._manual_import, - "re-embed": self._re_embed, - "db-backup": self._db_backup, "db-restore": self._db_restore, - "fs-rescan": self._fs_rescan, "delete-playlist": self._delete_playlist, - "find-playlists": self._find_playlists, } return exec_map[self.to_exec] @@ -86,39 +67,6 @@ class PostData: RedisArchivist().set_message(key, {"status": grid_items}) return {"success": True} - @staticmethod - def _rescan_pending(): - """look for new items in subscribed channels""" - print("rescan subscribed channels") - update_subscribed.delay() - return {"success": True} - - @staticmethod - def _dl_pending(): - """start the download queue""" - print("download pending") - running = download_pending.delay() - task_id = running.id - print(f"{task_id}: set task id") - RedisArchivist().set_message("dl_queue_id", task_id) - return {"success": True} - - def _queue_handler(self): - """queue controls from frontend""" - to_execute = self.exec_val - if to_execute == "stop": - print("stopping download queue") - RedisQueue(queue_name="dl_queue").clear() - elif to_execute == "kill": - task_id = RedisArchivist().get_message("dl_queue_id") - if not isinstance(task_id, str): - task_id = False - else: - print("brutally killing " + task_id) - kill_dl(task_id) - - return {"success": True} - def _unsubscribe(self): """unsubscribe from channels or playlists""" id_unsub = self.exec_val @@ -185,27 +133,6 @@ class PostData: RedisArchivist().set_message(key, value) return {"success": True} - @staticmethod - def _manual_import(): - """run manual import from settings page""" - print("starting manual import") - run_manual_import.delay() - return {"success": True} - - @staticmethod - def _re_embed(): - 
"""rewrite thumbnails into media files""" - print("start video thumbnail embed process") - re_sync_thumbs.delay() - return {"success": True} - - @staticmethod - def _db_backup(): - """backup es to zip from settings page""" - print("backing up database") - run_backup.delay("manual") - return {"success": True} - def _db_restore(self): """restore es zip from settings page""" print("restoring index from backup zip") @@ -213,13 +140,6 @@ class PostData: run_restore_backup.delay(filename) return {"success": True} - @staticmethod - def _fs_rescan(): - """start file system rescan task""" - print("start filesystem scan") - rescan_filesystem.delay() - return {"success": True} - def _delete_playlist(self): """delete playlist, only metadata or incl all videos""" playlist_dict = self.exec_val @@ -232,9 +152,3 @@ class PostData: YoutubePlaylist(playlist_id).delete_videos_playlist() return {"success": True} - - def _find_playlists(self): - """add all playlists of a channel""" - channel_id = self.exec_val - index_channel_playlists.delay(channel_id) - return {"success": True} diff --git a/tubearchivist/home/src/index/channel.py b/tubearchivist/home/src/index/channel.py index 477d178..5c90fa6 100644 --- a/tubearchivist/home/src/index/channel.py +++ b/tubearchivist/home/src/index/channel.py @@ -18,7 +18,6 @@ from home.src.es.connect import ElasticWrap, IndexPaginate from home.src.index.generic import YouTubeItem from home.src.index.playlist import YoutubePlaylist from home.src.ta.helper import clean_string, requests_headers -from home.src.ta.ta_redis import RedisArchivist class ChannelScraper: @@ -167,12 +166,12 @@ class YoutubeChannel(YouTubeItem): es_path = False index_name = "ta_channel" yt_base = "https://www.youtube.com/channel/" - msg = "message:playlistscan" - def __init__(self, youtube_id): + def __init__(self, youtube_id, task=False): super().__init__(youtube_id) self.es_path = f"{self.index_name}/_doc/{youtube_id}" self.all_playlists = False + self.task = task def build_json(self, upload=False, fallback=False): """get from es or from youtube""" @@ -324,34 +323,29 @@ class YoutubeChannel(YouTubeItem): print(f"{self.youtube_id}: index all playlists") self.get_from_es() channel_name = self.json_data["channel_name"] - mess_dict = { - "status": self.msg, - "level": "info", - "title": "Looking for playlists", - "message": f"{channel_name}: Scanning channel in progress", - } - RedisArchivist().set_message(self.msg, mess_dict, expire=True) + self.task.send_progress([f"{channel_name}: Looking for Playlists"]) self.get_all_playlists() if not self.all_playlists: print(f"{self.youtube_id}: no playlists found.") return all_youtube_ids = self.get_all_video_ids() + total = len(self.all_playlists) for idx, playlist in enumerate(self.all_playlists): - self._notify_single_playlist(idx, playlist) - self._index_single_playlist(playlist, all_youtube_ids) + if self.task: + self._notify_single_playlist(idx, total) - def _notify_single_playlist(self, idx, playlist): + self._index_single_playlist(playlist, all_youtube_ids) + print("add playlist: " + playlist[1]) + + def _notify_single_playlist(self, idx, total): """send notification""" channel_name = self.json_data["channel_name"] - mess_dict = { - "status": self.msg, - "level": "info", - "title": f"{channel_name}: Scanning channel for playlists", - "message": f"Progress: {idx + 1}/{len(self.all_playlists)}", - } - RedisArchivist().set_message(self.msg, mess_dict, expire=True) - print("add playlist: " + playlist[1]) + message = [ + f"{channel_name}: Scanning channel for 
playlists", + f"Progress: {idx + 1}/{total}", + ] + self.task.send_progress(message, progress=(idx + 1) / total) @staticmethod def _index_single_playlist(playlist, all_youtube_ids): diff --git a/tubearchivist/home/src/index/comments.py b/tubearchivist/home/src/index/comments.py index 32cea55..1cbe75a 100644 --- a/tubearchivist/home/src/index/comments.py +++ b/tubearchivist/home/src/index/comments.py @@ -10,7 +10,6 @@ from datetime import datetime from home.src.download.yt_dlp_base import YtWrap from home.src.es.connect import ElasticWrap from home.src.ta.config import AppConfig -from home.src.ta.ta_redis import RedisArchivist class Comments: @@ -24,14 +23,13 @@ class Comments: self.is_activated = False self.comments_format = False - def build_json(self, notify=False): + def build_json(self): """build json document for es""" print(f"{self.youtube_id}: get comments") self.check_config() if not self.is_activated: return - self._send_notification(notify) comments_raw, channel_id = self.get_yt_comments() if not comments_raw and not channel_id: return @@ -52,23 +50,6 @@ class Comments: self.is_activated = bool(self.config["downloads"]["comment_max"]) - @staticmethod - def _send_notification(notify): - """send notification for download post process message""" - if not notify: - return - - key = "message:download" - idx, total_videos = notify - message = { - "status": key, - "level": "info", - "title": "Download and index comments", - "message": f"Progress: {idx + 1}/{total_videos}", - } - - RedisArchivist().set_message(key, message) - def build_yt_obs(self): """ get extractor config @@ -79,6 +60,7 @@ class Comments: comment_sort = self.config["downloads"]["comment_sort"] yt_obs = { + "check_formats": None, "skip_download": True, "getcomments": True, "extractor_args": { @@ -200,38 +182,28 @@ class Comments: class CommentList: """interact with comments in group""" - def __init__(self, video_ids): + def __init__(self, video_ids, task=False): self.video_ids = video_ids + self.task = task self.config = AppConfig().config - def index(self, notify=False): - """index group of videos""" + def index(self): + """index comments for list, init with task object to notify""" if not self.config["downloads"].get("comment_max"): return total_videos = len(self.video_ids) - if notify: - self._notify(f"add comments for {total_videos} videos", False) + for idx, youtube_id in enumerate(self.video_ids): + if self.task: + self.notify(idx, total_videos) - for idx, video_id in enumerate(self.video_ids): - comment = Comments(video_id, config=self.config) - if notify: - notify = (idx, total_videos) - comment.build_json(notify=notify) + comment = Comments(youtube_id, config=self.config) + comment.build_json() if comment.json_data: comment.upload_comments() - if notify: - self._notify(f"added comments for {total_videos} videos", 5) - - @staticmethod - def _notify(message, expire): - """send notification""" - key = "message:download" - message = { - "status": key, - "level": "info", - "title": "Download and index comments finished", - "message": message, - } - RedisArchivist().set_message(key, message, expire=expire) + def notify(self, idx, total_videos): + """send notification on task""" + message = [f"Add comments for new videos {idx + 1}/{total_videos}"] + progress = (idx + 1) / total_videos + self.task.send_progress(message, progress=progress) diff --git a/tubearchivist/home/src/index/filesystem.py b/tubearchivist/home/src/index/filesystem.py index 5f496ae..c4001af 100644 --- a/tubearchivist/home/src/index/filesystem.py +++ 
b/tubearchivist/home/src/index/filesystem.py @@ -7,38 +7,37 @@ Functionality: import json import os -import re -import shutil -import subprocess from home.src.download.queue import PendingList -from home.src.download.thumbnails import ThumbManager from home.src.es.connect import ElasticWrap from home.src.index.comments import CommentList -from home.src.index.video import YoutubeVideo, index_new_video +from home.src.index.video import index_new_video from home.src.ta.config import AppConfig from home.src.ta.helper import clean_string, ignore_filelist -from PIL import Image, ImageFile -from yt_dlp.utils import ISO639Utils +from PIL import ImageFile ImageFile.LOAD_TRUNCATED_IMAGES = True -class FilesystemScanner: - """handle scanning and fixing from filesystem""" +class ScannerBase: + """scan the filesystem base class""" CONFIG = AppConfig().config VIDEOS = CONFIG["application"]["videos"] def __init__(self): - self.all_downloaded = self.get_all_downloaded() - self.all_indexed = self.get_all_indexed() - self.mismatch = None - self.to_rename = None - self.to_index = None - self.to_delete = None + self.to_index = False + self.to_delete = False + self.mismatch = False + self.to_rename = False - def get_all_downloaded(self): + def scan(self): + """entry point, scan and compare""" + all_downloaded = self._get_all_downloaded() + all_indexed = self._get_all_indexed() + self.list_comarison(all_downloaded, all_indexed) + + def _get_all_downloaded(self): """get a list of all video files downloaded""" channels = os.listdir(self.VIDEOS) all_channels = ignore_filelist(channels) @@ -56,7 +55,7 @@ class FilesystemScanner: return all_downloaded @staticmethod - def get_all_indexed(): + def _get_all_indexed(): """get a list of all indexed videos""" index_handler = PendingList() index_handler.get_download() @@ -71,64 +70,77 @@ class FilesystemScanner: all_indexed.append((youtube_id, media_url, published, title)) return all_indexed - def list_comarison(self): + def list_comarison(self, all_downloaded, all_indexed): """compare the lists to figure out what to do""" - self.find_unindexed() - self.find_missing() - self.find_bad_media_url() + self._find_unindexed(all_downloaded, all_indexed) + self._find_missing(all_downloaded, all_indexed) + self._find_bad_media_url(all_downloaded, all_indexed) - def find_unindexed(self): + def _find_unindexed(self, all_downloaded, all_indexed): """find video files without a matching document indexed""" - all_indexed_ids = [i[0] for i in self.all_indexed] - to_index = [] - for downloaded in self.all_downloaded: + all_indexed_ids = [i[0] for i in all_indexed] + self.to_index = [] + for downloaded in all_downloaded: if downloaded[2] not in all_indexed_ids: - to_index.append(downloaded) + self.to_index.append(downloaded) - self.to_index = to_index - - def find_missing(self): + def _find_missing(self, all_downloaded, all_indexed): """find indexed videos without matching media file""" - all_downloaded_ids = [i[2] for i in self.all_downloaded] - to_delete = [] - for video in self.all_indexed: + all_downloaded_ids = [i[2] for i in all_downloaded] + self.to_delete = [] + for video in all_indexed: youtube_id = video[0] if youtube_id not in all_downloaded_ids: - to_delete.append(video) + self.to_delete.append(video) - self.to_delete = to_delete - - def find_bad_media_url(self): + def _find_bad_media_url(self, all_downloaded, all_indexed): """rename media files not matching the indexed title""" - to_fix = [] - to_rename = [] - for downloaded in self.all_downloaded: + self.mismatch = [] + 
self.to_rename = [] + + for downloaded in all_downloaded: channel, filename, downloaded_id = downloaded # find in indexed - for indexed in self.all_indexed: + for indexed in all_indexed: indexed_id, media_url, published, title = indexed if indexed_id == downloaded_id: # found it - title_c = clean_string(title) pub = published.replace("-", "") - expected_filename = f"{pub}_{indexed_id}_{title_c}.mp4" - new_url = os.path.join(channel, expected_filename) - if expected_filename != filename: + expected = f"{pub}_{indexed_id}_{clean_string(title)}.mp4" + new_url = os.path.join(channel, expected) + if expected != filename: # file to rename - to_rename.append( - (channel, filename, expected_filename) - ) + self.to_rename.append((channel, filename, expected)) if media_url != new_url: # media_url to update in es - to_fix.append((indexed_id, new_url)) + self.mismatch.append((indexed_id, new_url)) break - self.mismatch = to_fix - self.to_rename = to_rename + +class Filesystem(ScannerBase): + """handle scanning and fixing from filesystem""" + + def __init__(self, task=False): + super().__init__() + self.task = task + + def process(self): + """entry point""" + self.task.send_progress(["Scanning your archive and index."]) + self.scan() + self.rename_files() + self.send_mismatch_bulk() + self.delete_from_index() + self.add_missing() def rename_files(self): """rename media files as identified by find_bad_media_url""" + if not self.to_rename: + return + + total = len(self.to_rename) + self.task.send_progress([f"Rename {total} media files."]) for bad_filename in self.to_rename: channel, filename, expected_filename = bad_filename print(f"renaming [{filename}] to [{expected_filename}]") @@ -138,6 +150,11 @@ class FilesystemScanner: def send_mismatch_bulk(self): """build bulk update""" + if not self.mismatch: + return + + total = len(self.mismatch) + self.task.send_progress([f"Fix media urls for {total} files"]) bulk_list = [] for video_mismatch in self.mismatch: youtube_id, media_url = video_mismatch @@ -153,457 +170,32 @@ class FilesystemScanner: def delete_from_index(self): """find indexed but deleted mediafile""" + if not self.to_delete: + return + + total = len(self.to_delete) + self.task.send_progress([f"Clean up {total} items from index."]) for indexed in self.to_delete: youtube_id = indexed[0] print(f"deleting {youtube_id} from index") path = f"ta_video/_doc/{youtube_id}" _, _ = ElasticWrap(path).delete() - -class ImportFolderScanner: - """import and indexing existing video files - - identify all media files belonging to a video - - identify youtube id - - convert if needed - """ - - CONFIG = AppConfig().config - CACHE_DIR = CONFIG["application"]["cache_dir"] - IMPORT_DIR = os.path.join(CACHE_DIR, "import") - - EXT_MAP = { - "media": [".mp4", ".mkv", ".webm"], - "metadata": [".json"], - "thumb": [".jpg", ".png", ".webp"], - "subtitle": [".vtt"], - } - - def __init__(self): - self.to_import = False - - def scan(self): - """scan and match media files""" - all_files = self.get_all_files() - self.match_files(all_files) - self.process_videos() - - return self.to_import - - def get_all_files(self): - """get all files in /import""" - rel_paths = ignore_filelist(os.listdir(self.IMPORT_DIR)) - all_files = [os.path.join(self.IMPORT_DIR, i) for i in rel_paths] - all_files.sort() - - return all_files - - @staticmethod - def _get_template(): - """base dict for video""" - return { - "media": False, - "video_id": False, - "metadata": False, - "thumb": False, - "subtitle": [], - } - - def match_files(self, all_files): 
- """loop through all files, join what matches""" - self.to_import = [] - - current_video = self._get_template() - last_base = False - - for file_path in all_files: - base_name, ext = self._detect_base_name(file_path) - key, file_path = self._detect_type(file_path, ext) - if not key or not file_path: - continue - - if base_name != last_base: - if last_base: - print(f"manual import: {current_video}") - self.to_import.append(current_video) - - current_video = self._get_template() - last_base = base_name - - if key == "subtitle": - current_video["subtitle"].append(file_path) - else: - current_video[key] = file_path - - if current_video.get("media"): - print(f"manual import: {current_video}") - self.to_import.append(current_video) - - def _detect_base_name(self, file_path): - """extract base_name and ext for matching""" - base_name_raw, ext = os.path.splitext(file_path) - base_name, ext2 = os.path.splitext(base_name_raw) - - if ext2: - if ISO639Utils.short2long(ext2.strip(".")) or ext2 == ".info": - # valid secondary extension - return base_name, ext - - return base_name_raw, ext - - def _detect_type(self, file_path, ext): - """detect metadata type for file""" - - for key, value in self.EXT_MAP.items(): - if ext in value: - return key, file_path - - return False, False - - def process_videos(self): - """loop through all videos""" - for current_video in self.to_import: - if not current_video["media"]: - print(f"{current_video}: no matching media file found.") - raise ValueError - - self._detect_youtube_id(current_video) - self._dump_thumb(current_video) - self._convert_thumb(current_video) - self._get_subtitles(current_video) - self._convert_video(current_video) - print(f"manual import: {current_video}") - - ManualImport(current_video, self.CONFIG).run() - - def _detect_youtube_id(self, current_video): - """find video id from filename or json""" - youtube_id = self._extract_id_from_filename(current_video["media"]) - if youtube_id: - current_video["video_id"] = youtube_id + def add_missing(self): + """add missing videos to index""" + video_ids = [i[2] for i in self.to_index] + if not video_ids: return - youtube_id = self._extract_id_from_json(current_video["metadata"]) - if youtube_id: - current_video["video_id"] = youtube_id - return - - raise ValueError("failed to find video id") - - @staticmethod - def _extract_id_from_filename(file_name): - """ - look at the file name for the youtube id - expects filename ending in []. 
- """ - base_name, _ = os.path.splitext(file_name) - id_search = re.search(r"\[([a-zA-Z0-9_-]{11})\]$", base_name) - if id_search: - youtube_id = id_search.group(1) - return youtube_id - - print(f"id extraction failed from filename: {file_name}") - - return False - - def _extract_id_from_json(self, json_file): - """open json file and extract id""" - json_path = os.path.join(self.CACHE_DIR, "import", json_file) - with open(json_path, "r", encoding="utf-8") as f: - json_content = f.read() - - youtube_id = json.loads(json_content)["id"] - - return youtube_id - - def _dump_thumb(self, current_video): - """extract embedded thumb before converting""" - if current_video["thumb"]: - return - - media_path = current_video["media"] - _, ext = os.path.splitext(media_path) - - new_path = False - if ext == ".mkv": - idx, thumb_type = self._get_mkv_thumb_stream(media_path) - if idx is not None: - new_path = self.dump_mpv_thumb(media_path, idx, thumb_type) - - elif ext == ".mp4": - thumb_type = self.get_mp4_thumb_type(media_path) - if thumb_type: - new_path = self.dump_mp4_thumb(media_path, thumb_type) - - if new_path: - current_video["thumb"] = new_path - - def _get_mkv_thumb_stream(self, media_path): - """get stream idx of thumbnail for mkv files""" - streams = self._get_streams(media_path) - attachments = [ - i for i in streams["streams"] if i["codec_type"] == "attachment" - ] - - for idx, stream in enumerate(attachments): - tags = stream["tags"] - if "mimetype" in tags and tags["filename"].startswith("cover"): - _, ext = os.path.splitext(tags["filename"]) - return idx, ext - - return None, None - - @staticmethod - def dump_mpv_thumb(media_path, idx, thumb_type): - """write cover to disk for mkv""" - _, media_ext = os.path.splitext(media_path) - new_path = f"{media_path.rstrip(media_ext)}{thumb_type}" - subprocess.run( - [ - "ffmpeg", - "-v", - "quiet", - f"-dump_attachment:t:{idx}", - new_path, - "-i", - media_path, - ], - check=False, - ) - - return new_path - - def get_mp4_thumb_type(self, media_path): - """detect filetype of embedded thumbnail""" - streams = self._get_streams(media_path) - - for stream in streams["streams"]: - if stream["codec_name"] in ["png", "jpg"]: - return stream["codec_name"] - - return False - - def _convert_thumb(self, current_video): - """convert all thumbnails to jpg""" - if not current_video["thumb"]: - return - - thumb_path = current_video["thumb"] - - base_path, ext = os.path.splitext(thumb_path) - if ext == ".jpg": - return - - new_path = f"{base_path}.jpg" - img_raw = Image.open(thumb_path) - img_raw.convert("RGB").save(new_path) - - os.remove(thumb_path) - current_video["thumb"] = new_path - - def _get_subtitles(self, current_video): - """find all subtitles in media file""" - if current_video["subtitle"]: - return - - media_path = current_video["media"] - streams = self._get_streams(media_path) - base_path, ext = os.path.splitext(media_path) - - if ext == ".webm": - print(f"{media_path}: subtitle extract from webm not supported") - return - - for idx, stream in enumerate(streams["streams"]): - if stream["codec_type"] == "subtitle": - lang = ISO639Utils.long2short(stream["tags"]["language"]) - sub_path = f"{base_path}.{lang}.vtt" - self._dump_subtitle(idx, media_path, sub_path) - current_video["subtitle"].append(sub_path) - - @staticmethod - def _dump_subtitle(idx, media_path, sub_path): - """extract subtitle from media file""" - subprocess.run( - ["ffmpeg", "-i", media_path, "-map", f"0:{idx}", sub_path], - check=True, - ) - - @staticmethod - def 
_get_streams(media_path): - """return all streams from media_path""" - streams_raw = subprocess.run( - [ - "ffprobe", - "-v", - "error", - "-show_streams", - "-print_format", - "json", - media_path, - ], - capture_output=True, - check=True, - ) - streams = json.loads(streams_raw.stdout.decode()) - - return streams - - @staticmethod - def dump_mp4_thumb(media_path, thumb_type): - """save cover to disk""" - _, ext = os.path.splitext(media_path) - new_path = f"{media_path.rstrip(ext)}.{thumb_type}" - - subprocess.run( - [ - "ffmpeg", - "-i", - media_path, - "-map", - "0:v", - "-map", - "-0:V", - "-c", - "copy", - new_path, - ], - check=True, - ) - - return new_path - - def _convert_video(self, current_video): - """convert if needed""" - current_path = current_video["media"] - base_path, ext = os.path.splitext(current_path) - if ext == ".mp4": - return - - new_path = base_path + ".mp4" - subprocess.run( - [ - "ffmpeg", - "-i", - current_path, - new_path, - "-loglevel", - "warning", - "-stats", - ], - check=True, - ) - current_video["media"] = new_path - os.remove(current_path) - - -class ManualImport: - """import single identified video""" - - def __init__(self, current_video, config): - self.current_video = current_video - self.config = config - - def run(self): - """run all""" - json_data = self.index_metadata() - self._move_to_archive(json_data) - self._cleanup(json_data) - - def index_metadata(self): - """get metadata from yt or json""" - video_id = self.current_video["video_id"] - video = YoutubeVideo(video_id) - video.build_json( - youtube_meta_overwrite=self._get_info_json(), - media_path=self.current_video["media"], - ) - if not video.json_data: - print(f"{video_id}: manual import failed, and no metadata found.") - raise ValueError - - video.check_subtitles(subtitle_files=self.current_video["subtitle"]) - video.upload_to_es() - - if video.offline_import and self.current_video["thumb"]: - old_path = self.current_video["thumb"] - thumbs = ThumbManager(video_id) - new_path = thumbs.vid_thumb_path(absolute=True, create_folder=True) - shutil.move(old_path, new_path, copy_function=shutil.copyfile) - else: - url = video.json_data["vid_thumb_url"] - ThumbManager(video_id).download_video_thumb(url) - - return video.json_data - - def _get_info_json(self): - """read info_json from file""" - if not self.current_video["metadata"]: - return False - - with open(self.current_video["metadata"], "r", encoding="utf-8") as f: - info_json = json.loads(f.read()) - - return info_json - - def _move_to_archive(self, json_data): - """move identified media file to archive""" - videos = self.config["application"]["videos"] - - channel, file = os.path.split(json_data["media_url"]) - channel_folder = os.path.join(videos, channel) - if not os.path.exists(channel_folder): - os.makedirs(channel_folder) - - old_path = self.current_video["media"] - new_path = os.path.join(channel_folder, file) - shutil.move(old_path, new_path, copy_function=shutil.copyfile) - - base_name, _ = os.path.splitext(new_path) - for old_path in self.current_video["subtitle"]: - lang = old_path.split(".")[-2] - new_path = f"{base_name}.{lang}.vtt" - shutil.move(old_path, new_path, copy_function=shutil.copyfile) - - def _cleanup(self, json_data): - """cleanup leftover files""" - meta_data = self.current_video["metadata"] - if meta_data and os.path.exists(meta_data): - os.remove(meta_data) - - thumb = self.current_video["thumb"] - if thumb and os.path.exists(thumb): - os.remove(thumb) - - for subtitle_file in self.current_video["subtitle"]: - if 
os.path.exists(subtitle_file): - os.remove(subtitle_file) - - channel_info = os.path.join( - self.config["application"]["cache_dir"], - "import", - f"{json_data['channel']['channel_id']}.info.json", - ) - if os.path.exists(channel_info): - os.remove(channel_info) - - -def scan_filesystem(): - """grouped function to delete and update index""" - filesystem_handler = FilesystemScanner() - filesystem_handler.list_comarison() - if filesystem_handler.to_rename: - print("renaming files") - filesystem_handler.rename_files() - if filesystem_handler.mismatch: - print("fixing media urls in index") - filesystem_handler.send_mismatch_bulk() - if filesystem_handler.to_delete: - print("delete metadata from index") - filesystem_handler.delete_from_index() - if filesystem_handler.to_index: - print("index new videos") - video_ids = [i[2] for i in filesystem_handler.to_index] - for youtube_id in video_ids: + total = len(video_ids) + for idx, youtube_id in enumerate(video_ids): + if self.task: + self.task.send_progress( + message_lines=[ + f"Index missing video {youtube_id}, {idx}/{total}" + ], + progress=(idx + 1) / total, + ) index_new_video(youtube_id) - CommentList(video_ids).index() + CommentList(video_ids, task=self.task).index() diff --git a/tubearchivist/home/src/index/manual.py b/tubearchivist/home/src/index/manual.py new file mode 100644 index 0000000..a4fa2e4 --- /dev/null +++ b/tubearchivist/home/src/index/manual.py @@ -0,0 +1,466 @@ +""" +Functionality: +- Handle manual import task +- Scan and identify media files in import folder +- Process import media files +""" + +import json +import os +import re +import shutil +import subprocess + +from home.src.download.thumbnails import ThumbManager +from home.src.index.video import YoutubeVideo +from home.src.ta.config import AppConfig +from home.src.ta.helper import ignore_filelist +from PIL import Image +from yt_dlp.utils import ISO639Utils + + +class ImportFolderScanner: + """import and indexing existing video files + - identify all media files belonging to a video + - identify youtube id + - convert if needed + """ + + CONFIG = AppConfig().config + CACHE_DIR = CONFIG["application"]["cache_dir"] + IMPORT_DIR = os.path.join(CACHE_DIR, "import") + + EXT_MAP = { + "media": [".mp4", ".mkv", ".webm"], + "metadata": [".json"], + "thumb": [".jpg", ".png", ".webp"], + "subtitle": [".vtt"], + } + + def __init__(self, task=False): + self.task = task + self.to_import = False + + def scan(self): + """scan and match media files""" + if self.task: + self.task.send_progress(["Scanning your import folder."]) + + all_files = self.get_all_files() + self.match_files(all_files) + self.process_videos() + + return self.to_import + + def get_all_files(self): + """get all files in /import""" + rel_paths = ignore_filelist(os.listdir(self.IMPORT_DIR)) + all_files = [os.path.join(self.IMPORT_DIR, i) for i in rel_paths] + all_files.sort() + + return all_files + + @staticmethod + def _get_template(): + """base dict for video""" + return { + "media": False, + "video_id": False, + "metadata": False, + "thumb": False, + "subtitle": [], + } + + def match_files(self, all_files): + """loop through all files, join what matches""" + self.to_import = [] + + current_video = self._get_template() + last_base = False + + for file_path in all_files: + base_name, ext = self._detect_base_name(file_path) + key, file_path = self._detect_type(file_path, ext) + if not key or not file_path: + continue + + if base_name != last_base: + if last_base: + print(f"manual import: {current_video}") + 
self.to_import.append(current_video) + + current_video = self._get_template() + last_base = base_name + + if key == "subtitle": + current_video["subtitle"].append(file_path) + else: + current_video[key] = file_path + + if current_video.get("media"): + print(f"manual import: {current_video}") + self.to_import.append(current_video) + + def _detect_base_name(self, file_path): + """extract base_name and ext for matching""" + base_name_raw, ext = os.path.splitext(file_path) + base_name, ext2 = os.path.splitext(base_name_raw) + + if ext2: + if ISO639Utils.short2long(ext2.strip(".")) or ext2 == ".info": + # valid secondary extension + return base_name, ext + + return base_name_raw, ext + + def _detect_type(self, file_path, ext): + """detect metadata type for file""" + + for key, value in self.EXT_MAP.items(): + if ext in value: + return key, file_path + + return False, False + + def process_videos(self): + """loop through all videos""" + for idx, current_video in enumerate(self.to_import): + if not current_video["media"]: + print(f"{current_video}: no matching media file found.") + raise ValueError + + if self.task: + self._notify(idx, current_video) + + self._detect_youtube_id(current_video) + self._dump_thumb(current_video) + self._convert_thumb(current_video) + self._get_subtitles(current_video) + self._convert_video(current_video) + print(f"manual import: {current_video}") + + ManualImport(current_video, self.CONFIG).run() + + def _notify(self, idx, current_video): + """send notification back to task""" + filename = os.path.split(current_video["media"])[-1] + if len(filename) > 50: + filename = filename[:50] + "..." + + message = [ + f"Import queue processing video {idx + 1}/{len(self.to_import)}", + filename, + ] + progress = (idx + 1) / len(self.to_import) + self.task.send_progress(message, progress=progress) + + def _detect_youtube_id(self, current_video): + """find video id from filename or json""" + youtube_id = self._extract_id_from_filename(current_video["media"]) + if youtube_id: + current_video["video_id"] = youtube_id + return + + youtube_id = self._extract_id_from_json(current_video["metadata"]) + if youtube_id: + current_video["video_id"] = youtube_id + return + + raise ValueError("failed to find video id") + + @staticmethod + def _extract_id_from_filename(file_name): + """ + look at the file name for the youtube id + expects filename ending in []. 
+ """ + base_name, _ = os.path.splitext(file_name) + id_search = re.search(r"\[([a-zA-Z0-9_-]{11})\]$", base_name) + if id_search: + youtube_id = id_search.group(1) + return youtube_id + + print(f"id extraction failed from filename: {file_name}") + + return False + + def _extract_id_from_json(self, json_file): + """open json file and extract id""" + json_path = os.path.join(self.CACHE_DIR, "import", json_file) + with open(json_path, "r", encoding="utf-8") as f: + json_content = f.read() + + youtube_id = json.loads(json_content)["id"] + + return youtube_id + + def _dump_thumb(self, current_video): + """extract embedded thumb before converting""" + if current_video["thumb"]: + return + + media_path = current_video["media"] + _, ext = os.path.splitext(media_path) + + new_path = False + if ext == ".mkv": + idx, thumb_type = self._get_mkv_thumb_stream(media_path) + if idx is not None: + new_path = self.dump_mpv_thumb(media_path, idx, thumb_type) + + elif ext == ".mp4": + thumb_type = self.get_mp4_thumb_type(media_path) + if thumb_type: + new_path = self.dump_mp4_thumb(media_path, thumb_type) + + if new_path: + current_video["thumb"] = new_path + + def _get_mkv_thumb_stream(self, media_path): + """get stream idx of thumbnail for mkv files""" + streams = self._get_streams(media_path) + attachments = [ + i for i in streams["streams"] if i["codec_type"] == "attachment" + ] + + for idx, stream in enumerate(attachments): + tags = stream["tags"] + if "mimetype" in tags and tags["filename"].startswith("cover"): + _, ext = os.path.splitext(tags["filename"]) + return idx, ext + + return None, None + + @staticmethod + def dump_mpv_thumb(media_path, idx, thumb_type): + """write cover to disk for mkv""" + _, media_ext = os.path.splitext(media_path) + new_path = f"{media_path.rstrip(media_ext)}{thumb_type}" + subprocess.run( + [ + "ffmpeg", + "-v", + "quiet", + f"-dump_attachment:t:{idx}", + new_path, + "-i", + media_path, + ], + check=False, + ) + + return new_path + + def get_mp4_thumb_type(self, media_path): + """detect filetype of embedded thumbnail""" + streams = self._get_streams(media_path) + + for stream in streams["streams"]: + if stream["codec_name"] in ["png", "jpg"]: + return stream["codec_name"] + + return False + + def _convert_thumb(self, current_video): + """convert all thumbnails to jpg""" + if not current_video["thumb"]: + return + + thumb_path = current_video["thumb"] + + base_path, ext = os.path.splitext(thumb_path) + if ext == ".jpg": + return + + new_path = f"{base_path}.jpg" + img_raw = Image.open(thumb_path) + img_raw.convert("RGB").save(new_path) + + os.remove(thumb_path) + current_video["thumb"] = new_path + + def _get_subtitles(self, current_video): + """find all subtitles in media file""" + if current_video["subtitle"]: + return + + media_path = current_video["media"] + streams = self._get_streams(media_path) + base_path, ext = os.path.splitext(media_path) + + if ext == ".webm": + print(f"{media_path}: subtitle extract from webm not supported") + return + + for idx, stream in enumerate(streams["streams"]): + if stream["codec_type"] == "subtitle": + lang = ISO639Utils.long2short(stream["tags"]["language"]) + sub_path = f"{base_path}.{lang}.vtt" + self._dump_subtitle(idx, media_path, sub_path) + current_video["subtitle"].append(sub_path) + + @staticmethod + def _dump_subtitle(idx, media_path, sub_path): + """extract subtitle from media file""" + subprocess.run( + ["ffmpeg", "-i", media_path, "-map", f"0:{idx}", sub_path], + check=True, + ) + + @staticmethod + def 
_get_streams(media_path): + """return all streams from media_path""" + streams_raw = subprocess.run( + [ + "ffprobe", + "-v", + "error", + "-show_streams", + "-print_format", + "json", + media_path, + ], + capture_output=True, + check=True, + ) + streams = json.loads(streams_raw.stdout.decode()) + + return streams + + @staticmethod + def dump_mp4_thumb(media_path, thumb_type): + """save cover to disk""" + _, ext = os.path.splitext(media_path) + new_path = f"{media_path.rstrip(ext)}.{thumb_type}" + + subprocess.run( + [ + "ffmpeg", + "-i", + media_path, + "-map", + "0:v", + "-map", + "-0:V", + "-c", + "copy", + new_path, + ], + check=True, + ) + + return new_path + + def _convert_video(self, current_video): + """convert if needed""" + current_path = current_video["media"] + base_path, ext = os.path.splitext(current_path) + if ext == ".mp4": + return + + new_path = base_path + ".mp4" + subprocess.run( + [ + "ffmpeg", + "-i", + current_path, + new_path, + "-loglevel", + "warning", + "-stats", + ], + check=True, + ) + current_video["media"] = new_path + os.remove(current_path) + + +class ManualImport: + """import single identified video""" + + def __init__(self, current_video, config): + self.current_video = current_video + self.config = config + + def run(self): + """run all""" + json_data = self.index_metadata() + self._move_to_archive(json_data) + self._cleanup(json_data) + + def index_metadata(self): + """get metadata from yt or json""" + video_id = self.current_video["video_id"] + video = YoutubeVideo(video_id) + video.build_json( + youtube_meta_overwrite=self._get_info_json(), + media_path=self.current_video["media"], + ) + if not video.json_data: + print(f"{video_id}: manual import failed, and no metadata found.") + raise ValueError + + video.check_subtitles(subtitle_files=self.current_video["subtitle"]) + video.upload_to_es() + + if video.offline_import and self.current_video["thumb"]: + old_path = self.current_video["thumb"] + thumbs = ThumbManager(video_id) + new_path = thumbs.vid_thumb_path(absolute=True, create_folder=True) + shutil.move(old_path, new_path, copy_function=shutil.copyfile) + else: + url = video.json_data["vid_thumb_url"] + ThumbManager(video_id).download_video_thumb(url) + + return video.json_data + + def _get_info_json(self): + """read info_json from file""" + if not self.current_video["metadata"]: + return False + + with open(self.current_video["metadata"], "r", encoding="utf-8") as f: + info_json = json.loads(f.read()) + + return info_json + + def _move_to_archive(self, json_data): + """move identified media file to archive""" + videos = self.config["application"]["videos"] + + channel, file = os.path.split(json_data["media_url"]) + channel_folder = os.path.join(videos, channel) + if not os.path.exists(channel_folder): + os.makedirs(channel_folder) + + old_path = self.current_video["media"] + new_path = os.path.join(channel_folder, file) + shutil.move(old_path, new_path, copy_function=shutil.copyfile) + + base_name, _ = os.path.splitext(new_path) + for old_path in self.current_video["subtitle"]: + lang = old_path.split(".")[-2] + new_path = f"{base_name}.{lang}.vtt" + shutil.move(old_path, new_path, copy_function=shutil.copyfile) + + def _cleanup(self, json_data): + """cleanup leftover files""" + meta_data = self.current_video["metadata"] + if meta_data and os.path.exists(meta_data): + os.remove(meta_data) + + thumb = self.current_video["thumb"] + if thumb and os.path.exists(thumb): + os.remove(thumb) + + for subtitle_file in self.current_video["subtitle"]: + if 
os.path.exists(subtitle_file): + os.remove(subtitle_file) + + channel_info = os.path.join( + self.config["application"]["cache_dir"], + "import", + f"{json_data['channel']['channel_id']}.info.json", + ) + if os.path.exists(channel_info): + os.remove(channel_info) diff --git a/tubearchivist/home/src/index/reindex.py b/tubearchivist/home/src/index/reindex.py index afe4f9c..baeadfa 100644 --- a/tubearchivist/home/src/index/reindex.py +++ b/tubearchivist/home/src/index/reindex.py @@ -21,7 +21,7 @@ from home.src.index.comments import Comments from home.src.index.playlist import YoutubePlaylist from home.src.index.video import YoutubeVideo from home.src.ta.config import AppConfig -from home.src.ta.ta_redis import RedisArchivist, RedisQueue +from home.src.ta.ta_redis import RedisQueue class ReindexBase: @@ -201,8 +201,9 @@ class ReindexManual(ReindexBase): class Reindex(ReindexBase): """reindex all documents from redis queue""" - def __init__(self): + def __init__(self, task=False): super().__init__() + self.task = task self.all_indexed_ids = False def reindex_all(self): @@ -211,22 +212,22 @@ class Reindex(ReindexBase): print("[reindex] cookie invalid, exiting...") return - for index_config in self.REINDEX_CONFIG.values(): + for name, index_config in self.REINDEX_CONFIG.items(): if not RedisQueue(index_config["queue_name"]).has_item(): continue + total = RedisQueue(index_config["queue_name"]).length() while True: - has_next = self.reindex_index(index_config) + has_next = self.reindex_index(name, index_config, total) if not has_next: break - RedisArchivist().set_message("last_reindex", self.now) - - def reindex_index(self, index_config): + def reindex_index(self, name, index_config, total): """reindex all of a single index""" reindex = self.get_reindex_map(index_config["index_name"]) youtube_id = RedisQueue(index_config["queue_name"]).get_next() if youtube_id: + self._notify(name, index_config, total) reindex(youtube_id) sleep_interval = self.config["downloads"].get("sleep_interval", 0) sleep(sleep_interval) @@ -243,6 +244,14 @@ class Reindex(ReindexBase): return def_map.get(index_name) + def _notify(self, name, index_config, total): + """send notification back to task""" + remaining = RedisQueue(index_config["queue_name"]).length() + idx = total - remaining + message = [f"Reindexing {name.title()}s {idx}/{total}"] + progress = idx / total + self.task.send_progress(message, progress=progress) + def _reindex_single_video(self, youtube_id): """wrapper to handle channel name changes""" try: @@ -511,7 +520,7 @@ class ChannelFullScan: print(f"{video_id}: no remote match found") continue - expected_type = remote_match[0][-1].value + expected_type = remote_match[0][-1] if video["vid_type"] != expected_type: self.to_update.append( { diff --git a/tubearchivist/home/src/ta/ta_redis.py b/tubearchivist/home/src/ta/ta_redis.py index 14e90a1..46b1b5c 100644 --- a/tubearchivist/home/src/ta/ta_redis.py +++ b/tubearchivist/home/src/ta/ta_redis.py @@ -2,6 +2,7 @@ functionality: - interact with redis - hold temporary download queue in redis +- interact with celery tasks results """ import json @@ -13,9 +14,9 @@ import redis class RedisBase: """connection base for redis""" - REDIS_HOST = os.environ.get("REDIS_HOST") - REDIS_PORT = os.environ.get("REDIS_PORT") or 6379 - NAME_SPACE = "ta:" + REDIS_HOST: str = str(os.environ.get("REDIS_HOST")) + REDIS_PORT: int = int(os.environ.get("REDIS_PORT") or 6379) + NAME_SPACE: str = "ta:" def __init__(self): self.conn = redis.Redis(host=self.REDIS_HOST, port=self.REDIS_PORT) @@ 
-24,7 +25,7 @@ class RedisBase: class RedisArchivist(RedisBase): """collection of methods to interact with redis""" - CHANNELS = [ + CHANNELS: list[str] = [ "download", "add", "rescan", @@ -34,7 +35,13 @@ class RedisArchivist(RedisBase): "setting", ] - def set_message(self, key, message, path=".", expire=False): + def set_message( + self, + key: str, + message: dict, + path: str = ".", + expire: bool | int = False, + ) -> None: """write new message to redis""" self.conn.execute_command( "JSON.SET", self.NAME_SPACE + key, path, json.dumps(message) @@ -42,69 +49,47 @@ class RedisArchivist(RedisBase): if expire: if isinstance(expire, bool): - secs = 20 + secs: int = 20 else: secs = expire self.conn.execute_command("EXPIRE", self.NAME_SPACE + key, secs) - def get_message(self, key): + def get_message(self, key: str) -> dict: """get message dict from redis""" reply = self.conn.execute_command("JSON.GET", self.NAME_SPACE + key) if reply: - json_str = json.loads(reply) - else: - json_str = {"status": False} + return json.loads(reply) - return json_str + return {"status": False} - def list_items(self, query): - """list all matches""" + def list_keys(self, query: str) -> list: + """return all key matches""" reply = self.conn.execute_command( "KEYS", self.NAME_SPACE + query + "*" ) - all_matches = [i.decode().lstrip(self.NAME_SPACE) for i in reply] - all_results = [] - for match in all_matches: - json_str = self.get_message(match) - all_results.append(json_str) + if not reply: + return [] - return all_results + return [i.decode().lstrip(self.NAME_SPACE) for i in reply] - def del_message(self, key): + def list_items(self, query: str) -> list: + """list all matches""" + all_matches = self.list_keys(query) + if not all_matches: + return [] + + return [self.get_message(i) for i in all_matches] + + def del_message(self, key: str) -> bool: """delete key from redis""" response = self.conn.execute_command("DEL", self.NAME_SPACE + key) return response - def get_lock(self, lock_key): - """handle lock for task management""" - redis_lock = self.conn.lock(self.NAME_SPACE + lock_key) - return redis_lock - - def is_locked(self, lock_key): - """check if lock is set""" - lock_name = self.NAME_SPACE + lock_key - lock_status = bool(self.conn.execute_command("GET", lock_name)) - return lock_status - - def get_progress(self): - """get a list of all progress messages""" - all_messages = [] - for channel in self.CHANNELS: - key = "message:" + channel - reply = self.conn.execute_command( - "JSON.GET", self.NAME_SPACE + key - ) - if reply: - json_str = json.loads(reply) - all_messages.append(json_str) - - return all_messages - class RedisQueue(RedisBase): """dynamically interact with queues in redis""" - def __init__(self, queue_name): + def __init__(self, queue_name: str): super().__init__() self.key = f"{self.NAME_SPACE}{queue_name}" @@ -114,7 +99,11 @@ class RedisQueue(RedisBase): all_elements = [i.decode() for i in result] return all_elements - def in_queue(self, element): + def length(self) -> int: + """return total elements in list""" + return self.conn.execute_command("LLEN", self.key) + + def in_queue(self, element) -> str | bool: """check if element is in list""" result = self.conn.execute_command("LPOS", self.key, element) if result is not None: @@ -126,12 +115,13 @@ class RedisQueue(RedisBase): """add list to queue""" self.conn.execute_command("RPUSH", self.key, *to_add) - def add_priority(self, to_add): + def add_priority(self, to_add: str) -> None: """add single video to front of queue""" - 
self.clear_item(to_add) - self.conn.execute_command("LPUSH", self.key, to_add) + item: str = json.dumps(to_add) + self.clear_item(item) + self.conn.execute_command("LPUSH", self.key, item) - def get_next(self): + def get_next(self) -> str | bool: """return next element in the queue, False if none""" result = self.conn.execute_command("LPOP", self.key) if not result: @@ -140,19 +130,74 @@ class RedisQueue(RedisBase): next_element = result.decode() return next_element - def clear(self): + def clear(self) -> None: """delete list from redis""" self.conn.execute_command("DEL", self.key) - def clear_item(self, to_clear): + def clear_item(self, to_clear: str) -> None: """remove single item from list if it's there""" self.conn.execute_command("LREM", self.key, 0, to_clear) - def trim(self, size): + def trim(self, size: int) -> None: """trim the queue based on settings amount""" self.conn.execute_command("LTRIM", self.key, 0, size) - def has_item(self): + def has_item(self) -> bool: """check if queue as at least one pending item""" result = self.conn.execute_command("LRANGE", self.key, 0, 0) return bool(result) + + +class TaskRedis(RedisBase): + """interact with redis tasks""" + + BASE: str = "celery-task-meta-" + EXPIRE: int = 60 * 60 * 24 + COMMANDS: list[str] = ["STOP", "KILL"] + + def get_all(self) -> list: + """return all tasks""" + all_keys = self.conn.execute_command("KEYS", f"{self.BASE}*") + return [i.decode().replace(self.BASE, "") for i in all_keys] + + def get_single(self, task_id: str) -> dict: + """return content of single task""" + result = self.conn.execute_command("GET", self.BASE + task_id) + if not result: + return {} + + return json.loads(result.decode()) + + def set_key( + self, task_id: str, message: dict, expire: bool | int = False + ) -> None: + """set value for lock, initial or update""" + key: str = f"{self.BASE}{task_id}" + self.conn.execute_command("SET", key, json.dumps(message)) + + if expire: + self.conn.execute_command("EXPIRE", key, self.EXPIRE) + + def set_command(self, task_id: str, command: str) -> None: + """set task command""" + if command not in self.COMMANDS: + print(f"{command} not in valid commands {self.COMMANDS}") + raise ValueError + + message = self.get_single(task_id) + if not message: + print(f"{task_id} not found") + raise KeyError + + message.update({"command": command}) + self.set_key(task_id, message) + + def del_task(self, task_id: str) -> None: + """delete task result by id""" + self.conn.execute_command("DEL", f"{self.BASE}{task_id}") + + def del_all(self) -> None: + """delete all task results""" + all_tasks = self.get_all() + for task_id in all_tasks: + self.del_task(task_id) diff --git a/tubearchivist/home/src/ta/task_manager.py b/tubearchivist/home/src/ta/task_manager.py new file mode 100644 index 0000000..3771fdd --- /dev/null +++ b/tubearchivist/home/src/ta/task_manager.py @@ -0,0 +1,116 @@ +""" +functionality: +- interact with in redis stored task results +- handle threads and locks +""" + +from home import tasks as ta_tasks +from home.src.ta.ta_redis import RedisArchivist, TaskRedis + + +class TaskManager: + """manage tasks""" + + def get_all_results(self): + """return all task results""" + handler = TaskRedis() + all_keys = handler.get_all() + if not all_keys: + return False + + return [handler.get_single(i) for i in all_keys] + + def get_tasks_by_name(self, task_name): + """get all tasks by name""" + all_results = self.get_all_results() + if not all_results: + return False + + return [i for i in all_results if i.get("name") == 
task_name] + + def get_task(self, task_id): + """get single task""" + return TaskRedis().get_single(task_id) + + def is_pending(self, task): + """check if task_name is pending, pass task object""" + tasks = self.get_tasks_by_name(task.name) + if not tasks: + return False + + return bool([i for i in tasks if i.get("status") == "PENDING"]) + + def is_stopped(self, task_id): + """check if task_id has received STOP command""" + task = self.get_task(task_id) + + return task.get("command") == "STOP" + + def get_pending(self, task_name): + """get all pending tasks of task_name""" + tasks = self.get_tasks_by_name(task_name) + if not tasks: + return False + + return [i for i in tasks if i.get("status") == "PENDING"] + + def init(self, task): + """pass task object from bind task to set initial pending message""" + message = { + "status": "PENDING", + "result": None, + "traceback": None, + "date_done": False, + "name": task.name, + "task_id": task.request.id, + } + TaskRedis().set_key(task.request.id, message) + + def fail_pending(self): + """ + mark all pending as failed, + run at startup to recover from hard reset + """ + all_results = self.get_all_results() + if not all_results: + return + + for result in all_results: + if result.get("status") == "PENDING": + result["status"] = "FAILED" + TaskRedis().set_key(result["task_id"], result, expire=True) + + +class TaskCommand: + """run commands on task""" + + def start(self, task_name): + """start task by task_name, only pass task that don't take args""" + task = ta_tasks.app.tasks.get(task_name).delay() + message = { + "task_id": task.id, + "status": task.status, + "task_name": task.name, + } + + return message + + def stop(self, task_id, message_key): + """ + send stop signal to task_id, + needs to be implemented in task to take effect + """ + print(f"[task][{task_id}]: received STOP signal.") + handler = TaskRedis() + + task = handler.get_single(task_id) + if not task["name"] in ta_tasks.BaseTask.TASK_CONFIG: + raise ValueError + + handler.set_command(task_id, "STOP") + RedisArchivist().set_message(message_key, "STOP", path=".command") + + def kill(self, task_id): + """send kill signal to task_id""" + print(f"[task][{task_id}]: received KILL signal.") + ta_tasks.app.control.revoke(task_id, terminate=True) diff --git a/tubearchivist/home/src/ta/urlparser.py b/tubearchivist/home/src/ta/urlparser.py index 32c6030..24dfc68 100644 --- a/tubearchivist/home/src/ta/urlparser.py +++ b/tubearchivist/home/src/ta/urlparser.py @@ -103,6 +103,7 @@ class Parser: def _extract_channel_name(url): """find channel id from channel name with yt-dlp help""" obs_request = { + "check_formats": None, "skip_download": True, "extract_flat": True, "playlistend": 0, diff --git a/tubearchivist/home/tasks.py b/tubearchivist/home/tasks.py index 4abc6cb..f2c5341 100644 --- a/tubearchivist/home/tasks.py +++ b/tubearchivist/home/tasks.py @@ -6,304 +6,319 @@ Functionality: because tasks are initiated at application start """ -import json import os -from celery import Celery, shared_task +from celery import Celery, Task, shared_task from home.src.download.queue import PendingList from home.src.download.subscriptions import ( - ChannelSubscription, - PlaylistSubscription, + SubscriptionHandler, + SubscriptionScanner, ) from home.src.download.thumbnails import ThumbFilesystem, ThumbValidator from home.src.download.yt_dlp_handler import VideoDownloader from home.src.es.backup import ElasticBackup from home.src.es.index_setup import ElasitIndexWrap from home.src.index.channel import 
YoutubeChannel -from home.src.index.filesystem import ImportFolderScanner, scan_filesystem +from home.src.index.filesystem import Filesystem +from home.src.index.manual import ImportFolderScanner from home.src.index.reindex import Reindex, ReindexManual, ReindexOutdated -from home.src.index.video_constants import VideoTypeEnum from home.src.ta.config import AppConfig, ReleaseVersion, ScheduleBuilder -from home.src.ta.helper import clear_dl_cache -from home.src.ta.ta_redis import RedisArchivist, RedisQueue -from home.src.ta.urlparser import Parser +from home.src.ta.ta_redis import RedisArchivist +from home.src.ta.task_manager import TaskManager CONFIG = AppConfig().config REDIS_HOST = os.environ.get("REDIS_HOST") REDIS_PORT = os.environ.get("REDIS_PORT") or 6379 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings") -app = Celery("tasks", broker=f"redis://{REDIS_HOST}:{REDIS_PORT}") +app = Celery( + "tasks", + broker=f"redis://{REDIS_HOST}:{REDIS_PORT}", + backend=f"redis://{REDIS_HOST}:{REDIS_PORT}", + result_extended=True, +) app.config_from_object("django.conf:settings", namespace="ta:") app.autodiscover_tasks() app.conf.timezone = os.environ.get("TZ") or "UTC" -@shared_task(name="update_subscribed") -def update_subscribed(): - """look for missing videos and add to pending""" - message = { - "status": "message:rescan", - "level": "info", - "title": "Rescanning channels and playlists.", - "message": "Looking for new videos.", +class BaseTask(Task): + """base class to inherit each class from""" + + # pylint: disable=abstract-method + + TASK_CONFIG = { + "update_subscribed": { + "title": "Rescan your Subscriptions", + "group": "download:scan", + "api-start": True, + "api-stop": True, + }, + "download_pending": { + "title": "Downloading", + "group": "download:run", + "api-start": True, + "api-stop": True, + }, + "extract_download": { + "title": "Add to download queue", + "group": "download:add", + "api-stop": True, + }, + "check_reindex": { + "title": "Reindex Documents", + "group": "reindex:run", + }, + "manual_import": { + "title": "Manual video import", + "group": "setting:import", + "api-start": True, + }, + "run_backup": { + "title": "Index Backup", + "group": "setting:backup", + "api-start": True, + }, + "restore_backup": { + "title": "Restore Backup", + "group": "setting:restore", + }, + "rescan_filesystem": { + "title": "Rescan your Filesystem", + "group": "setting:filesystemscan", + "api-start": True, + }, + "thumbnail_check": { + "title": "Check your Thumbnails", + "group": "setting:thumbnailcheck", + "api-start": True, + }, + "resync_thumbs": { + "title": "Sync Thumbnails to Media Files", + "group": "setting:thumbnailsync", + "api-start": True, + }, + "index_playlists": { + "title": "Index Channel Playlist", + "group": "channel:indexplaylist", + }, + "subscribe_to": { + "title": "Add Subscription", + "group": "subscription:add", + }, } - RedisArchivist().set_message("message:rescan", message, expire=True) - have_lock = False - my_lock = RedisArchivist().get_lock("rescan") + def on_failure(self, exc, task_id, args, kwargs, einfo): + """callback for task failure""" + print(f"{task_id} Failed callback") + message, key = self._build_message(level="error") + message.update({"messages": ["Task failed"]}) + RedisArchivist().set_message(key, message, expire=20) - try: - have_lock = my_lock.acquire(blocking=False) - if have_lock: - channel_handler = ChannelSubscription() - missing_from_channels = channel_handler.find_missing() - playlist_handler = PlaylistSubscription() - 
missing_from_playlists = playlist_handler.find_missing() - if missing_from_channels or missing_from_playlists: - channel_videos = [ - {"type": "video", "vid_type": vid_type, "url": vid_id} - for vid_id, vid_type in missing_from_channels - ] - playlist_videos = [ - { - "type": "video", - "vid_type": VideoTypeEnum.VIDEOS, - "url": i, - } - for i in missing_from_playlists - ] - pending_handler = PendingList( - youtube_ids=channel_videos + playlist_videos - ) - pending_handler.parse_url_list() - pending_handler.add_to_pending() + def on_success(self, retval, task_id, args, kwargs): + """callback task completed successfully""" + print(f"{task_id} success callback") + message, key = self._build_message() + message.update({"messages": ["Task completed successfully"]}) + RedisArchivist().set_message(key, message, expire=5) - else: - print("Did not acquire rescan lock.") + def before_start(self, task_id, args, kwargs): + """callback before initiating task""" + print(f"{self.name} create callback") + message, key = self._build_message() + message.update({"messages": ["New task received."]}) + RedisArchivist().set_message(key, message) - finally: - if have_lock: - my_lock.release() - - -@shared_task(name="download_pending") -def download_pending(): - """download latest pending videos""" - have_lock = False - my_lock = RedisArchivist().get_lock("downloading") - - try: - have_lock = my_lock.acquire(blocking=False) - if have_lock: - downloader = VideoDownloader() - downloader.add_pending() - downloader.run_queue() - else: - print("Did not acquire download lock.") - - finally: - if have_lock: - my_lock.release() - - -@shared_task -def download_single(pending_video): - """start download single video now""" - queue = RedisQueue(queue_name="dl_queue") - - to_add = { - "youtube_id": pending_video["youtube_id"], - "vid_type": pending_video.get("vid_type", VideoTypeEnum.VIDEOS.value), - } - queue.add_priority(json.dumps(to_add)) - print(f"Added to queue with priority: {to_add}") - # start queue if needed - have_lock = False - my_lock = RedisArchivist().get_lock("downloading") - - try: - have_lock = my_lock.acquire(blocking=False) - if have_lock: - key = "message:download" - mess_dict = { - "status": key, - "level": "info", - "title": "Download single video", - "message": "processing", + def send_progress(self, message_lines, progress=False, title=False): + """send progress message""" + message, key = self._build_message() + message.update( + { + "messages": message_lines, + "progress": progress, } - RedisArchivist().set_message(key, mess_dict, expire=True) - VideoDownloader().run_queue() - else: - print("Download queue already running.") + ) + if title: + message["title"] = title - finally: - # release if only single run - if have_lock and not queue.get_next(): - my_lock.release() + RedisArchivist().set_message(key, message) + + def _build_message(self, level="info"): + """build message dict""" + task_id = self.request.id + message = self.TASK_CONFIG.get(self.name) + message.update({"level": level, "id": task_id}) + task_result = TaskManager().get_task(task_id) + if task_result: + command = task_result.get("command", False) + message.update({"command": command}) + + key = f"message:{message.get('group')}:{task_id.split('-')[0]}" + return message, key + + def is_stopped(self): + """check if task is stopped""" + return TaskManager().is_stopped(self.request.id) -@shared_task -def extrac_dl(youtube_ids): +@shared_task(name="update_subscribed", bind=True, base=BaseTask) +def update_subscribed(self): + """look for 
missing videos and add to pending""" + manager = TaskManager() + if manager.is_pending(self): + print(f"[task][{self.name}] rescan already running") + self.send_progress("Rescan already in progress.") + return + + manager.init(self) + missing_videos = SubscriptionScanner(task=self).scan() + if missing_videos: + print(missing_videos) + extrac_dl.delay(missing_videos) + + +@shared_task(name="download_pending", bind=True, base=BaseTask) +def download_pending(self, from_queue=True): + """download latest pending videos""" + manager = TaskManager() + if manager.is_pending(self): + print(f"[task][{self.name}] download queue already running") + self.send_progress("Download Queue is already running.") + return + + manager.init(self) + downloader = VideoDownloader(task=self) + if from_queue: + downloader.add_pending() + downloader.run_queue() + + +@shared_task(name="extract_download", bind=True, base=BaseTask) +def extrac_dl(self, youtube_ids): """parse list passed and add to pending""" - pending_handler = PendingList(youtube_ids=youtube_ids) + TaskManager().init(self) + pending_handler = PendingList(youtube_ids=youtube_ids, task=self) pending_handler.parse_url_list() pending_handler.add_to_pending() -@shared_task(name="check_reindex") -def check_reindex(data=False, extract_videos=False): +@shared_task(bind=True, name="check_reindex", base=BaseTask) +def check_reindex(self, data=False, extract_videos=False): """run the reindex main command""" if data: + # started from frontend through API + print(f"[task][{self.name}] reindex {data}") + self.send_progress("Add items to the reindex Queue.") ReindexManual(extract_videos=extract_videos).extract_data(data) - have_lock = False - reindex_lock = RedisArchivist().get_lock("reindex") + manager = TaskManager() + if manager.is_pending(self): + print(f"[task][{self.name}] reindex queue is already running") + self.send_progress("Reindex Queue is already running.") + return - try: - have_lock = reindex_lock.acquire(blocking=False) - if have_lock: - if not data: - ReindexOutdated().add_outdated() + manager.init(self) + if not data: + # started from scheduler + print(f"[task][{self.name}] reindex outdated documents") + self.send_progress("Add outdated documents to the reindex Queue.") + ReindexOutdated().add_outdated() - Reindex().reindex_all() - else: - print("Did not acquire reindex lock.") - - finally: - if have_lock: - reindex_lock.release() + Reindex(task=self).reindex_all() -@shared_task -def run_manual_import(): +@shared_task(bind=True, name="manual_import", base=BaseTask) +def run_manual_import(self): """called from settings page, to go through import folder""" - print("starting media file import") - have_lock = False - my_lock = RedisArchivist().get_lock("manual_import") + manager = TaskManager() + if manager.is_pending(self): + print(f"[task][{self.name}] manual import is already running") + self.send_progress("Manual import is already running.") + return - try: - have_lock = my_lock.acquire(blocking=False) - if have_lock: - ImportFolderScanner().scan() - else: - print("Did not acquire lock form import.") - - finally: - if have_lock: - my_lock.release() + manager.init(self) + ImportFolderScanner(task=self).scan() -@shared_task(name="run_backup") -def run_backup(reason="auto"): +@shared_task(bind=True, name="run_backup", base=BaseTask) +def run_backup(self, reason="auto"): """called from settings page, dump backup to zip file""" - have_lock = False - my_lock = RedisArchivist().get_lock("run_backup") + manager = TaskManager() + if manager.is_pending(self): 
+        print(f"[task][{self.name}] backup is already running")
+        self.send_progress("Backup is already running.")
+        return

-    try:
-        have_lock = my_lock.acquire(blocking=False)
-        if have_lock:
-            ElasticBackup(reason=reason).backup_all_indexes()
-        else:
-            print("Did not acquire lock for backup task.")
-    finally:
-        if have_lock:
-            my_lock.release()
-    print("backup finished")
+    manager.init(self)
+    ElasticBackup(reason=reason, task=self).backup_all_indexes()


-@shared_task
-def run_restore_backup(filename):
+@shared_task(bind=True, name="restore_backup", base=BaseTask)
+def run_restore_backup(self, filename):
     """called from settings page, dump backup to zip file"""
+    manager = TaskManager()
+    if manager.is_pending(self):
+        print(f"[task][{self.name}] restore is already running")
+        self.send_progress("Restore is already running.")
+        return
+
+    manager.init(self)
+    self.send_progress(["Reset your Index"])
     ElasitIndexWrap().reset()
-    ElasticBackup().restore(filename)
+    ElasticBackup(task=self).restore(filename)
     print("index restore finished")


-def kill_dl(task_id):
-    """kill download worker task by ID"""
-    if task_id:
-        app.control.revoke(task_id, terminate=True)
-
-    _ = RedisArchivist().del_message("dl_queue_id")
-    RedisQueue(queue_name="dl_queue").clear()
-
-    _ = clear_dl_cache(CONFIG)
-
-    # notify
-    mess_dict = {
-        "status": "message:download",
-        "level": "error",
-        "title": "Canceling download process",
-        "message": "Canceling download queue now.",
-    }
-    RedisArchivist().set_message("message:download", mess_dict, expire=True)
-
-
-@shared_task
-def rescan_filesystem():
+@shared_task(bind=True, name="rescan_filesystem", base=BaseTask)
+def rescan_filesystem(self):
     """check the media folder for mismatches"""
-    scan_filesystem()
-    ThumbValidator().download_missing()
+    manager = TaskManager()
+    if manager.is_pending(self):
+        print(f"[task][{self.name}] filesystem rescan already running")
+        self.send_progress("Filesystem Rescan is already running.")
+        return
+
+    manager.init(self)
+    Filesystem(task=self).process()
+    ThumbValidator(task=self).validate()


-@shared_task(name="thumbnail_check")
-def thumbnail_check():
+@shared_task(bind=True, name="thumbnail_check", base=BaseTask)
+def thumbnail_check(self):
     """validate thumbnails"""
-    ThumbValidator().download_missing()
+    manager = TaskManager()
+    if manager.is_pending(self):
+        print(f"[task][{self.name}] thumbnail check is already running")
+        self.send_progress("Thumbnail check is already running.")
+        return
+
+    manager.init(self)
+    ThumbValidator(task=self).validate()


-@shared_task
-def re_sync_thumbs():
+@shared_task(bind=True, name="resync_thumbs", base=BaseTask)
+def re_sync_thumbs(self):
     """sync thumbnails to mediafiles"""
-    ThumbFilesystem().sync()
+    manager = TaskManager()
+    if manager.is_pending(self):
+        print(f"[task][{self.name}] thumb re-embed is already running")
+        self.send_progress("Thumbnail re-embed is already running.")
+        return
+
+    manager.init(self)
+    ThumbFilesystem(task=self).embed()


-@shared_task
-def subscribe_to(url_str):
+@shared_task(bind=True, name="subscribe_to", base=BaseTask)
+def subscribe_to(self, url_str):
     """take a list of urls to subscribe to"""
-    to_subscribe_list = Parser(url_str).parse()
-    for idx, item in enumerate(to_subscribe_list):
-        to_sub_id = item["url"]
-        if item["type"] == "playlist":
-            PlaylistSubscription().process_url_str([item])
-            continue
-
-        if item["type"] == "video":
-            vid_details = PendingList().get_youtube_details(to_sub_id)
-            channel_id_sub = vid_details["channel_id"]
-        elif item["type"] == "channel":
-            channel_id_sub = to_sub_id
-        else:
-            raise ValueError("failed to subscribe to: " + to_sub_id)
-
-        ChannelSubscription().change_subscribe(
-            channel_id_sub, channel_subscribed=True
-        )
-        # notify for channels
-        key = "message:subchannel"
-        message = {
-            "status": key,
-            "level": "info",
-            "title": "Subscribing to Channels",
-            "message": f"Processing {idx + 1} of {len(to_subscribe_list)}",
-        }
-        RedisArchivist().set_message(key, message=message, expire=True)
+    SubscriptionHandler(url_str, task=self).subscribe()


-@shared_task
-def index_channel_playlists(channel_id):
+@shared_task(bind=True, name="index_playlists", base=BaseTask)
+def index_channel_playlists(self, channel_id):
     """add all playlists of channel to index"""
-    channel = YoutubeChannel(channel_id)
-    # notify
-    key = "message:playlistscan"
-    mess_dict = {
-        "status": key,
-        "level": "info",
-        "title": "Looking for playlists",
-        "message": f"{channel_id}: Channel scan in progress",
-    }
-    RedisArchivist().set_message(key, mess_dict, expire=True)
+    channel = YoutubeChannel(channel_id, task=self)
     channel.index_channel_playlists()
diff --git a/tubearchivist/home/templates/home/channel.html b/tubearchivist/home/templates/home/channel.html
index 203ad96..c2f1e0b 100644
--- a/tubearchivist/home/templates/home/channel.html
+++ b/tubearchivist/home/templates/home/channel.html
@@ -18,7 +18,7 @@
-
+
Show subscribed only:
diff --git a/tubearchivist/home/templates/home/channel_id.html b/tubearchivist/home/templates/home/channel_id.html
index a4c273f..9539f58 100644
--- a/tubearchivist/home/templates/home/channel_id.html
+++ b/tubearchivist/home/templates/home/channel_id.html
@@ -22,7 +22,7 @@

Downloads

{% endif %}
-
+
diff --git a/tubearchivist/home/templates/home/channel_id_about.html b/tubearchivist/home/templates/home/channel_id_about.html
index 1688148..ac6a66c 100644
--- a/tubearchivist/home/templates/home/channel_id_about.html
+++ b/tubearchivist/home/templates/home/channel_id_about.html
@@ -22,6 +22,7 @@

Downloads

{% endif %}
+
diff --git a/tubearchivist/home/templates/home/channel_id_playlist.html b/tubearchivist/home/templates/home/channel_id_playlist.html
index 03b484a..609883e 100644
--- a/tubearchivist/home/templates/home/channel_id_playlist.html
+++ b/tubearchivist/home/templates/home/channel_id_playlist.html
@@ -22,6 +22,7 @@

Downloads

{% endif %}
+
Show subscribed only:
diff --git a/tubearchivist/home/templates/home/playlist.html b/tubearchivist/home/templates/home/playlist.html
index bdfa229..2d786bf 100644
--- a/tubearchivist/home/templates/home/playlist.html
+++ b/tubearchivist/home/templates/home/playlist.html
@@ -17,7 +17,7 @@
-
+
Show subscribed only:
diff --git a/tubearchivist/home/urls.py b/tubearchivist/home/urls.py
index 8dc8c9d..14e851b 100644
--- a/tubearchivist/home/urls.py
+++ b/tubearchivist/home/urls.py
@@ -4,78 +4,73 @@ from django.conf import settings
 from django.contrib.auth.decorators import login_required
 from django.contrib.auth.views import LogoutView
 from django.urls import path
-from home.views import (
-    AboutView,
-    ChannelIdAboutView,
-    ChannelIdLiveView,
-    ChannelIdPlaylistView,
-    ChannelIdShortsView,
-    ChannelIdView,
-    ChannelView,
-    DownloadView,
-    HomeView,
-    LoginView,
-    PlaylistIdView,
-    PlaylistView,
-    SearchView,
-    SettingsView,
-    VideoView,
-    process,
-    progress,
-)
+from home import views

 urlpatterns = [
-    path("", login_required(HomeView.as_view()), name="home"),
-    path("login/", LoginView.as_view(), name="login"),
+    path("", login_required(views.HomeView.as_view()), name="home"),
+    path("login/", views.LoginView.as_view(), name="login"),
     path(
         "logout/",
         LogoutView.as_view(),
         {"next_page": settings.LOGOUT_REDIRECT_URL},
         name="logout",
     ),
-    path("about/", AboutView.as_view(), name="about"),
+    path("about/", views.AboutView.as_view(), name="about"),
     path(
-        "downloads/", login_required(DownloadView.as_view()), name="downloads"
+        "downloads/",
+        login_required(views.DownloadView.as_view()),
+        name="downloads",
+    ),
+    path(
+        "settings/",
+        login_required(views.SettingsView.as_view()),
+        name="settings",
+    ),
+    path("process/", login_required(views.process), name="process"),
+    path(
+        "channel/",
+        login_required(views.ChannelView.as_view()),
+        name="channel",
     ),
-    path("settings/", login_required(SettingsView.as_view()), name="settings"),
-    path("process/", login_required(process), name="process"),
-    path("progress/", login_required(progress), name="progress"),
-    path("channel/", login_required(ChannelView.as_view()), name="channel"),
     path(
         "channel//",
-        login_required(ChannelIdView.as_view()),
+        login_required(views.ChannelIdView.as_view()),
         name="channel_id",
     ),
     path(
         "channel//streams/",
-        login_required(ChannelIdLiveView.as_view()),
+        login_required(views.ChannelIdLiveView.as_view()),
         name="channel_id_live",
     ),
     path(
         "channel//shorts/",
-        login_required(ChannelIdShortsView.as_view()),
+        login_required(views.ChannelIdShortsView.as_view()),
         name="channel_id_shorts",
     ),
     path(
         "channel//about/",
-        login_required(ChannelIdAboutView.as_view()),
+        login_required(views.ChannelIdAboutView.as_view()),
         name="channel_id_about",
     ),
     path(
         "channel//playlist/",
-        login_required(ChannelIdPlaylistView.as_view()),
+        login_required(views.ChannelIdPlaylistView.as_view()),
         name="channel_id_playlist",
     ),
     path(
         "video//",
-        login_required(VideoView.as_view()),
+        login_required(views.VideoView.as_view()),
         name="video",
     ),
-    path("playlist/", login_required(PlaylistView.as_view()), name="playlist"),
+    path(
+        "playlist/",
+        login_required(views.PlaylistView.as_view()),
+        name="playlist",
+    ),
     path(
         "playlist//",
-        login_required(PlaylistIdView.as_view()),
+        login_required(views.PlaylistIdView.as_view()),
         name="playlist_id",
     ),
-    path("search/", login_required(SearchView.as_view()), name="search"),
+    path("search/", login_required(views.SearchView.as_view()), name="search"),
 ]
diff --git a/tubearchivist/home/views.py b/tubearchivist/home/views.py
index db735d0..527e18b 100644
--- a/tubearchivist/home/views.py
+++ b/tubearchivist/home/views.py
@@ -619,21 +619,6 @@ class ChannelIdView(ChannelIdBaseView):
             to_append = {"term": {"player.watched": {"value": False}}}
             self.data["query"]["bool"]["must"].append(to_append)

-    @staticmethod
-    def post(request, channel_id):
-        """handle post request"""
-        print(f"handle post from {channel_id}")
-        channel_overwrite_form = ChannelOverwriteForm(request.POST)
-        if channel_overwrite_form.is_valid():
-            overwrites = channel_overwrite_form.cleaned_data
-            print(f"{channel_id}: set overwrites {overwrites}")
-            channel_overwrites(channel_id, overwrites=overwrites)
-            if overwrites.get("index_playlists") == "1":
-                index_channel_playlists.delay(channel_id)
-
-        sleep(1)
-        return redirect("channel_id", channel_id, permanent=True)
-

 class ChannelIdLiveView(ChannelIdView):
     """resolves to /channel//streams/
@@ -769,14 +754,6 @@ class ChannelView(ArchivistResultsView):
         """handle http post requests"""
         subscribe_form = SubscribeToChannelForm(data=request.POST)
         if subscribe_form.is_valid():
-            key = "message:subchannel"
-            message = {
-                "status": key,
-                "level": "info",
-                "title": "Subscribing to Channels",
-                "message": "Parsing form data",
-            }
-            RedisArchivist().set_message(key, message=message, expire=True)
             url_str = request.POST.get("subscribe")
             print(url_str)
             subscribe_to.delay(url_str)
@@ -922,14 +899,6 @@ class PlaylistView(ArchivistResultsView):
         if subscribe_form.is_valid():
             url_str = request.POST.get("subscribe")
             print(url_str)
-            key = "message:subplaylist"
-            message = {
-                "status": key,
-                "level": "info",
-                "title": "Subscribing to Playlists",
-                "message": "Parsing form data",
-            }
-            RedisArchivist().set_message(key, message=message, expire=True)
             subscribe_to.delay(url_str)

             sleep(1)
@@ -1131,26 +1100,11 @@ class SettingsView(MinView):
         RedisArchivist().set_message(key, message=message, expire=True)


-def progress(request):
-    # pylint: disable=unused-argument
-    """resolves to /progress/
-    return list of messages for frontend
-    """
-    all_messages = RedisArchivist().get_progress()
-    json_data = {"messages": all_messages}
-    return JsonResponse(json_data)
-
-
 def process(request):
     """handle all the buttons calls via POST ajax"""
     if request.method == "POST":
         current_user = request.user.id
         post_dict = json.loads(request.body.decode())
-        if post_dict.get("reset-token"):
-            print("revoke API token")
-            request.user.auth_token.delete()
-            return JsonResponse({"success": True})
-
         post_handler = PostData(post_dict, current_user)
         if post_handler.to_exec:
             task_result = post_handler.run_task()
diff --git a/tubearchivist/requirements.txt b/tubearchivist/requirements.txt
index 721df00..01c75c3 100644
--- a/tubearchivist/requirements.txt
+++ b/tubearchivist/requirements.txt
@@ -1,13 +1,13 @@
-beautifulsoup4==4.11.2
+beautifulsoup4==4.12.0
 celery==5.2.7
 Django==4.1.7
 django-auth-ldap==4.1.0
 django-cors-headers==3.14.0
 djangorestframework==3.14.0
 Pillow==9.4.0
-redis==4.5.1
+redis==4.5.3
 requests==2.28.2
 ryd-client==0.0.6
 uWSGI==2.0.21
 whitenoise==6.4.0
-yt_dlp==2023.3.3
+yt_dlp==2023.3.4
diff --git a/tubearchivist/static/css/style.css b/tubearchivist/static/css/style.css
index dbf76f9..4119946 100644
--- a/tubearchivist/static/css/style.css
+++ b/tubearchivist/static/css/style.css
@@ -395,6 +395,10 @@ button:hover {
   position: relative; /* needed for modal */
 }

+#notifications {
+  position: relative;
+}
+
 .notifications {
   text-align: center;
   width: 80%;
@@ -473,7 +477,8 @@ video:-webkit-full-screen {
   align-items: center;
 }

-.video-progress-bar {
+.video-progress-bar,
+.notification-progress-bar {
   position: absolute;
   background-color: var(--accent-font-dark);
   height: 7px;
@@ -1017,13 +1022,12 @@ video:-webkit-full-screen {
   cursor: pointer;
 }

-.dl-control-icons {
+.task-control-icons {
   display: flex;
   justify-content: center;
-  padding-bottom: 10px;
 }

-.dl-control-icons img {
+.task-control-icons img {
   width: 30px;
   cursor: pointer;
   margin: 5px;
@@ -1044,6 +1048,7 @@ video:-webkit-full-screen {

 /* status message */
 .notification {
+  position: relative;
   background-color: var(--highlight-bg);
   text-align: center;
   padding: 30px 0 15px 0;
diff --git a/tubearchivist/static/progress.js b/tubearchivist/static/progress.js
index 87f5e19..36e7ede 100644
--- a/tubearchivist/static/progress.js
+++ b/tubearchivist/static/progress.js
@@ -5,16 +5,9 @@

 'use strict';

-checkMessages();
+/* globals apiRequest animate */

-// page map to notification status
-const messageTypes = {
-  download: ['message:download', 'message:add', 'message:rescan', 'message:playlistscan'],
-  channel: ['message:subchannel'],
-  channel_id: ['message:playlistscan'],
-  playlist: ['message:subplaylist'],
-  setting: ['message:setting'],
-};
+checkMessages();

 // start to look for messages
 function checkMessages() {
@@ -25,82 +18,121 @@ function checkMessages() {
   }
 }

-// get messages for page on timer
 function getMessages(dataOrigin) {
-  fetch('/progress/')
-    .then(response => response.json())
-    .then(responseData => {
-      const messages = buildMessage(responseData, dataOrigin);
-      if (messages.length > 0) {
-        // restart itself
-        setTimeout(() => getMessages(dataOrigin), 500);
-      }
-    });
+  let apiEndpoint = '/api/notification/';
+  let responseData = apiRequest(apiEndpoint, 'GET');
+  let messages = buildMessage(responseData, dataOrigin);
+  if (messages.length > 0) {
+    // restart itself
+    setTimeout(() => getMessages(dataOrigin), 500);
+  }
 }

-// make div for all messages, return relevant
 function buildMessage(responseData, dataOrigin) {
-  // filter relevan messages
-  let allMessages = responseData['messages'];
-  let messages = allMessages.filter(function (value) {
-    return messageTypes[dataOrigin].includes(value['status']);
+  // filter relevant messages
+  let messages = responseData.filter(function (value) {
+    return value.group.startsWith(dataOrigin);
   }, dataOrigin);
-  // build divs
-  let notificationDiv = document.getElementById('notifications');
-  let nots = notificationDiv.childElementCount;
-  notificationDiv.innerHTML = '';
+  let notifications = document.getElementById('notifications');
+  let currentNotifications = notifications.childElementCount;
+
   for (let i = 0; i < messages.length; i++) {
-    let messageData = messages[i];
-    let messageStatus = messageData['status'];
-    let messageBox = document.createElement('div');
-    let title = document.createElement('h3');
-    title.innerHTML = messageData['title'];
-    let message = document.createElement('p');
-    message.innerHTML = messageData['message'];
-    messageBox.appendChild(title);
-    messageBox.appendChild(message);
-    messageBox.classList.add(messageData['level'], 'notification');
-    notificationDiv.appendChild(messageBox);
-    if (messageStatus === 'message:download') {
-      checkDownloadIcons();
+    const messageData = messages[i];
+    if (!document.getElementById(messageData.id)) {
+      let messageBox = buildPlainBox(messageData);
+      notifications.appendChild(messageBox);
+    }
+    updateMessageBox(messageData);
+    if (messageData.group.startsWith('download:')) {
+      animateIcons(messageData.group);
     }
   }
-  // reload page when no more notifications
-  if (nots > 0 && messages.length === 0) {
+  clearNotifications(responseData);
+  if (currentNotifications > 0 && messages.length === 0) {
     location.reload();
   }
   return messages;
 }

-// check if download icons are needed
-function checkDownloadIcons() {
-  let iconBox = document.getElementById('downloadControl');
-  if (iconBox.childElementCount === 0) {
-    let downloadIcons = buildDownloadIcons();
-    iconBox.appendChild(downloadIcons);
+function buildPlainBox(messageData) {
+  let messageBox = document.createElement('div');
+  messageBox.classList.add(messageData.level, 'notification');
+  messageBox.id = messageData.id;
+  messageBox.innerHTML = `
+    <h3></h3>
+    <p></p>
+    <div class="task-control-icons"></div>
+    <div class="notification-progress-bar"></div>
+  `;
+  return messageBox;
+}
+
+function updateMessageBox(messageData) {
+  let messageBox = document.getElementById(messageData.id);
+  let children = messageBox.children;
+  children[0].textContent = messageData.title;
+  children[1].innerHTML = messageData.messages.join('<br>');
+  if (
+    !messageBox.querySelector('#stop-icon') &&
+    messageData['api-stop'] &&
+    messageData.command !== 'STOP'
+  ) {
+    children[2].appendChild(buildStopIcon(messageData.id));
+  }
+  if (messageData.progress) {
+    children[3].style.width = `${messageData.progress * 100 || 0}%`;
+  }
+}

-// add dl control icons
-function buildDownloadIcons() {
-  let downloadIcons = document.createElement('div');
-  downloadIcons.classList = 'dl-control-icons';
-  // stop icon
+function animateIcons(group) {
+  let rescanIcon = document.getElementById('rescan-icon');
+  let dlIcon = document.getElementById('download-icon');
+  switch (group) {
+    case 'download:scan':
+      if (rescanIcon && !rescanIcon.classList.contains('rotate-img')) {
+        animate('rescan-icon', 'rotate-img');
+      }
+      break;
+
+    case 'download:run':
+      if (dlIcon && !dlIcon.classList.contains('bounce-img')) {
+        animate('download-icon', 'bounce-img');
+      }
+      break;
+
+    default:
+      break;
+  }
+}
+
+function buildStopIcon(taskId) {
   let stopIcon = document.createElement('img');
   stopIcon.setAttribute('id', 'stop-icon');
-  stopIcon.setAttribute('title', 'Stop Download Queue');
+  stopIcon.setAttribute('data', taskId);
+  stopIcon.setAttribute('title', 'Stop Task');
   stopIcon.setAttribute('src', '/static/img/icon-stop.svg');
   stopIcon.setAttribute('alt', 'stop icon');
-  stopIcon.setAttribute('onclick', 'stopQueue()');
-  // kill icon
+  stopIcon.setAttribute('onclick', 'stopTask(this)');
+  return stopIcon;
+}
+
+function buildKillIcon(taskId) {
   let killIcon = document.createElement('img');
   killIcon.setAttribute('id', 'kill-icon');
-  killIcon.setAttribute('title', 'Kill Download Queue');
+  killIcon.setAttribute('data', taskId);
+  killIcon.setAttribute('title', 'Kill Task');
   killIcon.setAttribute('src', '/static/img/icon-close.svg');
   killIcon.setAttribute('alt', 'kill icon');
-  killIcon.setAttribute('onclick', 'killQueue()');
-  // stich together
-  downloadIcons.appendChild(stopIcon);
-  downloadIcons.appendChild(killIcon);
-  return downloadIcons;
+  killIcon.setAttribute('onclick', 'killTask(this)');
+  return killIcon;
+}
+
+function clearNotifications(responseData) {
+  let allIds = Array.from(responseData, x => x.id);
+  let allBoxes = document.getElementsByClassName('notification');
+  for (let i = 0; i < allBoxes.length; i++) {
+    const notificationBox = allBoxes[i];
+    if (!allIds.includes(notificationBox.id)) {
+      notificationBox.remove();
+    }
+  }
 }
diff --git a/tubearchivist/static/script.js b/tubearchivist/static/script.js
index 598f4e9..b902be5 100644
--- a/tubearchivist/static/script.js
+++ b/tubearchivist/static/script.js
@@ -167,18 +167,18 @@ function reindex(button) {

 // download page buttons
 function rescanPending() {
-  let payload = JSON.stringify({ rescan_pending: true });
+  let apiEndpoint = '/api/task-name/update_subscribed/';
+  apiRequest(apiEndpoint, 'POST');
   animate('rescan-icon', 'rotate-img');
-  sendPost(payload);
   setTimeout(function () {
     checkMessages();
   }, 500);
 }

 function dlPending() {
-  let payload = JSON.stringify({ dl_pending: true });
+  let apiEndpoint = '/api/task-name/download_pending/';
+  apiRequest(apiEndpoint, 'POST');
   animate('download-icon', 'bounce-img');
-  sendPost(payload);
   setTimeout(function () {
     checkMessages();
   }, 500);
@@ -228,50 +228,64 @@ function deleteQueue(button) {
   document.getElementById(button.id).replaceWith(message);
 }

-function stopQueue() {
-  let payload = JSON.stringify({ queue: 'stop' });
-  sendPost(payload);
-  document.getElementById('stop-icon').remove();
+function stopTask(icon) {
+  let taskId = icon.getAttribute('data');
+  let apiEndpoint = `/api/task-id/${taskId}/`;
+  apiRequest(apiEndpoint, 'POST', { command: 'stop' });
+  icon.remove();
 }

-function killQueue() {
-  let payload = JSON.stringify({ queue: 'kill' });
-  sendPost(payload);
-  document.getElementById('kill-icon').remove();
+function killTask(icon) {
+  let taskId = icon.getAttribute('data');
+  let apiEndpoint = `/api/task-id/${taskId}/`;
+  apiRequest(apiEndpoint, 'POST', { command: 'kill' });
+  icon.remove();
 }

 // settings page buttons
 function manualImport() {
-  let payload = JSON.stringify({ 'manual-import': true });
-  sendPost(payload);
+  let apiEndpoint = '/api/task-name/manual_import/';
+  apiRequest(apiEndpoint, 'POST');
   // clear button
   let message = document.createElement('p');
   message.innerText = 'processing import';
   let toReplace = document.getElementById('manual-import');
   toReplace.innerHTML = '';
   toReplace.appendChild(message);
+  setTimeout(function () {
+    location.replace('#notifications');
+    checkMessages();
+  }, 200);
 }

 function reEmbed() {
-  let payload = JSON.stringify({ 're-embed': true });
-  sendPost(payload);
+  let apiEndpoint = '/api/task-name/resync_thumbs/';
+  apiRequest(apiEndpoint, 'POST');
   // clear button
   let message = document.createElement('p');
   message.innerText = 'processing thumbnails';
   let toReplace = document.getElementById('re-embed');
   toReplace.innerHTML = '';
   toReplace.appendChild(message);
+  setTimeout(function () {
+    location.replace('#notifications');
+    checkMessages();
+  }, 200);
 }

 function dbBackup() {
-  let payload = JSON.stringify({ 'db-backup': true });
-  sendPost(payload);
+  let apiEndpoint = '/api/task-name/run_backup/';
+  apiRequest(apiEndpoint, 'POST');
   // clear button
   let message = document.createElement('p');
   message.innerText = 'backing up archive';
   let toReplace = document.getElementById('db-backup');
   toReplace.innerHTML = '';
   toReplace.appendChild(message);
+  setTimeout(function () {
+    location.replace('#notifications');
+    checkMessages();
+  }, 200);
 }

 function dbRestore(button) {
@@ -284,25 +298,37 @@ function dbRestore(button) {
   let toReplace = document.getElementById(fileName);
   toReplace.innerHTML = '';
   toReplace.appendChild(message);
+  setTimeout(function () {
+    location.replace('#notifications');
+    checkMessages();
+  }, 200);
 }

 function fsRescan() {
-  let payload = JSON.stringify({ 'fs-rescan': true });
-  sendPost(payload);
+  let apiEndpoint = '/api/task-name/rescan_filesystem/';
+  apiRequest(apiEndpoint, 'POST');
   // clear button
   let message = document.createElement('p');
   message.innerText = 'File system scan in progress';
   let toReplace = document.getElementById('fs-rescan');
   toReplace.innerHTML = '';
   toReplace.appendChild(message);
+  setTimeout(function () {
+    location.replace('#notifications');
+    checkMessages();
+  }, 200);
 }

 function resetToken() {
-  let payload = JSON.stringify({ 'reset-token': true });
-  sendPost(payload);
-  let message = document.createElement('p');
-  message.innerText = 'Token revoked';
-  document.getElementById('text-reveal').replaceWith(message);
+  let apiEndpoint = '/api/token/';
+  let result = apiRequest(apiEndpoint, 'DELETE');
+  if (result && result.success) {
+    let message = document.createElement('p');
+    message.innerText = 'Token revoked';
+    document.getElementById('text-reveal').replaceWith(message);
+  } else {
+    console.error('unable to revoke token');
+  }
 }

 // restore from snapshot