mirror of
https://github.com/tubearchivist/tubearchivist.git
synced 2024-12-12 05:00:12 +00:00
347 lines
11 KiB
Python
347 lines
11 KiB
Python
"""all API views"""
|
|
|
|
from api.src.search_processor import SearchProcess
|
|
from home.src.download.thumbnails import ThumbManager
|
|
from home.src.es.connect import ElasticWrap
|
|
from home.src.index.video import SponsorBlock
|
|
from home.src.ta.config import AppConfig
|
|
from home.src.ta.helper import UrlListParser
|
|
from home.src.ta.ta_redis import RedisArchivist
|
|
from home.tasks import extrac_dl, subscribe_to
|
|
from rest_framework.authentication import (
|
|
SessionAuthentication,
|
|
TokenAuthentication,
|
|
)
|
|
from rest_framework.authtoken.models import Token
|
|
from rest_framework.authtoken.views import ObtainAuthToken
|
|
from rest_framework.permissions import IsAuthenticated
|
|
from rest_framework.response import Response
|
|
from rest_framework.views import APIView
|
|
|
|
|
|
class ApiBaseView(APIView):
|
|
"""base view to inherit from"""
|
|
|
|
authentication_classes = [SessionAuthentication, TokenAuthentication]
|
|
permission_classes = [IsAuthenticated]
|
|
search_base = False
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.response = {"data": False, "config": AppConfig().config}
|
|
self.status_code = False
|
|
self.context = False
|
|
|
|
def get_document(self, document_id):
|
|
"""get single document from es"""
|
|
path = f"{self.search_base}{document_id}"
|
|
print(path)
|
|
response, status_code = ElasticWrap(path).get()
|
|
try:
|
|
self.response["data"] = SearchProcess(response).process()
|
|
except KeyError:
|
|
print(f"item not found: {document_id}")
|
|
self.response["data"] = False
|
|
self.status_code = status_code
|
|
|
|
def process_keys(self):
|
|
"""process keys for frontend"""
|
|
all_keys = self.response["data"].keys()
|
|
if "media_url" in all_keys:
|
|
media_url = self.response["data"]["media_url"]
|
|
self.response["data"]["media_url"] = f"/media/{media_url}"
|
|
if "vid_thumb_url" in all_keys:
|
|
youtube_id = self.response["data"]["youtube_id"]
|
|
vid_thumb_url = ThumbManager().vid_thumb_path(youtube_id)
|
|
cache_dir = self.response["config"]["application"]["cache_dir"]
|
|
new_thumb = f"{cache_dir}/{vid_thumb_url}"
|
|
self.response["data"]["vid_thumb_url"] = new_thumb
|
|
if "subtitles" in all_keys:
|
|
all_subtitles = self.response["data"]["subtitles"]
|
|
for idx, _ in enumerate(all_subtitles):
|
|
url = self.response["data"]["subtitles"][idx]["media_url"]
|
|
new_url = f"/media/{url}"
|
|
self.response["data"]["subtitles"][idx]["media_url"] = new_url
|
|
|
|
def get_paginate(self):
|
|
"""add pagination detail to response"""
|
|
self.response["paginate"] = False
|
|
|
|
def get_document_list(self, data):
|
|
"""get a list of results"""
|
|
print(self.search_base)
|
|
response, status_code = ElasticWrap(self.search_base).get(data=data)
|
|
self.response["data"] = SearchProcess(response).process()
|
|
self.status_code = status_code
|
|
|
|
|
|
class VideoApiView(ApiBaseView):
|
|
"""resolves to /api/video/<video_id>/
|
|
GET: returns metadata dict of video
|
|
"""
|
|
|
|
search_base = "ta_video/_doc/"
|
|
|
|
def get(self, request, video_id):
|
|
# pylint: disable=unused-argument
|
|
"""get request"""
|
|
self.get_document(video_id)
|
|
if self.response.get("data"):
|
|
self.process_keys()
|
|
return Response(self.response, status=self.status_code)
|
|
|
|
|
|
class VideoApiListView(ApiBaseView):
|
|
"""resolves to /api/video/
|
|
GET: returns list of videos
|
|
"""
|
|
|
|
search_base = "ta_video/_search/"
|
|
|
|
def get(self, request):
|
|
# pylint: disable=unused-argument
|
|
"""get request"""
|
|
data = {"query": {"match_all": {}}}
|
|
self.get_document_list(data)
|
|
self.get_paginate()
|
|
|
|
return Response(self.response)
|
|
|
|
|
|
class VideoProgressView(ApiBaseView):
|
|
"""resolves to /api/video/<video_id>/
|
|
handle progress status for video
|
|
"""
|
|
|
|
def get(self, request, video_id):
|
|
"""get progress for a single video"""
|
|
user_id = request.user.id
|
|
key = f"{user_id}:progress:{video_id}"
|
|
video_progress = RedisArchivist().get_message(key)
|
|
position = video_progress.get("position", 0)
|
|
|
|
self.response = {
|
|
"youtube_id": video_id,
|
|
"user_id": user_id,
|
|
"position": position,
|
|
}
|
|
return Response(self.response)
|
|
|
|
def post(self, request, video_id):
|
|
"""set progress position in redis"""
|
|
position = request.data.get("position", 0)
|
|
key = f"{request.user.id}:progress:{video_id}"
|
|
message = {"position": position, "youtube_id": video_id}
|
|
RedisArchivist().set_message(key, message, expire=False)
|
|
self.response = request.data
|
|
|
|
return Response(self.response)
|
|
|
|
def delete(self, request, video_id):
|
|
"""delete progress position"""
|
|
key = f"{request.user.id}:progress:{video_id}"
|
|
RedisArchivist().del_message(key)
|
|
self.response = {"progress-reset": video_id}
|
|
|
|
return Response(self.response)
|
|
|
|
|
|
class VideoSponsorView(ApiBaseView):
|
|
"""resolves to /api/video/<video_id>/sponsor/
|
|
handle sponsor block integration
|
|
"""
|
|
|
|
search_base = "ta_video/_doc/"
|
|
|
|
def get(self, request, video_id):
|
|
"""get sponsor info"""
|
|
# pylint: disable=unused-argument
|
|
|
|
self.get_document(video_id)
|
|
sponsorblock = self.response["data"].get("sponsorblock")
|
|
|
|
return Response(sponsorblock)
|
|
|
|
def post(self, request, video_id):
|
|
"""post verification and timestamps"""
|
|
if "segment" in request.data:
|
|
response, status_code = self._create_segment(request, video_id)
|
|
elif "vote" in request.data:
|
|
response, status_code = self._vote_on_segment(request)
|
|
|
|
return Response(response, status=status_code)
|
|
|
|
@staticmethod
|
|
def _create_segment(request, video_id):
|
|
"""create segment in API"""
|
|
start_time = request.data["segment"]["startTime"]
|
|
end_time = request.data["segment"]["endTime"]
|
|
response, status_code = SponsorBlock(request.user.id).post_timestamps(
|
|
video_id, start_time, end_time
|
|
)
|
|
|
|
return response, status_code
|
|
|
|
@staticmethod
|
|
def _vote_on_segment(request):
|
|
"""validate on existing segment"""
|
|
user_id = request.user.id
|
|
uuid = request.data["vote"]["uuid"]
|
|
vote = request.data["vote"]["yourVote"]
|
|
response, status_code = SponsorBlock(user_id).vote_on_segment(
|
|
uuid, vote
|
|
)
|
|
|
|
return response, status_code
|
|
|
|
|
|
class ChannelApiView(ApiBaseView):
|
|
"""resolves to /api/channel/<channel_id>/
|
|
GET: returns metadata dict of channel
|
|
"""
|
|
|
|
search_base = "ta_channel/_doc/"
|
|
|
|
def get(self, request, channel_id):
|
|
# pylint: disable=unused-argument
|
|
"""get request"""
|
|
self.get_document(channel_id)
|
|
return Response(self.response, status=self.status_code)
|
|
|
|
|
|
class ChannelApiListView(ApiBaseView):
|
|
"""resolves to /api/channel/
|
|
GET: returns list of channels
|
|
POST: edit a list of channels
|
|
"""
|
|
|
|
search_base = "ta_channel/_search/"
|
|
|
|
def get(self, request):
|
|
# pylint: disable=unused-argument
|
|
"""get request"""
|
|
data = {"query": {"match_all": {}}}
|
|
self.get_document_list(data)
|
|
self.get_paginate()
|
|
|
|
return Response(self.response)
|
|
|
|
@staticmethod
|
|
def post(request):
|
|
"""subscribe to list of channels"""
|
|
data = request.data
|
|
try:
|
|
to_add = data["data"]
|
|
except KeyError:
|
|
message = "missing expected data key"
|
|
print(message)
|
|
return Response({"message": message}, status=400)
|
|
|
|
pending = [i["channel_id"] for i in to_add if i["channel_subscribed"]]
|
|
url_str = " ".join(pending)
|
|
subscribe_to.delay(url_str)
|
|
|
|
return Response(data)
|
|
|
|
|
|
class PlaylistApiView(ApiBaseView):
|
|
"""resolves to /api/playlist/<playlist_id>/
|
|
GET: returns metadata dict of playlist
|
|
"""
|
|
|
|
search_base = "ta_playlist/_doc/"
|
|
|
|
def get(self, request, playlist_id):
|
|
# pylint: disable=unused-argument
|
|
"""get request"""
|
|
self.get_document(playlist_id)
|
|
return Response(self.response, status=self.status_code)
|
|
|
|
|
|
class DownloadApiView(ApiBaseView):
|
|
"""resolves to /api/download/<video_id>/
|
|
GET: returns metadata dict of an item in the download queue
|
|
"""
|
|
|
|
search_base = "ta_download/_doc/"
|
|
|
|
def get(self, request, video_id):
|
|
# pylint: disable=unused-argument
|
|
"""get request"""
|
|
self.get_document(video_id)
|
|
return Response(self.response, status=self.status_code)
|
|
|
|
|
|
class DownloadApiListView(ApiBaseView):
|
|
"""resolves to /api/download/
|
|
GET: returns latest videos in the download queue
|
|
POST: add a list of videos to download queue
|
|
"""
|
|
|
|
search_base = "ta_download/_search/"
|
|
|
|
def get(self, request):
|
|
# pylint: disable=unused-argument
|
|
"""get request"""
|
|
data = {"query": {"match_all": {}}}
|
|
self.get_document_list(data)
|
|
self.get_paginate()
|
|
return Response(self.response)
|
|
|
|
@staticmethod
|
|
def post(request):
|
|
"""add list of videos to download queue"""
|
|
print(f"request meta data: {request.META}")
|
|
data = request.data
|
|
try:
|
|
to_add = data["data"]
|
|
except KeyError:
|
|
message = "missing expected data key"
|
|
print(message)
|
|
return Response({"message": message}, status=400)
|
|
|
|
pending = [i["youtube_id"] for i in to_add if i["status"] == "pending"]
|
|
url_str = " ".join(pending)
|
|
try:
|
|
youtube_ids = UrlListParser(url_str).process_list()
|
|
except ValueError:
|
|
message = f"failed to parse: {url_str}"
|
|
print(message)
|
|
return Response({"message": message}, status=400)
|
|
|
|
extrac_dl.delay(youtube_ids)
|
|
|
|
return Response(data)
|
|
|
|
|
|
class PingView(ApiBaseView):
|
|
"""resolves to /api/ping/
|
|
GET: test your connection
|
|
"""
|
|
|
|
@staticmethod
|
|
def get(request):
|
|
"""get pong"""
|
|
data = {"response": "pong", "user": request.user.id}
|
|
return Response(data)
|
|
|
|
|
|
class LoginApiView(ObtainAuthToken):
|
|
"""resolves to /api/login/
|
|
POST: return token and username after successful login
|
|
"""
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
"""post data"""
|
|
# pylint: disable=no-member
|
|
serializer = self.serializer_class(
|
|
data=request.data, context={"request": request}
|
|
)
|
|
serializer.is_valid(raise_exception=True)
|
|
user = serializer.validated_data["user"]
|
|
token, _ = Token.objects.get_or_create(user=user)
|
|
|
|
print(f"returning token for user with id {user.pk}")
|
|
|
|
return Response({"token": token.key, "user_id": user.pk})
|