refactor(history): commit development branch
All checks were successful
/ job (push) Successful in 1m12s
All checks were successful
/ job (push) Successful in 1m12s
new development branch from root commit
This commit is contained in:
parent
3e562930f6
commit
020aaa0b9a
94 changed files with 4804 additions and 0 deletions
33
rwx/__init__.py
Normal file
33
rwx/__init__.py
Normal file
|
@ -0,0 +1,33 @@
|
|||
"""Read Write eXecute."""
|
||||
|
||||
__version__ = "0.0.1"
|
||||
|
||||
from os import linesep
|
||||
|
||||
|
||||
class Object:
|
||||
"""Root object."""
|
||||
|
||||
def __repr__(self) -> str:
|
||||
"""Return machine-readable state.
|
||||
|
||||
:return: state
|
||||
:rtype: str
|
||||
"""
|
||||
name = self.__class__.__name__
|
||||
attributes = [
|
||||
f"{k}={v!r}" for k, v in vars(self).items() if not k.startswith("_")
|
||||
]
|
||||
arguments = ", ".join(attributes)
|
||||
return f"{name}({arguments})"
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""Return human-readable state.
|
||||
|
||||
:return: state
|
||||
:rtype: str
|
||||
"""
|
||||
attributes = [
|
||||
f"{k} = {v}" for k, v in vars(self).items() if not k.startswith("_")
|
||||
]
|
||||
return linesep.join(attributes)
|
19
rwx/__main__.py
Executable file
19
rwx/__main__.py
Executable file
|
@ -0,0 +1,19 @@
|
|||
#! /usr/bin/env python3
|
||||
|
||||
"""Entry point."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from rwx import fs
|
||||
|
||||
if __name__ == "__main__":
|
||||
file_path: Path = Path(__file__).resolve()
|
||||
root_path: Path = file_path.parent
|
||||
directory_path: Path = root_path / "tmp"
|
||||
file_path = directory_path / "file"
|
||||
|
||||
fs.wipe(directory_path)
|
||||
fs.make_directory(directory_path)
|
||||
fs.write(file_path, "Martine écrit beaucoup.")
|
||||
fs.empty_file(file_path)
|
||||
fs.write(file_path, "Martine écrit moins.")
|
13
rwx/arg/__init__.py
Normal file
13
rwx/arg/__init__.py
Normal file
|
@ -0,0 +1,13 @@
|
|||
"""Handle system arguments."""
|
||||
|
||||
import sys
|
||||
|
||||
|
||||
def split() -> tuple[str, list[str]]:
|
||||
"""Split command & actual arguments.
|
||||
|
||||
:return: both
|
||||
:rtype: tuple[str, list[str]]
|
||||
"""
|
||||
command, *arguments = sys.argv
|
||||
return command, arguments
|
22
rwx/cmd/__init__.py
Normal file
22
rwx/cmd/__init__.py
Normal file
|
@ -0,0 +1,22 @@
|
|||
"""Handle system commands & packages."""
|
||||
|
||||
commands: list[str] = []
|
||||
packages: list[str] = []
|
||||
|
||||
|
||||
def need(command: str) -> None:
|
||||
"""Assert package dependency for a command.
|
||||
|
||||
:param command: name of the requested command
|
||||
:type command: str
|
||||
"""
|
||||
package: str | None
|
||||
match command:
|
||||
case "debootstrap":
|
||||
package = "debootstrap"
|
||||
case "mksquashfs" | "unsquashfs":
|
||||
package = "squashfs-tools"
|
||||
case _:
|
||||
package = None
|
||||
if package and package not in packages:
|
||||
packages.append(package)
|
26
rwx/cmd/squashfs/__init__.py
Normal file
26
rwx/cmd/squashfs/__init__.py
Normal file
|
@ -0,0 +1,26 @@
|
|||
"""Wrap SquashFS commands."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from rwx import cmd, ps
|
||||
|
||||
cmd.need("mksquashfs")
|
||||
|
||||
|
||||
def mksquashfs(input_root: Path, output_file: Path) -> None:
|
||||
"""Make a SquashFS bootable image file.
|
||||
|
||||
:param input_root: ?
|
||||
:type input_root: Path
|
||||
:param output_file: ?
|
||||
:type output_file: Path
|
||||
"""
|
||||
ps.run(
|
||||
"mksquashfs",
|
||||
str(input_root),
|
||||
str(output_file),
|
||||
"-comp",
|
||||
"zstd",
|
||||
"-Xcompression-level",
|
||||
str(18),
|
||||
)
|
31
rwx/deb/__init__.py
Normal file
31
rwx/deb/__init__.py
Normal file
|
@ -0,0 +1,31 @@
|
|||
"""Wrap Debian commands."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from rwx import cmd, ps
|
||||
|
||||
cmd.need("debootstrap")
|
||||
|
||||
BOOTSTRAP_ARCHITECTURE = "amd64"
|
||||
BOOTSTRAP_VARIANT = "minbase"
|
||||
|
||||
|
||||
def bootstrap(root_path: Path, suite: str, mirror_location: str) -> None:
|
||||
"""Boostrap a base operating filesystem.
|
||||
|
||||
:param root_path: target output path
|
||||
:type root_path: Path
|
||||
:param suite: target distribution name
|
||||
:type suite: str
|
||||
:param mirror_location: source input repository
|
||||
:type mirror_location: str
|
||||
"""
|
||||
command = (
|
||||
"debootstrap",
|
||||
("--arch", BOOTSTRAP_ARCHITECTURE),
|
||||
("--variant", BOOTSTRAP_VARIANT),
|
||||
suite,
|
||||
str(root_path),
|
||||
mirror_location,
|
||||
)
|
||||
ps.run(*command)
|
7
rwx/err/__init__.py
Normal file
7
rwx/err/__init__.py
Normal file
|
@ -0,0 +1,7 @@
|
|||
"""Handle errors."""
|
||||
|
||||
from rwx import Object
|
||||
|
||||
|
||||
class Error(Object, Exception):
|
||||
"""Parent class for all errors."""
|
159
rwx/fs/__init__.py
Normal file
159
rwx/fs/__init__.py
Normal file
|
@ -0,0 +1,159 @@
|
|||
"""Operations involving FileSystems."""
|
||||
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
import tomllib
|
||||
|
||||
from rwx import ps
|
||||
|
||||
CHARSET = "UTF-8"
|
||||
|
||||
|
||||
def create_image(file_path: Path, size_bytes: int) -> None:
|
||||
"""Create a virtual device image file.
|
||||
|
||||
:param file_path: target image file
|
||||
:type file_path: Path
|
||||
:param size_bytes: virtual volume
|
||||
:type size_bytes: int
|
||||
"""
|
||||
ps.run(
|
||||
("qemu-img", "create"),
|
||||
("-f", "qcow2"),
|
||||
(str(file_path), str(size_bytes)),
|
||||
)
|
||||
|
||||
|
||||
def empty_file(path: Path) -> None:
|
||||
"""Empty the file at provided path.
|
||||
|
||||
:param path: target file to empty
|
||||
:type path: Path
|
||||
"""
|
||||
write(path, "")
|
||||
|
||||
|
||||
def get_mount_uuid(path: Path) -> str:
|
||||
"""Return the filesystem UUID of a mountpoint path.
|
||||
|
||||
:param path: mountpoint path
|
||||
:type path: Path
|
||||
:rtype: str
|
||||
"""
|
||||
return ps.run_line(
|
||||
"findmnt",
|
||||
"--noheadings",
|
||||
("--output", "UUID"),
|
||||
str(path),
|
||||
)
|
||||
|
||||
|
||||
def get_path_mount(path: Path) -> Path:
|
||||
"""Return the mountpoint path of an arbitrary path.
|
||||
|
||||
:param path: arbitrary path
|
||||
:type path: Path
|
||||
:rtype: Path
|
||||
"""
|
||||
return Path(
|
||||
ps.run_line(
|
||||
"stat",
|
||||
("--format", "%m"),
|
||||
str(path),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def get_path_uuid(path: Path) -> str:
|
||||
"""Return the filesystem UUID of an arbitrary path.
|
||||
|
||||
:param path: arbitrary path
|
||||
:type path: Path
|
||||
:rtype: str
|
||||
"""
|
||||
return get_mount_uuid(get_path_mount(path))
|
||||
|
||||
|
||||
def make_directory(path: Path) -> None:
|
||||
"""Make a directory (and its parents) from a path.
|
||||
|
||||
:param path: directory to create
|
||||
:type path: Path
|
||||
"""
|
||||
path.mkdir(exist_ok=True, parents=True)
|
||||
|
||||
|
||||
def read_file_bytes(file_path: Path) -> bytes:
|
||||
"""Read whole file bytes.
|
||||
|
||||
:param file_path: source input file
|
||||
:type file_path: Path
|
||||
:rtype: bytes
|
||||
"""
|
||||
with file_path.open("br") as file_object:
|
||||
return file_object.read()
|
||||
|
||||
|
||||
def read_file_dict(file_path: Path, charset: str = CHARSET) -> dict:
|
||||
"""Read whole file as toml object.
|
||||
|
||||
:param file_path: source input file
|
||||
:type file_path: Path
|
||||
:param charset: charset to use for decoding input
|
||||
:type charset: str
|
||||
:rtype: dict
|
||||
"""
|
||||
text = read_file_text(file_path, charset)
|
||||
return tomllib.loads(text)
|
||||
|
||||
|
||||
def read_file_lines(file_path: Path, charset: str = CHARSET) -> list[str]:
|
||||
"""Read whole file lines.
|
||||
|
||||
:param file_path: source input file
|
||||
:type file_path: Path
|
||||
:param charset: charset to use for decoding input
|
||||
:type charset: str
|
||||
:rtype: list[str]
|
||||
"""
|
||||
return read_file_text(file_path, charset).split(os.linesep)
|
||||
|
||||
|
||||
def read_file_text(file_path: Path, charset: str = CHARSET) -> str:
|
||||
"""Read whole file text.
|
||||
|
||||
:param file_path: source input file
|
||||
:type file_path: Path
|
||||
:param charset: charset to use for decoding input
|
||||
:type charset: str
|
||||
:rtype: str
|
||||
"""
|
||||
return read_file_bytes(file_path).decode(charset)
|
||||
|
||||
|
||||
def wipe(path: Path) -> None:
|
||||
"""Wipe provided path, whether directory or file.
|
||||
|
||||
:param path: target path
|
||||
:type path: Path
|
||||
"""
|
||||
try:
|
||||
path.unlink(missing_ok=True)
|
||||
except IsADirectoryError:
|
||||
shutil.rmtree(path)
|
||||
|
||||
|
||||
def write(file_path: Path, text: str, charset: str = CHARSET) -> None:
|
||||
"""Write text into a file.
|
||||
|
||||
:param file_path: target file path
|
||||
:type file_path: Path
|
||||
:param text: content to write
|
||||
:type text: str
|
||||
:param charset: charset to use for encoding ouput
|
||||
:type charset: str
|
||||
"""
|
||||
with file_path.open(encoding=charset, mode="w") as file_object:
|
||||
file_object.write(text)
|
54
rwx/grub/__init__.py
Normal file
54
rwx/grub/__init__.py
Normal file
|
@ -0,0 +1,54 @@
|
|||
"""Wrap GRUB commands."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from rwx import cmd, ps
|
||||
|
||||
cmd.need("grub-mkimage")
|
||||
|
||||
COMPRESSION = "xz"
|
||||
ENV_BYTES = 1024
|
||||
ENV_COMMENT = "#"
|
||||
ENV_HEADER = f"""{ENV_COMMENT} GRUB Environment Block
|
||||
"""
|
||||
MODULES = {
|
||||
"i386-pc": [
|
||||
("biosdisk",),
|
||||
("ntldr",),
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
def make_image(
|
||||
image_format: str,
|
||||
image_path: str,
|
||||
modules: list[str],
|
||||
memdisk_path: str,
|
||||
pubkey_path: str | None = None,
|
||||
) -> None:
|
||||
"""Make a binary bootable image.
|
||||
|
||||
:param image_format: output format (x86_64-efi, i386-pc, arm64-efi)
|
||||
:type image_format: str
|
||||
:param image_path: output file
|
||||
:type image_path: str
|
||||
:param modules: modules to embed
|
||||
:type modules: list[str]
|
||||
:param memdisk_path: archive to include
|
||||
:type memdisk_path: str
|
||||
:param pubkey_path: extra public key to add
|
||||
:type pubkey_path: str | None
|
||||
"""
|
||||
args: list[str | tuple[str, ...]] = [
|
||||
"grub-mkimage",
|
||||
("--compress", COMPRESSION),
|
||||
("--format", image_format),
|
||||
("--output", image_path),
|
||||
("--memdisk", memdisk_path),
|
||||
]
|
||||
if pubkey_path:
|
||||
args.append(("--pubkey", pubkey_path))
|
||||
args.extend(modules)
|
||||
if extra_modules := MODULES.get(image_format):
|
||||
args.extend(extra_modules)
|
||||
ps.run(*args)
|
51
rwx/log/__init__.py
Normal file
51
rwx/log/__init__.py
Normal file
|
@ -0,0 +1,51 @@
|
|||
"""Handle logging."""
|
||||
|
||||
import logging
|
||||
import sys
|
||||
|
||||
|
||||
def get_file_logger(name: str) -> logging.Logger:
|
||||
"""Return a file logger.
|
||||
|
||||
:param name: arbitrary name
|
||||
:type name: str
|
||||
:rtype: logging.Logger
|
||||
"""
|
||||
# formatter
|
||||
items = [
|
||||
"%(name)s: %(asctime)s",
|
||||
"%(levelname)s",
|
||||
"%(filename)s:%(lineno)s",
|
||||
"%(process)d >>> %(message)s",
|
||||
]
|
||||
template = " | ".join(items)
|
||||
formatter = logging.Formatter(template)
|
||||
# handler
|
||||
out_handler = logging.StreamHandler(stream=sys.stdout)
|
||||
out_handler.setFormatter(formatter)
|
||||
out_handler.setLevel(logging.INFO)
|
||||
# logger
|
||||
logger = logging.getLogger(name)
|
||||
logger.addHandler(out_handler)
|
||||
logger.setLevel(logging.INFO)
|
||||
return logger
|
||||
|
||||
|
||||
def get_stream_logger(level: int) -> logging.Logger:
|
||||
"""Return a stream logger.
|
||||
|
||||
:param level: filtering level
|
||||
:type level: int
|
||||
:rtype: logging.Logger
|
||||
"""
|
||||
# handler
|
||||
out_handler = logging.StreamHandler(stream=sys.stdout)
|
||||
out_handler.setLevel(level)
|
||||
# logger
|
||||
logger = logging.getLogger()
|
||||
logger.addHandler(out_handler)
|
||||
logger.setLevel(level)
|
||||
return logger
|
||||
|
||||
|
||||
stream = get_stream_logger(logging.INFO)
|
20
rwx/os/__init__.py
Normal file
20
rwx/os/__init__.py
Normal file
|
@ -0,0 +1,20 @@
|
|||
"""Control Operating Systems."""
|
||||
|
||||
from os import sep
|
||||
from pathlib import Path
|
||||
|
||||
from .abstract import OS
|
||||
from .debian import Debian
|
||||
|
||||
|
||||
def from_path(path: Path) -> OS:
|
||||
"""Initialize from an already existing path.
|
||||
|
||||
:param path: source root directory
|
||||
:type path: Path
|
||||
:rtype: OS
|
||||
"""
|
||||
return Debian(path)
|
||||
|
||||
|
||||
up = from_path(Path(sep))
|
26
rwx/os/abstract.py
Normal file
26
rwx/os/abstract.py
Normal file
|
@ -0,0 +1,26 @@
|
|||
"""Abstract Operating System."""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from pathlib import Path
|
||||
|
||||
from rwx import Object
|
||||
|
||||
|
||||
class OS(Object, ABC):
|
||||
"""Operating System."""
|
||||
|
||||
def __init__(self, path: Path) -> None:
|
||||
"""Set root.
|
||||
|
||||
:param path: root directory
|
||||
:type path: Path
|
||||
"""
|
||||
self.root = path
|
||||
self.name = self.get_name()
|
||||
|
||||
@abstractmethod
|
||||
def get_name(self) -> str:
|
||||
"""Return mandatory name.
|
||||
|
||||
:rtype: str
|
||||
"""
|
14
rwx/os/debian.py
Normal file
14
rwx/os/debian.py
Normal file
|
@ -0,0 +1,14 @@
|
|||
"""Debian operating system."""
|
||||
|
||||
from .abstract import OS
|
||||
|
||||
|
||||
class Debian(OS):
|
||||
"""Debian operating system."""
|
||||
|
||||
def get_name(self) -> str:
|
||||
"""Return name.
|
||||
|
||||
:rtype: str
|
||||
"""
|
||||
return "Debian"
|
29
rwx/os/pm/__init__.py
Normal file
29
rwx/os/pm/__init__.py
Normal file
|
@ -0,0 +1,29 @@
|
|||
"""Package Manager."""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from rwx import Object
|
||||
from rwx.ps import Command
|
||||
|
||||
|
||||
class PM(Object, ABC):
|
||||
"""Package Manager."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Set commands."""
|
||||
self.clean = self.get_clean_command()
|
||||
self.install = self.get_install_command()
|
||||
|
||||
@abstractmethod
|
||||
def get_clean_command(self) -> Command:
|
||||
"""Command to clean packages cache.
|
||||
|
||||
:rtype: Command
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def get_install_command(self) -> Command:
|
||||
"""Command to install package(s).
|
||||
|
||||
:rtype: Command
|
||||
"""
|
22
rwx/os/pm/apt.py
Normal file
22
rwx/os/pm/apt.py
Normal file
|
@ -0,0 +1,22 @@
|
|||
"""Advanced Package Tool."""
|
||||
|
||||
from rwx.os.pm import PM
|
||||
from rwx.ps import Command
|
||||
|
||||
|
||||
class APT(PM):
|
||||
"""Advanced Package Tool."""
|
||||
|
||||
def get_clean_command(self) -> Command:
|
||||
"""Return clean command.
|
||||
|
||||
:rtype: Command
|
||||
"""
|
||||
return Command()
|
||||
|
||||
def get_install_command(self) -> Command:
|
||||
"""Return install command.
|
||||
|
||||
:rtype: Command
|
||||
"""
|
||||
return Command()
|
20
rwx/prj/__init__.py
Normal file
20
rwx/prj/__init__.py
Normal file
|
@ -0,0 +1,20 @@
|
|||
"""Handle projects."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from rwx import Object
|
||||
|
||||
|
||||
class Project(Object):
|
||||
"""Parent class for any type of project."""
|
||||
|
||||
def __init__(self, file: Path) -> None:
|
||||
"""Set file, root & name.
|
||||
|
||||
:param file: root reference file
|
||||
:type file: Path
|
||||
"""
|
||||
self.raw = file
|
||||
self.file = self.raw.resolve()
|
||||
self.root: Path = self.file.parent
|
||||
self.name: str = self.root.name
|
39
rwx/prj/sphinx.py
Normal file
39
rwx/prj/sphinx.py
Normal file
|
@ -0,0 +1,39 @@
|
|||
"""Project consisting only of a Sphinx documentation."""
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from sphinx.cmd.build import build_main
|
||||
|
||||
from rwx.fs import wipe
|
||||
from rwx.prj import Project
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class SphinxProject(Project):
|
||||
"""Child class for a project based on Sphinx."""
|
||||
|
||||
def build(self) -> None:
|
||||
"""Build the project."""
|
||||
output_root: Path = self.root / "out"
|
||||
wipe(output_root)
|
||||
arguments: list[str] = [
|
||||
"-E",
|
||||
"-j",
|
||||
"2",
|
||||
"-b",
|
||||
"html",
|
||||
"-D",
|
||||
f"project={self.name}",
|
||||
"-D",
|
||||
"master_doc=index",
|
||||
"-D",
|
||||
"html_theme=sphinx_rtd_theme",
|
||||
"-c",
|
||||
str(self.root),
|
||||
# "-C",
|
||||
str(self.root / self.name),
|
||||
str(output_root / "web"),
|
||||
]
|
||||
build_main(arguments)
|
85
rwx/ps/__init__.py
Normal file
85
rwx/ps/__init__.py
Normal file
|
@ -0,0 +1,85 @@
|
|||
"""Handle processes."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import subprocess
|
||||
|
||||
from rwx import Object, txt
|
||||
|
||||
|
||||
class Command(Object):
|
||||
"""Command to run."""
|
||||
|
||||
def __init__(self, *arguments: str | tuple[str, ...]) -> None:
|
||||
"""Set raw & flat arguments.
|
||||
|
||||
:param *arguments: single argument or grouped ones
|
||||
:type *arguments: str | tuple[str, ...]
|
||||
"""
|
||||
self.raw = arguments
|
||||
self.flat: list[str] = []
|
||||
|
||||
|
||||
def get_tuples_args(*items: str | tuple[str, ...]) -> list[str]:
|
||||
"""Turn arguments tuples into an arguments list.
|
||||
|
||||
:param *items: single item or grouped ones
|
||||
:type *items: str | tuple[str, ...]
|
||||
:rtype: list[str]
|
||||
"""
|
||||
args: list[str] = []
|
||||
for item in items:
|
||||
match item:
|
||||
case str():
|
||||
args.append(item)
|
||||
case tuple():
|
||||
args.extend(item)
|
||||
return args
|
||||
|
||||
|
||||
def run(*items: str | tuple[str, ...]) -> subprocess.CompletedProcess:
|
||||
"""Run from a list of arguments tuples.
|
||||
|
||||
:param *items: single item or grouped ones
|
||||
:type *items: str | tuple[str, ...]
|
||||
:rtype: subprocess.CompletedProcess
|
||||
"""
|
||||
return subprocess.run(
|
||||
get_tuples_args(*items),
|
||||
capture_output=False,
|
||||
check=True,
|
||||
)
|
||||
|
||||
|
||||
def run_line(*items: str | tuple[str, ...], charset: str = txt.CHARSET) -> str:
|
||||
"""Run and return output line.
|
||||
|
||||
:param *items: single item or grouped ones
|
||||
:type *items: str | tuple[str, ...]
|
||||
:param charset: charset to use for decoding binary output
|
||||
:type charset: str
|
||||
:rtype: str
|
||||
"""
|
||||
line, *_ = run_lines(*items, charset=charset)
|
||||
return line
|
||||
|
||||
|
||||
def run_lines(
|
||||
*items: str | tuple[str, ...],
|
||||
charset: str = txt.CHARSET,
|
||||
) -> list[str]:
|
||||
"""Run and return output lines.
|
||||
|
||||
:param *items: single item or grouped ones
|
||||
:type *items: str | tuple[str, ...]
|
||||
:param charset: charset to use for decoding binary output
|
||||
:type charset: str
|
||||
:rtype: list[str]
|
||||
"""
|
||||
process = subprocess.run(
|
||||
get_tuples_args(*items),
|
||||
capture_output=True,
|
||||
check=True,
|
||||
)
|
||||
string = process.stdout.decode(charset)
|
||||
return string.rstrip().splitlines()
|
0
rwx/py.typed
Normal file
0
rwx/py.typed
Normal file
1
rwx/sw/freetube/__init__.py
Normal file
1
rwx/sw/freetube/__init__.py
Normal file
|
@ -0,0 +1 @@
|
|||
"""Configure FreeTube."""
|
31
rwx/sw/freetube/authors.py
Normal file
31
rwx/sw/freetube/authors.py
Normal file
|
@ -0,0 +1,31 @@
|
|||
"""FreeTube authors."""
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from rwx import Object
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .playlists import Playlist
|
||||
|
||||
|
||||
class Author(Object):
|
||||
"""FreeTube author."""
|
||||
|
||||
def __init__(self, uid: str, name: str) -> None:
|
||||
"""Set uid & name.
|
||||
|
||||
:param uid: identifier
|
||||
:type uid: str
|
||||
:param name: label
|
||||
:type name: str
|
||||
"""
|
||||
self.uid = uid
|
||||
self.name = name
|
||||
self.playlist: Playlist
|
||||
|
||||
def to_db(self) -> str:
|
||||
"""Return non-breakable name.
|
||||
|
||||
:rtype: str
|
||||
"""
|
||||
return self.name.replace(" ", chr(0xA0))
|
29
rwx/sw/freetube/channels.py
Normal file
29
rwx/sw/freetube/channels.py
Normal file
|
@ -0,0 +1,29 @@
|
|||
"""FreeTube channels."""
|
||||
|
||||
from rwx import Object
|
||||
|
||||
|
||||
class Channel(Object):
|
||||
"""FreeTube channel."""
|
||||
|
||||
def __init__(self, uid: str, name: str) -> None:
|
||||
"""Set uid & name.
|
||||
|
||||
:param uid: unique identifier
|
||||
:type uid: str
|
||||
:param name: label
|
||||
:type name: str
|
||||
"""
|
||||
self.uid = uid
|
||||
self.name = name
|
||||
|
||||
def to_db(self) -> str:
|
||||
"""Return identifier as db.
|
||||
|
||||
:rtype: str
|
||||
"""
|
||||
return f"""\
|
||||
{{\
|
||||
"id":"{self.uid}"\
|
||||
}}\
|
||||
"""
|
21
rwx/sw/freetube/db.py
Normal file
21
rwx/sw/freetube/db.py
Normal file
|
@ -0,0 +1,21 @@
|
|||
"""Output FreeTube db."""
|
||||
|
||||
|
||||
def to_db(value: object) -> str:
|
||||
"""Render value as string.
|
||||
|
||||
:param value: value to render
|
||||
:type value: object
|
||||
:rtype: str
|
||||
"""
|
||||
match value:
|
||||
case bool():
|
||||
text = str(value).lower()
|
||||
case dict():
|
||||
sub = ",".join([f'"{i}":{to_db(v)}' for i, v in value.items()])
|
||||
text = f"{{{sub}}}"
|
||||
case float() | str():
|
||||
text = f'"{value}"'
|
||||
case _:
|
||||
text = str(value)
|
||||
return text
|
24
rwx/sw/freetube/languages.py
Normal file
24
rwx/sw/freetube/languages.py
Normal file
|
@ -0,0 +1,24 @@
|
|||
"""FreeTube languages."""
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from rwx import Object
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .playlists import Playlist
|
||||
|
||||
|
||||
class Language(Object):
|
||||
"""FreeTube language."""
|
||||
|
||||
def __init__(self, uid: str, name: str) -> None:
|
||||
"""Set uid & name.
|
||||
|
||||
:param uid: identifier
|
||||
:type uid: str
|
||||
:param name: label
|
||||
:type name: str
|
||||
"""
|
||||
self.uid = uid
|
||||
self.name = name
|
||||
self.playlist: Playlist
|
47
rwx/sw/freetube/playlists.py
Normal file
47
rwx/sw/freetube/playlists.py
Normal file
|
@ -0,0 +1,47 @@
|
|||
"""FreeTube playlists."""
|
||||
|
||||
from rwx import Object
|
||||
|
||||
from .videos import Video
|
||||
|
||||
|
||||
class Playlist(Object):
|
||||
"""FreeTube playlist."""
|
||||
|
||||
def __init__(self, uid: str, name: str) -> None:
|
||||
"""Set uid & name.
|
||||
|
||||
:param uid: identifier
|
||||
:type uid: str
|
||||
:param name: label
|
||||
:type name: str
|
||||
"""
|
||||
self.uid = uid
|
||||
self.name = name
|
||||
self.videos: list[Video] = []
|
||||
|
||||
def add(self, video: Video) -> None:
|
||||
"""Add video.
|
||||
|
||||
:param video: video to add
|
||||
:type video: Video
|
||||
"""
|
||||
self.videos.append(video)
|
||||
|
||||
def to_db(self) -> str:
|
||||
"""Return identifier, name & videos.
|
||||
|
||||
:rtype: str
|
||||
"""
|
||||
videos = ",".join([video.to_db() for video in self.videos])
|
||||
return f"""\
|
||||
{{\
|
||||
"_id":"{self.uid}"\
|
||||
,\
|
||||
"playlistName":"{self.name}"\
|
||||
,\
|
||||
"protected":true\
|
||||
,\
|
||||
"videos":[{videos}]\
|
||||
}}\
|
||||
"""
|
45
rwx/sw/freetube/profiles.py
Normal file
45
rwx/sw/freetube/profiles.py
Normal file
|
@ -0,0 +1,45 @@
|
|||
"""FreeTube profiles."""
|
||||
|
||||
from rwx import Object
|
||||
|
||||
from .channels import Channel
|
||||
|
||||
|
||||
class Profile(Object):
|
||||
"""FreeTube profile."""
|
||||
|
||||
def __init__(self, uid: str, name: str) -> None:
|
||||
"""Set uid & name.
|
||||
|
||||
:param uid: unique identifier
|
||||
:type uid: str
|
||||
:param name: label
|
||||
:type name: str
|
||||
"""
|
||||
self.id = uid
|
||||
self.name = name
|
||||
self.channels: list[Channel] = []
|
||||
|
||||
def add(self, channel: Channel) -> None:
|
||||
"""Add channel.
|
||||
|
||||
:param channel: channel to add
|
||||
:type channel: Channel
|
||||
"""
|
||||
self.channels.append(channel)
|
||||
|
||||
def to_db(self) -> str:
|
||||
"""Return identifier, name & channels.
|
||||
|
||||
:rtype: str
|
||||
"""
|
||||
channels = ",".join([channel.to_db() for channel in self.channels])
|
||||
return f"""\
|
||||
{{\
|
||||
"_id":"{self.id}"\
|
||||
,\
|
||||
"name":"{self.name}"\
|
||||
,\
|
||||
"subscriptions":[{channels}]\
|
||||
}}\
|
||||
"""
|
33
rwx/sw/freetube/settings.py
Normal file
33
rwx/sw/freetube/settings.py
Normal file
|
@ -0,0 +1,33 @@
|
|||
"""FreeTube settings."""
|
||||
|
||||
from rwx import Object
|
||||
|
||||
from .db import to_db
|
||||
|
||||
|
||||
class Setting(Object):
|
||||
"""FreeTube setting."""
|
||||
|
||||
def __init__(self, uid: str, value: object) -> None:
|
||||
"""Set uid & value.
|
||||
|
||||
:param uid: unique identifier
|
||||
:type uid: str
|
||||
:param value: value
|
||||
:type value: object
|
||||
"""
|
||||
self.uid = uid
|
||||
self.value = value
|
||||
|
||||
def to_db(self) -> str:
|
||||
"""Return uid & value as db string.
|
||||
|
||||
:rtype: str
|
||||
"""
|
||||
return f"""\
|
||||
{{\
|
||||
"_id":"{self.uid}"\
|
||||
,\
|
||||
"value":{to_db(self.value)}\
|
||||
}}\
|
||||
"""
|
33
rwx/sw/freetube/videos.py
Normal file
33
rwx/sw/freetube/videos.py
Normal file
|
@ -0,0 +1,33 @@
|
|||
"""FreeTube videos."""
|
||||
|
||||
from rwx import Object
|
||||
|
||||
|
||||
class Video(Object):
|
||||
"""FreeTube video."""
|
||||
|
||||
def __init__(self, uid: str, name: str) -> None:
|
||||
"""Set id & name.
|
||||
|
||||
:param uid: identifier
|
||||
:type uid: str
|
||||
:param name: label
|
||||
:type name: str
|
||||
"""
|
||||
self.uid = uid
|
||||
self.name = name
|
||||
|
||||
def to_db(self) -> str:
|
||||
"""Return identifier, zero length & title.
|
||||
|
||||
:rtype: str
|
||||
"""
|
||||
return f"""\
|
||||
{{\
|
||||
"videoId":"{self.uid}"\
|
||||
,\
|
||||
"lengthSeconds":0\
|
||||
,\
|
||||
"title":"{self.name}"\
|
||||
}}\
|
||||
"""
|
3
rwx/txt/__init__.py
Normal file
3
rwx/txt/__init__.py
Normal file
|
@ -0,0 +1,3 @@
|
|||
"""Handle text."""
|
||||
|
||||
CHARSET = "UTF-8"
|
Loading…
Add table
Add a link
Reference in a new issue