diff --git a/backend/download/serializers.py b/backend/download/serializers.py new file mode 100644 index 00000000..7ced40ac --- /dev/null +++ b/backend/download/serializers.py @@ -0,0 +1,94 @@ +"""download serializers""" + +# pylint: disable=abstract-method + +from common.serializers import PaginationSerializer, ValidateUnknownFieldsMixin +from rest_framework import serializers + + +class DownloadItemSerializer(serializers.Serializer): + """serialize download item""" + + auto_start = serializers.BooleanField() + channel_id = serializers.CharField() + channel_indexed = serializers.BooleanField() + channel_name = serializers.CharField() + duration = serializers.CharField() + published = serializers.CharField() + status = serializers.ChoiceField(choices=["pending", "ignore"]) + timestamp = serializers.IntegerField() + title = serializers.CharField() + vid_thumb_url = serializers.CharField() + vid_type = serializers.ChoiceField( + choices=["videos", "streams", "shorts", "unknown"] + ) + youtube_id = serializers.CharField() + _index = serializers.CharField(required=False) + _score = serializers.IntegerField(required=False) + + +class DownloadListSerializer(serializers.Serializer): + """serialize download list""" + + data = DownloadItemSerializer(many=True) + paginate = PaginationSerializer() + + +class DownloadListQuerySerializer( + ValidateUnknownFieldsMixin, serializers.Serializer +): + """serialize query params for download list""" + + filter = serializers.ChoiceField( + choices=["pending", "ignore"], required=False + ) + channel = serializers.CharField(required=False, help_text="channel ID") + + +class DownloadListQueueDeleteQuerySerializer(serializers.Serializer): + """serialize bulk delete download queue query string""" + + filter = serializers.ChoiceField(choices=["pending", "ignore"]) + + +class AddDownloadItemSerializer(serializers.Serializer): + """serialize single item to add""" + + youtube_id = serializers.CharField() + status = serializers.ChoiceField(choices=["pending"]) + + +class AddToDownloadListSerializer(serializers.Serializer): + """serialize add to download queue data""" + + data = AddDownloadItemSerializer(many=True) + + +class AddToDownloadQuerySerializer(serializers.Serializer): + """add to queue query serializer""" + + autostart = serializers.BooleanField(required=False) + + +class DownloadQueueItemUpdateSerializer(serializers.Serializer): + """update single download queue item""" + + status = serializers.ChoiceField( + choices=["pending", "ignore", "ignore-force", "priority"] + ) + + +class DownloadAggBucketSerializer(serializers.Serializer): + """serialize bucket""" + + key = serializers.ListField(child=serializers.CharField()) + key_as_string = serializers.CharField() + doc_count = serializers.IntegerField() + + +class DownloadAggsSerializer(serializers.Serializer): + """serialize download channel bucket aggregations""" + + doc_count_error_upper_bound = serializers.IntegerField() + sum_other_doc_count = serializers.IntegerField() + buckets = DownloadAggBucketSerializer(many=True) diff --git a/backend/download/views.py b/backend/download/views.py index 33e245a1..6ef1f7ff 100644 --- a/backend/download/views.py +++ b/backend/download/views.py @@ -1,7 +1,22 @@ """all download API views""" +from common.serializers import ( + AsyncTaskResponseSerializer, + ErrorResponseSerializer, +) from common.views_base import AdminOnly, ApiBaseView +from download.serializers import ( + AddToDownloadListSerializer, + AddToDownloadQuerySerializer, + DownloadAggsSerializer, + DownloadItemSerializer, + DownloadListQuerySerializer, + DownloadListQueueDeleteQuerySerializer, + DownloadListSerializer, + DownloadQueueItemUpdateSerializer, +) from download.src.queue import PendingInteract +from drf_spectacular.utils import OpenApiResponse, extend_schema from rest_framework.response import Response from task.tasks import download_pending, extrac_dl @@ -17,8 +32,14 @@ class DownloadApiListView(ApiBaseView): valid_filter = ["pending", "ignore"] permission_classes = [AdminOnly] + @extend_schema( + responses={ + 200: OpenApiResponse(DownloadListSerializer()), + }, + parameters=[DownloadListQuerySerializer()], + ) def get(self, request): - """get request""" + """get download queue list""" query_filter = request.GET.get("filter", False) self.data.update( { @@ -29,16 +50,16 @@ class DownloadApiListView(ApiBaseView): } ) - must_list = [] - if query_filter: - if query_filter not in self.valid_filter: - message = f"invalid url query filter: {query_filter}" - print(message) - return Response({"message": message}, status=400) + serializer = DownloadListQuerySerializer(data=request.query_params) + serializer.is_valid(raise_exception=True) + validated_data = serializer.validated_data + must_list = [] + query_filter = validated_data.get("filter") + if query_filter: must_list.append({"term": {"status": {"value": query_filter}}}) - filter_channel = request.GET.get("channel", False) + filter_channel = validated_data.get("channel") if filter_channel: must_list.append( {"term": {"channel_id": {"value": filter_channel}}} @@ -47,39 +68,169 @@ class DownloadApiListView(ApiBaseView): self.data["query"] = {"bool": {"must": must_list}} self.get_document_list(request) - return Response(self.response) + serializer = DownloadListSerializer(self.response) + + return Response(serializer.data) @staticmethod + @extend_schema( + request=AddToDownloadListSerializer(), + parameters=[AddToDownloadQuerySerializer()], + responses={ + 200: OpenApiResponse( + AsyncTaskResponseSerializer(), + description="New async task started", + ), + 400: OpenApiResponse( + ErrorResponseSerializer(), description="Bad request" + ), + }, + ) def post(request): """add list of videos to download queue""" - data = request.data - auto_start = bool(request.GET.get("autostart")) - try: - to_add = data["data"] - except KeyError: - message = "missing expected data key" - print(message) - return Response({"message": message}, status=400) + data_serializer = AddToDownloadListSerializer(data=request.data) + data_serializer.is_valid(raise_exception=True) + validated_data = data_serializer.validated_data + + query_serializer = AddToDownloadQuerySerializer( + data=request.query_params + ) + query_serializer.is_valid(raise_exception=True) + validated_query = query_serializer.validated_data + + auto_start = validated_query.get("autostart") + print(f"auto_start: {auto_start}") + to_add = validated_data["data"] pending = [i["youtube_id"] for i in to_add if i["status"] == "pending"] url_str = " ".join(pending) - extrac_dl.delay(url_str, auto_start=auto_start) + task = extrac_dl.delay(url_str, auto_start=auto_start) - return Response(data) + message = { + "message": "add to queue task started", + "task_id": task.id, + } + response_serializer = AsyncTaskResponseSerializer(message) + return Response(response_serializer.data) + + @extend_schema( + parameters=[DownloadListQueueDeleteQuerySerializer()], + responses={ + 204: OpenApiResponse(description="Download items deleted"), + 400: OpenApiResponse( + ErrorResponseSerializer(), description="Bad request" + ), + }, + ) def delete(self, request): - """delete download queue""" - query_filter = request.GET.get("filter", False) - if query_filter not in self.valid_filter: - message = f"invalid url query filter: {query_filter}" - print(message) - return Response({"message": message}, status=400) + """bulk delete download queue items by filter""" + serializer = DownloadListQueueDeleteQuerySerializer( + data=request.query_params + ) + serializer.is_valid(raise_exception=True) + validated_query = serializer.validated_data + query_filter = validated_query["filter"] message = f"delete queue by status: {query_filter}" print(message) PendingInteract(status=query_filter).delete_by_status() - return Response({"message": message}) + return Response(status=204) + + +class DownloadApiView(ApiBaseView): + """resolves to /api/download// + GET: returns metadata dict of an item in the download queue + POST: update status of item to pending or ignore + DELETE: forget from download queue + """ + + search_base = "ta_download/_doc/" + valid_status = ["pending", "ignore", "ignore-force", "priority"] + permission_classes = [AdminOnly] + + @extend_schema( + responses={ + 200: OpenApiResponse(DownloadItemSerializer()), + 404: OpenApiResponse( + ErrorResponseSerializer(), + description="Download item not found", + ), + }, + ) + def get(self, request, video_id): + # pylint: disable=unused-argument + """get download queue item""" + self.get_document(video_id) + if not self.response: + error = ErrorResponseSerializer( + {"error": "Download item not found"} + ) + return Response(error.data, status=404) + + response_serializer = DownloadItemSerializer(self.response) + + return Response(response_serializer.data, status=self.status_code) + + @extend_schema( + request=DownloadQueueItemUpdateSerializer(), + responses={ + 200: OpenApiResponse( + DownloadQueueItemUpdateSerializer(), + description="Download item update", + ), + 400: OpenApiResponse( + ErrorResponseSerializer(), description="Bad request" + ), + 404: OpenApiResponse( + ErrorResponseSerializer(), + description="Download item not found", + ), + }, + ) + def post(self, request, video_id): + """post to video to change status""" + data_serializer = DownloadQueueItemUpdateSerializer(data=request.data) + data_serializer.is_valid(raise_exception=True) + validated_data = data_serializer.validated_data + item_status = validated_data["status"] + + if item_status == "ignore-force": + extrac_dl.delay(video_id, status="ignore") + return Response(data_serializer.data) + + _, status_code = PendingInteract(video_id).get_item() + if status_code == 404: + error = ErrorResponseSerializer( + {"error": "Download item not found"} + ) + return Response(error.data, status=404) + + print(f"{video_id}: change status to {item_status}") + PendingInteract(video_id, item_status).update_status() + if item_status == "priority": + download_pending.delay(auto_only=True) + + return Response(data_serializer.data) + + @staticmethod + @extend_schema( + responses={ + 204: OpenApiResponse(description="delete download item"), + 404: OpenApiResponse( + ErrorResponseSerializer(), + description="Download item not found", + ), + }, + ) + def delete(request, video_id): + # pylint: disable=unused-argument + """delete single video from queue""" + print(f"{video_id}: delete from queue") + PendingInteract(video_id).delete_item() + + return Response(status=204) class DownloadAggsApiView(ApiBaseView): @@ -90,9 +241,24 @@ class DownloadAggsApiView(ApiBaseView): search_base = "ta_download/_search" valid_filter_view = ["ignore", "pending"] + @extend_schema( + parameters=[DownloadListQueueDeleteQuerySerializer()], + responses={ + 200: OpenApiResponse(DownloadAggsSerializer()), + 400: OpenApiResponse( + ErrorResponseSerializer(), description="bad request" + ), + }, + ) def get(self, request): """get aggs""" - filter_view = request.GET.get("filter") + serializer = DownloadListQueueDeleteQuerySerializer( + data=request.query_params + ) + serializer.is_valid(raise_exception=True) + validated_query = serializer.validated_data + + filter_view = validated_query.get("filter") if filter_view: if filter_view not in self.valid_filter_view: message = f"invalid filter: {filter_view}" @@ -121,57 +287,6 @@ class DownloadAggsApiView(ApiBaseView): } ) self.get_aggs() + serializer = DownloadAggsSerializer(self.response["channel_downloads"]) - return Response(self.response) - - -class DownloadApiView(ApiBaseView): - """resolves to /api/download// - GET: returns metadata dict of an item in the download queue - POST: update status of item to pending or ignore - DELETE: forget from download queue - """ - - search_base = "ta_download/_doc/" - valid_status = ["pending", "ignore", "ignore-force", "priority"] - permission_classes = [AdminOnly] - - def get(self, request, video_id): - # pylint: disable=unused-argument - """get request""" - self.get_document(video_id) - return Response(self.response, status=self.status_code) - - def post(self, request, video_id): - """post to video to change status""" - item_status = request.data.get("status") - if item_status not in self.valid_status: - message = f"{video_id}: invalid status {item_status}" - print(message) - return Response({"message": message}, status=400) - - if item_status == "ignore-force": - extrac_dl.delay(video_id, status="ignore") - message = f"{video_id}: set status to ignore" - return Response(request.data) - - _, status_code = PendingInteract(video_id).get_item() - if status_code == 404: - message = f"{video_id}: item not found {status_code}" - return Response({"message": message}, status=404) - - print(f"{video_id}: change status to {item_status}") - PendingInteract(video_id, item_status).update_status() - if item_status == "priority": - download_pending.delay(auto_only=True) - - return Response(request.data) - - @staticmethod - def delete(request, video_id): - # pylint: disable=unused-argument - """delete single video from queue""" - print(f"{video_id}: delete from queue") - PendingInteract(video_id).delete_item() - - return Response({"success": True}) + return Response(serializer.data)