mirror of
https://github.com/tubearchivist/tubearchivist-frontend.git
synced 2025-01-22 16:50:15 +00:00
refacor PendingList class into subclasses
This commit is contained in:
parent
1498fadf27
commit
05b8dbc02f
@ -5,36 +5,119 @@ Functionality:
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
import requests
|
||||
import yt_dlp
|
||||
from home.src.download.subscriptions import ChannelSubscription
|
||||
from home.src.es.connect import IndexPaginate
|
||||
from home.src.download.subscriptions import (
|
||||
ChannelSubscription,
|
||||
PlaylistSubscription,
|
||||
)
|
||||
from home.src.download.thumbnails import ThumbManager
|
||||
from home.src.es.connect import ElasticWrap, IndexPaginate
|
||||
from home.src.index.playlist import YoutubePlaylist
|
||||
from home.src.ta.config import AppConfig
|
||||
from home.src.ta.helper import DurationConverter, ignore_filelist
|
||||
from home.src.ta.helper import DurationConverter
|
||||
from home.src.ta.ta_redis import RedisArchivist
|
||||
|
||||
|
||||
class PendingList:
|
||||
"""manage the pending videos list"""
|
||||
|
||||
CONFIG = AppConfig().config
|
||||
ES_URL = CONFIG["application"]["es_url"]
|
||||
ES_AUTH = CONFIG["application"]["es_auth"]
|
||||
VIDEOS = CONFIG["application"]["videos"]
|
||||
class PendingIndex:
|
||||
"""base class holding all export methods"""
|
||||
|
||||
def __init__(self):
|
||||
self.all_channel_ids = False
|
||||
self.all_downloaded = False
|
||||
self.missing_from_playlists = []
|
||||
self.all_pending = False
|
||||
self.all_ignored = False
|
||||
self.all_videos = False
|
||||
self.all_channels = False
|
||||
self.all_overwrites = False
|
||||
self.to_skip = False
|
||||
|
||||
def parse_url_list(self, youtube_ids):
|
||||
def get_download(self):
|
||||
"""get a list of all pending videos in ta_download"""
|
||||
data = {
|
||||
"query": {"match_all": {}},
|
||||
"sort": [{"timestamp": {"order": "asc"}}],
|
||||
}
|
||||
all_results = IndexPaginate("ta_download", data).get_results()
|
||||
|
||||
self.all_pending = []
|
||||
self.all_ignored = []
|
||||
self.to_skip = []
|
||||
|
||||
for result in all_results:
|
||||
self.to_skip.append(result["youtube_id"])
|
||||
if result["status"] == "pending":
|
||||
self.all_pending.append(result)
|
||||
elif result["status"] == "ignore":
|
||||
self.all_ignored.append(result)
|
||||
|
||||
def get_indexed(self):
|
||||
"""get a list of all videos indexed"""
|
||||
data = {
|
||||
"query": {"match_all": {}},
|
||||
"sort": [{"published": {"order": "desc"}}],
|
||||
}
|
||||
self.all_videos = IndexPaginate("ta_video", data).get_results()
|
||||
for video in self.all_videos:
|
||||
self.to_skip.append(video["youtube_id"])
|
||||
|
||||
def get_channels(self):
|
||||
"""get a list of all channels indexed"""
|
||||
self.all_channels = []
|
||||
self.all_overwrites = {}
|
||||
data = {
|
||||
"query": {"match_all": {}},
|
||||
"sort": [{"channel_id": {"order": "asc"}}],
|
||||
}
|
||||
channels = IndexPaginate("ta_channel", data).get_results()
|
||||
|
||||
for channel in channels:
|
||||
channel_id = channel["channel_id"]
|
||||
self.all_channels.append(channel_id)
|
||||
if channel.get("channel_overwrites"):
|
||||
self.all_overwrites.update(
|
||||
{channel_id: channel.get("channel_overwrites")}
|
||||
)
|
||||
|
||||
|
||||
class PendingInteract:
|
||||
"""interact with items in download queue"""
|
||||
|
||||
def __init__(self, video_id=False, status=False):
|
||||
self.video_id = video_id
|
||||
self.status = status
|
||||
|
||||
def delete_item(self):
|
||||
"""delete single item from pending"""
|
||||
path = f"ta_download/_doc/{self.video_id}"
|
||||
_, _ = ElasticWrap(path).delete()
|
||||
|
||||
def delete_by_status(self):
|
||||
"""delete all matching item by status"""
|
||||
data = {"query": {"term": {"status": {"value": self.status}}}}
|
||||
path = "ta_download/_delete_by_query"
|
||||
_, _ = ElasticWrap(path).post(data=data)
|
||||
|
||||
def update_status(self):
|
||||
"""update status field of pending item"""
|
||||
data = {"doc": {"status": self.status}}
|
||||
path = f"ta_download/_update/{self.video_id}"
|
||||
_, _ = ElasticWrap(path).post(data=data)
|
||||
|
||||
|
||||
class PendingList(PendingIndex):
|
||||
"""manage the pending videos list"""
|
||||
|
||||
def __init__(self, youtube_ids):
|
||||
super().__init__()
|
||||
self.youtube_ids = youtube_ids
|
||||
self.to_skip = False
|
||||
self.missing_videos = False
|
||||
|
||||
def parse_url_list(self):
|
||||
"""extract youtube ids from list"""
|
||||
missing_videos = []
|
||||
for entry in youtube_ids:
|
||||
self.missing_videos = []
|
||||
self.get_download()
|
||||
self.get_indexed()
|
||||
for entry in self.youtube_ids:
|
||||
# notify
|
||||
mess_dict = {
|
||||
"status": "message:add",
|
||||
@ -43,97 +126,89 @@ class PendingList:
|
||||
"message": "Extracting lists",
|
||||
}
|
||||
RedisArchivist().set_message("message:add", mess_dict)
|
||||
# extract
|
||||
url = entry["url"]
|
||||
url_type = entry["type"]
|
||||
if url_type == "video":
|
||||
missing_videos.append(url)
|
||||
elif url_type == "channel":
|
||||
video_results = ChannelSubscription().get_last_youtube_videos(
|
||||
url, limit=False
|
||||
)
|
||||
youtube_ids = [i[0] for i in video_results]
|
||||
missing_videos = missing_videos + youtube_ids
|
||||
elif url_type == "playlist":
|
||||
self.missing_from_playlists.append(entry)
|
||||
playlist = YoutubePlaylist(url)
|
||||
playlist.build_json()
|
||||
video_results = playlist.json_data.get("playlist_entries")
|
||||
youtube_ids = [i["youtube_id"] for i in video_results]
|
||||
missing_videos = missing_videos + youtube_ids
|
||||
self._process_entry(entry)
|
||||
|
||||
return missing_videos
|
||||
def _process_entry(self, entry):
|
||||
"""process single entry from url list"""
|
||||
if entry["type"] == "video":
|
||||
self._add_video(entry["url"])
|
||||
elif entry["type"] == "channel":
|
||||
self._parse_channel(entry["url"])
|
||||
elif entry["type"] == "playlist":
|
||||
self._parse_playlist(entry["url"])
|
||||
new_thumbs = PlaylistSubscription().process_url_str(
|
||||
[entry], subscribed=False
|
||||
)
|
||||
ThumbManager().download_playlist(new_thumbs)
|
||||
else:
|
||||
raise ValueError(f"invalid url_type: {entry}")
|
||||
|
||||
def add_to_pending(self, missing_videos, ignore=False):
|
||||
"""build the bulk json data from pending"""
|
||||
# check if channel is indexed
|
||||
channel_handler = ChannelSubscription()
|
||||
all_indexed = channel_handler.get_channels(subscribed_only=False)
|
||||
self.all_channel_ids = [i["channel_id"] for i in all_indexed]
|
||||
# check if already there
|
||||
self.all_downloaded = self.get_all_downloaded()
|
||||
def _add_video(self, url):
|
||||
"""add video to list"""
|
||||
if url not in self.missing_videos and url not in self.to_skip:
|
||||
self.missing_videos.append(url)
|
||||
|
||||
def _parse_channel(self, url):
|
||||
"""add all videos of channel to list"""
|
||||
video_results = ChannelSubscription().get_last_youtube_videos(
|
||||
url, limit=False
|
||||
)
|
||||
youtube_ids = [i[0] for i in video_results]
|
||||
for video_id in youtube_ids:
|
||||
self._add_video(video_id)
|
||||
|
||||
def _parse_playlist(self, url):
|
||||
"""add all videos of playlist to list"""
|
||||
playlist = YoutubePlaylist(url)
|
||||
playlist.build_json()
|
||||
video_results = playlist.json_data.get("playlist_entries")
|
||||
youtube_ids = [i["youtube_id"] for i in video_results]
|
||||
for video_id in youtube_ids:
|
||||
self._add_video(video_id)
|
||||
|
||||
def add_to_pending(self, status="pending"):
|
||||
"""add missing videos to pending list"""
|
||||
self.get_channels()
|
||||
bulk_list = []
|
||||
|
||||
thumb_handler = ThumbManager()
|
||||
for idx, youtube_id in enumerate(self.missing_videos):
|
||||
video_details = self._get_youtube_details(youtube_id)
|
||||
if not video_details:
|
||||
continue
|
||||
|
||||
video_details["status"] = status
|
||||
action = {"create": {"_id": youtube_id, "_index": "ta_download"}}
|
||||
bulk_list.append(json.dumps(action))
|
||||
bulk_list.append(json.dumps(video_details))
|
||||
|
||||
thumb_needed = [(youtube_id, video_details["vid_thumb_url"])]
|
||||
thumb_handler.download_vid(thumb_needed)
|
||||
self._notify_add(idx)
|
||||
|
||||
bulk_list, all_videos_added = self.build_bulk(missing_videos, ignore)
|
||||
# add last newline
|
||||
bulk_list.append("\n")
|
||||
query_str = "\n".join(bulk_list)
|
||||
headers = {"Content-type": "application/x-ndjson"}
|
||||
url = self.ES_URL + "/_bulk"
|
||||
request = requests.post(
|
||||
url, data=query_str, headers=headers, auth=self.ES_AUTH
|
||||
)
|
||||
if not request.ok:
|
||||
print(request)
|
||||
raise ValueError("failed to add video to download queue")
|
||||
_, _ = ElasticWrap("_bulk").post(query_str, ndjson=True)
|
||||
|
||||
return all_videos_added
|
||||
def _notify_add(self, idx):
|
||||
"""send notification for adding videos to download queue"""
|
||||
progress = f"{idx + 1}/{len(self.missing_videos)}"
|
||||
mess_dict = {
|
||||
"status": "message:add",
|
||||
"level": "info",
|
||||
"title": "Adding new videos to download queue.",
|
||||
"message": "Progress: " + progress,
|
||||
}
|
||||
if idx + 1 == len(self.missing_videos):
|
||||
RedisArchivist().set_message("message:add", mess_dict, expire=4)
|
||||
else:
|
||||
RedisArchivist().set_message("message:add", mess_dict)
|
||||
|
||||
def build_bulk(self, missing_videos, ignore=False):
|
||||
"""build the bulk lists"""
|
||||
bulk_list = []
|
||||
all_videos_added = []
|
||||
if idx + 1 % 25 == 0:
|
||||
print("adding to queue progress: " + progress)
|
||||
|
||||
for idx, youtube_id in enumerate(missing_videos):
|
||||
# check if already downloaded
|
||||
if youtube_id in self.all_downloaded:
|
||||
continue
|
||||
|
||||
video = self.get_youtube_details(youtube_id)
|
||||
# skip on download error
|
||||
if not video:
|
||||
continue
|
||||
|
||||
channel_indexed = video["channel_id"] in self.all_channel_ids
|
||||
video["channel_indexed"] = channel_indexed
|
||||
if ignore:
|
||||
video["status"] = "ignore"
|
||||
else:
|
||||
video["status"] = "pending"
|
||||
action = {"create": {"_id": youtube_id, "_index": "ta_download"}}
|
||||
bulk_list.append(json.dumps(action))
|
||||
bulk_list.append(json.dumps(video))
|
||||
all_videos_added.append((youtube_id, video["vid_thumb_url"]))
|
||||
# notify
|
||||
progress = f"{idx + 1}/{len(missing_videos)}"
|
||||
mess_dict = {
|
||||
"status": "message:add",
|
||||
"level": "info",
|
||||
"title": "Adding new videos to download queue.",
|
||||
"message": "Progress: " + progress,
|
||||
}
|
||||
if idx + 1 == len(missing_videos):
|
||||
RedisArchivist().set_message(
|
||||
"message:add", mess_dict, expire=4
|
||||
)
|
||||
else:
|
||||
RedisArchivist().set_message("message:add", mess_dict)
|
||||
if idx + 1 % 25 == 0:
|
||||
print("adding to queue progress: " + progress)
|
||||
|
||||
return bulk_list, all_videos_added
|
||||
|
||||
@staticmethod
|
||||
def get_youtube_details(youtube_id):
|
||||
def _get_youtube_details(self, youtube_id):
|
||||
"""get details from youtubedl for single pending video"""
|
||||
obs = {
|
||||
"default_search": "ytsearch",
|
||||
@ -151,113 +226,29 @@ class PendingList:
|
||||
# stop if video is streaming live now
|
||||
if vid["is_live"]:
|
||||
return False
|
||||
# parse response
|
||||
seconds = vid["duration"]
|
||||
duration_str = DurationConverter.get_str(seconds)
|
||||
|
||||
return self._parse_youtube_details(vid)
|
||||
|
||||
def _parse_youtube_details(self, vid):
|
||||
"""parse response"""
|
||||
vid_id = vid.get("id")
|
||||
duration_str = DurationConverter.get_str(vid["duration"])
|
||||
if duration_str == "NA":
|
||||
print(f"skip extracting duration for: {youtube_id}")
|
||||
upload_date = vid["upload_date"]
|
||||
upload_dt = datetime.strptime(upload_date, "%Y%m%d")
|
||||
published = upload_dt.strftime("%Y-%m-%d")
|
||||
print(f"skip extracting duration for: {vid_id}")
|
||||
published = datetime.strptime(vid["upload_date"], "%Y%m%d").strftime(
|
||||
"%Y-%m-%d"
|
||||
)
|
||||
|
||||
# build dict
|
||||
youtube_details = {
|
||||
"youtube_id": youtube_id,
|
||||
"youtube_id": vid_id,
|
||||
"channel_name": vid["channel"],
|
||||
"vid_thumb_url": vid["thumbnail"],
|
||||
"title": vid["title"],
|
||||
"channel_id": vid["channel_id"],
|
||||
"channel_indexed": vid["channel_id"] in self.all_channels,
|
||||
"duration": duration_str,
|
||||
"published": published,
|
||||
"timestamp": int(datetime.now().strftime("%s")),
|
||||
}
|
||||
return youtube_details
|
||||
|
||||
@staticmethod
|
||||
def get_all_pending():
|
||||
"""get a list of all pending videos in ta_download"""
|
||||
data = {
|
||||
"query": {"match_all": {}},
|
||||
"sort": [{"timestamp": {"order": "asc"}}],
|
||||
}
|
||||
all_results = IndexPaginate("ta_download", data).get_results()
|
||||
|
||||
all_pending = []
|
||||
all_ignore = []
|
||||
|
||||
for result in all_results:
|
||||
if result["status"] == "pending":
|
||||
all_pending.append(result)
|
||||
elif result["status"] == "ignore":
|
||||
all_ignore.append(result)
|
||||
|
||||
return all_pending, all_ignore
|
||||
|
||||
@staticmethod
|
||||
def get_all_indexed():
|
||||
"""get a list of all videos indexed"""
|
||||
|
||||
data = {
|
||||
"query": {"match_all": {}},
|
||||
"sort": [{"published": {"order": "desc"}}],
|
||||
}
|
||||
all_indexed = IndexPaginate("ta_video", data).get_results()
|
||||
|
||||
return all_indexed
|
||||
|
||||
def get_all_downloaded(self):
|
||||
"""get a list of all videos in archive"""
|
||||
channel_folders = os.listdir(self.VIDEOS)
|
||||
all_channel_folders = ignore_filelist(channel_folders)
|
||||
all_downloaded = []
|
||||
for channel_folder in all_channel_folders:
|
||||
channel_path = os.path.join(self.VIDEOS, channel_folder)
|
||||
videos = os.listdir(channel_path)
|
||||
all_videos = ignore_filelist(videos)
|
||||
youtube_vids = [i[9:20] for i in all_videos]
|
||||
for youtube_id in youtube_vids:
|
||||
all_downloaded.append(youtube_id)
|
||||
return all_downloaded
|
||||
|
||||
def delete_from_pending(self, youtube_id):
|
||||
"""delete the youtube_id from ta_download"""
|
||||
url = f"{self.ES_URL}/ta_download/_doc/{youtube_id}"
|
||||
response = requests.delete(url, auth=self.ES_AUTH)
|
||||
if not response.ok:
|
||||
print(response.text)
|
||||
|
||||
def delete_pending(self, status):
|
||||
"""delete download queue based on status value"""
|
||||
data = {"query": {"term": {"status": {"value": status}}}}
|
||||
payload = json.dumps(data)
|
||||
url = self.ES_URL + "/ta_download/_delete_by_query"
|
||||
headers = {"Content-type": "application/json"}
|
||||
response = requests.post(
|
||||
url, data=payload, headers=headers, auth=self.ES_AUTH
|
||||
)
|
||||
if not response.ok:
|
||||
print(response.text)
|
||||
|
||||
def ignore_from_pending(self, ignore_list):
|
||||
"""build the bulk query string"""
|
||||
|
||||
stamp = int(datetime.now().strftime("%s"))
|
||||
bulk_list = []
|
||||
|
||||
for youtube_id in ignore_list:
|
||||
action = {"update": {"_id": youtube_id, "_index": "ta_download"}}
|
||||
source = {"doc": {"status": "ignore", "timestamp": stamp}}
|
||||
bulk_list.append(json.dumps(action))
|
||||
bulk_list.append(json.dumps(source))
|
||||
|
||||
# add last newline
|
||||
bulk_list.append("\n")
|
||||
query_str = "\n".join(bulk_list)
|
||||
|
||||
headers = {"Content-type": "application/x-ndjson"}
|
||||
url = self.ES_URL + "/_bulk"
|
||||
request = requests.post(
|
||||
url, data=query_str, headers=headers, auth=self.ES_AUTH
|
||||
)
|
||||
if not request.ok:
|
||||
print(request)
|
||||
raise ValueError("failed to set video to ignore")
|
||||
|
Loading…
Reference in New Issue
Block a user