Compare commits
13 commits
012b10f7d5
...
2327d5388d
Author | SHA1 | Date | |
---|---|---|---|
2327d5388d | |||
094d66bc33 | |||
6fab5ce9b4 | |||
0d77038392 | |||
2c6bec253c | |||
eb0f862125 | |||
b05de437d0 | |||
313cc30aec | |||
149ed2dc3b | |||
e9d228ba2c | |||
3940d32195 | |||
2e00140e82 | |||
b9754b5dde |
10 changed files with 196 additions and 50 deletions
|
@ -5,7 +5,11 @@ packages: list[str] = []
|
||||||
|
|
||||||
|
|
||||||
def need(command: str) -> None:
|
def need(command: str) -> None:
|
||||||
"""Assert package dependency for a command."""
|
"""Assert package dependency for a command.
|
||||||
|
|
||||||
|
:param command: name of the requested command
|
||||||
|
:type command: str
|
||||||
|
"""
|
||||||
package: str | None
|
package: str | None
|
||||||
match command:
|
match command:
|
||||||
case "debootstrap":
|
case "debootstrap":
|
||||||
|
|
|
@ -1,16 +1,24 @@
|
||||||
"""Wrap SquashFS commands."""
|
"""Wrap SquashFS commands."""
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
from rwx import cmd, ps
|
from rwx import cmd, ps
|
||||||
|
|
||||||
cmd.need("mksquashfs")
|
cmd.need("mksquashfs")
|
||||||
|
|
||||||
|
|
||||||
def mksquashfs(input_root: str, output_file: str) -> None:
|
def mksquashfs(input_root: Path, output_file: Path) -> None:
|
||||||
"""Make a SquashFS bootable image file."""
|
"""Make a SquashFS bootable image file.
|
||||||
|
|
||||||
|
:param input_root: ?
|
||||||
|
:type input_root: Path
|
||||||
|
:param output_file: ?
|
||||||
|
:type output_file: Path
|
||||||
|
"""
|
||||||
ps.run(
|
ps.run(
|
||||||
"mksquashfs",
|
"mksquashfs",
|
||||||
input_root,
|
str(input_root),
|
||||||
output_file,
|
str(output_file),
|
||||||
"-comp",
|
"-comp",
|
||||||
"zstd",
|
"zstd",
|
||||||
"-Xcompression-level",
|
"-Xcompression-level",
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
"""Wrap Debian commands."""
|
"""Wrap Debian commands."""
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
from rwx import cmd, ps
|
from rwx import cmd, ps
|
||||||
|
|
||||||
cmd.need("debootstrap")
|
cmd.need("debootstrap")
|
||||||
|
@ -8,14 +10,22 @@ BOOTSTRAP_ARCHITECTURE = "amd64"
|
||||||
BOOTSTRAP_VARIANT = "minbase"
|
BOOTSTRAP_VARIANT = "minbase"
|
||||||
|
|
||||||
|
|
||||||
def bootstrap(root_path: str, suite: str, mirror_location: str) -> None:
|
def bootstrap(root_path: Path, suite: str, mirror_location: str) -> None:
|
||||||
"""Boostrap a base operating filesystem."""
|
"""Boostrap a base operating filesystem.
|
||||||
command = [
|
|
||||||
("debootstrap",),
|
:param root_path: target output path
|
||||||
|
:type root_path: Path
|
||||||
|
:param suite: target distribution name
|
||||||
|
:type suite: str
|
||||||
|
:param mirror_location: source input repository
|
||||||
|
:type mirror_location: str
|
||||||
|
"""
|
||||||
|
command = (
|
||||||
|
"debootstrap",
|
||||||
("--arch", BOOTSTRAP_ARCHITECTURE),
|
("--arch", BOOTSTRAP_ARCHITECTURE),
|
||||||
("--variant", BOOTSTRAP_VARIANT),
|
("--variant", BOOTSTRAP_VARIANT),
|
||||||
(suite,),
|
suite,
|
||||||
(root_path,),
|
str(root_path),
|
||||||
(mirror_location,),
|
mirror_location,
|
||||||
]
|
)
|
||||||
ps.run(*command)
|
ps.run(*command)
|
||||||
|
|
|
@ -9,67 +9,121 @@ from rwx import ps
|
||||||
CHARSET = "UTF-8"
|
CHARSET = "UTF-8"
|
||||||
|
|
||||||
|
|
||||||
def create_image(file_path: str, size_bytes: int) -> None:
|
def create_image(file_path: Path, size_bytes: int) -> None:
|
||||||
"""Create a virtual device image file."""
|
"""Create a virtual device image file.
|
||||||
|
|
||||||
|
:param file_path: target image file
|
||||||
|
:type file_path: Path
|
||||||
|
:param size_bytes: virtual volume
|
||||||
|
:type size_bytes: int
|
||||||
|
"""
|
||||||
ps.run(
|
ps.run(
|
||||||
("qemu-img", "create"),
|
("qemu-img", "create"),
|
||||||
("-f", "qcow2"),
|
("-f", "qcow2"),
|
||||||
(file_path, str(size_bytes)),
|
(str(file_path), str(size_bytes)),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def empty_file(path: Path) -> None:
|
def empty_file(path: Path) -> None:
|
||||||
"""Empty the file at provided path."""
|
"""Empty the file at provided path.
|
||||||
|
|
||||||
|
:param path: target file to empty
|
||||||
|
:type path: Path
|
||||||
|
"""
|
||||||
write(path, "")
|
write(path, "")
|
||||||
|
|
||||||
|
|
||||||
def get_mount_uuid(path: str) -> str:
|
def get_mount_uuid(path: Path) -> str:
|
||||||
"""Return the filesystem UUID of a mountpoint path."""
|
"""Return the filesystem UUID of a mountpoint path.
|
||||||
|
|
||||||
|
:param path: mountpoint path
|
||||||
|
:type path: Path
|
||||||
|
:rtype: str
|
||||||
|
"""
|
||||||
return ps.run_line(
|
return ps.run_line(
|
||||||
("findmnt",),
|
"findmnt",
|
||||||
("--noheadings",),
|
"--noheadings",
|
||||||
("--output", "UUID"),
|
("--output", "UUID"),
|
||||||
(path,),
|
str(path),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def get_path_mount(path: str) -> str:
|
def get_path_mount(path: Path) -> Path:
|
||||||
"""Return the mountpoint path of an arbitrary path."""
|
"""Return the mountpoint path of an arbitrary path.
|
||||||
return ps.run_line(
|
|
||||||
"stat",
|
:param path: arbitrary path
|
||||||
("--format", "%m"),
|
:type path: Path
|
||||||
path,
|
:rtype: Path
|
||||||
|
"""
|
||||||
|
return Path(
|
||||||
|
ps.run_line(
|
||||||
|
"stat",
|
||||||
|
("--format", "%m"),
|
||||||
|
str(path),
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def get_path_uuid(path: str) -> str:
|
def get_path_uuid(path: Path) -> str:
|
||||||
"""Return the filesystem UUID of an arbitrary path."""
|
"""Return the filesystem UUID of an arbitrary path.
|
||||||
|
|
||||||
|
:param path: arbitrary path
|
||||||
|
:type path: Path
|
||||||
|
:rtype: str
|
||||||
|
"""
|
||||||
return get_mount_uuid(get_path_mount(path))
|
return get_mount_uuid(get_path_mount(path))
|
||||||
|
|
||||||
|
|
||||||
def make_directory(path: Path) -> None:
|
def make_directory(path: Path) -> None:
|
||||||
"""Make a directory (and its parents) from a path."""
|
"""Make a directory (and its parents) from a path.
|
||||||
|
|
||||||
|
:param path: directory to create
|
||||||
|
:type path: Path
|
||||||
|
"""
|
||||||
path.mkdir(exist_ok=True, parents=True)
|
path.mkdir(exist_ok=True, parents=True)
|
||||||
|
|
||||||
|
|
||||||
def read_file_bytes(file_path: Path) -> bytes:
|
def read_file_bytes(file_path: Path) -> bytes:
|
||||||
"""Read whole file bytes."""
|
"""Read whole file bytes.
|
||||||
|
|
||||||
|
:param file_path: source input file
|
||||||
|
:type file_path: Path
|
||||||
|
:rtype: bytes
|
||||||
|
"""
|
||||||
with file_path.open("br") as file_object:
|
with file_path.open("br") as file_object:
|
||||||
return file_object.read()
|
return file_object.read()
|
||||||
|
|
||||||
|
|
||||||
def read_file_lines(file_path: Path, charset: str = CHARSET) -> list[str]:
|
def read_file_lines(file_path: Path, charset: str = CHARSET) -> list[str]:
|
||||||
"""Read whole file lines."""
|
"""Read whole file lines.
|
||||||
|
|
||||||
|
:param file_path: source input file
|
||||||
|
:type file_path: Path
|
||||||
|
:param charset: charset to use for decoding input
|
||||||
|
:type charset: str
|
||||||
|
:rtype: list[str]
|
||||||
|
"""
|
||||||
return read_file_text(file_path, charset).split(os.linesep)
|
return read_file_text(file_path, charset).split(os.linesep)
|
||||||
|
|
||||||
|
|
||||||
def read_file_text(file_path: Path, charset: str = CHARSET) -> str:
|
def read_file_text(file_path: Path, charset: str = CHARSET) -> str:
|
||||||
"""Read whole file text."""
|
"""Read whole file text.
|
||||||
|
|
||||||
|
:param file_path: source input file
|
||||||
|
:type file_path: Path
|
||||||
|
:param charset: charset to use for decoding input
|
||||||
|
:type charset: str
|
||||||
|
:rtype: str
|
||||||
|
"""
|
||||||
return read_file_bytes(file_path).decode(charset)
|
return read_file_bytes(file_path).decode(charset)
|
||||||
|
|
||||||
|
|
||||||
def wipe(path: Path) -> None:
|
def wipe(path: Path) -> None:
|
||||||
"""Wipe provided path, whether directory or file."""
|
"""Wipe provided path, whether directory or file.
|
||||||
|
|
||||||
|
:param path: target path
|
||||||
|
:type path: Path
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
shutil.rmtree(path)
|
shutil.rmtree(path)
|
||||||
except NotADirectoryError:
|
except NotADirectoryError:
|
||||||
|
@ -79,6 +133,14 @@ def wipe(path: Path) -> None:
|
||||||
|
|
||||||
|
|
||||||
def write(file_path: Path, text: str, charset: str = CHARSET) -> None:
|
def write(file_path: Path, text: str, charset: str = CHARSET) -> None:
|
||||||
"""Write text into a file."""
|
"""Write text into a file.
|
||||||
|
|
||||||
|
:param file_path: target file path
|
||||||
|
:type file_path: Path
|
||||||
|
:param text: content to write
|
||||||
|
:type text: str
|
||||||
|
:param charset: charset to use for encoding ouput
|
||||||
|
:type charset: str
|
||||||
|
"""
|
||||||
with file_path.open(encoding=charset, mode="w") as file_object:
|
with file_path.open(encoding=charset, mode="w") as file_object:
|
||||||
file_object.write(text)
|
file_object.write(text)
|
||||||
|
|
|
@ -5,7 +5,12 @@ import sys
|
||||||
|
|
||||||
|
|
||||||
def get_file_logger(name: str) -> logging.Logger:
|
def get_file_logger(name: str) -> logging.Logger:
|
||||||
"""Return a file logger."""
|
"""Return a file logger.
|
||||||
|
|
||||||
|
:param name: arbitrary name
|
||||||
|
:type name: str
|
||||||
|
:rtype: logging.Logger
|
||||||
|
"""
|
||||||
# formatter
|
# formatter
|
||||||
items = [
|
items = [
|
||||||
"%(name)s: %(asctime)s",
|
"%(name)s: %(asctime)s",
|
||||||
|
@ -27,7 +32,12 @@ def get_file_logger(name: str) -> logging.Logger:
|
||||||
|
|
||||||
|
|
||||||
def get_stream_logger(level: int) -> logging.Logger:
|
def get_stream_logger(level: int) -> logging.Logger:
|
||||||
"""Return a stream logger."""
|
"""Return a stream logger.
|
||||||
|
|
||||||
|
:param level: filtering level
|
||||||
|
:type level: int
|
||||||
|
:rtype: logging.Logger
|
||||||
|
"""
|
||||||
# handler
|
# handler
|
||||||
out_handler = logging.StreamHandler(stream=sys.stdout)
|
out_handler = logging.StreamHandler(stream=sys.stdout)
|
||||||
out_handler.setLevel(level)
|
out_handler.setLevel(level)
|
||||||
|
|
|
@ -8,7 +8,12 @@ from .debian import Debian
|
||||||
|
|
||||||
|
|
||||||
def from_path(path: Path) -> OS:
|
def from_path(path: Path) -> OS:
|
||||||
"""Initialize from an already existing path."""
|
"""Initialize from an already existing path.
|
||||||
|
|
||||||
|
:param path: source root directory
|
||||||
|
:type path: Path
|
||||||
|
:rtype: OS
|
||||||
|
"""
|
||||||
return Debian(path)
|
return Debian(path)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -10,10 +10,17 @@ class OS(Class, ABC):
|
||||||
"""Operating System."""
|
"""Operating System."""
|
||||||
|
|
||||||
def __init__(self, path: Path) -> None:
|
def __init__(self, path: Path) -> None:
|
||||||
"""Set root."""
|
"""Set root.
|
||||||
|
|
||||||
|
:param path: root directory
|
||||||
|
:type path: Path
|
||||||
|
"""
|
||||||
self.root = path
|
self.root = path
|
||||||
self.name = self.get_name()
|
self.name = self.get_name()
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def get_name(self) -> str:
|
def get_name(self) -> str:
|
||||||
"""Return mandatory name."""
|
"""Return mandatory name.
|
||||||
|
|
||||||
|
:rtype: str
|
||||||
|
"""
|
||||||
|
|
|
@ -16,8 +16,14 @@ class PM(Class, ABC):
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def get_clean_command(self) -> Command:
|
def get_clean_command(self) -> Command:
|
||||||
"""Command to clean packages cache."""
|
"""Command to clean packages cache.
|
||||||
|
|
||||||
|
:rtype: Command
|
||||||
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def get_install_command(self) -> Command:
|
def get_install_command(self) -> Command:
|
||||||
"""Command to install package(s)."""
|
"""Command to install package(s).
|
||||||
|
|
||||||
|
:rtype: Command
|
||||||
|
"""
|
||||||
|
|
|
@ -8,9 +8,15 @@ class APT(PM):
|
||||||
"""Advanced Package Tool."""
|
"""Advanced Package Tool."""
|
||||||
|
|
||||||
def get_clean_command(self) -> Command:
|
def get_clean_command(self) -> Command:
|
||||||
"""Return clean command."""
|
"""Return clean command.
|
||||||
|
|
||||||
|
:rtype: Command
|
||||||
|
"""
|
||||||
return Command()
|
return Command()
|
||||||
|
|
||||||
def get_install_command(self) -> Command:
|
def get_install_command(self) -> Command:
|
||||||
"""Return install command."""
|
"""Return install command.
|
||||||
|
|
||||||
|
:rtype: Command
|
||||||
|
"""
|
||||||
return Command()
|
return Command()
|
||||||
|
|
|
@ -9,13 +9,22 @@ class Command(Class):
|
||||||
"""Command to run."""
|
"""Command to run."""
|
||||||
|
|
||||||
def __init__(self, *arguments: str | tuple[str, ...]) -> None:
|
def __init__(self, *arguments: str | tuple[str, ...]) -> None:
|
||||||
"""Set raw & flat arguments."""
|
"""Set raw & flat arguments.
|
||||||
|
|
||||||
|
:param *arguments: single argument or grouped ones
|
||||||
|
:type *arguments: str | tuple[str, ...]
|
||||||
|
"""
|
||||||
self.raw = arguments
|
self.raw = arguments
|
||||||
self.flat: list[str] = []
|
self.flat: list[str] = []
|
||||||
|
|
||||||
|
|
||||||
def get_tuples_args(*items: str | tuple[str, ...]) -> list[str]:
|
def get_tuples_args(*items: str | tuple[str, ...]) -> list[str]:
|
||||||
"""Turn arguments tuples into an arguments list."""
|
"""Turn arguments tuples into an arguments list.
|
||||||
|
|
||||||
|
:param *items: single item or grouped ones
|
||||||
|
:type *items: str | tuple[str, ...]
|
||||||
|
:rtype: list[str]
|
||||||
|
"""
|
||||||
args: list[str] = []
|
args: list[str] = []
|
||||||
for item in items:
|
for item in items:
|
||||||
match item:
|
match item:
|
||||||
|
@ -27,14 +36,26 @@ def get_tuples_args(*items: str | tuple[str, ...]) -> list[str]:
|
||||||
|
|
||||||
|
|
||||||
def run(*items: str | tuple[str, ...]) -> subprocess.CompletedProcess:
|
def run(*items: str | tuple[str, ...]) -> subprocess.CompletedProcess:
|
||||||
"""Run from a list of arguments tuples."""
|
"""Run from a list of arguments tuples.
|
||||||
|
|
||||||
|
:param *items: single item or grouped ones
|
||||||
|
:type *items: str | tuple[str, ...]
|
||||||
|
:rtype: subprocess.CompletedProcess
|
||||||
|
"""
|
||||||
return subprocess.run(
|
return subprocess.run(
|
||||||
get_tuples_args(*items), capture_output=False, check=True
|
get_tuples_args(*items), capture_output=False, check=True
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def run_line(*items: str | tuple[str, ...], charset: str = txt.CHARSET) -> str:
|
def run_line(*items: str | tuple[str, ...], charset: str = txt.CHARSET) -> str:
|
||||||
"""Run and return output line."""
|
"""Run and return output line.
|
||||||
|
|
||||||
|
:param *items: single item or grouped ones
|
||||||
|
:type *items: str | tuple[str, ...]
|
||||||
|
:param charset: charset to use for decoding binary output
|
||||||
|
:type charset: str
|
||||||
|
:rtype: str
|
||||||
|
"""
|
||||||
line, *_ = run_lines(*items, charset=charset)
|
line, *_ = run_lines(*items, charset=charset)
|
||||||
return line
|
return line
|
||||||
|
|
||||||
|
@ -42,7 +63,14 @@ def run_line(*items: str | tuple[str, ...], charset: str = txt.CHARSET) -> str:
|
||||||
def run_lines(
|
def run_lines(
|
||||||
*items: str | tuple[str, ...], charset: str = txt.CHARSET
|
*items: str | tuple[str, ...], charset: str = txt.CHARSET
|
||||||
) -> list[str]:
|
) -> list[str]:
|
||||||
"""Run and return output lines."""
|
"""Run and return output lines.
|
||||||
|
|
||||||
|
:param *items: single item or grouped ones
|
||||||
|
:type *items: str | tuple[str, ...]
|
||||||
|
:param charset: charset to use for decoding binary output
|
||||||
|
:type charset: str
|
||||||
|
:rtype: list[str]
|
||||||
|
"""
|
||||||
process = subprocess.run(
|
process = subprocess.run(
|
||||||
get_tuples_args(*items), capture_output=True, check=True
|
get_tuples_args(*items), capture_output=True, check=True
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in a new issue