add typing
This commit is contained in:
parent
4067b6c182
commit
e385331f6c
|
@ -15,20 +15,20 @@ from urllib.parse import urlparse
|
||||||
import requests
|
import requests
|
||||||
|
|
||||||
|
|
||||||
def clean_string(file_name: str) -> str:
    """Return *file_name* reduced to a safe ASCII-only string.

    The input is NFKD-normalized so accented characters decompose to
    their ASCII base (e.g. "é" -> "e" plus a combining mark), non-ASCII
    bytes are dropped, anything outside the whitelist of letters, digits
    and "-_.() " is removed, and runs of spaces collapse to one space.
    """
    # frozenset gives O(1) membership tests in the filter below
    whitelist = frozenset("-_.() " + string.ascii_letters + string.digits)
    normalized = unicodedata.normalize("NFKD", file_name)
    # "ignore" silently drops combining marks / non-ASCII leftovers
    ascii_only = normalized.encode("ASCII", "ignore").decode().strip()
    white_listed: str = "".join(c for c in ascii_only if c in whitelist)
    # collapse any run of two or more spaces into a single space
    cleaned: str = re.sub(r"[ ]{2,}", " ", white_listed)
    return cleaned
|
||||||
|
|
||||||
|
|
||||||
def ignore_filelist(filelist):
|
def ignore_filelist(filelist: list[str]) -> list[str]:
|
||||||
"""ignore temp files for os.listdir sanitizer"""
|
"""ignore temp files for os.listdir sanitizer"""
|
||||||
to_ignore = ["Icon\r\r", "Temporary Items", "Network Trash Folder"]
|
to_ignore = ["Icon\r\r", "Temporary Items", "Network Trash Folder"]
|
||||||
cleaned = []
|
cleaned: list[str] = []
|
||||||
for file_name in filelist:
|
for file_name in filelist:
|
||||||
if file_name.startswith(".") or file_name in to_ignore:
|
if file_name.startswith(".") or file_name in to_ignore:
|
||||||
continue
|
continue
|
||||||
|
@ -38,13 +38,13 @@ def ignore_filelist(filelist):
|
||||||
return cleaned
|
return cleaned
|
||||||
|
|
||||||
|
|
||||||
def randomizor(length: int) -> str:
    """Return a random alphanumeric string of *length* characters.

    Draws with replacement from digits + ASCII letters. Uses the
    non-cryptographic ``random`` module — fine for file names and
    similar; do NOT use for tokens or secrets (use ``secrets`` there).
    """
    pool: str = string.digits + string.ascii_letters
    # random.choices draws k samples in one call; avoids the unused
    # loop variable of a hand-rolled generator expression
    return "".join(random.choices(pool, k=length))
|
||||||
|
|
||||||
|
|
||||||
def requests_headers():
|
def requests_headers() -> dict[str, str]:
|
||||||
"""build header with random user agent for requests outside of yt-dlp"""
|
"""build header with random user agent for requests outside of yt-dlp"""
|
||||||
|
|
||||||
chrome_versions = (
|
chrome_versions = (
|
||||||
|
@ -96,7 +96,7 @@ def requests_headers():
|
||||||
return {"User-Agent": template}
|
return {"User-Agent": template}
|
||||||
|
|
||||||
|
|
||||||
def date_praser(timestamp):
|
def date_praser(timestamp: int | str) -> str:
|
||||||
"""return formatted date string"""
|
"""return formatted date string"""
|
||||||
if isinstance(timestamp, int):
|
if isinstance(timestamp, int):
|
||||||
date_obj = datetime.fromtimestamp(timestamp)
|
date_obj = datetime.fromtimestamp(timestamp)
|
||||||
|
@ -106,7 +106,7 @@ def date_praser(timestamp):
|
||||||
return datetime.strftime(date_obj, "%d %b, %Y")
|
return datetime.strftime(date_obj, "%d %b, %Y")
|
||||||
|
|
||||||
|
|
||||||
def time_parser(timestamp):
|
def time_parser(timestamp: str) -> float:
|
||||||
"""return seconds from timestamp, false on empty"""
|
"""return seconds from timestamp, false on empty"""
|
||||||
if not timestamp:
|
if not timestamp:
|
||||||
return False
|
return False
|
||||||
|
@ -118,7 +118,7 @@ def time_parser(timestamp):
|
||||||
return int(hours) * 60 * 60 + int(minutes) * 60 + float(seconds)
|
return int(hours) * 60 * 60 + int(minutes) * 60 + float(seconds)
|
||||||
|
|
||||||
|
|
||||||
def clear_dl_cache(config):
|
def clear_dl_cache(config: dict) -> int:
|
||||||
"""clear leftover files from dl cache"""
|
"""clear leftover files from dl cache"""
|
||||||
print("clear download cache")
|
print("clear download cache")
|
||||||
cache_dir = os.path.join(config["application"]["cache_dir"], "download")
|
cache_dir = os.path.join(config["application"]["cache_dir"], "download")
|
||||||
|
@ -130,15 +130,15 @@ def clear_dl_cache(config):
|
||||||
return len(leftover_files)
|
return len(leftover_files)
|
||||||
|
|
||||||
|
|
||||||
def get_mapping() -> dict:
    """Load the expected index mapping and settings.

    Reads index_mapping.json from its fixed location and returns the
    "index_config" section (None if that key is absent).
    """
    mapping_path = "home/src/es/index_mapping.json"
    with open(mapping_path, "r", encoding="utf-8") as handle:
        parsed = json.load(handle)

    index_config: dict = parsed.get("index_config")
    return index_config
|
||||||
|
|
||||||
|
|
||||||
def is_shorts(youtube_id):
|
def is_shorts(youtube_id: str) -> bool:
|
||||||
    """check if youtube_id is a shorts video; returns False if it is not a shorts"""
|
    """check if youtube_id is a shorts video; returns False if it is not a shorts"""
|
||||||
shorts_url = f"https://www.youtube.com/shorts/{youtube_id}"
|
shorts_url = f"https://www.youtube.com/shorts/{youtube_id}"
|
||||||
response = requests.head(
|
response = requests.head(
|
||||||
|
@ -148,10 +148,10 @@ def is_shorts(youtube_id):
|
||||||
return response.status_code == 200
|
return response.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
def ta_host_parser(ta_host):
|
def ta_host_parser(ta_host: str) -> tuple[list[str], list[str]]:
|
||||||
"""parse ta_host env var for ALLOWED_HOSTS and CSRF_TRUSTED_ORIGINS"""
|
"""parse ta_host env var for ALLOWED_HOSTS and CSRF_TRUSTED_ORIGINS"""
|
||||||
allowed_hosts = []
|
allowed_hosts: list[str] = []
|
||||||
csrf_trusted_origins = []
|
csrf_trusted_origins: list[str] = []
|
||||||
for host in ta_host.split():
|
for host in ta_host.split():
|
||||||
host_clean = host.strip()
|
host_clean = host.strip()
|
||||||
if not host_clean.startswith("http"):
|
if not host_clean.startswith("http"):
|
||||||
|
|
Loading…
Reference in New Issue