mirror of
https://github.com/tubearchivist/tubearchivist-server.git
synced 2024-11-14 16:10:12 +00:00
164 lines
4.7 KiB
Python
164 lines
4.7 KiB
Python
"""monitor redis for tasks to execute"""
|
|
|
|
import json
|
|
import subprocess
|
|
import os
|
|
|
|
from datetime import datetime
|
|
|
|
import redis
|
|
|
|
|
|
class RedisBase:
    """connection base for redis

    holds the connection settings and key names as class attributes and
    opens one client per instance, available as self.conn
    """

    # address of the redis server
    REDIS_HOST = "localhost"
    REDIS_PORT = 6379

    # key prefix and the full key used for the buildx tasks
    NAME_SPACE = "ta:"
    TASK_KEY = f"{NAME_SPACE}task:buildx"

    def __init__(self):
        connection_args = {"host": self.REDIS_HOST, "port": self.REDIS_PORT}
        self.conn = redis.Redis(**connection_args)
|
|
|
|
|
|
class Monitor(RedisBase):
    """look for messages

    prepares the buildx builder and the task queue in redis, runs tasks
    already stored in the queue and then waits for new task messages
    """

    def get_tasks(self):
        """get task list

        returns the decoded json document stored at TASK_KEY
        """
        response = self.conn.execute_command("JSON.GET", self.TASK_KEY)
        tasks = json.loads(response.decode())
        return tasks

    def bootstrap(self):
        """create custom builder

        inspects the current buildx builder and creates the tubearchivist
        builder if the reported name does not start with 'tubearchivist',
        raises subprocess.CalledProcessError if a docker command fails
        """
        print("validate builder")
        command = ["docker", "buildx", "inspect"]
        output = subprocess.run(command, check=True, capture_output=True)
        config = self._parse_inspect(output.stdout.decode())

        # no Name in the output is handled like a foreign builder
        if not config.get("Name", "").startswith("tubearchivist"):
            print("create tubearchivist builder")
            self._create_builder()
        else:
            print("tubearchivist builder already created")

    @staticmethod
    def _parse_inspect(inspect):
        """parse 'key: value' lines of the inspect output into a dict

        lines without a colon or without anything after the colon are
        skipped, if a key shows up more than once the last value wins
        """
        config = {}
        for line in inspect.split("\n"):
            # partition does not raise on lines without a colon,
            # value is empty then and the line gets skipped
            key, _, value = line.partition(":")
            if value:
                config[key.strip()] = value.strip()

        return config

    def create_queue(self):
        """set initial json object for queue

        does nothing if TASK_KEY already exists
        """
        # command name and key go in as separate arguments
        if self.conn.execute_command("EXISTS", self.TASK_KEY):
            print(f"{self.TASK_KEY} already exists")
            return

        message = {
            # strftime("%s") is a platform specific extension,
            # timestamp() returns the same epoch seconds everywhere
            "created": int(datetime.now().timestamp()),
            "tasks": {},
        }
        self.conn.execute_command(
            "JSON.SET", self.TASK_KEY, ".", json.dumps(message)
        )

    @staticmethod
    def _create_builder():
        """create buildx builder

        creates the builder named tubearchivist, selects it and
        bootstraps it, each step raises on a non zero exit code
        """
        base = ["docker", "buildx"]
        subprocess.run(
            base + ["create", "--name", "tubearchivist"], check=True
        )
        subprocess.run(base + ["use", "tubearchivist"], check=True)
        subprocess.run(base + ["inspect", "--bootstrap"], check=True)

    def check_stored(self):
        """check for any stored task since last watch"""
        queue = self.get_tasks()
        if queue["tasks"]:
            print("found stored task:")
            # queue is a local copy, the builder removing the task
            # from redis does not change what is iterated here
            for task_name in queue["tasks"]:
                print(task_name)
                Builder(task_name).run()

    def watch(self):
        """watch for messages

        subscribes to the channel named like TASK_KEY and runs a builder
        for every message, the message data is used as the task name,
        the build runs inline so the loop blocks until it is done
        """
        print("waiting for tasks")
        watcher = self.conn.pubsub()
        watcher.subscribe(self.TASK_KEY)
        for i in watcher.listen():
            # skip everything that is not a published message
            if i["type"] == "message":
                task = i["data"].decode()
                print(task)
                Builder(task).run()
|
|
|
|
|
|
class Builder(RedisBase):
    """execute task

    runs a single task from the queue: reads the task details from redis,
    clones or updates the repo, runs the build and removes the task again
    """

    # directory the repos get cloned into, relative to the working dir
    CLONE_BASE = "clone"

    def __init__(self, task):
        super().__init__()
        # stays False until get_task has loaded the details
        self.task_detail = False
        self.task = task

    def run(self):
        """run all steps"""
        steps = (self.get_task, self.clone, self.build, self.remove_task)
        for step in steps:
            step()

    def get_task(self):
        """get what to execute"""
        print("get task from redis")
        raw = self.conn.execute_command("JSON.GET", self.TASK_KEY)
        queue = json.loads(raw.decode())
        self.task_detail = queue["tasks"][self.task]

    def clone(self):
        """clone repo to destination"""
        clone_url = self.task_detail["clone"]
        if not clone_url:
            print("skip clone")
            return

        print("clone repo")
        repo_dir = os.path.join(self.CLONE_BASE, self.task_detail["name"])
        # an existing checkout gets pulled instead of cloned again
        command = (
            ["git", "-C", repo_dir, "pull"]
            if os.path.exists(repo_dir)
            else ["git", "clone", clone_url, repo_dir]
        )
        subprocess.run(command, check=True)

    def build(self):
        """build the container"""
        commands = self.task_detail["build"]
        if not all(isinstance(item, list) for item in commands):
            # a flat list holds the buildx arguments of a single build,
            # the checkout directory is appended as last argument
            # NOTE(review): clone() uses task_detail["name"] for the
            # directory, here it is self.task - confirm they always match
            context = os.path.join(self.CLONE_BASE, self.task)
            commands = [["docker", "buildx"] + commands + [context]]

        # a list of lists is run as is, one command after the other
        for command in commands:
            print(f"running: {command}")
            subprocess.run(command, check=True)

    def remove_task(self):
        """remove task from redis queue"""
        print("remove task from redis")
        task_path = f".tasks.{self.task}"
        self.conn.json().delete(self.TASK_KEY, task_path)
|
|
|
|
|
|
if __name__ == "__main__":
    handler = Monitor()

    # prepare builder and queue, then work off what is already stored
    for prepare in (
        handler.bootstrap,
        handler.create_queue,
        handler.check_stored,
    ):
        prepare()

    # blocks until interrupted from the keyboard
    try:
        handler.watch()
    except KeyboardInterrupt:
        print(" [X] cancle watch")
|