"""
functionality:
- periodically refresh documents
- index and update in es
"""
|
|
|
|
import os
|
|
import shutil
|
|
from datetime import datetime
|
|
from time import sleep
|
|
|
|
from home.src.download.queue import PendingList
|
|
from home.src.download.thumbnails import ThumbManager
|
|
from home.src.download.yt_dlp_base import CookieHandler
|
|
from home.src.download.yt_dlp_handler import VideoDownloader
|
|
from home.src.es.connect import ElasticWrap, IndexPaginate
|
|
from home.src.index.channel import YoutubeChannel
|
|
from home.src.index.comments import Comments
|
|
from home.src.index.playlist import YoutubePlaylist
|
|
from home.src.index.video import YoutubeVideo
|
|
from home.src.ta.config import AppConfig
|
|
from home.src.ta.ta_redis import RedisArchivist, RedisQueue
|
|
|
|
|
|
class ReindexBase:
    """shared configuration and helpers for the reindex task"""

    # one entry per ES index that supports refreshing:
    # queue_name is the redis queue key, active_key the per-index
    # "still active" flag, refresh_key the last-refresh timestamp field
    REINDEX_CONFIG = [
        {
            "index_name": "ta_video",
            "index_type": "videos",
            "queue_name": "reindex:ta_video",
            "active_key": "active",
            "refresh_key": "vid_last_refresh",
        },
        {
            "index_name": "ta_channel",
            "index_type": "channels",
            "queue_name": "reindex:ta_channel",
            "active_key": "channel_active",
            "refresh_key": "channel_last_refresh",
        },
        {
            "index_name": "ta_playlist",
            "index_type": "playlists",
            "queue_name": "reindex:ta_playlist",
            "active_key": "playlist_active",
            "refresh_key": "playlist_last_refresh",
        },
    ]

    # headroom factor applied to the daily refresh quota
    MULTIPLY = 1.2

    def __init__(self):
        self.config = AppConfig().config
        # epoch seconds; NOTE(review): "%s" strftime is platform-specific
        self.now = int(datetime.now().strftime("%s"))

    def populate(self, all_ids, reindex_config):
        """push document ids onto the per-index redis reindex queue"""
        if all_ids:
            queue_name = reindex_config["queue_name"]
            RedisQueue(queue_name=queue_name).add_list(all_ids)
|
|
|
|
|
|
class ReindexOutdated(ReindexBase):
    """add outdated documents to reindex queue"""

    def __init__(self):
        super().__init__()
        # refresh interval in days from the scheduler config
        self.interval = self.config["scheduler"]["check_reindex_days"]

    def add_outdated(self):
        """queue outdated documents for every configured index"""
        for reindex_config in self.REINDEX_CONFIG:
            total_hits = self._get_total_hits(reindex_config)
            daily_should = self._get_daily_should(total_hits)
            all_ids = self._get_outdated_ids(reindex_config, daily_should)
            self.populate(all_ids, reindex_config)

    @staticmethod
    def _get_total_hits(reindex_config):
        """get total count of active documents in the index"""
        index_name = reindex_config["index_name"]
        active_key = reindex_config["active_key"]
        path = f"{index_name}/_search?filter_path=hits.total"
        data = {"query": {"match": {active_key: True}}}
        response, _ = ElasticWrap(path).post(data=data)
        total_hits = response["hits"]["total"]["value"]
        return total_hits

    def _get_daily_should(self, total_hits):
        """calc how many documents should reindex daily, with MULTIPLY
        headroom, capped below the ES 10k result-window limit"""
        daily_should = int((total_hits // self.interval + 1) * self.MULTIPLY)
        if daily_should >= 10000:
            daily_should = 9999

        return daily_should

    def _get_outdated_ids(self, reindex_config, daily_should):
        """get the oldest outdated document ids from index_name"""
        index_name = reindex_config["index_name"]
        refresh_key = reindex_config["refresh_key"]
        now_lte = self.now - self.interval * 24 * 60 * 60
        # BUGFIX: match on the per-index active_key instead of the
        # hard-coded "active" field, which only exists on ta_video;
        # channels/playlists use channel_active / playlist_active
        must_list = [
            {"match": {reindex_config["active_key"]: True}},
            {"range": {refresh_key: {"lte": now_lte}}},
        ]
        data = {
            "size": daily_should,
            "query": {"bool": {"must": must_list}},
            "sort": [{refresh_key: {"order": "asc"}}],
            "_source": False,
        }
        response, _ = ElasticWrap(f"{index_name}/_search").get(data=data)

        all_ids = [i["_id"] for i in response["hits"]["hits"]]
        return all_ids
|
|
|
|
|
|
class ReindexManual(ReindexBase):
    """
    manually add ids to reindex queue from API
    data_example = {
        "videos": ["video1", "video2", "video3"],
        "channels": ["channel1", "channel2", "channel3"],
        "playlists": ["playlist1", "playlist2"],
    }
    extract_videos to also reindex all videos of channel/playlist
    """

    def __init__(self, extract_videos=False):
        super().__init__()
        self.extract_videos = extract_videos
        self.data = False

    def extract_data(self, data):
        """process data dict, one key per index type"""
        self.data = data
        for key, values in self.data.items():
            reindex_config = self._get_reindex_config(key)
            self.process_index(reindex_config, values)

    def _get_reindex_config(self, index_type):
        """get reindex config for index, raise ValueError on unknown type"""
        for reindex_config in self.REINDEX_CONFIG:
            if reindex_config["index_type"] == index_type:
                return reindex_config

        print(f"reindex type {index_type} not valid")
        raise ValueError

    def process_index(self, index_config, values):
        """dispatch values to the matching per-index handler"""
        index_name = index_config["index_name"]
        if index_name == "ta_video":
            self._add_videos(values)
        elif index_name == "ta_channel":
            self._add_channels(values)
        elif index_name == "ta_playlist":
            self._add_playlists(values)

    def _add_videos(self, values):
        """add list of videos to reindex queue"""
        if not values:
            return

        RedisQueue("reindex:ta_video").add_list(values)

    def _add_channels(self, values):
        """add list of channels to reindex queue"""
        # guard empty list for consistency with _add_videos:
        # pushing an empty list to redis would raise
        if not values:
            return

        RedisQueue("reindex:ta_channel").add_list(values)

        if self.extract_videos:
            for channel_id in values:
                all_videos = self._get_channel_videos(channel_id)
                self._add_videos(all_videos)

    def _add_playlists(self, values):
        """add list of playlists to reindex queue"""
        # guard empty list for consistency with _add_videos
        if not values:
            return

        RedisQueue("reindex:ta_playlist").add_list(values)

        if self.extract_videos:
            for playlist_id in values:
                all_videos = self._get_playlist_videos(playlist_id)
                self._add_videos(all_videos)

    def _get_channel_videos(self, channel_id):
        """get all indexed video ids of a channel"""
        data = {
            "query": {"term": {"channel.channel_id": {"value": channel_id}}},
            "_source": ["youtube_id"],
        }
        all_results = IndexPaginate("ta_video", data).get_results()
        return [i["youtube_id"] for i in all_results]

    def _get_playlist_videos(self, playlist_id):
        """get all indexed video ids of a playlist"""
        data = {
            "query": {"term": {"playlist.keyword": {"value": playlist_id}}},
            "_source": ["youtube_id"],
        }
        all_results = IndexPaginate("ta_video", data).get_results()
        return [i["youtube_id"] for i in all_results]
|
|
|
|
|
|
class Reindex(ReindexBase):
    """reindex all documents from redis queue"""

    def __init__(self):
        super().__init__()
        # lazy cache of all indexed video ids, built on first
        # playlist reindex, see _get_all_videos
        self.all_indexed_ids = False

    def reindex_all(self):
        """reindex everything queued, draining one queue per index"""
        if self.cookie_invalid():
            print("[reindex] cookie invalid, exiting...")
            return

        for index_config in self.REINDEX_CONFIG:
            if not RedisQueue(index_config["queue_name"]).has_item():
                continue

            while True:
                has_next = self.reindex_index(index_config)
                if not has_next:
                    break

        RedisArchivist().set_message("last_reindex", self.now)

    def reindex_index(self, index_config):
        """reindex next queued item of a single index,
        return False once the queue is empty"""
        reindex = self.get_reindex_map(index_config["index_name"])
        youtube_id = RedisQueue(index_config["queue_name"]).get_next()
        if youtube_id:
            reindex(youtube_id)
            # throttle between items to avoid hammering remote endpoints
            sleep_interval = self.config["downloads"].get("sleep_interval", 0)
            sleep(sleep_interval)

        return bool(youtube_id)

    def get_reindex_map(self, index_name):
        """return def to run for index"""
        def_map = {
            "ta_video": self._reindex_single_video,
            "ta_channel": self._reindex_single_channel,
            "ta_playlist": self._reindex_single_playlist,
        }

        return def_map.get(index_name)

    def _reindex_single_video(self, youtube_id):
        """wrapper to handle channel name changes"""
        try:
            self._reindex_single_video_call(youtube_id)
        except FileNotFoundError:
            # media file not at its expected path, channel folder was
            # likely renamed: fix the file location, then retry once.
            # BUGFIX: actually run the fixer; it was only instantiated
            # before, so the retry hit the same FileNotFoundError
            ChannelUrlFixer(youtube_id, self.config).run()
            self._reindex_single_video_call(youtube_id)

    def _reindex_single_video_call(self, youtube_id):
        """refresh data for single video"""
        video = YoutubeVideo(youtube_id)

        # read current state to preserve fields a rebuild would lose
        video.get_from_es()
        player = video.json_data["player"]
        date_downloaded = video.json_data["date_downloaded"]
        channel_dict = video.json_data["channel"]
        playlist = video.json_data.get("playlist")
        subtitles = video.json_data.get("subtitles")

        # get fresh metadata from remote
        video.build_json()
        if not video.youtube_meta:
            # no longer available remotely
            video.deactivate()
            return

        video.delete_subtitles(subtitles=subtitles)
        video.check_subtitles()

        # add back the locally-owned fields
        video.json_data["player"] = player
        video.json_data["date_downloaded"] = date_downloaded
        video.json_data["channel"] = channel_dict
        if playlist:
            video.json_data["playlist"] = playlist

        video.upload_to_es()

        thumb_handler = ThumbManager(youtube_id)
        thumb_handler.delete_video_thumb()
        thumb_handler.download_video_thumb(video.json_data["vid_thumb_url"])

        Comments(youtube_id, config=self.config).reindex_comments()

        return

    @staticmethod
    def _reindex_single_channel(channel_id):
        """refresh channel data and sync to videos"""
        channel = YoutubeChannel(channel_id)
        channel.get_from_es()
        # preserve local-only fields across the remote refresh
        subscribed = channel.json_data["channel_subscribed"]
        overwrites = channel.json_data.get("channel_overwrites", False)
        channel.get_from_youtube()
        if not channel.json_data:
            # channel gone remotely: deactivate and propagate to videos
            channel.deactivate()
            channel.get_from_es()
            channel.sync_to_videos()
            return

        channel.json_data["channel_subscribed"] = subscribed
        if overwrites:
            channel.json_data["channel_overwrites"] = overwrites
        channel.upload_to_es()
        channel.sync_to_videos()

    def _reindex_single_playlist(self, playlist_id):
        """refresh playlist data"""
        self._get_all_videos()
        playlist = YoutubePlaylist(playlist_id)
        playlist.get_from_es()
        subscribed = playlist.json_data["playlist_subscribed"]
        playlist.all_youtube_ids = self.all_indexed_ids
        playlist.build_json(scrape=True)
        if not playlist.json_data:
            playlist.deactivate()
            return

        playlist.json_data["playlist_subscribed"] = subscribed
        playlist.upload_to_es()
        return

    def _get_all_videos(self):
        """build all_indexed_ids for playlist validation, cached so
        the expensive scan runs at most once per task"""
        if self.all_indexed_ids:
            return

        handler = PendingList()
        handler.get_download()
        handler.get_indexed()
        self.all_indexed_ids = [i["youtube_id"] for i in handler.all_videos]

    def cookie_invalid(self):
        """return true if cookie is enabled and invalid"""
        if not self.config["downloads"]["cookie_import"]:
            return False

        valid = CookieHandler(self.config).validate()
        # BUGFIX: invert the result; validate() returns True for a
        # VALID cookie, but this method promises True for INVALID
        return not valid
|
|
|
|
|
|
def reindex_outdated():
    """entry point: queue all outdated docs, then drain the queues"""
    outdated = ReindexOutdated()
    outdated.add_outdated()
    Reindex().reindex_all()
|
|
|
|
|
|
class ChannelUrlFixer:
    """fix not matching channel names in reindex"""

    def __init__(self, youtube_id, config):
        self.youtube_id = youtube_id
        self.config = config
        # populated by get_as_is with the YoutubeVideo object
        self.video = False

    def run(self):
        """compare current and expected media location, relocate if
        the channel folder was renamed; raise ValueError if the file
        is missing entirely"""
        print(f"{self.youtube_id}: failed to build channel path, try to fix.")
        path_now, folder_now = self.get_as_is()
        if not os.path.exists(path_now):
            print(f"giving up reindex, video in video: {self.video.json_data}")
            raise ValueError

        _, folder_expected = self.get_as_should()

        if folder_now == folder_expected:
            print(f"{self.youtube_id}: skip channel url fixer")
        else:
            self.process(path_now)

    def get_as_is(self):
        """load video from es and return its current path and folder"""
        self.video = YoutubeVideo(self.youtube_id)
        self.video.get_from_es()
        video_path_is = os.path.join(
            self.config["application"]["videos"],
            self.video.json_data["media_url"],
        )

        return video_path_is, os.path.split(video_path_is)[0]

    def get_as_should(self):
        """refresh metadata from remote and return the expected
        path and folder"""
        self.video.get_from_youtube()
        self.video.add_file_path()

        video_path_should = os.path.join(
            self.config["application"]["videos"],
            self.video.json_data["media_url"],
        )

        return video_path_should, os.path.split(video_path_should)[0]

    def process(self, video_path_is):
        """move the media file through the cache dir into its new
        archive location and update the indexed media_url"""
        print(f"{self.youtube_id}: fixing channel rename.")
        cache_dir = self.config["application"]["cache_dir"]
        new_path = os.path.join(
            cache_dir, "download", self.youtube_id + ".mp4"
        )
        # copy_function=copyfile drops metadata copy, works across devices
        shutil.move(video_path_is, new_path, copy_function=shutil.copyfile)
        VideoDownloader().move_to_archive(self.video.json_data)
        self.video.update_media_url()
|