tubearchivist/tubearchivist/home/src/ta/helper.py

"""
Loose collection of helper functions
- don't import AppConfig class here to avoid circular imports
"""
import json
import os
import random
import re
import string
import subprocess
import unicodedata
from datetime import datetime
from urllib.parse import parse_qs, urlparse

from home.src.download.yt_dlp_base import YtWrap


def clean_string(file_name):
    """clean string to only ascii characters"""
    whitelist = "-_.() " + string.ascii_letters + string.digits
    normalized = unicodedata.normalize("NFKD", file_name)
    ascii_only = normalized.encode("ASCII", "ignore").decode().strip()
    white_listed = "".join(c for c in ascii_only if c in whitelist)
    cleaned = re.sub(r"[ ]{2,}", " ", white_listed)
    return cleaned
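
# Illustrative example (hypothetical input, not part of the module):
# non-ASCII characters are transliterated or dropped, non-whitelisted
# characters removed, repeated spaces collapsed.
#   clean_string("Süper Vïdeo: Part 1?.mp4") returns "Super Video Part 1.mp4"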


def ignore_filelist(filelist):
    """ignore temp files for os.listdir sanitizer"""
    to_ignore = ["Icon\r\r", "Temporary Items", "Network Trash Folder"]
    cleaned = []
    for file_name in filelist:
        if file_name.startswith(".") or file_name in to_ignore:
            continue
        cleaned.append(file_name)
    return cleaned
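
# Illustrative example (hypothetical input, not part of the module):
#   ignore_filelist([".DS_Store", "Temporary Items", "video.mp4"])
#   returns ["video.mp4"]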


def randomizor(length):
    """generate random alphanumeric string"""
    pool = string.digits + string.ascii_letters
    return "".join(random.choice(pool) for _ in range(length))


def requests_headers():
    """build header with random user agent for requests outside of yt-dlp"""
    chrome_versions = (
        "90.0.4430.212",
        "90.0.4430.24",
        "90.0.4430.70",
        "90.0.4430.72",
        "90.0.4430.85",
        "90.0.4430.93",
        "91.0.4472.101",
        "91.0.4472.106",
        "91.0.4472.114",
        "91.0.4472.124",
        "91.0.4472.164",
        "91.0.4472.19",
        "91.0.4472.77",
        "92.0.4515.107",
        "92.0.4515.115",
        "92.0.4515.131",
        "92.0.4515.159",
        "92.0.4515.43",
        "93.0.4556.0",
        "93.0.4577.15",
        "93.0.4577.63",
        "93.0.4577.82",
        "94.0.4606.41",
        "94.0.4606.54",
        "94.0.4606.61",
        "94.0.4606.71",
        "94.0.4606.81",
        "94.0.4606.85",
        "95.0.4638.17",
        "95.0.4638.50",
        "95.0.4638.54",
        "95.0.4638.69",
        "95.0.4638.74",
        "96.0.4664.18",
        "96.0.4664.45",
        "96.0.4664.55",
        "96.0.4664.93",
        "97.0.4692.20",
    )
    template = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        + "AppleWebKit/537.36 (KHTML, like Gecko) "
        + f"Chrome/{random.choice(chrome_versions)} Safari/537.36"
    )
    return {"User-Agent": template}


def date_praser(timestamp):
    """return formatted date string"""
    if isinstance(timestamp, int):
        date_obj = datetime.fromtimestamp(timestamp)
    elif isinstance(timestamp, str):
        date_obj = datetime.strptime(timestamp, "%Y-%m-%d")
    else:
        # previously fell through to an unbound date_obj
        raise TypeError(f"invalid timestamp: {timestamp}")

    return datetime.strftime(date_obj, "%d %b, %Y")
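
# Illustrative examples (hypothetical inputs; the local timezone
# affects the int case):
#   date_praser(1640995200) and date_praser("2022-01-01")
#   both return a string like "01 Jan, 2022"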


def time_parser(timestamp):
    """return seconds from timestamp, false on empty"""
    if not timestamp:
        return False

    if timestamp.isnumeric():
        return int(timestamp)

    hours, minutes, seconds = timestamp.split(":", maxsplit=3)
    return int(hours) * 60 * 60 + int(minutes) * 60 + float(seconds)
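
# Illustrative examples (hypothetical inputs, not part of the module):
#   time_parser("90") returns 90
#   time_parser("01:02:03.5") returns 3723.5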


def clear_dl_cache(config):
    """clear leftover files from dl cache"""
    print("clear download cache")
    cache_dir = os.path.join(config["application"]["cache_dir"], "download")
    for cached in os.listdir(cache_dir):
        to_delete = os.path.join(cache_dir, cached)
        os.remove(to_delete)


def get_mapping():
    """read index_mapping.json and get expected mapping and settings"""
    with open("home/src/es/index_mapping.json", "r", encoding="utf-8") as f:
        index_config = json.load(f).get("index_config")

    return index_config


class UrlListParser:
    """take a multi line string and detect valid youtube ids"""

    def __init__(self, url_str):
        self.url_list = [i.strip() for i in url_str.split()]

    def process_list(self):
        """loop through the list"""
        youtube_ids = []
        for url in self.url_list:
            parsed = urlparse(url)
            print(f"processing: {url}")
            print(parsed)
            if not parsed.netloc:
                # is not a url
                id_type = self.find_valid_id(url)
                youtube_id = url
            elif "youtube.com" not in url and "youtu.be" not in url:
                raise ValueError(f"{url} is not a youtube link")
            elif parsed.path:
                # is a url
                youtube_id, id_type = self.detect_from_url(parsed)
            else:
                # not detected
                raise ValueError(f"failed to detect {url}")

            youtube_ids.append({"url": youtube_id, "type": id_type})

        return youtube_ids

    def detect_from_url(self, parsed):
        """detect from parsed url"""
        if parsed.netloc == "youtu.be":
            # shortened
            youtube_id = parsed.path.strip("/")
            _ = self.find_valid_id(youtube_id)
            return youtube_id, "video"

        if parsed.query:
            # detect from query string
            query_parsed = parse_qs(parsed.query)
            if "v" in query_parsed:
                youtube_id = query_parsed["v"][0]
                _ = self.find_valid_id(youtube_id)
                return youtube_id, "video"

            if "list" in query_parsed:
                youtube_id = query_parsed["list"][0]
                return youtube_id, "playlist"

        if parsed.path.startswith("/channel/"):
            # channel id in url
            youtube_id = parsed.path.split("/")[2]
            _ = self.find_valid_id(youtube_id)
            return youtube_id, "channel"

        # detect channel with yt_dlp
        youtube_id = self.extract_channel_name(parsed.geturl())
        return youtube_id, "channel"

    @staticmethod
    def find_valid_id(id_str):
        """detect valid id from length of string"""
        str_len = len(id_str)
        if str_len == 11:
            id_type = "video"
        elif str_len == 24:
            id_type = "channel"
        elif str_len in [34, 18] or id_str in ["LL", "WL"]:
            id_type = "playlist"
        else:
            # unable to parse
            raise ValueError("not a valid id_str: " + id_str)

        return id_type

    @staticmethod
    def extract_channel_name(url):
        """find channel id from channel name with yt-dlp help"""
        obs_request = {
            "skip_download": True,
            "extract_flat": True,
            "playlistend": 0,
        }
        url_info = YtWrap(obs_request).extract(url)
        channel_id = url_info.get("channel_id", False)
        if channel_id:
            return channel_id

        url = url_info.get("url", False)
        if url:
            # handle old channel name redirect with url path split
            channel_id = urlparse(url).path.strip("/").split("/")[1]
            return channel_id

        print(f"failed to extract channel id from {url}")
        raise ValueError
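
# Illustrative usage (hypothetical input, not part of the module):
#   UrlListParser("https://youtu.be/dQw4w9WgXcQ").process_list()
#   returns [{"url": "dQw4w9WgXcQ", "type": "video"}]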


class DurationConverter:
    """
    using ffmpeg to get and parse duration from filepath
    """

    @staticmethod
    def get_sec(file_path):
        """read duration from file"""
        duration = subprocess.run(
            [
                "ffprobe",
                "-v",
                "error",
                "-show_entries",
                "format=duration",
                "-of",
                "default=noprint_wrappers=1:nokey=1",
                file_path,
            ],
            capture_output=True,
            check=True,
        )
        duration_raw = duration.stdout.decode().strip()
        if duration_raw == "N/A":
            return 0

        duration_sec = int(float(duration_raw))
        return duration_sec

    @staticmethod
    def get_str(duration_sec):
        """takes duration in sec and returns clean string"""
        if not duration_sec:
            # failed to extract
            return "NA"

        hours = duration_sec // 3600
        minutes = (duration_sec - (hours * 3600)) // 60
        secs = duration_sec - (hours * 3600) - (minutes * 60)

        duration_str = ""
        if hours:
            duration_str = str(hours).zfill(2) + ":"
        if minutes:
            duration_str = duration_str + str(minutes).zfill(2) + ":"
        else:
            duration_str = duration_str + "00:"
        duration_str = duration_str + str(secs).zfill(2)
        return duration_str
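
# Illustrative usage (hypothetical values; get_sec needs ffprobe on PATH):
#   DurationConverter.get_sec("/youtube/video.mp4") returns e.g. 3723
#   DurationConverter.get_str(3723) returns "01:02:03"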