spcd/cd/__init__.py
Marc Beninca f776b8468d
Some checks failed
/ job (push) Failing after 1m27s
step/log
2024-06-13 12:34:05 +02:00

159 lines
3.1 KiB
Python

"""Continuous Deployment."""
import os
from pathlib import Path
import env
from rwx import fs, ps
from rwx.log import stream as log
from cd.project import Project
from cd.projects import Projects
# Prefix prepended to every command name when install_commands() creates
# its symlinks (e.g. "cd-" + "clone-branch").
COMMANDS_PREFIX = "cd-"
# Module-level state shared by the cd_* commands below; both objects are
# built once, when this module is imported.
projects = Projects()
project = Project(projects)
def cd_browse_workspace() -> None:
    """List every file found under the current project's root directory."""
    workspace = project.root
    browse(workspace)
def cd_build_project() -> None:
    """Run the first build script found at the project root.

    ``build.py`` is looked up first, then ``build.sh``. At most one script is
    run; nothing happens when neither file exists.
    """
    # Lazy generator: existence is only checked until the first match.
    candidates = (
        Path(project.root) / f"build.{suffix}" for suffix in ("py", "sh")
    )
    script = next(
        (candidate for candidate in candidates if candidate.exists()),
        None,
    )
    if script is not None:
        ps.run(script)
def cd_clone_branch() -> None:
    """Print the projects/project summaries, then clone the project branch.

    The printed text of ``projects`` and ``project`` is each followed by a
    separator line, then the project URL is printed before running
    ``git clone --branch <branch> -- <url> <root>``.
    """
    for summary in (projects, project):
        print(summary, end="")
        split()
    print(
        f"""\
{project.url}
""",
        end="",
        flush=True,
    )
    clone_command = (
        "git",
        "clone",
        "--branch",
        project.branch,
        # "--" ends option parsing so the URL and the target directory are
        # always treated as positional arguments.
        "--",
        project.url,
        project.root,
    )
    ps.run(*clone_command)
def cd_list_environment() -> None:
    """Print each entry of ``projects.environment`` as ``name = value``, sorted."""
    entries = sorted(projects.environment.items())
    for name, content in entries:
        print(name, "=", content)
def cd_synchronize() -> None:
    """Run rsync from the local ``out`` directory to the deployment host.

    The remote directory is ``/cd/<branch>/<group>/<name>`` on ``rwx.work``,
    reached as user ``cd``. The command is passed ``--dry-run``.
    """
    user = "cd"
    host = "rwx.work"
    source = "out"
    # Remote directory layout: /<user>/<branch>/<group>/<project name>
    root = Path(os.sep, user, project.branch, projects.group, project.name)
    target = f"{user}@{host}:{root}"
    rsync_command = [
        "rsync",
        "--archive",
        "--delete-before",
        "--verbose",
        # Trailing slashes: copy the contents of the source directory into
        # the target directory rather than the directory itself.
        f"{source}/",
        f"{target}/",
        "--dry-run",
    ]
    ps.run(*rsync_command)
def browse(root: str) -> None:
    """Print the sorted, root-relative paths of all files below *root*.

    The listing is wrapped between a ``frame(root)`` and a ``shut(root)`` line.

    Args:
        root: Directory to walk recursively.
    """
    relative_paths = []
    for directory, _, names in os.walk(root):
        relative_paths.extend(
            os.path.relpath(Path(directory) / name, start=root)
            for name in names
        )
    relative_paths.sort()
    frame(root)
    for relative_path in relative_paths:
        print(relative_path)
    shut(root)
def cat(file: str) -> None:
    """Print the text of *file*, without trailing whitespace, between frame lines.

    Args:
        file: Path of the text file to display.
    """
    text = fs.read_file_text(file)
    frame(file)
    print(text.rstrip())
    shut(file)
def install_commands(path: str) -> None:
    """Expose every ``cd-*`` command as a symlink to *path* in /usr/local/bin.

    Each command name is printed as its link is created. A symbolic link
    already present under the same name is replaced, so the installation can
    be repeated.

    Args:
        path: Target that every command symlink points to.

    Raises:
        FileExistsError: If a command name is already taken by something
            that is not a symbolic link.
    """
    step("Install commands")
    directory = Path("/usr/local/bin")
    for command in [
        "browse-workspace",
        "build-project",
        "clone-branch",
        "list-environment",
        "synchronize",
    ]:
        print(command)
        link = directory / f"{COMMANDS_PREFIX}{command}"
        # symlink_to() raises FileExistsError on an existing entry; drop a
        # link left by a previous run first. is_symlink() is used instead of
        # exists() so that dangling links are replaced as well, while regular
        # files are never deleted.
        if link.is_symlink():
            link.unlink()
        link.symlink_to(path)
def set_ssh(*arguments: str) -> None:
    """Prepare ``~/.ssh`` with the given private key and known hosts.

    The directory is created if needed and set to mode 0o700. Each non-empty
    value is written to its file, which is then set to mode 0o400. Finally
    the directory listing and the ``known_hosts`` content are printed.

    Args:
        *arguments: Exactly two values, the private key text followed by the
            known hosts text. An empty value skips writing that file.

    Raises:
        ValueError: If the number of arguments is not exactly two.
    """
    step("Set SSH")
    ssh_key, ssh_hosts = arguments
    key_type = "ed25519"

    ssh = Path("~").expanduser() / ".ssh"
    ssh.mkdir(exist_ok=True, parents=True)
    ssh.chmod(0o700)

    key = ssh / f"id_{key_type}"
    known = ssh / "known_hosts"
    # Same treatment for both files, private key first: write, then make
    # the file read-only for its owner.
    for target, content in ((key, ssh_key), (known, ssh_hosts)):
        if content:
            fs.write(target, content)
            target.chmod(0o400)

    browse(ssh)
    cat(known)
def frame(*arguments: str) -> None:
    """Print ``env.CD_OPEN`` directly followed by *arguments*, then flush."""
    marker = env.CD_OPEN
    # No newline after the marker: the arguments continue on the same line.
    print(marker, end="")
    print(*arguments, flush=True)
def shut(*arguments: str) -> None:
    """Print ``env.CD_SHUT`` directly followed by *arguments*, then flush."""
    marker = env.CD_SHUT
    # No newline after the marker: the arguments continue on the same line.
    print(marker, end="")
    print(*arguments, flush=True)
def split() -> None:
    """Print the ``env.CD_SPLT`` separator on its own line and flush."""
    separator = env.CD_SPLT
    print(separator, flush=True)
def step(*arguments: str) -> None:
    """Announce a new numbered step.

    Increments the ``env.CD_STEP`` counter, then prints ``env.CD_VERT``, the
    new counter value and *arguments* on one line, between a logged
    ``env.CD_DOWN`` line and a logged ``env.CD___UP`` line.
    """
    env.CD_STEP = env.CD_STEP + 1
    log.log(env.CD_DOWN)
    print(env.CD_VERT, env.CD_STEP, *arguments)
    log.log(env.CD___UP)