diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 8188766..06034d5 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -25,7 +25,7 @@ This is my setup I have landed on, YMMV: ```bash ./deploy.sh test ``` -- The command above will also copy the file `tubarchivist/testing.sh` into the working folder of the container. Running this script will install additional debugging tools I regularly use in testing. +- The command above will call the docker build command with `--build-arg INSTALL_DEBUG=1` to install additional useful debug tools. - The `test` argument takes another optional argument to build for a specific architecture valid options are: `amd64`, `arm64` and `multi`, default is `amd64`. - This `deploy.sh` file is not meant to be universally usable for every possible environment but could serve as an idea on how to automatically rebuild containers to test changes - customize to your liking. diff --git a/Dockerfile b/Dockerfile index b5145d1..1e803c2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,8 @@ # build the tube archivist image from default python slim image -FROM python:3.10.2-slim-bullseye +FROM python:3.10.3-slim-bullseye ARG TARGETPLATFORM +ARG INSTALL_DEBUG ENV PYTHONUNBUFFERED 1 @@ -26,6 +27,13 @@ RUN if [ "$TARGETPLATFORM" = "linux/amd64" ] ; then \ apt-get -y update && apt-get -y install --no-install-recommends ffmpeg && rm -rf /var/lib/apt/lists/* \ ; fi +# install debug tools for testing environment +RUN if [ "$INSTALL_DEBUG" ] ; then \ + apt-get -y update && apt-get -y install --no-install-recommends \ + vim htop bmon net-tools iputils-ping procps \ + && pip install --no-cache-dir ipython --src /usr/local/src \ + ; fi + # make folders RUN mkdir /cache RUN mkdir /youtube diff --git a/deploy.sh b/deploy.sh index 1af7865..26dd350 100755 --- a/deploy.sh +++ b/deploy.sh @@ -43,7 +43,10 @@ function sync_test { # pass argument to build for specific platform host="tubearchivist.local" + # make base folder + ssh "$host" "mkdir -p docker" + # copy project files to build image rsync -a --progress --delete-after \ --exclude ".git" \ --exclude ".gitignore" \ @@ -52,8 +55,8 @@ function sync_test { --exclude "db.sqlite3" \ . 
-e ssh "$host":tubearchivist - # uncomment or copy your own docker-compose file - # rsync -r --progress --delete docker-compose.yml -e ssh "$host":docker + # copy default docker-compose file if not exist + rsync --progress --ignore-existing docker-compose.yml -e ssh "$host":docker if [[ $1 = "amd64" ]]; then platform="linux/amd64" @@ -65,12 +68,9 @@ function sync_test { platform="linux/amd64" fi - ssh "$host" "docker buildx build --platform $platform -t bbilly1/tubearchivist:latest tubearchivist --load" + ssh "$host" "docker buildx build --build-arg INSTALL_DEBUG=1 --platform $platform -t bbilly1/tubearchivist:latest tubearchivist --load" ssh "$host" 'docker-compose -f docker/docker-compose.yml up -d' - ssh "$host" 'docker cp tubearchivist/tubearchivist/testing.sh tubearchivist:/app/testing.sh' - ssh "$host" 'docker exec tubearchivist chmod +x /app/testing.sh' - } diff --git a/tubearchivist/api/views.py b/tubearchivist/api/views.py index cfa1875..f22941c 100644 --- a/tubearchivist/api/views.py +++ b/tubearchivist/api/views.py @@ -1,7 +1,7 @@ """all API views""" -import requests from home.src.download.thumbnails import ThumbManager +from home.src.es.connect import ElasticWrap from home.src.ta.config import AppConfig from home.src.ta.helper import UrlListParser from home.src.ta.ta_redis import RedisArchivist @@ -24,31 +24,21 @@ class ApiBaseView(APIView): def __init__(self): super().__init__() - self.response = {"data": False} + self.response = {"data": False, "config": AppConfig().config} self.status_code = False self.context = False - self.default_conf = AppConfig().config - - def config_builder(self): - """build confic context""" - self.context = { - "es_url": self.default_conf["application"]["es_url"], - "es_auth": self.default_conf["application"]["es_auth"], - } - self.response["config"] = self.default_conf def get_document(self, document_id): """get single document from es""" - es_url = self.context["es_url"] - url = f"{es_url}{self.search_base}{document_id}" - print(url) - response = requests.get(url, auth=self.context["es_auth"]) + path = f"{self.search_base}{document_id}" + print(path) + response, status_code = ElasticWrap(path).get() try: - self.response["data"] = response.json()["_source"] + self.response["data"] = response["_source"] except KeyError: print(f"item not found: {document_id}") self.response["data"] = False - self.status_code = response.status_code + self.status_code = status_code def process_keys(self): """process keys for frontend""" @@ -59,7 +49,7 @@ class ApiBaseView(APIView): if "vid_thumb_url" in all_keys: youtube_id = self.response["data"]["youtube_id"] vid_thumb_url = ThumbManager().vid_thumb_path(youtube_id) - cache_dir = self.default_conf["application"]["cache_dir"] + cache_dir = self.response["config"]["application"]["cache_dir"] new_thumb = f"{cache_dir}/{vid_thumb_url}" self.response["data"]["vid_thumb_url"] = new_thumb if "subtitles" in all_keys: @@ -75,13 +65,11 @@ class ApiBaseView(APIView): def get_document_list(self, data): """get a list of results""" - es_url = self.context["es_url"] - url = f"{es_url}{self.search_base}" - print(url) - response = requests.get(url, json=data, auth=self.context["es_auth"]) - all_hits = response.json()["hits"]["hits"] + print(self.search_base) + response, status_code = ElasticWrap(self.search_base).get(data=data) + all_hits = response["hits"]["hits"] self.response["data"] = [i["_source"] for i in all_hits] - self.status_code = response.status_code + self.status_code = status_code class VideoApiView(ApiBaseView): @@ 
-89,12 +77,11 @@ class VideoApiView(ApiBaseView): GET: returns metadata dict of video """ - search_base = "/ta_video/_doc/" + search_base = "ta_video/_doc/" def get(self, request, video_id): # pylint: disable=unused-argument """get request""" - self.config_builder() self.get_document(video_id) self.process_keys() return Response(self.response, status=self.status_code) @@ -143,12 +130,11 @@ class ChannelApiView(ApiBaseView): GET: returns metadata dict of channel """ - search_base = "/ta_channel/_doc/" + search_base = "ta_channel/_doc/" def get(self, request, channel_id): # pylint: disable=unused-argument """get request""" - self.config_builder() self.get_document(channel_id) return Response(self.response, status=self.status_code) @@ -159,13 +145,12 @@ class ChannelApiListView(ApiBaseView): POST: edit a list of channels """ - search_base = "/ta_channel/_search/" + search_base = "ta_channel/_search/" def get(self, request): # pylint: disable=unused-argument """get request""" data = {"query": {"match_all": {}}} - self.config_builder() self.get_document_list(data) self.get_paginate() @@ -194,12 +179,11 @@ class PlaylistApiView(ApiBaseView): GET: returns metadata dict of playlist """ - search_base = "/ta_playlist/_doc/" + search_base = "ta_playlist/_doc/" def get(self, request, playlist_id): # pylint: disable=unused-argument """get request""" - self.config_builder() self.get_document(playlist_id) return Response(self.response, status=self.status_code) @@ -209,12 +193,11 @@ class DownloadApiView(ApiBaseView): GET: returns metadata dict of an item in the download queue """ - search_base = "/ta_download/_doc/" + search_base = "ta_download/_doc/" def get(self, request, video_id): # pylint: disable=unused-argument """get request""" - self.config_builder() self.get_document(video_id) return Response(self.response, status=self.status_code) @@ -225,13 +208,12 @@ class DownloadApiListView(ApiBaseView): POST: add a list of videos to download queue """ - search_base = "/ta_download/_search/" + search_base = "ta_download/_search/" def get(self, request): # pylint: disable=unused-argument """get request""" data = {"query": {"match_all": {}}} - self.config_builder() self.get_document_list(data) self.get_paginate() return Response(self.response) diff --git a/tubearchivist/home/apps.py b/tubearchivist/home/apps.py index 13e3986..46a940c 100644 --- a/tubearchivist/home/apps.py +++ b/tubearchivist/home/apps.py @@ -1,8 +1,10 @@ """handle custom startup functions""" import os +import sys from django.apps import AppConfig +from home.src.es.connect import ElasticWrap from home.src.es.index_setup import index_check from home.src.ta.config import AppConfig as ArchivistConfig from home.src.ta.ta_redis import RedisArchivist @@ -11,6 +13,9 @@ from home.src.ta.ta_redis import RedisArchivist class StartupCheck: """checks to run at application startup""" + MIN_MAJOR, MAX_MAJOR = 7, 7 + MIN_MINOR = 17 + def __init__(self): self.config_handler = ArchivistConfig() self.redis_con = RedisArchivist() @@ -19,6 +24,7 @@ class StartupCheck: def run(self): """run all startup checks""" print("run startup checks") + self.es_version_check() self.release_lock() index_check() self.sync_redis_state() @@ -72,6 +78,33 @@ class StartupCheck: if response: print("deleted leftover key from redis: " + lock) + def is_invalid(self, version): + """return true if es version is invalid, false if ok""" + major, minor = [int(i) for i in version.split(".")[:2]] + if not self.MIN_MAJOR <= major <= self.MAX_MAJOR: + return True + + if minor >= 
self.MIN_MINOR: + return False + + return True + + def es_version_check(self): + """check for minimal elasticsearch version""" + response, _ = ElasticWrap("/").get() + version = response["version"]["number"] + invalid = self.is_invalid(version) + + if invalid: + print( + "minimal required elasticsearch version: " + + f"{self.MIN_MAJOR}.{self.MIN_MINOR}, " + + "please update to the recommended version." + ) + sys.exit(1) + + print("elasticsearch version check passed") + class HomeConfig(AppConfig): """call startup funcs""" diff --git a/tubearchivist/home/src/download/yt_dlp_handler.py b/tubearchivist/home/src/download/yt_dlp_handler.py index e540426..d764011 100644 --- a/tubearchivist/home/src/download/yt_dlp_handler.py +++ b/tubearchivist/home/src/download/yt_dlp_handler.py @@ -11,11 +11,10 @@ import shutil from datetime import datetime from time import sleep -import requests import yt_dlp from home.src.download.queue import PendingList from home.src.download.subscriptions import PlaylistSubscription -from home.src.es.connect import IndexPaginate +from home.src.es.connect import ElasticWrap, IndexPaginate from home.src.index.channel import YoutubeChannel from home.src.index.playlist import YoutubePlaylist from home.src.index.video import YoutubeVideo, index_new_video @@ -162,7 +161,7 @@ class VideoDownloader: pending.get_channels() self.video_overwrites = pending.video_overwrites - queue = RedisQueue("dl_queue") + queue = RedisQueue() limit_queue = self.config["downloads"]["limit_count"] if limit_queue: @@ -212,8 +211,7 @@ class VideoDownloader: RedisArchivist().set_message("message:download", mess_dict) return - queue = RedisQueue("dl_queue") - queue.add_list(to_add) + RedisQueue().add_list(to_add) @staticmethod def _progress_hook(response): @@ -371,14 +369,11 @@ class VideoDownloader: if host_uid and host_gid: os.chown(new_file_path, host_uid, host_gid) - def _delete_from_pending(self, youtube_id): + @staticmethod + def _delete_from_pending(youtube_id): """delete downloaded video from pending index if its there""" - es_url = self.config["application"]["es_url"] - es_auth = self.config["application"]["es_auth"] - url = f"{es_url}/ta_download/_doc/{youtube_id}" - response = requests.delete(url, auth=es_auth) - if not response.ok and not response.status_code == 404: - print(response.text) + path = f"ta_download/_doc/{youtube_id}" + _, _ = ElasticWrap(path).delete() def _add_subscribed_channels(self): """add all channels subscribed to refresh""" diff --git a/tubearchivist/home/src/es/connect.py b/tubearchivist/home/src/es/connect.py index 79fc0bd..f976943 100644 --- a/tubearchivist/home/src/es/connect.py +++ b/tubearchivist/home/src/es/connect.py @@ -93,11 +93,12 @@ class IndexPaginate: DEFAULT_SIZE = 500 - def __init__(self, index_name, data, size=False): + def __init__(self, index_name, data, size=False, keep_source=False): self.index_name = index_name self.data = data self.pit_id = False self.size = size + self.keep_source = keep_source def get_results(self): """get all results""" @@ -132,7 +133,10 @@ class IndexPaginate: all_hits = response["hits"]["hits"] if all_hits: for hit in all_hits: - source = hit["_source"] + if self.keep_source: + source = hit + else: + source = hit["_source"] search_after = hit["sort"] all_results.append(source) # update search_after with last hit data diff --git a/tubearchivist/home/src/es/index_setup.py b/tubearchivist/home/src/es/index_setup.py index 8b0ed97..4f20cd3 100644 --- a/tubearchivist/home/src/es/index_setup.py +++ 
b/tubearchivist/home/src/es/index_setup.py @@ -10,7 +10,7 @@ import os import zipfile from datetime import datetime -import requests +from home.src.es.connect import ElasticWrap, IndexPaginate from home.src.ta.config import AppConfig from home.src.ta.helper import ignore_filelist @@ -20,11 +20,6 @@ class ElasticIndex: handle mapping and settings on elastic search for a given index """ - CONFIG = AppConfig().config - ES_URL = CONFIG["application"]["es_url"] - ES_AUTH = CONFIG["application"]["es_auth"] - HEADERS = {"Content-type": "application/json"} - def __init__(self, index_name, expected_map, expected_set): self.index_name = index_name self.expected_map = expected_map @@ -33,15 +28,9 @@ class ElasticIndex: def index_exists(self): """check if index already exists and return mapping if it does""" - index_name = self.index_name - url = f"{self.ES_URL}/ta_{index_name}" - response = requests.get(url, auth=self.ES_AUTH) - exists = response.ok - - if exists: - details = response.json()[f"ta_{index_name}"] - else: - details = False + response, status_code = ElasticWrap(f"ta_{self.index_name}").get() + exists = status_code == 200 + details = response.get(f"ta_{self.index_name}", False) return exists, details @@ -110,63 +99,41 @@ class ElasticIndex: def rebuild_index(self): """rebuild with new mapping""" - # backup self.reindex("backup") - # delete original self.delete_index(backup=False) - # create new self.create_blank() self.reindex("restore") - # delete backup self.delete_index() def reindex(self, method): """create on elastic search""" - index_name = self.index_name if method == "backup": - source = f"ta_{index_name}" - destination = f"ta_{index_name}_backup" + source = f"ta_{self.index_name}" + destination = f"ta_{self.index_name}_backup" elif method == "restore": - source = f"ta_{index_name}_backup" - destination = f"ta_{index_name}" + source = f"ta_{self.index_name}_backup" + destination = f"ta_{self.index_name}" - query = {"source": {"index": source}, "dest": {"index": destination}} - data = json.dumps(query) - url = self.ES_URL + "/_reindex?refresh=true" - response = requests.post( - url=url, data=data, headers=self.HEADERS, auth=self.ES_AUTH - ) - if not response.ok: - print(response.text) + data = {"source": {"index": source}, "dest": {"index": destination}} + _, _ = ElasticWrap("_reindex?refresh=true").post(data=data) def delete_index(self, backup=True): """delete index passed as argument""" + path = f"ta_{self.index_name}" if backup: - url = f"{self.ES_URL}/ta_{self.index_name}_backup" - else: - url = f"{self.ES_URL}/ta_{self.index_name}" - response = requests.delete(url, auth=self.ES_AUTH) - if not response.ok: - print(response.text) + path = path + "_backup" + + _, _ = ElasticWrap(path).delete() def create_blank(self): """apply new mapping and settings for blank new index""" - expected_map = self.expected_map - expected_set = self.expected_set - # stich payload - payload = {} - if expected_set: - payload.update({"settings": expected_set}) - if expected_map: - payload.update({"mappings": {"properties": expected_map}}) - # create - url = f"{self.ES_URL}/ta_{self.index_name}" - data = json.dumps(payload) - response = requests.put( - url=url, data=data, headers=self.HEADERS, auth=self.ES_AUTH - ) - if not response.ok: - print(response.text) + data = {} + if self.expected_set: + data.update({"settings": self.expected_set}) + if self.expected_map: + data.update({"mappings": {"properties": self.expected_map}}) + + _, _ = ElasticWrap(f"ta_{self.index_name}").put(data) class ElasticBackup: 
@@ -174,52 +141,21 @@ class ElasticBackup: def __init__(self, index_config, reason): self.config = AppConfig().config + self.cache_dir = self.config["application"]["cache_dir"] self.index_config = index_config self.reason = reason self.timestamp = datetime.now().strftime("%Y%m%d") self.backup_files = [] - def get_all_documents(self, index_name): + @staticmethod + def get_all_documents(index_name): """export all documents of a single index""" - headers = {"Content-type": "application/json"} - es_url = self.config["application"]["es_url"] - es_auth = self.config["application"]["es_auth"] - # get PIT ID - url = f"{es_url}/ta_{index_name}/_pit?keep_alive=1m" - response = requests.post(url, auth=es_auth) - json_data = json.loads(response.text) - pit_id = json_data["id"] - # build query data = { "query": {"match_all": {}}, - "size": 100, - "pit": {"id": pit_id, "keep_alive": "1m"}, - "sort": [{"_id": {"order": "asc"}}], + "sort": [{"_doc": {"order": "desc"}}], } - query_str = json.dumps(data) - url = es_url + "/_search" - # loop until nothing left - all_results = [] - while True: - response = requests.get( - url, data=query_str, headers=headers, auth=es_auth - ) - json_data = json.loads(response.text) - all_hits = json_data["hits"]["hits"] - if all_hits: - for hit in all_hits: - search_after = hit["sort"] - all_results.append(hit) - # update search_after with last hit data - data["search_after"] = search_after - query_str = json.dumps(data) - else: - break - # clean up PIT - query_str = json.dumps({"id": pit_id}) - requests.delete( - es_url + "/_pit", data=query_str, headers=headers, auth=es_auth - ) + paginate = IndexPaginate(f"ta_{index_name}", data, keep_source=True) + all_results = paginate.get_results() return all_results @@ -244,9 +180,8 @@ class ElasticBackup: def write_es_json(self, file_content, index_name): """write nd-json file for es _bulk API to disk""" - cache_dir = self.config["application"]["cache_dir"] file_name = f"es_{index_name}-{self.timestamp}.json" - file_path = os.path.join(cache_dir, "backup", file_name) + file_path = os.path.join(self.cache_dir, "backup", file_name) with open(file_path, "w", encoding="utf-8") as f: f.write(file_content) @@ -254,9 +189,8 @@ class ElasticBackup: def write_ta_json(self, all_results, index_name): """write generic json file to disk""" - cache_dir = self.config["application"]["cache_dir"] file_name = f"ta_{index_name}-{self.timestamp}.json" - file_path = os.path.join(cache_dir, "backup", file_name) + file_path = os.path.join(self.cache_dir, "backup", file_name) to_write = [i["_source"] for i in all_results] file_content = json.dumps(to_write) with open(file_path, "w", encoding="utf-8") as f: @@ -266,9 +200,8 @@ class ElasticBackup: def zip_it(self): """pack it up into single zip file""" - cache_dir = self.config["application"]["cache_dir"] file_name = f"ta_backup-{self.timestamp}-{self.reason}.zip" - backup_folder = os.path.join(cache_dir, "backup") + backup_folder = os.path.join(self.cache_dir, "backup") backup_file = os.path.join(backup_folder, file_name) with zipfile.ZipFile( @@ -283,29 +216,18 @@ class ElasticBackup: def post_bulk_restore(self, file_name): """send bulk to es""" - cache_dir = self.config["application"]["cache_dir"] - es_url = self.config["application"]["es_url"] - es_auth = self.config["application"]["es_auth"] - headers = {"Content-type": "application/x-ndjson"} - file_path = os.path.join(cache_dir, file_name) - + file_path = os.path.join(self.cache_dir, file_name) with open(file_path, "r", encoding="utf-8") as f: - 
query_str = f.read() + data = f.read() - if not query_str.strip(): + if not data.strip(): return - url = es_url + "/_bulk" - request = requests.post( - url, data=query_str, headers=headers, auth=es_auth - ) - if not request.ok: - print(request.text) + _, _ = ElasticWrap("_bulk").post(data=data, ndjson=True) def get_all_backup_files(self): """build all available backup files for view""" - cache_dir = self.config["application"]["cache_dir"] - backup_dir = os.path.join(cache_dir, "backup") + backup_dir = os.path.join(self.cache_dir, "backup") backup_files = os.listdir(backup_dir) all_backup_files = ignore_filelist(backup_files) all_available_backups = [ @@ -336,8 +258,7 @@ class ElasticBackup: def unpack_zip_backup(self, filename): """extract backup zip and return filelist""" - cache_dir = self.config["application"]["cache_dir"] - backup_dir = os.path.join(cache_dir, "backup") + backup_dir = os.path.join(self.cache_dir, "backup") file_path = os.path.join(backup_dir, filename) with zipfile.ZipFile(file_path, "r") as z: @@ -348,9 +269,7 @@ class ElasticBackup: def restore_json_files(self, zip_content): """go through the unpacked files and restore""" - - cache_dir = self.config["application"]["cache_dir"] - backup_dir = os.path.join(cache_dir, "backup") + backup_dir = os.path.join(self.cache_dir, "backup") for json_f in zip_content: @@ -364,14 +283,13 @@ class ElasticBackup: self.post_bulk_restore(file_name) os.remove(file_name) - def index_exists(self, index_name): + @staticmethod + def index_exists(index_name): """check if index already exists to skip""" - es_url = self.config["application"]["es_url"] - es_auth = self.config["application"]["es_auth"] - url = f"{es_url}/ta_{index_name}" - response = requests.get(url, auth=es_auth) + _, status_code = ElasticWrap(f"ta_{index_name}").get() + exists = status_code == 200 - return response.ok + return exists def rotate_backup(self): """delete old backups if needed""" @@ -386,8 +304,7 @@ class ElasticBackup: print("no backup files to rotate") return - cache_dir = self.config["application"]["cache_dir"] - backup_dir = os.path.join(cache_dir, "backup") + backup_dir = os.path.join(self.cache_dir, "backup") all_to_delete = auto[rotate:] for to_delete in all_to_delete: diff --git a/tubearchivist/home/src/frontend/api_calls.py b/tubearchivist/home/src/frontend/api_calls.py index f7f50e7..26b1730 100644 --- a/tubearchivist/home/src/frontend/api_calls.py +++ b/tubearchivist/home/src/frontend/api_calls.py @@ -114,7 +114,7 @@ class PostData: print(f"ignore video {video_id}") PendingInteract(video_id=video_id, status="ignore").update_status() # also clear from redis queue - RedisQueue("dl_queue").clear_item(video_id) + RedisQueue().clear_item(video_id) return {"success": True} @staticmethod @@ -132,7 +132,7 @@ class PostData: to_execute = self.exec_val if to_execute == "stop": print("stopping download queue") - RedisQueue("dl_queue").clear() + RedisQueue().clear() elif to_execute == "kill": task_id = RedisArchivist().get_message("dl_queue_id") if not isinstance(task_id, str): diff --git a/tubearchivist/home/src/frontend/watched.py b/tubearchivist/home/src/frontend/watched.py index 36ed072..85aa1ab 100644 --- a/tubearchivist/home/src/frontend/watched.py +++ b/tubearchivist/home/src/frontend/watched.py @@ -3,22 +3,15 @@ functionality: - handle watched state for videos, channels and playlists """ -import json from datetime import datetime -import requests -from home.src.ta.config import AppConfig +from home.src.es.connect import ElasticWrap from home.src.ta.helper 
import UrlListParser class WatchState: """handle watched checkbox for videos and channels""" - CONFIG = AppConfig().config - ES_URL = CONFIG["application"]["es_url"] - ES_AUTH = CONFIG["application"]["es_auth"] - HEADERS = {"Content-type": "application/json"} - def __init__(self, youtube_id): self.youtube_id = youtube_id self.stamp = int(datetime.now().strftime("%s")) @@ -33,7 +26,7 @@ class WatchState: elif url_type == "playlist": self.mark_playlist_watched() - print(f"marked {self.youtube_id} as watched") + print(f"{self.youtube_id}: marked as watched") def mark_as_unwatched(self): """revert watched state to false""" @@ -41,7 +34,7 @@ class WatchState: if url_type == "video": self.mark_vid_watched(revert=True) - print(f"revert {self.youtube_id} as unwatched") + print(f"{self.youtube_id}: revert as unwatched") def dedect_type(self): """find youtube id type""" @@ -52,77 +45,54 @@ class WatchState: def mark_vid_watched(self, revert=False): """change watched status of single video""" - url = self.ES_URL + "/ta_video/_update/" + self.youtube_id + path = f"ta_video/_update/{self.youtube_id}" data = { "doc": {"player": {"watched": True, "watched_date": self.stamp}} } if revert: data["doc"]["player"]["watched"] = False - payload = json.dumps(data) - request = requests.post( - url, data=payload, headers=self.HEADERS, auth=self.ES_AUTH - ) - if not request.ok: - print(request.text) + response, status_code = ElasticWrap(path).post(data=data) + if status_code != 200: + print(response) raise ValueError("failed to mark video as watched") def mark_channel_watched(self): """change watched status of every video in channel""" + path = "ta_video/_update_by_query" + must_list = [ + {"term": {"channel.channel_id": {"value": self.youtube_id}}}, + {"term": {"player.watched": {"value": False}}}, + ] data = { - "query": { - "bool": { - "must": [ - { - "term": { - "channel.channel_id": { - "value": self.youtube_id - } - } - }, - {"term": {"player.watched": {"value": False}}}, - ] - } - }, + "query": {"bool": {"must": must_list}}, "script": { "source": "ctx._source.player['watched'] = true", "lang": "painless", }, } - payload = json.dumps(data) - url = f"{self.ES_URL}/ta_video/_update_by_query" - request = requests.post( - url, data=payload, headers=self.HEADERS, auth=self.ES_AUTH - ) - if not request.ok: - print(request.text) + + response, status_code = ElasticWrap(path).post(data=data) + if status_code != 200: + print(response) raise ValueError("failed mark channel as watched") def mark_playlist_watched(self): """change watched state of all videos in playlist""" + path = "ta_video/_update_by_query" + must_list = [ + {"term": {"playlist.keyword": {"value": self.youtube_id}}}, + {"term": {"player.watched": {"value": False}}}, + ] data = { - "query": { - "bool": { - "must": [ - { - "term": { - "playlist.keyword": {"value": self.youtube_id} - } - }, - {"term": {"player.watched": {"value": False}}}, - ] - } - }, + "query": {"bool": {"must": must_list}}, "script": { "source": "ctx._source.player['watched'] = true", "lang": "painless", }, } - payload = json.dumps(data) - url = f"{self.ES_URL}/ta_video/_update_by_query" - request = requests.post( - url, data=payload, headers=self.HEADERS, auth=self.ES_AUTH - ) - if not request.ok: - print(request.text) + + response, status_code = ElasticWrap(path).post(data=data) + if status_code != 200: + print(response) raise ValueError("failed mark playlist as watched") diff --git a/tubearchivist/home/src/index/filesystem.py b/tubearchivist/home/src/index/filesystem.py index 
0dc8302..c6f1caa 100644 --- a/tubearchivist/home/src/index/filesystem.py +++ b/tubearchivist/home/src/index/filesystem.py @@ -10,11 +10,10 @@ import os import re import shutil import subprocess -from datetime import datetime -import requests from home.src.download.queue import PendingList from home.src.download.yt_dlp_handler import VideoDownloader +from home.src.es.connect import ElasticWrap from home.src.index.reindex import Reindex from home.src.index.video import index_new_video from home.src.ta.config import AppConfig @@ -26,8 +25,6 @@ class FilesystemScanner: """handle scanning and fixing from filesystem""" CONFIG = AppConfig().config - ES_URL = CONFIG["application"]["es_url"] - ES_AUTH = CONFIG["application"]["es_auth"] VIDEOS = CONFIG["application"]["videos"] def __init__(self): @@ -148,25 +145,16 @@ class FilesystemScanner: bulk_list.append(json.dumps(source)) # add last newline bulk_list.append("\n") - query_str = "\n".join(bulk_list) - # make the call - headers = {"Content-type": "application/x-ndjson"} - url = self.ES_URL + "/_bulk" - request = requests.post( - url, data=query_str, headers=headers, auth=self.ES_AUTH - ) - if not request.ok: - print(request.text) + data = "\n".join(bulk_list) + _, _ = ElasticWrap("_bulk").post(data=data, ndjson=True) def delete_from_index(self): """find indexed but deleted mediafile""" for indexed in self.to_delete: youtube_id = indexed[0] print(f"deleting {youtube_id} from index") - url = self.ES_URL + "/ta_video/_doc/" + youtube_id - request = requests.delete(url, auth=self.ES_AUTH) - if not request.ok: - print(request.text) + path = f"ta_video/_doc/{youtube_id}" + _, _ = ElasticWrap(path).delete() class ManualImport: @@ -319,10 +307,7 @@ def scan_filesystem(): def reindex_old_documents(): """daily refresh of old documents""" - # continue if needed - reindex_handler = Reindex() - reindex_handler.check_outdated() - reindex_handler.reindex() - # set timestamp - now = int(datetime.now().strftime("%s")) - RedisArchivist().set_message("last_reindex", now, expire=False) + handler = Reindex() + handler.check_outdated() + handler.reindex() + RedisArchivist().set_message("last_reindex", handler.now, expire=False) diff --git a/tubearchivist/home/src/index/reindex.py b/tubearchivist/home/src/index/reindex.py index cf1435f..1494e5a 100644 --- a/tubearchivist/home/src/index/reindex.py +++ b/tubearchivist/home/src/index/reindex.py @@ -4,85 +4,60 @@ functionality: - index and update in es """ -import json from datetime import datetime from math import ceil from time import sleep -import requests from home.src.download.queue import PendingList from home.src.download.thumbnails import ThumbManager +from home.src.es.connect import ElasticWrap from home.src.index.channel import YoutubeChannel from home.src.index.playlist import YoutubePlaylist from home.src.index.video import YoutubeVideo from home.src.ta.config import AppConfig -from home.src.ta.helper import get_total_hits class Reindex: """check for outdated documents and refresh data from youtube""" + MATCH_FIELD = { + "ta_video": "active", + "ta_channel": "channel_active", + "ta_playlist": "playlist_active", + } + MULTIPLY = 1.2 + def __init__(self): # config - config = AppConfig().config - self.sleep_interval = config["downloads"]["sleep_interval"] - self.es_url = config["application"]["es_url"] - self.es_auth = config["application"]["es_auth"] - self.refresh_interval = config["scheduler"]["check_reindex_days"] - self.integrate_ryd = config["downloads"]["integrate_ryd"] + self.now = 
int(datetime.now().strftime("%s")) + self.config = AppConfig().config + self.interval = self.config["scheduler"]["check_reindex_days"] # scan self.all_youtube_ids = False self.all_channel_ids = False self.all_playlist_ids = False - def get_daily(self): + def _get_daily(self): """get daily refresh values""" - total_videos = get_total_hits( - "ta_video", self.es_url, self.es_auth, "active" - ) - video_daily = ceil(total_videos / self.refresh_interval * 1.2) - total_channels = get_total_hits( - "ta_channel", self.es_url, self.es_auth, "channel_active" - ) - channel_daily = ceil(total_channels / self.refresh_interval * 1.2) - total_playlists = get_total_hits( - "ta_playlist", self.es_url, self.es_auth, "playlist_active" - ) - playlist_daily = ceil(total_playlists / self.refresh_interval * 1.2) + total_videos = self._get_total_hits("ta_video") + video_daily = ceil(total_videos / self.interval * self.MULTIPLY) + total_channels = self._get_total_hits("ta_channel") + channel_daily = ceil(total_channels / self.interval * self.MULTIPLY) + total_playlists = self._get_total_hits("ta_playlist") + playlist_daily = ceil(total_playlists / self.interval * self.MULTIPLY) return (video_daily, channel_daily, playlist_daily) - def get_outdated_vids(self, size): - """get daily videos to refresh""" - headers = {"Content-type": "application/json"} - now = int(datetime.now().strftime("%s")) - now_lte = now - self.refresh_interval * 24 * 60 * 60 - data = { - "size": size, - "query": { - "bool": { - "must": [ - {"match": {"active": True}}, - {"range": {"vid_last_refresh": {"lte": now_lte}}}, - ] - } - }, - "sort": [{"vid_last_refresh": {"order": "asc"}}], - "_source": False, - } - query_str = json.dumps(data) - url = self.es_url + "/ta_video/_search" - response = requests.get( - url, data=query_str, headers=headers, auth=self.es_auth - ) - if not response.ok: - print(response.text) - response_dict = json.loads(response.text) - all_youtube_ids = [i["_id"] for i in response_dict["hits"]["hits"]] - return all_youtube_ids + def _get_total_hits(self, index): + """get total hits from index""" + match_field = self.MATCH_FIELD[index] + path = f"{index}/_search?filter_path=hits.total" + data = {"query": {"match": {match_field: True}}} + response, _ = ElasticWrap(path).post(data=data) + total_hits = response["hits"]["total"]["value"] + return total_hits - def get_unrated_vids(self): - """get all videos without rating if ryd integration is enabled""" - headers = {"Content-type": "application/json"} + def _get_unrated_vids(self): + """get max 200 videos without rating if ryd integration is enabled""" data = { "size": 200, "query": { @@ -91,86 +66,78 @@ class Reindex: } }, } - query_str = json.dumps(data) - url = self.es_url + "/ta_video/_search" - response = requests.get( - url, data=query_str, headers=headers, auth=self.es_auth - ) - if not response.ok: - print(response.text) - response_dict = json.loads(response.text) - missing_rating = [i["_id"] for i in response_dict["hits"]["hits"]] + response, _ = ElasticWrap("ta_video/_search").get(data=data) + + missing_rating = [i["_id"] for i in response["hits"]["hits"]] self.all_youtube_ids = self.all_youtube_ids + missing_rating - def get_outdated_channels(self, size): - """get daily channels to refresh""" - headers = {"Content-type": "application/json"} - now = int(datetime.now().strftime("%s")) - now_lte = now - self.refresh_interval * 24 * 60 * 60 + def _get_outdated_vids(self, size): + """get daily videos to refresh""" + now_lte = self.now - self.interval * 24 * 60 * 60 + 
must_list = [ + {"match": {"active": True}}, + {"range": {"vid_last_refresh": {"lte": now_lte}}}, + ] data = { "size": size, - "query": { - "bool": { - "must": [ - {"match": {"channel_active": True}}, - {"range": {"channel_last_refresh": {"lte": now_lte}}}, - ] - } - }, + "query": {"bool": {"must": must_list}}, + "sort": [{"vid_last_refresh": {"order": "asc"}}], + "_source": False, + } + response, _ = ElasticWrap("ta_video/_search").get(data=data) + + all_youtube_ids = [i["_id"] for i in response["hits"]["hits"]] + return all_youtube_ids + + def _get_outdated_channels(self, size): + """get daily channels to refresh""" + now_lte = self.now - self.interval * 24 * 60 * 60 + must_list = [ + {"match": {"channel_active": True}}, + {"range": {"channel_last_refresh": {"lte": now_lte}}}, + ] + data = { + "size": size, + "query": {"bool": {"must": must_list}}, "sort": [{"channel_last_refresh": {"order": "asc"}}], "_source": False, } - query_str = json.dumps(data) - url = self.es_url + "/ta_channel/_search" - response = requests.get( - url, data=query_str, headers=headers, auth=self.es_auth - ) - if not response.ok: - print(response.text) - response_dict = json.loads(response.text) - all_channel_ids = [i["_id"] for i in response_dict["hits"]["hits"]] + response, _ = ElasticWrap("ta_channel/_search").get(data=data) + + all_channel_ids = [i["_id"] for i in response["hits"]["hits"]] return all_channel_ids - def get_outdated_playlists(self, size): + def _get_outdated_playlists(self, size): """get daily outdated playlists to refresh""" - headers = {"Content-type": "application/json"} - now = int(datetime.now().strftime("%s")) - now_lte = now - self.refresh_interval * 24 * 60 * 60 + now_lte = self.now - self.interval * 24 * 60 * 60 + must_list = [ + {"match": {"playlist_active": True}}, + {"range": {"playlist_last_refresh": {"lte": now_lte}}}, + ] data = { "size": size, - "query": { - "bool": { - "must": [ - {"match": {"playlist_active": True}}, - {"range": {"playlist_last_refresh": {"lte": now_lte}}}, - ] - } - }, + "query": {"bool": {"must": must_list}}, "sort": [{"playlist_last_refresh": {"order": "asc"}}], "_source": False, } - query_str = json.dumps(data) - url = self.es_url + "/ta_playlist/_search" - response = requests.get( - url, data=query_str, headers=headers, auth=self.es_auth - ) - if not response.ok: - print(response.text) - response_dict = json.loads(response.text) - all_playlist_ids = [i["_id"] for i in response_dict["hits"]["hits"]] + response, _ = ElasticWrap("ta_playlist/_search").get(data=data) + + all_playlist_ids = [i["_id"] for i in response["hits"]["hits"]] return all_playlist_ids def check_outdated(self): """add missing vids and channels""" - video_daily, channel_daily, playlist_daily = self.get_daily() - self.all_youtube_ids = self.get_outdated_vids(video_daily) - self.all_channel_ids = self.get_outdated_channels(channel_daily) - self.all_playlist_ids = self.get_outdated_playlists(playlist_daily) - if self.integrate_ryd: - self.get_unrated_vids() + video_daily, channel_daily, playlist_daily = self._get_daily() + self.all_youtube_ids = self._get_outdated_vids(video_daily) + self.all_channel_ids = self._get_outdated_channels(channel_daily) + self.all_playlist_ids = self._get_outdated_playlists(playlist_daily) + + integrate_ryd = self.config["downloads"]["integrate_ryd"] + if integrate_ryd: + self._get_unrated_vids() @staticmethod - def reindex_single_video(youtube_id): + def _reindex_single_video(youtube_id): """refresh data for single video""" video = YoutubeVideo(youtube_id) @@ 
-188,6 +155,8 @@ class Reindex: return video.delete_subtitles() + video.check_subtitles() + # add back video.json_data["player"] = player video.json_data["date_downloaded"] = date_downloaded @@ -204,20 +173,21 @@ class Reindex: return @staticmethod - def reindex_single_channel(channel_id): + def _reindex_single_channel(channel_id): """refresh channel data and sync to videos""" channel = YoutubeChannel(channel_id) channel.get_from_es() subscribed = channel.json_data["channel_subscribed"] - overwrites = channel.json_data["channel_overwrites"] + overwrites = channel.json_data.get("channel_overwrites", False) channel.get_from_youtube() channel.json_data["channel_subscribed"] = subscribed - channel.json_data["channel_overwrites"] = overwrites + if overwrites: + channel.json_data["channel_overwrites"] = overwrites channel.upload_to_es() channel.sync_to_videos() @staticmethod - def reindex_single_playlist(playlist_id, all_indexed_ids): + def _reindex_single_playlist(playlist_id, all_indexed_ids): """refresh playlist data""" playlist = YoutubePlaylist(playlist_id) playlist.get_from_es() @@ -234,18 +204,19 @@ class Reindex: def reindex(self): """reindex what's needed""" + sleep_interval = self.config["downloads"]["sleep_interval"] # videos print(f"reindexing {len(self.all_youtube_ids)} videos") for youtube_id in self.all_youtube_ids: - self.reindex_single_video(youtube_id) - if self.sleep_interval: - sleep(self.sleep_interval) + self._reindex_single_video(youtube_id) + if sleep_interval: + sleep(sleep_interval) # channels print(f"reindexing {len(self.all_channel_ids)} channels") for channel_id in self.all_channel_ids: - self.reindex_single_channel(channel_id) - if self.sleep_interval: - sleep(self.sleep_interval) + self._reindex_single_channel(channel_id) + if sleep_interval: + sleep(sleep_interval) # playlist print(f"reindexing {len(self.all_playlist_ids)} playlists") if self.all_playlist_ids: @@ -253,6 +224,6 @@ class Reindex: handler.get_indexed() all_indexed_ids = [i["youtube_id"] for i in handler.all_videos] for playlist_id in self.all_playlist_ids: - self.reindex_single_playlist(playlist_id, all_indexed_ids) - if self.sleep_interval: - sleep(self.sleep_interval) + self._reindex_single_playlist(playlist_id, all_indexed_ids) + if sleep_interval: + sleep(sleep_interval) diff --git a/tubearchivist/home/src/index/video.py b/tubearchivist/home/src/index/video.py index 9968cc7..d59650f 100644 --- a/tubearchivist/home/src/index/video.py +++ b/tubearchivist/home/src/index/video.py @@ -296,7 +296,6 @@ class YoutubeVideo(YouTubeItem, YoutubeSubtitle): self._add_stats() self.add_file_path() self.add_player() - self._check_subtitles() if self.config["downloads"]["integrate_ryd"]: self._get_ryd_stats() @@ -369,7 +368,7 @@ class YoutubeVideo(YouTubeItem, YoutubeSubtitle): channel_dir = os.path.join(self.app_conf["videos"], channel) all_files = os.listdir(channel_dir) for file in all_files: - if self.youtube_id in file: + if self.youtube_id in file and file.endswith(".mp4"): vid_path = os.path.join(channel_dir, file) break else: @@ -441,7 +440,7 @@ class YoutubeVideo(YouTubeItem, YoutubeSubtitle): return True - def _check_subtitles(self): + def check_subtitles(self): """optionally add subtitles""" handler = YoutubeSubtitle(self) subtitles = handler.get_subtitles() @@ -451,8 +450,9 @@ class YoutubeVideo(YouTubeItem, YoutubeSubtitle): def delete_subtitles(self): """delete indexed subtitles""" + path = "ta_subtitle/_delete_by_query?refresh=true" data = {"query": {"term": {"youtube_id": {"value": 
self.youtube_id}}}} - _, _ = ElasticWrap("ta_subtitle/_delete_by_query").post(data=data) + _, _ = ElasticWrap(path).post(data=data) def index_new_video(youtube_id): @@ -462,5 +462,6 @@ def index_new_video(youtube_id): if not video.json_data: raise ValueError("failed to get metadata for " + youtube_id) + video.check_subtitles() video.upload_to_es() return video.json_data diff --git a/tubearchivist/home/src/ta/helper.py b/tubearchivist/home/src/ta/helper.py index d577dcd..04cea5e 100644 --- a/tubearchivist/home/src/ta/helper.py +++ b/tubearchivist/home/src/ta/helper.py @@ -3,31 +3,15 @@ Loose collection of helper functions - don't import AppConfig class here to avoid circular imports """ -import json import re import string import subprocess import unicodedata from urllib.parse import parse_qs, urlparse -import requests import yt_dlp -def get_total_hits(index, es_url, es_auth, match_field): - """get total hits from index""" - headers = {"Content-type": "application/json"} - data = {"query": {"match": {match_field: True}}} - payload = json.dumps(data) - url = f"{es_url}/{index}/_search?filter_path=hits.total" - request = requests.post(url, data=payload, headers=headers, auth=es_auth) - if not request.ok: - print(request.text) - total_json = json.loads(request.text) - total_hits = total_json["hits"]["total"]["value"] - return total_hits - - def clean_string(file_name): """clean string to only asci characters""" whitelist = "-_.() " + string.ascii_letters + string.digits diff --git a/tubearchivist/home/src/ta/ta_redis.py b/tubearchivist/home/src/ta/ta_redis.py index 6a9efea..0e9c23f 100644 --- a/tubearchivist/home/src/ta/ta_redis.py +++ b/tubearchivist/home/src/ta/ta_redis.py @@ -11,12 +11,20 @@ import redis from home.src.ta.helper import ignore_filelist -class RedisArchivist: - """collection of methods to interact with redis""" +class RedisBase: + """connection base for redis""" REDIS_HOST = os.environ.get("REDIS_HOST") REDIS_PORT = os.environ.get("REDIS_PORT") or 6379 NAME_SPACE = "ta:" + + def __init__(self): + self.conn = redis.Redis(host=self.REDIS_HOST, port=self.REDIS_PORT) + + +class RedisArchivist(RedisBase): + """collection of methods to interact with redis""" + CHANNELS = [ "download", "add", @@ -27,14 +35,9 @@ class RedisArchivist: "setting", ] - def __init__(self): - self.redis_connection = redis.Redis( - host=self.REDIS_HOST, port=self.REDIS_PORT - ) - def set_message(self, key, message, expire=True): """write new message to redis""" - self.redis_connection.execute_command( + self.conn.execute_command( "JSON.SET", self.NAME_SPACE + key, ".", json.dumps(message) ) @@ -43,15 +46,11 @@ class RedisArchivist: secs = 20 else: secs = expire - self.redis_connection.execute_command( - "EXPIRE", self.NAME_SPACE + key, secs - ) + self.conn.execute_command("EXPIRE", self.NAME_SPACE + key, secs) def get_message(self, key): """get message dict from redis""" - reply = self.redis_connection.execute_command( - "JSON.GET", self.NAME_SPACE + key - ) + reply = self.conn.execute_command("JSON.GET", self.NAME_SPACE + key) if reply: json_str = json.loads(reply) else: @@ -61,7 +60,7 @@ class RedisArchivist: def list_items(self, query): """list all matches""" - reply = self.redis_connection.execute_command( + reply = self.conn.execute_command( "KEYS", self.NAME_SPACE + query + "*" ) all_matches = [i.decode().lstrip(self.NAME_SPACE) for i in reply] @@ -74,14 +73,12 @@ class RedisArchivist: def del_message(self, key): """delete key from redis""" - response = self.redis_connection.execute_command( - "DEL", 
self.NAME_SPACE + key - ) + response = self.conn.execute_command("DEL", self.NAME_SPACE + key) return response def get_lock(self, lock_key): """handle lock for task management""" - redis_lock = self.redis_connection.lock(self.NAME_SPACE + lock_key) + redis_lock = self.conn.lock(self.NAME_SPACE + lock_key) return redis_lock def get_progress(self): @@ -89,7 +86,7 @@ class RedisArchivist: all_messages = [] for channel in self.CHANNELS: key = "message:" + channel - reply = self.redis_connection.execute_command( + reply = self.conn.execute_command( "JSON.GET", self.NAME_SPACE + key ) if reply: @@ -120,19 +117,12 @@ class RedisArchivist: return mess_dict -class RedisQueue: +class RedisQueue(RedisBase): """dynamically interact with the download queue in redis""" - REDIS_HOST = os.environ.get("REDIS_HOST") - REDIS_PORT = os.environ.get("REDIS_PORT") - NAME_SPACE = "ta:" - - if not REDIS_PORT: - REDIS_PORT = 6379 - - def __init__(self, key): - self.key = self.NAME_SPACE + key - self.conn = redis.Redis(host=self.REDIS_HOST, port=self.REDIS_PORT) + def __init__(self): + super().__init__() + self.key = self.NAME_SPACE + "dl_queue" def get_all(self): """return all elements in list""" diff --git a/tubearchivist/home/tasks.py b/tubearchivist/home/tasks.py index b0f86fd..b8f419b 100644 --- a/tubearchivist/home/tasks.py +++ b/tubearchivist/home/tasks.py @@ -8,8 +8,8 @@ Functionality: import os -import home.apps as startup_apps from celery import Celery, shared_task +from home.apps import StartupCheck from home.src.download.queue import PendingList from home.src.download.subscriptions import ( ChannelSubscription, @@ -98,7 +98,7 @@ def download_pending(): @shared_task def download_single(youtube_id): """start download single video now""" - queue = RedisQueue("dl_queue") + queue = RedisQueue() queue.add_priority(youtube_id) print("Added to queue with priority: " + youtube_id) # start queue if needed @@ -181,7 +181,7 @@ def kill_dl(task_id): app.control.revoke(task_id, terminate=True) _ = RedisArchivist().del_message("dl_queue_id") - RedisQueue("dl_queue").clear() + RedisQueue().clear() # clear cache cache_dir = os.path.join(CONFIG["application"]["cache_dir"], "download") @@ -274,5 +274,5 @@ try: app.conf.beat_schedule = ScheduleBuilder().build_schedule() except KeyError: # update path from v0.0.8 to v0.0.9 to load new defaults - startup_apps.sync_redis_state() + StartupCheck().sync_redis_state() app.conf.beat_schedule = ScheduleBuilder().build_schedule() diff --git a/tubearchivist/home/templates/home/channel_id.html b/tubearchivist/home/templates/home/channel_id.html index b372237..80b090b 100644 --- a/tubearchivist/home/templates/home/channel_id.html +++ b/tubearchivist/home/templates/home/channel_id.html @@ -53,36 +53,44 @@
Total Videos archived: {{ max_hits }}
Watched:
{% endif %} + -