tubearchivist-server/builder/monitor.py

164 lines
4.7 KiB
Python

"""monitor redis for tasks to execute"""
import json
import subprocess
import os
from datetime import datetime
import redis
class RedisBase:
"""connection base for redis"""
REDIS_HOST = "localhost"
REDIS_PORT = 6379
NAME_SPACE = "ta:"
TASK_KEY = NAME_SPACE + "task:buildx"
def __init__(self):
self.conn = redis.Redis(host=self.REDIS_HOST, port=self.REDIS_PORT)
class Monitor(RedisBase):
"""look for messages"""
def get_tasks(self):
"""get task list"""
response = self.conn.execute_command("JSON.GET", self.TASK_KEY)
tasks = json.loads(response.decode())
return tasks
def bootstrap(self):
"""create custom builder"""
print("validate builder")
command = ["docker", "buildx", "inspect"]
output = subprocess.run(command, check=True, capture_output=True)
inspect = output.stdout.decode()
config = {}
lines = [i for i in inspect.split("\n") if i]
for line in lines:
key, value = line.split(":", maxsplit=1)
if value:
config[key.strip()] = value.strip()
if not config["Name"].startswith("tubearchivist"):
print("create tubearchivist builder")
self._create_builder()
else:
print("tubearchivist builder already created")
def create_queue(self):
"""set initial json object for queue"""
if self.conn.execute_command(f"EXISTS {self.TASK_KEY}"):
print(f"{self.TASK_KEY} already exists")
return
message = {
"created": int(datetime.now().strftime("%s")),
"tasks": {}
}
self.conn.execute_command(
"JSON.SET", self.TASK_KEY, ".", json.dumps(message)
)
@staticmethod
def _create_builder():
"""create buildx builder"""
base = ["docker", "buildx"]
subprocess.run(
base + ["create", "--name", "tubearchivist"], check=True
)
subprocess.run(base + ["use", "tubearchivist"], check=True)
subprocess.run(base + ["inspect", "--bootstrap"], check=True)
def check_stored(self):
"""check for any stored task since last watch"""
task = self.get_tasks()
if task["tasks"]:
print("found stored task:")
for task_name in task["tasks"]:
print(task_name)
Builder(task_name).run()
def watch(self):
"""watch for messages"""
print("waiting for tasks")
watcher = self.conn.pubsub()
watcher.subscribe(self.TASK_KEY)
for i in watcher.listen():
if i["type"] == "message":
task = i["data"].decode()
print(task)
Builder(task).run()
class Builder(RedisBase):
"""execute task"""
CLONE_BASE = "clone"
def __init__(self, task):
super().__init__()
self.task = task
self.task_detail = False
def run(self):
"""run all steps"""
self.get_task()
self.clone()
self.build()
self.remove_task()
def get_task(self):
"""get what to execute"""
print("get task from redis")
response = self.conn.execute_command("JSON.GET", self.TASK_KEY)
response_json = json.loads(response.decode())
self.task_detail = response_json["tasks"][self.task]
def clone(self):
"""clone repo to destination"""
if not self.task_detail["clone"]:
print("skip clone")
return
print("clone repo")
repo_dir = os.path.join(self.CLONE_BASE, self.task_detail["name"])
if os.path.exists(repo_dir):
command = ["git", "-C", repo_dir, "pull"]
else:
command = ["git", "clone", self.task_detail["clone"], repo_dir]
subprocess.run(command, check=True)
def build(self):
"""build the container"""
command_list = self.task_detail["build"]
if all(isinstance(i, list) for i in command_list):
for command in command_list:
print(f"running: {command}")
subprocess.run(command, check=True)
else:
command = ["docker", "buildx"] + self.task_detail["build"]
command.append(os.path.join(self.CLONE_BASE, self.task))
print(f"running: {command}")
subprocess.run(command, check=True)
def remove_task(self):
"""remove task from redis queue"""
print("remove task from redis")
self.conn.json().delete(self.TASK_KEY, f".tasks.{self.task}")
if __name__ == "__main__":
handler = Monitor()
handler.bootstrap()
handler.create_queue()
handler.check_stored()
try:
handler.watch()
except KeyboardInterrupt:
print(" [X] cancle watch")