handle hard task reset at startup

This commit is contained in:
simon 2023-03-23 09:49:06 +07:00
parent 3e2b2f2be1
commit 272972429b
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
3 changed files with 42 additions and 10 deletions

View File

@ -14,6 +14,7 @@ from home.src.es.snapshot import ElasticSnapshot
from home.src.ta.config import AppConfig, ReleaseVersion
from home.src.ta.helper import clear_dl_cache
from home.src.ta.ta_redis import RedisArchivist
from home.src.ta.task_manager import TaskManager
TOPIC = """
@ -35,6 +36,7 @@ class Command(BaseCommand):
self._sync_redis_state()
self._make_folders()
self._release_locks()
self._clear_tasks()
self._clear_dl_cache()
self._version_check()
self._mig_index_setup()
@ -96,9 +98,23 @@ class Command(BaseCommand):
if not has_changed:
self.stdout.write(self.style.SUCCESS(" no locks found"))
def _clear_tasks(self):
"""clear tasks and messages"""
self.stdout.write("[4] clear task leftovers")
TaskManager().fail_pending()
redis_con = RedisArchivist()
to_delete = redis_con.list_keys("message:")
if to_delete:
for key in to_delete:
redis_con.del_message(key)
self.stdout.write(
self.style.SUCCESS(f" ✓ cleared {len(to_delete)} messages")
)
def _clear_dl_cache(self):
"""clear leftover files from dl cache"""
self.stdout.write("[4] clear leftover files from dl cache")
self.stdout.write("[5] clear leftover files from dl cache")
config = AppConfig().config
leftover_files = clear_dl_cache(config)
if leftover_files:
@ -110,7 +126,7 @@ class Command(BaseCommand):
def _version_check(self):
"""remove new release key if updated now"""
self.stdout.write("[5] check for first run after update")
self.stdout.write("[6] check for first run after update")
new_version = ReleaseVersion().is_updated()
if new_version:
self.stdout.write(

View File

@ -57,18 +57,23 @@ class RedisArchivist(RedisBase):
return json_str
def list_items(self, query):
"""list all matches"""
def list_keys(self, query):
"""return all key matches"""
reply = self.conn.execute_command(
"KEYS", self.NAME_SPACE + query + "*"
)
all_matches = [i.decode().lstrip(self.NAME_SPACE) for i in reply]
all_results = []
for match in all_matches:
json_str = self.get_message(match)
all_results.append(json_str)
if not reply:
return False
return all_results
return [i.decode().lstrip(self.NAME_SPACE) for i in reply]
def list_items(self, query):
"""list all matches"""
all_matches = self.list_keys(query)
if not all_matches:
return False
return [self.get_message(i) for i in all_matches]
def del_message(self, key):
"""delete key from redis"""

View File

@ -67,6 +67,17 @@ class TaskManager:
}
TaskRedis().set_key(task.request.id, message)
def fail_pending(self):
"""
mark all pending as failed,
run at startup to recover from hard reset
"""
all_results = self.get_all_results()
for result in all_results:
if result.get("status") == "PENDING":
result["status"] = "FAILED"
TaskRedis().set_key(result["task_id"], result, expire=True)
class TaskCommand:
"""run commands on task"""