mirror of https://github.com/tubearchivist/tubearchivist-frontend.git
synced 2024-11-22 11:50:14 +00:00

rewrite SubtitleParser, #180

commit c186798e78
parent 40c8e6d146
@@ -6,7 +6,6 @@ functionality:
 import json
 import os
-import re
 from datetime import datetime
 
 import requests
@@ -65,7 +64,7 @@ class YoutubeSubtitle:
         if not all_formats:
             return False
 
-        subtitle = [i for i in all_formats if i["ext"] == "vtt"][0]
+        subtitle = [i for i in all_formats if i["ext"] == "json3"][0]
         subtitle.update(
             {"lang": lang, "source": "auto", "media_url": media_url}
         )
@@ -102,7 +101,7 @@ class YoutubeSubtitle:
         # no user subtitles found
         return False
 
-        subtitle = [i for i in all_formats if i["ext"] == "vtt"][0]
+        subtitle = [i for i in all_formats if i["ext"] == "json3"][0]
         subtitle.update(
             {"lang": lang, "source": "user", "media_url": media_url}
         )
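Both hunks above switch the downloaded subtitle format from vtt to json3, which is structured JSON rather than WebVTT text; that is what lets the rewritten parser below drop all of its regex handling. A minimal sketch of the payload shape the new code consumes: the key names (events, tStartMs, dDurationMs, segs, utf8) come straight from the new parser, the values are made up.

import json

# illustrative json3-style payload, values invented for the example
subtitle_str = json.dumps({
    "events": [
        {
            "tStartMs": 0,
            "dDurationMs": 1500,
            "segs": [{"utf8": "hello "}, {"utf8": "world"}],
        },
    ]
})

# mirrors what the new __init__ and process() do with the payload
events = json.loads(subtitle_str).get("events")
text = "".join(seg.get("utf8") for seg in events[0]["segs"])
print(text)  # hello world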
@@ -145,109 +144,65 @@ class YoutubeSubtitle:
 class SubtitleParser:
     """parse subtitle str from youtube"""
 
-    time_reg = r"^([0-9]{2}:?){3}\.[0-9]{3} --> ([0-9]{2}:?){3}\.[0-9]{3}"
-    stamp_reg = r"<([0-9]{2}:?){3}\.[0-9]{3}>"
-    tag_reg = r"</?c>"
-
     def __init__(self, subtitle_str, lang):
-        self.subtitle_str = subtitle_str
+        self.subtitle_raw = json.loads(subtitle_str)
         self.lang = lang
-        self.header = False
-        self.parsed_cue_list = False
-        self.all_text_lines = False
-        self.matched = False
+        self.all_cues = False
 
     def process(self):
-        """collection to process subtitle string"""
-        self._parse_cues()
-        self._match_text_lines()
-        self._add_id()
-        self._timestamp_check()
-
-    def _parse_cues(self):
-        """split into cues"""
-        all_cues = self.subtitle_str.replace("\n \n", "\n").split("\n\n")
-        self.header = all_cues[0]
-        self.all_text_lines = []
-        self.parsed_cue_list = [self._cue_cleaner(i) for i in all_cues[1:]]
-
-    def _cue_cleaner(self, cue):
-        """parse single cue"""
-        all_lines = cue.split("\n")
-        cue_dict = {"lines": []}
-
-        for line in all_lines:
-            if re.match(self.time_reg, line):
-                clean = re.search(self.time_reg, line).group()
-                start, end = clean.split(" --> ")
-                cue_dict.update({"start": start, "end": end})
-            else:
-                clean = re.sub(self.stamp_reg, "", line)
-                clean = re.sub(self.tag_reg, "", clean)
-                cue_dict["lines"].append(clean)
-                if clean.strip() and clean not in self.all_text_lines[-4:]:
-                    # remove immediate duplicates
-                    self.all_text_lines.append(clean)
-
-        return cue_dict
-
-    def _match_text_lines(self):
-        """match unique text lines with timestamps"""
-        self.matched = []
-
-        while self.all_text_lines:
-            check = self.all_text_lines[0]
-            matches = [i for i in self.parsed_cue_list if check in i["lines"]]
-            new_cue = matches[-1]
-            new_cue["start"] = matches[0]["start"]
-
-            for line in new_cue["lines"]:
-                try:
-                    self.all_text_lines.remove(line)
-                except ValueError:
-                    continue
-
-            self.matched.append(new_cue)
-
-    def _timestamp_check(self):
-        """check if end timestamp is bigger than start timestamp"""
-        for idx, cue in enumerate(self.matched):
-            # this
-            end = int(re.sub("[^0-9]", "", cue.get("end")))
-            # next
-            try:
-                next_cue = self.matched[idx + 1]
-            except IndexError:
-                continue
-
-            start_next = int(re.sub("[^0-9]", "", next_cue.get("start")))
-            if end > start_next:
-                self.matched[idx]["end"] = next_cue.get("start")
-
-    def _add_id(self):
-        """add id to matched cues"""
-        for idx, _ in enumerate(self.matched):
-            self.matched[idx]["id"] = idx + 1
+        """extract relevant cue data"""
+        self.all_cues = []
+        for idx, event in enumerate(self.subtitle_raw.get("events")):
+            cue = {
+                "start": self.ms_conv(event["tStartMs"]),
+                "end": self.ms_conv(event["tStartMs"] + event["dDurationMs"]),
+                "text": "".join([i.get("utf8") for i in event["segs"]]),
+                "idx": idx + 1,
+            }
+            self.all_cues.append(cue)
+
+    @staticmethod
+    def ms_conv(ms):
+        """convert ms to timestamp"""
+        hours = str((ms // (1000 * 60 * 60)) % 24).zfill(2)
+        minutes = str((ms // (1000 * 60)) % 60).zfill(2)
+        secs = str((ms // 1000) % 60).zfill(2)
+        millis = str(ms % 1000).zfill(3)
+
+        return f"{hours}:{minutes}:{secs}.{millis}"
 
     def get_subtitle_str(self):
-        """stitch cues and return processed new string"""
-        new_subtitle_str = self.header + "\n\n"
+        """create vtt text str from cues"""
+        subtitle_str = f"WEBVTT\nKind: captions\nLanguage: {self.lang}"
 
-        for cue in self.matched:
-            timestamp = f"{cue.get('start')} --> {cue.get('end')}"
-            lines = "\n".join(cue.get("lines"))
-            cue_text = f"{cue.get('id')}\n{timestamp}\n{lines}\n\n"
-            new_subtitle_str = new_subtitle_str + cue_text
+        for cue in self.all_cues:
+            stamp = f"{cue.get('start')} --> {cue.get('end')}"
+            cue_text = f"\n\n{cue.get('idx')}\n{stamp}\n{cue.get('text')}"
+            subtitle_str = subtitle_str + cue_text
 
-        return new_subtitle_str
+        return subtitle_str
 
     def create_bulk_import(self, video, source):
-        """process matched for es import"""
+        """subtitle lines for es import"""
+        documents = self.create_documents(video, source)
         bulk_list = []
-        channel = video.json_data.get("channel")
 
-        document = {
+        for document in documents:
+            document_id = document.get("subtitle_fragment_id")
+            action = {"index": {"_index": "ta_subtitle", "_id": document_id}}
+            bulk_list.append(json.dumps(action))
+            bulk_list.append(json.dumps(document))
+
+        bulk_list.append("\n")
+        query_str = "\n".join(bulk_list)
+
+        return query_str
+
+    def create_documents(self, video, source):
+        """process documents"""
+        documents = self.chunk_list(video.youtube_id)
+        channel = video.json_data.get("channel")
+        meta_dict = {
             "youtube_id": video.youtube_id,
             "title": video.json_data.get("title"),
             "subtitle_channel": channel.get("channel_name"),
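The new ms_conv staticmethod replaces the old regex timestamp handling: json3 events carry times in milliseconds, so VTT stamps are rebuilt with plain integer arithmetic. A worked example of that arithmetic, with a made-up input value:

ms = 3725123  # 1 h, 2 min, 5 s, 123 ms

hours = str((ms // (1000 * 60 * 60)) % 24).zfill(2)  # "01"
minutes = str((ms // (1000 * 60)) % 60).zfill(2)     # "02"
secs = str((ms // 1000) % 60).zfill(2)               # "05"
millis = str(ms % 1000).zfill(3)                     # "123"

print(f"{hours}:{minutes}:{secs}.{millis}")  # 01:02:05.123

get_subtitle_str() then joins each cue's converted stamps into a "start --> end" line under the WEBVTT header.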
@@ -257,26 +212,35 @@ class SubtitleParser:
             "subtitle_source": source,
         }
 
-        for match in self.matched:
-            match_id = match.get("id")
-            document_id = f"{video.youtube_id}-{self.lang}-{match_id}"
-            action = {"index": {"_index": "ta_subtitle", "_id": document_id}}
-            document.update(
-                {
-                    "subtitle_fragment_id": document_id,
-                    "subtitle_start": match.get("start"),
-                    "subtitle_end": match.get("end"),
-                    "subtitle_index": match_id,
-                    "subtitle_line": " ".join(match.get("lines")),
-                }
-            )
-            bulk_list.append(json.dumps(action))
-            bulk_list.append(json.dumps(document))
-
-        bulk_list.append("\n")
-        query_str = "\n".join(bulk_list)
-
-        return query_str
+        _ = [i.update(meta_dict) for i in documents]
+
+        return documents
+
+    def chunk_list(self, youtube_id):
+        """join cues for bulk import"""
+        chunk_list = []
+        chunk = {}
+        for cue in self.all_cues:
+            if chunk:
+                text = f"{chunk.get('subtitle_line')} {cue.get('text')}\n"
+                chunk["subtitle_line"] = text
+            else:
+                idx = len(chunk_list) + 1
+                chunk = {
+                    "subtitle_index": idx,
+                    "subtitle_line": cue.get("text"),
+                    "subtitle_start": cue.get("start"),
+                }
+
+            chunk["subtitle_fragment_id"] = f"{youtube_id}-{self.lang}-{idx}"
+
+            if cue["idx"] % 5 == 0:
+                chunk["subtitle_end"] = cue.get("end")
+                chunk_list.append(chunk)
+                chunk = {}
+
+        return chunk_list
 
 
 class YoutubeVideo(YouTubeItem, YoutubeSubtitle):
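chunk_list() groups every five consecutive cues into one Elasticsearch document (the `if cue["idx"] % 5 == 0` check), and create_bulk_import() serializes those documents into a bulk request body: one action line plus one source line per document, ending in a newline. A minimal sketch of the body built for a single chunk, reusing the loop from the new code; the document values and ids here are made up:

import json

documents = [
    {
        "subtitle_fragment_id": "abc123-en-1",  # f"{youtube_id}-{lang}-{idx}", values invented
        "subtitle_index": 1,
        "subtitle_line": "hello world",
        "subtitle_start": "00:00:00.000",
        "subtitle_end": "00:00:07.500",
    },
]

bulk_list = []
for document in documents:
    document_id = document.get("subtitle_fragment_id")
    action = {"index": {"_index": "ta_subtitle", "_id": document_id}}
    bulk_list.append(json.dumps(action))
    bulk_list.append(json.dumps(document))

bulk_list.append("\n")  # the bulk endpoint expects a trailing newline
query_str = "\n".join(bulk_list)
print(query_str)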