refactor and consolidate Reindex class

This commit is contained in:
simon 2022-03-23 15:48:38 +07:00
parent 1f7d6871cf
commit fda520ad44
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
3 changed files with 98 additions and 149 deletions

View File

@ -10,7 +10,6 @@ import os
import re
import shutil
import subprocess
from datetime import datetime
import requests
from home.src.download.queue import PendingList
@ -319,10 +318,7 @@ def scan_filesystem():
def reindex_old_documents():
"""daily refresh of old documents"""
# continue if needed
reindex_handler = Reindex()
reindex_handler.check_outdated()
reindex_handler.reindex()
# set timestamp
now = int(datetime.now().strftime("%s"))
RedisArchivist().set_message("last_reindex", now, expire=False)
handler = Reindex()
handler.check_outdated()
handler.reindex()
RedisArchivist().set_message("last_reindex", handler.now, expire=False)

View File

@ -4,85 +4,60 @@ functionality:
- index and update in es
"""
import json
from datetime import datetime
from math import ceil
from time import sleep
import requests
from home.src.download.queue import PendingList
from home.src.download.thumbnails import ThumbManager
from home.src.es.connect import ElasticWrap
from home.src.index.channel import YoutubeChannel
from home.src.index.playlist import YoutubePlaylist
from home.src.index.video import YoutubeVideo
from home.src.ta.config import AppConfig
from home.src.ta.helper import get_total_hits
class Reindex:
"""check for outdated documents and refresh data from youtube"""
MATCH_FIELD = {
"ta_video": "active",
"ta_channel": "channel_active",
"ta_playlist": "playlist_active",
}
MULTIPLY = 1.2
def __init__(self):
# config
config = AppConfig().config
self.sleep_interval = config["downloads"]["sleep_interval"]
self.es_url = config["application"]["es_url"]
self.es_auth = config["application"]["es_auth"]
self.refresh_interval = config["scheduler"]["check_reindex_days"]
self.integrate_ryd = config["downloads"]["integrate_ryd"]
self.now = int(datetime.now().strftime("%s"))
self.config = AppConfig().config
self.interval = self.config["scheduler"]["check_reindex_days"]
# scan
self.all_youtube_ids = False
self.all_channel_ids = False
self.all_playlist_ids = False
def get_daily(self):
def _get_daily(self):
"""get daily refresh values"""
total_videos = get_total_hits(
"ta_video", self.es_url, self.es_auth, "active"
)
video_daily = ceil(total_videos / self.refresh_interval * 1.2)
total_channels = get_total_hits(
"ta_channel", self.es_url, self.es_auth, "channel_active"
)
channel_daily = ceil(total_channels / self.refresh_interval * 1.2)
total_playlists = get_total_hits(
"ta_playlist", self.es_url, self.es_auth, "playlist_active"
)
playlist_daily = ceil(total_playlists / self.refresh_interval * 1.2)
total_videos = self._get_total_hits("ta_video")
video_daily = ceil(total_videos / self.interval * self.MULTIPLY)
total_channels = self._get_total_hits("ta_channel")
channel_daily = ceil(total_channels / self.interval * self.MULTIPLY)
total_playlists = self._get_total_hits("ta_playlist")
playlist_daily = ceil(total_playlists / self.interval * self.MULTIPLY)
return (video_daily, channel_daily, playlist_daily)
def get_outdated_vids(self, size):
"""get daily videos to refresh"""
headers = {"Content-type": "application/json"}
now = int(datetime.now().strftime("%s"))
now_lte = now - self.refresh_interval * 24 * 60 * 60
data = {
"size": size,
"query": {
"bool": {
"must": [
{"match": {"active": True}},
{"range": {"vid_last_refresh": {"lte": now_lte}}},
]
}
},
"sort": [{"vid_last_refresh": {"order": "asc"}}],
"_source": False,
}
query_str = json.dumps(data)
url = self.es_url + "/ta_video/_search"
response = requests.get(
url, data=query_str, headers=headers, auth=self.es_auth
)
if not response.ok:
print(response.text)
response_dict = json.loads(response.text)
all_youtube_ids = [i["_id"] for i in response_dict["hits"]["hits"]]
return all_youtube_ids
def _get_total_hits(self, index):
"""get total hits from index"""
match_field = self.MATCH_FIELD[index]
path = f"{index}/_search?filter_path=hits.total"
data = {"query": {"match": {match_field: True}}}
response, _ = ElasticWrap(path).post(data=data)
total_hits = response["hits"]["total"]["value"]
return total_hits
def get_unrated_vids(self):
"""get all videos without rating if ryd integration is enabled"""
headers = {"Content-type": "application/json"}
def _get_unrated_vids(self):
"""get max 200 videos without rating if ryd integration is enabled"""
data = {
"size": 200,
"query": {
@ -91,86 +66,78 @@ class Reindex:
}
},
}
query_str = json.dumps(data)
url = self.es_url + "/ta_video/_search"
response = requests.get(
url, data=query_str, headers=headers, auth=self.es_auth
)
if not response.ok:
print(response.text)
response_dict = json.loads(response.text)
missing_rating = [i["_id"] for i in response_dict["hits"]["hits"]]
response, _ = ElasticWrap("ta_video/_search").get(data=data)
missing_rating = [i["_id"] for i in response["hits"]["hits"]]
self.all_youtube_ids = self.all_youtube_ids + missing_rating
def get_outdated_channels(self, size):
"""get daily channels to refresh"""
headers = {"Content-type": "application/json"}
now = int(datetime.now().strftime("%s"))
now_lte = now - self.refresh_interval * 24 * 60 * 60
def _get_outdated_vids(self, size):
"""get daily videos to refresh"""
now_lte = self.now - self.interval * 24 * 60 * 60
must_list = [
{"match": {"active": True}},
{"range": {"vid_last_refresh": {"lte": now_lte}}},
]
data = {
"size": size,
"query": {
"bool": {
"must": [
"query": {"bool": {"must": must_list}},
"sort": [{"vid_last_refresh": {"order": "asc"}}],
"_source": False,
}
response, _ = ElasticWrap("ta_video/_search").get(data=data)
all_youtube_ids = [i["_id"] for i in response["hits"]["hits"]]
return all_youtube_ids
def _get_outdated_channels(self, size):
"""get daily channels to refresh"""
now_lte = self.now - self.interval * 24 * 60 * 60
must_list = [
{"match": {"channel_active": True}},
{"range": {"channel_last_refresh": {"lte": now_lte}}},
]
}
},
data = {
"size": size,
"query": {"bool": {"must": must_list}},
"sort": [{"channel_last_refresh": {"order": "asc"}}],
"_source": False,
}
query_str = json.dumps(data)
url = self.es_url + "/ta_channel/_search"
response = requests.get(
url, data=query_str, headers=headers, auth=self.es_auth
)
if not response.ok:
print(response.text)
response_dict = json.loads(response.text)
all_channel_ids = [i["_id"] for i in response_dict["hits"]["hits"]]
response, _ = ElasticWrap("ta_channel/_search").get(data=data)
all_channel_ids = [i["_id"] for i in response["hits"]["hits"]]
return all_channel_ids
def get_outdated_playlists(self, size):
def _get_outdated_playlists(self, size):
"""get daily outdated playlists to refresh"""
headers = {"Content-type": "application/json"}
now = int(datetime.now().strftime("%s"))
now_lte = now - self.refresh_interval * 24 * 60 * 60
data = {
"size": size,
"query": {
"bool": {
"must": [
now_lte = self.now - self.interval * 24 * 60 * 60
must_list = [
{"match": {"playlist_active": True}},
{"range": {"playlist_last_refresh": {"lte": now_lte}}},
]
}
},
data = {
"size": size,
"query": {"bool": {"must": must_list}},
"sort": [{"playlist_last_refresh": {"order": "asc"}}],
"_source": False,
}
query_str = json.dumps(data)
url = self.es_url + "/ta_playlist/_search"
response = requests.get(
url, data=query_str, headers=headers, auth=self.es_auth
)
if not response.ok:
print(response.text)
response_dict = json.loads(response.text)
all_playlist_ids = [i["_id"] for i in response_dict["hits"]["hits"]]
response, _ = ElasticWrap("ta_playlist/_search").get(data=data)
all_playlist_ids = [i["_id"] for i in response["hits"]["hits"]]
return all_playlist_ids
def check_outdated(self):
"""add missing vids and channels"""
video_daily, channel_daily, playlist_daily = self.get_daily()
self.all_youtube_ids = self.get_outdated_vids(video_daily)
self.all_channel_ids = self.get_outdated_channels(channel_daily)
self.all_playlist_ids = self.get_outdated_playlists(playlist_daily)
if self.integrate_ryd:
self.get_unrated_vids()
video_daily, channel_daily, playlist_daily = self._get_daily()
self.all_youtube_ids = self._get_outdated_vids(video_daily)
self.all_channel_ids = self._get_outdated_channels(channel_daily)
self.all_playlist_ids = self._get_outdated_playlists(playlist_daily)
integrate_ryd = self.config["downloads"]["integrate_ryd"]
if integrate_ryd:
self._get_unrated_vids()
@staticmethod
def reindex_single_video(youtube_id):
def _reindex_single_video(youtube_id):
"""refresh data for single video"""
video = YoutubeVideo(youtube_id)
@ -204,20 +171,21 @@ class Reindex:
return
@staticmethod
def reindex_single_channel(channel_id):
def _reindex_single_channel(channel_id):
"""refresh channel data and sync to videos"""
channel = YoutubeChannel(channel_id)
channel.get_from_es()
subscribed = channel.json_data["channel_subscribed"]
overwrites = channel.json_data["channel_overwrites"]
overwrites = channel.json_data.get("channel_overwrites", False)
channel.get_from_youtube()
channel.json_data["channel_subscribed"] = subscribed
if overwrites:
channel.json_data["channel_overwrites"] = overwrites
channel.upload_to_es()
channel.sync_to_videos()
@staticmethod
def reindex_single_playlist(playlist_id, all_indexed_ids):
def _reindex_single_playlist(playlist_id, all_indexed_ids):
"""refresh playlist data"""
playlist = YoutubePlaylist(playlist_id)
playlist.get_from_es()
@ -234,18 +202,19 @@ class Reindex:
def reindex(self):
"""reindex what's needed"""
sleep_interval = self.config["downloads"]["sleep_interval"]
# videos
print(f"reindexing {len(self.all_youtube_ids)} videos")
for youtube_id in self.all_youtube_ids:
self.reindex_single_video(youtube_id)
if self.sleep_interval:
sleep(self.sleep_interval)
self._reindex_single_video(youtube_id)
if sleep_interval:
sleep(sleep_interval)
# channels
print(f"reindexing {len(self.all_channel_ids)} channels")
for channel_id in self.all_channel_ids:
self.reindex_single_channel(channel_id)
if self.sleep_interval:
sleep(self.sleep_interval)
self._reindex_single_channel(channel_id)
if sleep_interval:
sleep(sleep_interval)
# playlist
print(f"reindexing {len(self.all_playlist_ids)} playlists")
if self.all_playlist_ids:
@ -253,6 +222,6 @@ class Reindex:
handler.get_indexed()
all_indexed_ids = [i["youtube_id"] for i in handler.all_videos]
for playlist_id in self.all_playlist_ids:
self.reindex_single_playlist(playlist_id, all_indexed_ids)
if self.sleep_interval:
sleep(self.sleep_interval)
self._reindex_single_playlist(playlist_id, all_indexed_ids)
if sleep_interval:
sleep(sleep_interval)

View File

@ -3,31 +3,15 @@ Loose collection of helper functions
- don't import AppConfig class here to avoid circular imports
"""
import json
import re
import string
import subprocess
import unicodedata
from urllib.parse import parse_qs, urlparse
import requests
import yt_dlp
def get_total_hits(index, es_url, es_auth, match_field):
"""get total hits from index"""
headers = {"Content-type": "application/json"}
data = {"query": {"match": {match_field: True}}}
payload = json.dumps(data)
url = f"{es_url}/{index}/_search?filter_path=hits.total"
request = requests.post(url, data=payload, headers=headers, auth=es_auth)
if not request.ok:
print(request.text)
total_json = json.loads(request.text)
total_hits = total_json["hits"]["total"]["value"]
return total_hits
def clean_string(file_name):
"""clean string to only asci characters"""
whitelist = "-_.() " + string.ascii_letters + string.digits