# tubearchivist/tubearchivist/api/src/aggs.py

"""aggregations"""
from home.src.es.connect import ElasticWrap
from home.src.ta.helper import get_duration_str
2023-11-09 03:34:08 +00:00
from home.src.ta.settings import EnvironmentSettings
class AggBase:
"""base class for aggregation calls"""
path: str = ""
data: dict = {}
name: str = ""
def get(self):
"""make get call"""
2023-08-30 10:42:10 +00:00
response, _ = ElasticWrap(self.path).get(self.data)
print(f"[agg][{self.name}] took {response.get('took')} ms to process")
return response.get("aggregations")
def process(self):
"""implement in subclassess"""
raise NotImplementedError
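

# A minimal, hypothetical subclass to illustrate the contract (not part of the
# real module): define name/path/data, then shape the aggregation response
# that self.get() returns in process().
#
#   class VideoCount(AggBase):
#       name = "video_count"
#       path = "ta_video/_search"
#       data = {"size": 0, "aggs": {name: {"value_count": {"field": "youtube_id"}}}}
#
#       def process(self):
#           return self.get()[self.name]["value"]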


class Primary(AggBase):
    """primary aggregation for total documents indexed"""

    name = "primary"
    path = "ta_video,ta_channel,ta_playlist,ta_subtitle,ta_download/_search"
    data = {
        "size": 0,
        "aggs": {
            "video_type": {
                "filter": {"exists": {"field": "active"}},
                "aggs": {"filtered": {"terms": {"field": "vid_type"}}},
            },
            "channel_total": {"value_count": {"field": "channel_active"}},
            "channel_sub": {"terms": {"field": "channel_subscribed"}},
            "playlist_total": {"value_count": {"field": "playlist_active"}},
            "playlist_sub": {"terms": {"field": "playlist_subscribed"}},
            "download": {"terms": {"field": "status"}},
        },
    }

    def process(self):
        """make the call"""
        aggregations = self.get()

        videos = {"total": aggregations["video_type"].get("doc_count")}
        videos.update(
            {
                i.get("key"): i.get("doc_count")
                for i in aggregations["video_type"]["filtered"]["buckets"]
            }
        )

        channels = {"total": aggregations["channel_total"].get("value")}
        channels.update(
            {
                "sub_" + i.get("key_as_string"): i.get("doc_count")
                for i in aggregations["channel_sub"]["buckets"]
            }
        )

        playlists = {"total": aggregations["playlist_total"].get("value")}
        playlists.update(
            {
                "sub_" + i.get("key_as_string"): i.get("doc_count")
                for i in aggregations["playlist_sub"]["buckets"]
            }
        )

        downloads = {
            i.get("key"): i.get("doc_count")
            for i in aggregations["download"]["buckets"]
        }

        response = {
            "videos": videos,
            "channels": channels,
            "playlists": playlists,
            "downloads": downloads,
        }

        return response
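
# Example shape of Primary().process() -- values are illustrative and depend
# entirely on what is indexed:
#
#   {
#       "videos": {"total": 120, "videos": 100, "shorts": 15, "streams": 5},
#       "channels": {"total": 8, "sub_true": 5, "sub_false": 3},
#       "playlists": {"total": 4, "sub_true": 2, "sub_false": 2},
#       "downloads": {"pending": 10, "ignore": 3},
#   }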


class WatchProgress(AggBase):
    """get watch progress"""

    name = "watch_progress"
    path = "ta_video/_search"
    data = {
        "size": 0,
        "aggs": {
            name: {
                "terms": {"field": "player.watched"},
                "aggs": {
                    "watch_docs": {
                        "filter": {"terms": {"player.watched": [True, False]}},
                        "aggs": {
                            "true_count": {"value_count": {"field": "_index"}},
                            "duration": {"sum": {"field": "player.duration"}},
                        },
                    },
                },
            },
            "total_duration": {"sum": {"field": "player.duration"}},
            "total_vids": {"value_count": {"field": "_index"}},
        },
    }

    def process(self):
        """make the call"""
        aggregations = self.get()
        buckets = aggregations[self.name]["buckets"]

        response = {}
        all_duration = int(aggregations["total_duration"].get("value"))
        response.update(
            {
                "total": {
                    "duration": all_duration,
                    "duration_str": get_duration_str(all_duration),
                    "items": aggregations["total_vids"].get("value"),
                }
            }
        )

        for bucket in buckets:
            response.update(self._build_bucket(bucket, all_duration))

        return response

    @staticmethod
    def _build_bucket(bucket, all_duration):
        """parse bucket"""
        duration = int(bucket["watch_docs"]["duration"]["value"])
        duration_str = get_duration_str(duration)
        items = bucket["watch_docs"]["true_count"]["value"]
        if bucket["key_as_string"] == "false":
            key = "unwatched"
        else:
            key = "watched"

        bucket_parsed = {
            key: {
                "duration": duration,
                "duration_str": duration_str,
                "progress": duration / all_duration if all_duration else 0,
                "items": items,
            }
        }

        return bucket_parsed
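
# Example shape of WatchProgress().process() -- numbers and duration strings
# are illustrative:
#
#   {
#       "total": {"duration": 7200, "duration_str": "2h", "items": 10},
#       "watched": {"duration": 3600, "duration_str": "1h",
#                   "progress": 0.5, "items": 5},
#       "unwatched": {"duration": 3600, "duration_str": "1h",
#                     "progress": 0.5, "items": 5},
#   }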


class DownloadHist(AggBase):
    """get downloads histogram last week"""

    name = "videos_last_week"
    path = "ta_video/_search"
    data = {
        "size": 0,
        "aggs": {
            name: {
                "date_histogram": {
                    "field": "date_downloaded",
                    "calendar_interval": "day",
                    "format": "yyyy-MM-dd",
                    "order": {"_key": "desc"},
                    "time_zone": EnvironmentSettings.TZ,
                },
                "aggs": {
                    "total_videos": {"value_count": {"field": "youtube_id"}},
                    "media_size": {"sum": {"field": "media_size"}},
                },
            }
        },
        "query": {
            "range": {
                "date_downloaded": {
                    "gte": "now-7d/d",
                    "time_zone": EnvironmentSettings.TZ,
                }
            }
        },
    }

    def process(self):
        """process query"""
        aggregations = self.get()
        buckets = aggregations[self.name]["buckets"]

        response = [
            {
                "date": i.get("key_as_string"),
                "count": i.get("doc_count"),
                "media_size": i["media_size"].get("value"),
            }
            for i in buckets
        ]

        return response
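
# Example shape of DownloadHist().process() -- one entry per day (newest
# first) within the last seven days, values illustrative:
#
#   [
#       {"date": "2023-11-09", "count": 3, "media_size": 1234567890},
#       {"date": "2023-11-08", "count": 1, "media_size": 456789012},
#   ]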


class BiggestChannel(AggBase):
    """get channel aggregations"""

    def __init__(self, order):
        self.data["aggs"][self.name]["multi_terms"]["order"] = {order: "desc"}

    name = "channel_stats"
    path = "ta_video/_search"
    data = {
        "size": 0,
        "aggs": {
            name: {
                "multi_terms": {
                    "terms": [
                        {"field": "channel.channel_name.keyword"},
                        {"field": "channel.channel_id"},
                    ],
                    "order": {"doc_count": "desc"},
                },
                "aggs": {
                    "doc_count": {"value_count": {"field": "_index"}},
                    "duration": {"sum": {"field": "player.duration"}},
                    "media_size": {"sum": {"field": "media_size"}},
                },
            },
        },
    }
    order_choices = ["doc_count", "duration", "media_size"]

    def process(self):
        """process aggregation, order_by validated in the view"""
        aggregations = self.get()
        buckets = aggregations[self.name]["buckets"]
        response = [
            {
                "id": i["key"][1],
                "name": i["key"][0].title(),
                "doc_count": i["doc_count"]["value"],
                "duration": i["duration"]["value"],
                "duration_str": get_duration_str(int(i["duration"]["value"])),
                "media_size": i["media_size"]["value"],
            }
            for i in buckets
        ]
        return response
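
# Usage sketch -- order must be one of order_choices; the calling view is
# expected to validate it before instantiating. Output values illustrative:
#
#   ranking = BiggestChannel(order="media_size").process()
#   # [{"id": "UCxxxx", "name": "Example Channel", "doc_count": 42,
#   #   "duration": 100000, "duration_str": "1d 3h",
#   #   "media_size": 9876543210}, ...]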