channel migration take 2

This commit is contained in:
Simon 2023-07-24 10:51:13 +07:00
parent 2a60360f4a
commit 61b04ba5cf
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
1 changed files with 67 additions and 86 deletions

View File

@ -25,99 +25,83 @@ class Command(BaseCommand):
def handle(self, *args, **options):
"""run commands"""
self.stdout.write(TOPIC)
need_migration = self.channels_need_migration()
if not need_migration:
handler = FolderMigration()
to_migrate = handler.get_to_migrate()
if not to_migrate:
self.stdout.write(
self.style.SUCCESS(" no channel migration needed")
self.style.SUCCESS(" no channel migration needed\n")
)
return
self.stdout.write(self.style.SUCCESS(" migrating channels"))
total_channels = handler.create_folders(to_migrate)
self.stdout.write(
self.style.SUCCESS(f" migrating {len(need_migration)} channels")
)
for channel in need_migration:
channel_name = channel["channel_name"]
channel_id = channel["channel_id"]
self.stdout.write(
self.style.SUCCESS(
f" migrating {channel_name} [{channel_id}]"
)
)
ChannelMigration(channel).migrate()
self.stdout.write(
self.style.SUCCESS(" ✓ channel migration completed")
self.style.SUCCESS(f" created {total_channels} channels")
)
def channels_need_migration(self):
"""get channels that need migration"""
all_indexed = self._get_channel_indexed()
all_folders = self._get_channel_folders()
need_migration = []
for channel in all_indexed:
if channel["channel_id"] not in all_folders:
need_migration.append(channel)
self.stdout.write(
self.style.SUCCESS(f" migrating {len(to_migrate)} videos")
)
handler.migrate_videos(to_migrate)
self.stdout.write(self.style.SUCCESS(" update videos in index"))
handler.send_bulk()
return need_migration
self.stdout.write(self.style.SUCCESS(" cleanup old folders"))
handler.delete_old()
def _get_channel_indexed(self):
"""get all channels indexed"""
all_results = IndexPaginate("ta_channel", False).get_results()
return all_results
def _get_channel_folders(self):
"""get all channel folders"""
base_folder = AppConfig().config["application"]["videos"]
existing_folders = ignore_filelist(os.listdir(base_folder))
return existing_folders
self.stdout.write(self.style.SUCCESS(" ✓ migration completed\n"))
class ChannelMigration:
"""migrate single channel"""
class FolderMigration:
"""migrate video archive folder"""
def __init__(self, channel):
self.channel = channel
def __init__(self):
self.config = AppConfig().config
self.videos = self.config["application"]["videos"]
self.bulk_list = []
def migrate(self):
"""run migration"""
self._create_new_folder()
all_videos = self.get_channel_videos()
self.migrate_videos(all_videos)
self.send_bulk()
self.delete_old(all_videos)
def get_to_migrate(self):
"""get videos to migrate"""
script = (
"doc['media_url'].value == "
+ "doc['channel.channel_id'].value + '/'"
+ " + doc['youtube_id'].value + '.mp4'"
)
data = {
"query": {"bool": {"must_not": [{"script": {"script": script}}]}},
"_source": [
"youtube_id",
"media_url",
"channel.channel_id",
"subtitles",
],
}
response = IndexPaginate("ta_video", data).get_results()
def _create_new_folder(self):
"""create new channel id folder"""
return response
def create_folders(self, to_migrate):
"""create required channel folders"""
host_uid = self.config["application"]["HOST_UID"]
host_gid = self.config["application"]["HOST_GID"]
new_path = os.path.join(self.videos, self.channel["channel_id"])
if not os.path.exists(new_path):
os.mkdir(new_path)
all_channel_ids = {i["channel"]["channel_id"] for i in to_migrate}
for channel_id in all_channel_ids:
new_folder = os.path.join(self.videos, channel_id)
os.makedirs(new_folder, exist_ok=True)
if host_uid and host_gid:
os.chown(new_path, host_uid, host_gid)
os.chown(new_folder, host_uid, host_gid)
def get_channel_videos(self):
"""get all videos of channel"""
data = {
"query": {
"term": {
"channel.channel_id": {"value": self.channel["channel_id"]}
}
}
}
all_videos = IndexPaginate("ta_video", data).get_results()
return len(all_channel_ids)
return all_videos
def migrate_videos(self, all_videos):
def migrate_videos(self, to_migrate):
"""migrate all videos of channel"""
for video in all_videos:
for video in to_migrate:
new_media_url = self._move_video_file(video)
if not new_media_url:
continue
all_subtitles = self._move_subtitles(video)
action = {
"update": {"_id": video["youtube_id"], "_index": "ta_video"}
@ -137,9 +121,10 @@ class ChannelMigration:
return False
new_media_url = os.path.join(
self.channel["channel_id"], video["youtube_id"] + ".mp4"
video["channel"]["channel_id"], video["youtube_id"] + ".mp4"
)
os.rename(old_path, os.path.join(self.videos, new_media_url))
new_path = os.path.join(self.videos, new_media_url)
os.rename(old_path, new_path)
return new_media_url
@ -155,11 +140,12 @@ class ChannelMigration:
print(f"did not find expected subtitle at {old_path}")
continue
ext = ".".join(old_path.split(".")[-2:])
new_media_url = os.path.join(
self.channel["channel_id"], video["youtube_id"] + f".{ext}"
video["channel"]["channel_id"],
f"{video.get('youtube_id')}.{subtitle.get('lang')}.vtt",
)
os.rename(old_path, os.path.join(self.videos, new_media_url))
new_path = os.path.join(self.videos, new_media_url)
os.rename(old_path, new_path)
subtitle["media_url"] = new_media_url
return all_subtitles
@ -176,15 +162,10 @@ class ChannelMigration:
if not status == 200:
print(response)
def delete_old(self, all_videos):
"""delete old folder path if empty"""
if not all_videos:
return
channel_name = os.path.split(all_videos[0]["media_url"])[0]
old_path = os.path.join(self.videos, channel_name)
if os.path.exists(old_path) and not os.listdir(old_path):
os.rmdir(old_path)
return
print(f"failed to clean up old folder {old_path}")
def delete_old(self):
"""delete old empty folders"""
all_folders = ignore_filelist(os.listdir(self.videos))
for folder in all_folders:
folder_path = os.path.join(self.videos, folder)
if not ignore_filelist(os.listdir(folder_path)):
os.rmdir(folder_path)