diff --git a/README.md b/README.md index 985f048..faf025f 100644 --- a/README.md +++ b/README.md @@ -58,7 +58,12 @@ Take a look at the example [docker-compose.yml](https://github.com/tubearchivist | TZ | Set your timezone for the scheduler | Required | | TA_PORT | Overwrite Nginx port | Optional | | TA_UWSGI_PORT | Overwrite container internal uwsgi port | Optional | +| TA_ENABLE_AUTH_PROXY | Enables support for forwarding auth in reverse proxies | [Read more](https://docs.tubearchivist.com/configuration/forward-auth/) | +| TA_AUTH_PROXY_USERNAME_HEADER | Header containing username to log in | Optional | +| TA_AUTH_PROXY_LOGOUT_URL | Logout URL for forwarded auth | Opttional | | ES_URL | URL That ElasticSearch runs on | Optional | +| ES_DISABLE_VERIFY_SSL | Disable ElasticSearch SSL certificate verification | Optional | +| ES_SNAPSHOT_DIR | Custom path where elastic search stores snapshots for master/data nodes | Optional | | HOST_GID | Allow TA to own the video files instead of container user | Optional | | HOST_UID | Allow TA to own the video files instead of container user | Optional | | ELASTIC_USER | Change the default ElasticSearch user | Optional | diff --git a/tubearchivist/api/src/aggs.py b/tubearchivist/api/src/aggs.py index aa35c39..1e2f54c 100644 --- a/tubearchivist/api/src/aggs.py +++ b/tubearchivist/api/src/aggs.py @@ -1,7 +1,7 @@ """aggregations""" from home.src.es.connect import ElasticWrap -from home.src.index.video_streams import DurationConverter +from home.src.ta.helper import get_duration_str class AggBase: @@ -119,7 +119,7 @@ class WatchProgress(AggBase): { "all": { "duration": all_duration, - "duration_str": DurationConverter().get_str(all_duration), + "duration_str": get_duration_str(all_duration), "items": aggregations["total_vids"].get("value"), } } @@ -135,7 +135,7 @@ class WatchProgress(AggBase): """parse bucket""" duration = int(bucket["watch_docs"]["duration"]["value"]) - duration_str = DurationConverter().get_str(duration) + 
duration_str = get_duration_str(duration) items = bucket["watch_docs"]["true_count"]["value"] if bucket["key_as_string"] == "false": key = "unwatched" @@ -196,6 +196,9 @@ class DownloadHist(AggBase): class BiggestChannel(AggBase): """get channel aggregations""" + def __init__(self, order): + self.data["aggs"][self.name]["multi_terms"]["order"] = {order: "desc"} + name = "channel_stats" path = "ta_video/_search" data = { @@ -231,9 +234,7 @@ class BiggestChannel(AggBase): "name": i["key"][0].title(), "doc_count": i["doc_count"]["value"], "duration": i["duration"]["value"], - "duration_str": DurationConverter().get_str( - i["duration"]["value"] - ), + "duration_str": get_duration_str(int(i["duration"]["value"])), "media_size": i["media_size"]["value"], } for i in buckets diff --git a/tubearchivist/api/src/search_processor.py b/tubearchivist/api/src/search_processor.py index e9b4d24..232474d 100644 --- a/tubearchivist/api/src/search_processor.py +++ b/tubearchivist/api/src/search_processor.py @@ -8,7 +8,7 @@ import urllib.parse from home.src.download.thumbnails import ThumbManager from home.src.ta.config import AppConfig -from home.src.ta.helper import date_praser +from home.src.ta.helper import date_praser, get_duration_str class SearchProcess: @@ -50,6 +50,16 @@ class SearchProcess: processed = self._process_download(result["_source"]) if index == "ta_comment": processed = self._process_comment(result["_source"]) + if index == "ta_subtitle": + processed = self._process_subtitle(result) + + if isinstance(processed, dict): + processed.update( + { + "_index": index, + "_score": round(result.get("_score") or 0, 2), + } + ) return processed @@ -139,3 +149,29 @@ class SearchProcess: processed_comments[-1]["comment_replies"].append(comment) return processed_comments + + def _process_subtitle(self, result): + """take complete result dict to extract highlight""" + subtitle_dict = result["_source"] + highlight = result.get("highlight") + if highlight: + # replace lines with 
the highlighted markdown + subtitle_line = highlight.get("subtitle_line")[0] + subtitle_dict.update({"subtitle_line": subtitle_line}) + + thumb_path = ThumbManager(subtitle_dict["youtube_id"]).vid_thumb_path() + subtitle_dict.update({"vid_thumb_url": f"/cache/{thumb_path}"}) + + return subtitle_dict + + +def process_aggs(response): + """convert aggs duration to str""" + + if response.get("aggregations"): + aggs = response["aggregations"] + if "total_duration" in aggs: + duration_sec = int(aggs["total_duration"]["value"]) + aggs["total_duration"].update( + {"value_str": get_duration_str(duration_sec)} + ) diff --git a/tubearchivist/api/views.py b/tubearchivist/api/views.py index 24db6dd..5d3e59c 100644 --- a/tubearchivist/api/views.py +++ b/tubearchivist/api/views.py @@ -1025,9 +1025,9 @@ class StatBiggestChannel(ApiBaseView): def get(self, request): """handle get request""" - order = request.GET.get("order", False) + order = request.GET.get("order", "doc_count") if order and order not in self.order_choices: message = {"message": f"invalid order parameter {order}"} return Response(message, status=400) - return Response(BiggestChannel().process()) + return Response(BiggestChannel(order).process()) diff --git a/tubearchivist/config/management/commands/ta_connection.py b/tubearchivist/config/management/commands/ta_connection.py index 23cc14d..4e0f59a 100644 --- a/tubearchivist/config/management/commands/ta_connection.py +++ b/tubearchivist/config/management/commands/ta_connection.py @@ -3,6 +3,7 @@ Functionality: - check that all connections are working """ +from os import environ from time import sleep import requests @@ -132,7 +133,19 @@ class Command(BaseCommand): """check that path.repo var is set""" self.stdout.write("[5] check ES path.repo env var") response, _ = ElasticWrap("_nodes/_all/settings").get() + snaphost_roles = [ + "data", + "data_cold", + "data_content", + "data_frozen", + "data_hot", + "data_warm", + "master", + ] for node in 
response["nodes"].values(): + if not (set(node["roles"]) & set(snaphost_roles)): + continue + if node["settings"]["path"].get("repo"): self.stdout.write( self.style.SUCCESS(" ✓ path.repo env var is set") @@ -142,7 +155,10 @@ class Command(BaseCommand): message = ( " 🗙 path.repo env var not found. " + "set the following env var to the ES container:\n" - + " path.repo=/usr/share/elasticsearch/data/snapshot" + + " path.repo=" + + environ.get( + "ES_SNAPSHOT_DIR", "/usr/share/elasticsearch/data/snapshot" + ), ) self.stdout.write(self.style.ERROR(f"{message}")) sleep(60) diff --git a/tubearchivist/config/management/commands/ta_startup.py b/tubearchivist/config/management/commands/ta_startup.py index e45c46c..f71fe76 100644 --- a/tubearchivist/config/management/commands/ta_startup.py +++ b/tubearchivist/config/management/commands/ta_startup.py @@ -16,6 +16,7 @@ from home.src.ta.config import AppConfig, ReleaseVersion from home.src.ta.helper import clear_dl_cache from home.src.ta.ta_redis import RedisArchivist from home.src.ta.task_manager import TaskManager +from home.src.ta.users import UserConfig TOPIC = """ @@ -44,6 +45,7 @@ class Command(BaseCommand): self._mig_snapshot_check() self._mig_set_streams() self._mig_set_autostart() + self._mig_move_users_to_es() def _sync_redis_state(self): """make sure redis gets new config.json values""" @@ -219,3 +221,99 @@ class Command(BaseCommand): self.stdout.write(response) sleep(60) raise CommandError(message) + + def _mig_move_users_to_es(self): # noqa: C901 + """migration: update from 0.4.1 to 0.5.0 move user config to ES""" + self.stdout.write("[MIGRATION] move user configuration to ES") + redis = RedisArchivist() + + # 1: Find all users in Redis + users = {i.split(":")[0] for i in redis.list_keys("[0-9]*:")} + if not users: + self.stdout.write(" no users needed migrating to ES") + return + + # 2: Write all Redis user settings to ES + # 3: Remove user settings from Redis + try: + for user in users: + new_conf = 
UserConfig(user) + + colors_key = f"{user}:colors" + colors = redis.get_message(colors_key).get("status") + if colors is not None: + new_conf.set_value("colors", colors) + redis.del_message(colors_key) + + sort_by_key = f"{user}:sort_by" + sort_by = redis.get_message(sort_by_key).get("status") + if sort_by is not None: + new_conf.set_value("sort_by", sort_by) + redis.del_message(sort_by_key) + + page_size_key = f"{user}:page_size" + page_size = redis.get_message(page_size_key).get("status") + if page_size is not None: + new_conf.set_value("page_size", page_size) + redis.del_message(page_size_key) + + sort_order_key = f"{user}:sort_order" + sort_order = redis.get_message(sort_order_key).get("status") + if sort_order is not None: + new_conf.set_value("sort_order", sort_order) + redis.del_message(sort_order_key) + + grid_items_key = f"{user}:grid_items" + grid_items = redis.get_message(grid_items_key).get("status") + if grid_items is not None: + new_conf.set_value("grid_items", grid_items) + redis.del_message(grid_items_key) + + hide_watch_key = f"{user}:hide_watched" + hide_watch = redis.get_message(hide_watch_key).get("status") + if hide_watch is not None: + new_conf.set_value("hide_watched", hide_watch) + redis.del_message(hide_watch_key) + + ignore_only_key = f"{user}:show_ignored_only" + ignore_only = redis.get_message(ignore_only_key).get("status") + if ignore_only is not None: + new_conf.set_value("show_ignored_only", ignore_only) + redis.del_message(ignore_only_key) + + subed_only_key = f"{user}:show_subed_only" + subed_only = redis.get_message(subed_only_key).get("status") + if subed_only is not None: + new_conf.set_value("show_subed_only", subed_only) + redis.del_message(subed_only_key) + + sb_id_key = f"{user}:id_sb_id" + sb_id = redis.get_message(sb_id_key).get("status") + if sb_id is not None: + new_conf.set_value("sb_id_id", sb_id) + redis.del_message(sb_id_key) + + for view in ["channel", "playlist", "home", "downloads"]: + view_key = 
f"{user}:view:{view}" + view_style = redis.get_message(view_key).get("status") + if view_style is not None: + new_conf.set_value(f"view_style_{view}", view_style) + redis.del_message(view_key) + + self.stdout.write( + self.style.SUCCESS( + f" ✓ Settings for user '{user}' migrated to ES" + ) + ) + except Exception as e: + message = " 🗙 user migration to ES failed" + self.stdout.write(self.style.ERROR(message)) + self.stdout.write(self.style.ERROR(e)) + sleep(60) + raise CommandError(message) + else: + self.stdout.write( + self.style.SUCCESS( + " ✓ Settings for all users migrated to ES" + ) + ) diff --git a/tubearchivist/config/settings.py b/tubearchivist/config/settings.py index e58eeea..0afef8c 100644 --- a/tubearchivist/config/settings.py +++ b/tubearchivist/config/settings.py @@ -64,6 +64,7 @@ MIDDLEWARE = [ "django.contrib.auth.middleware.AuthenticationMiddleware", "django.contrib.messages.middleware.MessageMiddleware", "django.middleware.clickjacking.XFrameOptionsMiddleware", + "home.src.ta.health.HealthCheckMiddleware", ] ROOT_URLCONF = "config.urls" @@ -174,7 +175,6 @@ if bool(environ.get("TA_LDAP")): ldap.OPT_X_TLS_REQUIRE_CERT: ldap.OPT_X_TLS_NEVER, } - global AUTHENTICATION_BACKENDS AUTHENTICATION_BACKENDS = ("django_auth_ldap.backend.LDAPBackend",) # Database @@ -210,6 +210,19 @@ AUTH_PASSWORD_VALIDATORS = [ AUTH_USER_MODEL = "home.Account" +# Forward-auth authentication +if bool(environ.get("TA_ENABLE_AUTH_PROXY")): + TA_AUTH_PROXY_USERNAME_HEADER = ( + environ.get("TA_AUTH_PROXY_USERNAME_HEADER") or "HTTP_REMOTE_USER" + ) + TA_AUTH_PROXY_LOGOUT_URL = environ.get("TA_AUTH_PROXY_LOGOUT_URL") + + MIDDLEWARE.append("home.src.ta.auth.HttpRemoteUserMiddleware") + + AUTHENTICATION_BACKENDS = ( + "django.contrib.auth.backends.RemoteUserBackend", + ) + # Internationalization # https://docs.djangoproject.com/en/3.2/topics/i18n/ @@ -256,4 +269,4 @@ CORS_ALLOW_HEADERS = list(default_headers) + [ # TA application settings TA_UPSTREAM = 
"https://github.com/tubearchivist/tubearchivist" -TA_VERSION = "v0.4.1" +TA_VERSION = "v0.4.2-unstable" diff --git a/tubearchivist/home/config.json b/tubearchivist/home/config.json index d45a2a1..26d3bf9 100644 --- a/tubearchivist/home/config.json +++ b/tubearchivist/home/config.json @@ -1,18 +1,5 @@ { - "archive": { - "sort_by": "published", - "sort_order": "desc", - "page_size": 12 - }, - "default_view": { - "home": "grid", - "channel": "list", - "downloads": "list", - "playlist": "grid", - "grid_items": 3 - }, "subscriptions": { - "auto_download": false, "channel_size": 50, "live_channel_size": 50, "shorts_channel_size": 50, @@ -41,7 +28,6 @@ "app_root": "/app", "cache_dir": "/cache", "videos": "/youtube", - "colors": "dark", "enable_cast": false, "enable_snapshot": true }, diff --git a/tubearchivist/home/src/download/queue.py b/tubearchivist/home/src/download/queue.py index 97636bb..0f92bd4 100644 --- a/tubearchivist/home/src/download/queue.py +++ b/tubearchivist/home/src/download/queue.py @@ -16,9 +16,8 @@ from home.src.download.yt_dlp_base import YtWrap from home.src.es.connect import ElasticWrap, IndexPaginate from home.src.index.playlist import YoutubePlaylist from home.src.index.video_constants import VideoTypeEnum -from home.src.index.video_streams import DurationConverter from home.src.ta.config import AppConfig -from home.src.ta.helper import is_shorts +from home.src.ta.helper import get_duration_str, is_shorts class PendingIndex: @@ -335,9 +334,6 @@ class PendingList(PendingIndex): def _parse_youtube_details(self, vid, vid_type=VideoTypeEnum.VIDEOS): """parse response""" vid_id = vid.get("id") - duration_str = DurationConverter.get_str(vid["duration"]) - if duration_str == "NA": - print(f"skip extracting duration for: {vid_id}") published = datetime.strptime(vid["upload_date"], "%Y%m%d").strftime( "%Y-%m-%d" ) @@ -349,7 +345,7 @@ class PendingList(PendingIndex): "vid_thumb_url": vid["thumbnail"], "title": vid["title"], "channel_id": vid["channel_id"], 
- "duration": duration_str, + "duration": get_duration_str(vid["duration"]), "published": published, "timestamp": int(datetime.now().timestamp()), # Pulling enum value out so it is serializable diff --git a/tubearchivist/home/src/download/thumbnails.py b/tubearchivist/home/src/download/thumbnails.py index 7041ec2..ca498c5 100644 --- a/tubearchivist/home/src/download/thumbnails.py +++ b/tubearchivist/home/src/download/thumbnails.py @@ -246,9 +246,10 @@ class ThumbManager(ThumbManagerBase): class ValidatorCallback: """handle callback validate thumbnails page by page""" - def __init__(self, source, index_name): + def __init__(self, source, index_name, counter=0): self.source = source self.index_name = index_name + self.counter = counter def run(self): """run the task for page""" @@ -384,9 +385,10 @@ class EmbedCallback: MEDIA_DIR = CONFIG["application"]["videos"] FORMAT = MP4Cover.FORMAT_JPEG - def __init__(self, source, index_name): + def __init__(self, source, index_name, counter=0): self.source = source self.index_name = index_name + self.counter = counter def run(self): """run embed""" diff --git a/tubearchivist/home/src/download/yt_dlp_handler.py b/tubearchivist/home/src/download/yt_dlp_handler.py index f2bf800..9a865ab 100644 --- a/tubearchivist/home/src/download/yt_dlp_handler.py +++ b/tubearchivist/home/src/download/yt_dlp_handler.py @@ -417,7 +417,7 @@ class VideoDownloader: "lang": "painless", }, } - response, _ = ElasticWrap(path, config=self.config).post(data=data) + response, _ = ElasticWrap(path).post(data=data) updated = response.get("updated") if updated: print(f"[download] reset auto start on {updated} videos.") diff --git a/tubearchivist/home/src/es/backup.py b/tubearchivist/home/src/es/backup.py index df1a481..3dc1cf5 100644 --- a/tubearchivist/home/src/es/backup.py +++ b/tubearchivist/home/src/es/backup.py @@ -18,6 +18,8 @@ from home.src.ta.helper import get_mapping, ignore_filelist class ElasticBackup: """dump index to nd-json files for later bulk 
import""" + INDEX_SPLIT = ["comment"] + def __init__(self, reason=False, task=False): self.config = AppConfig().config self.cache_dir = self.config["application"]["cache_dir"] @@ -51,14 +53,18 @@ class ElasticBackup: def backup_index(self, index_name): """export all documents of a single index""" - paginate = IndexPaginate( - f"ta_{index_name}", - data={"query": {"match_all": {}}}, - keep_source=True, - callback=BackupCallback, - task=self.task, - total=self._get_total(index_name), - ) + paginate_kwargs = { + "data": {"query": {"match_all": {}}}, + "keep_source": True, + "callback": BackupCallback, + "task": self.task, + "total": self._get_total(index_name), + } + + if index_name in self.INDEX_SPLIT: + paginate_kwargs.update({"size": 200}) + + paginate = IndexPaginate(f"ta_{index_name}", **paginate_kwargs) _ = paginate.get_results() @staticmethod @@ -206,9 +212,10 @@ class ElasticBackup: class BackupCallback: """handle backup ndjson writer as callback for IndexPaginate""" - def __init__(self, source, index_name): + def __init__(self, source, index_name, counter=0): self.source = source self.index_name = index_name + self.counter = counter self.timestamp = datetime.now().strftime("%Y%m%d") def run(self): @@ -237,7 +244,8 @@ class BackupCallback: def _write_es_json(self, file_content): """write nd-json file for es _bulk API to disk""" cache_dir = AppConfig().config["application"]["cache_dir"] - file_name = f"es_{self.index_name.lstrip('ta_')}-{self.timestamp}.json" + index = self.index_name.lstrip("ta_") + file_name = f"es_{index}-{self.timestamp}-{self.counter}.json" file_path = os.path.join(cache_dir, "backup", file_name) with open(file_path, "a+", encoding="utf-8") as f: f.write(file_content) diff --git a/tubearchivist/home/src/es/connect.py b/tubearchivist/home/src/es/connect.py index 0b9d554..a7c3ff5 100644 --- a/tubearchivist/home/src/es/connect.py +++ b/tubearchivist/home/src/es/connect.py @@ -6,9 +6,11 @@ functionality: # pylint: disable=missing-timeout 
import json +import os +from typing import Any import requests -from home.src.ta.config import AppConfig +import urllib3 class ElasticWrap: @@ -16,61 +18,96 @@ class ElasticWrap: returns response json and status code tuple """ - def __init__(self, path, config=False): - self.url = False - self.auth = False - self.path = path - self.config = config - self._get_config() + ES_URL: str = str(os.environ.get("ES_URL")) + ES_PASS: str = str(os.environ.get("ELASTIC_PASSWORD")) + ES_USER: str = str(os.environ.get("ELASTIC_USER") or "elastic") + ES_DISABLE_VERIFY_SSL: bool = bool(os.environ.get("ES_DISABLE_VERIFY_SSL")) - def _get_config(self): - """add config if not passed""" - if not self.config: - self.config = AppConfig().config + def __init__(self, path: str): + self.url: str = f"{self.ES_URL}/{path}" + self.auth: tuple[str, str] = (self.ES_USER, self.ES_PASS) - es_url = self.config["application"]["es_url"] - self.auth = self.config["application"]["es_auth"] - self.url = f"{es_url}/{self.path}" + if self.ES_DISABLE_VERIFY_SSL: + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - def get(self, data=False, timeout=10, print_error=True): + def get( + self, + data: bool | dict = False, + timeout: int = 10, + print_error: bool = True, + ) -> tuple[dict, int]: """get data from es""" + + kwargs: dict[str, Any] = { + "auth": self.auth, + "timeout": timeout, + } + + if self.ES_DISABLE_VERIFY_SSL: + kwargs["verify"] = False + if data: - response = requests.get( - self.url, json=data, auth=self.auth, timeout=timeout - ) - else: - response = requests.get(self.url, auth=self.auth, timeout=timeout) + kwargs["json"] = data + + response = requests.get(self.url, **kwargs) + if print_error and not response.ok: print(response.text) return response.json(), response.status_code - def post(self, data=False, ndjson=False): + def post( + self, data: bool | dict = False, ndjson: bool = False + ) -> tuple[dict, int]: """post data to es""" - if ndjson: - headers = 
{"Content-type": "application/x-ndjson"} - payload = data - else: - headers = {"Content-type": "application/json"} - payload = json.dumps(data) - if data: - response = requests.post( - self.url, data=payload, headers=headers, auth=self.auth + kwargs: dict[str, Any] = {"auth": self.auth} + + if ndjson and data: + kwargs.update( + { + "headers": {"Content-type": "application/x-ndjson"}, + "data": data, + } ) - else: - response = requests.post(self.url, headers=headers, auth=self.auth) + elif data: + kwargs.update( + { + "headers": {"Content-type": "application/json"}, + "data": json.dumps(data), + } + ) + + if self.ES_DISABLE_VERIFY_SSL: + kwargs["verify"] = False + + response = requests.post(self.url, **kwargs) if not response.ok: print(response.text) return response.json(), response.status_code - def put(self, data, refresh=False): + def put( + self, + data: bool | dict = False, + refresh: bool = False, + ) -> tuple[dict, Any]: """put data to es""" + if refresh: self.url = f"{self.url}/?refresh=true" - response = requests.put(f"{self.url}", json=data, auth=self.auth) + + kwargs: dict[str, Any] = { + "json": data, + "auth": self.auth, + } + + if self.ES_DISABLE_VERIFY_SSL: + kwargs["verify"] = False + + response = requests.put(self.url, **kwargs) + if not response.ok: print(response.text) print(data) @@ -78,14 +115,25 @@ class ElasticWrap: return response.json(), response.status_code - def delete(self, data=False, refresh=False): + def delete( + self, + data: bool | dict = False, + refresh: bool = False, + ) -> tuple[dict, Any]: """delete document from es""" + if refresh: self.url = f"{self.url}/?refresh=true" + + kwargs: dict[str, Any] = {"auth": self.auth} + if data: - response = requests.delete(self.url, json=data, auth=self.auth) - else: - response = requests.delete(self.url, auth=self.auth) + kwargs["json"] = data + + if self.ES_DISABLE_VERIFY_SSL: + kwargs["verify"] = False + + response = requests.delete(self.url, **kwargs) if not response.ok: 
print(response.text) @@ -156,7 +204,9 @@ class IndexPaginate: all_results.append(hit["_source"]) if self.kwargs.get("callback"): - self.kwargs.get("callback")(all_hits, self.index_name).run() + self.kwargs.get("callback")( + all_hits, self.index_name, counter=counter + ).run() if self.kwargs.get("task"): print(f"{self.index_name}: processing page {counter}") diff --git a/tubearchivist/home/src/es/index_mapping.json b/tubearchivist/home/src/es/index_mapping.json index 06bf13c..d6dda4b 100644 --- a/tubearchivist/home/src/es/index_mapping.json +++ b/tubearchivist/home/src/es/index_mapping.json @@ -1,5 +1,17 @@ { "index_config": [{ + "index_name": "config", + "expected_map": { + "config": { + "type": "object", + "enabled": false + } + }, + "expected_set": { + "number_of_replicas": "0" + } + }, + { "index_name": "channel", "expected_map": { "channel_id": { @@ -601,4 +613,4 @@ } } ] -} \ No newline at end of file +} diff --git a/tubearchivist/home/src/es/snapshot.py b/tubearchivist/home/src/es/snapshot.py index 6d6563c..15fc82c 100644 --- a/tubearchivist/home/src/es/snapshot.py +++ b/tubearchivist/home/src/es/snapshot.py @@ -19,7 +19,9 @@ class ElasticSnapshot: REPO_SETTINGS = { "compress": "true", "chunk_size": "1g", - "location": "/usr/share/elasticsearch/data/snapshot", + "location": environ.get( + "ES_SNAPSHOT_DIR", "/usr/share/elasticsearch/data/snapshot" + ), } POLICY = "ta_daily" diff --git a/tubearchivist/home/src/frontend/api_calls.py b/tubearchivist/home/src/frontend/api_calls.py index 60764ea..c5402ab 100644 --- a/tubearchivist/home/src/frontend/api_calls.py +++ b/tubearchivist/home/src/frontend/api_calls.py @@ -4,7 +4,7 @@ Functionality: - called via user input """ -from home.src.ta.ta_redis import RedisArchivist +from home.src.ta.users import UserConfig from home.tasks import run_restore_backup @@ -41,10 +41,8 @@ class PostData: def _change_view(self): """process view changes in home, channel, and downloads""" - origin, new_view = self.exec_val.split(":") - 
key = f"{self.current_user}:view:{origin}" - print(f"change view: {key} to {new_view}") - RedisArchivist().set_message(key, {"status": new_view}) + view, setting = self.exec_val.split(":") + UserConfig(self.current_user).set_value(f"view_style_{view}", setting) return {"success": True} def _change_grid(self): @@ -52,48 +50,38 @@ class PostData: grid_items = int(self.exec_val) grid_items = max(grid_items, 3) grid_items = min(grid_items, 7) - - key = f"{self.current_user}:grid_items" - print(f"change grid items: {grid_items}") - RedisArchivist().set_message(key, {"status": grid_items}) + UserConfig(self.current_user).set_value("grid_items", grid_items) return {"success": True} def _sort_order(self): """change the sort between published to downloaded""" - sort_order = {"status": self.exec_val} if self.exec_val in ["asc", "desc"]: - RedisArchivist().set_message( - f"{self.current_user}:sort_order", sort_order + UserConfig(self.current_user).set_value( + "sort_order", self.exec_val ) else: - RedisArchivist().set_message( - f"{self.current_user}:sort_by", sort_order - ) + UserConfig(self.current_user).set_value("sort_by", self.exec_val) return {"success": True} def _hide_watched(self): """toggle if to show watched vids or not""" - key = f"{self.current_user}:hide_watched" - message = {"status": bool(int(self.exec_val))} - print(f"toggle {key}: {message}") - RedisArchivist().set_message(key, message) + UserConfig(self.current_user).set_value( + "hide_watched", bool(int(self.exec_val)) + ) return {"success": True} def _show_subed_only(self): """show or hide subscribed channels only on channels page""" - key = f"{self.current_user}:show_subed_only" - message = {"status": bool(int(self.exec_val))} - print(f"toggle {key}: {message}") - RedisArchivist().set_message(key, message) + UserConfig(self.current_user).set_value( + "show_subed_only", bool(int(self.exec_val)) + ) return {"success": True} def _show_ignored_only(self): """switch view on /downloads/ to show ignored only""" 
- show_value = self.exec_val - key = f"{self.current_user}:show_ignored_only" - value = {"status": show_value} - print(f"Filter download view ignored only: {show_value}") - RedisArchivist().set_message(key, value) + UserConfig(self.current_user).set_value( + "show_ignored_only", bool(int(self.exec_val)) + ) return {"success": True} def _db_restore(self): diff --git a/tubearchivist/home/src/frontend/searching.py b/tubearchivist/home/src/frontend/searching.py index 9977f36..5bcc01d 100644 --- a/tubearchivist/home/src/frontend/searching.py +++ b/tubearchivist/home/src/frontend/searching.py @@ -6,116 +6,19 @@ Functionality: - calculate pagination values """ -import urllib.parse -from datetime import datetime -from home.src.download.thumbnails import ThumbManager +from api.src.search_processor import SearchProcess from home.src.es.connect import ElasticWrap -from home.src.index.video_streams import DurationConverter -from home.src.ta.config import AppConfig - - -class SearchHandler: - """search elastic search""" - - def __init__(self, path, config, data=False): - self.max_hits = None - self.aggs = None - self.path = path - self.config = config - self.data = data - - def get_data(self): - """get the data""" - response, _ = ElasticWrap(self.path, config=self.config).get(self.data) - - if "hits" in response.keys(): - self.max_hits = response["hits"]["total"]["value"] - return_value = response["hits"]["hits"] - else: - # simulate list for single result to reuse rest of class - return_value = [response] - - if not return_value: - return False - - for idx, hit in enumerate(return_value): - return_value[idx] = self.hit_cleanup(hit) - - if response.get("aggregations"): - self.aggs = response["aggregations"] - if "total_duration" in self.aggs: - duration_sec = self.aggs["total_duration"]["value"] - self.aggs["total_duration"].update( - {"value_str": DurationConverter().get_str(duration_sec)} - ) - - return return_value - - @staticmethod - def hit_cleanup(hit): - """clean up and 
parse data from a single hit""" - hit["source"] = hit.pop("_source") - hit_keys = hit["source"].keys() - if "media_url" in hit_keys: - parsed_url = urllib.parse.quote(hit["source"]["media_url"]) - hit["source"]["media_url"] = parsed_url - - if "published" in hit_keys: - published = hit["source"]["published"] - date_pub = datetime.strptime(published, "%Y-%m-%d") - date_str = datetime.strftime(date_pub, "%d %b, %Y") - hit["source"]["published"] = date_str - - if "vid_last_refresh" in hit_keys: - vid_last_refresh = hit["source"]["vid_last_refresh"] - date_refresh = datetime.fromtimestamp(vid_last_refresh) - date_str = datetime.strftime(date_refresh, "%d %b, %Y") - hit["source"]["vid_last_refresh"] = date_str - - if "playlist_last_refresh" in hit_keys: - playlist_last_refresh = hit["source"]["playlist_last_refresh"] - date_refresh = datetime.fromtimestamp(playlist_last_refresh) - date_str = datetime.strftime(date_refresh, "%d %b, %Y") - hit["source"]["playlist_last_refresh"] = date_str - - if "vid_thumb_url" in hit_keys: - youtube_id = hit["source"]["youtube_id"] - thumb_path = ThumbManager(youtube_id).vid_thumb_path() - hit["source"]["vid_thumb_url"] = f"/cache/{thumb_path}" - - if "channel_last_refresh" in hit_keys: - refreshed = hit["source"]["channel_last_refresh"] - date_refresh = datetime.fromtimestamp(refreshed) - date_str = datetime.strftime(date_refresh, "%d %b, %Y") - hit["source"]["channel_last_refresh"] = date_str - - if "channel" in hit_keys: - channel_keys = hit["source"]["channel"].keys() - if "channel_last_refresh" in channel_keys: - refreshed = hit["source"]["channel"]["channel_last_refresh"] - date_refresh = datetime.fromtimestamp(refreshed) - date_str = datetime.strftime(date_refresh, "%d %b, %Y") - hit["source"]["channel"]["channel_last_refresh"] = date_str - - if "subtitle_fragment_id" in hit_keys: - youtube_id = hit["source"]["youtube_id"] - thumb_path = ThumbManager(youtube_id).vid_thumb_path() - hit["source"]["vid_thumb_url"] = 
f"/cache/{thumb_path}" - - return hit class SearchForm: """build query from search form data""" - CONFIG = AppConfig().config - def multi_search(self, search_query): """searching through index""" path, query, query_type = SearchParser(search_query).run() - look_up = SearchHandler(path, config=self.CONFIG, data=query) - search_results = look_up.get_data() + response, _ = ElasticWrap(path).get(data=query) + search_results = SearchProcess(response).process() all_results = self.build_results(search_results) return {"results": all_results, "queryType": query_type} @@ -465,7 +368,6 @@ class QueryBuilder: query = { "size": 30, - "_source": {"excludes": "subtitle_line"}, "query": {"bool": {"must": must_list}}, "highlight": { "fields": { diff --git a/tubearchivist/home/src/index/generic.py b/tubearchivist/home/src/index/generic.py index 6e82e54..a5f624d 100644 --- a/tubearchivist/home/src/index/generic.py +++ b/tubearchivist/home/src/index/generic.py @@ -8,7 +8,7 @@ import math from home.src.download.yt_dlp_base import YtWrap from home.src.es.connect import ElasticWrap from home.src.ta.config import AppConfig -from home.src.ta.ta_redis import RedisArchivist +from home.src.ta.users import UserConfig class YouTubeItem: @@ -100,13 +100,7 @@ class Pagination: def get_page_size(self): """get default or user modified page_size""" - key = f"{self.request.user.id}:page_size" - page_size = RedisArchivist().get_message(key)["status"] - if not page_size: - config = AppConfig().config - page_size = config["archive"]["page_size"] - - return page_size + return UserConfig(self.request.user.id).get_value("page_size") def first_guess(self): """build first guess before api call""" diff --git a/tubearchivist/home/src/index/video.py b/tubearchivist/home/src/index/video.py index 87360ae..606e32f 100644 --- a/tubearchivist/home/src/index/video.py +++ b/tubearchivist/home/src/index/video.py @@ -16,12 +16,9 @@ from home.src.index import playlist as ta_playlist from home.src.index.generic import 
YouTubeItem from home.src.index.subtitle import YoutubeSubtitle from home.src.index.video_constants import VideoTypeEnum -from home.src.index.video_streams import ( - DurationConverter, - MediaStreamExtractor, -) -from home.src.ta.helper import randomizor -from home.src.ta.ta_redis import RedisArchivist +from home.src.index.video_streams import MediaStreamExtractor +from home.src.ta.helper import get_duration_sec, get_duration_str, randomizor +from home.src.ta.users import UserConfig from ryd_client import ryd_client @@ -35,17 +32,16 @@ class SponsorBlock: self.user_agent = f"{settings.TA_UPSTREAM} {settings.TA_VERSION}" self.last_refresh = int(datetime.now().timestamp()) - def get_sb_id(self): - """get sponsorblock userid or generate if needed""" + def get_sb_id(self) -> str: + """get sponsorblock for the userid or generate if needed""" if not self.user_id: - print("missing request user id") - raise ValueError + raise ValueError("missing request user id") - key = f"{self.user_id}:id_sponsorblock" - sb_id = RedisArchivist().get_message(key) - if not sb_id["status"]: - sb_id = {"status": randomizor(32)} - RedisArchivist().set_message(key, sb_id) + user = UserConfig(self.user_id) + sb_id = user.get_value("sponsorblock_id") + if not sb_id: + sb_id = randomizor(32) + user.set_value("sponsorblock_id", sb_id) return sb_id @@ -91,7 +87,7 @@ class SponsorBlock: def post_timestamps(self, youtube_id, start_time, end_time): """post timestamps to api""" - user_id = self.get_sb_id().get("status") + user_id = self.get_sb_id() data = { "videoID": youtube_id, "startTime": start_time, @@ -108,7 +104,7 @@ class SponsorBlock: def vote_on_segment(self, uuid, vote): """send vote on existing segment""" - user_id = self.get_sb_id().get("status") + user_id = self.get_sb_id() data = { "UUID": uuid, "userID": user_id, @@ -249,16 +245,14 @@ class YoutubeVideo(YouTubeItem, YoutubeSubtitle): def add_player(self, media_path=False): """add player information for new videos""" vid_path = 
media_path or self.build_dl_cache_path() + duration = get_duration_sec(vid_path) - duration_handler = DurationConverter() - duration = duration_handler.get_sec(vid_path) - duration_str = duration_handler.get_str(duration) self.json_data.update( { "player": { "watched": False, "duration": duration, - "duration_str": duration_str, + "duration_str": get_duration_str(duration), } } ) diff --git a/tubearchivist/home/src/index/video_streams.py b/tubearchivist/home/src/index/video_streams.py index 7f6f2f2..0d8c182 100644 --- a/tubearchivist/home/src/index/video_streams.py +++ b/tubearchivist/home/src/index/video_streams.py @@ -5,60 +5,6 @@ import subprocess from os import stat -class DurationConverter: - """ - using ffmpeg to get and parse duration from filepath - """ - - @staticmethod - def get_sec(file_path): - """read duration from file""" - duration = subprocess.run( - [ - "ffprobe", - "-v", - "error", - "-show_entries", - "format=duration", - "-of", - "default=noprint_wrappers=1:nokey=1", - file_path, - ], - capture_output=True, - check=True, - ) - duration_raw = duration.stdout.decode().strip() - if duration_raw == "N/A": - return 0 - - duration_sec = int(float(duration_raw)) - return duration_sec - - @staticmethod - def get_str(seconds): - """takes duration in sec and returns clean string""" - if not seconds: - # failed to extract - return "NA" - - days = int(seconds // (24 * 3600)) - hours = int((seconds % (24 * 3600)) // 3600) - minutes = int((seconds % 3600) // 60) - seconds = int(seconds % 60) - - duration_str = str() - if days: - duration_str = f"{days}d " - if hours: - duration_str = duration_str + str(hours).zfill(2) + ":" - if minutes: - duration_str = duration_str + str(minutes).zfill(2) + ":" - else: - duration_str = duration_str + "00:" - duration_str = duration_str + str(seconds).zfill(2) - return duration_str - - class MediaStreamExtractor: """extract stream metadata""" diff --git a/tubearchivist/home/src/ta/auth.py b/tubearchivist/home/src/ta/auth.py 
new file mode 100644 index 0000000..0567db1 --- /dev/null +++ b/tubearchivist/home/src/ta/auth.py @@ -0,0 +1,10 @@ +from django.conf import settings +from django.contrib.auth.middleware import PersistentRemoteUserMiddleware + + +class HttpRemoteUserMiddleware(PersistentRemoteUserMiddleware): + """This class allows authentication via HTTP_REMOTE_USER which is set for + example by certain SSO applications. + """ + + header = settings.TA_AUTH_PROXY_USERNAME_HEADER diff --git a/tubearchivist/home/src/ta/config.py b/tubearchivist/home/src/ta/config.py index 84fe84a..a32d083 100644 --- a/tubearchivist/home/src/ta/config.py +++ b/tubearchivist/home/src/ta/config.py @@ -17,12 +17,10 @@ from home.src.ta.ta_redis import RedisArchivist class AppConfig: - """handle user settings and application variables""" + """handle application variables""" - def __init__(self, user_id=False): - self.user_id = user_id + def __init__(self): self.config = self.get_config() - self.colors = self.get_colors() def get_config(self): """get config from default file or redis if changed""" @@ -30,12 +28,6 @@ class AppConfig: if not config: config = self.get_config_file() - if self.user_id: - key = f"{self.user_id}:page_size" - page_size = RedisArchivist().get_message(key)["status"] - if page_size: - config["archive"]["page_size"] = page_size - config["application"].update(self.get_config_env()) return config @@ -50,14 +42,12 @@ class AppConfig: @staticmethod def get_config_env(): - """read environment application variables""" - es_pass = os.environ.get("ELASTIC_PASSWORD") - es_user = os.environ.get("ELASTIC_USER", default="elastic") + """read environment application variables. 
+ + Connection to ES is managed in ElasticWrap and the + connection to Redis is managed in RedisArchivist.""" application = { - "REDIS_HOST": os.environ.get("REDIS_HOST"), - "es_url": os.environ.get("ES_URL"), - "es_auth": (es_user, es_pass), "HOST_UID": int(os.environ.get("HOST_UID", False)), "HOST_GID": int(os.environ.get("HOST_GID", False)), "enable_cast": bool(os.environ.get("ENABLE_CAST")), @@ -103,30 +93,6 @@ class AppConfig: RedisArchivist().set_message("config", self.config, save=True) return updated - @staticmethod - def set_user_config(form_post, user_id): - """set values in redis for user settings""" - for key, value in form_post.items(): - if not value: - continue - - message = {"status": value} - redis_key = f"{user_id}:{key}" - RedisArchivist().set_message(redis_key, message, save=True) - - def get_colors(self): - """overwrite config if user has set custom values""" - colors = False - if self.user_id: - col_dict = RedisArchivist().get_message(f"{self.user_id}:colors") - colors = col_dict["status"] - - if not colors: - colors = self.config["application"]["colors"] - - self.config["application"]["colors"] = colors - return colors - @staticmethod def _build_rand_daily(): """build random daily schedule per installation""" diff --git a/tubearchivist/home/src/ta/health.py b/tubearchivist/home/src/ta/health.py new file mode 100644 index 0000000..001a021 --- /dev/null +++ b/tubearchivist/home/src/ta/health.py @@ -0,0 +1,11 @@ +from django.http import HttpResponse + + +class HealthCheckMiddleware: + def __init__(self, get_response): + self.get_response = get_response + + def __call__(self, request): + if request.path == "/health": + return HttpResponse("ok") + return self.get_response(request) diff --git a/tubearchivist/home/src/ta/helper.py b/tubearchivist/home/src/ta/helper.py index 0028c11..db6e4b6 100644 --- a/tubearchivist/home/src/ta/helper.py +++ b/tubearchivist/home/src/ta/helper.py @@ -7,6 +7,7 @@ import json import os import random import string 
+import subprocess from datetime import datetime from urllib.parse import urlparse @@ -141,6 +142,47 @@ def is_shorts(youtube_id: str) -> bool: return response.status_code == 200 +def get_duration_sec(file_path: str) -> int: + """get duration of media file from file path""" + + duration = subprocess.run( + [ + "ffprobe", + "-v", + "error", + "-show_entries", + "format=duration", + "-of", + "default=noprint_wrappers=1:nokey=1", + file_path, + ], + capture_output=True, + check=True, + ) + duration_raw = duration.stdout.decode().strip() + if duration_raw == "N/A": + return 0 + + duration_sec = int(float(duration_raw)) + return duration_sec + + +def get_duration_str(seconds: int) -> str: + """Return a human-readable duration string from seconds.""" + if not seconds: + return "NA" + + units = [("y", 31536000), ("d", 86400), ("h", 3600), ("m", 60), ("s", 1)] + duration_parts = [] + + for unit_label, unit_seconds in units: + if seconds >= unit_seconds: + unit_count, seconds = divmod(seconds, unit_seconds) + duration_parts.append(f"{unit_count}{unit_label}") + + return " ".join(duration_parts) + + def ta_host_parser(ta_host: str) -> tuple[list[str], list[str]]: """parse ta_host env var for ALLOWED_HOSTS and CSRF_TRUSTED_ORIGINS""" allowed_hosts: list[str] = [ diff --git a/tubearchivist/home/src/ta/users.py b/tubearchivist/home/src/ta/users.py new file mode 100644 index 0000000..c337381 --- /dev/null +++ b/tubearchivist/home/src/ta/users.py @@ -0,0 +1,104 @@ +""" +Functionality: +- read and write user config backed by ES +- encapsulate persistence of user properties +""" + +from typing import TypedDict + +from home.src.es.connect import ElasticWrap + + +class UserConfigType(TypedDict, total=False): + """describes the user configuration""" + + colors: str + page_size: int + sort_by: str + sort_order: str + view_style_home: str + view_style_channel: str + view_style_downloads: str + view_style_playlist: str + grid_items: int + hide_watched: bool + show_ignored_only: bool + 
show_subed_only: bool + sponsorblock_id: str + + +class UserConfig: + """Handle settings for an individual user + + Create getters and setters for usage in the application. + Although tedious it helps prevents everything caring about how properties + are persisted. Plus it allows us to save anytime any value is set. + """ + + _DEFAULT_USER_SETTINGS = UserConfigType( + colors="dark", + page_size=12, + sort_by="published", + sort_order="desc", + view_style_home="grid", + view_style_channel="list", + view_style_downloads="list", + view_style_playlist="grid", + grid_items=3, + hide_watched=False, + show_ignored_only=False, + show_subed_only=False, + sponsorblock_id=None, + ) + + def __init__(self, user_id: str): + self._user_id: str = user_id + self._config: UserConfigType = self._get_config() + + def get_value(self, key: str): + """Get the given key from the users configuration + + Throws a KeyError if the requested Key is not a permitted value""" + if key not in self._DEFAULT_USER_SETTINGS: + raise KeyError(f"Unable to read config for unknown key '{key}'") + + return self._config.get(key) or self._DEFAULT_USER_SETTINGS.get(key) + + def set_value(self, key: str, value: str | bool | int): + """Set or replace a configuration value for the user + + Throws a KeyError if the requested Key is not a permitted value""" + if not self._user_id: + raise ValueError("Unable to persist config for null user_id") + + if key not in self._DEFAULT_USER_SETTINGS: + raise KeyError(f"Unable to persist config for unknown key '{key}'") + + old = self.get_value(key) + self._config[key] = value + + # Upsert this property (creating a record if not exists) + es_payload = {"doc": {"config": {key: value}}, "doc_as_upsert": True} + es_document_path = f"ta_config/_update/user_{self._user_id}" + response, status = ElasticWrap(es_document_path).post(es_payload) + if status < 200 or status > 299: + raise ValueError(f"Failed storing user value {status}: {response}") + + print(f"User {self._user_id} 
value '{key}' change: {old} > {value}") + + def _get_config(self) -> UserConfigType: + """get config from ES or load from the application defaults""" + if not self._user_id: + # this is for a non logged-in user so use all the defaults + return {} + + # Does this user have configuration stored in ES + es_document_path = f"ta_config/_doc/user_{self._user_id}" + response, status = ElasticWrap(es_document_path).get(print_error=False) + if status == 200 and "_source" in response.keys(): + source = response.get("_source") + if "config" in source.keys(): + return source.get("config") + + # There is no config in ES + return {} diff --git a/tubearchivist/home/templates/home/channel.html b/tubearchivist/home/templates/home/channel.html index 7a7debf..0091345 100644 --- a/tubearchivist/home/templates/home/channel.html +++ b/tubearchivist/home/templates/home/channel.html @@ -42,33 +42,33 @@ {% for channel in results %}
Subscribers: {{ channel.source.channel_subs|intword }}
+Subscribers: {{ channel.channel_subs|intword }}
{% else %} -Subscribers: {{ channel.source.channel_subs|intcomma }}
+Subscribers: {{ channel.channel_subs|intcomma }}
{% endif %}Last refreshed: {{ channel.source.channel_last_refresh }}
- {% if channel.source.channel_subscribed %} - +Last refreshed: {{ channel.channel_last_refresh }}
+ {% if channel.channel_subscribed %} + {% else %} - + {% endif %}{{ aggs.total_items.value }} videos | {{ aggs.total_duration.value_str }} playback | Total size {{ aggs.total_size.value|filesizeformat }}
- + {% endif %}Last refreshed: {{ playlist.source.playlist_last_refresh }}
- {% if playlist.source.playlist_subscribed %} - +Last refreshed: {{ playlist.playlist_last_refresh }}
+ {% if playlist.playlist_subscribed %} + {% else %} - + {% endif %}Published: {{ video.source.published }} | Duration: {{ video.source.duration }} | {{ video.source.youtube_id }}
- {% if video.source.message %} -{{ video.source.message }}
+Published: {{ video.published }} | Duration: {{ video.duration }} | {{ video.youtube_id }}
+ {% if video.message %} +{{ video.message }}
{% endif %}Last refreshed: {{ playlist.source.playlist_last_refresh }}
- {% if playlist.source.playlist_subscribed %} - +Last refreshed: {{ playlist.playlist_last_refresh }}
+ {% if playlist.playlist_subscribed %} + {% else %} - + {% endif %}Total Videos archived: {{ max_hits }}/{{ playlist_info.playlist_entries|length }}
-Watched:
+ {% endif %} {% if reindex %}Reindex scheduled
@@ -107,14 +110,14 @@ {% if results %} {% for video in results %}Loading...
Name | -Videos | -Duration | -Media Size | -
---|
Name | +Videos | +
---|
Name | +Duration | +
---|
Name | +Media Size | +
---|
Add files to the cache/import folder. Make sure to follow the instructions in the Github Wiki.
+Add files to the cache/import folder. Make sure to follow the instructions in the GitHub Wiki.
Danger Zone: This will delete the metadata of deleted videos from the filesystem.
-Rescan your media folder looking for missing videos and clean up index. More infos on the Github Wiki.
+Rescan your media folder looking for missing videos and clean up index. More info on the GitHub Wiki.
Import YouTube cookie: {{ config.downloads.cookie_import }}
For automatic cookie import use Tube Archivist Companion browser extension.
- For manual cookie import, place your cookie file named cookies.google.txt in cache/import before enabling. Instructions in the Wiki.