From 61b04ba5cf7ffb6fd20a4b91332c4186d659f70b Mon Sep 17 00:00:00 2001 From: Simon Date: Mon, 24 Jul 2023 10:51:13 +0700 Subject: [PATCH] channel migration take 2 --- .../config/management/commands/ta_migpath.py | 153 ++++++++---------- 1 file changed, 67 insertions(+), 86 deletions(-) diff --git a/tubearchivist/config/management/commands/ta_migpath.py b/tubearchivist/config/management/commands/ta_migpath.py index 78b4c2fc..f4078003 100644 --- a/tubearchivist/config/management/commands/ta_migpath.py +++ b/tubearchivist/config/management/commands/ta_migpath.py @@ -25,99 +25,83 @@ class Command(BaseCommand): def handle(self, *args, **options): """run commands""" self.stdout.write(TOPIC) - need_migration = self.channels_need_migration() - if not need_migration: + + handler = FolderMigration() + to_migrate = handler.get_to_migrate() + if not to_migrate: self.stdout.write( - self.style.SUCCESS(" no channel migration needed") + self.style.SUCCESS(" no channel migration needed\n") ) return + self.stdout.write(self.style.SUCCESS(" migrating channels")) + total_channels = handler.create_folders(to_migrate) self.stdout.write( - self.style.SUCCESS(f" migrating {len(need_migration)} channels") - ) - for channel in need_migration: - channel_name = channel["channel_name"] - channel_id = channel["channel_id"] - self.stdout.write( - self.style.SUCCESS( - f" migrating {channel_name} [{channel_id}]" - ) - ) - ChannelMigration(channel).migrate() - - self.stdout.write( - self.style.SUCCESS(" ✓ channel migration completed") + self.style.SUCCESS(f" created {total_channels} channels") ) - def channels_need_migration(self): - """get channels that need migration""" - all_indexed = self._get_channel_indexed() - all_folders = self._get_channel_folders() - need_migration = [] - for channel in all_indexed: - if channel["channel_id"] not in all_folders: - need_migration.append(channel) + self.stdout.write( + self.style.SUCCESS(f" migrating {len(to_migrate)} videos") + ) + handler.migrate_videos(to_migrate) + self.stdout.write(self.style.SUCCESS(" update videos in index")) + handler.send_bulk() - return need_migration + self.stdout.write(self.style.SUCCESS(" cleanup old folders")) + handler.delete_old() - def _get_channel_indexed(self): - """get all channels indexed""" - all_results = IndexPaginate("ta_channel", False).get_results() - - return all_results - - def _get_channel_folders(self): - """get all channel folders""" - base_folder = AppConfig().config["application"]["videos"] - existing_folders = ignore_filelist(os.listdir(base_folder)) - - return existing_folders + self.stdout.write(self.style.SUCCESS(" ✓ migration completed\n")) -class ChannelMigration: - """migrate single channel""" +class FolderMigration: + """migrate video archive folder""" - def __init__(self, channel): - self.channel = channel + def __init__(self): self.config = AppConfig().config self.videos = self.config["application"]["videos"] self.bulk_list = [] - def migrate(self): - """run migration""" - self._create_new_folder() - all_videos = self.get_channel_videos() - self.migrate_videos(all_videos) - self.send_bulk() - self.delete_old(all_videos) + def get_to_migrate(self): + """get videos to migrate""" + script = ( + "doc['media_url'].value == " + + "doc['channel.channel_id'].value + '/'" + + " + doc['youtube_id'].value + '.mp4'" + ) + data = { + "query": {"bool": {"must_not": [{"script": {"script": script}}]}}, + "_source": [ + "youtube_id", + "media_url", + "channel.channel_id", + "subtitles", + ], + } + response = IndexPaginate("ta_video", data).get_results() - def _create_new_folder(self): - """create new channel id folder""" + return response + + def create_folders(self, to_migrate): + """create required channel folders""" host_uid = self.config["application"]["HOST_UID"] host_gid = self.config["application"]["HOST_GID"] - new_path = os.path.join(self.videos, self.channel["channel_id"]) - if not os.path.exists(new_path): - os.mkdir(new_path) + all_channel_ids = {i["channel"]["channel_id"] for i in to_migrate} + + for channel_id in all_channel_ids: + new_folder = os.path.join(self.videos, channel_id) + os.makedirs(new_folder, exist_ok=True) if host_uid and host_gid: - os.chown(new_path, host_uid, host_gid) + os.chown(new_folder, host_uid, host_gid) - def get_channel_videos(self): - """get all videos of channel""" - data = { - "query": { - "term": { - "channel.channel_id": {"value": self.channel["channel_id"]} - } - } - } - all_videos = IndexPaginate("ta_video", data).get_results() + return len(all_channel_ids) - return all_videos - - def migrate_videos(self, all_videos): + def migrate_videos(self, to_migrate): """migrate all videos of channel""" - for video in all_videos: + for video in to_migrate: new_media_url = self._move_video_file(video) + if not new_media_url: + continue + all_subtitles = self._move_subtitles(video) action = { "update": {"_id": video["youtube_id"], "_index": "ta_video"} @@ -137,9 +121,10 @@ class ChannelMigration: return False new_media_url = os.path.join( - self.channel["channel_id"], video["youtube_id"] + ".mp4" + video["channel"]["channel_id"], video["youtube_id"] + ".mp4" ) - os.rename(old_path, os.path.join(self.videos, new_media_url)) + new_path = os.path.join(self.videos, new_media_url) + os.rename(old_path, new_path) return new_media_url @@ -155,11 +140,12 @@ class ChannelMigration: print(f"did not find expected subtitle at {old_path}") continue - ext = ".".join(old_path.split(".")[-2:]) new_media_url = os.path.join( - self.channel["channel_id"], video["youtube_id"] + f".{ext}" + video["channel"]["channel_id"], + f"{video.get('youtube_id')}.{subtitle.get('lang')}.vtt", ) - os.rename(old_path, os.path.join(self.videos, new_media_url)) + new_path = os.path.join(self.videos, new_media_url) + os.rename(old_path, new_path) subtitle["media_url"] = new_media_url return all_subtitles @@ -176,15 +162,10 @@ class ChannelMigration: if not status == 200: print(response) - def delete_old(self, all_videos): - """delete old folder path if empty""" - if not all_videos: - return - - channel_name = os.path.split(all_videos[0]["media_url"])[0] - old_path = os.path.join(self.videos, channel_name) - if os.path.exists(old_path) and not os.listdir(old_path): - os.rmdir(old_path) - return - - print(f"failed to clean up old folder {old_path}") + def delete_old(self): + """delete old empty folders""" + all_folders = ignore_filelist(os.listdir(self.videos)) + for folder in all_folders: + folder_path = os.path.join(self.videos, folder) + if not ignore_filelist(os.listdir(folder_path)): + os.rmdir(folder_path)