diff --git a/tubearchivist/home/src/index/filesystem.py b/tubearchivist/home/src/index/filesystem.py index 9dd7b47..dfd556c 100644 --- a/tubearchivist/home/src/index/filesystem.py +++ b/tubearchivist/home/src/index/filesystem.py @@ -157,7 +157,178 @@ class FilesystemScanner: _, _ = ElasticWrap(path).delete() -class ManualImport: +class ImportFolderScanner: + """import and indexing existing video files + - identify all media files belonging to a video + - identify youtube id + - convert if needed + """ + + CONFIG = AppConfig().config + CACHE_DIR = CONFIG["application"]["cache_dir"] + IMPORT_DIR = os.path.join(CACHE_DIR, "import") + + EXT_MAP = { + "media": [".mp4", ".mkv", ".webm"], + "metadata": [".json"], + "thumb": [".jpg", ".png", ".webp"], + "subtitle": [".vtt"], + } + + def __init__(self): + self.to_import = False + + def scan(self): + """scan and match media files""" + all_files = self.get_all_files() + self.match_files(all_files) + self.process_videos() + + return self.to_import + + def get_all_files(self): + """get all files in /import""" + all_files = ignore_filelist(os.listdir(self.IMPORT_DIR)) + all_files.sort() + + return all_files + + @staticmethod + def _get_template(): + """base dict for video""" + return { + "media": False, + "video_id": False, + "metadata": False, + "thumb": False, + "subtitle": [], + } + + def match_files(self, all_files): + """loop through all files, join what matches""" + self.to_import = [] + + current_video = self._get_template() + last_base = False + + for file_path in all_files: + base_name_raw, ext = os.path.splitext(file_path) + base_name, _ = os.path.splitext(base_name_raw) + + key, file_path = self._detect_type(file_path, ext) + if not key or not file_path: + continue + + if base_name != last_base: + if last_base: + self.to_import.append(current_video) + + current_video = self._get_template() + last_base = base_name + + if key == "subtitle": + current_video["subtitle"].append(file_path) + else: + current_video[key] = file_path + + if current_video.get("media"): + self.to_import.append(current_video) + + def _detect_type(self, file_path, ext): + """detect metadata type for file""" + + for key, value in self.EXT_MAP.items(): + if ext in value: + return key, file_path + + return False, False + + def process_videos(self): + """loop through all videos""" + for current_video in self.to_import: + self._detect_youtube_id(current_video) + self._dump_thumb(current_video) + self._convert_video(current_video) + + def _detect_youtube_id(self, current_video): + """find video id from filename or json""" + print(current_video) + youtube_id = self._extract_id_from_filename(current_video["media"]) + if youtube_id: + current_video["video_id"] = youtube_id + return + + youtube_id = self._extract_id_from_json(current_video["metadata"]) + if youtube_id: + current_video["video_id"] = youtube_id + return + + print(current_video["media"]) + raise ValueError("failed to find video id") + + @staticmethod + def _extract_id_from_filename(file_name): + """ + look at the file name for the youtube id + expects filename ending in []. + """ + base_name, _ = os.path.splitext(file_name) + id_search = re.search(r"\[([a-zA-Z0-9_-]{11})\]$", base_name) + if id_search: + youtube_id = id_search.group(1) + return youtube_id + + print(f"id extraction failed from filename: {file_name}") + + return False + + def _extract_id_from_json(self, json_file): + """open json file and extract id""" + json_path = os.path.join(self.CACHE_DIR, "import", json_file) + with open(json_path, "r", encoding="utf-8") as f: + json_content = f.read() + + youtube_id = json.loads(json_content)["id"] + + return youtube_id + + def _dump_thumb(self, current_video): + """extract embedded thumb before converting""" + if current_video["thumb"]: + return + + # write thumb to disk here + # ffmpeg -dump_attachment:t "" -i filename.mkv + # ffmpeg -i video.mp4 -map 0:v -map -0:V -c copy cover.jpg + # webm + + def _convert_video(self, current_video): + """convert if needed""" + current_path = os.path.join( + self.CACHE_DIR, "import", current_video["media"] + ) + base_path, ext = os.path.splitext(current_path) + if ext == ".mp4": + return + + new_path = base_path + ".mp4" + subprocess.run( + [ + "ffmpeg", + "-i", + current_path, + new_path, + "-loglevel", + "warning", + "-stats", + ], + check=True, + ) + current_video["media"] = new_path + os.remove(current_path) + + +class ManualImportOld: """import and indexing existing video files""" CONFIG = AppConfig().config