
343 lines
10 KiB
Raw Normal View History

2021-09-05 17:10:14 +00:00
"""
Loose collection of helper functions
- don't import AppConfig class here to avoid circular imports
"""
import json
import os
import re
import string
import subprocess
import unicodedata
from urllib.parse import parse_qs, urlparse
2021-09-05 17:10:14 +00:00
import redis
2021-09-18 13:02:54 +00:00
import requests
import yt_dlp as youtube_dl
2021-09-05 17:10:14 +00:00
2021-10-28 08:49:58 +00:00
def get_total_hits(index, es_url, es_auth, match_field):
    """Return the hit count of *index* for documents where *match_field* is true.

    Args:
        index: name of the index to search.
        es_url: base URL of the search backend; joined as ``es_url/index``.
        es_auth: credentials handed to ``requests`` unchanged as ``auth``.
        match_field: field name matched against ``True`` in the query.

    Returns:
        The value found at ``hits.total.value`` in the JSON response.

    Raises:
        KeyError: if the response body has no ``hits.total.value`` entry.
    """
    headers = {"Content-type": "application/json"}
    payload = json.dumps({"query": {"match": {match_field: True}}})
    url = f"{es_url}/{index}/_search?"
    # NOTE(review): the call prefix on this line was lost; POST is assumed
    # because a JSON body is sent -- confirm against the upstream file.
    request = requests.post(url, data=payload, headers=headers, auth=es_auth)
    if not request.ok:
        # NOTE(review): body of this branch was lost; logging the response
        # text is the assumed behavior -- confirm.
        print(request.text)
    total_json = json.loads(request.text)
    return total_json["hits"]["total"]["value"]
def clean_string(file_name):
    """Reduce *file_name* to a whitelisted set of ASCII characters.

    The string is NFKD-normalized first so that accented letters decompose
    into a base letter plus combining marks; encoding to ASCII with
    ``ignore`` then drops whatever cannot be represented. Surrounding
    whitespace is stripped, characters outside the whitelist
    (``-_.() ``, ASCII letters, digits) are removed, and runs of two or
    more spaces are collapsed into one.

    Args:
        file_name: the string to sanitize.

    Returns:
        The cleaned string.
    """
    # a set gives constant-time membership checks for the filter below
    whitelist = frozenset("-_.() " + string.ascii_letters + string.digits)
    normalized = unicodedata.normalize("NFKD", file_name)
    ascii_only = normalized.encode("ASCII", "ignore").decode().strip()
    white_listed = "".join(c for c in ascii_only if c in whitelist)
    # removing characters can leave several spaces next to each other
    return re.sub(r"[ ]{2,}", " ", white_listed)
def ignore_filelist(filelist):
    """Filter temp and system entries out of an ``os.listdir`` result.

    Args:
        filelist: iterable of file names.

    Returns:
        A new list, in the original order, without names that start with
        a dot and without the names listed in ``to_ignore``.
    """
    to_ignore = ["Icon\r\r", "Temporary Items", "Network Trash Folder"]
    return [
        file_name
        for file_name in filelist
        if not (file_name.startswith(".") or file_name in to_ignore)
    ]
class UrlListParser:
    """take a multi line string and detect valid youtube ids"""

    def __init__(self, url_str):
        """Split *url_str* on whitespace into individual entries."""
        self.url_list = [i.strip() for i in url_str.split()]

    def process_list(self):
        """Classify every entry of the list.

        Returns:
            A list of ``{"url": <id>, "type": <id type>}`` dicts, one per
            entry, in input order.

        Raises:
            ValueError: if an entry is not a youtube link, is a bare string
                of unrecognized length, or nothing can be detected from it.
        """
        youtube_ids = []
        for url in self.url_list:
            parsed = urlparse(url)
            print(f"processing: {url}")
            if not parsed.netloc:
                # is not a url, treat the entry itself as the id
                id_type = self.find_valid_id(url)
                youtube_id = url
            # NOTE(review): both host literals were empty strings in the
            # text under review, which makes this check a no-op; they are
            # restored from the error message -- confirm against upstream.
            elif "youtube.com" not in url and "youtu.be" not in url:
                raise ValueError(f"{url} is not a youtube link")
            elif parsed.path:
                # is a url
                youtube_id, id_type = self.detect_from_url(parsed)
            else:
                # not detected
                raise ValueError(f"failed to detect {url}")

            youtube_ids.append({"url": youtube_id, "type": id_type})

        return youtube_ids

    def detect_from_url(self, parsed):
        """Detect id and id type from a parsed url.

        Args:
            parsed: result of ``urllib.parse.urlparse``.

        Returns:
            Tuple ``(youtube_id, id_type)``.

        Raises:
            ValueError: if a detected video or channel id has an unexpected
                length, or the channel lookup yields no channel id.
        """
        # NOTE(review): host literal restored, it was an empty string in the
        # text under review and could never match here -- confirm.
        if parsed.netloc == "youtu.be":
            # shortened
            youtube_id = parsed.path.strip("/")
            _ = self.find_valid_id(youtube_id)
            return youtube_id, "video"

        if parsed.query:
            # detect from query string
            query_parsed = parse_qs(parsed.query)
            if "v" in query_parsed:
                youtube_id = query_parsed["v"][0]
                _ = self.find_valid_id(youtube_id)
                return youtube_id, "video"

            if "list" in query_parsed:
                # playlist ids are passed on without a length check
                youtube_id = query_parsed["list"][0]
                return youtube_id, "playlist"

        if parsed.path.startswith("/channel/"):
            # channel id in url
            youtube_id = parsed.path.split("/")[2]
            _ = self.find_valid_id(youtube_id)
            return youtube_id, "channel"

        # detect channel with yt_dlp
        youtube_id = self.extract_channel_name(parsed.geturl())
        return youtube_id, "channel"

    @staticmethod
    def find_valid_id(id_str):
        """Detect the id type from the length of *id_str*.

        Returns:
            ``"video"`` for 11 characters, ``"channel"`` for 24 and
            ``"playlist"`` for 34 or 18.

        Raises:
            ValueError: for any other length.
        """
        str_len = len(id_str)
        if str_len == 11:
            id_type = "video"
        elif str_len == 24:
            id_type = "channel"
        elif str_len in [34, 18]:
            id_type = "playlist"
        else:
            # unable to parse
            raise ValueError("not a valid id_str: " + id_str)

        return id_type

    @staticmethod
    def extract_channel_name(url):
        """Find the channel id for *url* with yt-dlp help.

        Raises:
            ValueError: if the extracted info has no ``channel_id`` key.
        """
        obs = {
            "default_search": "ytsearch",
            "quiet": True,
            "skip_download": True,
            "extract_flat": True,
            "playlistend": 0,
        }
        url_info = youtube_dl.YoutubeDL(obs).extract_info(url, download=False)
        try:
            channel_id = url_info["channel_id"]
        except KeyError as error:
            print(f"failed to extract channel id from {url}")
            raise ValueError from error

        return channel_id
class RedisArchivist:
    """collection of methods to interact with redis"""

    REDIS_HOST = os.environ.get("REDIS_HOST")
    REDIS_PORT = os.environ.get("REDIS_PORT") or 6379
    # prefix put in front of every key this class touches
    NAME_SPACE = "ta:"
    # NOTE(review): get_progress iterates self.CHANNELS but the definition
    # is not present in the text under review. Only "download" is confirmed
    # by the "message:download" status used below; the remaining names are
    # a reconstruction -- verify against the keys written with set_message.
    CHANNELS = [
        "download",
        "add",
        "rescan",
        "subchannel",
        "subplaylist",
        "playlistscan",
        "setting",
    ]

    def __init__(self):
        self.redis_connection = redis.Redis(
            host=self.REDIS_HOST, port=self.REDIS_PORT
        )

    def set_message(self, key, message, expire=True):
        """Write *message* as JSON to redis under the namespaced *key*.

        Args:
            key: key name without namespace prefix.
            message: JSON-serializable object to store.
            expire: ``True`` expires the key after 20 seconds, a number is
                used as the expiry in seconds, a falsy value sets no expiry.
        """
        self.redis_connection.execute_command(
            "JSON.SET", self.NAME_SPACE + key, ".", json.dumps(message)
        )

        if expire:
            # a plain True selects the default, anything else is the value
            if isinstance(expire, bool):
                secs = 20
            else:
                secs = expire
            self.redis_connection.execute_command(
                "EXPIRE", self.NAME_SPACE + key, secs
            )

    def get_message(self, key):
        """Get the message stored under *key*.

        Returns:
            The decoded JSON value, or ``{"status": False}`` when redis
            returns nothing for the key.
        """
        reply = self.redis_connection.execute_command(
            "JSON.GET", self.NAME_SPACE + key
        )
        if reply:
            json_str = json.loads(reply)
        else:
            json_str = {"status": False}

        return json_str

    def del_message(self, key):
        """Delete *key* from redis and return the reply of the DEL command."""
        response = self.redis_connection.execute_command(
            "DEL", self.NAME_SPACE + key
        )
        return response

    def get_lock(self, lock_key):
        """Return a redis lock object for the namespaced *lock_key*."""
        redis_lock = self.redis_connection.lock(self.NAME_SPACE + lock_key)
        return redis_lock

    def get_progress(self):
        """Get a list of all progress messages.

        Returns:
            The decoded ``message:<channel>`` values of every entry in
            ``CHANNELS`` that currently has one.
        """
        all_messages = []
        for channel in self.CHANNELS:
            key = "message:" + channel
            reply = self.redis_connection.execute_command(
                "JSON.GET", self.NAME_SPACE + key
            )
            if reply:
                all_messages.append(json.loads(reply))

        return all_messages

    @staticmethod
    def monitor_cache_dir(cache_dir):
        """Look at download cache dir directly as alternative progress info.

        Args:
            cache_dir: directory that contains the ``download`` folder.

        Returns:
            A message dict for the first file found in the download folder,
            or ``False`` when the folder holds no relevant file.
        """
        dl_cache = os.path.join(cache_dir, "download")
        cache_file = ignore_filelist(os.listdir(dl_cache))
        if not cache_file:
            return False

        # drop the first 12 characters and the extension; presumably the
        # prefix is the video id plus a separator -- TODO confirm
        filename = cache_file[0][12:].replace("_", " ").split(".")[0]
        mess_dict = {
            "status": "message:download",
            "level": "info",
            "title": "Downloading: " + filename,
            "message": "",
        }
        return mess_dict
class RedisQueue:
    """dynamically interact with the download queue in redis"""

    REDIS_HOST = os.environ.get("REDIS_HOST")
    # fall back to 6379 when the environment sets no port
    REDIS_PORT = os.environ.get("REDIS_PORT") or 6379
    # prefix put in front of the list key
    NAME_SPACE = "ta:"

    def __init__(self, key):
        """Bind the queue to the namespaced list *key*."""
        self.key = self.NAME_SPACE + key
        self.conn = redis.Redis(host=self.REDIS_HOST, port=self.REDIS_PORT)

    def get_all(self):
        """Return all elements in the list as decoded strings."""
        result = self.conn.execute_command("LRANGE", self.key, 0, -1)
        all_elements = [i.decode() for i in result]
        return all_elements

    def add_list(self, to_add):
        """Append every item of *to_add* to the end of the queue."""
        self.conn.execute_command("RPUSH", self.key, *to_add)

    def add_priority(self, to_add):
        """Add a single video to the front of the queue."""
        self.conn.execute_command("LPUSH", self.key, to_add)

    def get_next(self):
        """Pop and return the next element in the queue, False if none."""
        result = self.conn.execute_command("LPOP", self.key)
        if not result:
            return False

        next_element = result.decode()
        return next_element

    def clear(self):
        """Delete the list from redis."""
        self.conn.execute_command("DEL", self.key)

    def clear_item(self, to_clear):
        """Remove every occurrence of *to_clear* from the list, if present."""
        self.conn.execute_command("LREM", self.key, 0, to_clear)

    def trim(self, size):
        """Trim the queue to the index range ``0`` to *size*.

        *size* is passed as the LTRIM stop index, which Redis documents as
        inclusive.
        """
        self.conn.execute_command("LTRIM", self.key, 0, size)
class DurationConverter:
    """
    using ffmpeg to get and parse duration from filepath
    """

    @staticmethod
    def get_sec(file_path):
        """Read the duration of the media file at *file_path*.

        Returns:
            The duration in whole seconds, truncated from the float value
            printed by the external command.

        Raises:
            subprocess.CalledProcessError: if the command exits non-zero.
            ValueError: if the command output is not a number.
        """
        # NOTE(review): the command line was lost in the text under review.
        # It is reconstructed as an ffprobe call printing only the container
        # duration, which matches the stdout parsing below -- confirm
        # against the upstream file.
        duration = subprocess.run(
            [
                "ffprobe",
                "-v",
                "error",
                "-show_entries",
                "format=duration",
                "-of",
                "default=noprint_wrappers=1:nokey=1",
                file_path,
            ],
            capture_output=True,
            check=True,
        )
        duration_sec = int(float(duration.stdout.decode().strip()))
        return duration_sec

    @staticmethod
    def get_str(duration_sec):
        """Take a duration in seconds and return a clean string.

        Returns:
            ``"NA"`` for a falsy input, otherwise ``MM:SS``, or
            ``HH:MM:SS`` when the duration reaches one hour.
        """
        if not duration_sec:
            # failed to extract
            return "NA"

        hours, remainder = divmod(duration_sec, 3600)
        minutes, secs = divmod(remainder, 60)

        duration_str = str()
        if hours:
            duration_str = str(hours).zfill(2) + ":"
        # minutes are always shown, also when zero
        if minutes:
            duration_str = duration_str + str(minutes).zfill(2) + ":"
        else:
            duration_str = duration_str + "00:"
        duration_str = duration_str + str(secs).zfill(2)
        return duration_str