refactor UrlParser into own module, rename enum fields

simon 2023-01-06 15:37:55 +07:00
parent cd8b8aafc6
commit d9f73622a5
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
8 changed files with 157 additions and 117 deletions

View File

@@ -13,8 +13,8 @@ from home.src.index.generic import Pagination
from home.src.index.reindex import ReindexProgress
from home.src.index.video import SponsorBlock, YoutubeVideo
from home.src.ta.config import AppConfig
from home.src.ta.helper import UrlListParser
from home.src.ta.ta_redis import RedisArchivist, RedisQueue
from home.src.ta.urlparser import Parser
from home.tasks import check_reindex, download_single, extrac_dl, subscribe_to
from rest_framework.authentication import (
SessionAuthentication,
@@ -484,7 +484,7 @@ class DownloadApiListView(ApiBaseView):
pending = [i["youtube_id"] for i in to_add if i["status"] == "pending"]
url_str = " ".join(pending)
try:
youtube_ids = UrlListParser(url_str).process_list()
youtube_ids = Parser(url_str).parse()
except ValueError:
message = f"failed to parse: {url_str}"
print(message)
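
The call-site change here (and repeated in the files below) is mechanical: UrlListParser(url_str).process_list() becomes Parser(url_str).parse(). Both return a list of dicts with "type" and "url" keys and raise ValueError on input they cannot identify; the new parser additionally attaches a serializable "vid_type". A minimal sketch of the new call, using a placeholder input:

# placeholder input for illustration only
url_str = "https://www.youtube.com/watch?v=dQw4w9WgXcQ"

# before: from home.src.ta.helper import UrlListParser
#         youtube_ids = UrlListParser(url_str).process_list()
# after:
from home.src.ta.urlparser import Parser

try:
    youtube_ids = Parser(url_str).parse()
    # e.g. [{"type": "video", "url": "dQw4w9WgXcQ", "vid_type": "..."}]
except ValueError:
    print(f"failed to parse: {url_str}")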

View File

@@ -163,7 +163,7 @@ class PendingList(PendingIndex):
def _process_entry(self, entry):
"""process single entry from url list"""
if entry["type"] == "video":
vid_type = entry.get("vid_type", VideoTypeEnum.VIDEOS)
vid_type = self._get_vid_type(entry)
self._add_video(entry["url"], vid_type)
elif entry["type"] == "channel":
self._parse_channel(entry["url"])
@@ -173,6 +173,15 @@
        else:
            raise ValueError(f"invalid url_type: {entry}")

    @staticmethod
    def _get_vid_type(entry):
        """add vid type enum if available"""
        vid_type_str = entry.get("vid_type")
        if not vid_type_str:
            return VideoTypeEnum.VIDEOS

        return VideoTypeEnum(vid_type_str)

    def _add_video(self, url, vid_type=VideoTypeEnum.VIDEOS):
        """add video to list"""
        if url not in self.missing_videos and url not in self.to_skip:
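
The parser stores vid_type as the enum's string value so the pending-list documents stay JSON-serializable; _get_vid_type turns that string back into a VideoTypeEnum member and falls back to VIDEOS when the field is absent (e.g. for entries queued before this change). A runnable sketch of that round trip; the stand-in enum values are assumptions, since the real VideoTypeEnum definition is not part of this diff:

from enum import Enum


class VideoTypeEnum(Enum):
    """stand-in for home.src.index.video_constants.VideoTypeEnum"""

    VIDEOS = "videos"  # assumed member values
    SHORTS = "shorts"
    UNKNOWN = "unknown"


def get_vid_type(entry):
    """same fallback logic as PendingList._get_vid_type"""
    vid_type_str = entry.get("vid_type")
    if not vid_type_str:
        return VideoTypeEnum.VIDEOS

    return VideoTypeEnum(vid_type_str)


print(get_vid_type({"url": "dQw4w9WgXcQ", "vid_type": "shorts"}))  # SHORTS
print(get_vid_type({"url": "dQw4w9WgXcQ"}))  # VIDEOS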

View File

@@ -9,8 +9,8 @@ from home.src.download.subscriptions import (
PlaylistSubscription,
)
from home.src.index.playlist import YoutubePlaylist
from home.src.ta.helper import UrlListParser
from home.src.ta.ta_redis import RedisArchivist, RedisQueue
from home.src.ta.urlparser import Parser
from home.tasks import (
download_pending,
index_channel_playlists,
@@ -123,7 +123,7 @@ class PostData:
"""unsubscribe from channels or playlists"""
id_unsub = self.exec_val
print(f"{id_unsub}: unsubscribe")
to_unsub_list = UrlListParser(id_unsub).process_list()
to_unsub_list = Parser(id_unsub).parse()
for to_unsub in to_unsub_list:
unsub_type = to_unsub["type"]
unsub_id = to_unsub["url"]

View File

@@ -6,7 +6,7 @@ functionality:
from datetime import datetime
from home.src.es.connect import ElasticWrap
from home.src.ta.helper import UrlListParser
from home.src.ta.urlparser import Parser
class WatchState:
@@ -34,7 +34,7 @@ class WatchState:
def _dedect_type(self):
"""find youtube id type"""
print(self.youtube_id)
url_process = UrlListParser(self.youtube_id).process_list()
url_process = Parser(self.youtube_id).parse()
url_type = url_process[0]["type"]
return url_type

View File

@@ -11,10 +11,8 @@ import string
import subprocess
import unicodedata
from datetime import datetime
from urllib.parse import parse_qs, urlparse
import requests
from home.src.download.yt_dlp_base import YtWrap
def clean_string(file_name):
@@ -147,106 +145,6 @@ def is_short(youtube_id):
    return response.status_code == 200


class UrlListParser:
    """take a multi line string and detect valid youtube ids"""

    def __init__(self, url_str):
        self.url_list = [i.strip() for i in url_str.split()]

    def process_list(self):
        """loop through the list"""
        youtube_ids = []
        for url in self.url_list:
            parsed = urlparse(url)
            print(f"processing: {url}")
            print(parsed)
            if not parsed.netloc:
                # is not a url
                id_type = self.find_valid_id(url)
                youtube_id = url
            elif "youtube.com" not in url and "youtu.be" not in url:
                raise ValueError(f"{url} is not a youtube link")
            elif parsed.path:
                # is a url
                youtube_id, id_type = self.detect_from_url(parsed)
            else:
                # not detected
                raise ValueError(f"failed to detect {url}")

            youtube_ids.append({"url": youtube_id, "type": id_type})

        return youtube_ids

    def detect_from_url(self, parsed):
        """detect from parsed url"""
        if parsed.netloc == "youtu.be":
            # shortened
            youtube_id = parsed.path.strip("/")
            _ = self.find_valid_id(youtube_id)
            return youtube_id, "video"

        if parsed.query:
            # detect from query string
            query_parsed = parse_qs(parsed.query)
            if "v" in query_parsed:
                youtube_id = query_parsed["v"][0]
                _ = self.find_valid_id(youtube_id)
                return youtube_id, "video"

            if "list" in query_parsed:
                youtube_id = query_parsed["list"][0]
                return youtube_id, "playlist"

        if parsed.path.startswith("/channel/"):
            # channel id in url
            youtube_id = parsed.path.split("/")[2]
            _ = self.find_valid_id(youtube_id)
            return youtube_id, "channel"

        # detect channel with yt_dlp
        youtube_id = self.extract_channel_name(parsed.geturl())
        return youtube_id, "channel"

    @staticmethod
    def find_valid_id(id_str):
        """detect valid id from length of string"""
        str_len = len(id_str)
        if str_len == 11:
            id_type = "video"
        elif str_len == 24:
            id_type = "channel"
        elif str_len in [34, 18] or id_str in ["LL", "WL"]:
            id_type = "playlist"
        else:
            # unable to parse
            raise ValueError("not a valid id_str: " + id_str)

        return id_type

    @staticmethod
    def extract_channel_name(url):
        """find channel id from channel name with yt-dlp help"""
        obs_request = {
            "skip_download": True,
            "extract_flat": True,
            "playlistend": 0,
        }
        url_info = YtWrap(obs_request).extract(url)
        channel_id = url_info.get("channel_id", False)
        if channel_id:
            return channel_id

        url = url_info.get("url", False)
        if url:
            # handle old channel name redirect with url path split
            channel_id = urlparse(url).path.strip("/").split("/")[1]
            return channel_id

        print(f"failed to extract channel id from {url}")
        raise ValueError


class DurationConverter:
    """
    using ffmpeg to get and parse duration from filepath

View File

@@ -0,0 +1,133 @@
"""
Functionality:
- detect valid youtube ids and links from multi line string
- identify vid_type if possible
"""

from urllib.parse import parse_qs, urlparse

from home.src.download.yt_dlp_base import YtWrap
from home.src.index.video_constants import VideoTypeEnum


class Parser:
    """take a multi line string and detect valid youtube ids"""

    def __init__(self, url_str):
        self.url_list = [i.strip() for i in url_str.split()]

    def parse(self):
        """parse the list"""
        ids = []
        for url in self.url_list:
            parsed = urlparse(url)
            if parsed.netloc:
                # is url
                identified = self.process_url(parsed)
            else:
                # is not url
                identified = self._find_valid_id(url)

            if "vid_type" not in identified:
                identified.update(self._detect_vid_type(parsed.path))

            ids.append(identified)

        return ids

    def process_url(self, parsed):
        """process as url"""
        if parsed.netloc == "youtu.be":
            # shortened
            youtube_id = parsed.path.strip("/")
            return self._validate_expected(youtube_id, "video")

        query_parsed = parse_qs(parsed.query)
        if "v" in query_parsed:
            # video from v query str
            youtube_id = query_parsed["v"][0]
            return self._validate_expected(youtube_id, "video")

        if "list" in query_parsed:
            # playlist from list query str
            youtube_id = query_parsed["list"][0]
            return self._validate_expected(youtube_id, "playlist")

        all_paths = parsed.path.strip("/").split("/")
        if all_paths[0] == "shorts":
            # is shorts video
            item = self._validate_expected(all_paths[1], "video")
            item.update({"vid_type": VideoTypeEnum.SHORTS.value})
            return item

        if all_paths[0] == "channel":
            return self._validate_expected(all_paths[1], "channel")

        # detect channel
        channel_id = self._extract_channel_name(parsed.geturl())
        return {"type": "channel", "url": channel_id}

    def _validate_expected(self, youtube_id, expected_type):
        """raise value error if not matching"""
        matched = self._find_valid_id(youtube_id)
        if matched["type"] != expected_type:
            raise ValueError(
                f"{youtube_id} not of expected type {expected_type}"
            )

        return {"type": expected_type, "url": youtube_id}

    def _find_valid_id(self, id_str):
        """detect valid id from length of string"""
        if id_str in ("LL", "WL"):
            return {"type": "playlist", "url": id_str}

        if id_str.startswith("@"):
            url = f"https://www.youtube.com/{id_str}"
            channel_id = self._extract_channel_name(url)
            return {"type": "channel", "url": channel_id}

        len_id_str = len(id_str)
        if len_id_str == 11:
            item_type = "video"
        elif len_id_str == 24:
            item_type = "channel"
        elif len_id_str in (34, 18):
            item_type = "playlist"
        else:
            raise ValueError(f"not a valid id_str: {id_str}")

        return {"type": item_type, "url": id_str}

    @staticmethod
    def _extract_channel_name(url):
        """find channel id from channel name with yt-dlp help"""
        obs_request = {
            "skip_download": True,
            "extract_flat": True,
            "playlistend": 0,
        }
        url_info = YtWrap(obs_request).extract(url)
        channel_id = url_info.get("channel_id", False)
        if channel_id:
            return channel_id

        url = url_info.get("url", False)
        if url:
            # handle old channel name redirect with url path split
            channel_id = urlparse(url).path.strip("/").split("/")[1]
            return channel_id

        print(f"failed to extract channel id from {url}")
        raise ValueError

    def _detect_vid_type(self, path):
        """try to match enum from path, needs to be serializable"""
        last = path.strip("/").split("/")[-1]
        try:
            vid_type = VideoTypeEnum(last).value
        except ValueError:
            vid_type = VideoTypeEnum.UNKNOWN.value

        return {"vid_type": vid_type}
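
Taken together, Parser accepts anything from a bare 11-character video id to full youtube.com and youtu.be links and returns one dict per input with "type", "url" and a serializable "vid_type". A usage sketch with placeholder ids; channel names and @handles are left out because those branches go through yt-dlp (YtWrap) and need network access:

from home.src.ta.urlparser import Parser

# placeholder ids and urls, chosen so no network lookup is triggered
url_str = """
dQw4w9WgXcQ
https://youtu.be/dQw4w9WgXcQ
https://www.youtube.com/watch?v=dQw4w9WgXcQ
https://www.youtube.com/shorts/dQw4w9WgXcQ
https://www.youtube.com/playlist?list=PL0123456789abcdefghijABCDEFGHIJkl
"""

for item in Parser(url_str).parse():
    # each item looks like {"type": ..., "url": ..., "vid_type": ...};
    # the shorts link gets VideoTypeEnum.SHORTS.value, the others fall back
    # to VideoTypeEnum.UNKNOWN.value unless the last path segment matches
    # an enum value
    print(item)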

View File

@@ -25,8 +25,9 @@ from home.src.index.filesystem import ImportFolderScanner, scan_filesystem
from home.src.index.reindex import Reindex, ReindexManual, ReindexOutdated
from home.src.index.video_constants import VideoTypeEnum
from home.src.ta.config import AppConfig, ReleaseVersion, ScheduleBuilder
from home.src.ta.helper import UrlListParser, clear_dl_cache
from home.src.ta.helper import clear_dl_cache
from home.src.ta.ta_redis import RedisArchivist, RedisQueue
from home.src.ta.urlparser import Parser
CONFIG = AppConfig().config
REDIS_HOST = os.environ.get("REDIS_HOST")
@@ -261,9 +262,8 @@ def re_sync_thumbs():
@shared_task
def subscribe_to(url_str):
"""take a list of urls to subscribe to"""
to_subscribe_list = UrlListParser(url_str).process_list()
counter = 1
for item in to_subscribe_list:
to_subscribe_list = Parser(url_str).parse()
for idx, item in enumerate(to_subscribe_list):
to_sub_id = item["url"]
if item["type"] == "playlist":
PlaylistSubscription().process_url_str([item])
@@ -286,10 +286,9 @@
"status": key,
"level": "info",
"title": "Subscribing to Channels",
"message": f"Processing {counter} of {len(to_subscribe_list)}",
"message": f"Processing {idx + 1} of {len(to_subscribe_list)}",
}
RedisArchivist().set_message(key, message=message, expire=True)
counter = counter + 1
@shared_task
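
Alongside the Parser switch in subscribe_to, the manually incremented counter is dropped in favor of enumerate, so there is no separate counter variable to keep in sync with the loop body. A generic sketch of the pattern with placeholder items:

to_subscribe_list = [{"url": "id_one"}, {"url": "id_two"}]  # placeholder items

for idx, item in enumerate(to_subscribe_list):
    # idx is 0-based, so the user-facing message reports idx + 1
    print(f"Processing {idx + 1} of {len(to_subscribe_list)}")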

View File

@@ -38,8 +38,9 @@ from home.src.index.playlist import YoutubePlaylist
from home.src.index.reindex import ReindexProgress
from home.src.index.video_constants import VideoTypeEnum
from home.src.ta.config import AppConfig, ReleaseVersion, ScheduleBuilder
from home.src.ta.helper import UrlListParser, time_parser
from home.src.ta.helper import time_parser
from home.src.ta.ta_redis import RedisArchivist
from home.src.ta.urlparser import Parser
from home.tasks import extrac_dl, index_channel_playlists, subscribe_to
from rest_framework.authtoken.models import Token
@@ -456,7 +457,7 @@ class DownloadView(ArchivistResultsView):
url_str = request.POST.get("vid_url")
print(url_str)
try:
youtube_ids = UrlListParser(url_str).process_list()
youtube_ids = Parser(url_str).parse()
except ValueError:
# failed to process
key = "message:add"