add task manager integration
This commit is contained in:
parent
b49e87c468
commit
6328e316f4
|
@ -156,3 +156,28 @@ class RedisQueue(RedisBase):
|
|||
"""check if queue as at least one pending item"""
|
||||
result = self.conn.execute_command("LRANGE", self.key, 0, 0)
|
||||
return bool(result)
|
||||
|
||||
|
||||
class TaskRedis(RedisBase):
|
||||
"""interact with redis tasks"""
|
||||
|
||||
BASE = "celery-task-meta-"
|
||||
EXPIRE = 60 * 60 * 24
|
||||
|
||||
def get_all(self):
|
||||
"""return all tasks"""
|
||||
all_keys = self.conn.execute_command("KEYS", f"{self.BASE}*")
|
||||
return [i.decode().replace(self.BASE, "") for i in all_keys]
|
||||
|
||||
def get_single(self, task_id):
|
||||
"""return content of single task"""
|
||||
result = self.conn.execute_command("GET", self.BASE + task_id).decode()
|
||||
return json.loads(result)
|
||||
|
||||
def set_key(self, task_id, message, expire=False):
|
||||
"""set value for lock, initial or update"""
|
||||
key = f"{self.BASE}{task_id}"
|
||||
self.conn.execute_command("SET", key, json.dumps(message))
|
||||
|
||||
if expire:
|
||||
self.conn.execute_command("EXPIRE", key, self.EXPIRE)
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
"""
|
||||
functionality:
|
||||
- interact with in redis stored task results
|
||||
- handle threads and locks
|
||||
"""
|
||||
|
||||
from home.src.ta.ta_redis import TaskRedis
|
||||
|
||||
|
||||
class TaskManager:
|
||||
"""manage tasks"""
|
||||
|
||||
def get_all_results(self):
|
||||
"""return all task results"""
|
||||
handler = TaskRedis()
|
||||
all_keys = handler.get_all()
|
||||
if not all_keys:
|
||||
return False
|
||||
|
||||
return [handler.get_single(i) for i in all_keys]
|
||||
|
||||
def get_tasks_by_name(self, task_name):
|
||||
"""get all tasks by name"""
|
||||
all_results = self.get_all_results()
|
||||
|
||||
return [i for i in all_results if i.get("name") == task_name]
|
||||
|
||||
def get_task(self, task_id):
|
||||
"""get single task"""
|
||||
return TaskRedis().get_single(task_id)
|
||||
|
||||
def is_pending(self, task):
|
||||
"""check if task_name is pending, pass task object"""
|
||||
tasks = self.get_tasks_by_name(task.name)
|
||||
return bool([i for i in tasks if i.get("status") == "PENDING"])
|
||||
|
||||
def init(self, task):
|
||||
"""pass task object from bind task to set initial pending message"""
|
||||
message = {
|
||||
"status": "PENDING",
|
||||
"result": None,
|
||||
"traceback": None,
|
||||
"name": task.name,
|
||||
"task_id": task.request.id,
|
||||
}
|
||||
TaskRedis().set_key(task.request.id, message)
|
Loading…
Reference in New Issue