refac pending list, implement flat add

This commit is contained in:
Simon 2025-07-09 16:16:01 +07:00
parent b6d38e9319
commit 15ec8b5ab6
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
6 changed files with 234 additions and 159 deletions

View File

@ -56,7 +56,7 @@ class ElasticWrap:
return response.json(), response.status_code return response.json(), response.status_code
def post( def post(
self, data: bool | dict = False, ndjson: bool = False self, data: bool | dict | str = False, ndjson: bool = False
) -> tuple[dict, int]: ) -> tuple[dict, int]:
"""post data to es""" """post data to es"""

View File

@ -103,8 +103,11 @@ def requests_headers() -> dict[str, str]:
return {"User-Agent": template} return {"User-Agent": template}
def date_parser(timestamp: int | str) -> str: def date_parser(timestamp: int | str | None) -> str | None:
"""return formatted date string""" """return formatted date string"""
if timestamp is None:
return None
if isinstance(timestamp, int): if isinstance(timestamp, int):
date_obj = datetime.fromtimestamp(timestamp, tz=timezone.utc) date_obj = datetime.fromtimestamp(timestamp, tz=timezone.utc)
elif isinstance(timestamp, str): elif isinstance(timestamp, str):
@ -184,11 +187,12 @@ def get_duration_sec(file_path: str) -> int:
return duration_sec return duration_sec
def get_duration_str(seconds: int) -> str: def get_duration_str(seconds: int | float) -> str:
"""Return a human-readable duration string from seconds.""" """Return a human-readable duration string from seconds."""
if not seconds: if not seconds:
return "NA" return "NA"
seconds = int(seconds)
units = [("y", 31536000), ("d", 86400), ("h", 3600), ("m", 60), ("s", 1)] units = [("y", 31536000), ("d", 86400), ("h", 3600), ("m", 60), ("s", 1)]
duration_parts = [] duration_parts = []

View File

@ -15,9 +15,9 @@ class DownloadItemSerializer(serializers.Serializer):
channel_indexed = serializers.BooleanField() channel_indexed = serializers.BooleanField()
channel_name = serializers.CharField() channel_name = serializers.CharField()
duration = serializers.CharField() duration = serializers.CharField()
published = serializers.CharField() published = serializers.CharField(allow_null=True)
status = serializers.ChoiceField(choices=["pending", "ignore"]) status = serializers.ChoiceField(choices=["pending", "ignore"])
timestamp = serializers.IntegerField() timestamp = serializers.IntegerField(allow_null=True)
title = serializers.CharField() title = serializers.CharField()
vid_thumb_url = serializers.CharField() vid_thumb_url = serializers.CharField()
vid_type = serializers.ChoiceField(choices=VideoTypeEnum.values()) vid_type = serializers.ChoiceField(choices=VideoTypeEnum.values())
@ -76,6 +76,7 @@ class AddToDownloadQuerySerializer(serializers.Serializer):
"""add to queue query serializer""" """add to queue query serializer"""
autostart = serializers.BooleanField(required=False) autostart = serializers.BooleanField(required=False)
flat = serializers.BooleanField(required=False)
class BulkUpdateDowloadQuerySerializer(serializers.Serializer): class BulkUpdateDowloadQuerySerializer(serializers.Serializer):

View File

@ -4,16 +4,18 @@ Functionality:
- linked with ta_dowload index - linked with ta_dowload index
""" """
import json
from datetime import datetime from datetime import datetime
from appsettings.src.config import AppConfig from appsettings.src.config import AppConfig
from channel.src.index import YoutubeChannel
from common.src.es_connect import ElasticWrap, IndexPaginate from common.src.es_connect import ElasticWrap, IndexPaginate
from common.src.helper import get_duration_str, is_shorts, rand_sleep from common.src.helper import get_duration_str, is_shorts, rand_sleep
from download.src.subscriptions import ChannelSubscription from download.src.subscriptions import ChannelSubscription
from download.src.thumbnails import ThumbManager from download.src.thumbnails import ThumbManager
from download.src.yt_dlp_base import YtWrap
from playlist.src.index import YoutubePlaylist from playlist.src.index import YoutubePlaylist
from video.src.constants import VideoTypeEnum from video.src.constants import VideoTypeEnum
from video.src.index import YoutubeVideo
class PendingIndex: class PendingIndex:
@ -195,22 +197,27 @@ class PendingList(PendingIndex):
"check_formats": None, "check_formats": None,
} }
def __init__(self, youtube_ids=False, task=False): def __init__(
self, youtube_ids=False, task=False, auto_start=False, flat=False
):
super().__init__() super().__init__()
self.config = AppConfig().config self.config = AppConfig().config
self.youtube_ids = youtube_ids self.youtube_ids = youtube_ids
self.task = task self.task = task
self.auto_start = auto_start
self.flat = flat
self.to_skip = False self.to_skip = False
self.missing_videos = False
def parse_url_list(self, auto_start=False):
"""extract youtube ids from list"""
self.missing_videos = [] self.missing_videos = []
def parse_url_list(self):
"""extract youtube ids from list"""
self.get_download() self.get_download()
self.get_indexed() self.get_indexed()
self.get_channels()
total = len(self.youtube_ids) total = len(self.youtube_ids)
for idx, entry in enumerate(self.youtube_ids): for idx, entry in enumerate(self.youtube_ids):
self._process_entry(entry, auto_start=auto_start) self._process_entry(entry)
rand_sleep(self.config)
if not self.task: if not self.task:
continue continue
@ -219,102 +226,214 @@ class PendingList(PendingIndex):
progress=(idx + 1) / total, progress=(idx + 1) / total,
) )
def _process_entry(self, entry, auto_start=False): def _process_entry(self, entry: dict):
"""process single entry from url list""" """process single entry from url list"""
vid_type = self._get_vid_type(entry)
if entry["type"] == "video": if entry["type"] == "video":
self._add_video(entry["url"], vid_type, auto_start=auto_start) self._add_video(entry["url"], entry["vid_type"])
elif entry["type"] == "channel": elif entry["type"] == "channel":
self._parse_channel(entry["url"], vid_type) self._parse_channel(entry["url"], entry["vid_type"])
elif entry["type"] == "playlist": elif entry["type"] == "playlist":
self._parse_playlist(entry["url"]) self._parse_playlist(entry["url"])
else: else:
raise ValueError(f"invalid url_type: {entry}") raise ValueError(f"invalid url_type: {entry}")
@staticmethod def _add_video(self, url, vid_type):
def _get_vid_type(entry):
"""add vid type enum if available"""
vid_type_str = entry.get("vid_type")
if not vid_type_str:
return VideoTypeEnum.UNKNOWN
return VideoTypeEnum(vid_type_str)
def _add_video(self, url, vid_type, auto_start=False):
"""add video to list""" """add video to list"""
if auto_start and url in set( if self.auto_start and url in set(
i["youtube_id"] for i in self.all_pending i["youtube_id"] for i in self.all_pending
): ):
PendingInteract(youtube_id=url, status="priority").update_status() PendingInteract(youtube_id=url, status="priority").update_status()
return return
if url not in self.missing_videos and url not in self.to_skip: if url in self.missing_videos or url in self.to_skip:
self.missing_videos.append((url, vid_type))
else:
print(f"{url}: skipped adding already indexed video to download.") print(f"{url}: skipped adding already indexed video to download.")
else:
to_add = self._parse_video(url, vid_type)
if to_add:
self.missing_videos.append(to_add)
def _parse_channel(self, url, vid_type): def _parse_channel(self, url, vid_type):
"""add all videos of channel to list""" """parse channel"""
video_results = ChannelSubscription().get_last_youtube_videos( video_results = ChannelSubscription().get_last_youtube_videos(
url, limit=False, query_filter=vid_type url, limit=False, query_filter=vid_type
) )
for video_entry in video_results: channel_handler = YoutubeChannel(url)
self._add_video(video_entry["id"], video_entry["vid_type"]) channel_handler.build_json(upload=False)
def _parse_playlist(self, url): for video_data in video_results:
"""add all videos of playlist to list""" video_id = video_data["id"]
playlist = YoutubePlaylist(url) if video_id in self.to_skip:
is_active = playlist.update_playlist()
if not is_active:
message = f"{playlist.youtube_id}: failed to extract metadata"
print(message)
raise ValueError(message)
entries = playlist.json_data["playlist_entries"]
to_add = [i["youtube_id"] for i in entries if not i["downloaded"]]
if not to_add:
return
for video_id in to_add:
# match vid_type later
self._add_video(video_id, VideoTypeEnum.UNKNOWN)
def add_to_pending(self, status="pending", auto_start=False):
"""add missing videos to pending list"""
self.get_channels()
total = len(self.missing_videos)
videos_added = []
for idx, (youtube_id, vid_type) in enumerate(self.missing_videos):
if self.task and self.task.is_stopped():
break
print(f"{youtube_id}: [{idx + 1}/{total}]: add to queue")
self._notify_add(idx, total)
video_details = self.get_youtube_details(youtube_id, vid_type)
if not video_details:
rand_sleep(self.config)
continue continue
video_details.update( if self.flat:
if not video_data.get("channel"):
channel_name = channel_handler.json_data["channel_name"]
video_data["channel"] = channel_name
if not video_data.get("channel_id"):
channel_id = channel_handler.json_data["channel_id"]
video_data["channel_id"] = channel_id
to_add = self._parse_entry(
youtube_id=video_id, video_data=video_data
)
else:
to_add = self._parse_video(video_id, vid_type)
if to_add:
self.missing_videos.append(to_add)
def _parse_playlist(self, url):
    """Collect all videos of a playlist that are not marked to skip.

    Fetches the playlist metadata through ``YoutubePlaylist`` and walks its
    ``entries``. With ``self.flat`` set, each entry is converted directly by
    ``_parse_entry``; otherwise every video is extracted individually by
    ``_parse_video``. Every non-empty result is appended to
    ``self.missing_videos``.

    Args:
        url: playlist identifier handed to ``YoutubePlaylist``.

    Raises:
        ValueError: if no playlist metadata is available after extraction.
    """
    playlist = YoutubePlaylist(url)
    playlist.get_from_youtube()

    playlist_meta = playlist.youtube_meta
    if not playlist_meta:
        # NOTE(review): assumes youtube_meta stays falsy when the extraction
        # failed - confirm against YoutubePlaylist.get_from_youtube
        message = f"{url}: failed to extract metadata"
        print(message)
        raise ValueError(message)

    for video_data in playlist_meta.get("entries") or []:
        if not video_data:
            # presumably unavailable videos can show up as empty entries,
            # depending on the yt-dlp options in use - nothing to parse
            continue

        video_id = video_data["id"]
        if video_id in self.to_skip:
            continue

        if self.flat:
            to_add = self._parse_entry(video_id, video_data)
        else:
            # no vid_type known for playlist items, detected while parsing
            to_add = self._parse_video(video_id, vid_type=None)

        if to_add:
            self.missing_videos.append(to_add)
def _parse_video(self, url, vid_type):
    """Extract a single video and build its pending queue entry.

    Args:
        url: youtube id of the video, passed to ``YoutubeVideo`` as
            ``youtube_id``.
        vid_type: video type to store with the entry. A falsy value leaves
            the metadata untouched so ``_parse_entry`` can detect the type.

    Returns:
        The dict built by ``_parse_entry``, or ``None`` when no metadata is
        available or ``_parse_entry`` decided to skip the video.
    """
    video = YoutubeVideo(youtube_id=url)
    video.get_from_youtube()

    video_data = video.youtube_meta
    if not video_data:
        # NOTE(review): assumes youtube_meta is falsy after a failed
        # extraction - confirm against YoutubeVideo.get_from_youtube
        print(f"{url}: skipping, failed to extract metadata")
        return None

    if vid_type:
        # only store an explicit type, a None value would otherwise be
        # picked up as the final vid_type of the entry
        video_data["vid_type"] = vid_type

    return self._parse_entry(youtube_id=url, video_data=video_data)
def _parse_entry(self, youtube_id: str, video_data: dict) -> dict | None:
"""parse entry"""
if video_data.get("id") != youtube_id:
# skip premium videos with different id or redirects
print(f"{youtube_id}: skipping redirect, id not matching")
return None
if video_data.get("live_status") in ["is_upcoming", "is_live"]:
print(f"{youtube_id}: skip is_upcoming or is_live")
return None
to_add = {
"youtube_id": video_data["id"],
"title": video_data["title"],
"vid_thumb_url": self.__extract_thumb(video_data),
"duration": get_duration_str(video_data.get("duration", 0)),
"published": self.__extract_published(video_data),
"timestamp": int(datetime.now().timestamp()),
"vid_type": self.__extract_vid_type(video_data),
"channel_name": video_data["channel"],
"channel_id": video_data["channel_id"],
"channel_indexed": video_data["channel_id"] in self.all_channels,
}
thumb_url = to_add["vid_thumb_url"]
ThumbManager(to_add["youtube_id"]).download_video_thumb(thumb_url)
return to_add
def __extract_thumb(self, video_data) -> str | None:
"""extract thumb"""
if "thumbnail" in video_data:
return video_data["thumbnail"]
if "thumbnails" in video_data:
return video_data["thumbnails"][-1]["url"]
return None
def __extract_published(self, video_data) -> str | int | None:
"""build published date or timestamp"""
timestamp = video_data.get("timestamp")
if timestamp:
return timestamp
upload_date = video_data.get("upload_date")
if not upload_date:
return None
upload_date_time = datetime.strptime(upload_date, "%Y%m%d")
published = upload_date_time.strftime("%Y-%m-%d")
return published
def __extract_vid_type(self, video_data) -> str:
"""build vid type"""
if "vid_type" in video_data:
return video_data["vid_type"]
if video_data.get("live_status") == "was_live":
return VideoTypeEnum.STREAMS.value
if video_data.get("width", 0) > video_data.get("height", 0):
return VideoTypeEnum.VIDEOS.value
duration = video_data.get("duration")
if duration and isinstance(duration, int):
if duration > 3 * 60:
return VideoTypeEnum.VIDEOS.value
if is_shorts(video_data["id"]):
return VideoTypeEnum.SHORTS.value
return VideoTypeEnum.VIDEOS.value
def add_to_pending(self, status="pending") -> int:
"""add missing videos to pending list"""
total = len(self.missing_videos)
if not self.missing_videos:
self._notify_empty()
return 0
self._notify_start(total)
bulk_list = []
for video_entry in self.missing_videos:
video_entry.update(
{ {
"status": status, "status": status,
"auto_start": auto_start, "auto_start": self.auto_start,
} }
) )
video_id = video_entry["youtube_id"]
action = {"index": {"_index": "ta_download", "_id": video_id}}
bulk_list.append(json.dumps(action))
bulk_list.append(json.dumps(video_entry))
url = video_details["vid_thumb_url"] # add last newline
ThumbManager(youtube_id).download_video_thumb(url) bulk_list.append("\n")
es_url = f"ta_download/_doc/{youtube_id}" query_str = "\n".join(bulk_list)
_, _ = ElasticWrap(es_url).put(video_details) _, status_code = ElasticWrap("_bulk").post(query_str, ndjson=True)
videos_added.append(youtube_id) if status_code != 200:
self._notify_fail(status_code)
else:
self._notify_done(total)
if idx != total: return len(self.missing_videos)
rand_sleep(self.config)
return videos_added def _notify_empty(self):
"""notify nothing to add"""
if not self.task:
return
def _notify_add(self, idx, total): self.task.send_progress(
message_lines=[
"Extractinc videos completed.",
"No new videos found to add.",
]
)
def _notify_start(self, total):
"""send notification for adding videos to download queue""" """send notification for adding videos to download queue"""
if not self.task: if not self.task:
return return
@ -322,82 +441,30 @@ class PendingList(PendingIndex):
self.task.send_progress( self.task.send_progress(
message_lines=[ message_lines=[
"Adding new videos to download queue.", "Adding new videos to download queue.",
f"Extracting items {idx + 1}/{total}", f"Bulk adding {total} videos",
], ]
progress=(idx + 1) / total,
) )
def get_youtube_details(self, youtube_id, vid_type=VideoTypeEnum.VIDEOS): def _notify_done(self, total):
"""get details from youtubedl for single pending video""" """send done notification"""
vid = YtWrap(self.yt_obs, self.config).extract(youtube_id) if not self.task:
if not vid: return
return False
if vid.get("id") != youtube_id: self.task.send_progress(
# skip premium videos with different id message_lines=[
print(f"{youtube_id}: skipping premium video, id not matching") "Adding new videos to the queue completed.",
return False f"Added {total} videos.",
# stop if video is streaming live now ]
if vid["live_status"] in ["is_upcoming", "is_live"]: )
print(f"{youtube_id}: skip is_upcoming or is_live")
return False
if vid["live_status"] == "was_live": def _notify_fail(self, status_code):
vid_type = VideoTypeEnum.STREAMS """failed to add"""
else: if not self.task:
if self._check_shorts(vid): return
vid_type = VideoTypeEnum.SHORTS
else:
vid_type = VideoTypeEnum.VIDEOS
if not vid.get("channel"): self.task.send_progress(
print(f"{youtube_id}: skip video not part of channel") message_lines=[
return False "Adding extracted videos failed.",
f"Status code: {status_code}",
return self._parse_youtube_details(vid, vid_type) ]
)
@staticmethod
def _check_shorts(vid):
"""check if vid is shorts video"""
if vid["width"] > vid["height"]:
return False
duration = vid.get("duration")
if duration and isinstance(duration, int):
if duration > 3 * 60:
return False
return is_shorts(vid["id"])
def _parse_youtube_details(self, vid, vid_type=VideoTypeEnum.VIDEOS):
"""parse response"""
vid_id = vid.get("id")
# build dict
youtube_details = {
"youtube_id": vid_id,
"channel_name": vid["channel"],
"vid_thumb_url": vid["thumbnail"],
"title": vid["title"],
"channel_id": vid["channel_id"],
"duration": get_duration_str(vid["duration"]),
"published": self._build_published(vid),
"timestamp": int(datetime.now().timestamp()),
"vid_type": vid_type.value,
"channel_indexed": vid["channel_id"] in self.all_channels,
}
return youtube_details
@staticmethod
def _build_published(vid):
"""build published date or timestamp"""
timestamp = vid["timestamp"]
if timestamp:
return timestamp
upload_date = vid["upload_date"]
upload_date_time = datetime.strptime(upload_date, "%Y%m%d")
published = upload_date_time.strftime("%Y-%m-%d")
return published

View File

@ -107,12 +107,13 @@ class DownloadApiListView(ApiBaseView):
validated_query = query_serializer.validated_data validated_query = query_serializer.validated_data
auto_start = validated_query.get("autostart") auto_start = validated_query.get("autostart")
print(f"auto_start: {auto_start}") flat = validated_query.get("flat", False)
print(f"auto_start: {auto_start}, flat: {flat}")
to_add = validated_data["data"] to_add = validated_data["data"]
pending = [i["youtube_id"] for i in to_add if i["status"] == "pending"] pending = [i["youtube_id"] for i in to_add if i["status"] == "pending"]
url_str = " ".join(pending) url_str = " ".join(pending)
task = extrac_dl.delay(url_str, auto_start=auto_start) task = extrac_dl.delay(url_str, auto_start=auto_start, flat=flat)
message = { message = {
"message": "add to queue task started", "message": "add to queue task started",

View File

@ -147,7 +147,9 @@ def download_pending(self, auto_only=False):
@shared_task(name="extract_download", bind=True, base=BaseTask) @shared_task(name="extract_download", bind=True, base=BaseTask)
def extrac_dl(self, youtube_ids, auto_start=False, status="pending"): def extrac_dl(
self, youtube_ids, auto_start=False, flat=False, status="pending"
):
"""parse list passed and add to pending""" """parse list passed and add to pending"""
TaskManager().init(self) TaskManager().init(self)
if isinstance(youtube_ids, str): if isinstance(youtube_ids, str):
@ -155,17 +157,17 @@ def extrac_dl(self, youtube_ids, auto_start=False, status="pending"):
else: else:
to_add = youtube_ids to_add = youtube_ids
pending_handler = PendingList(youtube_ids=to_add, task=self) pending_handler = PendingList(
pending_handler.parse_url_list(auto_start=auto_start) youtube_ids=to_add, task=self, auto_start=auto_start, flat=flat
videos_added = pending_handler.add_to_pending(
status=status, auto_start=auto_start
) )
pending_handler.parse_url_list()
videos_added = pending_handler.add_to_pending(status=status)
if auto_start: if auto_start:
download_pending.delay(auto_only=True) download_pending.delay(auto_only=True)
if videos_added: if videos_added:
return f"added {len(videos_added)} Videos to Queue" return f"added {videos_added} Videos to Queue"
return None return None