squash index.py refactor commits

This commit is contained in:
simon 2022-01-22 17:48:54 +07:00
parent 2f4d4e715b
commit b64a2f8195
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
7 changed files with 464 additions and 806 deletions

View File

@ -15,6 +15,7 @@ from time import sleep
import requests import requests
import yt_dlp import yt_dlp
from home.src.config import AppConfig from home.src.config import AppConfig
from home.src.es import IndexPaginate
from home.src.helper import ( from home.src.helper import (
DurationConverter, DurationConverter,
RedisArchivist, RedisArchivist,
@ -23,7 +24,6 @@ from home.src.helper import (
ignore_filelist, ignore_filelist,
) )
from home.src.index import ( from home.src.index import (
IndexPaginate,
YoutubeChannel, YoutubeChannel,
YoutubePlaylist, YoutubePlaylist,
YoutubeVideo, YoutubeVideo,
@ -69,7 +69,9 @@ class PendingList:
missing_videos = missing_videos + youtube_ids missing_videos = missing_videos + youtube_ids
elif url_type == "playlist": elif url_type == "playlist":
self.missing_from_playlists.append(entry) self.missing_from_playlists.append(entry)
video_results = YoutubePlaylist(url).get_entries() playlist = YoutubePlaylist(url)
playlist.build_json()
video_results = playlist.json_data.get("playlist_entries")
youtube_ids = [i["youtube_id"] for i in video_results] youtube_ids = [i["youtube_id"] for i in video_results]
missing_videos = missing_videos + youtube_ids missing_videos = missing_videos + youtube_ids
@ -346,34 +348,14 @@ class ChannelSubscription:
return missing_videos return missing_videos
def change_subscribe(self, channel_id, channel_subscribed): @staticmethod
def change_subscribe(channel_id, channel_subscribed):
"""subscribe or unsubscribe from channel and update""" """subscribe or unsubscribe from channel and update"""
if not isinstance(channel_subscribed, bool): channel = YoutubeChannel(channel_id)
print("invalid status, should be bool") channel.build_json()
return channel.json_data["channel_subscribed"] = channel_subscribed
headers = {"Content-type": "application/json"} channel.upload_to_es()
channel_handler = YoutubeChannel(channel_id) channel.sync_to_videos()
channel_dict = channel_handler.channel_dict
channel_dict["channel_subscribed"] = channel_subscribed
if channel_subscribed:
# handle subscribe
url = self.es_url + "/ta_channel/_doc/" + channel_id
payload = json.dumps(channel_dict)
print(channel_dict)
else:
url = self.es_url + "/ta_channel/_update/" + channel_id
payload = json.dumps({"doc": channel_dict})
# update channel
request = requests.post(
url, data=payload, headers=headers, auth=self.es_auth
)
if not request.ok:
print(request.text)
raise ValueError("failed change subscribe status")
# sync to videos
channel_handler.sync_to_videos()
if channel_handler.source == "scraped":
channel_handler.get_channel_art()
class PlaylistSubscription: class PlaylistSubscription:
@ -413,20 +395,15 @@ class PlaylistSubscription:
print(f"{playlist_id} not a playlist, skipping...") print(f"{playlist_id} not a playlist, skipping...")
continue continue
playlist_h = YoutubePlaylist( playlist_h = YoutubePlaylist(playlist_id)
playlist_id, all_youtube_ids=all_youtube_ids playlist_h.all_youtube_ids = all_youtube_ids
) playlist_h.build_json()
if not playlist_h.get_es_playlist(): playlist_h.json_data["playlist_subscribed"] = subscribed
playlist_h.get_playlist_dict() playlist_h.upload_to_es()
playlist_h.playlist_dict["playlist_subscribed"] = subscribed playlist_h.add_vids_to_playlist()
playlist_h.upload_to_es() self.channel_validate(playlist_h.json_data["playlist_channel_id"])
playlist_h.add_vids_to_playlist() thumb = playlist_h.json_data["playlist_thumbnail"]
thumb = playlist_h.playlist_dict["playlist_thumbnail"] new_thumbs.append((playlist_id, thumb))
new_thumbs.append((playlist_id, thumb))
self.channel_validate(playlist_h)
else:
self.change_subscribe(playlist_id, subscribe_status=True)
# notify # notify
message = { message = {
"status": "message:subplaylist", "status": "message:subplaylist",
@ -441,41 +418,18 @@ class PlaylistSubscription:
return new_thumbs return new_thumbs
@staticmethod @staticmethod
def channel_validate(playlist_handler): def channel_validate(channel_id):
"""make sure channel of playlist is there""" """make sure channel of playlist is there"""
channel_id = playlist_handler.playlist_dict["playlist_channel_id"] channel = YoutubeChannel(channel_id)
channel_handler = YoutubeChannel(channel_id) channel.build_json()
if channel_handler.source == "scraped":
channel_handler.channel_dict["channel_subscribed"] = False
channel_handler.upload_to_es()
channel_handler.get_channel_art()
def change_subscribe(self, playlist_id, subscribe_status): @staticmethod
def change_subscribe(playlist_id, subscribe_status):
"""change the subscribe status of a playlist""" """change the subscribe status of a playlist"""
es_url = self.config["application"]["es_url"] playlist = YoutubePlaylist(playlist_id)
es_auth = self.config["application"]["es_auth"] playlist.build_json()
playlist_handler = YoutubePlaylist(playlist_id) playlist.json_data["playlist_subscribed"] = subscribe_status
playlist_handler.get_playlist_dict() playlist.upload_to_es()
subed_now = playlist_handler.playlist_dict["playlist_subscribed"]
if subed_now == subscribe_status:
# status already as expected, do nothing
return False
# update subscribed status
headers = {"Content-type": "application/json"}
url = f"{es_url}/ta_playlist/_update/{playlist_id}"
payload = json.dumps(
{"doc": {"playlist_subscribed": subscribe_status}}
)
response = requests.post(
url, data=payload, headers=headers, auth=es_auth
)
if not response.ok:
print(response.text)
raise ValueError("failed to change subscribe status")
return True
@staticmethod @staticmethod
def get_to_ignore(): def get_to_ignore():
@ -493,26 +447,25 @@ class PlaylistSubscription:
to_ignore = self.get_to_ignore() to_ignore = self.get_to_ignore()
missing_videos = [] missing_videos = []
counter = 1 for idx, playlist_id in enumerate(all_playlists):
for playlist_id in all_playlists:
size_limit = self.config["subscriptions"]["channel_size"] size_limit = self.config["subscriptions"]["channel_size"]
playlist_handler = YoutubePlaylist(playlist_id) playlist = YoutubePlaylist(playlist_id)
playlist = playlist_handler.update_playlist() playlist.update_playlist()
if not playlist: if not playlist:
playlist_handler.deactivate() playlist.deactivate()
continue continue
playlist_entries = playlist.json_data["playlist_entries"]
if size_limit: if size_limit:
playlist_entries = playlist["playlist_entries"][:size_limit] del playlist_entries[size_limit:]
else:
playlist_entries = playlist["playlist_entries"]
all_missing = [i for i in playlist_entries if not i["downloaded"]] all_missing = [i for i in playlist_entries if not i["downloaded"]]
message = { message = {
"status": "message:rescan", "status": "message:rescan",
"level": "info", "level": "info",
"title": "Scanning playlists: Looking for new videos.", "title": "Scanning playlists: Looking for new videos.",
"message": f"Progress: {counter}/{len(all_playlists)}", "message": f"Progress: {idx + 1}/{len(all_playlists)}",
} }
RedisArchivist().set_message("message:rescan", message=message) RedisArchivist().set_message("message:rescan", message=message)
@ -520,7 +473,6 @@ class PlaylistSubscription:
youtube_id = video["youtube_id"] youtube_id = video["youtube_id"]
if youtube_id not in to_ignore: if youtube_id not in to_ignore:
missing_videos.append(youtube_id) missing_videos.append(youtube_id)
counter = counter + 1
return missing_videos return missing_videos
@ -751,15 +703,15 @@ class VideoDownloader:
playlists = YoutubeChannel(channel_id).get_indexed_playlists() playlists = YoutubeChannel(channel_id).get_indexed_playlists()
all_playlist_ids = [i["playlist_id"] for i in playlists] all_playlist_ids = [i["playlist_id"] for i in playlists]
for id_p, playlist_id in enumerate(all_playlist_ids): for id_p, playlist_id in enumerate(all_playlist_ids):
playlist_handler = YoutubePlaylist( playlist = YoutubePlaylist(playlist_id)
playlist_id, all_youtube_ids=all_youtube_ids playlist.all_youtube_ids = all_youtube_ids
) playlist.build_json(scrape=True)
playlist_dict = playlist_handler.update_playlist() if not playlist.json_data:
if not playlist_dict: playlist.deactivate()
playlist_handler.deactivate()
continue playlist.add_vids_to_playlist()
playlist.upload_to_es()
playlist_handler.add_vids_to_playlist()
# notify # notify
title = ( title = (
"Processing playlists for channels: " "Processing playlists for channels: "

View File

@ -29,7 +29,6 @@ class ElasticWrap:
def get(self, data=False): def get(self, data=False):
"""get data from es""" """get data from es"""
if data: if data:
response = requests.get(self.url, json=data, auth=self.auth) response = requests.get(self.url, json=data, auth=self.auth)
else: else:
@ -39,7 +38,7 @@ class ElasticWrap:
return response.json(), response.status_code return response.json(), response.status_code
def post(self, data, ndjson=False): def post(self, data=False, ndjson=False):
"""post data to es""" """post data to es"""
if ndjson: if ndjson:
headers = {"Content-type": "application/x-ndjson"} headers = {"Content-type": "application/x-ndjson"}
@ -48,20 +47,23 @@ class ElasticWrap:
headers = {"Content-type": "application/json"} headers = {"Content-type": "application/json"}
payload = json.dumps(data) payload = json.dumps(data)
response = requests.post( if data:
self.url, data=payload, header=headers, auth=self.auth response = requests.post(
) self.url, data=payload, headers=headers, auth=self.auth
)
else:
response = requests.post(self.url, headers=headers, auth=self.auth)
if not response.ok: if not response.ok:
print(response.text) print(response.text)
return response.json(), response.status_code return response.json(), response.status_code
def put(self, data): def put(self, data, refresh=False):
"""put data to es""" """put data to es"""
response = requests.put( if refresh:
f"{self.url}/?refresh=true", json=data, auth=self.auth self.url = f"{self.url}/?refresh=true"
) response = requests.put(f"{self.url}", json=data, auth=self.auth)
if not response.ok: if not response.ok:
print(response.text) print(response.text)
print(data) print(data)
@ -69,10 +71,74 @@ class ElasticWrap:
return response.json(), response.status_code return response.json(), response.status_code
def delete(self): def delete(self, data=False):
"""delete document from es""" """delete document from es"""
response = requests.delete(self.url, auth=self.auth) if data:
response = requests.delete(self.url, json=data, auth=self.auth)
else:
response = requests.delete(self.url, auth=self.auth)
if not response.ok: if not response.ok:
print(response.text) print(response.text)
return response.json(), response.status_code return response.json(), response.status_code
class IndexPaginate:
"""use search_after to go through whole index"""
DEFAULT_SIZE = 500
def __init__(self, index_name, data, size=False):
self.index_name = index_name
self.data = data
self.pit_id = False
self.size = size
def get_results(self):
"""get all results"""
self.get_pit()
self.validate_data()
all_results = self.run_loop()
self.clean_pit()
return all_results
def get_pit(self):
"""get pit for index"""
path = f"{self.index_name}/_pit?keep_alive=10m"
response, _ = ElasticWrap(path).post()
self.pit_id = response["id"]
def validate_data(self):
"""add pit and size to data"""
if "sort" not in self.data.keys():
print(self.data)
raise ValueError("missing sort key in data")
size = self.size or self.DEFAULT_SIZE
self.data["size"] = size
self.data["pit"] = {"id": self.pit_id, "keep_alive": "10m"}
def run_loop(self):
"""loop through results until last hit"""
all_results = []
while True:
response, _ = ElasticWrap("_search").get(data=self.data)
all_hits = response["hits"]["hits"]
if all_hits:
for hit in all_hits:
source = hit["_source"]
search_after = hit["sort"]
all_results.append(source)
# update search_after with last hit data
self.data["search_after"] = search_after
else:
break
return all_results
def clean_pit(self):
"""delete pit from elastic search"""
data = {"id": self.pit_id}
ElasticWrap("_pit").delete(data=data)

View File

@ -306,7 +306,7 @@ class PostData:
playlist_dict = self.exec_val playlist_dict = self.exec_val
playlist_id = playlist_dict["playlist-id"] playlist_id = playlist_dict["playlist-id"]
playlist_action = playlist_dict["playlist-action"] playlist_action = playlist_dict["playlist-action"]
print(f"delete {playlist_action} from playlist {playlist_id}") print(f"{playlist_id}: delete playlist {playlist_action}")
if playlist_action == "metadata": if playlist_action == "metadata":
YoutubePlaylist(playlist_id).delete_metadata() YoutubePlaylist(playlist_id).delete_metadata()
elif playlist_action == "all": elif playlist_action == "all":

File diff suppressed because it is too large Load Diff

View File

@ -189,87 +189,73 @@ class Reindex:
all_channels = channel_sub_handler.get_channels(subscribed_only=False) all_channels = channel_sub_handler.get_channels(subscribed_only=False)
all_channel_ids = [i["channel_id"] for i in all_channels] all_channel_ids = [i["channel_id"] for i in all_channels]
counter = 1
for channel_id in all_channel_ids: for channel_id in all_channel_ids:
channel_index = YoutubeChannel(channel_id) channel = YoutubeChannel(channel_id)
subscribed = channel_index.channel_dict["channel_subscribed"] subscribed = channel.json_data["channel_subscribed"]
channel_index.channel_dict = channel_index.build_channel_dict( channel.get_from_youtube()
scrape=True channel.json_data["channel_subscribed"] = subscribed
) channel.upload_to_es()
channel_index.channel_dict["channel_subscribed"] = subscribed channel.sync_to_videos()
channel_index.upload_to_es()
channel_index.sync_to_videos()
counter = counter + 1
if sleep_interval: if sleep_interval:
sleep(sleep_interval) sleep(sleep_interval)
@staticmethod @staticmethod
def reindex_single_video(youtube_id): def reindex_single_video(youtube_id):
"""refresh data for single video""" """refresh data for single video"""
vid_handler = YoutubeVideo(youtube_id) video = YoutubeVideo(youtube_id)
vid_handler.get_vid_dict()
if not vid_handler.vid_dict:
# stop if deactivated
vid_handler.deactivate()
return
es_vid_dict = vid_handler.get_es_data() # read current state
player = es_vid_dict["_source"]["player"] video.get_from_es()
date_downloaded = es_vid_dict["_source"]["date_downloaded"] player = video.json_data["player"]
channel_dict = es_vid_dict["_source"]["channel"] date_downloaded = video.json_data["date_downloaded"]
channel_name = channel_dict["channel_name"] channel_dict = video.json_data["channel"]
try: playlist = video.json_data.get("playlist")
playlist = es_vid_dict["_source"]["playlist"]
except KeyError:
playlist = False
vid_handler.build_file_path(channel_name) # get new
# add to vid_dict video.build_json()
vid_handler.vid_dict["player"] = player if not video.json_data:
vid_handler.vid_dict["date_downloaded"] = date_downloaded video.deactivate()
vid_handler.vid_dict["channel"] = channel_dict
# add back
video.json_data["player"] = player
video.json_data["date_downloaded"] = date_downloaded
video.json_data["channel"] = channel_dict
if playlist: if playlist:
vid_handler.vid_dict["playlist"] = playlist video.json_data["playlist"] = playlist
# update
vid_handler.upload_to_es() video.upload_to_es()
thumb_handler = ThumbManager() thumb_handler = ThumbManager()
thumb_handler.delete_vid_thumb(youtube_id) thumb_handler.delete_vid_thumb(youtube_id)
to_download = (youtube_id, vid_handler.vid_dict["vid_thumb_url"]) to_download = (youtube_id, video.json_data["vid_thumb_url"])
thumb_handler.download_vid([to_download], notify=False) thumb_handler.download_vid([to_download], notify=False)
@staticmethod @staticmethod
def reindex_single_channel(channel_id): def reindex_single_channel(channel_id):
"""refresh channel data and sync to videos""" """refresh channel data and sync to videos"""
channel_handler = YoutubeChannel(channel_id) channel = YoutubeChannel(channel_id)
subscribed = channel_handler.channel_dict["channel_subscribed"] channel.get_from_es()
channel_handler.channel_dict = channel_handler.build_channel_dict( subscribed = channel.json_data["channel_subscribed"]
scrape=True channel.get_from_youtube()
) channel.json_data["channel_subscribed"] = subscribed
channel_handler.channel_dict["channel_subscribed"] = subscribed channel.upload_to_es()
# update channel.sync_to_videos()
channel_handler.upload_to_es()
channel_handler.sync_to_videos()
thumb_handler = ThumbManager()
thumb_handler.delete_chan_thumb(channel_id)
channel_thumb = channel_handler.channel_dict["channel_thumb_url"]
channel_banner = channel_handler.channel_dict["channel_banner_url"]
to_download = (channel_id, channel_thumb, channel_banner)
thumb_handler.download_chan([to_download])
@staticmethod @staticmethod
def reindex_single_playlist(playlist_id, all_indexed_ids): def reindex_single_playlist(playlist_id, all_indexed_ids):
"""refresh playlist data""" """refresh playlist data"""
playlist_handler = YoutubePlaylist( playlist = YoutubePlaylist(playlist_id)
playlist_id, all_youtube_ids=all_indexed_ids playlist.get_from_es()
) subscribed = playlist.json_data["playlist_subscribed"]
playlist = playlist_handler.update_playlist() playlist.all_youtube_ids = all_indexed_ids
if not playlist: playlist.build_json(scrape=True)
playlist_handler.deactivate() if not playlist.json_data:
playlist.deactivate()
return return
playlist_thumbnail = (playlist_id, playlist["playlist_thumbnail"]) playlist.json_data["playlist_subscribed"] = subscribed
thumb_handler = ThumbManager() playlist.upload_to_es()
thumb_handler.download_playlist([playlist_thumbnail])
return return
def reindex(self): def reindex(self):
@ -586,7 +572,7 @@ def scan_filesystem():
print("index new videos") print("index new videos")
for missing_vid in filesystem_handler.to_index: for missing_vid in filesystem_handler.to_index:
youtube_id = missing_vid[2] youtube_id = missing_vid[2]
index_new_video(youtube_id, missing_vid=missing_vid) index_new_video(youtube_id)
def reindex_old_documents(): def reindex_old_documents():

View File

@ -266,17 +266,16 @@ def subscribe_to(url_str):
@shared_task @shared_task
def index_channel_playlists(channel_id): def index_channel_playlists(channel_id):
"""add all playlists of channel to index""" """add all playlists of channel to index"""
channel_handler = YoutubeChannel(channel_id) channel = YoutubeChannel(channel_id)
channel_name = channel_handler.channel_dict["channel_name"]
# notify # notify
mess_dict = { mess_dict = {
"status": "message:playlistscan", "status": "message:playlistscan",
"level": "info", "level": "info",
"title": "Looking for playlists", "title": "Looking for playlists",
"message": f'Scanning channel "{channel_name}" in progress', "message": f'Scanning channel "{channel.youtube_id}" in progress',
} }
RedisArchivist().set_message("message:playlistscan", mess_dict) RedisArchivist().set_message("message:playlistscan", mess_dict)
all_playlists = channel_handler.get_all_playlists() all_playlists = channel.get_all_playlists()
if not all_playlists: if not all_playlists:
print(f"no playlists found for channel {channel_id}") print(f"no playlists found for channel {channel_id}")
@ -295,28 +294,29 @@ def index_channel_playlists(channel_id):
} }
RedisArchivist().set_message("message:playlistscan", mess_dict) RedisArchivist().set_message("message:playlistscan", mess_dict)
print("add playlist: " + playlist_title) print("add playlist: " + playlist_title)
playlist_handler = YoutubePlaylist(
playlist_id, all_youtube_ids=all_youtube_ids playlist = YoutubePlaylist(playlist_id)
) playlist.all_youtube_ids = all_youtube_ids
playlist_handler.get_playlist_dict() playlist.build_json()
if not playlist_handler.playlist_dict:
if not playlist.json_data:
# skip if not available # skip if not available
continue continue
# don't add if no videos downloaded # don't add if no videos downloaded
downloaded = [ downloaded = [
i i
for i in playlist_handler.playlist_dict["playlist_entries"] for i in playlist.json_data["playlist_entries"]
if i["downloaded"] if i["downloaded"]
] ]
if not downloaded: if not downloaded:
continue continue
playlist_handler.upload_to_es()
playlist_handler.add_vids_to_playlist() playlist.upload_to_es()
playlist.add_vids_to_playlist()
if all_playlists: if all_playlists:
handler = ThumbManager() playlist.get_playlist_art()
missing_playlists = handler.get_missing_playlists()
handler.download_playlist(missing_playlists)
return return

View File

@ -624,11 +624,11 @@ class VideoView(View):
"""build playlist nav if available""" """build playlist nav if available"""
all_navs = [] all_navs = []
for playlist_id in playlists: for playlist_id in playlists:
handler = YoutubePlaylist(playlist_id) playlist = YoutubePlaylist(playlist_id)
handler.get_playlist_dict() playlist.get_from_es()
nav = handler.build_nav(video_id) playlist.build_nav(video_id)
if nav: if playlist.nav:
all_navs.append(nav) all_navs.append(playlist.nav)
return all_navs return all_navs