manual reindex, #build

- Added reindex buttons for videos, channels, playlists
- [API] Added endpoints to control the reindex queue
- Added distinction between *False* and *None* comments
- Fix for Cast: enable by setting the ENABLE_CAST=True env var
- Fix so a failed comment extraction no longer fails the whole video
simon 2022-12-19 15:37:38 +07:00
commit c987c13fc7
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
32 changed files with 589 additions and 232 deletions

ARCHITECTURE.md Normal file
View File

@ -0,0 +1,48 @@
# The Inner Workings of Tube Archivist
This is a high-level overview of the architecture of Tube Archivist, intended to help interested contributors find their way around quickly.
```
Tube Archivist
______________________|_____________________
| | |
------------------- --------------- -------------------
| | | | | |
| DjangoProject | | RedisJson | | ElasticSearch |
| | | | | |
------------------- --------------- -------------------
```
## DjangoProject
This is the main Python application. Django is served internally by **uWSGI** on port 8080; the interface is exposed by **Nginx** on the public port 8000.
User-created static files like media files and artwork, as well as application assets like logos and fonts, are served directly by Nginx, while the rest of the application uses `uwsgi_pass` to proxy requests to uWSGI.
Config files are located in the `docker_assets` folder. The script `run.sh` is the container's `CMD` and entry point; it validates env vars and the connection to ElasticSearch (ES), then starts the application.
Compared to other Django projects, this application doesn't make use of Django's database models, due to a lack of integration with ES. Instead, the project has its own abstractions and integrations, treating ES as a REST API.
Long running application tasks are handed off to **Celery** - using **Redis** as a broker - to run asynchronously from the main threads.
- All tasks are defined in the `home.tasks.py` module.
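As a minimal sketch (task name and signature taken from this commit; the body is elided), a task is declared with the `@shared_task` decorator and dispatched from a view via `.delay()`:
```python
# sketch: a Celery task in home/tasks.py, dispatched from a Django view
from celery import shared_task

@shared_task(name="check_reindex")
def check_reindex(data=False, extract_videos=False):
    """run the reindex task asynchronously on a Celery worker"""

# a view hands the work off to the Redis broker instead of blocking:
# check_reindex.delay(data=data, extract_videos=extract_videos)
```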
There are three Django apps:
- **config**: The root app, routing the main endpoints and the main `settings.py` file
- **api**: The API app with its views and functionality
- **home**: Most of the application logic, templates and views; will probably get split up further in the future.
The *home* app is split up into packages in the `src` directory:
- **download**: All download-related classes: interacting with yt-dlp, downloading artwork, handling the download queue and post-processing tasks.
- **es**: All index setup and validation classes: handles mapping validation and mapping changes, plus wrapper functions to simplify interactions with ElasticSearch, backup and restore.
- **frontend**: All direct interactions with the frontend, like Django forms, searching, watched-state changes, and legacy api_calls in the process of moving to the api app.
- **index**: All functionality for scraping and indexing videos, channels, playlists, comments, subtitles, etc.
- **ta**: A loose collection of functions and classes handling the application config, plus the Redis wrapper classes.
## RedisJson
Holds the main application config JSON object that gets dynamically edited from the frontend, and serves as the message broker for **Celery**. Redis acts as a temporary and thread-safe link between Django and the frontend, storing progress messages and temporary queues for processing. It is also used to store locking keys for threads and execution details for tasks.
- Wrapper classes to interact with Redis are located in the `home.src.ta.ta_redis.py` module.
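For orientation, a sketch of how these wrappers are used (method names as they appear in this commit; connection details are handled inside the classes):
```python
# sketch: the queue and key/value wrappers from home.src.ta.ta_redis
from home.src.ta.ta_redis import RedisArchivist, RedisQueue

# queues are parameterized by name, e.g. the download queue
queue = RedisQueue(queue_name="dl_queue")
queue.add_list(["video1", "video2"])  # RPUSH onto the named list
queue.in_queue("video1")              # "in_queue" or False, via LPOS
queue.clear_item("video1")            # remove a single element

# RedisArchivist stores messages, e.g. the last reindex epoch timestamp
RedisArchivist().set_message("last_reindex", 1671439058)
```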
## ElasticSearch (ES)
ES stores and indexes all metadata; it functions as the application database and makes it all searchable. The mapping defines which fields are indexed as searchable text fields and which fields are used for match filtering.
- The index setup and validation is handled in the `home.src.es.index_setup.py` module.
- Wrapper classes for making requests to ES are located in the `home.src.es.connect.py` module.
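A sketch of the request wrapper as used throughout this commit (the second element of the returned tuple is assumed to be the HTTP status code):
```python
# sketch: ElasticWrap from home.src.es.connect treats ES as a REST API
from home.src.es.connect import ElasticWrap

# search request against the video index
data = {"query": {"match": {"active": True}}}
response, status_code = ElasticWrap("ta_video/_search").get(data=data)

# single-document lookups work the same way
channel_id = "UC..."  # placeholder id
response, _ = ElasticWrap(f"ta_channel/_doc/{channel_id}").get()
```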

View File

@ -25,6 +25,11 @@ if [[ -n "$TA_UWSGI_PORT" ]]; then
sed -i "s/8080/$TA_UWSGI_PORT/g" /app/uwsgi.ini
fi
# disable auth on static files for cast support
if [[ -n "$ENABLE_CAST" ]]; then
sed -i "/auth_request/d" /etc/nginx/sites-available/default
fi
# wait for elasticsearch
counter=0
until curl -u "$ELASTIC_USER":"$ELASTIC_PASSWORD" "$ES_URL" -fs; do

View File

@ -38,6 +38,7 @@ Note:
**Additional**
- [Login](#login-view)
- [Task](#task-view) WIP
- [Refresh](#refresh-view)
- [Cookie](#cookie-view)
- [Search](#search-view)
- [Ping](#ping-view)
@ -306,6 +307,52 @@ List of valid task names:
- **download_pending**: Start the download queue
- **rescan_pending**: Rescan your subscriptions
## Refresh View
GET /api/refresh/
Parameters:
- **type**: one of *video*, *channel*, *playlist*, optional
- **id**: item id, optional
Without specifying a type, returns the total of all queued items:
```json
{
"total_queued": 2,
"type": "all",
"state": "running"
}
```
Specify a type to get the total queued items of that type:
```json
{
"total_queued": 2,
"type": "video",
"state": "running"
}
```
Specify a type *and* an id to get the state of a single item in the queue:
```json
{
"total_queued": 2,
"type": "video",
"state": "in_queue",
"id": "video-id"
}
```
POST /api/refresh/
Parameters:
- **extract_videos**: also refresh all videos of the given channels/playlists, default False
Manually start a refresh task by posting lists of *video*, *channel* and *playlist* IDs:
```json
{
"video": ["video1", "video2", "video3"],
"channel": ["channel1", "channel2", "channel3"],
"playlist": ["playlist1", "playlist2"]
}
```
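For illustration, a hypothetical client sketch (paths from this document; token authentication assumed, as used by the rest of the API):
```python
# hypothetical sketch: queue a reindex and poll its progress
import requests

headers = {"Authorization": "Token xxxxxxxx"}  # placeholder token
base = "http://localhost:8000/api/refresh/"

# queue a channel and, via extract_videos, all of its videos
requests.post(
    base,
    json={"channel": ["channel1"]},
    params={"extract_videos": "true"},
    headers=headers,
)

# poll the state of that channel in the queue
resp = requests.get(
    base, params={"type": "channel", "id": "channel1"}, headers=headers
)
print(resp.json())  # e.g. {"total_queued": 1, "type": "channel", "state": "in_queue", "id": "channel1"}
```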
## Cookie View
Check your YouTube cookie settings; *status* turns to `true` once the cookie has been validated.

View File

@ -12,6 +12,7 @@ from api.views import (
PlaylistApiListView,
PlaylistApiVideoView,
PlaylistApiView,
RefreshView,
SearchView,
SnapshotApiListView,
SnapshotApiView,
@ -98,6 +99,11 @@ urlpatterns = [
DownloadApiView.as_view(),
name="api-download",
),
path(
"refresh/",
RefreshView.as_view(),
name="api-refresh",
),
path(
"task/",
TaskApiView.as_view(),

View File

@ -8,11 +8,12 @@ from home.src.es.connect import ElasticWrap
from home.src.es.snapshot import ElasticSnapshot
from home.src.frontend.searching import SearchForm
from home.src.index.generic import Pagination
from home.src.index.reindex import ReindexProgress
from home.src.index.video import SponsorBlock
from home.src.ta.config import AppConfig
from home.src.ta.helper import UrlListParser
from home.src.ta.ta_redis import RedisArchivist, RedisQueue
from home.tasks import extrac_dl, subscribe_to
from home.tasks import check_reindex, extrac_dl, subscribe_to
from rest_framework.authentication import (
SessionAuthentication,
TokenAuthentication,
@ -382,7 +383,7 @@ class DownloadApiView(ApiBaseView):
print(f"{video_id}: change status to {item_status}")
PendingInteract(video_id=video_id, status=item_status).update_status()
RedisQueue().clear_item(video_id)
RedisQueue(queue_name="dl_queue").clear_item(video_id)
return Response(request.data)
@ -589,6 +590,38 @@ class SnapshotApiView(ApiBaseView):
return Response(response)
class RefreshView(ApiBaseView):
"""resolves to /api/refresh/
GET: get refresh progress
POST: start a manual refresh task
"""
def get(self, request):
"""handle get request"""
request_type = request.GET.get("type")
request_id = request.GET.get("id")
if request_id and not request_type:
return Response({"status": "Bad Request"}, status=400)
try:
progress = ReindexProgress(
request_type=request_type, request_id=request_id
).get_progress()
except ValueError:
return Response({"status": "Bad Request"}, status=400)
return Response(progress)
def post(self, request):
"""handle post request"""
data = request.data
extract_videos = bool(request.GET.get("extract_videos", False))
check_reindex.delay(data=data, extract_videos=extract_videos)
return Response(data)
class CookieView(ApiBaseView):
"""resolves to /api/cookie/
GET: check if cookie is enabled

View File

@ -77,6 +77,7 @@ class StartupCheck:
"downloading",
"dl_queue",
"dl_queue_id",
"reindex",
"rescan",
"run_backup",
]

View File

@ -270,7 +270,7 @@ class PendingList(PendingIndex):
"channel_id": vid["channel_id"],
"duration": duration_str,
"published": published,
"timestamp": int(datetime.now().strftime("%s")),
"timestamp": int(datetime.now().timestamp()),
}
if self.all_channels:
youtube_details.update(

View File

@ -151,7 +151,7 @@ class CookieHandler:
now = datetime.now()
message = {
"status": response,
"validated": int(now.strftime("%s")),
"validated": int(now.timestamp()),
"validated_str": now.strftime("%Y-%m-%d %H:%M"),
}
RedisArchivist().set_message("cookie:valid", message)

View File

@ -28,7 +28,7 @@ class DownloadPostProcess:
def __init__(self, download):
self.download = download
self.now = int(datetime.now().strftime("%s"))
self.now = int(datetime.now().timestamp())
self.pending = False
def run(self):
@ -150,7 +150,8 @@ class DownloadPostProcess:
for idx, video_id in enumerate(self.download.videos):
comment = Comments(video_id, config=self.download.config)
comment.build_json(notify=(idx, total_videos))
comment.upload_comments()
if comment.json_data:
comment.upload_comments()
key = "message:download"
message = {
@ -184,7 +185,7 @@ class VideoDownloader:
"""setup download queue in redis loop until no more items"""
self._setup_queue()
queue = RedisQueue()
queue = RedisQueue(queue_name="dl_queue")
limit_queue = self.config["downloads"]["limit_count"]
if limit_queue:
@ -275,7 +276,7 @@ class VideoDownloader:
RedisArchivist().set_message(self.MSG, mess_dict, expire=True)
return
RedisQueue().add_list(to_add)
RedisQueue(queue_name="dl_queue").add_list(to_add)
def _progress_hook(self, response):
"""process the progress_hooks from yt_dlp"""

View File

@ -125,8 +125,7 @@ class IndexPaginate:
def validate_data(self):
"""add pit and size to data"""
if "sort" not in self.data.keys():
print(self.data)
raise ValueError("missing sort key in data")
self.data.update({"sort": [{"_doc": {"order": "desc"}}]})
self.data["size"] = self.size or self.DEFAULT_SIZE
self.data["pit"] = {"id": self.pit_id, "keep_alive": "10m"}

View File

@ -135,7 +135,7 @@ class ElasticSnapshot:
return True
last_stamp = snap_dicts[0]["end_stamp"]
now = int(datetime.now().strftime("%s"))
now = int(datetime.now().timestamp())
outdated = (now - last_stamp) / 60 / 60 > 24
if outdated:
print("snapshot: is outdated, create new now")

View File

@ -123,7 +123,7 @@ class PostData:
print(f"{video_id}: ignore video from download queue")
PendingInteract(video_id=video_id, status="ignore").update_status()
# also clear from redis queue
RedisQueue().clear_item(video_id)
RedisQueue(queue_name="dl_queue").clear_item(video_id)
return {"success": True}
@staticmethod
@ -141,7 +141,7 @@ class PostData:
to_execute = self.exec_val
if to_execute == "stop":
print("stopping download queue")
RedisQueue().clear()
RedisQueue(queue_name="dl_queue").clear()
elif to_execute == "kill":
task_id = RedisArchivist().get_message("dl_queue_id")
if not isinstance(task_id, str):

View File

@ -68,12 +68,6 @@ class ApplicationSettingsForm(forms.Form):
("1", "enable sponsorblock integration"),
]
CAST_CHOICES = [
("", "-- change Cast integration --"),
("0", "disable Cast"),
("1", "enable Cast"),
]
SNAPSHOT_CHOICES = [
("", "-- change snapshot settings --"),
("0", "disable system snapshots"),
@ -139,9 +133,6 @@ class ApplicationSettingsForm(forms.Form):
downloads_integrate_sponsorblock = forms.ChoiceField(
widget=forms.Select, choices=SP_CHOICES, required=False
)
application_enable_cast = forms.ChoiceField(
widget=forms.Select, choices=CAST_CHOICES, required=False
)
application_enable_snapshot = forms.ChoiceField(
widget=forms.Select, choices=SNAPSHOT_CHOICES, required=False
)

View File

@ -14,7 +14,7 @@ class WatchState:
def __init__(self, youtube_id):
self.youtube_id = youtube_id
self.stamp = int(datetime.now().strftime("%s"))
self.stamp = int(datetime.now().timestamp())
def mark_as_watched(self):
"""update es with new watched value"""

View File

@ -88,7 +88,7 @@ class ChannelScraper:
# build and return dict
self.json_data = {
"channel_active": True,
"channel_last_refresh": int(datetime.now().strftime("%s")),
"channel_last_refresh": int(datetime.now().timestamp()),
"channel_subs": self._get_channel_subs(main_tab),
"channel_name": main_tab["title"],
"channel_banner_url": self._get_thumbnails(main_tab, "banner"),
@ -203,7 +203,7 @@ class YoutubeChannel(YouTubeItem):
print(f"{self.youtube_id}: fallback to video metadata")
self.json_data = {
"channel_active": False,
"channel_last_refresh": int(datetime.now().strftime("%s")),
"channel_last_refresh": int(datetime.now().timestamp()),
"channel_subs": fallback.get("channel_follower_count", 0),
"channel_name": fallback["uploader"],
"channel_banner_url": False,

View File

@ -33,14 +33,14 @@ class Comments:
self._send_notification(notify)
comments_raw, channel_id = self.get_yt_comments()
if comments_raw:
self.format_comments(comments_raw)
else:
self.comments_format = []
if not comments_raw and not channel_id:
return
self.format_comments(comments_raw)
self.json_data = {
"youtube_id": self.youtube_id,
"comment_last_refresh": int(datetime.now().strftime("%s")),
"comment_last_refresh": int(datetime.now().timestamp()),
"comment_channel_id": channel_id,
"comment_comments": self.comments_format,
}
@ -96,6 +96,9 @@ class Comments:
"""get comments from youtube"""
yt_obs = self.build_yt_obs()
info_json = YtWrap(yt_obs).extract(self.youtube_id)
if not info_json:
return False, False
comments_raw = info_json.get("comments")
channel_id = info_json.get("channel_id")
return comments_raw, channel_id
@ -104,9 +107,10 @@ class Comments:
"""process comments to match format"""
comments = []
for comment in comments_raw:
cleaned_comment = self.clean_comment(comment)
comments.append(cleaned_comment)
if comments_raw:
for comment in comments_raw:
cleaned_comment = self.clean_comment(comment)
comments.append(cleaned_comment)
self.comments_format = comments
@ -169,6 +173,9 @@ class Comments:
return
self.build_json()
if not self.json_data:
return
es_comments = self.get_es_comments()
if not self.comments_format:

View File

@ -14,11 +14,9 @@ import subprocess
from home.src.download.queue import PendingList
from home.src.download.thumbnails import ThumbManager
from home.src.es.connect import ElasticWrap
from home.src.index.reindex import Reindex
from home.src.index.video import YoutubeVideo, index_new_video
from home.src.ta.config import AppConfig
from home.src.ta.helper import clean_string, ignore_filelist
from home.src.ta.ta_redis import RedisArchivist
from PIL import Image, ImageFile
from yt_dlp.utils import ISO639Utils
@ -606,11 +604,3 @@ def scan_filesystem():
for missing_vid in filesystem_handler.to_index:
youtube_id = missing_vid[2]
index_new_video(youtube_id)
def reindex_old_documents():
"""daily refresh of old documents"""
handler = Reindex()
handler.check_outdated()
handler.reindex()
RedisArchivist().set_message("last_reindex", handler.now)

View File

@ -66,7 +66,7 @@ class YoutubePlaylist(YouTubeItem):
"playlist_channel_id": self.youtube_meta["channel_id"],
"playlist_thumbnail": playlist_thumbnail,
"playlist_description": self.youtube_meta["description"] or False,
"playlist_last_refresh": int(datetime.now().strftime("%s")),
"playlist_last_refresh": int(datetime.now().timestamp()),
}
def get_entries(self, playlistend=False):

View File

@ -7,148 +7,249 @@ functionality:
import os
import shutil
from datetime import datetime
from math import ceil
from time import sleep
from home.src.download.queue import PendingList
from home.src.download.thumbnails import ThumbManager
from home.src.download.yt_dlp_base import CookieHandler
from home.src.download.yt_dlp_handler import VideoDownloader
from home.src.es.connect import ElasticWrap
from home.src.es.connect import ElasticWrap, IndexPaginate
from home.src.index.channel import YoutubeChannel
from home.src.index.comments import Comments
from home.src.index.playlist import YoutubePlaylist
from home.src.index.video import YoutubeVideo
from home.src.ta.config import AppConfig
from home.src.ta.ta_redis import RedisArchivist, RedisQueue
class Reindex:
"""check for outdated documents and refresh data from youtube"""
class ReindexBase:
"""base config class for reindex task"""
MATCH_FIELD = {
"ta_video": "active",
"ta_channel": "channel_active",
"ta_playlist": "playlist_active",
REINDEX_CONFIG = {
"video": {
"index_name": "ta_video",
"queue_name": "reindex:ta_video",
"active_key": "active",
"refresh_key": "vid_last_refresh",
},
"channel": {
"index_name": "ta_channel",
"queue_name": "reindex:ta_channel",
"active_key": "channel_active",
"refresh_key": "channel_last_refresh",
},
"playlist": {
"index_name": "ta_playlist",
"queue_name": "reindex:ta_playlist",
"active_key": "playlist_active",
"refresh_key": "playlist_last_refresh",
},
}
MULTIPLY = 1.2
def __init__(self):
# config
self.now = int(datetime.now().strftime("%s"))
self.config = AppConfig().config
self.now = int(datetime.now().timestamp())
def populate(self, all_ids, reindex_config):
"""add all to reindex ids to redis queue"""
if not all_ids:
return
RedisQueue(queue_name=reindex_config["queue_name"]).add_list(all_ids)
class ReindexOutdated(ReindexBase):
"""add outdated documents to reindex queue"""
def __init__(self):
super().__init__()
self.interval = self.config["scheduler"]["check_reindex_days"]
# scan
self.all_youtube_ids = False
self.all_channel_ids = False
self.all_playlist_ids = False
def check_cookie(self):
"""validate cookie if enabled"""
if self.config["downloads"]["cookie_import"]:
valid = CookieHandler(self.config).validate()
if not valid:
return
def add_outdated(self):
"""add outdated documents"""
for reindex_config in self.REINDEX_CONFIG.values():
total_hits = self._get_total_hits(reindex_config)
daily_should = self._get_daily_should(total_hits)
all_ids = self._get_outdated_ids(reindex_config, daily_should)
self.populate(all_ids, reindex_config)
def _get_daily(self):
"""get daily refresh values"""
total_videos = self._get_total_hits("ta_video")
video_daily = ceil(total_videos / self.interval * self.MULTIPLY)
if video_daily >= 10000:
video_daily = 9999
total_channels = self._get_total_hits("ta_channel")
channel_daily = ceil(total_channels / self.interval * self.MULTIPLY)
total_playlists = self._get_total_hits("ta_playlist")
playlist_daily = ceil(total_playlists / self.interval * self.MULTIPLY)
return (video_daily, channel_daily, playlist_daily)
def _get_total_hits(self, index):
@staticmethod
def _get_total_hits(reindex_config):
"""get total hits from index"""
match_field = self.MATCH_FIELD[index]
path = f"{index}/_search?filter_path=hits.total"
data = {"query": {"match": {match_field: True}}}
index_name = reindex_config["index_name"]
active_key = reindex_config["active_key"]
path = f"{index_name}/_search?filter_path=hits.total"
data = {"query": {"match": {active_key: True}}}
response, _ = ElasticWrap(path).post(data=data)
total_hits = response["hits"]["total"]["value"]
return total_hits
def _get_unrated_vids(self):
"""get max 200 videos without rating if ryd integration is enabled"""
must_not_list = [
{"exists": {"field": "stats.average_rating"}},
{"term": {"active": {"value": False}}},
]
data = {"size": 200, "query": {"bool": {"must_not": must_not_list}}}
response, _ = ElasticWrap("ta_video/_search").get(data=data)
def _get_daily_should(self, total_hits):
"""calc how many should reindex daily"""
daily_should = int((total_hits // self.interval + 1) * self.MULTIPLY)
if daily_should >= 10000:
daily_should = 9999
missing_rating = [i["_id"] for i in response["hits"]["hits"]]
self.all_youtube_ids = self.all_youtube_ids + missing_rating
return daily_should
def _get_outdated_vids(self, size):
"""get daily videos to refresh"""
def _get_outdated_ids(self, reindex_config, daily_should):
"""get outdated from index_name"""
index_name = reindex_config["index_name"]
refresh_key = reindex_config["refresh_key"]
now_lte = self.now - self.interval * 24 * 60 * 60
must_list = [
{"match": {"active": True}},
{"range": {"vid_last_refresh": {"lte": now_lte}}},
{"range": {refresh_key: {"lte": now_lte}}},
]
data = {
"size": size,
"size": daily_should,
"query": {"bool": {"must": must_list}},
"sort": [{"vid_last_refresh": {"order": "asc"}}],
"sort": [{refresh_key: {"order": "asc"}}],
"_source": False,
}
response, _ = ElasticWrap("ta_video/_search").get(data=data)
response, _ = ElasticWrap(f"{index_name}/_search").get(data=data)
all_youtube_ids = [i["_id"] for i in response["hits"]["hits"]]
return all_youtube_ids
all_ids = [i["_id"] for i in response["hits"]["hits"]]
return all_ids
def _get_outdated_channels(self, size):
"""get daily channels to refresh"""
now_lte = self.now - self.interval * 24 * 60 * 60
must_list = [
{"match": {"channel_active": True}},
{"range": {"channel_last_refresh": {"lte": now_lte}}},
]
class ReindexManual(ReindexBase):
"""
manually add ids to reindex queue from API
data_example = {
"video": ["video1", "video2", "video3"],
"channel": ["channel1", "channel2", "channel3"],
"playlist": ["playlist1", "playlist2"],
}
extract_videos to also reindex all videos of channel/playlist
"""
def __init__(self, extract_videos=False):
super().__init__()
self.extract_videos = extract_videos
self.data = False
def extract_data(self, data):
"""process data"""
self.data = data
for key, values in self.data.items():
reindex_config = self.REINDEX_CONFIG.get(key)
if not reindex_config:
print(f"reindex type {key} not valid")
raise ValueError
self.process_index(reindex_config, values)
def process_index(self, index_config, values):
"""process values per index"""
index_name = index_config["index_name"]
if index_name == "ta_video":
self._add_videos(values)
elif index_name == "ta_channel":
self._add_channels(values)
elif index_name == "ta_playlist":
self._add_playlists(values)
def _add_videos(self, values):
"""add list of videos to reindex queue"""
if not values:
return
RedisQueue("reindex:ta_video").add_list(values)
def _add_channels(self, values):
"""add list of channels to reindex queue"""
RedisQueue("reindex:ta_channel").add_list(values)
if self.extract_videos:
for channel_id in values:
all_videos = self._get_channel_videos(channel_id)
self._add_videos(all_videos)
def _add_playlists(self, values):
"""add list of playlists to reindex queue"""
RedisQueue("reindex:ta_playlist").add_list(values)
if self.extract_videos:
for playlist_id in values:
all_videos = self._get_playlist_videos(playlist_id)
self._add_videos(all_videos)
def _get_channel_videos(self, channel_id):
"""get all videos from channel"""
data = {
"size": size,
"query": {"bool": {"must": must_list}},
"sort": [{"channel_last_refresh": {"order": "asc"}}],
"_source": False,
"query": {"term": {"channel.channel_id": {"value": channel_id}}},
"_source": ["youtube_id"],
}
response, _ = ElasticWrap("ta_channel/_search").get(data=data)
all_results = IndexPaginate("ta_video", data).get_results()
return [i["youtube_id"] for i in all_results]
all_channel_ids = [i["_id"] for i in response["hits"]["hits"]]
return all_channel_ids
def _get_outdated_playlists(self, size):
"""get daily outdated playlists to refresh"""
now_lte = self.now - self.interval * 24 * 60 * 60
must_list = [
{"match": {"playlist_active": True}},
{"range": {"playlist_last_refresh": {"lte": now_lte}}},
]
def _get_playlist_videos(self, playlist_id):
"""get all videos from playlist"""
data = {
"size": size,
"query": {"bool": {"must": must_list}},
"sort": [{"playlist_last_refresh": {"order": "asc"}}],
"_source": False,
"query": {"term": {"playlist.keyword": {"value": playlist_id}}},
"_source": ["youtube_id"],
}
response, _ = ElasticWrap("ta_playlist/_search").get(data=data)
all_results = IndexPaginate("ta_video", data).get_results()
return [i["youtube_id"] for i in all_results]
all_playlist_ids = [i["_id"] for i in response["hits"]["hits"]]
return all_playlist_ids
def check_outdated(self):
"""add missing vids and channels"""
video_daily, channel_daily, playlist_daily = self._get_daily()
self.all_youtube_ids = self._get_outdated_vids(video_daily)
self.all_channel_ids = self._get_outdated_channels(channel_daily)
self.all_playlist_ids = self._get_outdated_playlists(playlist_daily)
class Reindex(ReindexBase):
"""reindex all documents from redis queue"""
integrate_ryd = self.config["downloads"]["integrate_ryd"]
if integrate_ryd:
self._get_unrated_vids()
def __init__(self):
super().__init__()
self.all_indexed_ids = False
def reindex_all(self):
"""reindex all in queue"""
if self.cookie_invalid():
print("[reindex] cookie invalid, exiting...")
return
for index_config in self.REINDEX_CONFIG.values():
if not RedisQueue(index_config["queue_name"]).has_item():
continue
while True:
has_next = self.reindex_index(index_config)
if not has_next:
break
RedisArchivist().set_message("last_reindex", self.now)
def reindex_index(self, index_config):
"""reindex all of a single index"""
reindex = self.get_reindex_map(index_config["index_name"])
youtube_id = RedisQueue(index_config["queue_name"]).get_next()
if youtube_id:
reindex(youtube_id)
sleep_interval = self.config["downloads"].get("sleep_interval", 0)
sleep(sleep_interval)
return bool(youtube_id)
def get_reindex_map(self, index_name):
"""return def to run for index"""
def_map = {
"ta_video": self._reindex_single_video,
"ta_channel": self._reindex_single_channel,
"ta_playlist": self._reindex_single_playlist,
}
return def_map.get(index_name)
def _reindex_single_video(self, youtube_id):
"""wrapper to handle channel name changes"""
try:
self._reindex_single_video_call(youtube_id)
except FileNotFoundError:
ChannelUrlFixer(youtube_id, self.config)
self._reindex_single_video_call(youtube_id)
def _reindex_single_video_call(self, youtube_id):
"""refresh data for single video"""
video = YoutubeVideo(youtube_id)
@ -206,13 +307,13 @@ class Reindex:
channel.upload_to_es()
channel.sync_to_videos()
@staticmethod
def _reindex_single_playlist(playlist_id, all_indexed_ids):
def _reindex_single_playlist(self, playlist_id):
"""refresh playlist data"""
self._get_all_videos()
playlist = YoutubePlaylist(playlist_id)
playlist.get_from_es()
subscribed = playlist.json_data["playlist_subscribed"]
playlist.all_youtube_ids = all_indexed_ids
playlist.all_youtube_ids = self.all_indexed_ids
playlist.build_json(scrape=True)
if not playlist.json_data:
playlist.deactivate()
@ -222,37 +323,97 @@ class Reindex:
playlist.upload_to_es()
return
def reindex(self):
"""reindex what's needed"""
sleep_interval = self.config["downloads"]["sleep_interval"]
# videos
print(f"reindexing {len(self.all_youtube_ids)} videos")
for youtube_id in self.all_youtube_ids:
try:
self._reindex_single_video(youtube_id)
except FileNotFoundError:
# handle channel name change here
ChannelUrlFixer(youtube_id, self.config).run()
self._reindex_single_video(youtube_id)
if sleep_interval:
sleep(sleep_interval)
# channels
print(f"reindexing {len(self.all_channel_ids)} channels")
for channel_id in self.all_channel_ids:
self._reindex_single_channel(channel_id)
if sleep_interval:
sleep(sleep_interval)
# playlist
print(f"reindexing {len(self.all_playlist_ids)} playlists")
if self.all_playlist_ids:
handler = PendingList()
handler.get_download()
handler.get_indexed()
all_indexed_ids = [i["youtube_id"] for i in handler.all_videos]
for playlist_id in self.all_playlist_ids:
self._reindex_single_playlist(playlist_id, all_indexed_ids)
if sleep_interval:
sleep(sleep_interval)
def _get_all_videos(self):
"""add all videos for playlist index validation"""
if self.all_indexed_ids:
return
handler = PendingList()
handler.get_download()
handler.get_indexed()
self.all_indexed_ids = [i["youtube_id"] for i in handler.all_videos]
def cookie_invalid(self):
"""return true if cookie is enabled and invalid"""
if not self.config["downloads"]["cookie_import"]:
return False
valid = CookieHandler(self.config).validate()
return not valid
class ReindexProgress(ReindexBase):
"""
get progress of reindex task
request_type: key of self.REINDEX_CONFIG
request_id: id of request_type
return = {
"state": "running" | "empty" | "in_queue" | False,
"total_queued": int,
"type": "video" | "channel" | "playlist" | "all",
}
"""
def __init__(self, request_type=False, request_id=False):
super().__init__()
self.request_type = request_type
self.request_id = request_id
def get_progress(self):
"""get progress from task"""
queue_name, request_type = self._get_queue_name()
total = self._get_total_in_queue(queue_name)
progress = {
"total_queued": total,
"type": request_type,
}
state = self._get_state(total, queue_name)
progress.update(state)
return progress
def _get_queue_name(self):
"""return queue_name, queue_type, raise exception on error"""
if not self.request_type:
return "all", "all"
reindex_config = self.REINDEX_CONFIG.get(self.request_type)
if not reindex_config:
print(f"reindex_config not found: {self.request_type}")
raise ValueError
return reindex_config["queue_name"], self.request_type
def _get_total_in_queue(self, queue_name):
"""get all items in queue"""
total = 0
if queue_name == "all":
queues = [i["queue_name"] for i in self.REINDEX_CONFIG.values()]
for queue in queues:
total += len(RedisQueue(queue).get_all())
else:
total += len(RedisQueue(queue_name).get_all())
return total
def _get_state(self, total, queue_name):
"""get state based on request_id"""
state_dict = {}
if self.request_id:
state = RedisQueue(queue_name).in_queue(self.request_id)
state_dict.update({"id": self.request_id, "state": state})
return state_dict
if total:
state = "running"
else:
state = "empty"
state_dict.update({"state": state})
return state_dict
class ChannelUrlFixer:

View File

@ -285,7 +285,7 @@ class SubtitleParser:
"title": video.json_data.get("title"),
"subtitle_channel": channel.get("channel_name"),
"subtitle_channel_id": channel.get("channel_id"),
"subtitle_last_refresh": int(datetime.now().strftime("%s")),
"subtitle_last_refresh": int(datetime.now().timestamp()),
"subtitle_lang": self.lang,
"subtitle_source": source,
}

View File

@ -28,7 +28,7 @@ class SponsorBlock:
def __init__(self, user_id=False):
self.user_id = user_id
self.user_agent = f"{settings.TA_UPSTREAM} {settings.TA_VERSION}"
self.last_refresh = int(datetime.now().strftime("%s"))
self.last_refresh = int(datetime.now().timestamp())
def get_sb_id(self):
"""get sponsorblock userid or generate if needed"""
@ -180,7 +180,7 @@ class YoutubeVideo(YouTubeItem, YoutubeSubtitle):
upload_date = self.youtube_meta["upload_date"]
upload_date_time = datetime.strptime(upload_date, "%Y%m%d")
published = upload_date_time.strftime("%Y-%m-%d")
last_refresh = int(datetime.now().strftime("%s"))
last_refresh = int(datetime.now().timestamp())
# base64_blur = ThumbManager().get_base64_blur(self.youtube_id)
base64_blur = False
# build json_data basics

View File

@ -47,18 +47,6 @@ class AppConfig:
@staticmethod
def get_config_env():
"""read environment application variables"""
host_uid_env = os.environ.get("HOST_UID")
if host_uid_env:
host_uid = int(host_uid_env)
else:
host_uid = False
host_gid_env = os.environ.get("HOST_GID")
if host_gid_env:
host_gid = int(host_gid_env)
else:
host_gid = False
es_pass = os.environ.get("ELASTIC_PASSWORD")
es_user = os.environ.get("ELASTIC_USER", default="elastic")
@ -66,8 +54,9 @@ class AppConfig:
"REDIS_HOST": os.environ.get("REDIS_HOST"),
"es_url": os.environ.get("ES_URL"),
"es_auth": (es_user, es_pass),
"HOST_UID": host_uid,
"HOST_GID": host_gid,
"HOST_UID": int(os.environ.get("HOST_UID", False)),
"HOST_GID": int(os.environ.get("HOST_GID", False)),
"enable_cast": bool(os.environ.get("ENABLE_CAST")),
}
return application

View File

@ -102,11 +102,11 @@ class RedisArchivist(RedisBase):
class RedisQueue(RedisBase):
"""dynamically interact with the download queue in redis"""
"""dynamically interact with queues in redis"""
def __init__(self):
def __init__(self, queue_name):
super().__init__()
self.key = self.NAME_SPACE + "dl_queue"
self.key = f"{self.NAME_SPACE}{queue_name}"
def get_all(self):
"""return all elements in list"""
@ -114,6 +114,14 @@ class RedisQueue(RedisBase):
all_elements = [i.decode() for i in result]
return all_elements
def in_queue(self, element):
"""check if element is in list"""
result = self.conn.execute_command("LPOS", self.key, element)
if result is not None:
return "in_queue"
return False
def add_list(self, to_add):
"""add list to queue"""
self.conn.execute_command("RPUSH", self.key, *to_add)

View File

@ -20,11 +20,8 @@ from home.src.download.yt_dlp_handler import VideoDownloader
from home.src.es.backup import ElasticBackup
from home.src.es.index_setup import ElasitIndexWrap
from home.src.index.channel import YoutubeChannel
from home.src.index.filesystem import (
ImportFolderScanner,
reindex_old_documents,
scan_filesystem,
)
from home.src.index.filesystem import ImportFolderScanner, scan_filesystem
from home.src.index.reindex import Reindex, ReindexManual, ReindexOutdated
from home.src.ta.config import AppConfig, ScheduleBuilder
from home.src.ta.helper import UrlListParser, clear_dl_cache
from home.src.ta.ta_redis import RedisArchivist, RedisQueue
@ -99,7 +96,7 @@ def download_pending():
@shared_task
def download_single(youtube_id):
"""start download single video now"""
queue = RedisQueue()
queue = RedisQueue(queue_name="dl_queue")
queue.add_priority(youtube_id)
print("Added to queue with priority: " + youtube_id)
# start queue if needed
@ -136,9 +133,27 @@ def extrac_dl(youtube_ids):
@shared_task(name="check_reindex")
def check_reindex():
def check_reindex(data=False, extract_videos=False):
"""run the reindex main command"""
reindex_old_documents()
if data:
ReindexManual(extract_videos=extract_videos).extract_data(data)
have_lock = False
reindex_lock = RedisArchivist().get_lock("reindex")
try:
have_lock = reindex_lock.acquire(blocking=False)
if have_lock:
if not data:
ReindexOutdated().add_outdated()
Reindex().reindex_all()
else:
print("Did not acquire reindex lock.")
finally:
if have_lock:
reindex_lock.release()
@shared_task
@ -192,7 +207,7 @@ def kill_dl(task_id):
app.control.revoke(task_id, terminate=True)
_ = RedisArchivist().del_message("dl_queue_id")
RedisQueue().clear()
RedisQueue(queue_name="dl_queue").clear()
clear_dl_cache(CONFIG)

View File

@ -47,10 +47,20 @@
{% elif channel_info.channel_views > 0 %}
<p>Channel views: {{ channel_info.channel_views|intcomma }}</p>
{% endif %}
<button onclick="deleteConfirm()" id="delete-item">Delete Channel</button>
<div class="delete-confirm" id="delete-button">
<span>Delete {{ channel_info.channel_name }} including all videos? </span><button class="danger-button" onclick="deleteChannel(this)" data-id="{{ channel_info.channel_id }}">Delete</button> <button onclick="cancelDelete()">Cancel</button>
<div class="button-box">
<button onclick="deleteConfirm()" id="delete-item">Delete Channel</button>
<div class="delete-confirm" id="delete-button">
<span>Delete {{ channel_info.channel_name }} including all videos? </span><button class="danger-button" onclick="deleteChannel(this)" data-id="{{ channel_info.channel_id }}">Delete</button> <button onclick="cancelDelete()">Cancel</button>
</div>
</div>
{% if reindex %}
<p>Reindex scheduled</p>
{% else %}
<div id="reindex-button" class="button-box">
<button data-id="{{ channel_info.channel_id }}" data-type="channel" onclick="reindex(this)" title="Reindex Channel {{ channel_info.channel_name }}">Reindex</button>
<button data-id="{{ channel_info.channel_id }}" data-type="channel" data-extract-videos="true" onclick="reindex(this)" title="Reindex Videos of {{ channel_info.channel_name }}">Reindex Videos</button>
</div>
{% endif %}
</div>
</div>
</div>

View File

@ -52,6 +52,14 @@
<p>Total Videos archived: {{ max_hits }}/{{ playlist_info.playlist_entries|length }}</p>
<p>Watched: <button title="Mark all videos from {{ playlist_info.playlist_name }} as watched" type="button" id="watched-button" data-id="{{ playlist_info.playlist_id }}" onclick="isWatchedButton(this)">Mark as watched</button></p>
{% endif %}
{% if reindex %}
<p>Reindex scheduled</p>
{% else %}
<div id="reindex-button" class="button-box">
<button data-id="{{ playlist_info.playlist_id }}" data-type="playlist" onclick="reindex(this)" title="Reindex Playlist {{ playlist_info.playlist_name }}">Reindex</button>
<button data-id="{{ playlist_info.playlist_id }}" data-type="playlist" data-extract-videos="true" onclick="reindex(this)" title="Reindex Videos of {{ playlist_info.playlist_name }}">Reindex Videos</button>
</div>
{% endif %}
</div>
</div>
</div>

View File

@ -165,11 +165,6 @@
<i>Before activating that, make sure you have a scraping sleep interval of at least 3 secs set to avoid ratelimiting issues.</i><br>
{{ app_form.downloads_integrate_sponsorblock }}
</div>
<div class="settings-item">
<p>Current Cast integration: <span class="settings-current">{{ config.application.enable_cast }}</span></p>
<i>Enabling Cast will load an additional JS library from Google. HTTPS and a supported browser are required for this integration.</i><br>
{{ app_form.application_enable_cast }}
</div>
</div>
<div class="settings-group">
<h2 id="snapshots">Snapshots</h2>

View File

@ -56,10 +56,19 @@
{% else %}
<p>Youtube: Deactivated</p>
{% endif %}
<a download="" href="/media/{{ video.media_url }}"><button id="download-item">Download File</button></a>
<button onclick="deleteConfirm()" id="delete-item">Delete Video</button>
<div class="delete-confirm" id="delete-button">
<span>Are you sure? </span><button class="danger-button" onclick="deleteVideo(this)" data-id="{{ video.youtube_id }}" data-redirect = "{{ video.channel.channel_id }}">Delete</button> <button onclick="cancelDelete()">Cancel</button>
{% if reindex %}
<p>Reindex scheduled</p>
{% else %}
<div id="reindex-button" class="button-box">
<button data-id="{{ video.youtube_id }}" data-type="video" onclick="reindex(this)" title="Reindex {{ video.title }}">Reindex</button>
</div>
{% endif %}
<div class="button-box">
<a download="" href="/media/{{ video.media_url }}"><button id="download-item">Download File</button></a>
<button onclick="deleteConfirm()" id="delete-item">Delete Video</button>
<div class="delete-confirm" id="delete-button">
<span>Are you sure? </span><button class="danger-button" onclick="deleteVideo(this)" data-id="{{ video.youtube_id }}" data-redirect = "{{ video.channel.channel_id }}">Delete</button> <button onclick="cancelDelete()">Cancel</button>
</div>
</div>
</div>
</div>
@ -140,13 +149,17 @@
</div>
<script>getSimilarVideos('{{ video.youtube_id }}')</script>
</div>
{% if video.comment_count %}
<div class="comments-section">
<h3>Comments: {{video.comment_count}}</h3>
<div id="comments-list" class="comments-list">
</div>
{% if video.comment_count == 0 %}
<div class="comments-section">
<span>Video has no comments</span>
</div>
{% elif video.comment_count %}
<div class="comments-section">
<h3>Comments: {{video.comment_count}}</h3>
<div id="comments-list" class="comments-list">
</div>
<script>getComments('{{ video.youtube_id }}')</script>
</div>
{% endif %}
</div>
<script>

View File

@ -35,6 +35,7 @@ from home.src.frontend.searching import SearchHandler
from home.src.index.channel import YoutubeChannel, channel_overwrites
from home.src.index.generic import Pagination
from home.src.index.playlist import YoutubePlaylist
from home.src.index.reindex import ReindexProgress
from home.src.ta.config import AppConfig, ScheduleBuilder
from home.src.ta.helper import UrlListParser, time_parser
from home.src.ta.ta_redis import RedisArchivist
@ -569,17 +570,18 @@ class ChannelIdAboutView(ChannelIdBaseView):
self.initiate_vars(request)
self.channel_has_pending(channel_id)
path = f"ta_channel/_doc/{channel_id}"
response, _ = ElasticWrap(path).get()
response, _ = ElasticWrap(f"ta_channel/_doc/{channel_id}").get()
channel_info = SearchProcess(response).process()
channel_name = channel_info["channel_name"]
reindex = ReindexProgress(
request_type="channel", request_id=channel_id
).get_progress()
self.context.update(
{
"title": "Channel: About " + channel_name,
"title": "Channel: About " + channel_info["channel_name"],
"channel_info": channel_info,
"channel_overwrite_form": ChannelOverwriteForm,
"reindex": reindex.get("state"),
}
)
@ -706,12 +708,17 @@ class PlaylistIdView(ArchivistResultsView):
self._update_view_data(playlist_id, playlist_info)
self.find_results()
self.match_progress()
reindex = ReindexProgress(
request_type="playlist", request_id=playlist_id
).get_progress()
self.context.update(
{
"title": "Playlist: " + playlist_name,
"playlist_info": playlist_info,
"playlist_name": playlist_name,
"channel_info": channel_info,
"reindex": reindex.get("state"),
}
)
return render(request, "home/playlist_id.html", self.context)
@ -844,11 +851,8 @@ class VideoView(View):
def get(self, request, video_id):
"""get single video"""
config_handler = AppConfig(request.user.id)
position = time_parser(request.GET.get("t"))
path = f"ta_video/_doc/{video_id}"
look_up = SearchHandler(path, config=False)
video_hit = look_up.get_data()
video_data = video_hit[0]["source"]
look_up = SearchHandler(f"ta_video/_doc/{video_id}", config=False)
video_data = look_up.get_data()[0]["source"]
try:
rating = video_data["stats"]["average_rating"]
video_data["stats"]["average_rating"] = self.star_creator(rating)
@ -861,16 +865,20 @@ class VideoView(View):
else:
playlist_nav = False
video_title = video_data["title"]
reindex = ReindexProgress(
request_type="video", request_id=video_id
).get_progress()
context = {
"video": video_data,
"playlist_nav": playlist_nav,
"title": video_title,
"title": video_data.get("title"),
"colors": config_handler.colors,
"cast": config_handler.config["application"]["enable_cast"],
"version": settings.TA_VERSION,
"config": config_handler.config,
"position": position,
"position": time_parser(request.GET.get("t")),
"reindex": reindex.get("state"),
}
return render(request, "home/video.html", context)

View File

@ -1,11 +1,11 @@
beautifulsoup4==4.11.1
celery==5.2.7
Django==4.1.3
Django==4.1.4
django-auth-ldap==4.1.0
django-cors-headers==3.13.0
djangorestframework==3.14.0
Pillow==9.3.0
redis==4.3.5
redis==4.4.0
requests==2.28.1
ryd-client==0.0.6
uWSGI==2.0.21

View File

@ -119,6 +119,10 @@ button:hover {
color: var(--main-bg);
}
.button-box {
padding: 5px 0;
}
.unsubscribe {
background-color: var(--accent-font-light);
}

View File

@ -147,6 +147,24 @@ function toggleCheckbox(checkbox) {
}, 500);
}
// start reindex task
function reindex(button) {
let apiEndpoint = '/api/refresh/';
if (button.getAttribute('data-extract-videos')) {
apiEndpoint += '?extract_videos=true';
}
let type = button.getAttribute('data-type');
let id = button.getAttribute('data-id');
let data = {};
data[type] = [id];
apiRequest(apiEndpoint, 'POST', data);
let message = document.createElement('p');
message.innerText = 'Reindex scheduled';
document.getElementById('reindex-button').replaceWith(message);
}
// download page buttons
function rescanPending() {
let payload = JSON.stringify({ rescan_pending: true });