refactor using new filesystem structure

This commit is contained in:
Simon 2023-07-28 14:39:51 +07:00
parent 8ffd2cc7ac
commit 1837f9776b
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
3 changed files with 102 additions and 112 deletions

View File

@ -5,7 +5,7 @@ import os
import requests import requests
from src.config import get_config from src.config import get_config
from src.static_types import ConfigType, TAVideo from src.static_types import ConfigType, TAChannel, TAVideo
CONFIG: ConfigType = get_config() CONFIG: ConfigType = get_config()
EXPECTED_ENV = {"ta_url", "ta_token", "jf_url", "jf_token", "ta_video_path"} EXPECTED_ENV = {"ta_url", "ta_token", "jf_url", "jf_token", "ta_video_path"}
@ -65,19 +65,27 @@ class TubeArchivist:
headers: dict = {"Authorization": f"Token {ta_token}"} headers: dict = {"Authorization": f"Token {ta_token}"}
base: str = CONFIG["ta_url"] base: str = CONFIG["ta_url"]
def get(self, path: str) -> TAVideo: def get_video(self, video_id: str) -> TAVideo:
"""get document from ta""" """get video metadata"""
url: str = f"{self.base}/api/{path}" url: str = f"{self.base}/api/video/{video_id}/"
response = requests.get(url, headers=self.headers, timeout=10) response = requests.get(url, headers=self.headers, timeout=10)
if response.ok: if response.ok:
response_json = response.json() ta_video: TAVideo = response.json()["data"]
if "data" in response_json: return ta_video
return response.json().get("data")
return response.json() raise ValueError(f"video not found in TA: {url}")
raise ValueError(f"video not found in TA: {path}") def get_channel(self, channel_id: str) -> TAChannel | None:
"""get channel metadata"""
url: str = f"{self.base}/api/channel/{channel_id}/"
response = requests.get(url, headers=self.headers, timeout=10)
if response.ok:
ta_channel: TAChannel = response.json()["data"]
return ta_channel
print(f"channel not found in TA: {url}")
return None
def get_thumb(self, path: str) -> bytes: def get_thumb(self, path: str) -> bytes:
"""get encoded thumbnail from ta""" """get encoded thumbnail from ta"""
@ -91,7 +99,9 @@ class TubeArchivist:
def ping(self) -> None: def ping(self) -> None:
"""ping tubearchivist server""" """ping tubearchivist server"""
response = self.get("ping/") url: str = f"{self.base}/api/ping/"
response = requests.get(url, headers=self.headers, timeout=10)
if not response: if not response:
raise ConnectionError("failed to connect to tube archivist") raise ConnectionError("failed to connect to tube archivist")

View File

@ -13,23 +13,20 @@ class Episode:
self.youtube_id: str = youtube_id self.youtube_id: str = youtube_id
self.jf_id: str = jf_id self.jf_id: str = jf_id
def sync(self) -> None: def get_ta_video(self) -> TAVideo:
"""get ta metadata"""
ta_video: TAVideo = TubeArchivist().get_video(self.youtube_id)
return ta_video
def sync(self, ta_video: TAVideo) -> None:
"""sync episode metadata""" """sync episode metadata"""
ta_video: TAVideo = self.get_ta_video()
self.update_metadata(ta_video) self.update_metadata(ta_video)
self.update_artwork(ta_video) self.update_artwork(ta_video)
def get_ta_video(self) -> TAVideo:
"""get video metadata from ta"""
path: str = f"/video/{self.youtube_id}"
ta_video: TAVideo = TubeArchivist().get(path)
return ta_video
def update_metadata(self, ta_video: TAVideo) -> None: def update_metadata(self, ta_video: TAVideo) -> None:
"""update jellyfin metadata from item_id""" """update jellyfin metadata from item_id"""
published: str = ta_video["published"] published: str = ta_video["published"]
published_date: datetime = datetime.strptime(published, "%d %b, %Y") published_date: datetime = datetime.fromisoformat(published)
data: dict = { data: dict = {
"Id": self.jf_id, "Id": self.jf_id,
"Name": ta_video.get("title"), "Name": ta_video.get("title"),

View File

@ -33,10 +33,10 @@ class Library:
all_shows: list[JFShow] = self._get_all_series()["Items"] all_shows: list[JFShow] = self._get_all_series()["Items"]
for show in all_shows: for show in all_shows:
show_handler = Show(show) show_handler = Show(show)
folders: list[str] = show_handler.create_folders()
show_handler.validate_show() show_handler.validate_show()
show_handler.validate_episodes() folders: list[str] | None = show_handler.validate_episodes()
show_handler.delete_folders(folders) if folders:
show_handler.delete_folders(folders)
collection_id: str = self._get_collection() collection_id: str = self._get_collection()
self.set_collection_art(collection_id) self.set_collection_art(collection_id)
@ -78,81 +78,6 @@ class Show:
def __init__(self, show: JFShow): def __init__(self, show: JFShow):
self.show: JFShow = show self.show: JFShow = show
def _get_all_episodes(
self,
filter_new: bool = False,
limit: int | bool = False,
) -> list[JFEpisode]:
"""get all episodes of show"""
series_id: str = self.show["Id"]
path: str = f"Shows/{series_id}/Episodes?fields=Path,Studios"
if limit:
path = f"{path}&limit={limit}"
all_episodes = Jellyfin().get(path)
all_items: list[JFEpisode] = all_episodes["Items"]
if filter_new:
all_items = [i for i in all_items if not i["Studios"]]
return all_items
def _get_expected_seasons(self) -> set[str]:
"""get all expected seasons"""
episodes: list[JFEpisode] = self._get_all_episodes()
all_years: set[str] = {
os.path.split(i["Path"])[-1][:4] for i in episodes
}
return all_years
def _get_existing_seasons(self) -> list[str]:
"""get all seasons indexed of series"""
series_id: str = self.show["Id"]
path: str = f"Shows/{series_id}/Seasons"
all_seasons: dict = Jellyfin().get(path)
return [str(i.get("IndexNumber")) for i in all_seasons["Items"]]
def create_folders(self) -> list[str]:
"""create season folders if needed"""
all_expected: set[str] = self._get_expected_seasons()
all_existing: list[str] = self._get_existing_seasons()
base: str = get_config()["ta_video_path"]
channel_name: str = os.path.split(self.show["Path"])[-1]
folders: list[str] = []
for year in all_expected:
if year not in all_existing:
path: str = os.path.join(base, channel_name, year)
if not os.path.exists(path):
os.mkdir(path)
folders.append(path)
self._wait_for_seasons()
return folders
def delete_folders(self, folders: list[str]) -> None:
"""delete temporary folders created"""
for folder in folders:
os.removedirs(folder)
def _wait_for_seasons(self) -> None:
"""wait for seasons to be created"""
jf_id: str = self.show["Id"]
path: str = f"Items/{jf_id}/Refresh?Recursive=true&ImageRefreshMode=Default&MetadataRefreshMode=Default" # noqa: E501
Jellyfin().post(path, False)
for _ in range(12):
all_existing: set[str] = set(self._get_existing_seasons())
all_expected: set[str] = self._get_expected_seasons()
if all_expected.issubset(all_existing):
return
print(f"[setup][{jf_id}] waiting for seasons to be created")
sleep(5)
raise TimeoutError("timeout reached for creating season folder")
def validate_show(self) -> None: def validate_show(self) -> None:
"""set show metadata""" """set show metadata"""
ta_channel: TAChannel | None = self._get_ta_channel() ta_channel: TAChannel | None = self._get_ta_channel()
@ -163,15 +88,8 @@ class Show:
def _get_ta_channel(self) -> TAChannel | None: def _get_ta_channel(self) -> TAChannel | None:
"""get ta channel metadata""" """get ta channel metadata"""
episodes: list[JFEpisode] = self._get_all_episodes(limit=1) channel_id: str = self.show["Path"].split("/")[-1]
if not episodes: ta_channel: TAChannel | None = TubeArchivist().get_channel(channel_id)
return None
episode: JFEpisode = episodes[0]
youtube_id: str = os.path.split(episode["Path"])[-1][9:20]
path = f"/video/{youtube_id}"
ta_video: TAVideo = TubeArchivist().get(path)
ta_channel: TAChannel = ta_video["channel"]
return ta_channel return ta_channel
@ -213,15 +131,80 @@ class Show:
tvart = TubeArchivist().get_thumb(ta_channel["channel_tvart_url"]) tvart = TubeArchivist().get_thumb(ta_channel["channel_tvart_url"])
jf_handler.post_img(f"Items/{jf_id}/Images/Backdrop", tvart) jf_handler.post_img(f"Items/{jf_id}/Images/Backdrop", tvart)
def validate_episodes(self) -> None: def validate_episodes(self) -> list[str] | None:
"""sync all episodes""" """sync all episodes"""
showname: str = self.show["Name"] showname: str = self.show["Name"]
new_episodes: list[JFEpisode] = self._get_all_episodes(filter_new=True) new_episodes: list[JFEpisode] = self._get_all_episodes(filter_new=True)
if not new_episodes: if not new_episodes:
print(f"[show][{showname}] no new videos found") print(f"[show][{showname}] no new videos found")
return return None
print(f"[show][{showname}] indexing {len(new_episodes)} videos") print(f"[show][{showname}] indexing {len(new_episodes)} videos")
for video in new_episodes: seasons_created: list[str] = []
youtube_id: str = os.path.split(video["Path"])[-1][9:20] for jf_ep in new_episodes:
Episode(youtube_id, video["Id"]).sync() youtube_id: str = os.path.basename(jf_ep["Path"]).split(".")[0]
episode_handler = Episode(youtube_id, jf_ep["Id"])
ta_video: TAVideo = episode_handler.get_ta_video()
season_folder: str | None = self.create_season(ta_video, jf_ep)
episode_handler.sync(ta_video)
if season_folder:
seasons_created.append(season_folder)
return seasons_created
def _get_all_episodes(self, filter_new: bool = False) -> list[JFEpisode]:
"""get all episodes of show"""
series_id: str = self.show["Id"]
path: str = f"Shows/{series_id}/Episodes?fields=Path,Studios"
all_episodes = Jellyfin().get(path)
all_items: list[JFEpisode] = all_episodes["Items"]
if filter_new:
all_items = [i for i in all_items if not i["Studios"]]
return all_items
def create_season(self, ta_video: TAVideo, jf_ep: JFEpisode) -> str | None:
"""create season folders"""
existing_seasons = self._get_existing_seasons()
expected_season = ta_video["published"].split("-")[0]
if expected_season in existing_seasons:
return None
base: str = get_config()["ta_video_path"]
channel_folder = os.path.split(os.path.split(jf_ep["Path"])[0])[-1]
season_folder = os.path.join(base, channel_folder, expected_season)
os.makedirs(season_folder)
self._wait_for_season(expected_season)
return season_folder
def _wait_for_season(self, expected_season: str) -> None:
"""wait for season to be created in JF"""
jf_id: str = self.show["Id"]
path: str = f"Items/{jf_id}/Refresh?Recursive=true&ImageRefreshMode=Default&MetadataRefreshMode=Default" # noqa: E501
Jellyfin().post(path, False)
for _ in range(12):
all_existing: set[str] = set(self._get_existing_seasons())
if expected_season in all_existing:
return
print(f"[setup][{jf_id}] waiting for seasons to be created")
sleep(5)
raise TimeoutError("timeout reached for creating season folder")
def _get_existing_seasons(self) -> list[str]:
"""get all seasons indexed of series"""
series_id: str = self.show["Id"]
path: str = f"Shows/{series_id}/Seasons"
all_seasons: dict = Jellyfin().get(path)
return [str(i.get("IndexNumber")) for i in all_seasons["Items"]]
def delete_folders(self, folders: list[str]) -> None:
"""delete temporary folders created"""
for folder in folders:
os.removedirs(folder)