"""
|
|
functionality:
|
|
- periodically refresh documents
|
|
- index and update in es
|
|
"""
|
|
|
|
import json
from datetime import datetime
from math import ceil
from time import sleep

import requests
from home.src.download.queue import PendingList
from home.src.download.subscriptions import ChannelSubscription
from home.src.download.thumbnails import ThumbManager
from home.src.index.channel import YoutubeChannel
from home.src.index.playlist import YoutubePlaylist
from home.src.index.video import YoutubeVideo
from home.src.ta.config import AppConfig
from home.src.ta.helper import get_total_hits


class Reindex:
    """check for outdated documents and refresh data from youtube"""

    def __init__(self):
        # config
        config = AppConfig().config
        self.sleep_interval = config["downloads"]["sleep_interval"]
        self.es_url = config["application"]["es_url"]
        self.es_auth = config["application"]["es_auth"]
        self.refresh_interval = config["scheduler"]["check_reindex_days"]
        self.integrate_ryd = config["downloads"]["integrate_ryd"]
        # scan
        self.all_youtube_ids = False
        self.all_channel_ids = False
        self.all_playlist_ids = False

    def get_daily(self):
        """get daily refresh values"""
        total_videos = get_total_hits(
            "ta_video", self.es_url, self.es_auth, "active"
        )
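        # spread the library across the refresh interval, with 20% headroom
        # to catch up after downtime, e.g. 900 videos over 90 days:
        # ceil(900 / 90 * 1.2) = 12 videos per day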
        video_daily = ceil(total_videos / self.refresh_interval * 1.2)
        total_channels = get_total_hits(
            "ta_channel", self.es_url, self.es_auth, "channel_active"
        )
        channel_daily = ceil(total_channels / self.refresh_interval * 1.2)
        total_playlists = get_total_hits(
            "ta_playlist", self.es_url, self.es_auth, "playlist_active"
        )
        playlist_daily = ceil(total_playlists / self.refresh_interval * 1.2)
        return (video_daily, channel_daily, playlist_daily)

    def get_outdated_vids(self, size):
        """get daily videos to refresh"""
        headers = {"Content-type": "application/json"}
        now = int(datetime.now().timestamp())
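        # match documents whose last refresh is at least refresh_interval
        # days in the past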
        now_lte = now - self.refresh_interval * 24 * 60 * 60
        data = {
            "size": size,
            "query": {
                "bool": {
                    "must": [
                        {"match": {"active": True}},
                        {"range": {"vid_last_refresh": {"lte": now_lte}}},
                    ]
                }
            },
            "sort": [{"vid_last_refresh": {"order": "asc"}}],
            "_source": False,
        }
        query_str = json.dumps(data)
        url = self.es_url + "/ta_video/_search"
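        # elasticsearch accepts a json request body on GET for _search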
        response = requests.get(
            url, data=query_str, headers=headers, auth=self.es_auth
        )
        if not response.ok:
            print(response.text)
        response_dict = json.loads(response.text)
        all_youtube_ids = [i["_id"] for i in response_dict["hits"]["hits"]]
        return all_youtube_ids

    def get_unrated_vids(self):
        """get all videos without rating if ryd integration is enabled"""
        headers = {"Content-type": "application/json"}
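        # pick up at most 200 unrated videos per run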
        data = {
            "size": 200,
            "query": {
                "bool": {
                    "must_not": [{"exists": {"field": "stats.average_rating"}}]
                }
            },
        }
        query_str = json.dumps(data)
        url = self.es_url + "/ta_video/_search"
        response = requests.get(
            url, data=query_str, headers=headers, auth=self.es_auth
        )
        if not response.ok:
            print(response.text)
        response_dict = json.loads(response.text)
        missing_rating = [i["_id"] for i in response_dict["hits"]["hits"]]
        self.all_youtube_ids = self.all_youtube_ids + missing_rating

    def get_outdated_channels(self, size):
        """get daily channels to refresh"""
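        # same query shape as get_outdated_vids, keyed on the channel fields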
        headers = {"Content-type": "application/json"}
        now = int(datetime.now().timestamp())
        now_lte = now - self.refresh_interval * 24 * 60 * 60
        data = {
            "size": size,
            "query": {
                "bool": {
                    "must": [
                        {"match": {"channel_active": True}},
                        {"range": {"channel_last_refresh": {"lte": now_lte}}},
                    ]
                }
            },
            "sort": [{"channel_last_refresh": {"order": "asc"}}],
            "_source": False,
        }
        query_str = json.dumps(data)
        url = self.es_url + "/ta_channel/_search"
        response = requests.get(
            url, data=query_str, headers=headers, auth=self.es_auth
        )
        if not response.ok:
            print(response.text)
        response_dict = json.loads(response.text)
        all_channel_ids = [i["_id"] for i in response_dict["hits"]["hits"]]
        return all_channel_ids

    def get_outdated_playlists(self, size):
        """get daily outdated playlists to refresh"""
        headers = {"Content-type": "application/json"}
        now = int(datetime.now().timestamp())
        now_lte = now - self.refresh_interval * 24 * 60 * 60
        data = {
            "size": size,
            "query": {
                "bool": {
                    "must": [
                        {"match": {"playlist_active": True}},
                        {"range": {"playlist_last_refresh": {"lte": now_lte}}},
                    ]
                }
            },
            "sort": [{"playlist_last_refresh": {"order": "asc"}}],
            "_source": False,
        }
        query_str = json.dumps(data)
        url = self.es_url + "/ta_playlist/_search"
        response = requests.get(
            url, data=query_str, headers=headers, auth=self.es_auth
        )
        if not response.ok:
            print(response.text)
        response_dict = json.loads(response.text)
        all_playlist_ids = [i["_id"] for i in response_dict["hits"]["hits"]]
        return all_playlist_ids

    def check_outdated(self):
        """add missing vids and channels"""
        video_daily, channel_daily, playlist_daily = self.get_daily()
        self.all_youtube_ids = self.get_outdated_vids(video_daily)
        self.all_channel_ids = self.get_outdated_channels(channel_daily)
        self.all_playlist_ids = self.get_outdated_playlists(playlist_daily)
        if self.integrate_ryd:
            self.get_unrated_vids()

    def rescrape_all_channels(self):
        """sync new data from channel to all matching videos"""
        sleep_interval = self.sleep_interval
        channel_sub_handler = ChannelSubscription()
        all_channels = channel_sub_handler.get_channels(subscribed_only=False)
        all_channel_ids = [i["channel_id"] for i in all_channels]

        for channel_id in all_channel_ids:
            channel = YoutubeChannel(channel_id)
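            # preserve the subscription flag, get_from_youtube rebuilds
            # json_data from the remote scrape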
            subscribed = channel.json_data["channel_subscribed"]
            channel.get_from_youtube()
            channel.json_data["channel_subscribed"] = subscribed
            channel.upload_to_es()
            channel.sync_to_videos()

            if sleep_interval:
                sleep(sleep_interval)

    @staticmethod
    def reindex_single_video(youtube_id):
        """refresh data for single video"""
        video = YoutubeVideo(youtube_id)

        # read current state
        video.get_from_es()
        player = video.json_data["player"]
        date_downloaded = video.json_data["date_downloaded"]
        channel_dict = video.json_data["channel"]
        playlist = video.json_data.get("playlist")

        # get new
        video.build_json()
        if not video.youtube_meta:
            video.deactivate()
            return

        video.delete_subtitles()
        # add back
        video.json_data["player"] = player
        video.json_data["date_downloaded"] = date_downloaded
        video.json_data["channel"] = channel_dict
        if playlist:
            video.json_data["playlist"] = playlist

        video.upload_to_es()

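        # replace the cached thumbnail, the artwork may have changed upstream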
        thumb_handler = ThumbManager()
        thumb_handler.delete_vid_thumb(youtube_id)
        to_download = (youtube_id, video.json_data["vid_thumb_url"])
        thumb_handler.download_vid([to_download], notify=False)
        return

    @staticmethod
    def reindex_single_channel(channel_id):
        """refresh channel data and sync to videos"""
        channel = YoutubeChannel(channel_id)
        channel.get_from_es()
        subscribed = channel.json_data["channel_subscribed"]
        channel.get_from_youtube()
        channel.json_data["channel_subscribed"] = subscribed
        channel.upload_to_es()
        channel.sync_to_videos()

    @staticmethod
    def reindex_single_playlist(playlist_id, all_indexed_ids):
        """refresh playlist data"""
        playlist = YoutubePlaylist(playlist_id)
        playlist.get_from_es()
        subscribed = playlist.json_data["playlist_subscribed"]
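        # hand over the indexed ids so build_json can flag which playlist
        # entries are already downloaded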
        playlist.all_youtube_ids = all_indexed_ids
        playlist.build_json(scrape=True)
        if not playlist.json_data:
            playlist.deactivate()
            return

        playlist.json_data["playlist_subscribed"] = subscribed
        playlist.upload_to_es()
        return

    def reindex(self):
        """reindex what's needed"""
        # videos
        print(f"reindexing {len(self.all_youtube_ids)} videos")
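        # sleep_interval throttles the requests to youtube between documents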
        for youtube_id in self.all_youtube_ids:
            self.reindex_single_video(youtube_id)
            if self.sleep_interval:
                sleep(self.sleep_interval)
        # channels
        print(f"reindexing {len(self.all_channel_ids)} channels")
        for channel_id in self.all_channel_ids:
            self.reindex_single_channel(channel_id)
            if self.sleep_interval:
                sleep(self.sleep_interval)
        # playlist
        print(f"reindexing {len(self.all_playlist_ids)} playlists")
        if self.all_playlist_ids:
            all_indexed = PendingList().get_all_indexed()
            all_indexed_ids = [i["youtube_id"] for i in all_indexed]
            for playlist_id in self.all_playlist_ids:
                self.reindex_single_playlist(playlist_id, all_indexed_ids)
                if self.sleep_interval:
                    sleep(self.sleep_interval)
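

# assumed driver sequence, sketched for illustration only (the actual entry
# point lives outside this module):
#   handler = Reindex()
#   handler.check_outdated()
#   handler.reindex()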