From a2cae51f48a3ca962e7a3ff6c778d229618ad266 Mon Sep 17 00:00:00 2001 From: simon Date: Thu, 10 Feb 2022 17:02:19 +0700 Subject: [PATCH] bulk import subtitle lines into es --- tubearchivist/home/src/index/video.py | 76 ++++++++++++++++++++++----- 1 file changed, 63 insertions(+), 13 deletions(-) diff --git a/tubearchivist/home/src/index/video.py b/tubearchivist/home/src/index/video.py index 3f39674..1385e16 100644 --- a/tubearchivist/home/src/index/video.py +++ b/tubearchivist/home/src/index/video.py @@ -4,11 +4,13 @@ functionality: - index and update in es """ +import json import os import re from datetime import datetime import requests +from home.src.es.connect import ElasticWrap from home.src.index import channel as ta_channel from home.src.index.generic import YouTubeItem from home.src.ta.helper import DurationConverter, clean_string @@ -110,20 +112,31 @@ class YoutubeSubtitle: videos_base = self.video.config["application"]["videos"] for subtitle in relevant_subtitles: dest_path = os.path.join(videos_base, subtitle["media_url"]) + source = subtitle["media_url"] response = requests.get(subtitle["url"]) - if subtitle["source"] == "auto": - parser = SubtitleParser(response.text) - parser.process() - subtitle_str_clean = parser.get_subtitle_str() - else: - subtitle_str_clean = response.text - if response.ok: - # create folder here for first video of channel - os.makedirs(os.path.split(dest_path)[0], exist_ok=True) - with open(dest_path, "w", encoding="utf-8") as subfile: - subfile.write(subtitle_str_clean) - else: + if not response.ok: print(f"{self.video.youtube_id}: failed to download subtitle") + continue + + parser = SubtitleParser(response.text, subtitle.get("lang")) + parser.process() + subtitle_str = parser.get_subtitle_str() + self._write_subtitle_file(dest_path, subtitle_str) + query_str = parser.create_bulk_import(self.video, source) + self._index_subtitle(query_str) + + @staticmethod + def _write_subtitle_file(dest_path, subtitle_str): + """write subtitle file to disk""" + # create folder here for first video of channel + os.makedirs(os.path.split(dest_path)[0], exist_ok=True) + with open(dest_path, "w", encoding="utf-8") as subfile: + subfile.write(subtitle_str) + + @staticmethod + def _index_subtitle(query_str): + """send subtitle to es for indexing""" + _, _ = ElasticWrap("_bulk").post(data=query_str, ndjson=True) class SubtitleParser: @@ -133,8 +146,9 @@ class SubtitleParser: stamp_reg = r"<([0-9]{2}:?){3}\.[0-9]{3}>" tag_reg = r"" - def __init__(self, subtitle_str): + def __init__(self, subtitle_str, lang): self.subtitle_str = subtitle_str + self.lang = lang self.header = False self.parsed_cue_list = False self.all_text_lines = False @@ -209,6 +223,42 @@ class SubtitleParser: return new_subtitle_str + def create_bulk_import(self, video, source): + """process matched for es import""" + bulk_list = [] + channel = video.json_data.get("channel") + + document = { + "youtube_id": video.youtube_id, + "title": video.json_data.get("title"), + "subtitle_channel": channel.get("channel_name"), + "subtitle_channel_id": channel.get("channel_id"), + "subtitle_last_refresh": int(datetime.now().strftime("%s")), + "subtitle_lang": self.lang, + "subtitle_source": source, + } + + for match in self.matched: + match_id = match.get("id") + document_id = f"{video.youtube_id}-{self.lang}-{match_id}" + action = {"index": {"_index": "ta_subtitle", "_id": document_id}} + document.update( + { + "subtitle_fragment_id": document_id, + "subtitle_start": match.get("start"), + "subtitle_end": match.get("end"), + "subtitle_index": match_id, + "subtitle_line": " ".join(match.get("lines")), + } + ) + bulk_list.append(json.dumps(action)) + bulk_list.append(json.dumps(document)) + + bulk_list.append("\n") + query_str = "\n".join(bulk_list) + + return query_str + class YoutubeVideo(YouTubeItem, YoutubeSubtitle): """represents a single youtube video"""