Redirect and celery memory usage workaround, #build

Changed:
- Limit life span of worker to avoid building up memory usage
- Validate video ID at index, raise error on redirect
- Clean up subtitles on channel delete
This commit is contained in:
Simon 2024-01-15 12:06:43 +07:00
commit 27b6efcab7
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
6 changed files with 47 additions and 24 deletions

View File

@ -18,7 +18,7 @@ python manage.py ta_migpath
# start all tasks
nginx &
celery -A home.tasks worker --loglevel=INFO &
celery -A home.tasks worker --loglevel=INFO --max-tasks-per-child 10 &
celery -A home beat --loglevel=INFO \
-s "${BEAT_SCHEDULE_PATH:-${cachedir}/celerybeat-schedule}" &
uwsgi --ini uwsgi.ini

View File

@ -269,4 +269,4 @@ CORS_ALLOW_HEADERS = list(default_headers) + [
# TA application settings
TA_UPSTREAM = "https://github.com/tubearchivist/tubearchivist"
TA_VERSION = "v0.4.5"
TA_VERSION = "v0.4.6-unstable"

View File

@ -199,6 +199,15 @@ class YoutubeChannel(YouTubeItem):
}
_, _ = ElasticWrap("ta_comment/_delete_by_query").post(data)
def delete_es_subtitles(self):
    """remove all subtitle documents indexed for this channel"""
    # match subtitles by the channel ID stored on each subtitle document
    channel_term = {"subtitle_channel_id": {"value": self.youtube_id}}
    query = {"query": {"term": channel_term}}
    # the two returned values are not needed here
    _, _ = ElasticWrap("ta_subtitle/_delete_by_query").post(query)
def delete_playlists(self):
"""delete all indexed playlist from es"""
all_playlists = self.get_indexed_playlists()
@ -229,6 +238,7 @@ class YoutubeChannel(YouTubeItem):
print(f"{self.youtube_id}: delete indexed videos")
self.delete_es_videos()
self.delete_es_comments()
self.delete_es_subtitles()
self.del_in_es()
def index_channel_playlists(self):

View File

@ -177,6 +177,7 @@ class YoutubeVideo(YouTubeItem, YoutubeSubtitle):
def _process_youtube_meta(self):
"""extract relevant fields from youtube"""
self._validate_id()
# extract
self.channel_id = self.youtube_meta["channel_id"]
upload_date = self.youtube_meta["upload_date"]
@ -202,6 +203,19 @@ class YoutubeVideo(YouTubeItem, YoutubeSubtitle):
"active": True,
}
def _validate_id(self):
    """check that the extracted metadata belongs to self.youtube_id

    raises ValueError if youtube_meta reports a different video ID
    """
    remote_id = self.youtube_meta["id"]
    if self.youtube_id == remote_id:
        # IDs agree, nothing to do
        return

    # metadata is for another video: unexpected redirect
    raise ValueError(
        f"[reindex][{self.youtube_id}] got an unexpected redirect "
        f"to {remote_id}, you are probably getting blocked by YT. "
        "See FAQ for more details."
    )
def _add_channel(self):
"""add channel dict to video json_data"""
channel = ta_channel.YoutubeChannel(self.channel_id)

View File

@ -269,14 +269,13 @@ class ReleaseVersion:
REMOTE_URL = "https://www.tubearchivist.com/api/release/latest/"
NEW_KEY = "versioncheck:new"
def __init__(self):
self.local_version = settings.TA_VERSION
self.is_unstable = settings.TA_VERSION.endswith("-unstable")
self.remote_version = False
self.is_breaking = False
self.response = False
def __init__(self) -> None:
    """initialise version state from settings.TA_VERSION"""
    version: str = settings.TA_VERSION
    self.local_version: str = version
    # unstable builds are marked by a "-unstable" suffix on the version
    self.is_unstable: bool = version.endswith("-unstable")
    # remote details start empty, get_remote_version fills them in
    self.remote_version: str = ""
    self.is_breaking: bool = False
def check(self):
def check(self) -> None:
"""check version"""
print(f"[{self.local_version}]: look for updates")
self.get_remote_version()
@ -290,18 +289,18 @@ class ReleaseVersion:
RedisArchivist().set_message(self.NEW_KEY, message)
print(f"[{self.local_version}]: found new version {new_version}")
def get_local_version(self):
def get_local_version(self) -> str:
    """return the local version string held by this instance"""
    local: str = self.local_version
    return local
def get_remote_version(self):
def get_remote_version(self) -> None:
"""read version from remote"""
sleep(randint(0, 60))
self.response = requests.get(self.REMOTE_URL, timeout=20).json()
self.remote_version = self.response["release_version"]
self.is_breaking = self.response["breaking_changes"]
response = requests.get(self.REMOTE_URL, timeout=20).json()
self.remote_version = response["release_version"]
self.is_breaking = response["breaking_changes"]
def _has_update(self):
def _has_update(self) -> str | bool:
"""check if there is an update"""
remote_parsed = self._parse_version(self.remote_version)
local_parsed = self._parse_version(self.local_version)
@ -314,12 +313,12 @@ class ReleaseVersion:
return False
@staticmethod
def _parse_version(version):
def _parse_version(version: str) -> tuple[int, ...]:
    """return version parts as a tuple of ints

    drops a leading "v" and a trailing "-unstable" marker, then splits
    on ".", e.g. "v0.4.6-unstable" -> (0, 4, 6)

    raises ValueError if a remaining part is not an integer
    """
    # removesuffix/removeprefix remove the exact substring only, while
    # rstrip/lstrip treat their argument as a set of characters
    clean = version.removesuffix("-unstable").removeprefix("v")
    return tuple(int(part) for part in clean.split("."))
def is_updated(self):
def is_updated(self) -> str | bool:
"""check if update happened in the mean time"""
message = self.get_update()
if not message:
@ -334,15 +333,15 @@ class ReleaseVersion:
return False
def get_update(self):
def get_update(self) -> dict:
"""return new version dict if available"""
message = RedisArchivist().get_message(self.NEW_KEY)
if not message.get("status"):
return False
return {}
return message
def clear_fail(self):
def clear_fail(self) -> None:
"""clear key, catch previous error in v0.4.5"""
message = self.get_update()
if not message:

View File

@ -1,13 +1,13 @@
apprise==1.6.0
apprise==1.7.1
celery==5.3.6
Django==4.2.7
Django==4.2.9
django-auth-ldap==4.6.0
django-cors-headers==4.3.1
djangorestframework==3.14.0
Pillow==10.1.0
Pillow==10.2.0
redis==5.0.0
requests==2.31.0
ryd-client==0.0.6
uWSGI==2.0.23
whitenoise==6.6.0
yt-dlp @ git+https://github.com/yt-dlp/yt-dlp@6b5d93b0b0240e287389d1d43b2d5293e18aa4cc
yt-dlp==2023.12.30