add media stream index startup migration

This commit is contained in:
simon 2023-04-15 18:27:03 +07:00
parent a17f05ef21
commit 3063236634
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
1 changed files with 44 additions and 1 deletions

View File

@ -8,9 +8,10 @@ import os
from time import sleep from time import sleep
from django.core.management.base import BaseCommand, CommandError from django.core.management.base import BaseCommand, CommandError
from home.src.es.connect import ElasticWrap from home.src.es.connect import ElasticWrap, IndexPaginate
from home.src.es.index_setup import ElasitIndexWrap from home.src.es.index_setup import ElasitIndexWrap
from home.src.es.snapshot import ElasticSnapshot from home.src.es.snapshot import ElasticSnapshot
from home.src.index.video_streams import MediaStreamExtractor
from home.src.ta.config import AppConfig, ReleaseVersion from home.src.ta.config import AppConfig, ReleaseVersion
from home.src.ta.helper import clear_dl_cache from home.src.ta.helper import clear_dl_cache
from home.src.ta.ta_redis import RedisArchivist from home.src.ta.ta_redis import RedisArchivist
@ -42,6 +43,7 @@ class Command(BaseCommand):
self._mig_index_setup() self._mig_index_setup()
self._mig_snapshot_check() self._mig_snapshot_check()
self._mig_set_vid_type() self._mig_set_vid_type()
self._mig_set_streams()
def _sync_redis_state(self): def _sync_redis_state(self):
"""make sure redis gets new config.json values""" """make sure redis gets new config.json values"""
@ -193,3 +195,44 @@ class Command(BaseCommand):
self.stdout.write(response) self.stdout.write(response)
sleep(60) sleep(60)
raise CommandError(message) raise CommandError(message)
def _mig_set_streams(self):
    """migration: update from 0.3.5 to 0.3.6, set streams and media_size

    Queries ta_video for every document that has no "streams" field,
    then for each hit builds the media file path from the configured
    videos folder and the document's media_url, extracts stream metadata
    and file size with MediaStreamExtractor and posts a partial update
    to ta_video/_update/<youtube_id>.

    A media file missing on disk or a non-200 update response is
    reported and the loop moves on to the next video.
    """
    self.stdout.write("[MIGRATION] index streams and media size")
    videos = AppConfig().config["application"]["videos"]
    data = {
        "query": {
            "bool": {"must_not": [{"exists": {"field": "streams"}}]}
        },
        "_source": ["media_url", "youtube_id"],
    }
    all_missing = IndexPaginate("ta_video", data).get_results()
    if not all_missing:
        self.stdout.write(" no videos need updating")
        return

    total = len(all_missing)
    for idx, missing in enumerate(all_missing):
        media_url = missing["media_url"]
        youtube_id = missing["youtube_id"]
        media_path = os.path.join(videos, media_url)
        if not os.path.exists(media_path):
            # the output wrapper has no errors() method, that attribute
            # resolves to the stream's encoding error mode (a str) and
            # calling it raises TypeError; write a styled message instead
            self.stdout.write(
                self.style.ERROR(f" file not found: {media_path}")
            )
            continue

        media = MediaStreamExtractor(media_path)
        vid_data = {
            "doc": {
                "streams": media.extract_metadata(),
                "media_size": media.get_file_size(),
            }
        }
        path = f"ta_video/_update/{youtube_id}"
        response, status_code = ElasticWrap(path).post(data=vid_data)
        if status_code != 200:
            self.stdout.write(
                self.style.ERROR(
                    f" update failed: {path}, {response}, {status_code}"
                )
            )

        # periodic progress line, also printed for the first video (idx 0)
        if idx % 100 == 0:
            self.stdout.write(f" progress {idx}/{total}")