Compare commits
102 commits
519613eeeb
...
9835028199
Author | SHA1 | Date | |
---|---|---|---|
9835028199 | |||
1c4f52007c | |||
eb3c9814bb | |||
daccf01b8c | |||
5eae0334de | |||
edd8e15978 | |||
a92a3a786a | |||
46ec104b75 | |||
6ece695be4 | |||
2a01ef0066 | |||
45df475195 | |||
19153b3bc1 | |||
e61be6cf4f | |||
2327d5388d | |||
094d66bc33 | |||
6fab5ce9b4 | |||
0d77038392 | |||
2c6bec253c | |||
eb0f862125 | |||
b05de437d0 | |||
313cc30aec | |||
149ed2dc3b | |||
e9d228ba2c | |||
3940d32195 | |||
2e00140e82 | |||
b9754b5dde | |||
012b10f7d5 | |||
d6e112d6c5 | |||
622122cdef | |||
e5bb634cf4 | |||
8227524a7c | |||
487d7cb9b2 | |||
b7eec788f7 | |||
450e10e2f4 | |||
ca88e5bc1d | |||
26fea93bdb | |||
c300ab86d7 | |||
2f0c3a0487 | |||
db47d5c80a | |||
04eaae7ba3 | |||
5a25c6df27 | |||
f8567686fd | |||
63c179a0a4 | |||
304c2bc617 | |||
5038e587af | |||
73c7ac00f4 | |||
f2991cff04 | |||
0ad5cc9701 | |||
6f9ba7f8f7 | |||
4d8c1d7aab | |||
148f757d5d | |||
f4498f691c | |||
c5930b4107 | |||
e75a624c46 | |||
9c1136cfa0 | |||
909e652f0d | |||
4e369df232 | |||
cc0b131a56 | |||
d4d6098241 | |||
68cbe3cd88 | |||
2ad223fa83 | |||
51b4f0f5f2 | |||
378a212786 | |||
cd4e7403ae | |||
521884102e | |||
4fac21da08 | |||
56404078f7 | |||
114ee61102 | |||
e213285987 | |||
3a8b239b9f | |||
17b16ec9d0 | |||
b1970c1f78 | |||
0e2b638bbb | |||
c024c553b0 | |||
07699b08eb | |||
bf1b3402ae | |||
78dd431ca5 | |||
b3e66ec1c3 | |||
065d647e51 | |||
5e3c8e93ff | |||
cfa9ca9e3c | |||
de80d7ecff | |||
ef45fc9d81 | |||
0a9404fb2f | |||
3393f09907 | |||
aa1f06c20f | |||
3e0d5bf2bc | |||
08b6364d9b | |||
f7c1d90dfd | |||
ce3629a776 | |||
9bb8003812 | |||
f46d5a9765 | |||
88311cb55c | |||
8c896c9074 | |||
eb5a386e02 | |||
2663288127 | |||
12a2a82cfe | |||
076f654a2d | |||
63d30013a3 | |||
e306971534 | |||
2dfdabed26 | |||
5361fbd9a0 |
33 changed files with 771 additions and 115 deletions
18
.forgejo/workflows/main.yaml
Normal file
18
.forgejo/workflows/main.yaml
Normal file
|
@ -0,0 +1,18 @@
|
|||
on: [push]
|
||||
jobs:
|
||||
job:
|
||||
container:
|
||||
image: ${{vars.DOCKER}}debian:bookworm
|
||||
steps:
|
||||
- name: spcd
|
||||
env:
|
||||
SPCD: ${{vars.SPCD}}
|
||||
SPCD_SSH_HOSTS: ${{vars.SPCD_SSH_HOSTS}}
|
||||
SPCD_SSH_KEY: ${{secrets.SPCD_SSH_KEY}}
|
||||
SPCD_TXT_LOCALE: ${{vars.SPCD_TXT_LOCALE}}
|
||||
run: ${{vars.SPCD}}
|
||||
|
||||
#- run: spcd-check-project
|
||||
- run: spcd-build-project
|
||||
- run: spcd-browse-workspace
|
||||
- run: spcd-synchronize
|
11
build.py
Executable file
11
build.py
Executable file
|
@ -0,0 +1,11 @@
|
|||
#! /usr/bin/env python3
|
||||
"""Dummy build."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from rwx.fs import make_directory, write
|
||||
|
||||
if __name__ == "__main__":
|
||||
out = Path(__file__).parent / "out" / "web"
|
||||
make_directory(out)
|
||||
write(out / "index.html", "rwx.rwx.work")
|
|
@ -21,7 +21,7 @@ keywords = []
|
|||
license-files = { paths = ["license.md"] }
|
||||
name = "rwx"
|
||||
readme = "readme.md"
|
||||
requires-python = ">= 3.10"
|
||||
requires-python = ">= 3.11"
|
||||
|
||||
[project.scripts]
|
||||
# command = "package.module:function"
|
||||
|
@ -31,6 +31,12 @@ requires-python = ">= 3.10"
|
|||
[tool.hatch.version]
|
||||
path = "rwx/__init__.py"
|
||||
|
||||
[tool.pydoclint]
|
||||
allow-init-docstring = true
|
||||
quiet = true
|
||||
skip-checking-short-docstrings = false
|
||||
style = "sphinx"
|
||||
|
||||
[tool.ruff]
|
||||
line-length = 80
|
||||
|
||||
|
|
|
@ -1,7 +0,0 @@
|
|||
******************
|
||||
Read Write eXecute
|
||||
******************
|
||||
|
||||
`English <readme.rst>`_ | `Français <readme.fr.rst>`_
|
||||
|
||||
Français
|
61
readme.md
Normal file
61
readme.md
Normal file
|
@ -0,0 +1,61 @@
|
|||
# Read Write eXecute
|
||||
|
||||
A tiny framework to read, write & execute things.
|
||||
|
||||
---
|
||||
|
||||
## Why
|
||||
|
||||
---
|
||||
|
||||
## How
|
||||
|
||||
---
|
||||
|
||||
## What
|
||||
|
||||
---
|
||||
|
||||
## Who
|
||||
|
||||
### By
|
||||
|
||||
* [Marc Beninca](https://marc.beninca.link)
|
||||
|
||||
### For
|
||||
|
||||
* myself
|
||||
|
||||
---
|
||||
|
||||
## Where
|
||||
|
||||
### Chat
|
||||
|
||||
* [Discord](https://discord.com/channels/983145051985154108/1255894474895134761)
|
||||
* [IRC](ircs://irc.libera.chat/##rwx)
|
||||
|
||||
### Forge
|
||||
|
||||
* [Repository](https://forge.rwx.work/rwx.work/rwx)
|
||||
* [RSS](https://forge.rwx.work/rwx.work/rwx.rss)
|
||||
* [Workflows](https://forge.rwx.work/rwx.work/rwx/actions)
|
||||
|
||||
### Deployment
|
||||
|
||||
* [Site](https://rwx.rwx.work)
|
||||
|
||||
---
|
||||
|
||||
## When
|
||||
|
||||
### Task stack
|
||||
|
||||
* character constants for box drawing
|
||||
* common __str__ function
|
||||
* parse pyproject.toml to write commands
|
||||
* write classes for
|
||||
* steps bars to log
|
||||
* system commands to run
|
||||
* with single call of subprocess.run
|
||||
* or alternate subprocess method?
|
|
@ -1,8 +0,0 @@
|
|||
Read Write eXecute
|
||||
==================
|
||||
|
||||
`English <https://doc.rwx.work>`_
|
||||
---------------------------------
|
||||
|
||||
`Français <https://doc.rwx.work>`_
|
||||
----------------------------------
|
|
@ -1,3 +1,33 @@
|
|||
"""Read Write eXecute."""
|
||||
|
||||
__version__ = "0.0.1"
|
||||
|
||||
from os import linesep
|
||||
|
||||
|
||||
class Class:
|
||||
"""Root class."""
|
||||
|
||||
def __repr__(self) -> str:
|
||||
"""Return machine-readable state.
|
||||
|
||||
:return: state
|
||||
:rtype: str
|
||||
"""
|
||||
name = self.__class__.__name__
|
||||
attributes = [
|
||||
f"{k}={v!r}" for k, v in vars(self).items() if not k.startswith("_")
|
||||
]
|
||||
arguments = ", ".join(attributes)
|
||||
return f"{name}({arguments})"
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""Return human-readable state.
|
||||
|
||||
:return: state
|
||||
:rtype: str
|
||||
"""
|
||||
attributes = [
|
||||
f"{k} = {v}" for k, v in vars(self).items() if not k.startswith("_")
|
||||
]
|
||||
return linesep.join(attributes)
|
||||
|
|
|
@ -4,12 +4,12 @@
|
|||
|
||||
from pathlib import Path
|
||||
|
||||
import fs
|
||||
from rwx import fs
|
||||
|
||||
if __name__ == "__main__":
|
||||
file_path = Path(__file__).resolve()
|
||||
root_path = file_path.parent
|
||||
directory_path = root_path / "tmp"
|
||||
file_path: Path = Path(__file__).resolve()
|
||||
root_path: Path = file_path.parent
|
||||
directory_path: Path = root_path / "tmp"
|
||||
file_path = directory_path / "file"
|
||||
|
||||
fs.wipe(directory_path)
|
||||
|
|
|
@ -1,6 +1,13 @@
|
|||
"""Handle system arguments."""
|
||||
|
||||
import sys
|
||||
|
||||
|
||||
def split() -> tuple[str, list[str]]:
|
||||
"""Split command & actual arguments.
|
||||
|
||||
:return: both
|
||||
:rtype: tuple[str, list[str]]
|
||||
"""
|
||||
command, *arguments = sys.argv
|
||||
return command, arguments
|
||||
|
|
|
@ -1,8 +1,16 @@
|
|||
"""Handle system commands & packages."""
|
||||
|
||||
commands: list[str] = []
|
||||
packages: list[str] = []
|
||||
|
||||
|
||||
def need(command: str) -> None:
|
||||
"""Assert package dependency for a command.
|
||||
|
||||
:param command: name of the requested command
|
||||
:type command: str
|
||||
"""
|
||||
package: str | None
|
||||
match command:
|
||||
case "debootstrap":
|
||||
package = "debootstrap"
|
||||
|
|
|
@ -1,19 +1,26 @@
|
|||
import ps
|
||||
"""Wrap SquashFS commands."""
|
||||
|
||||
import rwx.cmd
|
||||
from pathlib import Path
|
||||
|
||||
rwx.cmd.need("mksquashfs")
|
||||
from rwx import cmd, ps
|
||||
|
||||
cmd.need("mksquashfs")
|
||||
|
||||
|
||||
def mksquashfs(input_root: str, output_file: str):
|
||||
def mksquashfs(input_root: Path, output_file: Path) -> None:
|
||||
"""Make a SquashFS bootable image file.
|
||||
|
||||
:param input_root: ?
|
||||
:type input_root: Path
|
||||
:param output_file: ?
|
||||
:type output_file: Path
|
||||
"""
|
||||
ps.run(
|
||||
[
|
||||
"mksquashfs",
|
||||
input_root,
|
||||
output_file,
|
||||
str(input_root),
|
||||
str(output_file),
|
||||
"-comp",
|
||||
"zstd",
|
||||
"-Xcompression-level",
|
||||
str(18),
|
||||
]
|
||||
)
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
import cmd
|
||||
"""Wrap Debian commands."""
|
||||
|
||||
import ps
|
||||
from pathlib import Path
|
||||
|
||||
from rwx import cmd, ps
|
||||
|
||||
cmd.need("debootstrap")
|
||||
|
||||
|
@ -8,13 +10,22 @@ BOOTSTRAP_ARCHITECTURE = "amd64"
|
|||
BOOTSTRAP_VARIANT = "minbase"
|
||||
|
||||
|
||||
def bootstrap(root_path: str, suite: str, mirror_location: str):
|
||||
command = [
|
||||
("debootstrap",),
|
||||
def bootstrap(root_path: Path, suite: str, mirror_location: str) -> None:
|
||||
"""Boostrap a base operating filesystem.
|
||||
|
||||
:param root_path: target output path
|
||||
:type root_path: Path
|
||||
:param suite: target distribution name
|
||||
:type suite: str
|
||||
:param mirror_location: source input repository
|
||||
:type mirror_location: str
|
||||
"""
|
||||
command = (
|
||||
"debootstrap",
|
||||
("--arch", BOOTSTRAP_ARCHITECTURE),
|
||||
("--variant", BOOTSTRAP_VARIANT),
|
||||
(suite,),
|
||||
(root_path,),
|
||||
(mirror_location,),
|
||||
]
|
||||
return ps.run(command)
|
||||
suite,
|
||||
str(root_path),
|
||||
mirror_location,
|
||||
)
|
||||
ps.run(*command)
|
||||
|
|
|
@ -1,2 +1,7 @@
|
|||
class Exception(Exception):
|
||||
pass
|
||||
"""Handle errors."""
|
||||
|
||||
from rwx import Class
|
||||
|
||||
|
||||
class Error(Class, Exception):
|
||||
"""Parent class for all errors."""
|
||||
|
|
|
@ -1,70 +1,160 @@
|
|||
"""Operations involving FileSystems."""
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import tomllib
|
||||
from pathlib import Path
|
||||
|
||||
from rwx import ps
|
||||
|
||||
CHARSET = "UTF-8"
|
||||
|
||||
|
||||
def create_image(file_path: str, size_bytes: int):
|
||||
def create_image(file_path: Path, size_bytes: int) -> None:
|
||||
"""Create a virtual device image file.
|
||||
|
||||
:param file_path: target image file
|
||||
:type file_path: Path
|
||||
:param size_bytes: virtual volume
|
||||
:type size_bytes: int
|
||||
"""
|
||||
ps.run(
|
||||
("qemu-img", "create"),
|
||||
("-f", "qcow2"),
|
||||
(file_path, size_bytes),
|
||||
(str(file_path), str(size_bytes)),
|
||||
)
|
||||
|
||||
|
||||
def empty_file(path: str):
|
||||
def empty_file(path: Path) -> None:
|
||||
"""Empty the file at provided path.
|
||||
|
||||
:param path: target file to empty
|
||||
:type path: Path
|
||||
"""
|
||||
write(path, "")
|
||||
|
||||
|
||||
def get_mount_uuid(path: str):
|
||||
def get_mount_uuid(path: Path) -> str:
|
||||
"""Return the filesystem UUID of a mountpoint path.
|
||||
|
||||
:param path: mountpoint path
|
||||
:type path: Path
|
||||
:rtype: str
|
||||
"""
|
||||
return ps.run_line(
|
||||
("findmnt",),
|
||||
("--noheadings",),
|
||||
"findmnt",
|
||||
"--noheadings",
|
||||
("--output", "UUID"),
|
||||
(path,),
|
||||
str(path),
|
||||
)
|
||||
|
||||
|
||||
def get_path_mount(path: str):
|
||||
return ps.run_line(
|
||||
("stat",),
|
||||
def get_path_mount(path: Path) -> Path:
|
||||
"""Return the mountpoint path of an arbitrary path.
|
||||
|
||||
:param path: arbitrary path
|
||||
:type path: Path
|
||||
:rtype: Path
|
||||
"""
|
||||
return Path(
|
||||
ps.run_line(
|
||||
"stat",
|
||||
("--format", "%m"),
|
||||
(path,),
|
||||
str(path),
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def get_path_uuid(path: str):
|
||||
def get_path_uuid(path: Path) -> str:
|
||||
"""Return the filesystem UUID of an arbitrary path.
|
||||
|
||||
:param path: arbitrary path
|
||||
:type path: Path
|
||||
:rtype: str
|
||||
"""
|
||||
return get_mount_uuid(get_path_mount(path))
|
||||
|
||||
|
||||
def make_directory(path: str):
|
||||
os.makedirs(path, exist_ok=True)
|
||||
def make_directory(path: Path) -> None:
|
||||
"""Make a directory (and its parents) from a path.
|
||||
|
||||
:param path: directory to create
|
||||
:type path: Path
|
||||
"""
|
||||
path.mkdir(exist_ok=True, parents=True)
|
||||
|
||||
|
||||
def read_file(file_path: str):
|
||||
with open(file_path, "br") as file_object:
|
||||
def read_file_bytes(file_path: Path) -> bytes:
|
||||
"""Read whole file bytes.
|
||||
|
||||
:param file_path: source input file
|
||||
:type file_path: Path
|
||||
:rtype: bytes
|
||||
"""
|
||||
with file_path.open("br") as file_object:
|
||||
return file_object.read()
|
||||
|
||||
|
||||
def read_file_lines(file_path: str, charset=CHARSET):
|
||||
return read_file_text(file_path).split(os.linesep)
|
||||
def read_file_dict(file_path: Path, charset: str = CHARSET) -> dict:
|
||||
"""Read whole file as toml object.
|
||||
|
||||
:param file_path: source input file
|
||||
:type file_path: Path
|
||||
:param charset: charset to use for decoding input
|
||||
:type charset: str
|
||||
:rtype: dict
|
||||
"""
|
||||
text = read_file_text(file_path, charset)
|
||||
return tomllib.loads(text)
|
||||
|
||||
|
||||
def read_file_text(file_path: str, charset=CHARSET):
|
||||
return read_file(file_path).decode(charset)
|
||||
def read_file_lines(file_path: Path, charset: str = CHARSET) -> list[str]:
|
||||
"""Read whole file lines.
|
||||
|
||||
:param file_path: source input file
|
||||
:type file_path: Path
|
||||
:param charset: charset to use for decoding input
|
||||
:type charset: str
|
||||
:rtype: list[str]
|
||||
"""
|
||||
return read_file_text(file_path, charset).split(os.linesep)
|
||||
|
||||
|
||||
def wipe(path: str):
|
||||
def read_file_text(file_path: Path, charset: str = CHARSET) -> str:
|
||||
"""Read whole file text.
|
||||
|
||||
:param file_path: source input file
|
||||
:type file_path: Path
|
||||
:param charset: charset to use for decoding input
|
||||
:type charset: str
|
||||
:rtype: str
|
||||
"""
|
||||
return read_file_bytes(file_path).decode(charset)
|
||||
|
||||
|
||||
def wipe(path: Path) -> None:
|
||||
"""Wipe provided path, whether directory or file.
|
||||
|
||||
:param path: target path
|
||||
:type path: Path
|
||||
"""
|
||||
try:
|
||||
shutil.rmtree(path)
|
||||
except NotADirectoryError:
|
||||
os.remove(path)
|
||||
path.unlink(missing_ok=True)
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
|
||||
def write(file_path: str, text: str, charset=CHARSET):
|
||||
with open(file_path, "bw") as file_object:
|
||||
file_object.write(text.encode(charset))
|
||||
def write(file_path: Path, text: str, charset: str = CHARSET) -> None:
|
||||
"""Write text into a file.
|
||||
|
||||
:param file_path: target file path
|
||||
:type file_path: Path
|
||||
:param text: content to write
|
||||
:type text: str
|
||||
:param charset: charset to use for encoding ouput
|
||||
:type charset: str
|
||||
"""
|
||||
with file_path.open(encoding=charset, mode="w") as file_object:
|
||||
file_object.write(text)
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import cmd
|
||||
"""Wrap GRUB commands."""
|
||||
|
||||
import ps
|
||||
from rwx import cmd, ps
|
||||
|
||||
cmd.need("grub-mkimage")
|
||||
|
||||
|
@ -24,8 +24,21 @@ def make_image(
|
|||
memdisk_path: str,
|
||||
pubkey_path: str | None = None,
|
||||
) -> None:
|
||||
args = [
|
||||
("grub-mkimage",),
|
||||
"""Make a binary bootable image.
|
||||
|
||||
:param image_format: output format (x86_64-efi, i386-pc, arm64-efi)
|
||||
:type image_format: str
|
||||
:param image_path: output file
|
||||
:type image_path: str
|
||||
:param modules: modules to embed
|
||||
:type modules: list[str]
|
||||
:param memdisk_path: archive to include
|
||||
:type memdisk_path: str
|
||||
:param pubkey_path: extra public key to add
|
||||
:type pubkey_path: str | None
|
||||
"""
|
||||
args: list[str | tuple[str, ...]] = [
|
||||
"grub-mkimage",
|
||||
("--compress", COMPRESSION),
|
||||
("--format", image_format),
|
||||
("--output", image_path),
|
||||
|
@ -34,6 +47,6 @@ def make_image(
|
|||
if pubkey_path:
|
||||
args.append(("--pubkey", pubkey_path))
|
||||
args.extend(modules)
|
||||
if modules := MODULES.get(image_format):
|
||||
args.extend(modules)
|
||||
if extra_modules := MODULES.get(image_format):
|
||||
args.extend(extra_modules)
|
||||
ps.run(*args)
|
||||
|
|
|
@ -1,31 +1,50 @@
|
|||
"""Handle logging."""
|
||||
|
||||
import logging
|
||||
import sys
|
||||
|
||||
|
||||
def get_file_logger(name: str) -> logging.Logger:
|
||||
formatter = logging.Formatter(
|
||||
"%(name)s: %(asctime)s | %(levelname)s | %(filename)s:%(lineno)s | %(process)d >>> %(message)s",
|
||||
)
|
||||
#
|
||||
"""Return a file logger.
|
||||
|
||||
:param name: arbitrary name
|
||||
:type name: str
|
||||
:rtype: logging.Logger
|
||||
"""
|
||||
# formatter
|
||||
items = [
|
||||
"%(name)s: %(asctime)s",
|
||||
"%(levelname)s",
|
||||
"%(filename)s:%(lineno)s",
|
||||
"%(process)d >>> %(message)s",
|
||||
]
|
||||
template = " | ".join(items)
|
||||
formatter = logging.Formatter(template)
|
||||
# handler
|
||||
out_handler = logging.StreamHandler(stream=sys.stdout)
|
||||
out_handler.setFormatter(formatter)
|
||||
out_handler.setLevel(logging.INFO)
|
||||
#
|
||||
# logger
|
||||
logger = logging.getLogger(name)
|
||||
logger.addHandler(out_handler)
|
||||
logger.setLevel(logging.INFO)
|
||||
#
|
||||
return logger
|
||||
|
||||
|
||||
def get_stream_logger(level: int) -> logging.Logger:
|
||||
"""Return a stream logger.
|
||||
|
||||
:param level: filtering level
|
||||
:type level: int
|
||||
:rtype: logging.Logger
|
||||
"""
|
||||
# handler
|
||||
out_handler = logging.StreamHandler(stream=sys.stdout)
|
||||
out_handler.setLevel(level)
|
||||
#
|
||||
# logger
|
||||
logger = logging.getLogger()
|
||||
logger.addHandler(out_handler)
|
||||
logger.setLevel(level)
|
||||
#
|
||||
return logger
|
||||
|
||||
|
||||
|
|
20
rwx/os/__init__.py
Normal file
20
rwx/os/__init__.py
Normal file
|
@ -0,0 +1,20 @@
|
|||
"""Control Operating Systems."""
|
||||
|
||||
from os import sep
|
||||
from pathlib import Path
|
||||
|
||||
from .abstract import OS
|
||||
from .debian import Debian
|
||||
|
||||
|
||||
def from_path(path: Path) -> OS:
|
||||
"""Initialize from an already existing path.
|
||||
|
||||
:param path: source root directory
|
||||
:type path: Path
|
||||
:rtype: OS
|
||||
"""
|
||||
return Debian(path)
|
||||
|
||||
|
||||
up = from_path(Path(sep))
|
26
rwx/os/abstract.py
Normal file
26
rwx/os/abstract.py
Normal file
|
@ -0,0 +1,26 @@
|
|||
"""Abstract Operating System."""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from pathlib import Path
|
||||
|
||||
from rwx import Class
|
||||
|
||||
|
||||
class OS(Class, ABC):
|
||||
"""Operating System."""
|
||||
|
||||
def __init__(self, path: Path) -> None:
|
||||
"""Set root.
|
||||
|
||||
:param path: root directory
|
||||
:type path: Path
|
||||
"""
|
||||
self.root = path
|
||||
self.name = self.get_name()
|
||||
|
||||
@abstractmethod
|
||||
def get_name(self) -> str:
|
||||
"""Return mandatory name.
|
||||
|
||||
:rtype: str
|
||||
"""
|
14
rwx/os/debian.py
Normal file
14
rwx/os/debian.py
Normal file
|
@ -0,0 +1,14 @@
|
|||
"""Debian operating system."""
|
||||
|
||||
from .abstract import OS
|
||||
|
||||
|
||||
class Debian(OS):
|
||||
"""Debian operating system."""
|
||||
|
||||
def get_name(self) -> str:
|
||||
"""Return name.
|
||||
|
||||
:rtype: str
|
||||
"""
|
||||
return "Debian"
|
29
rwx/os/pm/__init__.py
Normal file
29
rwx/os/pm/__init__.py
Normal file
|
@ -0,0 +1,29 @@
|
|||
"""Package Manager."""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from rwx import Class
|
||||
from rwx.ps import Command
|
||||
|
||||
|
||||
class PM(Class, ABC):
|
||||
"""Package Manager."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Set commands."""
|
||||
self.clean = self.get_clean_command()
|
||||
self.install = self.get_install_command()
|
||||
|
||||
@abstractmethod
|
||||
def get_clean_command(self) -> Command:
|
||||
"""Command to clean packages cache.
|
||||
|
||||
:rtype: Command
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def get_install_command(self) -> Command:
|
||||
"""Command to install package(s).
|
||||
|
||||
:rtype: Command
|
||||
"""
|
22
rwx/os/pm/apt.py
Normal file
22
rwx/os/pm/apt.py
Normal file
|
@ -0,0 +1,22 @@
|
|||
"""Advanced Package Tool."""
|
||||
|
||||
from rwx.os.pm import PM
|
||||
from rwx.ps import Command
|
||||
|
||||
|
||||
class APT(PM):
|
||||
"""Advanced Package Tool."""
|
||||
|
||||
def get_clean_command(self) -> Command:
|
||||
"""Return clean command.
|
||||
|
||||
:rtype: Command
|
||||
"""
|
||||
return Command()
|
||||
|
||||
def get_install_command(self) -> Command:
|
||||
"""Return install command.
|
||||
|
||||
:rtype: Command
|
||||
"""
|
||||
return Command()
|
|
@ -1,8 +1,20 @@
|
|||
from os import path
|
||||
"""Handle projects."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from rwx import Class
|
||||
|
||||
|
||||
class Project:
|
||||
def __init__(self, file_path: str) -> None:
|
||||
self.file: str = path.realpath(file_path)
|
||||
self.root: str = path.dirname(self.file)
|
||||
self.name: str = path.basename(self.root)
|
||||
class Project(Class):
|
||||
"""Parent class for any type of project."""
|
||||
|
||||
def __init__(self, file: Path) -> None:
|
||||
"""Set file, root & name.
|
||||
|
||||
:param file: root reference file
|
||||
:type file: Path
|
||||
"""
|
||||
self.raw = file
|
||||
self.file = self.raw.resolve()
|
||||
self.root: Path = self.file.parent
|
||||
self.name: str = self.root.name
|
||||
|
|
|
@ -1,17 +1,22 @@
|
|||
from os import path
|
||||
"""Project consisting only of a Sphinx documentation."""
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from sphinx.cmd.build import build_main
|
||||
|
||||
from rwx.fs import wipe
|
||||
from rwx.prj import Project
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class SphinxProject(Project):
|
||||
def __init__(self, file_path: str) -> None:
|
||||
super().__init__(file_path)
|
||||
"""Child class for a project based on Sphinx."""
|
||||
|
||||
def build(self):
|
||||
output_root: str = path.join(self.root, "out")
|
||||
def build(self) -> None:
|
||||
"""Build the project."""
|
||||
output_root: Path = self.root / "out"
|
||||
wipe(output_root)
|
||||
arguments: list[str] = [
|
||||
"-E",
|
||||
|
@ -22,13 +27,13 @@ class SphinxProject(Project):
|
|||
"-D",
|
||||
f"project={self.name}",
|
||||
"-D",
|
||||
"master_doc={}".format("index"),
|
||||
"master_doc=index",
|
||||
"-D",
|
||||
"html_theme={}".format("sphinx_rtd_theme"),
|
||||
"html_theme=sphinx_rtd_theme",
|
||||
"-c",
|
||||
self.root,
|
||||
str(self.root),
|
||||
# "-C",
|
||||
path.join(self.root, self.name),
|
||||
path.join(output_root, "web"),
|
||||
str(self.root / self.name),
|
||||
str(output_root / "web"),
|
||||
]
|
||||
build_main(arguments)
|
||||
|
|
|
@ -1,32 +1,78 @@
|
|||
"""Handle processes."""
|
||||
|
||||
import subprocess
|
||||
|
||||
from rwx import txt
|
||||
from rwx import Class, txt
|
||||
|
||||
|
||||
def get_tuples_args(tuples) -> list[str]:
|
||||
class Command(Class):
|
||||
"""Command to run."""
|
||||
|
||||
def __init__(self, *arguments: str | tuple[str, ...]) -> None:
|
||||
"""Set raw & flat arguments.
|
||||
|
||||
:param *arguments: single argument or grouped ones
|
||||
:type *arguments: str | tuple[str, ...]
|
||||
"""
|
||||
self.raw = arguments
|
||||
self.flat: list[str] = []
|
||||
|
||||
|
||||
def get_tuples_args(*items: str | tuple[str, ...]) -> list[str]:
|
||||
"""Turn arguments tuples into an arguments list.
|
||||
|
||||
:param *items: single item or grouped ones
|
||||
:type *items: str | tuple[str, ...]
|
||||
:rtype: list[str]
|
||||
"""
|
||||
args: list[str] = []
|
||||
for item in tuples:
|
||||
if type(item) is tuple:
|
||||
args.extend(item)
|
||||
else:
|
||||
for item in items:
|
||||
match item:
|
||||
case str():
|
||||
args.append(item)
|
||||
case tuple():
|
||||
args.extend(item)
|
||||
return args
|
||||
|
||||
|
||||
def run(*tuples) -> subprocess.CompletedProcess:
|
||||
def run(*items: str | tuple[str, ...]) -> subprocess.CompletedProcess:
|
||||
"""Run from a list of arguments tuples.
|
||||
|
||||
:param *items: single item or grouped ones
|
||||
:type *items: str | tuple[str, ...]
|
||||
:rtype: subprocess.CompletedProcess
|
||||
"""
|
||||
return subprocess.run(
|
||||
get_tuples_args(tuples), capture_output=False, check=True
|
||||
get_tuples_args(*items), capture_output=False, check=True
|
||||
)
|
||||
|
||||
|
||||
def run_line(*tuples, charset: str = txt.CHARSET) -> str:
|
||||
lines = run_lines(*get_tuples_args(tuples), charset=charset)
|
||||
return lines[0]
|
||||
def run_line(*items: str | tuple[str, ...], charset: str = txt.CHARSET) -> str:
|
||||
"""Run and return output line.
|
||||
|
||||
:param *items: single item or grouped ones
|
||||
:type *items: str | tuple[str, ...]
|
||||
:param charset: charset to use for decoding binary output
|
||||
:type charset: str
|
||||
:rtype: str
|
||||
"""
|
||||
line, *_ = run_lines(*items, charset=charset)
|
||||
return line
|
||||
|
||||
|
||||
def run_lines(*tuples, charset: str = txt.CHARSET) -> list[str]:
|
||||
def run_lines(
|
||||
*items: str | tuple[str, ...], charset: str = txt.CHARSET
|
||||
) -> list[str]:
|
||||
"""Run and return output lines.
|
||||
|
||||
:param *items: single item or grouped ones
|
||||
:type *items: str | tuple[str, ...]
|
||||
:param charset: charset to use for decoding binary output
|
||||
:type charset: str
|
||||
:rtype: list[str]
|
||||
"""
|
||||
process = subprocess.run(
|
||||
get_tuples_args(tuples), capture_output=True, check=True
|
||||
get_tuples_args(*items), capture_output=True, check=True
|
||||
)
|
||||
string = process.stdout.decode(charset)
|
||||
return string.rstrip().splitlines()
|
||||
|
|
0
rwx/py.typed
Normal file
0
rwx/py.typed
Normal file
1
rwx/sw/freetube/__init__.py
Normal file
1
rwx/sw/freetube/__init__.py
Normal file
|
@ -0,0 +1 @@
|
|||
"""Configure FreeTube."""
|
29
rwx/sw/freetube/channels.py
Normal file
29
rwx/sw/freetube/channels.py
Normal file
|
@ -0,0 +1,29 @@
|
|||
"""FreeTube channels."""
|
||||
|
||||
from rwx import Class
|
||||
|
||||
|
||||
class Channel(Class):
|
||||
"""FreeTube channel."""
|
||||
|
||||
def __init__(self, uid: str, name: str) -> None:
|
||||
"""Set uid & name.
|
||||
|
||||
:param uid: unique identifier
|
||||
:type uid: str
|
||||
:param name: label
|
||||
:type name: str
|
||||
"""
|
||||
self.uid = uid
|
||||
self.name = name
|
||||
|
||||
def to_db(self) -> str:
|
||||
"""Return identifier as db.
|
||||
|
||||
:rtype: str
|
||||
"""
|
||||
return f"""\
|
||||
{{\
|
||||
"id":"{self.uid}"\
|
||||
}}\
|
||||
"""
|
21
rwx/sw/freetube/db.py
Normal file
21
rwx/sw/freetube/db.py
Normal file
|
@ -0,0 +1,21 @@
|
|||
"""Output FreeTube db."""
|
||||
|
||||
|
||||
def to_db(value: object) -> str:
|
||||
"""Render value as string.
|
||||
|
||||
:param value: value to render
|
||||
:type value: object
|
||||
:rtype: str
|
||||
"""
|
||||
match value:
|
||||
case bool():
|
||||
text = str(value).lower()
|
||||
case dict():
|
||||
sub = ",".join([f'"{i}":{to_db(v)}' for i, v in value.items()])
|
||||
text = f"{{{sub}}}"
|
||||
case float() | str():
|
||||
text = f'"{value}"'
|
||||
case _:
|
||||
text = str(value)
|
||||
return text
|
47
rwx/sw/freetube/playlists.py
Normal file
47
rwx/sw/freetube/playlists.py
Normal file
|
@ -0,0 +1,47 @@
|
|||
"""FreeTube playlists."""
|
||||
|
||||
from rwx import Class
|
||||
|
||||
from .videos import Video
|
||||
|
||||
|
||||
class Playlist(Class):
|
||||
"""FreeTube playlist."""
|
||||
|
||||
def __init__(self, uid: str, name: str) -> None:
|
||||
"""Set uid & name.
|
||||
|
||||
:param uid: identifier
|
||||
:type uid: str
|
||||
:param name: label
|
||||
:type name: str
|
||||
"""
|
||||
self.uid = uid
|
||||
self.name = name
|
||||
self.videos: list[Video] = []
|
||||
|
||||
def add(self, video: Video) -> None:
|
||||
"""Add video.
|
||||
|
||||
:param video: video to add
|
||||
:type video: Video
|
||||
"""
|
||||
self.videos.append(video)
|
||||
|
||||
def to_db(self) -> str:
|
||||
"""Return identifier, name & videos.
|
||||
|
||||
:rtype: str
|
||||
"""
|
||||
videos = ",".join([video.to_db() for video in self.videos])
|
||||
return f"""\
|
||||
{{\
|
||||
"_id":"{self.uid}"\
|
||||
,\
|
||||
"playlistName":"{self.name}"\
|
||||
,\
|
||||
"protected":true\
|
||||
,\
|
||||
"videos":[{videos}]\
|
||||
}}\
|
||||
"""
|
45
rwx/sw/freetube/profiles.py
Normal file
45
rwx/sw/freetube/profiles.py
Normal file
|
@ -0,0 +1,45 @@
|
|||
"""FreeTube profiles."""
|
||||
|
||||
from rwx import Class
|
||||
|
||||
from .channels import Channel
|
||||
|
||||
|
||||
class Profile(Class):
|
||||
"""FreeTube profile."""
|
||||
|
||||
def __init__(self, uid: str, name: str) -> None:
|
||||
"""Set uid & name.
|
||||
|
||||
:param uid: unique identifier
|
||||
:type uid: str
|
||||
:param name: label
|
||||
:type name: str
|
||||
"""
|
||||
self.id = uid
|
||||
self.name = name
|
||||
self.channels: list[Channel] = []
|
||||
|
||||
def add(self, channel: Channel) -> None:
|
||||
"""Add channel.
|
||||
|
||||
:param channel: channel to add
|
||||
:type channel: Channel
|
||||
"""
|
||||
self.channels.append(channel)
|
||||
|
||||
def to_db(self) -> str:
|
||||
"""Return identifier, name & channels.
|
||||
|
||||
:rtype: str
|
||||
"""
|
||||
channels = ",".join([channel.to_db() for channel in self.channels])
|
||||
return f"""\
|
||||
{{\
|
||||
"_id":"{self.id}"\
|
||||
,\
|
||||
"name":"{self.name}"\
|
||||
,\
|
||||
"subscriptions":[{channels}]\
|
||||
}}\
|
||||
"""
|
33
rwx/sw/freetube/settings.py
Normal file
33
rwx/sw/freetube/settings.py
Normal file
|
@ -0,0 +1,33 @@
|
|||
"""FreeTube settings."""
|
||||
|
||||
from rwx import Class
|
||||
|
||||
from .db import to_db
|
||||
|
||||
|
||||
class Setting(Class):
|
||||
"""FreeTube setting."""
|
||||
|
||||
def __init__(self, uid: str, value: object) -> None:
|
||||
"""Set uid & value.
|
||||
|
||||
:param uid: unique identifier
|
||||
:type uid: str
|
||||
:param value: value
|
||||
:type value: object
|
||||
"""
|
||||
self.uid = uid
|
||||
self.value = value
|
||||
|
||||
def to_db(self) -> str:
|
||||
"""Return uid & value as db string.
|
||||
|
||||
:rtype: str
|
||||
"""
|
||||
return f"""\
|
||||
{{\
|
||||
"_id":"{self.uid}"\
|
||||
,\
|
||||
"value":{to_db(self.value)}\
|
||||
}}\
|
||||
"""
|
33
rwx/sw/freetube/videos.py
Normal file
33
rwx/sw/freetube/videos.py
Normal file
|
@ -0,0 +1,33 @@
|
|||
"""FreeTube videos."""
|
||||
|
||||
from rwx import Class
|
||||
|
||||
|
||||
class Video(Class):
|
||||
"""FreeTube video."""
|
||||
|
||||
def __init__(self, uid: str, name: str) -> None:
|
||||
"""Set id & name.
|
||||
|
||||
:param uid: identifier
|
||||
:type uid: str
|
||||
:param name: label
|
||||
:type name: str
|
||||
"""
|
||||
self.uid = uid
|
||||
self.name = name
|
||||
|
||||
def to_db(self) -> str:
|
||||
"""Return identifier, zero length & title.
|
||||
|
||||
:rtype: str
|
||||
"""
|
||||
return f"""\
|
||||
{{\
|
||||
"videoId":"{self.uid}"\
|
||||
,\
|
||||
"lengthSeconds":0\
|
||||
,\
|
||||
"title":"{self.name}"\
|
||||
}}\
|
||||
"""
|
|
@ -1 +1,3 @@
|
|||
"""Handle text."""
|
||||
|
||||
CHARSET = "UTF-8"
|
||||
|
|
Loading…
Reference in a new issue