"""Actions available for workflows.""" import os from ast import literal_eval from pathlib import Path from rwx import ps from rwx.log import stream as log from spcd.ci import project, projects from spcd.shell import env PREFIX = "INPUT_" def action() -> None: """Display action inputs.""" for variable, value in parse_inputs().items(): log.info("%s = %s", variable, value) def parse_inputs() -> dict[str, object]: """Parse inputs as a dictionary. :return: name & value pairs :rtype: dict[str, object] """ d = {} for variable, value in sorted(projects.environment.items()): if variable.startswith(PREFIX): d[variable.removeprefix(PREFIX)] = literal_eval(value) return d def synchronize(source: str | None = None, target: str | None = None) -> None: """Synchronize output towards a target. :param source: where to deploy from :type source: str | None :param target: where to deploy to :type target: str | None """ if not target: user = "cd" host = env.SPCD_PROJECT_PATH root = ( Path(os.sep) / user / projects.group / project.name / project.branch ) target = f"{user}@{host}:{root}" if not source: source = "out" ps.run( "rsync", "--archive", "--delete-before", "--verbose", f"{source}/", f"{target}/", )