refactor duration class into separate helper functions

This commit is contained in:
Simon 2023-09-04 18:49:10 +07:00
parent 317942b7e1
commit dc41e5062d
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
6 changed files with 55 additions and 78 deletions

View File

@ -1,7 +1,7 @@
"""aggregations"""
from home.src.es.connect import ElasticWrap
from home.src.index.video_streams import DurationConverter
from home.src.ta.helper import get_duration_str
class AggBase:
@ -119,7 +119,7 @@ class WatchProgress(AggBase):
{
"all": {
"duration": all_duration,
"duration_str": DurationConverter().get_str(all_duration),
"duration_str": get_duration_str(all_duration),
"items": aggregations["total_vids"].get("value"),
}
}
@ -135,7 +135,7 @@ class WatchProgress(AggBase):
"""parse bucket"""
duration = int(bucket["watch_docs"]["duration"]["value"])
duration_str = DurationConverter().get_str(duration)
duration_str = get_duration_str(duration)
items = bucket["watch_docs"]["true_count"]["value"]
if bucket["key_as_string"] == "false":
key = "unwatched"
@ -234,9 +234,7 @@ class BiggestChannel(AggBase):
"name": i["key"][0].title(),
"doc_count": i["doc_count"]["value"],
"duration": i["duration"]["value"],
"duration_str": DurationConverter().get_str(
i["duration"]["value"]
),
"duration_str": get_duration_str(int(i["duration"]["value"])),
"media_size": i["media_size"]["value"],
}
for i in buckets

View File

@ -16,9 +16,8 @@ from home.src.download.yt_dlp_base import YtWrap
from home.src.es.connect import ElasticWrap, IndexPaginate
from home.src.index.playlist import YoutubePlaylist
from home.src.index.video_constants import VideoTypeEnum
from home.src.index.video_streams import DurationConverter
from home.src.ta.config import AppConfig
from home.src.ta.helper import is_shorts
from home.src.ta.helper import get_duration_str, is_shorts
class PendingIndex:
@ -335,9 +334,6 @@ class PendingList(PendingIndex):
def _parse_youtube_details(self, vid, vid_type=VideoTypeEnum.VIDEOS):
"""parse response"""
vid_id = vid.get("id")
duration_str = DurationConverter.get_str(vid["duration"])
if duration_str == "NA":
print(f"skip extracting duration for: {vid_id}")
published = datetime.strptime(vid["upload_date"], "%Y%m%d").strftime(
"%Y-%m-%d"
)
@ -349,7 +345,7 @@ class PendingList(PendingIndex):
"vid_thumb_url": vid["thumbnail"],
"title": vid["title"],
"channel_id": vid["channel_id"],
"duration": duration_str,
"duration": get_duration_str(vid["duration"]),
"published": published,
"timestamp": int(datetime.now().timestamp()),
# Pulling enum value out so it is serializable

View File

@ -11,8 +11,8 @@ from datetime import datetime
from home.src.download.thumbnails import ThumbManager
from home.src.es.connect import ElasticWrap
from home.src.index.video_streams import DurationConverter
from home.src.ta.config import AppConfig
from home.src.ta.helper import get_duration_str
class SearchHandler:
@ -45,9 +45,9 @@ class SearchHandler:
if response.get("aggregations"):
self.aggs = response["aggregations"]
if "total_duration" in self.aggs:
duration_sec = self.aggs["total_duration"]["value"]
duration_sec = int(self.aggs["total_duration"]["value"])
self.aggs["total_duration"].update(
{"value_str": DurationConverter().get_str(duration_sec)}
{"value_str": get_duration_str(duration_sec)}
)
return return_value

View File

@ -16,11 +16,8 @@ from home.src.index import playlist as ta_playlist
from home.src.index.generic import YouTubeItem
from home.src.index.subtitle import YoutubeSubtitle
from home.src.index.video_constants import VideoTypeEnum
from home.src.index.video_streams import (
DurationConverter,
MediaStreamExtractor,
)
from home.src.ta.helper import randomizor
from home.src.index.video_streams import MediaStreamExtractor
from home.src.ta.helper import get_duration_sec, get_duration_str, randomizor
from home.src.ta.ta_redis import RedisArchivist
from ryd_client import ryd_client
@ -249,16 +246,14 @@ class YoutubeVideo(YouTubeItem, YoutubeSubtitle):
def add_player(self, media_path=False):
"""add player information for new videos"""
vid_path = media_path or self.build_dl_cache_path()
duration = get_duration_sec(vid_path)
duration_handler = DurationConverter()
duration = duration_handler.get_sec(vid_path)
duration_str = duration_handler.get_str(duration)
self.json_data.update(
{
"player": {
"watched": False,
"duration": duration,
"duration_str": duration_str,
"duration_str": get_duration_str(duration),
}
}
)

View File

@ -5,60 +5,6 @@ import subprocess
from os import stat
class DurationConverter:
    """
    read a media file's duration with ffprobe and format it for display
    """

    @staticmethod
    def get_sec(file_path):
        """return duration of file_path in whole seconds, 0 if ffprobe has none"""
        command = [
            "ffprobe",
            "-v",
            "error",
            "-show_entries",
            "format=duration",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            file_path,
        ]
        # check=True: a non-zero ffprobe exit raises CalledProcessError
        probe = subprocess.run(command, capture_output=True, check=True)
        raw_value = probe.stdout.decode().strip()
        if raw_value == "N/A":
            return 0

        # ffprobe prints fractional seconds, drop the fraction
        return int(float(raw_value))

    @staticmethod
    def get_str(seconds):
        """build '[Nd ][HH:]MM:SS' from seconds, 'NA' for falsy input"""
        if not seconds:
            # failed to extract
            return "NA"

        day_count = int(seconds // (24 * 3600))
        hour_count = int((seconds % (24 * 3600)) // 3600)
        minute_count = int((seconds % 3600) // 60)
        second_count = int(seconds % 60)

        parts = []
        if day_count:
            parts.append(f"{day_count}d ")
        if hour_count:
            parts.append(str(hour_count).zfill(2) + ":")
        # minutes are always printed, zero padded, even when zero
        parts.append(str(minute_count).zfill(2) + ":")
        parts.append(str(second_count).zfill(2))

        return "".join(parts)
class MediaStreamExtractor:
"""extract stream metadata"""

View File

@ -7,6 +7,7 @@ import json
import os
import random
import string
import subprocess
from datetime import datetime
from urllib.parse import urlparse
@ -141,6 +142,47 @@ def is_shorts(youtube_id: str) -> bool:
return response.status_code == 200
def get_duration_sec(file_path: str) -> int:
    """read media duration in whole seconds from file path using ffprobe"""
    command = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        file_path,
    ]
    # check=True: a non-zero ffprobe exit raises CalledProcessError
    probe = subprocess.run(command, capture_output=True, check=True)
    raw_value = probe.stdout.decode().strip()

    # ffprobe prints "N/A" when the container reports no duration;
    # otherwise the value is fractional seconds and the fraction is dropped
    return 0 if raw_value == "N/A" else int(float(raw_value))
def get_duration_str(seconds: int) -> str:
    """
    Return a human-readable duration string from seconds.

    The result lists every non-zero unit from years down to seconds,
    separated by spaces, e.g. 3661 -> "1h 1m 1s". A year is counted as
    365 days. Returns "NA" when there is no usable duration: falsy input
    (0, None) or anything that is not at least one full second.

    Float input is truncated to whole seconds so the output never
    contains fractional counts like "1.0m 30.5s".
    """
    if not seconds:
        return "NA"

    remaining = int(seconds)
    if remaining <= 0:
        # sub-second or negative values would otherwise give an empty string
        return "NA"

    units = [("y", 31536000), ("d", 86400), ("h", 3600), ("m", 60), ("s", 1)]
    duration_parts = []
    for unit_label, unit_seconds in units:
        if remaining >= unit_seconds:
            unit_count, remaining = divmod(remaining, unit_seconds)
            duration_parts.append(f"{unit_count}{unit_label}")

    return " ".join(duration_parts)
def ta_host_parser(ta_host: str) -> tuple[list[str], list[str]]:
"""parse ta_host env var for ALLOWED_HOSTS and CSRF_TRUSTED_ORIGINS"""
allowed_hosts: list[str] = [