refactor filesystem scanner

simon 2023-05-20 16:07:33 +07:00
parent 5334d79d0d
commit 9b30c7df6e
2 changed files with 89 additions and 168 deletions

View File

@@ -1,198 +1,85 @@
 """
 Functionality:
-- reindexing old documents
-- syncing updated values between indexes
 - scan the filesystem to delete or index
 """
-import json
 import os
-from home.src.download.queue import PendingList
-from home.src.es.connect import ElasticWrap
+from home.src.es.connect import ElasticWrap, IndexPaginate
 from home.src.index.comments import CommentList
-from home.src.index.video import index_new_video
+from home.src.index.video import YoutubeVideo, index_new_video
 from home.src.ta.config import AppConfig
-from home.src.ta.helper import clean_string, ignore_filelist
-from PIL import ImageFile
-ImageFile.LOAD_TRUNCATED_IMAGES = True
+from home.src.ta.helper import ignore_filelist
-class ScannerBase:
-    """scan the filesystem base class"""
+class Scanner:
+    """scan index and filesystem"""
-    CONFIG = AppConfig().config
-    VIDEOS = CONFIG["application"]["videos"]
-    def __init__(self):
-        self.to_index = False
-        self.to_delete = False
-        self.mismatch = False
-        self.to_rename = False
-    def scan(self):
-        """entry point, scan and compare"""
-        all_downloaded = self._get_all_downloaded()
-        all_indexed = self._get_all_indexed()
-        self.list_comarison(all_downloaded, all_indexed)
-    def _get_all_downloaded(self):
-        """get a list of all video files downloaded"""
-        channels = os.listdir(self.VIDEOS)
-        all_channels = ignore_filelist(channels)
-        all_channels.sort()
-        all_downloaded = []
-        for channel_name in all_channels:
-            channel_path = os.path.join(self.VIDEOS, channel_name)
-            channel_files = os.listdir(channel_path)
-            channel_files_clean = ignore_filelist(channel_files)
-            all_videos = [i for i in channel_files_clean if i.endswith(".mp4")]
-            for video in all_videos:
-                youtube_id = video[9:20]
-                all_downloaded.append((channel_name, video, youtube_id))
-        return all_downloaded
-    @staticmethod
-    def _get_all_indexed():
-        """get a list of all indexed videos"""
-        index_handler = PendingList()
-        index_handler.get_download()
-        index_handler.get_indexed()
-        all_indexed = []
-        for video in index_handler.all_videos:
-            youtube_id = video["youtube_id"]
-            media_url = video["media_url"]
-            published = video["published"]
-            title = video["title"]
-            all_indexed.append((youtube_id, media_url, published, title))
-        return all_indexed
-    def list_comarison(self, all_downloaded, all_indexed):
-        """compare the lists to figure out what to do"""
-        self._find_unindexed(all_downloaded, all_indexed)
-        self._find_missing(all_downloaded, all_indexed)
-        self._find_bad_media_url(all_downloaded, all_indexed)
-    def _find_unindexed(self, all_downloaded, all_indexed):
-        """find video files without a matching document indexed"""
-        all_indexed_ids = [i[0] for i in all_indexed]
-        self.to_index = []
-        for downloaded in all_downloaded:
-            if downloaded[2] not in all_indexed_ids:
-                self.to_index.append(downloaded)
-    def _find_missing(self, all_downloaded, all_indexed):
-        """find indexed videos without matching media file"""
-        all_downloaded_ids = [i[2] for i in all_downloaded]
-        self.to_delete = []
-        for video in all_indexed:
-            youtube_id = video[0]
-            if youtube_id not in all_downloaded_ids:
-                self.to_delete.append(video)
-    def _find_bad_media_url(self, all_downloaded, all_indexed):
-        """rename media files not matching the indexed title"""
-        self.mismatch = []
-        self.to_rename = []
-        for downloaded in all_downloaded:
-            channel, filename, downloaded_id = downloaded
-            # find in indexed
-            for indexed in all_indexed:
-                indexed_id, media_url, published, title = indexed
-                if indexed_id == downloaded_id:
-                    # found it
-                    pub = published.replace("-", "")
-                    expected = f"{pub}_{indexed_id}_{clean_string(title)}.mp4"
-                    new_url = os.path.join(channel, expected)
-                    if expected != filename:
-                        # file to rename
-                        self.to_rename.append((channel, filename, expected))
-                    if media_url != new_url:
-                        # media_url to update in es
-                        self.mismatch.append((indexed_id, new_url))
-                    break
-class Filesystem(ScannerBase):
-    """handle scanning and fixing from filesystem"""
+    VIDEOS = AppConfig().config["application"]["videos"]
     def __init__(self, task=False):
-        super().__init__()
         self.task = task
+        self.to_delete = False
+        self.to_index = False
-    def process(self):
-        """entry point"""
+    def scan(self):
+        """scan the filesystem"""
+        downloaded = self._get_downloaded()
+        indexed = self._get_indexed()
+        self.to_index = downloaded - indexed
+        self.to_delete = indexed - downloaded
+    def _get_downloaded(self):
+        """get downloaded ids"""
         if self.task:
-            self.task.send_progress(["Scanning your archive and index."])
-        self.scan()
-        self.rename_files()
-        self.send_mismatch_bulk()
-        self.delete_from_index()
-        self.add_missing()
+            self.task.send_progress(["Scan your filesystem for videos."])
-    def rename_files(self):
-        """rename media files as identified by find_bad_media_url"""
-        if not self.to_rename:
-            return
+        downloaded = set()
+        channels = ignore_filelist(os.listdir(self.VIDEOS))
+        for channel in channels:
+            folder = os.path.join(self.VIDEOS, channel)
+            files = ignore_filelist(os.listdir(folder))
+            downloaded.update(set(i.split(".")[0] for i in files))
-        total = len(self.to_rename)
+        return downloaded
+    def _get_indexed(self):
+        """get all indexed ids"""
         if self.task:
-            self.task.send_progress([f"Rename {total} media files."])
-        for bad_filename in self.to_rename:
-            channel, filename, expected_filename = bad_filename
-            print(f"renaming [{filename}] to [{expected_filename}]")
-            old_path = os.path.join(self.VIDEOS, channel, filename)
-            new_path = os.path.join(self.VIDEOS, channel, expected_filename)
-            os.rename(old_path, new_path)
+            self.task.send_progress(["Get all videos indexed."])
-    def send_mismatch_bulk(self):
-        """build bulk update"""
-        if not self.mismatch:
-            return
+        data = {"query": {"match_all": {}}, "_source": ["youtube_id"]}
+        response = IndexPaginate("ta_video", data).get_results()
+        return set(i["youtube_id"] for i in response)
-        total = len(self.mismatch)
-        if self.task:
-            self.task.send_progress([f"Fix media urls for {total} files"])
-        bulk_list = []
-        for video_mismatch in self.mismatch:
-            youtube_id, media_url = video_mismatch
-            print(f"{youtube_id}: fixing media url {media_url}")
-            action = {"update": {"_id": youtube_id, "_index": "ta_video"}}
-            source = {"doc": {"media_url": media_url}}
-            bulk_list.append(json.dumps(action))
-            bulk_list.append(json.dumps(source))
-        # add last newline
-        bulk_list.append("\n")
-        data = "\n".join(bulk_list)
-        _, _ = ElasticWrap("_bulk").post(data=data, ndjson=True)
+    def apply(self):
+        """apply all changes"""
+        self.delete()
+        self.index()
+        self.url_fix()
-    def delete_from_index(self):
-        """find indexed but deleted mediafile"""
+    def delete(self):
+        """delete videos from index"""
         if not self.to_delete:
             print("nothing to delete")
             return
-        total = len(self.to_delete)
         if self.task:
-            self.task.send_progress([f"Clean up {total} items from index."])
-        for indexed in self.to_delete:
-            youtube_id = indexed[0]
-            print(f"deleting {youtube_id} from index")
-            path = f"ta_video/_doc/{youtube_id}"
-            _, _ = ElasticWrap(path).delete()
+            self.task.send_progress(
+                [f"Remove {len(self.to_delete)} videos from index."]
+            )
-    def add_missing(self):
-        """add missing videos to index"""
-        video_ids = [i[2] for i in self.to_index]
-        if not video_ids:
+        for youtube_id in self.to_delete:
+            YoutubeVideo(youtube_id).delete_media_file()
+    def index(self):
+        """index new"""
+        if not self.to_index:
+            print("nothing to index")
             return
-        total = len(video_ids)
-        for idx, youtube_id in enumerate(video_ids):
+        total = len(self.to_index)
+        for idx, youtube_id in enumerate(self.to_index):
             if self.task:
                 self.task.send_progress(
                     message_lines=[
@@ -202,4 +89,36 @@ class Filesystem(ScannerBase):
                 )
             index_new_video(youtube_id)
-        CommentList(video_ids, task=self.task).index()
+        CommentList(self.to_index, task=self.task).index()
+    def url_fix(self):
+        """
+        update path v0.3.6 to v0.3.7
+        fix url not matching channel-videoid pattern
+        """
+        bool_must = (
+            "doc['media_url'].value == "
+            + "(doc['channel.channel_id'].value + '/' + "
+            + "doc['youtube_id'].value) + '.mp4'"
+        )
+        to_update = (
+            "ctx._source['media_url'] = "
+            + "ctx._source.channel['channel_id'] + '/' + "
+            + "ctx._source['youtube_id'] + '.mp4'"
+        )
+        data = {
+            "query": {
+                "bool": {
+                    "must_not": [{"script": {"script": {"source": bool_must}}}]
+                }
+            },
+            "script": {"source": to_update},
+        }
+        response, _ = ElasticWrap("ta_video/_update_by_query").post(data=data)
+        updated = response.get("updates")
+        if updated:
+            print(f"updated {updated} bad media_url")
+            if self.task:
+                self.task.send_progress(
+                    [f"Updated {updated} wrong media urls."]
+                )
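
For reference, the _update_by_query request that the new url_fix() builds selects every ta_video document whose media_url does not already follow the <channel_id>/<youtube_id>.mp4 pattern (a painless script inside a bool must_not) and rewrites it in place with a second painless script; Elasticsearch reports the number of rewritten documents under the "updated" key of the response. A minimal standalone sketch of the same request, assuming a reachable Elasticsearch on localhost:9200 and the requests library in place of the project's ElasticWrap helper:

import requests

# painless filter: true when media_url already matches "<channel_id>/<youtube_id>.mp4"
match_script = (
    "doc['media_url'].value == "
    "(doc['channel.channel_id'].value + '/' + doc['youtube_id'].value) + '.mp4'"
)

# painless update: rewrite media_url to the expected pattern
fix_script = (
    "ctx._source['media_url'] = "
    "ctx._source.channel['channel_id'] + '/' + ctx._source['youtube_id'] + '.mp4'"
)

body = {
    "query": {
        "bool": {"must_not": [{"script": {"script": {"source": match_script}}}]}
    },
    "script": {"source": fix_script},
}

# hypothetical endpoint; TubeArchivist routes this through its ElasticWrap wrapper
response = requests.post(
    "http://localhost:9200/ta_video/_update_by_query",
    json=body,
    timeout=120,
)
print(response.json().get("updated", 0), "media_url values rewritten")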

View File

@@ -19,7 +19,7 @@ from home.src.download.yt_dlp_handler import VideoDownloader
 from home.src.es.backup import ElasticBackup
 from home.src.es.index_setup import ElasitIndexWrap
 from home.src.index.channel import YoutubeChannel
-from home.src.index.filesystem import Filesystem
+from home.src.index.filesystem import Scanner
 from home.src.index.manual import ImportFolderScanner
 from home.src.index.reindex import Reindex, ReindexManual, ReindexPopulate
 from home.src.ta.config import AppConfig, ReleaseVersion, ScheduleBuilder
@@ -290,7 +290,9 @@ def rescan_filesystem(self):
         return
     manager.init(self)
-    Filesystem(task=self).process()
+    handler = Scanner(task=self)
+    handler.scan()
+    handler.apply()
     ThumbValidator(task=self).validate()
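
Outside of the Celery task, the refactored scanner can also be driven directly in the same two steps: scan() diffs the filesystem against the index, apply() acts on the result. A minimal sketch, assuming it runs inside the TubeArchivist Django environment (for example a manage.py shell) with Elasticsearch reachable; without a task object the progress messages are simply skipped:

from home.src.index.filesystem import Scanner

# compare media files on disk against the ta_video index
scanner = Scanner()
scanner.scan()

print(f"{len(scanner.to_index)} videos missing from the index")
print(f"{len(scanner.to_delete)} index documents without a media file")

# index what is missing, delete orphaned documents, fix old media_url patterns
scanner.apply()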