Compare commits

..

No commits in common. "98350281993d8821cba71087f7ad8ad83e9dded7" and "519613eeeb0d2715f41165a2c6d74c125e5f4856" have entirely different histories.

33 changed files with 115 additions and 771 deletions

View file

@ -1,18 +0,0 @@
on: [push]
jobs:
job:
container:
image: ${{vars.DOCKER}}debian:bookworm
steps:
- name: spcd
env:
SPCD: ${{vars.SPCD}}
SPCD_SSH_HOSTS: ${{vars.SPCD_SSH_HOSTS}}
SPCD_SSH_KEY: ${{secrets.SPCD_SSH_KEY}}
SPCD_TXT_LOCALE: ${{vars.SPCD_TXT_LOCALE}}
run: ${{vars.SPCD}}
#- run: spcd-check-project
- run: spcd-build-project
- run: spcd-browse-workspace
- run: spcd-synchronize

View file

@ -1,11 +0,0 @@
#! /usr/bin/env python3
"""Dummy build."""
from pathlib import Path
from rwx.fs import make_directory, write
if __name__ == "__main__":
out = Path(__file__).parent / "out" / "web"
make_directory(out)
write(out / "index.html", "rwx.rwx.work")

View file

@ -21,7 +21,7 @@ keywords = []
license-files = { paths = ["license.md"] }
name = "rwx"
readme = "readme.md"
requires-python = ">= 3.11"
requires-python = ">= 3.10"
[project.scripts]
# command = "package.module:function"
@ -31,12 +31,6 @@ requires-python = ">= 3.11"
[tool.hatch.version]
path = "rwx/__init__.py"
[tool.pydoclint]
allow-init-docstring = true
quiet = true
skip-checking-short-docstrings = false
style = "sphinx"
[tool.ruff]
line-length = 80

7
readme.fr.rst Normal file
View file

@ -0,0 +1,7 @@
******************
Read Write eXecute
******************
`English <readme.rst>`_ | `Français <readme.fr.rst>`_
Français

View file

@ -1,61 +0,0 @@
# Read Write eXecute
A tiny framework to read, write & execute things.
---
## Why
---
## How
---
## What
---
## Who
### By
* [Marc Beninca](https://marc.beninca.link)
### For
* myself
---
## Where
### Chat
* [Discord](https://discord.com/channels/983145051985154108/1255894474895134761)
* [IRC](ircs://irc.libera.chat/##rwx)
### Forge
* [Repository](https://forge.rwx.work/rwx.work/rwx)
* [RSS](https://forge.rwx.work/rwx.work/rwx.rss)
* [Workflows](https://forge.rwx.work/rwx.work/rwx/actions)
### Deployment
* [Site](https://rwx.rwx.work)
---
## When
### Task stack
* character constants for box drawing
* common __str__ function
* parse pyproject.toml to write commands
* write classes for
* steps bars to log
* system commands to run
* with single call of subprocess.run
* or alternate subprocess method?

8
readme.rst Normal file
View file

@ -0,0 +1,8 @@
Read Write eXecute
==================
`English <https://doc.rwx.work>`_
---------------------------------
`Français <https://doc.rwx.work>`_
----------------------------------

View file

@ -1,33 +1,3 @@
"""Read Write eXecute."""
__version__ = "0.0.1"
from os import linesep
class Class:
"""Root class."""
def __repr__(self) -> str:
"""Return machine-readable state.
:return: state
:rtype: str
"""
name = self.__class__.__name__
attributes = [
f"{k}={v!r}" for k, v in vars(self).items() if not k.startswith("_")
]
arguments = ", ".join(attributes)
return f"{name}({arguments})"
def __str__(self) -> str:
"""Return human-readable state.
:return: state
:rtype: str
"""
attributes = [
f"{k} = {v}" for k, v in vars(self).items() if not k.startswith("_")
]
return linesep.join(attributes)

View file

@ -4,12 +4,12 @@
from pathlib import Path
from rwx import fs
import fs
if __name__ == "__main__":
file_path: Path = Path(__file__).resolve()
root_path: Path = file_path.parent
directory_path: Path = root_path / "tmp"
file_path = Path(__file__).resolve()
root_path = file_path.parent
directory_path = root_path / "tmp"
file_path = directory_path / "file"
fs.wipe(directory_path)

View file

@ -1,13 +1,6 @@
"""Handle system arguments."""
import sys
def split() -> tuple[str, list[str]]:
"""Split command & actual arguments.
:return: both
:rtype: tuple[str, list[str]]
"""
command, *arguments = sys.argv
return command, arguments

View file

@ -1,16 +1,8 @@
"""Handle system commands & packages."""
commands: list[str] = []
packages: list[str] = []
def need(command: str) -> None:
"""Assert package dependency for a command.
:param command: name of the requested command
:type command: str
"""
package: str | None
match command:
case "debootstrap":
package = "debootstrap"

View file

@ -1,26 +1,19 @@
"""Wrap SquashFS commands."""
import ps
from pathlib import Path
import rwx.cmd
from rwx import cmd, ps
cmd.need("mksquashfs")
rwx.cmd.need("mksquashfs")
def mksquashfs(input_root: Path, output_file: Path) -> None:
"""Make a SquashFS bootable image file.
:param input_root: ?
:type input_root: Path
:param output_file: ?
:type output_file: Path
"""
def mksquashfs(input_root: str, output_file: str):
ps.run(
"mksquashfs",
str(input_root),
str(output_file),
"-comp",
"zstd",
"-Xcompression-level",
str(18),
[
"mksquashfs",
input_root,
output_file,
"-comp",
"zstd",
"-Xcompression-level",
str(18),
]
)

View file

@ -1,8 +1,6 @@
"""Wrap Debian commands."""
import cmd
from pathlib import Path
from rwx import cmd, ps
import ps
cmd.need("debootstrap")
@ -10,22 +8,13 @@ BOOTSTRAP_ARCHITECTURE = "amd64"
BOOTSTRAP_VARIANT = "minbase"
def bootstrap(root_path: Path, suite: str, mirror_location: str) -> None:
"""Boostrap a base operating filesystem.
:param root_path: target output path
:type root_path: Path
:param suite: target distribution name
:type suite: str
:param mirror_location: source input repository
:type mirror_location: str
"""
command = (
"debootstrap",
def bootstrap(root_path: str, suite: str, mirror_location: str):
command = [
("debootstrap",),
("--arch", BOOTSTRAP_ARCHITECTURE),
("--variant", BOOTSTRAP_VARIANT),
suite,
str(root_path),
mirror_location,
)
ps.run(*command)
(suite,),
(root_path,),
(mirror_location,),
]
return ps.run(command)

View file

@ -1,7 +1,2 @@
"""Handle errors."""
from rwx import Class
class Error(Class, Exception):
"""Parent class for all errors."""
class Exception(Exception):
pass

View file

@ -1,160 +1,70 @@
"""Operations involving FileSystems."""
import os
import shutil
import tomllib
from pathlib import Path
from rwx import ps
CHARSET = "UTF-8"
def create_image(file_path: Path, size_bytes: int) -> None:
"""Create a virtual device image file.
:param file_path: target image file
:type file_path: Path
:param size_bytes: virtual volume
:type size_bytes: int
"""
def create_image(file_path: str, size_bytes: int):
ps.run(
("qemu-img", "create"),
("-f", "qcow2"),
(str(file_path), str(size_bytes)),
(file_path, size_bytes),
)
def empty_file(path: Path) -> None:
"""Empty the file at provided path.
:param path: target file to empty
:type path: Path
"""
def empty_file(path: str):
write(path, "")
def get_mount_uuid(path: Path) -> str:
"""Return the filesystem UUID of a mountpoint path.
:param path: mountpoint path
:type path: Path
:rtype: str
"""
def get_mount_uuid(path: str):
return ps.run_line(
"findmnt",
"--noheadings",
("findmnt",),
("--noheadings",),
("--output", "UUID"),
str(path),
(path,),
)
def get_path_mount(path: Path) -> Path:
"""Return the mountpoint path of an arbitrary path.
:param path: arbitrary path
:type path: Path
:rtype: Path
"""
return Path(
ps.run_line(
"stat",
("--format", "%m"),
str(path),
)
def get_path_mount(path: str):
return ps.run_line(
("stat",),
("--format", "%m"),
(path,),
)
def get_path_uuid(path: Path) -> str:
"""Return the filesystem UUID of an arbitrary path.
:param path: arbitrary path
:type path: Path
:rtype: str
"""
def get_path_uuid(path: str):
return get_mount_uuid(get_path_mount(path))
def make_directory(path: Path) -> None:
"""Make a directory (and its parents) from a path.
:param path: directory to create
:type path: Path
"""
path.mkdir(exist_ok=True, parents=True)
def make_directory(path: str):
os.makedirs(path, exist_ok=True)
def read_file_bytes(file_path: Path) -> bytes:
"""Read whole file bytes.
:param file_path: source input file
:type file_path: Path
:rtype: bytes
"""
with file_path.open("br") as file_object:
def read_file(file_path: str):
with open(file_path, "br") as file_object:
return file_object.read()
def read_file_dict(file_path: Path, charset: str = CHARSET) -> dict:
"""Read whole file as toml object.
:param file_path: source input file
:type file_path: Path
:param charset: charset to use for decoding input
:type charset: str
:rtype: dict
"""
text = read_file_text(file_path, charset)
return tomllib.loads(text)
def read_file_lines(file_path: str, charset=CHARSET):
return read_file_text(file_path).split(os.linesep)
def read_file_lines(file_path: Path, charset: str = CHARSET) -> list[str]:
"""Read whole file lines.
:param file_path: source input file
:type file_path: Path
:param charset: charset to use for decoding input
:type charset: str
:rtype: list[str]
"""
return read_file_text(file_path, charset).split(os.linesep)
def read_file_text(file_path: str, charset=CHARSET):
return read_file(file_path).decode(charset)
def read_file_text(file_path: Path, charset: str = CHARSET) -> str:
"""Read whole file text.
:param file_path: source input file
:type file_path: Path
:param charset: charset to use for decoding input
:type charset: str
:rtype: str
"""
return read_file_bytes(file_path).decode(charset)
def wipe(path: Path) -> None:
"""Wipe provided path, whether directory or file.
:param path: target path
:type path: Path
"""
def wipe(path: str):
try:
shutil.rmtree(path)
except NotADirectoryError:
path.unlink(missing_ok=True)
os.remove(path)
except FileNotFoundError:
pass
def write(file_path: Path, text: str, charset: str = CHARSET) -> None:
"""Write text into a file.
:param file_path: target file path
:type file_path: Path
:param text: content to write
:type text: str
:param charset: charset to use for encoding ouput
:type charset: str
"""
with file_path.open(encoding=charset, mode="w") as file_object:
file_object.write(text)
def write(file_path: str, text: str, charset=CHARSET):
with open(file_path, "bw") as file_object:
file_object.write(text.encode(charset))

View file

@ -1,6 +1,6 @@
"""Wrap GRUB commands."""
import cmd
from rwx import cmd, ps
import ps
cmd.need("grub-mkimage")
@ -24,21 +24,8 @@ def make_image(
memdisk_path: str,
pubkey_path: str | None = None,
) -> None:
"""Make a binary bootable image.
:param image_format: output format (x86_64-efi, i386-pc, arm64-efi)
:type image_format: str
:param image_path: output file
:type image_path: str
:param modules: modules to embed
:type modules: list[str]
:param memdisk_path: archive to include
:type memdisk_path: str
:param pubkey_path: extra public key to add
:type pubkey_path: str | None
"""
args: list[str | tuple[str, ...]] = [
"grub-mkimage",
args = [
("grub-mkimage",),
("--compress", COMPRESSION),
("--format", image_format),
("--output", image_path),
@ -47,6 +34,6 @@ def make_image(
if pubkey_path:
args.append(("--pubkey", pubkey_path))
args.extend(modules)
if extra_modules := MODULES.get(image_format):
args.extend(extra_modules)
if modules := MODULES.get(image_format):
args.extend(modules)
ps.run(*args)

View file

@ -1,50 +1,31 @@
"""Handle logging."""
import logging
import sys
def get_file_logger(name: str) -> logging.Logger:
"""Return a file logger.
:param name: arbitrary name
:type name: str
:rtype: logging.Logger
"""
# formatter
items = [
"%(name)s: %(asctime)s",
"%(levelname)s",
"%(filename)s:%(lineno)s",
"%(process)d >>> %(message)s",
]
template = " | ".join(items)
formatter = logging.Formatter(template)
# handler
formatter = logging.Formatter(
"%(name)s: %(asctime)s | %(levelname)s | %(filename)s:%(lineno)s | %(process)d >>> %(message)s",
)
#
out_handler = logging.StreamHandler(stream=sys.stdout)
out_handler.setFormatter(formatter)
out_handler.setLevel(logging.INFO)
# logger
#
logger = logging.getLogger(name)
logger.addHandler(out_handler)
logger.setLevel(logging.INFO)
#
return logger
def get_stream_logger(level: int) -> logging.Logger:
"""Return a stream logger.
:param level: filtering level
:type level: int
:rtype: logging.Logger
"""
# handler
out_handler = logging.StreamHandler(stream=sys.stdout)
out_handler.setLevel(level)
# logger
#
logger = logging.getLogger()
logger.addHandler(out_handler)
logger.setLevel(level)
#
return logger

View file

@ -1,20 +0,0 @@
"""Control Operating Systems."""
from os import sep
from pathlib import Path
from .abstract import OS
from .debian import Debian
def from_path(path: Path) -> OS:
"""Initialize from an already existing path.
:param path: source root directory
:type path: Path
:rtype: OS
"""
return Debian(path)
up = from_path(Path(sep))

View file

@ -1,26 +0,0 @@
"""Abstract Operating System."""
from abc import ABC, abstractmethod
from pathlib import Path
from rwx import Class
class OS(Class, ABC):
"""Operating System."""
def __init__(self, path: Path) -> None:
"""Set root.
:param path: root directory
:type path: Path
"""
self.root = path
self.name = self.get_name()
@abstractmethod
def get_name(self) -> str:
"""Return mandatory name.
:rtype: str
"""

View file

@ -1,14 +0,0 @@
"""Debian operating system."""
from .abstract import OS
class Debian(OS):
"""Debian operating system."""
def get_name(self) -> str:
"""Return name.
:rtype: str
"""
return "Debian"

View file

@ -1,29 +0,0 @@
"""Package Manager."""
from abc import ABC, abstractmethod
from rwx import Class
from rwx.ps import Command
class PM(Class, ABC):
"""Package Manager."""
def __init__(self) -> None:
"""Set commands."""
self.clean = self.get_clean_command()
self.install = self.get_install_command()
@abstractmethod
def get_clean_command(self) -> Command:
"""Command to clean packages cache.
:rtype: Command
"""
@abstractmethod
def get_install_command(self) -> Command:
"""Command to install package(s).
:rtype: Command
"""

View file

@ -1,22 +0,0 @@
"""Advanced Package Tool."""
from rwx.os.pm import PM
from rwx.ps import Command
class APT(PM):
"""Advanced Package Tool."""
def get_clean_command(self) -> Command:
"""Return clean command.
:rtype: Command
"""
return Command()
def get_install_command(self) -> Command:
"""Return install command.
:rtype: Command
"""
return Command()

View file

@ -1,20 +1,8 @@
"""Handle projects."""
from pathlib import Path
from rwx import Class
from os import path
class Project(Class):
"""Parent class for any type of project."""
def __init__(self, file: Path) -> None:
"""Set file, root & name.
:param file: root reference file
:type file: Path
"""
self.raw = file
self.file = self.raw.resolve()
self.root: Path = self.file.parent
self.name: str = self.root.name
class Project:
def __init__(self, file_path: str) -> None:
self.file: str = path.realpath(file_path)
self.root: str = path.dirname(self.file)
self.name: str = path.basename(self.root)

View file

@ -1,22 +1,17 @@
"""Project consisting only of a Sphinx documentation."""
from typing import TYPE_CHECKING
from os import path
from sphinx.cmd.build import build_main
from rwx.fs import wipe
from rwx.prj import Project
if TYPE_CHECKING:
from pathlib import Path
class SphinxProject(Project):
"""Child class for a project based on Sphinx."""
def __init__(self, file_path: str) -> None:
super().__init__(file_path)
def build(self) -> None:
"""Build the project."""
output_root: Path = self.root / "out"
def build(self):
output_root: str = path.join(self.root, "out")
wipe(output_root)
arguments: list[str] = [
"-E",
@ -27,13 +22,13 @@ class SphinxProject(Project):
"-D",
f"project={self.name}",
"-D",
"master_doc=index",
"master_doc={}".format("index"),
"-D",
"html_theme=sphinx_rtd_theme",
"html_theme={}".format("sphinx_rtd_theme"),
"-c",
str(self.root),
self.root,
# "-C",
str(self.root / self.name),
str(output_root / "web"),
path.join(self.root, self.name),
path.join(output_root, "web"),
]
build_main(arguments)

View file

@ -1,78 +1,32 @@
"""Handle processes."""
import subprocess
from rwx import Class, txt
from rwx import txt
class Command(Class):
"""Command to run."""
def __init__(self, *arguments: str | tuple[str, ...]) -> None:
"""Set raw & flat arguments.
:param *arguments: single argument or grouped ones
:type *arguments: str | tuple[str, ...]
"""
self.raw = arguments
self.flat: list[str] = []
def get_tuples_args(*items: str | tuple[str, ...]) -> list[str]:
"""Turn arguments tuples into an arguments list.
:param *items: single item or grouped ones
:type *items: str | tuple[str, ...]
:rtype: list[str]
"""
def get_tuples_args(tuples) -> list[str]:
args: list[str] = []
for item in items:
match item:
case str():
args.append(item)
case tuple():
args.extend(item)
for item in tuples:
if type(item) is tuple:
args.extend(item)
else:
args.append(item)
return args
def run(*items: str | tuple[str, ...]) -> subprocess.CompletedProcess:
"""Run from a list of arguments tuples.
:param *items: single item or grouped ones
:type *items: str | tuple[str, ...]
:rtype: subprocess.CompletedProcess
"""
def run(*tuples) -> subprocess.CompletedProcess:
return subprocess.run(
get_tuples_args(*items), capture_output=False, check=True
get_tuples_args(tuples), capture_output=False, check=True
)
def run_line(*items: str | tuple[str, ...], charset: str = txt.CHARSET) -> str:
"""Run and return output line.
:param *items: single item or grouped ones
:type *items: str | tuple[str, ...]
:param charset: charset to use for decoding binary output
:type charset: str
:rtype: str
"""
line, *_ = run_lines(*items, charset=charset)
return line
def run_line(*tuples, charset: str = txt.CHARSET) -> str:
lines = run_lines(*get_tuples_args(tuples), charset=charset)
return lines[0]
def run_lines(
*items: str | tuple[str, ...], charset: str = txt.CHARSET
) -> list[str]:
"""Run and return output lines.
:param *items: single item or grouped ones
:type *items: str | tuple[str, ...]
:param charset: charset to use for decoding binary output
:type charset: str
:rtype: list[str]
"""
def run_lines(*tuples, charset: str = txt.CHARSET) -> list[str]:
process = subprocess.run(
get_tuples_args(*items), capture_output=True, check=True
get_tuples_args(tuples), capture_output=True, check=True
)
string = process.stdout.decode(charset)
return string.rstrip().splitlines()

View file

View file

@ -1 +0,0 @@
"""Configure FreeTube."""

View file

@ -1,29 +0,0 @@
"""FreeTube channels."""
from rwx import Class
class Channel(Class):
"""FreeTube channel."""
def __init__(self, uid: str, name: str) -> None:
"""Set uid & name.
:param uid: unique identifier
:type uid: str
:param name: label
:type name: str
"""
self.uid = uid
self.name = name
def to_db(self) -> str:
"""Return identifier as db.
:rtype: str
"""
return f"""\
{{\
"id":"{self.uid}"\
}}\
"""

View file

@ -1,21 +0,0 @@
"""Output FreeTube db."""
def to_db(value: object) -> str:
"""Render value as string.
:param value: value to render
:type value: object
:rtype: str
"""
match value:
case bool():
text = str(value).lower()
case dict():
sub = ",".join([f'"{i}":{to_db(v)}' for i, v in value.items()])
text = f"{{{sub}}}"
case float() | str():
text = f'"{value}"'
case _:
text = str(value)
return text

View file

@ -1,47 +0,0 @@
"""FreeTube playlists."""
from rwx import Class
from .videos import Video
class Playlist(Class):
"""FreeTube playlist."""
def __init__(self, uid: str, name: str) -> None:
"""Set uid & name.
:param uid: identifier
:type uid: str
:param name: label
:type name: str
"""
self.uid = uid
self.name = name
self.videos: list[Video] = []
def add(self, video: Video) -> None:
"""Add video.
:param video: video to add
:type video: Video
"""
self.videos.append(video)
def to_db(self) -> str:
"""Return identifier, name & videos.
:rtype: str
"""
videos = ",".join([video.to_db() for video in self.videos])
return f"""\
{{\
"_id":"{self.uid}"\
,\
"playlistName":"{self.name}"\
,\
"protected":true\
,\
"videos":[{videos}]\
}}\
"""

View file

@ -1,45 +0,0 @@
"""FreeTube profiles."""
from rwx import Class
from .channels import Channel
class Profile(Class):
"""FreeTube profile."""
def __init__(self, uid: str, name: str) -> None:
"""Set uid & name.
:param uid: unique identifier
:type uid: str
:param name: label
:type name: str
"""
self.id = uid
self.name = name
self.channels: list[Channel] = []
def add(self, channel: Channel) -> None:
"""Add channel.
:param channel: channel to add
:type channel: Channel
"""
self.channels.append(channel)
def to_db(self) -> str:
"""Return identifier, name & channels.
:rtype: str
"""
channels = ",".join([channel.to_db() for channel in self.channels])
return f"""\
{{\
"_id":"{self.id}"\
,\
"name":"{self.name}"\
,\
"subscriptions":[{channels}]\
}}\
"""

View file

@ -1,33 +0,0 @@
"""FreeTube settings."""
from rwx import Class
from .db import to_db
class Setting(Class):
"""FreeTube setting."""
def __init__(self, uid: str, value: object) -> None:
"""Set uid & value.
:param uid: unique identifier
:type uid: str
:param value: value
:type value: object
"""
self.uid = uid
self.value = value
def to_db(self) -> str:
"""Return uid & value as db string.
:rtype: str
"""
return f"""\
{{\
"_id":"{self.uid}"\
,\
"value":{to_db(self.value)}\
}}\
"""

View file

@ -1,33 +0,0 @@
"""FreeTube videos."""
from rwx import Class
class Video(Class):
"""FreeTube video."""
def __init__(self, uid: str, name: str) -> None:
"""Set id & name.
:param uid: identifier
:type uid: str
:param name: label
:type name: str
"""
self.uid = uid
self.name = name
def to_db(self) -> str:
"""Return identifier, zero length & title.
:rtype: str
"""
return f"""\
{{\
"videoId":"{self.uid}"\
,\
"lengthSeconds":0\
,\
"title":"{self.name}"\
}}\
"""

View file

@ -1,3 +1 @@
"""Handle text."""
CHARSET = "UTF-8"