better queue total management with score
This commit is contained in:
parent
5235af3d91
commit
db080e97bb
|
@ -348,11 +348,12 @@ class DownloadPostProcess:
|
|||
|
||||
def refresh_playlist(self) -> None:
|
||||
"""match videos with playlists"""
|
||||
total_playlist = self.add_playlists_to_refresh()
|
||||
self.add_playlists_to_refresh()
|
||||
|
||||
queue = RedisQueue(self.download.PLAYLIST_QUEUE)
|
||||
while True:
|
||||
playlist_id = queue.get_next()
|
||||
total = queue.max_score()
|
||||
playlist_id, idx = queue.get_next()
|
||||
if not playlist_id:
|
||||
break
|
||||
|
||||
|
@ -362,18 +363,16 @@ class DownloadPostProcess:
|
|||
if not self.download.task:
|
||||
continue
|
||||
|
||||
idx = total_playlist - queue.length()
|
||||
|
||||
channel_name = playlist.json_data["playlist_channel"]
|
||||
playlist_title = playlist.json_data["playlist_name"]
|
||||
message = [
|
||||
f"Post Processing Playlists for: {channel_name}",
|
||||
f"{playlist_title} [{idx}/{total_playlist}]",
|
||||
f"{playlist_title} [{idx}/{total}]",
|
||||
]
|
||||
progress = (idx) / total_playlist
|
||||
progress = (idx) / total
|
||||
self.download.task.send_progress(message, progress=progress)
|
||||
|
||||
def add_playlists_to_refresh(self) -> int:
|
||||
def add_playlists_to_refresh(self) -> None:
|
||||
"""add playlists to refresh"""
|
||||
if self.download.task:
|
||||
message = ["Post Processing Playlists", "Scanning for Playlists"]
|
||||
|
@ -383,8 +382,6 @@ class DownloadPostProcess:
|
|||
self._add_channel_playlists()
|
||||
self._add_video_playlists()
|
||||
|
||||
return RedisQueue(self.download.PLAYLIST_QUEUE).length()
|
||||
|
||||
def _add_playlist_sub(self):
|
||||
"""add subscribed playlists to refresh"""
|
||||
subs = PlaylistSubscription().get_playlists()
|
||||
|
@ -395,7 +392,7 @@ class DownloadPostProcess:
|
|||
"""add playlists from channels to refresh"""
|
||||
queue = RedisQueue(self.download.CHANNEL_QUEUE)
|
||||
while True:
|
||||
channel_id = queue.get_next()
|
||||
channel_id, _ = queue.get_next()
|
||||
if not channel_id:
|
||||
break
|
||||
|
||||
|
@ -424,9 +421,9 @@ class DownloadPostProcess:
|
|||
def match_videos(self) -> None:
|
||||
"""scan rest of indexed playlists to match videos"""
|
||||
queue = RedisQueue(self.download.PLAYLIST_QUICK)
|
||||
total_playlist = queue.length()
|
||||
while True:
|
||||
playlist_id = queue.get_next()
|
||||
total = queue.max_score()
|
||||
playlist_id, idx = queue.get_next()
|
||||
if not playlist_id:
|
||||
break
|
||||
|
||||
|
@ -438,12 +435,11 @@ class DownloadPostProcess:
|
|||
if not self.download.task:
|
||||
continue
|
||||
|
||||
idx = total_playlist - queue.length()
|
||||
message = [
|
||||
"Post Processing Playlists.",
|
||||
f"Validate Playlists: - {idx}/{total_playlist}",
|
||||
f"Validate Playlists: - {idx}/{total}",
|
||||
]
|
||||
progress = (idx) / total_playlist
|
||||
progress = (idx) / total
|
||||
self.download.task.send_progress(message, progress=progress)
|
||||
|
||||
def get_comments(self):
|
||||
|
|
|
@ -267,7 +267,7 @@ class Reindex(ReindexBase):
|
|||
def reindex_index(self, name, index_config):
|
||||
"""reindex all of a single index"""
|
||||
reindex = self.get_reindex_map(index_config["index_name"])
|
||||
youtube_id = RedisQueue(index_config["queue_name"]).get_next()
|
||||
youtube_id, _ = RedisQueue(index_config["queue_name"]).get_next()
|
||||
if youtube_id:
|
||||
if self.task:
|
||||
self._notify(name, index_config)
|
||||
|
|
|
@ -138,6 +138,14 @@ class RedisQueue(RedisBase):
|
|||
mapping = {i[1]: next_score + i[0] for i in enumerate(to_add)}
|
||||
self.conn.zadd(self.key, mapping)
|
||||
|
||||
def max_score(self) -> float | False:
|
||||
"""get max score"""
|
||||
last = self.conn.zrange(self.key, -1, -1, withscores=True)
|
||||
if not last:
|
||||
return False
|
||||
|
||||
return last[0][1]
|
||||
|
||||
def _get_next_score(self) -> float:
|
||||
"""get next score in queue to append"""
|
||||
last = self.conn.zrange(self.key, -1, -1, withscores=True)
|
||||
|
@ -146,13 +154,15 @@ class RedisQueue(RedisBase):
|
|||
|
||||
return last[0][1] + 1
|
||||
|
||||
def get_next(self) -> str | bool:
|
||||
def get_next(self) -> tuple[str | False, int | False]:
|
||||
"""return next element in the queue, if available"""
|
||||
result = self.conn.zpopmin(self.key)
|
||||
if not result:
|
||||
return False
|
||||
return False, False
|
||||
|
||||
return result[0][0]
|
||||
item, idx = result[0]
|
||||
|
||||
return item, idx
|
||||
|
||||
def clear(self) -> None:
|
||||
"""delete list from redis"""
|
||||
|
|
Loading…
Reference in New Issue