switch channel index to yt-dlp, index tags
This commit is contained in:
parent
b7bfeaf215
commit
2b66786728
|
@ -6,158 +6,15 @@ functionality:
|
||||||
|
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import re
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
import requests
|
|
||||||
from bs4 import BeautifulSoup
|
|
||||||
from home.src.download import queue # partial import
|
from home.src.download import queue # partial import
|
||||||
from home.src.download.thumbnails import ThumbManager
|
from home.src.download.thumbnails import ThumbManager
|
||||||
from home.src.download.yt_dlp_base import YtWrap
|
from home.src.download.yt_dlp_base import YtWrap
|
||||||
from home.src.es.connect import ElasticWrap, IndexPaginate
|
from home.src.es.connect import ElasticWrap, IndexPaginate
|
||||||
from home.src.index.generic import YouTubeItem
|
from home.src.index.generic import YouTubeItem
|
||||||
from home.src.index.playlist import YoutubePlaylist
|
from home.src.index.playlist import YoutubePlaylist
|
||||||
from home.src.ta.helper import clean_string, requests_headers
|
from home.src.ta.helper import clean_string
|
||||||
|
|
||||||
|
|
||||||
class ChannelScraper:
|
|
||||||
"""custom scraper using bs4 to scrape channel about page
|
|
||||||
will be able to be integrated into yt-dlp
|
|
||||||
once #2237 and #2350 are merged upstream
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, channel_id):
|
|
||||||
self.channel_id = channel_id
|
|
||||||
self.soup = False
|
|
||||||
self.yt_json = False
|
|
||||||
self.json_data = False
|
|
||||||
|
|
||||||
def get_json(self):
|
|
||||||
"""main method to return channel dict"""
|
|
||||||
self.get_soup()
|
|
||||||
self._extract_yt_json()
|
|
||||||
if self._is_deactivated():
|
|
||||||
return False
|
|
||||||
|
|
||||||
self._parse_channel_main()
|
|
||||||
self._parse_channel_meta()
|
|
||||||
return self.json_data
|
|
||||||
|
|
||||||
def get_soup(self):
|
|
||||||
"""return soup from youtube"""
|
|
||||||
print(f"{self.channel_id}: scrape channel data from youtube")
|
|
||||||
url = f"https://www.youtube.com/channel/{self.channel_id}/about?hl=en"
|
|
||||||
cookies = {"CONSENT": "YES+xxxxxxxxxxxxxxxxxxxxxxxxxxx"}
|
|
||||||
response = requests.get(
|
|
||||||
url, cookies=cookies, headers=requests_headers(), timeout=10
|
|
||||||
)
|
|
||||||
if response.ok:
|
|
||||||
channel_page = response.text
|
|
||||||
else:
|
|
||||||
print(f"{self.channel_id}: failed to extract channel info")
|
|
||||||
raise ConnectionError
|
|
||||||
self.soup = BeautifulSoup(channel_page, "html.parser")
|
|
||||||
|
|
||||||
def _extract_yt_json(self):
|
|
||||||
"""parse soup and get ytInitialData json"""
|
|
||||||
all_scripts = self.soup.find("body").find_all("script")
|
|
||||||
for script in all_scripts:
|
|
||||||
if "var ytInitialData = " in str(script):
|
|
||||||
script_content = str(script)
|
|
||||||
break
|
|
||||||
# extract payload
|
|
||||||
script_content = script_content.split("var ytInitialData = ")[1]
|
|
||||||
json_raw = script_content.rstrip(";</script>")
|
|
||||||
self.yt_json = json.loads(json_raw)
|
|
||||||
|
|
||||||
def _is_deactivated(self):
|
|
||||||
"""check if channel is deactivated"""
|
|
||||||
alerts = self.yt_json.get("alerts")
|
|
||||||
if not alerts:
|
|
||||||
return False
|
|
||||||
|
|
||||||
for alert in alerts:
|
|
||||||
alert_text = alert["alertRenderer"]["text"]["simpleText"]
|
|
||||||
print(f"{self.channel_id}: failed to extract, {alert_text}")
|
|
||||||
return True
|
|
||||||
|
|
||||||
def _parse_channel_main(self):
|
|
||||||
"""extract maintab values from scraped channel json data"""
|
|
||||||
main_tab = self.yt_json["header"]["c4TabbedHeaderRenderer"]
|
|
||||||
# build and return dict
|
|
||||||
self.json_data = {
|
|
||||||
"channel_active": True,
|
|
||||||
"channel_last_refresh": int(datetime.now().timestamp()),
|
|
||||||
"channel_subs": self._get_channel_subs(main_tab),
|
|
||||||
"channel_name": main_tab["title"],
|
|
||||||
"channel_banner_url": self._get_thumbnails(main_tab, "banner"),
|
|
||||||
"channel_tvart_url": self._get_thumbnails(main_tab, "tvBanner"),
|
|
||||||
"channel_id": self.channel_id,
|
|
||||||
"channel_subscribed": False,
|
|
||||||
}
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _get_thumbnails(main_tab, thumb_name):
|
|
||||||
"""extract banner url from main_tab"""
|
|
||||||
try:
|
|
||||||
all_banners = main_tab[thumb_name]["thumbnails"]
|
|
||||||
banner = sorted(all_banners, key=lambda k: k["width"])[-1]["url"]
|
|
||||||
except KeyError:
|
|
||||||
banner = False
|
|
||||||
|
|
||||||
return banner
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _get_channel_subs(main_tab):
|
|
||||||
"""process main_tab to get channel subs as int"""
|
|
||||||
try:
|
|
||||||
sub_text_simple = main_tab["subscriberCountText"]["simpleText"]
|
|
||||||
sub_text = sub_text_simple.split(" ")[0]
|
|
||||||
if sub_text[-1] == "K":
|
|
||||||
channel_subs = int(float(sub_text.replace("K", "")) * 1000)
|
|
||||||
elif sub_text[-1] == "M":
|
|
||||||
channel_subs = int(float(sub_text.replace("M", "")) * 1000000)
|
|
||||||
elif int(sub_text) >= 0:
|
|
||||||
channel_subs = int(sub_text)
|
|
||||||
else:
|
|
||||||
message = f"{sub_text} not dealt with"
|
|
||||||
print(message)
|
|
||||||
except KeyError:
|
|
||||||
channel_subs = 0
|
|
||||||
|
|
||||||
return channel_subs
|
|
||||||
|
|
||||||
def _parse_channel_meta(self):
|
|
||||||
"""extract meta tab values from channel payload"""
|
|
||||||
# meta tab
|
|
||||||
meta_tab = self.yt_json["metadata"]["channelMetadataRenderer"]
|
|
||||||
all_thumbs = meta_tab["avatar"]["thumbnails"]
|
|
||||||
thumb_url = sorted(all_thumbs, key=lambda k: k["width"])[-1]["url"]
|
|
||||||
# stats tab
|
|
||||||
renderer = "twoColumnBrowseResultsRenderer"
|
|
||||||
all_tabs = self.yt_json["contents"][renderer]["tabs"]
|
|
||||||
for tab in all_tabs:
|
|
||||||
if "tabRenderer" in tab.keys():
|
|
||||||
if tab["tabRenderer"]["title"] == "About":
|
|
||||||
about_tab = tab["tabRenderer"]["content"][
|
|
||||||
"sectionListRenderer"
|
|
||||||
]["contents"][0]["itemSectionRenderer"]["contents"][0][
|
|
||||||
"channelAboutFullMetadataRenderer"
|
|
||||||
]
|
|
||||||
break
|
|
||||||
try:
|
|
||||||
channel_views_text = about_tab["viewCountText"]["simpleText"]
|
|
||||||
channel_views = int(re.sub(r"\D", "", channel_views_text))
|
|
||||||
except KeyError:
|
|
||||||
channel_views = 0
|
|
||||||
|
|
||||||
self.json_data.update(
|
|
||||||
{
|
|
||||||
"channel_description": meta_tab["description"],
|
|
||||||
"channel_thumb_url": thumb_url,
|
|
||||||
"channel_views": channel_views,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class YoutubeChannel(YouTubeItem):
|
class YoutubeChannel(YouTubeItem):
|
||||||
|
@ -166,36 +23,93 @@ class YoutubeChannel(YouTubeItem):
|
||||||
es_path = False
|
es_path = False
|
||||||
index_name = "ta_channel"
|
index_name = "ta_channel"
|
||||||
yt_base = "https://www.youtube.com/channel/"
|
yt_base = "https://www.youtube.com/channel/"
|
||||||
|
yt_obs = {
|
||||||
|
"extract_flat": True,
|
||||||
|
"allow_playlist_files": True,
|
||||||
|
}
|
||||||
|
|
||||||
def __init__(self, youtube_id, task=False):
|
def __init__(self, youtube_id, task=False):
|
||||||
super().__init__(youtube_id)
|
super().__init__(youtube_id)
|
||||||
self.es_path = f"{self.index_name}/_doc/{youtube_id}"
|
|
||||||
self.all_playlists = False
|
self.all_playlists = False
|
||||||
self.task = task
|
self.task = task
|
||||||
|
|
||||||
|
def build_yt_url(self):
|
||||||
|
"""overwrite base to use channel about page"""
|
||||||
|
return f"{self.yt_base}{self.youtube_id}/about"
|
||||||
|
|
||||||
def build_json(self, upload=False, fallback=False):
|
def build_json(self, upload=False, fallback=False):
|
||||||
"""get from es or from youtube"""
|
"""get from es or from youtube"""
|
||||||
self.get_from_es()
|
self.get_from_es()
|
||||||
if self.json_data:
|
if self.json_data:
|
||||||
return
|
return
|
||||||
|
|
||||||
self.get_from_youtube(fallback)
|
self.get_from_youtube()
|
||||||
|
if not self.youtube_meta and fallback:
|
||||||
|
self._video_fallback(fallback)
|
||||||
|
else:
|
||||||
|
self._process_youtube_meta()
|
||||||
|
self.get_channel_art()
|
||||||
|
|
||||||
if upload:
|
if upload:
|
||||||
self.upload_to_es()
|
self.upload_to_es()
|
||||||
return
|
|
||||||
|
|
||||||
def get_from_youtube(self, fallback=False):
|
def _process_youtube_meta(self):
|
||||||
"""use bs4 to scrape channel about page"""
|
"""extract relevant fields"""
|
||||||
self.json_data = ChannelScraper(self.youtube_id).get_json()
|
self.youtube_meta["thumbnails"].reverse()
|
||||||
|
self.json_data = {
|
||||||
|
"channel_active": True,
|
||||||
|
"channel_description": self.youtube_meta.get("description", False),
|
||||||
|
"channel_id": self.youtube_id,
|
||||||
|
"channel_last_refresh": int(datetime.now().timestamp()),
|
||||||
|
"channel_name": self.youtube_meta["uploader"],
|
||||||
|
"channel_subs": self.youtube_meta.get("channel_follower_count", 0),
|
||||||
|
"channel_subscribed": False,
|
||||||
|
"channel_tags": self._parse_tags(self.youtube_meta.get("tags")),
|
||||||
|
"channel_banner_url": self._get_banner_art(),
|
||||||
|
"channel_thumb_url": self._get_thumb_art(),
|
||||||
|
"channel_tvart_url": self._get_tv_art(),
|
||||||
|
"channel_views": self.youtube_meta.get("view_count", 0),
|
||||||
|
}
|
||||||
|
|
||||||
if not self.json_data and fallback:
|
def _parse_tags(self, tags):
|
||||||
self._video_fallback(fallback)
|
"""parse channel tags"""
|
||||||
|
if not tags:
|
||||||
|
return False
|
||||||
|
|
||||||
if not self.json_data:
|
joined = " ".join(tags)
|
||||||
return
|
return [i.strip() for i in joined.split('"') if i and not i == " "]
|
||||||
|
|
||||||
self.get_channel_art()
|
def _get_thumb_art(self):
|
||||||
|
"""extract thumb art"""
|
||||||
|
for i in self.youtube_meta["thumbnails"]:
|
||||||
|
if not i.get("width"):
|
||||||
|
continue
|
||||||
|
if i.get("width") == i.get("height"):
|
||||||
|
return i["url"]
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _get_tv_art(self):
|
||||||
|
"""extract tv artwork"""
|
||||||
|
for i in self.youtube_meta["thumbnails"]:
|
||||||
|
if i.get("id") == "avatar_uncropped":
|
||||||
|
return i["url"]
|
||||||
|
if not i.get("width"):
|
||||||
|
continue
|
||||||
|
if i["width"] // i["height"] < 2:
|
||||||
|
return i["url"]
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _get_banner_art(self):
|
||||||
|
"""extract banner artwork"""
|
||||||
|
for i in self.youtube_meta["thumbnails"]:
|
||||||
|
if not i.get("width"):
|
||||||
|
continue
|
||||||
|
if i["width"] // i["height"] > 5:
|
||||||
|
return i["url"]
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
def _video_fallback(self, fallback):
|
def _video_fallback(self, fallback):
|
||||||
"""use video metadata as fallback"""
|
"""use video metadata as fallback"""
|
||||||
|
@ -209,6 +123,7 @@ class YoutubeChannel(YouTubeItem):
|
||||||
"channel_tvart_url": False,
|
"channel_tvart_url": False,
|
||||||
"channel_id": self.youtube_id,
|
"channel_id": self.youtube_id,
|
||||||
"channel_subscribed": False,
|
"channel_subscribed": False,
|
||||||
|
"channel_tags": False,
|
||||||
"channel_description": False,
|
"channel_description": False,
|
||||||
"channel_thumb_url": False,
|
"channel_thumb_url": False,
|
||||||
"channel_views": 0,
|
"channel_views": 0,
|
||||||
|
|
|
@ -15,8 +15,8 @@ class YouTubeItem:
|
||||||
"""base class for youtube"""
|
"""base class for youtube"""
|
||||||
|
|
||||||
es_path = False
|
es_path = False
|
||||||
index_name = False
|
index_name = ""
|
||||||
yt_base = False
|
yt_base = ""
|
||||||
yt_obs = {
|
yt_obs = {
|
||||||
"skip_download": True,
|
"skip_download": True,
|
||||||
"noplaylist": True,
|
"noplaylist": True,
|
||||||
|
@ -24,18 +24,21 @@ class YouTubeItem:
|
||||||
|
|
||||||
def __init__(self, youtube_id):
|
def __init__(self, youtube_id):
|
||||||
self.youtube_id = youtube_id
|
self.youtube_id = youtube_id
|
||||||
|
self.es_path = f"{self.index_name}/_doc/{youtube_id}"
|
||||||
self.config = AppConfig().config
|
self.config = AppConfig().config
|
||||||
self.app_conf = self.config["application"]
|
self.app_conf = self.config["application"]
|
||||||
self.youtube_meta = False
|
self.youtube_meta = False
|
||||||
self.json_data = False
|
self.json_data = False
|
||||||
|
|
||||||
|
def build_yt_url(self):
|
||||||
|
"""build youtube url"""
|
||||||
|
return self.yt_base + self.youtube_id
|
||||||
|
|
||||||
def get_from_youtube(self):
|
def get_from_youtube(self):
|
||||||
"""use yt-dlp to get meta data from youtube"""
|
"""use yt-dlp to get meta data from youtube"""
|
||||||
print(f"{self.youtube_id}: get metadata from youtube")
|
print(f"{self.youtube_id}: get metadata from youtube")
|
||||||
url = self.yt_base + self.youtube_id
|
url = self.build_yt_url()
|
||||||
response = YtWrap(self.yt_obs, self.config).extract(url)
|
self.youtube_meta = YtWrap(self.yt_obs, self.config).extract(url)
|
||||||
|
|
||||||
self.youtube_meta = response
|
|
||||||
|
|
||||||
def get_from_es(self):
|
def get_from_es(self):
|
||||||
"""get indexed data from elastic search"""
|
"""get indexed data from elastic search"""
|
||||||
|
|
|
@ -26,7 +26,6 @@ class YoutubePlaylist(YouTubeItem):
|
||||||
|
|
||||||
def __init__(self, youtube_id):
|
def __init__(self, youtube_id):
|
||||||
super().__init__(youtube_id)
|
super().__init__(youtube_id)
|
||||||
self.es_path = f"{self.index_name}/_doc/{youtube_id}"
|
|
||||||
self.all_members = False
|
self.all_members = False
|
||||||
self.nav = False
|
self.nav = False
|
||||||
self.all_youtube_ids = []
|
self.all_youtube_ids = []
|
||||||
|
|
|
@ -138,7 +138,6 @@ class YoutubeVideo(YouTubeItem, YoutubeSubtitle):
|
||||||
self.channel_id = False
|
self.channel_id = False
|
||||||
self.video_overwrites = video_overwrites
|
self.video_overwrites = video_overwrites
|
||||||
self.video_type = video_type
|
self.video_type = video_type
|
||||||
self.es_path = f"{self.index_name}/_doc/{youtube_id}"
|
|
||||||
self.offline_import = False
|
self.offline_import = False
|
||||||
|
|
||||||
def build_json(self, youtube_meta_overwrite=False, media_path=False):
|
def build_json(self, youtube_meta_overwrite=False, media_path=False):
|
||||||
|
|
|
@ -81,11 +81,18 @@
|
||||||
<button onclick="textExpand()" id="text-expand-button">Show more</button>
|
<button onclick="textExpand()" id="text-expand-button">Show more</button>
|
||||||
</div>
|
</div>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
<div class="description-box">
|
{% if channel_info.channel_tags %}
|
||||||
<h2>Customize {{ channel_info.channel_name }}</h2>
|
<div class="description-box">
|
||||||
</div>
|
<div class="video-tag-box">
|
||||||
|
{% for tag in channel_info.channel_tags %}
|
||||||
|
<span class="video-tag">{{ tag }}</span>
|
||||||
|
{% endfor %}
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
{% endif %}
|
||||||
<div id="overwrite-form" class="info-box">
|
<div id="overwrite-form" class="info-box">
|
||||||
<div class="info-box-item">
|
<div class="info-box-item">
|
||||||
|
<h2>Customize {{ channel_info.channel_name }}</h2>
|
||||||
<form class="overwrite-form" action="/channel/{{ channel_info.channel_id }}/about/" method="POST">
|
<form class="overwrite-form" action="/channel/{{ channel_info.channel_id }}/about/" method="POST">
|
||||||
{% csrf_token %}
|
{% csrf_token %}
|
||||||
<div class="overwrite-form-item">
|
<div class="overwrite-form-item">
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
beautifulsoup4==4.12.2
|
|
||||||
celery==5.2.7
|
celery==5.2.7
|
||||||
Django==4.2
|
Django==4.2
|
||||||
django-auth-ldap==4.3.0
|
django-auth-ldap==4.3.0
|
||||||
|
|
Loading…
Reference in New Issue