Improve subtitle selection

This commit is contained in:
Joel Puig Rubio 2025-02-09 22:34:15 +01:00
parent ab83bd9b16
commit 48f0f93740
No known key found for this signature in database
GPG Key ID: F2034CA3D1EECBD9
2 changed files with 109 additions and 67 deletions

View File

@ -6,6 +6,7 @@ Loose collection of helper functions
import json
import os
import random
import re
import string
import subprocess
from datetime import datetime
@ -291,3 +292,59 @@ def calc_is_watched(duration: float, position: float) -> bool:
threshold = 0.9
return position >= duration * threshold
def orderedSet_from_options(
    options, alias_dict, *, use_regex=False, start=None
):
    """Resolve option strings into an ordered list without duplicates.

    Every entry of ``options`` is handled in order:

    * a leading ``-`` marks the entry as a removal instead of an addition;
    * an entry that is a key of ``alias_dict`` is replaced by the entries
      that alias maps to and resolved recursively (a negated alias flips
      the sign of each entry it maps to);
    * otherwise, with ``use_regex`` the entry is compiled as a
      case-insensitive pattern and selects every item of
      ``alias_dict["all"]`` it fully matches (possibly none); without
      ``use_regex`` it must be an item of ``alias_dict["all"]``.

    Args:
        options: iterable of option strings.
        alias_dict: mapping of alias name to a list of option strings.
            Must contain the key ``"all"`` with every selectable item.
        use_regex: treat non-alias entries as regular expressions.
            Not applied to the entries an alias expands to.
        start: optional iterable of items that are already requested.

    Returns:
        The requested items as returned by ``orderedSet``.

    Raises:
        AssertionError: ``alias_dict`` has no ``"all"`` key.
        ValueError: without ``use_regex``, an entry is neither an alias
            nor an item of ``alias_dict["all"]``.
        re.error: with ``use_regex``, an entry is not a valid pattern.
    """
    if "all" not in alias_dict:
        # Raised explicitly rather than through an ``assert`` statement so
        # the check is not stripped under ``python -O``. The exception
        # type stays the same as before.
        raise AssertionError('"all" alias is required')

    requested = list(start or [])
    for val in options:
        discard = val.startswith("-")
        if discard:
            val = val[1:]

        if val in alias_dict:
            if discard:
                # Negating an alias negates each of its entries
                expanded = [
                    i[1:] if i.startswith("-") else f"-{i}"
                    for i in alias_dict[val]
                ]
            else:
                expanded = alias_dict[val]
            # NB: Do not allow regex in aliases for performance
            requested = orderedSet_from_options(
                expanded, alias_dict, start=requested
            )
            continue

        if use_regex:
            current = filter(
                re.compile(val, re.I).fullmatch, alias_dict["all"]
            )
        elif val in alias_dict["all"]:
            current = [val]
        else:
            raise ValueError(val)

        if discard:
            # Drop every occurrence of the selected items in one pass
            dropped = list(current)
            requested = [i for i in requested if i not in dropped]
        else:
            requested.extend(current)

    return orderedSet(requested)
def orderedSet(iterable, *, lazy=False):
    """Drop repeated items from ``iterable``, keeping first occurrences.

    Items are compared by equality, in the order they are produced.
    Returns a generator when ``lazy`` is true, otherwise a list.
    """

    def _unique():
        # Tracked in a list, not a set, because items can be unhashable
        emitted = []
        for item in iterable:
            if item in emitted:
                continue
            emitted.append(item)
            yield item

    if lazy:
        return _unique()
    return list(_unique())

View File

@ -7,12 +7,14 @@ functionality:
import json
import os
import re
from datetime import datetime
from operator import itemgetter
import requests
from common.src.env_settings import EnvironmentSettings
from common.src.es_connect import ElasticWrap
from common.src.helper import requests_headers
from common.src.helper import orderedSet_from_options, requests_headers
class YoutubeSubtitle:
@ -35,85 +37,67 @@ class YoutubeSubtitle:
# no subtitles
return False
relevant_subtitles = []
for lang in self.languages:
user_sub = self._get_user_subtitles(lang)
if user_sub:
relevant_subtitles.append(user_sub)
continue
available_subtitles = self._get_all_subtitles("user")
if self.video.config["downloads"]["subtitle_source"] == "auto":
for lang, auto_cap in self._get_all_subtitles("auto"):
if lang not in available_subtitles:
available_subtitles[lang] = auto_cap
if self.video.config["downloads"]["subtitle_source"] == "auto":
auto_cap = self._get_auto_caption(lang)
if auto_cap:
relevant_subtitles.append(auto_cap)
all_sub_langs = tuple(available_subtitles.keys())
relevant_subtitles = False
try:
relevant_subtitles = [
available_subtitles[lang]
for lang in orderedSet_from_options(
self.languages, {"all": all_sub_langs}, use_regex=True
)
]
except re.error as e:
raise ValueError(f"wrong regex in subtitle config: {e.pattern}")
return relevant_subtitles
def _get_auto_caption(self, lang):
"""get auto_caption subtitles"""
print(f"{self.video.youtube_id}-{lang}: get auto generated subtitles")
all_subtitles = self.video.youtube_meta.get("automatic_captions")
def _get_all_subtitles(self, source):
"""get video subtitles or automatic captions"""
print(f"{self.video.youtube_id}: get {source} subtitles")
youtube_meta_keys = {"user": "subtitles", "auto": "automatic_captions"}
if not (youtube_meta_key := youtube_meta_keys.get(source, None)):
raise ValueError(f"unknown subtitles source: {source}")
all_subtitles = self.video.youtube_meta.get(youtube_meta_key)
if not all_subtitles:
return False
return {}
video_media_url = self.video.json_data["media_url"]
media_url = video_media_url.replace(".mp4", f".{lang}.vtt")
all_formats = all_subtitles.get(lang)
if not all_formats:
return False
subtitle_json3 = [i for i in all_formats if i["ext"] == "json3"]
if not subtitle_json3:
print(f"{self.video.youtube_id}-{lang}: json3 not processed")
return False
subtitle = subtitle_json3[0]
subtitle.update(
{"lang": lang, "source": "auto", "media_url": media_url}
)
return subtitle
def _normalize_lang(self):
"""normalize country specific language keys"""
all_subtitles = self.video.youtube_meta.get("subtitles")
if not all_subtitles:
return False
all_keys = list(all_subtitles.keys())
for key in all_keys:
lang = key.split("-")[0]
old = all_subtitles.pop(key)
candidate_subtitles = {}
for lang, all_formats in all_subtitles.items():
if lang == "live_chat":
# not supported yet
continue
all_subtitles[lang] = old
return all_subtitles
video_media_url = self.video.json_data["media_url"]
media_url = video_media_url.replace(".mp4", f".{lang}.vtt")
if not all_formats:
# no subtitles found
continue
def _get_user_subtitles(self, lang):
"""get subtitles uploaded from channel owner"""
print(f"{self.video.youtube_id}-{lang}: get user uploaded subtitles")
all_subtitles = self._normalize_lang()
if not all_subtitles:
return False
subtitle_json3 = [i for i in all_formats if i["ext"] == "json3"]
if not subtitle_json3:
print(f"{self.video.youtube_id}-{lang}: json3 not processed")
continue
video_media_url = self.video.json_data["media_url"]
media_url = video_media_url.replace(".mp4", f".{lang}.vtt")
all_formats = all_subtitles.get(lang)
if not all_formats:
# no user subtitles found
return False
subtitle = subtitle_json3[0]
subtitle.update(
{"lang": lang, "source": source, "media_url": media_url}
)
candidate_subtitles[lang] = subtitle
subtitle = [i for i in all_formats if i["ext"] == "json3"][0]
subtitle.update(
{"lang": lang, "source": "user", "media_url": media_url}
)
return subtitle
return candidate_subtitles
def download_subtitles(self, relevant_subtitles):
"""download subtitle files to archive"""
subtitle_list = ", ".join(map(itemgetter("lang"), relevant_subtitles))
print(
f"{self.video.youtube_id}: downloading subtitles: {subtitle_list}"
)
videos_base = EnvironmentSettings.MEDIA_DIR
indexed = []
for subtitle in relevant_subtitles:
@ -124,12 +108,13 @@ class YoutubeSubtitle:
subtitle["url"], headers=requests_headers(), timeout=30
)
if not response.ok:
print(f"{self.video.youtube_id}: failed to download subtitle")
subtitle_key = f"{self.video.youtube_id}-{lang}"
print(f"{subtitle_key}: failed to download subtitle")
print(response.text)
continue
if not response.text:
print(f"{self.video.youtube_id}: skip empty subtitle")
print(f"{subtitle_key}: skip empty subtitle")
continue
parser = SubtitleParser(response.text, lang, source)