diff --git a/tubearchivist/home/src/index/video.py b/tubearchivist/home/src/index/video.py index e22e52d..ab5c22b 100644 --- a/tubearchivist/home/src/index/video.py +++ b/tubearchivist/home/src/index/video.py @@ -5,6 +5,7 @@ functionality: """ import os +import re from datetime import datetime import requests @@ -121,6 +122,90 @@ class YoutubeSubtitle: print(f"{self.youtube_id}: failed to download subtitle") +class SubtitleParser: + """parse subtitle str from youtube""" + + time_reg = r"^([0-9]{2}:?){3}\.[0-9]{3} --> ([0-9]{2}:?){3}\.[0-9]{3}" + stamp_reg = r"<([0-9]{2}:?){3}\.[0-9]{3}>" + tag_reg = r"" + + def __init__(self, subtitle_str): + self.subtitle_str = subtitle_str + self.header = False + self.parsed_cue_list = False + self.all_text_lines = False + self.matched = False + + def process(self): + """collection to process subtitle string""" + self._parse_cues() + self._match_text_lines() + self._add_id() + + def _parse_cues(self): + """split into cues""" + all_cues = self.subtitle_str.replace("\n \n", "\n").split("\n\n") + self.header = all_cues[0] + self.all_text_lines = [] + self.parsed_cue_list = [self._cue_cleaner(i) for i in all_cues[1:]] + + def _cue_cleaner(self, cue): + """parse single cue""" + all_lines = cue.split("\n") + cue_dict = {"lines": []} + + for line in all_lines: + if re.match(r"^([0-9]{2}:?){3}", line): + clean = re.search(self.time_reg, line).group() + start, end = clean.split(" --> ") + cue_dict.update({"start": start, "end": end}) + else: + clean = re.sub(self.stamp_reg, "", line) + clean = re.sub(self.tag_reg, "", clean) + cue_dict["lines"].append(clean) + if clean and clean not in self.all_text_lines: + self.all_text_lines.append(clean) + + return cue_dict + + def _match_text_lines(self): + """match unique text lines with timestamps""" + + self.matched = [] + + while self.all_text_lines: + check = self.all_text_lines[0] + matches = [i for i in self.parsed_cue_list if check in i["lines"]] + new_cue = matches[-1] + new_cue["start"] = matches[0]["start"] + + for line in new_cue["lines"]: + try: + self.all_text_lines.remove(line) + except ValueError: + print("failed to process:") + print(line) + + self.matched.append(new_cue) + + def _add_id(self): + """add id to matched cues""" + for idx, _ in enumerate(self.matched): + self.matched[idx]["id"] = idx + 1 + + def get_subtitle_str(self): + """stitch cues and return processed new string""" + new_subtitle_str = self.header + "\n\n" + + for cue in self.matched: + timestamp = f"{cue.get('start')} --> {cue.get('end')}" + lines = "\n".join(cue.get("lines")) + cue_text = f"{cue.get('id')}\n{timestamp}\n{lines}\n\n" + new_subtitle_str = new_subtitle_str + cue_text + + return new_subtitle_str + + class YoutubeVideo(YouTubeItem, YoutubeSubtitle): """represents a single youtube video"""