tubearchivist-frontend/tubearchivist/home/src/index/video.py

490 lines
17 KiB
Python

"""
functionality:
- get metadata from youtube for a video
- index and update in es
"""
import json
import os
from datetime import datetime
import requests
from home.src.es.connect import ElasticWrap
from home.src.index import channel as ta_channel
from home.src.index.generic import YouTubeItem
from home.src.ta.helper import (
DurationConverter,
clean_string,
requests_headers,
)
from ryd_client import ryd_client
class YoutubeSubtitle:
    """handle video subtitle functionality

    Reads the subtitle settings from the "downloads" section of the video
    config, picks the matching json3 subtitle tracks from the youtube
    metadata, writes the converted subtitle text below the videos folder
    and optionally sends it to elasticsearch for indexing.
    """

    # seconds to wait for a subtitle download before giving up
    REQUEST_TIMEOUT = 30

    def __init__(self, video):
        self.video = video
        # list of language codes once parsed from config, False until then
        self.languages = False

    def _sub_conf_parse(self):
        """add additional conf values to self"""
        languages_raw = self.video.config["downloads"]["subtitle"]
        if languages_raw:
            # ignore empty entries from stray commas like "en,,de" or "en,"
            stripped = (i.strip() for i in languages_raw.split(","))
            self.languages = [i for i in stripped if i]

    def get_subtitles(self):
        """collect the subtitle tracks to download

        Returns False when no subtitle language is configured, otherwise
        a list with one subtitle dict per language that could be found.
        User uploaded subtitles win over auto generated captions, the
        latter are only considered if subtitle_source is set to "auto".
        """
        self._sub_conf_parse()
        if not self.languages:
            # no subtitles
            return False

        downloads_conf = self.video.config["downloads"]
        relevant_subtitles = []
        for lang in self.languages:
            user_sub = self._get_user_subtitles(lang)
            if user_sub:
                relevant_subtitles.append(user_sub)
                continue

            if downloads_conf["subtitle_source"] == "auto":
                auto_cap = self._get_auto_caption(lang)
                if auto_cap:
                    relevant_subtitles.append(auto_cap)

        return relevant_subtitles

    def _build_subtitle(self, all_formats, lang, source):
        """pick the json3 format and add lang, source and media_url

        Returns False if all_formats is empty or has no json3 entry,
        instead of failing with an IndexError.
        """
        if not all_formats:
            return False

        json3 = next((i for i in all_formats if i.get("ext") == "json3"), None)
        if json3 is None:
            print(f"{self.video.youtube_id}-{lang}: no json3 subtitle found")
            return False

        video_media_url = self.video.json_data["media_url"]
        media_url = video_media_url.replace(".mp4", f"-{lang}.vtt")

        # copy to leave the format dict inside youtube_meta untouched
        subtitle = dict(json3)
        subtitle.update(
            {"lang": lang, "source": source, "media_url": media_url}
        )
        return subtitle

    def _get_auto_caption(self, lang):
        """get auto_caption subtitles"""
        print(f"{self.video.youtube_id}-{lang}: get auto generated subtitles")
        all_subtitles = self.video.youtube_meta.get("automatic_captions")
        if not all_subtitles:
            return False

        return self._build_subtitle(all_subtitles.get(lang), lang, "auto")

    def _normalize_lang(self):
        """normalize country specific language keys

        Rewrites the "subtitles" dict of youtube_meta in place: keys are
        cut at the first "-" and the "live_chat" entry is dropped.
        Returns the dict, or False if there are no subtitles.
        """
        all_subtitles = self.video.youtube_meta.get("subtitles")
        if not all_subtitles:
            return False

        # iterate over a snapshot of the keys, the dict changes in the loop
        for key in list(all_subtitles.keys()):
            lang = key.split("-")[0]
            old = all_subtitles.pop(key)
            if lang == "live_chat":
                continue
            all_subtitles[lang] = old

        return all_subtitles

    def _get_user_subtitles(self, lang):
        """get subtitles uploaded from channel owner"""
        print(f"{self.video.youtube_id}-{lang}: get user uploaded subtitles")
        all_subtitles = self._normalize_lang()
        if not all_subtitles:
            return False

        # returns False if no user subtitles are found for lang
        return self._build_subtitle(all_subtitles.get(lang), lang, "user")

    def download_subtitles(self, relevant_subtitles):
        """download subtitle files to archive

        Failures are handled per subtitle: a connection error, a bad
        response or an unparsable body is printed and that subtitle is
        skipped, so the remaining subtitles still get processed.
        """
        videos_base = self.video.config["application"]["videos"]
        for subtitle in relevant_subtitles:
            dest_path = os.path.join(videos_base, subtitle["media_url"])
            source = subtitle["source"]
            lang = subtitle.get("lang")

            try:
                # without a timeout a stalled server blocks forever
                response = requests.get(
                    subtitle["url"],
                    headers=requests_headers(),
                    timeout=self.REQUEST_TIMEOUT,
                )
            except requests.exceptions.RequestException as err:
                print(f"{self.video.youtube_id}: failed to download subtitle")
                print(err)
                continue

            if not response.ok:
                print(f"{self.video.youtube_id}: failed to download subtitle")
                print(response.text)
                continue

            try:
                parser = SubtitleParser(response.text, lang, source)
            except ValueError as err:
                # json.JSONDecodeError is a ValueError: body was not json
                print(f"{self.video.youtube_id}: failed to parse subtitle")
                print(err)
                continue

            parser.process()
            subtitle_str = parser.get_subtitle_str()
            self._write_subtitle_file(dest_path, subtitle_str)
            if self.video.config["downloads"]["subtitle_index"]:
                query_str = parser.create_bulk_import(self.video, source)
                self._index_subtitle(query_str)

    @staticmethod
    def _write_subtitle_file(dest_path, subtitle_str):
        """write subtitle file to disk"""
        # create folder here for first video of channel
        os.makedirs(os.path.split(dest_path)[0], exist_ok=True)
        with open(dest_path, "w", encoding="utf-8") as subfile:
            subfile.write(subtitle_str)

    @staticmethod
    def _index_subtitle(query_str):
        """send subtitle to es for indexing"""
        _, _ = ElasticWrap("_bulk").post(data=query_str, ndjson=True)
class SubtitleParser:
    """parse subtitle str from youtube

    Takes the json subtitle payload as text, turns its "events" into a
    list of cues and renders these either as WEBVTT text or as ndjson
    lines for a bulk import into the ta_subtitle index.
    """

    # number of cues joined into one indexed subtitle fragment
    CHUNK_SIZE = 5

    def __init__(self, subtitle_str, lang, source):
        # raises ValueError (json.JSONDecodeError) on invalid json
        self.subtitle_raw = json.loads(subtitle_str)
        self.lang = lang
        self.source = source
        # list of cue dicts after process() has run, False before
        self.all_cues = False

    def process(self):
        """extract relevant cue data

        Fills self.all_cues with dicts holding start, end, text and a
        1-based idx. Events without a start, a duration or segments
        carry no displayable text and are skipped, a missing or empty
        events list results in an empty cue list.
        """
        self.all_cues = []
        all_events = self.subtitle_raw.get("events")
        if not all_events:
            return

        if self.source == "auto":
            all_events = self._flat_auto_caption(all_events)

        required = ("tStartMs", "dDurationMs", "segs")
        for event in all_events:
            if any(key not in event for key in required):
                continue

            start_ms = event["tStartMs"]
            cue = {
                "start": self._ms_conv(start_ms),
                "end": self._ms_conv(start_ms + event["dDurationMs"]),
                "text": self._join_segments(event["segs"]),
                # number kept cues consecutively, skipped events leave no gap
                "idx": len(self.all_cues) + 1,
            }
            self.all_cues.append(cue)

    @staticmethod
    def _join_segments(segments):
        """join the utf8 text of all segments, tolerate missing text"""
        return "".join(i.get("utf8") or "" for i in segments)

    @staticmethod
    def _flat_auto_caption(all_events):
        """flatten autocaption segments

        Joins the segments of each event into a single one, drops events
        without text or timing and merges an event into its predecessor
        if it starts before the predecessor has ended.
        """
        flatten = []
        for event in all_events:
            if "segs" not in event:
                continue
            if "tStartMs" not in event or "dDurationMs" not in event:
                # can't be timed, would break the overlap check below
                continue

            text = "".join(i.get("utf8") or "" for i in event["segs"])
            if not text.strip():
                continue

            if flatten:
                # fix overlapping retiming issue
                last = flatten[-1]
                last_end = last["tStartMs"] + last["dDurationMs"]
                if event["tStartMs"] < last_end:
                    joined = last["segs"][0]["utf8"] + "\n" + text
                    last["segs"][0]["utf8"] = joined
                    continue

            # copy so the raw event keeps its original segments
            flatten.append({**event, "segs": [{"utf8": text}]})

        return flatten

    @staticmethod
    def _ms_conv(ms):
        """convert ms to timestamp in the form hh:mm:ss.ttt"""
        # hours are not wrapped at 24 to stay correct for very long videos
        hours = str(ms // (1000 * 60 * 60)).zfill(2)
        minutes = str((ms // (1000 * 60)) % 60).zfill(2)
        secs = str((ms // 1000) % 60).zfill(2)
        millis = str(ms % 1000).zfill(3)
        return f"{hours}:{minutes}:{secs}.{millis}"

    def get_subtitle_str(self):
        """create vtt text str from cues"""
        blocks = [f"WEBVTT\nKind: captions\nLanguage: {self.lang}"]
        # all_cues is False if process() has not been called yet
        for cue in self.all_cues or []:
            stamp = f"{cue.get('start')} --> {cue.get('end')}"
            blocks.append(f"{cue.get('idx')}\n{stamp}\n{cue.get('text')}")

        return "\n\n".join(blocks)

    def create_bulk_import(self, video, source):
        """subtitle lines for es import

        Returns a ndjson str with an index action line followed by the
        document line for every subtitle fragment.
        """
        documents = self._create_documents(video, source)
        bulk_list = []
        for document in documents:
            document_id = document.get("subtitle_fragment_id")
            action = {"index": {"_index": "ta_subtitle", "_id": document_id}}
            bulk_list.append(json.dumps(action))
            bulk_list.append(json.dumps(document))

        # final item makes the joined str end on a newline
        bulk_list.append("\n")
        query_str = "\n".join(bulk_list)
        return query_str

    def _create_documents(self, video, source):
        """add video meta data to every subtitle fragment"""
        documents = self._chunk_list(video.youtube_id)
        channel = video.json_data.get("channel") or {}
        meta_dict = {
            "youtube_id": video.youtube_id,
            "title": video.json_data.get("title"),
            "subtitle_channel": channel.get("channel_name"),
            "subtitle_channel_id": channel.get("channel_id"),
            # portable epoch seconds, strftime("%s") is platform specific
            "subtitle_last_refresh": int(datetime.now().timestamp()),
            "subtitle_lang": self.lang,
            "subtitle_source": source,
        }
        for document in documents:
            document.update(meta_dict)

        return documents

    def _chunk_list(self, youtube_id):
        """join cues for bulk import

        Groups CHUNK_SIZE consecutive cues into one fragment dict. The
        remaining cues at the end form a last, shorter fragment so no
        subtitle text is missing from the index.
        """
        chunk_list = []
        chunk = {}
        for position, cue in enumerate(self.all_cues or [], start=1):
            if chunk:
                text = f"{chunk.get('subtitle_line')} {cue.get('text')}\n"
                chunk["subtitle_line"] = text
            else:
                idx = len(chunk_list) + 1
                chunk = {
                    "subtitle_index": idx,
                    "subtitle_line": cue.get("text"),
                    "subtitle_start": cue.get("start"),
                    "subtitle_fragment_id": f"{youtube_id}-{self.lang}-{idx}",
                }

            # end of the latest cue, final once the chunk is closed
            chunk["subtitle_end"] = cue.get("end")
            if position % self.CHUNK_SIZE == 0:
                chunk_list.append(chunk)
                chunk = {}

        if chunk:
            # trailing cues that did not fill a complete chunk
            chunk_list.append(chunk)

        return chunk_list
class YoutubeVideo(YouTubeItem, YoutubeSubtitle):
    """represents a single youtube video"""

    es_path = False
    index_name = "ta_video"
    yt_base = "https://www.youtube.com/watch?v="

    def __init__(self, youtube_id):
        super().__init__(youtube_id)
        self.channel_id = False
        self.es_path = f"{self.index_name}/_doc/{youtube_id}"

    def build_json(self):
        """build json dict of video

        Calls get_from_youtube() and stops if youtube_meta is empty.
        Otherwise json_data is built step by step, the return your
        dislike and sponsorblock data is only added if enabled in the
        "downloads" section of the config.
        """
        self.get_from_youtube()
        if not self.youtube_meta:
            return

        self._process_youtube_meta()
        self._add_channel()
        self._add_stats()
        self.add_file_path()
        self.add_player()
        if self.config["downloads"]["integrate_ryd"]:
            self._get_ryd_stats()

        if self.config["downloads"]["integrate_sponsorblock"]:
            self._get_sponsorblock()

    def _process_youtube_meta(self):
        """extract relevant fields from youtube"""
        # extract
        self.channel_id = self.youtube_meta["channel_id"]
        upload_date = self.youtube_meta["upload_date"]
        upload_date_time = datetime.strptime(upload_date, "%Y%m%d")
        published = upload_date_time.strftime("%Y-%m-%d")
        # portable epoch seconds, strftime("%s") is platform specific
        last_refresh = int(datetime.now().timestamp())
        # build json_data basics
        self.json_data = {
            "title": self.youtube_meta["title"],
            "description": self.youtube_meta["description"],
            "category": self.youtube_meta["categories"],
            "vid_thumb_url": self.youtube_meta["thumbnail"],
            "tags": self.youtube_meta["tags"],
            "published": published,
            "vid_last_refresh": last_refresh,
            "date_downloaded": last_refresh,
            "youtube_id": self.youtube_id,
            "active": True,
        }

    def _add_channel(self):
        """add channel dict to video json_data"""
        channel = ta_channel.YoutubeChannel(self.channel_id)
        channel.build_json(upload=True)
        self.json_data.update({"channel": channel.json_data})

    def _add_stats(self):
        """add stats dict to json_data

        All counters fall back to 0 if the key is missing from
        youtube_meta, so an absent field does not abort indexing.
        """
        meta = self.youtube_meta
        self.json_data.update(
            {
                "stats": {
                    "view_count": meta.get("view_count", 0),
                    "like_count": meta.get("like_count", 0),
                    "dislike_count": meta.get("dislike_count", 0),
                    "average_rating": meta.get("average_rating", 0),
                }
            }
        )

    def build_dl_cache_path(self):
        """find video path in dl cache

        Returns the path of the first file in the download cache folder
        with the youtube_id in its name, raises FileNotFoundError if
        there is none.
        """
        cache_dir = self.app_conf["cache_dir"]
        cache_path = f"{cache_dir}/download/"
        for file_cached in os.listdir(cache_path):
            if self.youtube_id in file_cached:
                return os.path.join(cache_path, file_cached)

        raise FileNotFoundError

    def add_player(self):
        """add player information for new videos

        Raises FileNotFoundError if the video file is neither in the
        download cache nor in the channel folder of the media_url.
        """
        try:
            # when indexing from download task
            vid_path = self.build_dl_cache_path()
        except FileNotFoundError as err:
            # when reindexing needs to handle title rename
            channel = os.path.split(self.json_data["media_url"])[0]
            channel_dir = os.path.join(self.app_conf["videos"], channel)
            for file in os.listdir(channel_dir):
                if self.youtube_id in file and file.endswith(".mp4"):
                    vid_path = os.path.join(channel_dir, file)
                    break
            else:
                raise FileNotFoundError("could not find video file") from err

        duration_handler = DurationConverter()
        duration = duration_handler.get_sec(vid_path)
        duration_str = duration_handler.get_str(duration)
        self.json_data.update(
            {
                "player": {
                    "watched": False,
                    "duration": duration,
                    "duration_str": duration_str,
                }
            }
        )

    def add_file_path(self):
        """build media_url for where file will be located"""
        channel_name = self.json_data["channel"]["channel_name"]
        clean_channel_name = clean_string(channel_name)
        if len(clean_channel_name) <= 3:
            # fall back to channel id
            clean_channel_name = self.json_data["channel"]["channel_id"]

        timestamp = self.json_data["published"].replace("-", "")
        youtube_id = self.json_data["youtube_id"]
        title = self.json_data["title"]
        clean_title = clean_string(title)
        filename = f"{timestamp}_{youtube_id}_{clean_title}.mp4"
        media_url = os.path.join(clean_channel_name, filename)
        self.json_data["media_url"] = media_url

    def delete_media_file(self):
        """delete video file, meta data

        Removes the video file and all subtitle files listed in
        json_data, then deletes the video and its subtitles from es.
        A file that is already gone is reported and skipped, so the
        index still gets cleaned up.
        """
        self.get_from_es()
        video_base = self.app_conf["videos"]
        to_del = [self.json_data.get("media_url")]
        all_subtitles = self.json_data.get("subtitles")
        if all_subtitles:
            to_del = to_del + [i.get("media_url") for i in all_subtitles]

        for media_url in to_del:
            if not media_url:
                # nothing stored for this entry, os.path.join would fail
                continue

            file_path = os.path.join(video_base, media_url)
            try:
                os.remove(file_path)
            except FileNotFoundError:
                print(f"{self.youtube_id}: failed {media_url}, continue.")

        self.del_in_es()
        self.delete_subtitles()

    def _get_ryd_stats(self):
        """get optional stats from returnyoutubedislikeapi.com

        Returns True if the dislike stats got updated, False if the api
        could not be reached or answered with a 404 status.
        """
        try:
            print(f"{self.youtube_id}: get ryd stats")
            result = ryd_client.get(self.youtube_id)
        except requests.exceptions.ConnectionError:
            print(f"{self.youtube_id}: failed to query ryd api, skipping")
            return False

        if result["status"] == 404:
            return False

        dislikes = {
            "dislike_count": result["dislikes"],
            "average_rating": result["rating"],
        }
        self.json_data["stats"].update(dislikes)
        return True

    def _get_sponsorblock(self):
        """get optional sponsorblock timestamps from sponsor.ajay.app

        Best effort like the ryd stats: a request that fails or does
        not return an ok status is reported and skipped.
        """
        api = "https://sponsor.ajay.app/api"
        url = f"{api}/skipSegments?videoID={self.youtube_id}"
        print(f"{self.youtube_id}: get sponsorblock timestamps")
        try:
            # timeout in seconds, avoids blocking on a stalled api
            response = requests.get(url, timeout=10)
        except requests.exceptions.RequestException as err:
            print(f"{self.youtube_id}: sponsorblock failed: {err}")
            return

        if not response.ok:
            print(f"{self.youtube_id}: sponsorblock failed: {response.text}")
            return

        self.json_data["sponsorblock"] = response.json()

    def check_subtitles(self):
        """optionally add subtitles"""
        handler = YoutubeSubtitle(self)
        subtitles = handler.get_subtitles()
        if subtitles:
            self.json_data["subtitles"] = subtitles
            handler.download_subtitles(relevant_subtitles=subtitles)

    def delete_subtitles(self):
        """delete indexed subtitles"""
        path = "ta_subtitle/_delete_by_query?refresh=true"
        data = {"query": {"term": {"youtube_id": {"value": self.youtube_id}}}}
        _, _ = ElasticWrap(path).post(data=data)
def index_new_video(youtube_id):
    """create a new video in the index from its youtube id

    Builds the json data of the video, optionally adds subtitles and
    uploads the result to es. Raises a ValueError if no json data
    could be built, returns the json_data of the video otherwise.
    """
    new_video = YoutubeVideo(youtube_id)
    new_video.build_json()

    metadata_missing = not new_video.json_data
    if metadata_missing:
        message = "failed to get metadata for " + youtube_id
        raise ValueError(message)

    new_video.check_subtitles()
    new_video.upload_to_es()

    return new_video.json_data