2024-10-15 19:41:40 +02:00
|
|
|
"""Actions available for workflows."""
|
|
|
|
|
2025-02-22 23:47:16 +01:00
|
|
|
from __future__ import annotations
|
|
|
|
|
2024-10-15 19:41:40 +02:00
|
|
|
import os
|
2024-10-21 14:55:43 +02:00
|
|
|
from ast import literal_eval
|
2024-10-15 19:41:40 +02:00
|
|
|
from pathlib import Path
|
|
|
|
|
|
|
|
from rwx import ps
|
2024-10-20 19:12:47 +02:00
|
|
|
from rwx.log import stream as log
|
2024-10-15 19:41:40 +02:00
|
|
|
|
|
|
|
from spcd.ci import project, projects
|
|
|
|
from spcd.shell import env
|
|
|
|
|
2024-10-21 14:55:43 +02:00
|
|
|
# Name prefix selecting the entries of projects.environment treated as inputs.
PREFIX = "INPUT_"
|
|
|
|
|
2024-10-15 19:41:40 +02:00
|
|
|
|
2024-10-20 19:12:47 +02:00
|
|
|
def action() -> None:
    """Log each parsed input as a ``name = value`` line."""
    inputs = parse_inputs()
    for name in inputs:
        log.info("%s = %s", name, inputs[name])
|
2024-10-20 19:12:47 +02:00
|
|
|
|
|
|
|
|
2024-10-21 14:55:43 +02:00
|
|
|
def parse_inputs() -> dict[str, object]:
    """Parse inputs as a dictionary.

    Every entry of ``projects.environment`` whose name starts with ``PREFIX``
    is kept: the prefix is stripped from the name, which is then lowercased.
    Each value is evaluated as a Python literal when possible;
    a value which is not a valid literal is kept unchanged.

    :return: name & value pairs
    :rtype: dict[str, object]
    """
    inputs: dict[str, object] = {}
    # Sorted iteration gives the resulting dictionary a stable order.
    for variable, value in sorted(projects.environment.items()):
        if not variable.startswith(PREFIX):
            continue
        name = variable.removeprefix(PREFIX).lower()
        try:
            inputs[name] = literal_eval(value)
        except (SyntaxError, TypeError, ValueError):
            # Not a Python literal (bare word, empty string, …):
            # keep the raw value rather than aborting the whole parsing.
            inputs[name] = value
    return inputs
|
2024-10-21 14:55:43 +02:00
|
|
|
|
|
|
|
|
2024-10-20 18:50:50 +02:00
|
|
|
def synchronize(source: str | None = None, target: str | None = None) -> None:
    """Copy a source directory to a target using rsync.

    When no target is given, it is built as ``cd@<host>:<root>``, with
    the host read from ``env.SPCD_PROJECT_PATH`` and the root made of the
    ``cd`` directory, the projects group, the project name and its branch.

    :param source: where to deploy from, ``out`` when empty
    :type source: str | None
    :param target: where to deploy to, computed when empty
    :type target: str | None
    """
    origin = source or "out"
    if not target:
        login = "cd"
        remote = env.SPCD_PROJECT_PATH
        directory = Path(os.sep).joinpath(
            login,
            projects.group,
            project.name,
            project.branch,
        )
        target = f"{login}@{remote}:{directory}"
    # Trailing separators make rsync work on directory contents.
    command = (
        "rsync",
        "--archive",
        "--delete-before",
        "--verbose",
        f"{origin}/",
        f"{target}/",
    )
    ps.run(*command)
|