[API] add index count and video view progress stats views

This commit is contained in:
Simon 2023-08-29 15:45:55 +07:00
parent 2563722f16
commit 3e9f1a392a
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
3 changed files with 128 additions and 0 deletions

View File

@ -0,0 +1,93 @@
"""aggregations"""
from home.src.es.connect import ElasticWrap
from home.src.index.video_streams import DurationConverter
class AggBase:
"""base class for aggregation calls"""
path: str = ""
data: dict = {}
name: str = ""
def get(self):
"""make get call"""
data_size = {"size": 0, "aggs": self.data}
response, _ = ElasticWrap(self.path).get(data_size)
print(f"[agg][{self.name}] took {response.get('took')} ms to process")
return response.get("aggregations")
def process(self):
"""implement in subclassess"""
raise NotImplementedError
class Primary(AggBase):
"""primary aggregation for total documents indexed"""
name = "primary"
path = "ta_video,ta_channel,ta_playlist,ta_subtitle,ta_download/_search"
data = {name: {"terms": {"field": "_index"}}}
def process(self):
"""make the call"""
aggregations = self.get()
buck = aggregations[self.name]["buckets"]
return {i.get("key").lstrip("_ta"): i.get("doc_count") for i in buck}
class WatchProgress(AggBase):
"""get watch progress"""
name = "watch_progress"
path = "ta_video/_search"
data = {
name: {
"terms": {"field": "player.watched"},
"aggs": {
"watch_docs": {
"filter": {"terms": {"player.watched": [True, False]}},
"aggs": {
"true_count": {"value_count": {"field": "_index"}},
"duration": {"sum": {"field": "player.duration"}},
},
},
},
}
}
def process(self):
"""make the call"""
aggregations = self.get()
buckets = aggregations[self.name]["buckets"]
response = {}
for bucket in buckets:
response.update(self._build_bucket(bucket))
return response
@staticmethod
def _build_bucket(bucket):
"""parse bucket"""
duration = int(bucket["watch_docs"]["duration"]["value"])
duration_str = DurationConverter().get_str(duration)
items = bucket["watch_docs"]["true_count"]["value"]
if bucket["key_as_string"] == "false":
key = "unwatched"
else:
key = "watched"
bucket_parsed = {
key: {
"duration": duration,
"duration_str": duration_str,
"items": items,
}
}
return bucket_parsed

View File

@ -136,4 +136,14 @@ urlpatterns = [
views.NotificationView.as_view(),
name="api-notification",
),
path(
"stats/primary/",
views.StatPrimaryView.as_view(),
name="api-stats-primary",
),
path(
"stats/watch/",
views.StatWatchProgress.as_view(),
name="api-stats-watch",
),
]

View File

@ -1,5 +1,6 @@
"""all API views"""
from api.src.aggs import Primary, WatchProgress
from api.src.search_processor import SearchProcess
from home.src.download.queue import PendingInteract
from home.src.download.subscriptions import (
@ -975,3 +976,27 @@ class NotificationView(ApiBaseView):
query = f"{query}:{filter_by}"
return Response(RedisArchivist().list_items(query))
class StatPrimaryView(ApiBaseView):
"""resolves to /api/stats/primary/
GET: return document count
"""
def get(self, request):
"""get stats"""
# pylint: disable=unused-argument
return Response(Primary().process())
class StatWatchProgress(ApiBaseView):
"""resolves to /api/stats/watchprogress/
GET: return watch/unwatch progress stats
"""
def get(self, request):
"""handle get request"""
# pylint: disable=unused-argument
return Response(WatchProgress().process())