simplify channel overwrite handling in posprocessing

This commit is contained in:
Simon 2024-05-12 00:34:03 +02:00
parent d9ce9641e2
commit 2a9769d154
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
1 changed files with 159 additions and 146 deletions

View File

@ -24,152 +24,6 @@ from home.src.ta.helper import ignore_filelist
from home.src.ta.settings import EnvironmentSettings
class DownloadPostProcess:
"""handle task to run after download queue finishes"""
def __init__(self, download):
self.download = download
self.now = int(datetime.now().timestamp())
self.pending = False
def run(self):
"""run all functions"""
self.pending = PendingList()
self.pending.get_download()
self.pending.get_channels()
self.pending.get_indexed()
self.auto_delete_all()
self.auto_delete_overwrites()
to_refresh = self.refresh_playlist()
self.match_videos(to_refresh)
self.get_comments()
def auto_delete_all(self):
"""handle auto delete"""
autodelete_days = self.download.config["downloads"]["autodelete_days"]
if not autodelete_days:
return
print(f"auto delete older than {autodelete_days} days")
now_lte = str(self.now - autodelete_days * 24 * 60 * 60)
data = {
"query": {"range": {"player.watched_date": {"lte": now_lte}}},
"sort": [{"player.watched_date": {"order": "asc"}}],
}
self._auto_delete_watched(data)
def auto_delete_overwrites(self):
"""handle per channel auto delete from overwrites"""
for channel_id, value in self.pending.channel_overwrites.items():
if "autodelete_days" in value:
autodelete_days = value.get("autodelete_days")
print(f"{channel_id}: delete older than {autodelete_days}d")
now_lte = str(self.now - autodelete_days * 24 * 60 * 60)
must_list = [
{"range": {"player.watched_date": {"lte": now_lte}}},
{"term": {"channel.channel_id": {"value": channel_id}}},
]
data = {
"query": {"bool": {"must": must_list}},
"sort": [{"player.watched_date": {"order": "desc"}}],
}
self._auto_delete_watched(data)
@staticmethod
def _auto_delete_watched(data):
"""delete watched videos after x days"""
to_delete = IndexPaginate("ta_video", data).get_results()
if not to_delete:
return
for video in to_delete:
youtube_id = video["youtube_id"]
print(f"{youtube_id}: auto delete video")
YoutubeVideo(youtube_id).delete_media_file()
print("add deleted to ignore list")
vids = [{"type": "video", "url": i["youtube_id"]} for i in to_delete]
pending = PendingList(youtube_ids=vids)
pending.parse_url_list()
_ = pending.add_to_pending(status="ignore")
def refresh_playlist(self) -> list[str]:
"""match videos with playlists"""
to_refresh = self._get_to_refresh_playlists()
total_playlist = len(to_refresh)
for idx, playlist_id in enumerate(to_refresh):
playlist = YoutubePlaylist(playlist_id)
playlist.update_playlist(skip_on_empty=True)
if not self.pending.task:
continue
channel_name = playlist.json_data["playlist_channel"]
playlist_title = playlist.json_data["playlist_name"]
message = [
f"Post Processing Playlists for: {channel_name}",
f"Validate: {playlist_title} - {idx + 1}/{total_playlist}",
]
progress = (idx + 1) / total_playlist
self.download.task.send_progress(message, progress=progress)
return to_refresh
def _get_to_refresh_playlists(self) -> list[str]:
"""get playlists to refresh"""
if self.download.task:
message = ["Post Processing Playlists", "Scanning for Playlists"]
self.download.task.send_progress(message)
to_refresh = []
for channel_id in self.download.channels:
channel = YoutubeChannel(channel_id)
overwrites = self.pending.channel_overwrites.get(channel_id, False)
if overwrites and overwrites.get("index_playlists"):
channel.get_all_playlists()
to_refresh.extend(channel.all_playlists)
subs = PlaylistSubscription().get_playlists()
for playlist in subs:
playlist_id = playlist["playlist_id"]
if playlist_id not in to_refresh:
to_refresh.append(playlist_id)
return to_refresh
def match_videos(self, to_refresh: list[str]) -> None:
"""scan rest of indexed playlists to match videos"""
data = {
"query": {
"bool": {"must_not": [{"terms": {"playlist_id": to_refresh}}]}
},
"_source": ["playlist_id"],
}
playlists = IndexPaginate("ta_playlist", data).get_results()
total_playlist = len(playlists)
for idx, to_match in enumerate(playlists):
playlist_id = to_match["playlist_id"]
playlist = YoutubePlaylist(playlist_id)
playlist.get_from_es()
playlist.add_vids_to_playlist()
playlist.remove_vids_from_playlist()
if not self.pending.task:
continue
message = [
"Post Processing Playlists.",
f"Validate Playlists: - {idx + 1}/{total_playlist}",
]
progress = (idx + 1) / total_playlist
self.download.task.send_progress(message, progress=progress)
def get_comments(self):
"""get comments from youtube"""
CommentList(self.download.videos, task=self.download.task).index()
class VideoDownloader:
"""
handle the video download functionality
@ -451,3 +305,162 @@ class VideoDownloader:
updated = response.get("updated")
if updated:
print(f"[download] reset auto start on {updated} videos.")
class DownloadPostProcess:
"""handle task to run after download queue finishes"""
def __init__(self, download: VideoDownloader) -> None:
self.download: VideoDownloader = download
self.now = int(datetime.now().timestamp())
self.channel_overwrites: dict | None = None
def run(self):
"""run all functions"""
self.channel_overwrites = self.get_channel_overwrites()
self.auto_delete_all()
self.auto_delete_overwrites()
to_refresh = self.refresh_playlist()
self.match_videos(to_refresh)
self.get_comments()
def get_channel_overwrites(self):
"""get overwrites"""
data = {
"query": {
"bool": {"must": [{"exists": {"field": "channel_overwrites"}}]}
},
"_source": ["channel_id", "channel_overwrites"],
}
result = IndexPaginate("ta_channel", data).get_results()
overwrites = {i["channel_id"]: i["channel_overwrites"] for i in result}
return overwrites
def auto_delete_all(self):
"""handle auto delete"""
autodelete_days = self.download.config["downloads"]["autodelete_days"]
if not autodelete_days:
return
print(f"auto delete older than {autodelete_days} days")
now_lte = str(self.now - autodelete_days * 24 * 60 * 60)
data = {
"query": {"range": {"player.watched_date": {"lte": now_lte}}},
"sort": [{"player.watched_date": {"order": "asc"}}],
}
self._auto_delete_watched(data)
def auto_delete_overwrites(self):
"""handle per channel auto delete from overwrites"""
for channel_id, value in self.channel_overwrites.items():
if "autodelete_days" in value:
autodelete_days = value.get("autodelete_days")
print(f"{channel_id}: delete older than {autodelete_days}d")
now_lte = str(self.now - autodelete_days * 24 * 60 * 60)
must_list = [
{"range": {"player.watched_date": {"lte": now_lte}}},
{"term": {"channel.channel_id": {"value": channel_id}}},
]
data = {
"query": {"bool": {"must": must_list}},
"sort": [{"player.watched_date": {"order": "desc"}}],
}
self._auto_delete_watched(data)
@staticmethod
def _auto_delete_watched(data):
"""delete watched videos after x days"""
to_delete = IndexPaginate("ta_video", data).get_results()
if not to_delete:
return
for video in to_delete:
youtube_id = video["youtube_id"]
print(f"{youtube_id}: auto delete video")
YoutubeVideo(youtube_id).delete_media_file()
print("add deleted to ignore list")
vids = [{"type": "video", "url": i["youtube_id"]} for i in to_delete]
pending = PendingList(youtube_ids=vids)
pending.parse_url_list()
_ = pending.add_to_pending(status="ignore")
def refresh_playlist(self) -> list[str]:
"""match videos with playlists"""
to_refresh = self._get_to_refresh_playlists()
total_playlist = len(to_refresh)
for idx, playlist_id in enumerate(to_refresh):
playlist = YoutubePlaylist(playlist_id)
playlist.update_playlist(skip_on_empty=True)
if self.download.task:
continue
channel_name = playlist.json_data["playlist_channel"]
playlist_title = playlist.json_data["playlist_name"]
message = [
f"Post Processing Playlists for: {channel_name}",
f"Validate: {playlist_title} - {idx + 1}/{total_playlist}",
]
progress = (idx + 1) / total_playlist
self.download.task.send_progress(message, progress=progress)
return to_refresh
def _get_to_refresh_playlists(self) -> list[str]:
"""get playlists to refresh"""
if self.download.task:
message = ["Post Processing Playlists", "Scanning for Playlists"]
self.download.task.send_progress(message)
to_refresh = []
for channel_id in self.download.channels:
channel = YoutubeChannel(channel_id)
channel.get_from_es()
overwrites = channel.get_overwrites()
if overwrites and overwrites.get("index_playlists"):
channel.get_all_playlists()
to_refresh.extend(channel.all_playlists)
subs = PlaylistSubscription().get_playlists()
for playlist in subs:
playlist_id = playlist["playlist_id"]
if playlist_id not in to_refresh:
to_refresh.append(playlist_id)
return to_refresh
def match_videos(self, to_refresh: list[str]) -> None:
"""scan rest of indexed playlists to match videos"""
must_not = [{"terms": {"playlist_id": to_refresh}}]
video_ids = list(self.download.videos)
must = [{"terms": {"playlist_entries.youtube_id": video_ids}}]
data = {
"query": {"bool": {"must_not": must_not, "must": must}},
"_source": ["playlist_id"],
}
playlists = IndexPaginate("ta_playlist", data).get_results()
total_playlist = len(playlists)
for idx, to_match in enumerate(playlists):
playlist_id = to_match["playlist_id"]
playlist = YoutubePlaylist(playlist_id)
playlist.get_from_es()
playlist.add_vids_to_playlist()
playlist.remove_vids_from_playlist()
if not self.download.task:
continue
message = [
"Post Processing Playlists.",
f"Validate Playlists: - {idx + 1}/{total_playlist}",
]
progress = (idx + 1) / total_playlist
self.download.task.send_progress(message, progress=progress)
def get_comments(self):
"""get comments from youtube"""
CommentList(self.download.videos, task=self.download.task).index()