tubearchivist/tubearchivist/home/src/reindex.py

501 lines
18 KiB
Python
Raw Normal View History

2021-09-05 17:10:14 +00:00
"""
Functionality:
- reindexing old documents
- syncing updated values between indexes
- scan the filesystem to delete or index
"""
import json
import os
2021-09-13 15:17:36 +00:00
import re
import shutil
2021-09-18 13:02:54 +00:00
import subprocess
2021-09-05 17:10:14 +00:00
from datetime import datetime
from math import ceil
2021-09-18 13:02:54 +00:00
from time import sleep
2021-09-05 17:10:14 +00:00
import requests
from home.src.config import AppConfig
2021-09-18 13:02:54 +00:00
from home.src.download import ChannelSubscription, PendingList, VideoDownloader
2021-09-21 09:25:22 +00:00
from home.src.helper import (
RedisArchivist,
2021-09-21 09:25:22 +00:00
clean_string,
get_total_hits,
ignore_filelist,
2021-09-21 09:25:22 +00:00
)
2021-09-18 13:02:54 +00:00
from home.src.index import YoutubeChannel, YoutubeVideo, index_new_video
from home.src.thumbnails import ThumbManager
2021-09-05 17:10:14 +00:00
class Reindex:
    """check for outdated documents and refresh data from youtube"""

    def __init__(self):
        # config
        config = AppConfig().config
        self.sleep_interval = config["downloads"]["sleep_interval"]
        self.es_url = config["application"]["es_url"]
        # full refresh cycle in days: every doc gets revisited in this window
        self.refresh_interval = 90
        # scan
        self.video_daily, self.channel_daily = self.get_daily()
        self.all_youtube_ids = False
        self.all_channel_ids = False

    def get_daily(self):
        """get daily refresh values, padded 20% to allow catching up"""
        total_videos = get_total_hits("ta_video", self.es_url, "active")
        video_daily = ceil(total_videos / self.refresh_interval * 1.2)
        total_channels = get_total_hits(
            "ta_channel", self.es_url, "channel_active"
        )
        channel_daily = ceil(total_channels / self.refresh_interval * 1.2)
        return (video_daily, channel_daily)

    def _get_outdated_ids(self, index_name, active_key, refresh_key, size):
        """query es for up to size outdated doc ids in index_name,
        sorted oldest refresh first"""
        headers = {"Content-type": "application/json"}
        # int(timestamp()) instead of strftime("%s"): "%s" is a
        # non-portable glibc extension, not a documented directive
        now = int(datetime.now().timestamp())
        # anything not refreshed within roughly three months is outdated
        now_3m = now - 3 * 30 * 24 * 60 * 60
        data = {
            "size": size,
            "query": {
                "bool": {
                    "must": [
                        {"match": {active_key: True}},
                        {"range": {refresh_key: {"lte": now_3m}}},
                    ]
                }
            },
            "sort": [{refresh_key: {"order": "asc"}}],
            "_source": False,
        }
        query_str = json.dumps(data)
        url = f"{self.es_url}/{index_name}/_search"
        response = requests.get(url, data=query_str, headers=headers)
        if not response.ok:
            print(response.text)
        response_dict = json.loads(response.text)
        return [i["_id"] for i in response_dict["hits"]["hits"]]

    def get_outdated_vids(self):
        """get daily videos to refresh"""
        return self._get_outdated_ids(
            "ta_video", "active", "vid_last_refresh", self.video_daily
        )

    def get_outdated_channels(self):
        """get daily channels to refresh"""
        return self._get_outdated_ids(
            "ta_channel",
            "channel_active",
            "channel_last_refresh",
            self.channel_daily,
        )

    def check_outdated(self):
        """add missing vids and channels"""
        self.all_youtube_ids = self.get_outdated_vids()
        self.all_channel_ids = self.get_outdated_channels()

    def rescrape_all_channels(self):
        """sync new data from channel to all matching videos"""
        sleep_interval = self.sleep_interval
        channel_sub_handler = ChannelSubscription()
        all_channels = channel_sub_handler.get_channels(subscribed_only=False)
        all_channel_ids = [i["channel_id"] for i in all_channels]
        for counter, channel_id in enumerate(all_channel_ids, start=1):
            message = f"Progress: {counter}/{len(all_channels)}"
            mess_dict = {
                "status": "scraping",
                "level": "info",
                "title": "Scraping all youtube channels",
                "message": message,
            }
            RedisArchivist().set_message("progress:download", mess_dict)
            channel_index = YoutubeChannel(channel_id)
            # preserve the subscribed flag across the rescrape
            subscribed = channel_index.channel_dict["channel_subscribed"]
            channel_index.channel_dict = channel_index.build_channel_dict(
                scrape=True
            )
            channel_index.channel_dict["channel_subscribed"] = subscribed
            channel_index.upload_to_es()
            channel_index.sync_to_videos()
            if sleep_interval:
                sleep(sleep_interval)

    @staticmethod
    def reindex_single_video(youtube_id):
        """refresh data for single video"""
        vid_handler = YoutubeVideo(youtube_id)
        vid_handler.get_vid_dict()
        if not vid_handler.vid_dict:
            # stop if deactivated
            vid_handler.deactivate()
            return
        # carry over values the scrape does not rebuild
        es_vid_dict = vid_handler.get_es_data()
        player = es_vid_dict["_source"]["player"]
        date_downloaded = es_vid_dict["_source"]["date_downloaded"]
        channel_dict = es_vid_dict["_source"]["channel"]
        channel_name = channel_dict["channel_name"]
        vid_handler.build_file_path(channel_name)
        # add to vid_dict
        vid_handler.vid_dict["player"] = player
        vid_handler.vid_dict["date_downloaded"] = date_downloaded
        vid_handler.vid_dict["channel"] = channel_dict
        # update
        vid_handler.upload_to_es()
        # replace the cached thumbnail with the freshly scraped url
        thumb_handler = ThumbManager()
        thumb_handler.delete_vid_thumb(youtube_id)
        to_download = (youtube_id, vid_handler.vid_dict["vid_thumb_url"])
        thumb_handler.download_vid([to_download], notify=False)

    @staticmethod
    def reindex_single_channel(channel_id):
        """refresh channel data and sync to videos"""
        channel_handler = YoutubeChannel(channel_id)
        # preserve the subscribed flag across the rescrape
        subscribed = channel_handler.channel_dict["channel_subscribed"]
        channel_handler.channel_dict = channel_handler.build_channel_dict(
            scrape=True
        )
        channel_handler.channel_dict["channel_subscribed"] = subscribed
        # update
        channel_handler.upload_to_es()
        channel_handler.sync_to_videos()
        # replace cached thumb and banner with the freshly scraped urls
        thumb_handler = ThumbManager()
        thumb_handler.delete_chan_thumb(channel_id)
        channel_thumb = channel_handler.channel_dict["channel_thumb_url"]
        channel_banner = channel_handler.channel_dict["channel_banner_url"]
        to_download = (channel_id, channel_thumb, channel_banner)
        thumb_handler.download_chan([to_download])

    def reindex(self):
        """reindex what's needed"""
        # videos
        print(f"reindexing {len(self.all_youtube_ids)} videos")
        for youtube_id in self.all_youtube_ids:
            self.reindex_single_video(youtube_id)
            if self.sleep_interval:
                sleep(self.sleep_interval)
        # channels
        print(f"reindexing {len(self.all_channel_ids)} channels")
        for channel_id in self.all_channel_ids:
            self.reindex_single_channel(channel_id)
            if self.sleep_interval:
                sleep(self.sleep_interval)
class FilesystemScanner:
    """handle scanning and fixing from filesystem"""

    CONFIG = AppConfig().config
    ES_URL = CONFIG["application"]["es_url"]
    VIDEOS = CONFIG["application"]["videos"]

    def __init__(self):
        self.all_downloaded = self.get_all_downloaded()
        self.all_indexed = self.get_all_indexed()
        # each populated by list_comarison()
        self.mismatch = None
        self.to_rename = None
        self.to_index = None
        self.to_delete = None

    def get_all_downloaded(self):
        """get a list of all video files downloaded"""
        channels = os.listdir(self.VIDEOS)
        all_channels = ignore_filelist(channels)
        all_channels.sort()
        all_downloaded = []
        for channel_name in all_channels:
            channel_path = os.path.join(self.VIDEOS, channel_name)
            videos = os.listdir(channel_path)
            all_videos = ignore_filelist(videos)
            for video in all_videos:
                # filename convention <date:8>_<id:11>_<title>.mp4 puts
                # the youtube id at characters 9-19
                youtube_id = video[9:20]
                all_downloaded.append((channel_name, video, youtube_id))
        return all_downloaded

    @staticmethod
    def get_all_indexed():
        """get a list of all indexed videos"""
        index_handler = PendingList()
        all_indexed_raw = index_handler.get_all_indexed()
        all_indexed = []
        for video in all_indexed_raw:
            youtube_id = video["_id"]
            media_url = video["_source"]["media_url"]
            published = video["_source"]["published"]
            title = video["_source"]["title"]
            all_indexed.append((youtube_id, media_url, published, title))
        return all_indexed

    def list_comarison(self):
        """compare the lists to figure out what to do"""
        # NOTE: misspelled name kept for existing callers
        self.find_unindexed()
        self.find_missing()
        self.find_bad_media_url()

    # correctly spelled alias for new callers
    list_comparison = list_comarison

    def find_unindexed(self):
        """find video files without a matching document indexed"""
        all_indexed_ids = {i[0] for i in self.all_indexed}
        self.to_index = [
            i for i in self.all_downloaded if i[2] not in all_indexed_ids
        ]

    def find_missing(self):
        """find indexed videos without matching media file"""
        all_downloaded_ids = {i[2] for i in self.all_downloaded}
        self.to_delete = [
            i for i in self.all_indexed if i[0] not in all_downloaded_ids
        ]

    def find_bad_media_url(self):
        """rename media files not matching the indexed title"""
        # map docs by youtube id (es _id is unique): O(1) lookup instead
        # of rescanning all_indexed for every downloaded file
        indexed_by_id = {i[0]: i for i in self.all_indexed}
        to_fix = []
        to_rename = []
        for channel, filename, downloaded_id in self.all_downloaded:
            indexed = indexed_by_id.get(downloaded_id)
            if not indexed:
                continue
            _, media_url, published, title = indexed
            title_c = clean_string(title)
            pub = published.replace("-", "")
            expected_filename = f"{pub}_{downloaded_id}_{title_c}.mp4"
            new_url = os.path.join(channel, expected_filename)
            if expected_filename != filename:
                # file to rename
                to_rename.append((channel, filename, expected_filename))
            if media_url != new_url:
                # media_url to update in es
                to_fix.append((downloaded_id, new_url))

        self.mismatch = to_fix
        self.to_rename = to_rename

    def rename_files(self):
        """rename media files as identified by find_bad_media_url"""
        for channel, filename, expected_filename in self.to_rename:
            # log old name next to the new one
            print(f"renaming [{filename}] to [{expected_filename}]")
            old_path = os.path.join(self.VIDEOS, channel, filename)
            new_path = os.path.join(self.VIDEOS, channel, expected_filename)
            os.rename(old_path, new_path)

    def send_mismatch_bulk(self):
        """build bulk update"""
        bulk_list = []
        for youtube_id, media_url in self.mismatch:
            print(f"{youtube_id}: fixing media url {media_url}")
            action = {"update": {"_id": youtube_id, "_index": "ta_video"}}
            source = {"doc": {"media_url": media_url}}
            bulk_list.append(json.dumps(action))
            bulk_list.append(json.dumps(source))
        # ndjson bodies must end with a newline
        bulk_list.append("\n")
        query_str = "\n".join(bulk_list)
        # make the call
        headers = {"Content-type": "application/x-ndjson"}
        url = self.ES_URL + "/_bulk"
        request = requests.post(url, data=query_str, headers=headers)
        if not request.ok:
            print(request.text)

    def delete_from_index(self):
        """find indexed but deleted mediafile"""
        for indexed in self.to_delete:
            youtube_id = indexed[0]
            print(f"deleting {youtube_id} from index")
            url = self.ES_URL + "/ta_video/_doc/" + youtube_id
            request = requests.delete(url)
            if not request.ok:
                print(request.text)
2021-09-13 15:17:36 +00:00
class ManualImport:
    """import and indexing existing video files"""

    CONFIG = AppConfig().config
    CACHE_DIR = CONFIG["application"]["cache_dir"]
    IMPORT_DIR = os.path.join(CACHE_DIR, "import")

    def __init__(self):
        self.identified = self.import_folder_parser()

    def import_folder_parser(self):
        """detect files in import folder"""
        import_files = os.listdir(self.IMPORT_DIR)
        to_import = ignore_filelist(import_files)
        to_import.sort()
        video_files = [i for i in to_import if not i.endswith(".json")]
        identified = []
        for file_path in video_files:
            file_dict = {"video_file": file_path}
            file_name, _ = os.path.splitext(file_path)
            # look for a sidecar json, e.g. <file_name>.info.json
            # NOTE(review): startswith can also match an unrelated file
            # whose name merely extends file_name - confirm folder layout
            matching_json = [
                i
                for i in to_import
                if i.startswith(file_name) and i.endswith(".json")
            ]
            if matching_json:
                json_file = matching_json[0]
                youtube_id = self.extract_id_from_json(json_file)
                file_dict.update({"json_file": json_file})
            else:
                youtube_id = self.extract_id_from_filename(file_name)
                file_dict.update({"json_file": False})

            file_dict.update({"youtube_id": youtube_id})
            identified.append(file_dict)
        return identified

    @staticmethod
    def extract_id_from_filename(file_name):
        """
        look at the file name for the youtube id
        expects filename ending in [<youtube_id>].<ext>
        """
        id_search = re.search(r"\[([a-zA-Z0-9_-]{11})\]$", file_name)
        if id_search:
            youtube_id = id_search.group(1)
            return youtube_id

        print("failed to extract youtube id for: " + file_name)
        # specific exception with a message instead of bare Exception;
        # ValueError is still caught by callers expecting Exception
        raise ValueError("failed to extract youtube id for: " + file_name)

    def extract_id_from_json(self, json_file):
        """open json file and extract id"""
        json_path = os.path.join(self.IMPORT_DIR, json_file)
        with open(json_path, "r", encoding="utf-8") as f:
            youtube_id = json.load(f)["id"]

        return youtube_id

    def process_import(self):
        """go through identified media files"""
        for media_file in self.identified:
            json_file = media_file["json_file"]
            video_file = media_file["video_file"]
            youtube_id = media_file["youtube_id"]

            video_path = os.path.join(self.IMPORT_DIR, video_file)
            self.move_to_cache(video_path, youtube_id)
            # identify and archive
            vid_dict = index_new_video(youtube_id)
            VideoDownloader([youtube_id]).move_to_archive(vid_dict)
            # cleanup
            if os.path.exists(video_path):
                os.remove(video_path)
            if json_file:
                json_path = os.path.join(self.IMPORT_DIR, json_file)
                os.remove(json_path)

    def move_to_cache(self, video_path, youtube_id):
        """move identified video file to cache, convert to mp4"""
        file_name = os.path.split(video_path)[-1]
        video_file, ext = os.path.splitext(file_name)
        # make sure youtube_id is in filename
        if youtube_id not in video_file:
            video_file = f"{video_file}_{youtube_id}"
        # move, convert if needed
        if ext == ".mp4":
            new_file = video_file + ext
            dest_path = os.path.join(self.CACHE_DIR, "download", new_file)
            shutil.move(video_path, dest_path)
        else:
            print(f"processing with ffmpeg: {video_file}")
            new_file = video_file + ".mp4"
            dest_path = os.path.join(self.CACHE_DIR, "download", new_file)
            subprocess.run(
                [
                    "ffmpeg",
                    "-i",
                    video_path,
                    dest_path,
                    "-loglevel",
                    "warning",
                    "-stats",
                ],
                check=True,
            )
2021-09-05 17:10:14 +00:00
def scan_filesystem():
    """grouped function to delete and update index"""
    scanner = FilesystemScanner()
    scanner.list_comarison()

    if scanner.to_rename:
        print("renaming files")
        scanner.rename_files()

    if scanner.mismatch:
        print("fixing media urls in index")
        scanner.send_mismatch_bulk()

    if scanner.to_delete:
        print("delete metadata from index")
        scanner.delete_from_index()

    if scanner.to_index:
        print("index new videos")
        for missing_vid in scanner.to_index:
            # third tuple element is the youtube id
            index_new_video(missing_vid[2], missing_vid=missing_vid)
2021-09-05 17:10:14 +00:00
def reindex_old_documents():
    """daily refresh of old documents"""
    # check needed last run
    # int(timestamp()) instead of strftime("%s"): "%s" is a
    # non-portable glibc extension, not a documented directive
    now = int(datetime.now().timestamp())
    last_reindex = RedisArchivist().get_message("last_reindex")
    # skip if the last run was less than 24h ago
    if isinstance(last_reindex, int) and now - last_reindex < 60 * 60 * 24:
        return
    # continue if needed
    reindex_handler = Reindex()
    reindex_handler.check_outdated()
    reindex_handler.reindex()
    # set timestamp
    RedisArchivist().set_message("last_reindex", now, expire=False)