mirror of
https://github.com/tubearchivist/tubearchivist.git
synced 2025-03-25 15:10:12 +00:00
369 lines
12 KiB
Python
369 lines
12 KiB
Python
"""
|
|
functionality:
|
|
- handle download and caching for thumbnails
|
|
- check for missing thumbnails
|
|
"""
|
|
|
|
import base64
|
|
import os
|
|
from io import BytesIO
|
|
from time import sleep
|
|
|
|
import requests
|
|
from home.src.download import queue # partial import
|
|
from home.src.es.connect import IndexPaginate
|
|
from home.src.ta.config import AppConfig
|
|
from mutagen.mp4 import MP4, MP4Cover
|
|
from PIL import Image, ImageFile, ImageFilter, UnidentifiedImageError
|
|
|
|
ImageFile.LOAD_TRUNCATED_IMAGES = True
|
|
|
|
|
|
class ThumbManagerBase:
    """base class for thumbnail management

    Provides the cache folder layout (read from the application config)
    and the shared download / fallback logic for thumbnails.
    """

    CONFIG = AppConfig().config
    CACHE_DIR = CONFIG["application"]["cache_dir"]
    VIDEO_DIR = os.path.join(CACHE_DIR, "videos")
    CHANNEL_DIR = os.path.join(CACHE_DIR, "channels")
    PLAYLIST_DIR = os.path.join(CACHE_DIR, "playlists")

    def __init__(self, item_id, item_type, fallback=False):
        """store identifiers of the item to handle

        item_id: id of the item, used in log messages here
        item_type: key selecting the default image in get_fallback
        fallback: optional image source handed to Image.open instead of
            the bundled default image, falsy to use the default
        """
        self.item_id = item_id
        self.item_type = item_type
        self.fallback = fallback

    def download_raw(self, url):
        """download thumbnail for video

        Tries up to three times to fetch url and returns the opened PIL
        image. The fallback image is returned when:
        - url is empty
        - the server answers with 404
        - the response can not be identified as an image
        - all attempts failed

        Always returns an image object, so callers can use the result
        without checking for a failed download.
        """
        if not url:
            return self.get_fallback()

        for i in range(3):
            try:
                response = requests.get(url, stream=True, timeout=5)
                if response.ok:
                    try:
                        return Image.open(response.raw)
                    except UnidentifiedImageError:
                        print(f"failed to open thumbnail: {url}")
                        return self.get_fallback()

                if response.status_code == 404:
                    # will not appear on retry, use fallback right away
                    return self.get_fallback()

            except requests.exceptions.RequestException:
                print(f"{self.item_id}: retry thumbnail download {url}")
                # wait 1, 2 and 9 seconds after the respective attempt
                sleep((i + 1) ** i)

        # all attempts failed, returning False here would crash callers
        # accessing .size or .convert on the result
        print(f"{self.item_id}: thumbnail download failed, use fallback")
        return self.get_fallback()

    def get_fallback(self):
        """get fallback thumbnail if not available

        Opens self.fallback when set, otherwise the default image
        matching self.item_type from the static folder in app_root.
        Raises KeyError if item_type is not one of: video, playlist,
        icon, banner.
        """
        if self.fallback:
            img_raw = Image.open(self.fallback)
            return img_raw

        app_root = self.CONFIG["application"]["app_root"]
        default_map = {
            "video": os.path.join(
                app_root, "static/img/default-video-thumb.jpg"
            ),
            # playlists share the default video thumbnail
            "playlist": os.path.join(
                app_root, "static/img/default-video-thumb.jpg"
            ),
            "icon": os.path.join(
                app_root, "static/img/default-channel-icon.jpg"
            ),
            "banner": os.path.join(
                app_root, "static/img/default-channel-banner.jpg"
            ),
        }

        img_raw = Image.open(default_map[self.item_type])

        return img_raw
|
|
|
|
|
|
class ThumbManager(ThumbManagerBase):
    """handle thumbnails related functions

    Download, locate and delete cached artwork of a single video,
    channel or playlist identified by item_id.
    """

    def __init__(self, item_id, item_type="video", fallback=False):
        """item_type defaults to video, see base class for arguments"""
        super().__init__(item_id, item_type, fallback=fallback)

    def download(self, url):
        """download thumbnail

        Dispatches on self.item_type. For item_type channel, url is
        expected to be a pair of (thumb_url, banner_url). Any other
        item_type is only logged and otherwise ignored.
        """
        print(f"{self.item_id}: download {self.item_type} thumbnail")
        if self.item_type == "video":
            self.download_video_thumb(url)
        elif self.item_type == "channel":
            self.download_channel_art(url)
        elif self.item_type == "playlist":
            self.download_playlist_thumb(url)

    def delete(self):
        """delete thumbnail file

        Dispatches on self.item_type, any type other than video,
        channel or playlist is only logged and otherwise ignored.
        """
        print(f"{self.item_id}: delete {self.item_type} thumbnail")
        if self.item_type == "video":
            self.delete_video_thumb()
        elif self.item_type == "channel":
            self.delete_channel_thumb()
        elif self.item_type == "playlist":
            self.delete_playlist_thumb()

    def download_video_thumb(self, url, skip_existing=False):
        """pass url for video thumbnail

        Saves the image as RGB jpg in the cache, creating the subfolder
        if needed. Images that are not 16:9 get cropped vertically
        around the center. With skip_existing an already cached
        thumbnail is left untouched and nothing is downloaded.
        """
        folder_path = os.path.join(self.VIDEO_DIR, self.item_id[0].lower())
        thumb_path = self.vid_thumb_path(absolute=True)

        if skip_existing and os.path.exists(thumb_path):
            return

        os.makedirs(folder_path, exist_ok=True)
        img_raw = self.download_raw(url)
        width, height = img_raw.size

        # cross multiplied integer form of width / height != 16 / 9,
        # exact and without float comparison
        if width * 9 != height * 16:
            new_height = width / 16 * 9
            offset = (height - new_height) / 2
            img_raw = img_raw.crop((0, offset, width, height - offset))

        img_raw.convert("RGB").save(thumb_path)

    def vid_thumb_path(self, absolute=False, create_folder=False):
        """build expected path for video thumbnail from youtube_id

        The path is videos/<first char of id, lowercase>/<id>.jpg,
        relative to the cache dir unless absolute is set. With
        create_folder the containing folder is created in the cache.
        """
        folder_name = self.item_id[0].lower()
        folder_path = os.path.join("videos", folder_name)
        thumb_path = os.path.join(folder_path, f"{self.item_id}.jpg")
        if absolute:
            thumb_path = os.path.join(self.CACHE_DIR, thumb_path)

        if create_folder:
            folder_path = os.path.join(self.CACHE_DIR, folder_path)
            os.makedirs(folder_path, exist_ok=True)

        return thumb_path

    def download_channel_art(self, urls, skip_existing=False):
        """pass tuple of channel thumbnails

        urls is a pair of (thumb_url, banner_url). The helpers switch
        self.item_type to icon / banner to select the matching fallback
        image, the original item_type is restored afterwards so the
        handler stays usable for delete() and download().
        """
        channel_thumb, channel_banner = urls
        original_type = self.item_type
        try:
            self._download_channel_thumb(channel_thumb, skip_existing)
            self._download_channel_banner(channel_banner, skip_existing)
        finally:
            self.item_type = original_type

    def _download_channel_thumb(self, channel_thumb, skip_existing):
        """download channel thumbnail

        Sets self.item_type to icon for the fallback lookup.
        """

        thumb_path = os.path.join(
            self.CHANNEL_DIR, f"{self.item_id}_thumb.jpg"
        )
        self.item_type = "icon"

        if skip_existing and os.path.exists(thumb_path):
            return

        img_raw = self.download_raw(channel_thumb)
        img_raw.convert("RGB").save(thumb_path)

    def _download_channel_banner(self, channel_banner, skip_existing):
        """download channel banner

        Sets self.item_type to banner for the fallback lookup.
        """

        banner_path = os.path.join(
            self.CHANNEL_DIR, self.item_id + "_banner.jpg"
        )
        self.item_type = "banner"
        if skip_existing and os.path.exists(banner_path):
            return

        img_raw = self.download_raw(channel_banner)
        img_raw.convert("RGB").save(banner_path)

    def download_playlist_thumb(self, url, skip_existing=False):
        """pass thumbnail url

        Saves the playlist thumbnail as RGB jpg, with skip_existing an
        already cached file is left untouched.
        """
        thumb_path = os.path.join(self.PLAYLIST_DIR, f"{self.item_id}.jpg")
        if skip_existing and os.path.exists(thumb_path):
            return

        img_raw = self.download_raw(url)
        img_raw.convert("RGB").save(thumb_path)

    def delete_video_thumb(self):
        """delete video thumbnail if exists"""
        thumb_path = self.vid_thumb_path()
        to_delete = os.path.join(self.CACHE_DIR, thumb_path)
        if os.path.exists(to_delete):
            os.remove(to_delete)

    def delete_channel_thumb(self):
        """delete all artwork of channel"""
        thumb = os.path.join(self.CHANNEL_DIR, f"{self.item_id}_thumb.jpg")
        banner = os.path.join(self.CHANNEL_DIR, f"{self.item_id}_banner.jpg")
        if os.path.exists(thumb):
            os.remove(thumb)
        if os.path.exists(banner):
            os.remove(banner)

    def delete_playlist_thumb(self):
        """delete playlist thumbnail"""
        thumb_path = os.path.join(self.PLAYLIST_DIR, f"{self.item_id}.jpg")
        if os.path.exists(thumb_path):
            os.remove(thumb_path)

    def get_vid_base64_blur(self):
        """return base64 encoded placeholder

        Shrinks the cached video thumbnail to fit 1/20 of its width and
        height, blurs it and returns the jpeg as base64 data url.
        """
        file_path = os.path.join(self.CACHE_DIR, self.vid_thumb_path())
        # context manager closes the thumbnail file once the blurred
        # copy is built, filter() returns a new independent image
        with Image.open(file_path) as img_raw:
            img_raw.thumbnail((img_raw.width // 20, img_raw.height // 20))
            img_blur = img_raw.filter(ImageFilter.BLUR)

        buffer = BytesIO()
        img_blur.save(buffer, format="JPEG")
        img_data = buffer.getvalue()
        img_base64 = base64.b64encode(img_data).decode()
        data_url = f"data:image/jpg;base64,{img_base64}"

        return data_url
|
|
|
|
|
|
class ValidatorCallback:
    """page callback making sure artwork exists for all items of a page

    source: iterable of search hits, each carrying a _source dict
    index_name: name of the index the hits come from, selects the
        validation routine
    """

    def __init__(self, source, index_name):
        self.source = source
        self.index_name = index_name

    def run(self):
        """validate the page, unknown index names are only logged"""
        print(f"{self.index_name}: validate artwork")
        validators = {
            "ta_video": self._validate_videos,
            "ta_channel": self._validate_channels,
            "ta_playlist": self._validate_playlists,
        }
        validate = validators.get(self.index_name)
        if validate is not None:
            validate()

    def _validate_videos(self):
        """download video thumbnails not yet in the cache"""
        for hit in self.source:
            thumb_url = hit["_source"]["vid_thumb_url"]
            manager = ThumbManager(hit["_source"]["youtube_id"])
            manager.download_video_thumb(thumb_url, skip_existing=True)

    def _validate_channels(self):
        """download channel thumb and banner not yet in the cache"""
        for hit in self.source:
            thumb_url = hit["_source"]["channel_thumb_url"]
            banner_url = hit["_source"]["channel_banner_url"]
            manager = ThumbManager(hit["_source"]["channel_id"])
            manager.download_channel_art(
                (thumb_url, banner_url), skip_existing=True
            )

    def _validate_playlists(self):
        """download playlist thumbnails not yet in the cache"""
        for hit in self.source:
            thumb_url = hit["_source"]["playlist_thumbnail"]
            manager = ThumbManager(hit["_source"]["playlist_id"])
            manager.download_playlist_thumb(thumb_url, skip_existing=True)
|
|
|
|
|
|
class ThumbValidator:
    """find and download artwork missing from the cache"""

    def download_missing(self):
        """validate videos, channels and playlists, in that order"""
        self.download_missing_videos()
        self.download_missing_channels()
        self.download_missing_playlists()

    @staticmethod
    def _validate_index(index_name, query, **paginate_kwargs):
        """page through index_name, ValidatorCallback handles each page"""
        IndexPaginate(
            index_name, query, callback=ValidatorCallback, **paginate_kwargs
        ).get_results()

    def download_missing_videos(self):
        """validate thumbnails of all active videos, 5000 per page"""
        query = {
            "query": {"term": {"active": {"value": True}}},
            "sort": [{"youtube_id": {"order": "asc"}}],
            "_source": ["vid_thumb_url", "youtube_id"],
        }
        self._validate_index("ta_video", query, size=5000)

    def download_missing_channels(self):
        """validate artwork of all active channels"""
        query = {
            "query": {"term": {"channel_active": {"value": True}}},
            "sort": [{"channel_id": {"order": "asc"}}],
            "_source": {
                "excludes": ["channel_description", "channel_overwrites"]
            },
        }
        self._validate_index("ta_channel", query)

    def download_missing_playlists(self):
        """validate thumbnails of all active playlists"""
        query = {
            "query": {"term": {"playlist_active": {"value": True}}},
            "sort": [{"playlist_id": {"order": "asc"}}],
            "_source": ["playlist_id", "playlist_thumbnail"],
        }
        self._validate_index("ta_playlist", query)
|
|
|
|
|
|
class ThumbFilesystem:
    """write cached thumbnails into the media files"""

    CONFIG = AppConfig().config
    CACHE_DIR = CONFIG["application"]["cache_dir"]
    MEDIA_DIR = CONFIG["application"]["videos"]
    VIDEO_DIR = os.path.join(CACHE_DIR, "videos")

    def sync(self):
        """collect all media files and embed their thumbnail"""
        self._embed_thumbs(self.get_thumb_list())

    def get_thumb_list(self):
        """build list of media file path and thumbnail path per video

        Returns a list of dicts with the keys media_url and thumb_path,
        one for each entry of all_videos of the pending list.
        """
        pending = queue.PendingList()
        pending.get_download()
        pending.get_indexed()

        matched = []
        for item in pending.all_videos:
            youtube_id = item["youtube_id"]
            media_file = os.path.join(self.MEDIA_DIR, item["media_url"])
            relative_thumb = ThumbManager(youtube_id).vid_thumb_path()
            matched.append(
                {
                    "media_url": media_file,
                    "thumb_path": os.path.join(self.CACHE_DIR, relative_thumb),
                }
            )

        return matched

    @staticmethod
    def _embed_thumbs(video_list):
        """store the jpg thumbnail as cover of every media file

        Prints a progress message every 50 files.
        """
        for position, entry in enumerate(video_list, start=1):
            media_file = entry["media_url"]
            thumb_file = entry["thumb_path"]

            tagged = MP4(media_file)
            with open(thumb_file, "rb") as thumb:
                cover = MP4Cover(thumb.read(), imageformat=MP4Cover.FORMAT_JPEG)
                tagged["covr"] = [cover]

            tagged.save()
            if position % 50 == 0:
                print(f"thumbnail write progress {position}/{len(video_list)}")
|