Filesystem organization, #build

Changed:
- Changed filesystem to static ids
- Improved error handling for download process
- Lots of fixes and improvements
Simon 2023-07-25 00:08:59 +07:00
commit cd25eadd1c
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
28 changed files with 547 additions and 1306 deletions

.gitignore (vendored): 1 change
View File

@@ -1,5 +1,6 @@
 # python testing cache
 __pycache__
+.venv

 # django testing db
 db.sqlite3

View File

@@ -3,7 +3,7 @@

 # First stage to build python wheel
-FROM python:3.10.9-slim-bullseye AS builder
+FROM python:3.11.3-slim-bullseye AS builder
 ARG TARGETPLATFORM

 RUN apt-get update && apt-get install -y --no-install-recommends \
@@ -14,7 +14,7 @@ COPY ./tubearchivist/requirements.txt /requirements.txt
 RUN pip install --user -r requirements.txt

 # build final image
-FROM python:3.10.9-slim-bullseye as tubearchivist
+FROM python:3.11.3-slim-bullseye as tubearchivist
 ARG TARGETPLATFORM
 ARG INSTALL_DEBUG

View File

@@ -49,6 +49,7 @@ function sync_test {
        --exclude ".gitignore" \
        --exclude "**/cache" \
        --exclude "**/__pycache__/" \
+       --exclude ".venv" \
        --exclude "db.sqlite3" \
        --exclude ".mypy_cache" \
        . -e ssh "$host":tubearchivist
@@ -87,14 +88,14 @@ function validate {
    # note: this logic is duplicated in the `./github/workflows/lint_python.yml` config
    # if you update this file, you should update that as well
    echo "running black"
-   black --exclude "migrations/*" --diff --color --check -l 79 "$check_path"
+   black --force-exclude "migrations/*" --diff --color --check -l 79 "$check_path"
    echo "running codespell"
-   codespell --skip="./.git,./package.json,./package-lock.json,./node_modules,./.mypy_cache" "$check_path"
+   codespell --skip="./.git,./.venv,./package.json,./package-lock.json,./node_modules,./.mypy_cache" "$check_path"
    echo "running flake8"
-   flake8 "$check_path" --exclude "migrations" --count --max-complexity=10 \
+   flake8 "$check_path" --exclude "migrations,.venv" --count --max-complexity=10 \
        --max-line-length=79 --show-source --statistics
    echo "running isort"
-   isort --skip "migrations" --check-only --diff --profile black -l 79 "$check_path"
+   isort --skip "migrations" --skip ".venv" --check-only --diff --profile black -l 79 "$check_path"
    printf " \n> all validations passed\n"
 }

View File

@@ -14,6 +14,7 @@ fi
 python manage.py ta_envcheck
 python manage.py ta_connection
 python manage.py ta_startup
+python manage.py ta_migpath

 # start all tasks
 nginx &

package-lock.json (generated): 1048 changes

File diff suppressed because it is too large

View File

@@ -11,7 +11,7 @@ from home.src.index.channel import YoutubeChannel
 from home.src.index.generic import Pagination
 from home.src.index.reindex import ReindexProgress
 from home.src.index.video import SponsorBlock, YoutubeVideo
-from home.src.ta.config import AppConfig
+from home.src.ta.config import AppConfig, ReleaseVersion
 from home.src.ta.ta_redis import RedisArchivist
 from home.src.ta.task_manager import TaskCommand, TaskManager
 from home.src.ta.urlparser import Parser
@@ -189,7 +189,7 @@ class VideoCommentView(ApiBaseView):

 class VideoSimilarView(ApiBaseView):
     """resolves to /api/video/<video-id>/similar/
-    GET: return max 3 videos similar to this
+    GET: return max 6 videos similar to this
     """

     search_base = "ta_video/_search/"
@@ -535,7 +535,11 @@ class PingView(ApiBaseView):
     @staticmethod
     def get(request):
         """get pong"""
-        data = {"response": "pong", "user": request.user.id}
+        data = {
+            "response": "pong",
+            "user": request.user.id,
+            "version": ReleaseVersion().get_local_version(),
+        }
         return Response(data)
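Note: the ping endpoint now also reports the running release. A rough sketch of the new response payload, with illustrative values:

{
    "response": "pong",
    "user": 1,
    "version": "v0.3.7-unstable"
}

The version string is whatever ReleaseVersion().get_local_version() returns, which reads the locally configured TA_VERSION.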

View File

@@ -86,6 +86,8 @@ class Command(BaseCommand):
                 continue

             if status_code and status_code == 200:
+                path = "_cluster/health?wait_for_status=yellow&timeout=30s"
+                _, _ = ElasticWrap(path).get()
                 self.stdout.write(
                     self.style.SUCCESS(" ✓ ES connection established")
                 )
@@ -116,7 +118,7 @@ class Command(BaseCommand):
             return

         message = (
-            " 🗙 ES connection failed. "
+            " 🗙 ES version check failed. "
             + f"Expected {self.MIN_MAJOR}.{self.MIN_MINOR} but got {version}"
         )
         self.stdout.write(self.style.ERROR(f"{message}"))

View File

@@ -0,0 +1,171 @@
+"""filepath migration from v0.3.6 to v0.3.7"""
+
+import json
+import os
+
+from django.core.management.base import BaseCommand
+from home.src.es.connect import ElasticWrap, IndexPaginate
+from home.src.ta.config import AppConfig
+from home.src.ta.helper import ignore_filelist
+
+TOPIC = """
+########################
+# Filesystem Migration #
+########################
+"""
+
+
+class Command(BaseCommand):
+    """command framework"""
+
+    # pylint: disable=no-member
+
+    def handle(self, *args, **options):
+        """run commands"""
+        self.stdout.write(TOPIC)
+        handler = FolderMigration()
+        to_migrate = handler.get_to_migrate()
+        if not to_migrate:
+            self.stdout.write(
+                self.style.SUCCESS(" no channel migration needed\n")
+            )
+            return
+
+        self.stdout.write(self.style.SUCCESS(" migrating channels"))
+        total_channels = handler.create_folders(to_migrate)
+        self.stdout.write(
+            self.style.SUCCESS(f" created {total_channels} channels")
+        )
+        self.stdout.write(
+            self.style.SUCCESS(f" migrating {len(to_migrate)} videos")
+        )
+        handler.migrate_videos(to_migrate)
+        self.stdout.write(self.style.SUCCESS(" update videos in index"))
+        handler.send_bulk()
+        self.stdout.write(self.style.SUCCESS(" cleanup old folders"))
+        handler.delete_old()
+        self.stdout.write(self.style.SUCCESS(" ✓ migration completed\n"))
+
+
+class FolderMigration:
+    """migrate video archive folder"""
+
+    def __init__(self):
+        self.config = AppConfig().config
+        self.videos = self.config["application"]["videos"]
+        self.bulk_list = []
+
+    def get_to_migrate(self):
+        """get videos to migrate"""
+        script = (
+            "doc['media_url'].value == "
+            + "doc['channel.channel_id'].value + '/'"
+            + " + doc['youtube_id'].value + '.mp4'"
+        )
+        data = {
+            "query": {"bool": {"must_not": [{"script": {"script": script}}]}},
+            "_source": [
+                "youtube_id",
+                "media_url",
+                "channel.channel_id",
+                "subtitles",
+            ],
+        }
+        response = IndexPaginate("ta_video", data).get_results()
+
+        return response
+
+    def create_folders(self, to_migrate):
+        """create required channel folders"""
+        host_uid = self.config["application"]["HOST_UID"]
+        host_gid = self.config["application"]["HOST_GID"]
+        all_channel_ids = {i["channel"]["channel_id"] for i in to_migrate}
+        for channel_id in all_channel_ids:
+            new_folder = os.path.join(self.videos, channel_id)
+            os.makedirs(new_folder, exist_ok=True)
+            if host_uid and host_gid:
+                os.chown(new_folder, host_uid, host_gid)
+
+        return len(all_channel_ids)
+
+    def migrate_videos(self, to_migrate):
+        """migrate all videos of channel"""
+        for video in to_migrate:
+            new_media_url = self._move_video_file(video)
+            if not new_media_url:
+                continue
+
+            all_subtitles = self._move_subtitles(video)
+            action = {
+                "update": {"_id": video["youtube_id"], "_index": "ta_video"}
+            }
+            source = {"doc": {"media_url": new_media_url}}
+            if all_subtitles:
+                source["doc"].update({"subtitles": all_subtitles})
+
+            self.bulk_list.append(json.dumps(action))
+            self.bulk_list.append(json.dumps(source))
+
+    def _move_video_file(self, video):
+        """move video file to new location"""
+        old_path = os.path.join(self.videos, video["media_url"])
+        if not os.path.exists(old_path):
+            print(f"did not find expected video at {old_path}")
+            return False
+
+        new_media_url = os.path.join(
+            video["channel"]["channel_id"], video["youtube_id"] + ".mp4"
+        )
+        new_path = os.path.join(self.videos, new_media_url)
+        os.rename(old_path, new_path)
+
+        return new_media_url
+
+    def _move_subtitles(self, video):
+        """move subtitle files to new location"""
+        all_subtitles = video.get("subtitles")
+        if not all_subtitles:
+            return False
+
+        for subtitle in all_subtitles:
+            old_path = os.path.join(self.videos, subtitle["media_url"])
+            if not os.path.exists(old_path):
+                print(f"did not find expected subtitle at {old_path}")
+                continue
+
+            new_media_url = os.path.join(
+                video["channel"]["channel_id"],
+                f"{video.get('youtube_id')}.{subtitle.get('lang')}.vtt",
+            )
+            new_path = os.path.join(self.videos, new_media_url)
+            os.rename(old_path, new_path)
+            subtitle["media_url"] = new_media_url
+
+        return all_subtitles
+
+    def send_bulk(self):
+        """send bulk request to update index with new urls"""
+        if not self.bulk_list:
+            print("nothing to update")
+            return
+
+        self.bulk_list.append("\n")
+        data = "\n".join(self.bulk_list)
+        response, status = ElasticWrap("_bulk").post(data=data, ndjson=True)
+        if not status == 200:
+            print(response)
+
+    def delete_old(self):
+        """delete old empty folders"""
+        all_folders = ignore_filelist(os.listdir(self.videos))
+        for folder in all_folders:
+            folder_path = os.path.join(self.videos, folder)
+            if not ignore_filelist(os.listdir(folder_path)):
+                os.rmdir(folder_path)
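Note: FolderMigration.send_bulk posts newline-delimited JSON to the _bulk endpoint, alternating one update action with one partial document per moved video. A minimal sketch of the payload for a single video, with made-up IDs:

{"update": {"_id": "abc123xyz00", "_index": "ta_video"}}
{"doc": {"media_url": "UC1234567890abcdefghijkl/abc123xyz00.mp4"}}

The trailing "\n" appended to bulk_list is deliberate: Elasticsearch rejects a bulk body that does not end with a newline.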

View File

@@ -256,4 +256,4 @@ CORS_ALLOW_HEADERS = list(default_headers) + [

 # TA application settings
 TA_UPSTREAM = "https://github.com/tubearchivist/tubearchivist"
-TA_VERSION = "v0.3.6"
+TA_VERSION = "v0.3.7-unstable"

View File

@@ -114,7 +114,13 @@ class PendingInteract:
     def update_status(self):
         """update status of pending item"""
         if self.status == "priority":
-            data = {"doc": {"status": "pending", "auto_start": True}}
+            data = {
+                "doc": {
+                    "status": "pending",
+                    "auto_start": True,
+                    "message": None,
+                }
+            }
         else:
             data = {"doc": {"status": self.status}}

View File

@@ -270,7 +270,7 @@ class ValidatorCallback:
         urls = (
             channel["_source"]["channel_thumb_url"],
             channel["_source"]["channel_banner_url"],
-            channel["_source"]["channel_tvart_url"],
+            channel["_source"].get("channel_tvart_url", False),
         )
         handler = ThumbManager(channel["_source"]["channel_id"])
         handler.download_channel_art(urls, skip_existing=True)

View File

@@ -48,11 +48,14 @@ class YtWrap:
         with yt_dlp.YoutubeDL(self.obs) as ydl:
             try:
                 ydl.download([url])
-            except yt_dlp.utils.DownloadError:
-                print(f"{url}: failed to download.")
-                return False
+            except yt_dlp.utils.DownloadError as err:
+                print(f"{url}: failed to download with message {err}")
+                if "Temporary failure in name resolution" in str(err):
+                    raise ConnectionError("lost the internet, abort!") from err
+
+                return False, str(err)

-        return True
+        return True, True

     def extract(self, url):
         """make extract request"""
@@ -61,8 +64,17 @@
         except cookiejar.LoadError:
             print("cookie file is invalid")
             return False
-        except (yt_dlp.utils.ExtractorError, yt_dlp.utils.DownloadError):
-            print(f"{url}: failed to get info from youtube")
+        except yt_dlp.utils.ExtractorError as err:
+            print(f"{url}: failed to extract with message: {err}, continue...")
+            return False
+        except yt_dlp.utils.DownloadError as err:
+            if "This channel does not have a" in str(err):
+                return False
+
+            print(f"{url}: failed to get info from youtube with message {err}")
+            if "Temporary failure in name resolution" in str(err):
+                raise ConnectionError("lost the internet, abort!") from err
+
             return False

         return response
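Note: download() now returns a tuple rather than a bare boolean, so callers must unpack it. A minimal usage sketch, assuming a configured YtWrap instance named ytwrap:

# (True, True) on success, (False, str(err)) on a non-fatal failure;
# raises ConnectionError when the error points at DNS resolution
success, message = ytwrap.download(youtube_id)
if not success:
    print(f"{youtube_id}: {message}")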

View File

@@ -20,7 +20,7 @@ from home.src.index.playlist import YoutubePlaylist
 from home.src.index.video import YoutubeVideo, index_new_video
 from home.src.index.video_constants import VideoTypeEnum
 from home.src.ta.config import AppConfig
-from home.src.ta.helper import clean_string, ignore_filelist
+from home.src.ta.helper import ignore_filelist


 class DownloadPostProcess:
@@ -203,12 +203,13 @@ class VideoDownloader:
     def _get_next(self, auto_only):
         """get next item in queue"""
         must_list = [{"term": {"status": {"value": "pending"}}}]
+        must_not_list = [{"exists": {"field": "message"}}]
         if auto_only:
             must_list.append({"term": {"auto_start": {"value": True}}})

         data = {
             "size": 1,
-            "query": {"bool": {"must": must_list}},
+            "query": {"bool": {"must": must_list, "must_not": must_not_list}},
             "sort": [
                 {"auto_start": {"order": "desc"}},
                 {"timestamp": {"order": "asc"}},
@@ -344,7 +345,9 @@
             if youtube_id in file_name:
                 obs["outtmpl"] = os.path.join(dl_cache, file_name)

-        success = YtWrap(obs, self.config).download(youtube_id)
+        success, message = YtWrap(obs, self.config).download(youtube_id)
+        if not success:
+            self._handle_error(youtube_id, message)

         if self.obs["writethumbnail"]:
             # webp files don't get cleaned up automatically
@@ -356,28 +359,27 @@
         return success

+    @staticmethod
+    def _handle_error(youtube_id, message):
+        """store error message"""
+        data = {"doc": {"message": message}}
+        _, _ = ElasticWrap(f"ta_download/_update/{youtube_id}").post(data=data)
+
     def move_to_archive(self, vid_dict):
         """move downloaded video from cache to archive"""
         videos = self.config["application"]["videos"]
         host_uid = self.config["application"]["HOST_UID"]
         host_gid = self.config["application"]["HOST_GID"]
-        channel_name = clean_string(vid_dict["channel"]["channel_name"])
-        if len(channel_name) <= 3:
-            # fall back to channel id
-            channel_name = vid_dict["channel"]["channel_id"]
-        # make archive folder with correct permissions
-        new_folder = os.path.join(videos, channel_name)
-        if not os.path.exists(new_folder):
-            os.makedirs(new_folder)
-        if host_uid and host_gid:
-            os.chown(new_folder, host_uid, host_gid)
-        # find real filename
+        # make folder
+        folder = os.path.join(videos, vid_dict["channel"]["channel_id"])
+        if not os.path.exists(folder):
+            os.makedirs(folder)
+        if host_uid and host_gid:
+            os.chown(folder, host_uid, host_gid)
+        # move media file
+        media_file = vid_dict["youtube_id"] + ".mp4"
         cache_dir = self.config["application"]["cache_dir"]
-        all_cached = ignore_filelist(os.listdir(cache_dir + "/download/"))
-        for file_str in all_cached:
-            if vid_dict["youtube_id"] in file_str:
-                old_file = file_str
-        old_path = os.path.join(cache_dir, "download", old_file)
+        old_path = os.path.join(cache_dir, "download", media_file)
         new_path = os.path.join(videos, vid_dict["media_url"])
         # move media file and fix permission
         shutil.move(old_path, new_path, copy_function=shutil.copyfile)
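Note: with the added must_not clause, _get_next skips queue items that already carry an error message. For an automatic run, the query body assembled above comes out roughly as:

data = {
    "size": 1,
    "query": {
        "bool": {
            "must": [
                {"term": {"status": {"value": "pending"}}},
                {"term": {"auto_start": {"value": True}}},
            ],
            "must_not": [{"exists": {"field": "message"}}],
        }
    },
    "sort": [
        {"auto_start": {"order": "desc"}},
        {"timestamp": {"order": "asc"}},
    ],
}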

View File

@@ -127,6 +127,12 @@ class IndexPaginate:
     def validate_data(self):
         """add pit and size to data"""
+        if not self.data:
+            self.data = {}
+
+        if "query" not in self.data.keys():
+            self.data.update({"query": {"match_all": {}}})
+
         if "sort" not in self.data.keys():
             self.data.update({"sort": [{"_doc": {"order": "desc"}}]})

View File

@@ -380,6 +380,9 @@
         },
         "auto_start": {
             "type": "boolean"
+        },
+        "message": {
+            "type": "text"
         }
     },
     "expected_set": {

View File

@@ -14,7 +14,6 @@ from home.src.download.yt_dlp_base import YtWrap
 from home.src.es.connect import ElasticWrap, IndexPaginate
 from home.src.index.generic import YouTubeItem
 from home.src.index.playlist import YoutubePlaylist
-from home.src.ta.helper import clean_string


 class YoutubeChannel(YouTubeItem):
@@ -177,12 +176,10 @@ class YoutubeChannel(YouTubeItem):
     def get_folder_path(self):
         """get folder where media files get stored"""
-        channel_name = self.json_data["channel_name"]
-        folder_name = clean_string(channel_name)
-        if len(folder_name) <= 3:
-            # fall back to channel id
-            folder_name = self.json_data["channel_id"]
-        folder_path = os.path.join(self.app_conf["videos"], folder_name)
+        folder_path = os.path.join(
+            self.app_conf["videos"],
+            self.json_data["channel_id"],
+        )
         return folder_path

     def delete_es_videos(self):

View File

@@ -120,7 +120,9 @@ class Comments:
                 "comment_timestamp": comment["timestamp"],
                 "comment_time_text": time_text,
                 "comment_likecount": comment["like_count"],
-                "comment_is_favorited": comment["is_favorited"],
+                "comment_is_favorited": comment.get(
+                    "is_favorited"
+                ),  # temporary fix for yt-dlp upstream issue 7389
                 "comment_author": comment["author"],
                 "comment_author_id": comment["author_id"],
                 "comment_author_thumbnail": comment["author_thumbnail"],

View File

@@ -1,198 +1,85 @@
 """
 Functionality:
-- reindexing old documents
-- syncing updated values between indexes
 - scan the filesystem to delete or index
 """

-import json
 import os

-from home.src.download.queue import PendingList
-from home.src.es.connect import ElasticWrap
+from home.src.es.connect import ElasticWrap, IndexPaginate
 from home.src.index.comments import CommentList
-from home.src.index.video import index_new_video
+from home.src.index.video import YoutubeVideo, index_new_video
 from home.src.ta.config import AppConfig
-from home.src.ta.helper import clean_string, ignore_filelist
-from PIL import ImageFile
-
-ImageFile.LOAD_TRUNCATED_IMAGES = True
+from home.src.ta.helper import ignore_filelist


-class ScannerBase:
-    """scan the filesystem base class"""
+class Scanner:
+    """scan index and filesystem"""

-    CONFIG = AppConfig().config
-    VIDEOS = CONFIG["application"]["videos"]
+    VIDEOS = AppConfig().config["application"]["videos"]

-    def __init__(self):
-        self.to_index = False
-        self.to_delete = False
-        self.mismatch = False
-        self.to_rename = False
-
-    def scan(self):
-        """entry point, scan and compare"""
-        all_downloaded = self._get_all_downloaded()
-        all_indexed = self._get_all_indexed()
-        self.list_comarison(all_downloaded, all_indexed)
-
-    def _get_all_downloaded(self):
-        """get a list of all video files downloaded"""
-        channels = os.listdir(self.VIDEOS)
-        all_channels = ignore_filelist(channels)
-        all_channels.sort()
-        all_downloaded = []
-        for channel_name in all_channels:
-            channel_path = os.path.join(self.VIDEOS, channel_name)
-            channel_files = os.listdir(channel_path)
-            channel_files_clean = ignore_filelist(channel_files)
-            all_videos = [i for i in channel_files_clean if i.endswith(".mp4")]
-            for video in all_videos:
-                youtube_id = video[9:20]
-                all_downloaded.append((channel_name, video, youtube_id))
-
-        return all_downloaded
-
-    @staticmethod
-    def _get_all_indexed():
-        """get a list of all indexed videos"""
-        index_handler = PendingList()
-        index_handler.get_download()
-        index_handler.get_indexed()
-
-        all_indexed = []
-        for video in index_handler.all_videos:
-            youtube_id = video["youtube_id"]
-            media_url = video["media_url"]
-            published = video["published"]
-            title = video["title"]
-            all_indexed.append((youtube_id, media_url, published, title))
-
-        return all_indexed
-
-    def list_comarison(self, all_downloaded, all_indexed):
-        """compare the lists to figure out what to do"""
-        self._find_unindexed(all_downloaded, all_indexed)
-        self._find_missing(all_downloaded, all_indexed)
-        self._find_bad_media_url(all_downloaded, all_indexed)
-
-    def _find_unindexed(self, all_downloaded, all_indexed):
-        """find video files without a matching document indexed"""
-        all_indexed_ids = [i[0] for i in all_indexed]
-        self.to_index = []
-        for downloaded in all_downloaded:
-            if downloaded[2] not in all_indexed_ids:
-                self.to_index.append(downloaded)
-
-    def _find_missing(self, all_downloaded, all_indexed):
-        """find indexed videos without matching media file"""
-        all_downloaded_ids = [i[2] for i in all_downloaded]
-        self.to_delete = []
-        for video in all_indexed:
-            youtube_id = video[0]
-            if youtube_id not in all_downloaded_ids:
-                self.to_delete.append(video)
-
-    def _find_bad_media_url(self, all_downloaded, all_indexed):
-        """rename media files not matching the indexed title"""
-        self.mismatch = []
-        self.to_rename = []
-        for downloaded in all_downloaded:
-            channel, filename, downloaded_id = downloaded
-            # find in indexed
-            for indexed in all_indexed:
-                indexed_id, media_url, published, title = indexed
-                if indexed_id == downloaded_id:
-                    # found it
-                    pub = published.replace("-", "")
-                    expected = f"{pub}_{indexed_id}_{clean_string(title)}.mp4"
-                    new_url = os.path.join(channel, expected)
-                    if expected != filename:
-                        # file to rename
-                        self.to_rename.append((channel, filename, expected))
-                    if media_url != new_url:
-                        # media_url to update in es
-                        self.mismatch.append((indexed_id, new_url))
-
-                    break
-
-
-class Filesystem(ScannerBase):
-    """handle scanning and fixing from filesystem"""
-
     def __init__(self, task=False):
-        super().__init__()
         self.task = task
+        self.to_delete = False
+        self.to_index = False

-    def process(self):
-        """entry point"""
+    def scan(self):
+        """scan the filesystem"""
+        downloaded = self._get_downloaded()
+        indexed = self._get_indexed()
+        self.to_index = downloaded - indexed
+        self.to_delete = indexed - downloaded
+
+    def _get_downloaded(self):
+        """get downloaded ids"""
         if self.task:
-            self.task.send_progress(["Scanning your archive and index."])
-        self.scan()
-        self.rename_files()
-        self.send_mismatch_bulk()
-        self.delete_from_index()
-        self.add_missing()
+            self.task.send_progress(["Scan your filesystem for videos."])

-    def rename_files(self):
-        """rename media files as identified by find_bad_media_url"""
-        if not self.to_rename:
-            return
+        downloaded = set()
+        channels = ignore_filelist(os.listdir(self.VIDEOS))
+        for channel in channels:
+            folder = os.path.join(self.VIDEOS, channel)
+            files = ignore_filelist(os.listdir(folder))
+            downloaded.update({i.split(".")[0] for i in files})

-        total = len(self.to_rename)
+        return downloaded
+
+    def _get_indexed(self):
+        """get all indexed ids"""
         if self.task:
-            self.task.send_progress([f"Rename {total} media files."])
-        for bad_filename in self.to_rename:
-            channel, filename, expected_filename = bad_filename
-            print(f"renaming [{filename}] to [{expected_filename}]")
-            old_path = os.path.join(self.VIDEOS, channel, filename)
-            new_path = os.path.join(self.VIDEOS, channel, expected_filename)
-            os.rename(old_path, new_path)
+            self.task.send_progress(["Get all videos indexed."])

-    def send_mismatch_bulk(self):
-        """build bulk update"""
-        if not self.mismatch:
-            return
+        data = {"query": {"match_all": {}}, "_source": ["youtube_id"]}
+        response = IndexPaginate("ta_video", data).get_results()
+        return {i["youtube_id"] for i in response}

-        total = len(self.mismatch)
-        if self.task:
-            self.task.send_progress([f"Fix media urls for {total} files"])
-        bulk_list = []
-        for video_mismatch in self.mismatch:
-            youtube_id, media_url = video_mismatch
-            print(f"{youtube_id}: fixing media url {media_url}")
-            action = {"update": {"_id": youtube_id, "_index": "ta_video"}}
-            source = {"doc": {"media_url": media_url}}
-            bulk_list.append(json.dumps(action))
-            bulk_list.append(json.dumps(source))
-        # add last newline
-        bulk_list.append("\n")
-        data = "\n".join(bulk_list)
-        _, _ = ElasticWrap("_bulk").post(data=data, ndjson=True)
+    def apply(self):
+        """apply all changes"""
+        self.delete()
+        self.index()
+        self.url_fix()

-    def delete_from_index(self):
-        """find indexed but deleted mediafile"""
+    def delete(self):
+        """delete videos from index"""
         if not self.to_delete:
+            print("nothing to delete")
             return

-        total = len(self.to_delete)
         if self.task:
-            self.task.send_progress([f"Clean up {total} items from index."])
-        for indexed in self.to_delete:
-            youtube_id = indexed[0]
-            print(f"deleting {youtube_id} from index")
-            path = f"ta_video/_doc/{youtube_id}"
-            _, _ = ElasticWrap(path).delete()
+            self.task.send_progress(
+                [f"Remove {len(self.to_delete)} videos from index."]
+            )

-    def add_missing(self):
-        """add missing videos to index"""
-        video_ids = [i[2] for i in self.to_index]
-        if not video_ids:
+        for youtube_id in self.to_delete:
+            YoutubeVideo(youtube_id).delete_media_file()
+
+    def index(self):
+        """index new"""
+        if not self.to_index:
+            print("nothing to index")
             return

-        total = len(video_ids)
-        for idx, youtube_id in enumerate(video_ids):
+        total = len(self.to_index)
+        for idx, youtube_id in enumerate(self.to_index):
             if self.task:
                 self.task.send_progress(
                     message_lines=[
@@ -202,4 +89,36 @@ class Filesystem(ScannerBase):
                 )
             index_new_video(youtube_id)

-        CommentList(video_ids, task=self.task).index()
+        CommentList(self.to_index, task=self.task).index()
+
+    def url_fix(self):
+        """
+        update path v0.3.6 to v0.3.7
+        fix url not matching channel-videoid pattern
+        """
+        bool_must = (
+            "doc['media_url'].value == "
+            + "(doc['channel.channel_id'].value + '/' + "
+            + "doc['youtube_id'].value) + '.mp4'"
+        )
+        to_update = (
+            "ctx._source['media_url'] = "
+            + "ctx._source.channel['channel_id'] + '/' + "
+            + "ctx._source['youtube_id'] + '.mp4'"
+        )
+        data = {
+            "query": {
+                "bool": {
+                    "must_not": [{"script": {"script": {"source": bool_must}}}]
+                }
+            },
+            "script": {"source": to_update},
+        }
+        response, _ = ElasticWrap("ta_video/_update_by_query").post(data=data)
+        updated = response.get("updates")
+        if updated:
+            print(f"updated {updated} bad media_url")
+            if self.task:
+                self.task.send_progress(
+                    [f"Updated {updated} wrong media urls."]
+                )
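Note: url_fix is the index-side counterpart of the filesystem migration: any media_url that does not already follow the channel-id/video-id pattern is rewritten in place by the painless script. Illustrative before and after, with made-up IDs:

# before (v0.3.6 layout, based on cleaned channel name and title)
"Some_Channel_Name/20230101_abc123xyz00_Some_Video_Title.mp4"
# after (v0.3.7 layout, static ids)
"UC1234567890abcdefghijkl/abc123xyz00.mp4"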

View File

@@ -6,7 +6,6 @@ functionality:

 import json
 import os
-import shutil
 from datetime import datetime
 from time import sleep
@@ -14,7 +13,6 @@ from home.src.download.queue import PendingList
 from home.src.download.subscriptions import ChannelSubscription
 from home.src.download.thumbnails import ThumbManager
 from home.src.download.yt_dlp_base import CookieHandler
-from home.src.download.yt_dlp_handler import VideoDownloader
 from home.src.es.connect import ElasticWrap, IndexPaginate
 from home.src.index.channel import YoutubeChannel
 from home.src.index.comments import Comments
@@ -54,6 +52,7 @@ class ReindexBase:
     def __init__(self):
         self.config = AppConfig().config
         self.now = int(datetime.now().timestamp())
+        self.total = None

     def populate(self, all_ids, reindex_config):
         """add all to reindex ids to redis queue"""
@@ -61,6 +60,7 @@
             return

         RedisQueue(queue_name=reindex_config["queue_name"]).add_list(all_ids)
+        self.total = None


 class ReindexPopulate(ReindexBase):
@@ -238,18 +238,19 @@ class Reindex(ReindexBase):
             if not RedisQueue(index_config["queue_name"]).has_item():
                 continue

-            total = RedisQueue(index_config["queue_name"]).length()
+            self.total = RedisQueue(index_config["queue_name"]).length()
             while True:
-                has_next = self.reindex_index(name, index_config, total)
+                has_next = self.reindex_index(name, index_config)
                 if not has_next:
                     break

-    def reindex_index(self, name, index_config, total):
+    def reindex_index(self, name, index_config):
         """reindex all of a single index"""
         reindex = self.get_reindex_map(index_config["index_name"])
         youtube_id = RedisQueue(index_config["queue_name"]).get_next()
         if youtube_id:
-            self._notify(name, index_config, total)
+            if self.task:
+                self._notify(name, index_config)
+
             reindex(youtube_id)
             sleep_interval = self.config["downloads"].get("sleep_interval", 0)
             sleep(sleep_interval)
@@ -266,23 +267,18 @@
         return def_map.get(index_name)

-    def _notify(self, name, index_config, total):
+    def _notify(self, name, index_config):
         """send notification back to task"""
+        if self.total is None:
+            self.total = RedisQueue(index_config["queue_name"]).length()
+
         remaining = RedisQueue(index_config["queue_name"]).length()
-        idx = total - remaining
-        message = [f"Reindexing {name.title()}s {idx}/{total}"]
-        progress = idx / total
+        idx = self.total - remaining
+        message = [f"Reindexing {name.title()}s {idx}/{self.total}"]
+        progress = idx / self.total
         self.task.send_progress(message, progress=progress)

     def _reindex_single_video(self, youtube_id):
-        """wrapper to handle channel name changes"""
-        try:
-            self._reindex_single_video_call(youtube_id)
-        except FileNotFoundError:
-            ChannelUrlFixer(youtube_id, self.config).run()
-            self._reindex_single_video_call(youtube_id)
-
-    def _reindex_single_video_call(self, youtube_id):
         """refresh data for single video"""
         video = YoutubeVideo(youtube_id)
@@ -291,7 +287,10 @@
         es_meta = video.json_data.copy()

         # get new
-        video.build_json()
+        media_url = os.path.join(
+            self.config["application"]["videos"], es_meta["media_url"]
+        )
+        video.build_json(media_path=media_url)
         if not video.youtube_meta:
             video.deactivate()
             return
@@ -466,65 +465,6 @@ class ReindexProgress(ReindexBase):
         return state_dict


-class ChannelUrlFixer:
-    """fix not matching channel names in reindex"""
-
-    def __init__(self, youtube_id, config):
-        self.youtube_id = youtube_id
-        self.config = config
-        self.video = False
-
-    def run(self):
-        """check and run if needed"""
-        print(f"{self.youtube_id}: failed to build channel path, try to fix.")
-        video_path_is, video_folder_is = self.get_as_is()
-        if not os.path.exists(video_path_is):
-            print(f"giving up reindex, video in video: {self.video.json_data}")
-            raise ValueError
-
-        _, video_folder_should = self.get_as_should()
-
-        if video_folder_is != video_folder_should:
-            self.process(video_path_is)
-        else:
-            print(f"{self.youtube_id}: skip channel url fixer")
-
-    def get_as_is(self):
-        """get video object as is"""
-        self.video = YoutubeVideo(self.youtube_id)
-        self.video.get_from_es()
-        video_path_is = os.path.join(
-            self.config["application"]["videos"],
-            self.video.json_data["media_url"],
-        )
-        video_folder_is = os.path.split(video_path_is)[0]
-
-        return video_path_is, video_folder_is
-
-    def get_as_should(self):
-        """add fresh metadata from remote"""
-        self.video.get_from_youtube()
-        self.video.add_file_path()
-
-        video_path_should = os.path.join(
-            self.config["application"]["videos"],
-            self.video.json_data["media_url"],
-        )
-        video_folder_should = os.path.split(video_path_should)[0]
-        return video_path_should, video_folder_should
-
-    def process(self, video_path_is):
-        """fix filepath"""
-        print(f"{self.youtube_id}: fixing channel rename.")
-        cache_dir = self.config["application"]["cache_dir"]
-        new_path = os.path.join(
-            cache_dir, "download", self.youtube_id + ".mp4"
-        )
-        shutil.move(video_path_is, new_path, copy_function=shutil.copyfile)
-        VideoDownloader().move_to_archive(self.video.json_data)
-        self.video.update_media_url()
-
-
 class ChannelFullScan:
     """
     update from v0.3.0 to v0.3.1

View File

@@ -62,7 +62,12 @@ class YoutubeSubtitle:
         if not all_formats:
             return False

-        subtitle = [i for i in all_formats if i["ext"] == "json3"][0]
+        subtitle_json3 = [i for i in all_formats if i["ext"] == "json3"]
+        if not subtitle_json3:
+            print(f"{self.video.youtube_id}-{lang}: json3 not processed")
+            return False
+
+        subtitle = subtitle_json3[0]
         subtitle.update(
             {"lang": lang, "source": "auto", "media_url": media_url}
         )

View File

@@ -20,7 +20,7 @@ from home.src.index.video_streams import (
     DurationConverter,
     MediaStreamExtractor,
 )
-from home.src.ta.helper import clean_string, randomizor
+from home.src.ta.helper import randomizor
 from home.src.ta.ta_redis import RedisArchivist
 from ryd_client import ryd_client
@@ -231,18 +231,24 @@ class YoutubeVideo(YouTubeItem, YoutubeSubtitle):
     def build_dl_cache_path(self):
         """find video path in dl cache"""
         cache_dir = self.app_conf["cache_dir"]
-        cache_path = f"{cache_dir}/download/"
-        all_cached = os.listdir(cache_path)
-        for file_cached in all_cached:
-            if self.youtube_id in file_cached:
-                vid_path = os.path.join(cache_path, file_cached)
-                return vid_path
+        video_id = self.json_data["youtube_id"]
+        cache_path = f"{cache_dir}/download/{video_id}.mp4"
+        if os.path.exists(cache_path):
+            return cache_path
+
+        channel_path = os.path.join(
+            self.app_conf["videos"],
+            self.json_data["channel"]["channel_id"],
+            f"{video_id}.mp4",
+        )
+        if os.path.exists(channel_path):
+            return channel_path

         raise FileNotFoundError

     def add_player(self, media_path=False):
         """add player information for new videos"""
-        vid_path = self._get_vid_path(media_path)
+        vid_path = media_path or self.build_dl_cache_path()

         duration_handler = DurationConverter()
         duration = duration_handler.get_sec(vid_path)
@@ -259,7 +265,7 @@ class YoutubeVideo(YouTubeItem, YoutubeSubtitle):
     def add_streams(self, media_path=False):
         """add stream metadata"""
-        vid_path = self._get_vid_path(media_path)
+        vid_path = media_path or self.build_dl_cache_path()
         media = MediaStreamExtractor(vid_path)
         self.json_data.update(
             {
@@ -268,43 +274,12 @@ class YoutubeVideo(YouTubeItem, YoutubeSubtitle):
             }
         )

-    def _get_vid_path(self, media_path=False):
-        """get path of media file"""
-        if media_path:
-            return media_path
-
-        try:
-            # when indexing from download task
-            vid_path = self.build_dl_cache_path()
-        except FileNotFoundError as err:
-            # when reindexing needs to handle title rename
-            channel = os.path.split(self.json_data["media_url"])[0]
-            channel_dir = os.path.join(self.app_conf["videos"], channel)
-            all_files = os.listdir(channel_dir)
-            for file in all_files:
-                if self.youtube_id in file and file.endswith(".mp4"):
-                    vid_path = os.path.join(channel_dir, file)
-                    break
-            else:
-                raise FileNotFoundError("could not find video file") from err
-
-        return vid_path
-
     def add_file_path(self):
         """build media_url for where file will be located"""
-        channel_name = self.json_data["channel"]["channel_name"]
-        clean_channel_name = clean_string(channel_name)
-        if len(clean_channel_name) <= 3:
-            # fall back to channel id
-            clean_channel_name = self.json_data["channel"]["channel_id"]
-
-        timestamp = self.json_data["published"].replace("-", "")
-        youtube_id = self.json_data["youtube_id"]
-        title = self.json_data["title"]
-        clean_title = clean_string(title)
-        filename = f"{timestamp}_{youtube_id}_{clean_title}.mp4"
-        media_url = os.path.join(clean_channel_name, filename)
-        self.json_data["media_url"] = media_url
+        self.json_data["media_url"] = os.path.join(
+            self.json_data["channel"]["channel_id"],
+            self.json_data["youtube_id"] + ".mp4",
+        )

     def delete_media_file(self):
         """delete video file, meta data"""

View File

@@ -35,24 +35,27 @@ class DurationConverter:
         return duration_sec

     @staticmethod
-    def get_str(duration_sec):
+    def get_str(seconds):
         """takes duration in sec and returns clean string"""
-        if not duration_sec:
+        if not seconds:
             # failed to extract
             return "NA"

-        hours = int(duration_sec // 3600)
-        minutes = int((duration_sec - (hours * 3600)) // 60)
-        secs = int(duration_sec - (hours * 3600) - (minutes * 60))
+        days = int(seconds // (24 * 3600))
+        hours = int((seconds % (24 * 3600)) // 3600)
+        minutes = int((seconds % 3600) // 60)
+        seconds = int(seconds % 60)

         duration_str = str()
+        if days:
+            duration_str = f"{days}d "
+
         if hours:
-            duration_str = str(hours).zfill(2) + ":"
+            duration_str = duration_str + str(hours).zfill(2) + ":"

         if minutes:
             duration_str = duration_str + str(minutes).zfill(2) + ":"
         else:
             duration_str = duration_str + "00:"

-        duration_str = duration_str + str(secs).zfill(2)
+        duration_str = duration_str + str(seconds).zfill(2)
         return duration_str
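Note: get_str now prefixes a day count for durations of 24 hours or more. A few worked examples of the new output format:

DurationConverter.get_str(59)     # "00:59"
DurationConverter.get_str(3725)   # "01:02:05"
DurationConverter.get_str(90061)  # "1d 01:01:01"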

View File

@@ -8,6 +8,7 @@ import json
 import os
 import re
 from random import randint
+from time import sleep

 import requests
 from celery.schedules import crontab
@@ -67,11 +68,19 @@ class AppConfig:
     @staticmethod
     def get_config_redis():
         """read config json set from redis to overwrite defaults"""
-        config = RedisArchivist().get_message("config")
-        if not list(config.values())[0]:
-            return False
+        for i in range(10):
+            try:
+                config = RedisArchivist().get_message("config")
+                if not list(config.values())[0]:
+                    return False

-        return config
+                return config
+            except Exception:  # pylint: disable=broad-except
+                print(f"... Redis connection failed, retry [{i}/10]")
+                sleep(3)
+
+        raise ConnectionError("failed to connect to redis")

     def update_config(self, form_post):
         """update config values from settings form"""
@@ -317,6 +326,10 @@ class ReleaseVersion:
         RedisArchivist().set_message(self.NEW_KEY, message)
         print(f"[{self.local_version}]: found new version {new_version}")

+    def get_local_version(self):
+        """read version from local"""
+        return self.local_version
+
     def get_remote_version(self):
         """read version from remote"""
         self.response = requests.get(self.REMOTE_URL, timeout=20).json()

View File

@@ -6,25 +6,13 @@ Loose collection of helper functions

 import json
 import os
 import random
-import re
 import string
-import unicodedata
 from datetime import datetime
 from urllib.parse import urlparse

 import requests


-def clean_string(file_name: str) -> str:
-    """clean string to only asci characters"""
-    whitelist = "-_.() " + string.ascii_letters + string.digits
-    normalized = unicodedata.normalize("NFKD", file_name)
-    ascii_only = normalized.encode("ASCII", "ignore").decode().strip()
-    white_listed: str = "".join(c for c in ascii_only if c in whitelist)
-    cleaned: str = re.sub(r"[ ]{2,}", " ", white_listed)
-    return cleaned
-
-
 def ignore_filelist(filelist: list[str]) -> list[str]:
     """ignore temp files for os.listdir sanitizer"""
     to_ignore = ["Icon\r\r", "Temporary Items", "Network Trash Folder"]

View File

@@ -19,7 +19,7 @@ from home.src.download.yt_dlp_handler import VideoDownloader
 from home.src.es.backup import ElasticBackup
 from home.src.es.index_setup import ElasitIndexWrap
 from home.src.index.channel import YoutubeChannel
-from home.src.index.filesystem import Filesystem
+from home.src.index.filesystem import Scanner
 from home.src.index.manual import ImportFolderScanner
 from home.src.index.reindex import Reindex, ReindexManual, ReindexPopulate
 from home.src.ta.config import AppConfig, ReleaseVersion, ScheduleBuilder
@@ -113,7 +113,7 @@ class BaseTask(Task):
         """callback for task failure"""
         print(f"{task_id} Failed callback")
         message, key = self._build_message(level="error")
-        message.update({"messages": ["Task failed"]})
+        message.update({"messages": [f"Task failed: {exc}"]})
         RedisArchivist().set_message(key, message, expire=20)

     def on_success(self, retval, task_id, args, kwargs):
@@ -290,7 +290,9 @@ def rescan_filesystem(self):
         return

     manager.init(self)
-    Filesystem(task=self).process()
+    handler = Scanner(task=self)
+    handler.scan()
+    handler.apply()
     ThumbValidator(task=self).validate()

View File

@@ -97,6 +97,9 @@
                 <a href="https://www.youtube.com/watch?v={{ video.source.youtube_id }}" target="_blank"><h3>{{ video.source.title }}</h3></a>
             </div>
             <p>Published: {{ video.source.published }} | Duration: {{ video.source.duration }} | {{ video.source.youtube_id }}</p>
+            {% if video.source.message %}
+                <p class="danger-zone">{{ video.source.message }}</p>
+            {% endif %}
             <div>
                 {% if show_ignored_only %}
                     <button data-id="{{ video.source.youtube_id }}" onclick="forgetIgnore(this)">Forget</button>
@@ -105,6 +108,9 @@
                     <button data-id="{{ video.source.youtube_id }}" onclick="toIgnore(this)">Ignore</button>
                     <button id="{{ video.source.youtube_id }}" data-id="{{ video.source.youtube_id }}" onclick="downloadNow(this)">Download now</button>
                 {% endif %}
+                {% if video.source.message %}
+                    <button class="danger-button" data-id="{{ video.source.youtube_id }}" onclick="forgetIgnore(this)">Delete</button>
+                {% endif %}
             </div>
         </div>
     </div>

View File

@@ -1,12 +1,12 @@
-celery==5.2.7
-Django==4.2.1
-django-auth-ldap==4.3.0
-django-cors-headers==3.14.0
+celery==5.3.1
+Django==4.2.3
+django-auth-ldap==4.4.0
+django-cors-headers==4.2.0
 djangorestframework==3.14.0
-Pillow==9.5.0
-redis==4.5.4
-requests==2.30.0
+Pillow==10.0.0
+redis==4.6.0
+requests==2.31.0
 ryd-client==0.0.6
 uWSGI==2.0.21
-whitenoise==6.4.0
-yt_dlp==2023.3.4
+whitenoise==6.5.0
+yt_dlp==2023.7.6

View File

@@ -160,12 +160,12 @@ function dlPending() {
   }, 500);
 }

-function addToQueue(autostart=false) {
+function addToQueue(autostart = false) {
   let textArea = document.getElementById('id_vid_url');
   if (textArea.value === '') {
-    return
+    return;
   }
-  let toPost = {data: [{youtube_id: textArea.value, status: 'pending'}]};
+  let toPost = { data: [{ youtube_id: textArea.value, status: 'pending' }] };
   let apiEndpoint = '/api/download/';
   if (autostart) {
     apiEndpoint = `${apiEndpoint}?autostart=true`;