diff --git a/tubearchivist/home/src/index/video.py b/tubearchivist/home/src/index/video.py index bc6f272..461fb89 100644 --- a/tubearchivist/home/src/index/video.py +++ b/tubearchivist/home/src/index/video.py @@ -6,7 +6,6 @@ functionality: import json import os -import re from datetime import datetime import requests @@ -65,7 +64,7 @@ class YoutubeSubtitle: if not all_formats: return False - subtitle = [i for i in all_formats if i["ext"] == "vtt"][0] + subtitle = [i for i in all_formats if i["ext"] == "json3"][0] subtitle.update( {"lang": lang, "source": "auto", "media_url": media_url} ) @@ -102,7 +101,7 @@ class YoutubeSubtitle: # no user subtitles found return False - subtitle = [i for i in all_formats if i["ext"] == "vtt"][0] + subtitle = [i for i in all_formats if i["ext"] == "json3"][0] subtitle.update( {"lang": lang, "source": "user", "media_url": media_url} ) @@ -145,109 +144,65 @@ class YoutubeSubtitle: class SubtitleParser: """parse subtitle str from youtube""" - time_reg = r"^([0-9]{2}:?){3}\.[0-9]{3} --> ([0-9]{2}:?){3}\.[0-9]{3}" - stamp_reg = r"<([0-9]{2}:?){3}\.[0-9]{3}>" - tag_reg = r"" - def __init__(self, subtitle_str, lang): - self.subtitle_str = subtitle_str + self.subtitle_raw = json.loads(subtitle_str) self.lang = lang - self.header = False - self.parsed_cue_list = False - self.all_text_lines = False - self.matched = False + self.all_cues = False def process(self): - """collection to process subtitle string""" - self._parse_cues() - self._match_text_lines() - self._add_id() - self._timestamp_check() + """extract relevant que data""" + self.all_cues = [] + for idx, event in enumerate(self.subtitle_raw.get("events")): + cue = { + "start": self.ms_conv(event["tStartMs"]), + "end": self.ms_conv(event["tStartMs"] + event["dDurationMs"]), + "text": "".join([i.get("utf8") for i in event["segs"]]), + "idx": idx + 1, + } + self.all_cues.append(cue) - def _parse_cues(self): - """split into cues""" - all_cues = self.subtitle_str.replace("\n \n", "\n").split("\n\n") - self.header = all_cues[0] - self.all_text_lines = [] - self.parsed_cue_list = [self._cue_cleaner(i) for i in all_cues[1:]] + @staticmethod + def ms_conv(ms): + """convert ms to timestamp""" + hours = str((ms // (1000 * 60 * 60)) % 24).zfill(2) + minutes = str((ms // (1000 * 60)) % 60).zfill(2) + secs = str((ms // 1000) % 60).zfill(2) + millis = str(ms % 1000).zfill(3) - def _cue_cleaner(self, cue): - """parse single cue""" - all_lines = cue.split("\n") - cue_dict = {"lines": []} - - for line in all_lines: - if re.match(self.time_reg, line): - clean = re.search(self.time_reg, line).group() - start, end = clean.split(" --> ") - cue_dict.update({"start": start, "end": end}) - else: - clean = re.sub(self.stamp_reg, "", line) - clean = re.sub(self.tag_reg, "", clean) - cue_dict["lines"].append(clean) - if clean.strip() and clean not in self.all_text_lines[-4:]: - # remove immediate duplicates - self.all_text_lines.append(clean) - - return cue_dict - - def _match_text_lines(self): - """match unique text lines with timestamps""" - - self.matched = [] - - while self.all_text_lines: - check = self.all_text_lines[0] - matches = [i for i in self.parsed_cue_list if check in i["lines"]] - new_cue = matches[-1] - new_cue["start"] = matches[0]["start"] - - for line in new_cue["lines"]: - try: - self.all_text_lines.remove(line) - except ValueError: - continue - - self.matched.append(new_cue) - - def _timestamp_check(self): - """check if end timestamp is bigger than start timestamp""" - for idx, cue in enumerate(self.matched): - # this - end = int(re.sub("[^0-9]", "", cue.get("end"))) - # next - try: - next_cue = self.matched[idx + 1] - except IndexError: - continue - - start_next = int(re.sub("[^0-9]", "", next_cue.get("start"))) - if end > start_next: - self.matched[idx]["end"] = next_cue.get("start") - - def _add_id(self): - """add id to matched cues""" - for idx, _ in enumerate(self.matched): - self.matched[idx]["id"] = idx + 1 + return f"{hours}:{minutes}:{secs}.{millis}" def get_subtitle_str(self): - """stitch cues and return processed new string""" - new_subtitle_str = self.header + "\n\n" + """create vtt text str from cues""" + subtitle_str = f"WEBVTT\nKind: captions\nLanguage: {self.lang}" - for cue in self.matched: - timestamp = f"{cue.get('start')} --> {cue.get('end')}" - lines = "\n".join(cue.get("lines")) - cue_text = f"{cue.get('id')}\n{timestamp}\n{lines}\n\n" - new_subtitle_str = new_subtitle_str + cue_text + for cue in self.all_cues: + stamp = f"{cue.get('start')} --> {cue.get('end')}" + cue_text = f"\n\n{cue.get('idx')}\n{stamp}\n{cue.get('text')}" + subtitle_str = subtitle_str + cue_text - return new_subtitle_str + return subtitle_str def create_bulk_import(self, video, source): - """process matched for es import""" + """subtitle lines for es import""" + documents = self.create_documents(video, source) bulk_list = [] - channel = video.json_data.get("channel") - document = { + for document in documents: + document_id = document.get("subtitle_fragment_id") + action = {"index": {"_index": "ta_subtitle", "_id": document_id}} + bulk_list.append(json.dumps(action)) + bulk_list.append(json.dumps(document)) + + bulk_list.append("\n") + query_str = "\n".join(bulk_list) + + return query_str + + def create_documents(self, video, source): + """process documents""" + documents = self.chunk_list(video.youtube_id) + channel = video.json_data.get("channel") + meta_dict = { "youtube_id": video.youtube_id, "title": video.json_data.get("title"), "subtitle_channel": channel.get("channel_name"), @@ -257,26 +212,35 @@ class SubtitleParser: "subtitle_source": source, } - for match in self.matched: - match_id = match.get("id") - document_id = f"{video.youtube_id}-{self.lang}-{match_id}" - action = {"index": {"_index": "ta_subtitle", "_id": document_id}} - document.update( - { - "subtitle_fragment_id": document_id, - "subtitle_start": match.get("start"), - "subtitle_end": match.get("end"), - "subtitle_index": match_id, - "subtitle_line": " ".join(match.get("lines")), + _ = [i.update(meta_dict) for i in documents] + + return documents + + def chunk_list(self, youtube_id): + """join cues for bulk import""" + chunk_list = [] + + chunk = {} + for cue in self.all_cues: + if chunk: + text = f"{chunk.get('subtitle_line')} {cue.get('text')}\n" + chunk["subtitle_line"] = text + else: + idx = len(chunk_list) + 1 + chunk = { + "subtitle_index": idx, + "subtitle_line": cue.get("text"), + "subtitle_start": cue.get("start"), } - ) - bulk_list.append(json.dumps(action)) - bulk_list.append(json.dumps(document)) - bulk_list.append("\n") - query_str = "\n".join(bulk_list) + chunk["subtitle_fragment_id"] = f"{youtube_id}-{self.lang}-{idx}" - return query_str + if cue["idx"] % 5 == 0: + chunk["subtitle_end"] = cue.get("end") + chunk_list.append(chunk) + chunk = {} + + return chunk_list class YoutubeVideo(YouTubeItem, YoutubeSubtitle):