WIP: new import folder parser for offline import

This commit is contained in:
simon 2022-07-26 19:51:47 +07:00
parent 36560735f2
commit 26cc7846c6
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
1 changed file with 172 additions and 1 deletion

View File

@ -157,7 +157,178 @@ class FilesystemScanner:
_, _ = ElasticWrap(path).delete()
class ManualImport:
class ImportFolderScanner:
    """import and indexing existing video files
    - identify all media files belonging to a video
    - identify youtube id
    - convert if needed

    Files are read from IMPORT_DIR. Every video is represented by a dict
    as returned from _get_template(), the collected dicts are stored in
    self.to_import.
    """

    CONFIG = AppConfig().config
    CACHE_DIR = CONFIG["application"]["cache_dir"]
    IMPORT_DIR = os.path.join(CACHE_DIR, "import")

    # recognised file extensions, keyed by the template field they fill
    EXT_MAP = {
        "media": [".mp4", ".mkv", ".webm"],
        "metadata": [".json"],
        "thumb": [".jpg", ".png", ".webp"],
        "subtitle": [".vtt"],
    }

    def __init__(self):
        # stays False until match_files() replaces it with a list
        self.to_import = False

    def scan(self):
        """scan and match media files

        returns the list of video dicts, after the youtube id was detected
        and the media file was converted where needed
        """
        all_files = self.get_all_files()
        self.match_files(all_files)
        self.process_videos()

        return self.to_import

    def get_all_files(self):
        """get all files in /import, sorted by file name"""
        return sorted(ignore_filelist(os.listdir(self.IMPORT_DIR)))

    @staticmethod
    def _get_template():
        """base dict for video, a fresh dict on every call"""
        return {
            "media": False,
            "video_id": False,
            "metadata": False,
            "thumb": False,
            "subtitle": [],
        }

    def match_files(self, all_files):
        """loop through all files, join what matches

        Consecutive files sharing the same base name are joined into one
        video dict, all_files is therefore expected to be sorted.
        Resets self.to_import, only groups with a media file are added.
        """
        self.to_import = []

        current_video = self._get_template()
        last_base = False

        for file_path in all_files:
            base_name_raw, ext = os.path.splitext(file_path)
            # second split drops one more dotted suffix, e.g. name.xx.vtt
            # NOTE(review): this also cuts a base name at a dot that is
            # part of the title, such files will not be grouped correctly
            base_name, _ = os.path.splitext(base_name_raw)

            key, file_path = self._detect_type(file_path, ext)
            if not key or not file_path:
                # not a file type we know how to import
                continue

            if base_name != last_base:
                # new base name: close previous group, start a fresh one
                self._add_video(current_video)
                current_video = self._get_template()
                last_base = base_name

            if key == "subtitle":
                current_video["subtitle"].append(file_path)
            else:
                current_video[key] = file_path

        # the last group is not closed inside of the loop
        self._add_video(current_video)

    def _add_video(self, current_video):
        """add finished group to to_import, skip groups without media"""
        if current_video["media"]:
            self.to_import.append(current_video)
        elif any(current_video.values()):
            # without media file there is nothing to import or convert
            print(f"skipping files without media file: {current_video}")

    def _detect_type(self, file_path, ext):
        """detect metadata type for file

        returns tuple of EXT_MAP key and file_path,
        or (False, False) if the extension is not in EXT_MAP
        """
        for key, value in self.EXT_MAP.items():
            if ext in value:
                return key, file_path

        return False, False

    def process_videos(self):
        """loop through all videos"""
        # to_import is False as long as match_files did not run
        for current_video in self.to_import or []:
            self._detect_youtube_id(current_video)
            self._dump_thumb(current_video)
            self._convert_video(current_video)

    def _detect_youtube_id(self, current_video):
        """find video id from filename or json

        stores the id in current_video["video_id"],
        raises ValueError if neither source provides an id
        """
        print(current_video)
        youtube_id = self._extract_id_from_filename(current_video["media"])

        metadata = current_video["metadata"]
        if not youtube_id and metadata:
            # only fall back to the json file if there is one
            youtube_id = self._extract_id_from_json(metadata)

        if not youtube_id:
            print(current_video["media"])
            raise ValueError("failed to find video id")

        current_video["video_id"] = youtube_id

    @staticmethod
    def _extract_id_from_filename(file_name):
        """
        look at the file name for the youtube id
        expects filename ending in [<youtube_id>].<ext>
        returns the id or False if there is no match
        """
        base_name, _ = os.path.splitext(file_name)
        id_search = re.search(r"\[([a-zA-Z0-9_-]{11})\]$", base_name)
        if id_search:
            youtube_id = id_search.group(1)
            return youtube_id

        print(f"id extraction failed from filename: {file_name}")

        return False

    def _extract_id_from_json(self, json_file):
        """open json file and extract id

        json_file is the file name inside of IMPORT_DIR,
        raises KeyError if the json has no top level "id" key
        """
        json_path = os.path.join(self.IMPORT_DIR, json_file)
        with open(json_path, "r", encoding="utf-8") as f:
            youtube_id = json.load(f)["id"]

        return youtube_id

    def _dump_thumb(self, current_video):
        """extract embedded thumb before converting"""
        if current_video["thumb"]:
            return

        # TODO: not implemented yet, write thumb to disk here
        # ffmpeg -dump_attachment:t "" -i filename.mkv
        # ffmpeg -i video.mp4 -map 0:v -map -0:V -c copy cover.jpg
        # webm

    def _convert_video(self, current_video):
        """convert if needed

        Media files not ending in .mp4 are converted with ffmpeg, the
        original file is deleted after a successful conversion.
        raises subprocess.CalledProcessError if ffmpeg fails
        """
        current_path = os.path.join(self.IMPORT_DIR, current_video["media"])
        base_path, ext = os.path.splitext(current_path)
        if ext == ".mp4":
            return

        new_path = base_path + ".mp4"
        subprocess.run(
            [
                "ffmpeg",
                # global options go in front of the input and output files
                "-loglevel",
                "warning",
                "-stats",
                "-i",
                current_path,
                new_path,
            ],
            check=True,
        )
        # NOTE(review): media is now the full path while unconverted videos
        # keep the bare file name, confirm what the consumer expects
        current_video["media"] = new_path
        os.remove(current_path)
class ManualImportOld:
"""import and indexing existing video files"""
CONFIG = AppConfig().config