diff --git a/.forgejo/workflows/main.yaml b/.forgejo/workflows/main.yaml index 123fb8c..bcc505a 100644 --- a/.forgejo/workflows/main.yaml +++ b/.forgejo/workflows/main.yaml @@ -1,7 +1,6 @@ on: [push] jobs: job: - runs-on: ubuntu-latest container: image: ${{vars.DOCKER}}debian:bookworm steps: diff --git a/render.py b/build.py similarity index 100% rename from render.py rename to build.py diff --git a/rwx/fs/__init__.py b/rwx/fs/__init__.py index b55713a..8a45288 100644 --- a/rwx/fs/__init__.py +++ b/rwx/fs/__init__.py @@ -2,10 +2,10 @@ import os import shutil -import tomllib -import yaml from pathlib import Path +import tomllib + from rwx import ps CHARSET = "UTF-8" @@ -133,19 +133,6 @@ def read_file_text(file_path: Path, charset: str = CHARSET) -> str: return read_file_bytes(file_path).decode(charset) -def read_file_yaml(file_path: Path, charset: str = CHARSET) -> dict | list: - """Read whole file as yaml object. - - :param file_path: source input file - :type file_path: Path - :param charset: charset to use for decoding input - :type charset: str - :rtype: dict - """ - text = read_file_text(file_path, charset) - return yaml.safe_load(text) - - def wipe(path: Path) -> None: """Wipe provided path, whether directory or file. diff --git a/rwx/prj/__init__.py b/rwx/prj/__init__.py index f545f73..62d91d9 100644 --- a/rwx/prj/__init__.py +++ b/rwx/prj/__init__.py @@ -3,7 +3,6 @@ from pathlib import Path from rwx import Object -from rwx.ps import run class Project(Object): @@ -19,7 +18,3 @@ class Project(Object): self.file = self.raw.resolve() self.root: Path = self.file.parent self.name: str = self.root.name - - def build(self) -> None: - """Build the project.""" - run(str(self.root / "render.py")) diff --git a/rwx/sw/ytdlp/__init__.py b/rwx/sw/ytdlp/__init__.py deleted file mode 100644 index 9045781..0000000 --- a/rwx/sw/ytdlp/__init__.py +++ /dev/null @@ -1,379 +0,0 @@ -"""YouTube DownLoad.""" - -from datetime import datetime -from pathlib import Path -from typing import Any - -from yt_dlp import YoutubeDL - -from rwx import Object -from rwx.fs import read_file_yaml -from rwx.log import stream as log - -SUBTITLES_EXTENSIONS = ["vtt"] -TIMESTAMP = "%Y%m%d%H%M%S" -URL = "https://youtube.com" - - -# ╭─────────╮ -# │ classes │ -# ╰─────────╯ - - -class Cache(Object): - """YouTube local cache.""" - - def __init__(self, root_file: Path) -> None: - self.root_file = root_file.resolve() - self.root_directory = self.root_file.parent - self.load() - - def load(self) -> None: - d = read_file_yaml(self.root_file) - log.info(d) - - -class Channel(Object): - """YouTube channel.""" - - def __init__(self, channel_id: str) -> None: - """Set objects tree. - - :param channel_id: channel identifier - :type channel_id: str - """ - d = extract_videos(channel_id) - # channel - self.uid = d["channel_id"] - self.title = d["channel"] - self.followers = int(d["channel_follower_count"]) - self.description = d["description"] - self.tags = d["tags"] - self.thumbnails = [thumbnail["url"] for thumbnail in d["thumbnails"]] - self.thumbnail = self.thumbnails[-1] - self.uploader_id = d["uploader_id"] - self.uploader = d["uploader"] - # videos - self.videos = [ - Video(entry) - for entry in reversed(d["entries"]) - if entry["availability"] != "subscriber_only" - ] - # playlists - d = extract_playlists(channel_id) - self.playlists = [Playlist(entry) for entry in reversed(d["entries"])] - - -# TODO Format -class Format(Object): - """YouTube format.""" - - @staticmethod - def get(d: dict, key: str) -> str | None: - value = d.get(key) - match value: - case "none": - return None - case _: - return value - - def __init__(self, d: dict) -> None: - """Set format info. - - :param d: format info - :type d: dict - """ - self.uid = d["format_id"] - self.extension = d["ext"] - self.filesize = d.get("filesize") - self.filesize_approx = d.get("filesize_approx") - self.language = d.get("language") - self.quality = d.get("quality") - # video - self.video_codec = Format.get(d, "vcodec") - if self.video_codec: - self.video_bit_rate = d["vbr"] - self.video_dynamic_range = d["dynamic_range"] - self.video_extension = d["video_ext"] - self.video_fps = d["fps"] - self.video_height = int(d["height"]) - self.video_width = int(d["width"]) - else: - del self.video_codec - # audio - self.audio_codec = Format.get(d, "acodec") - if self.audio_codec: - self.audio_bit_rate = d["abr"] - self.audio_channels = int(d["audio_channels"]) - self.audio_extension = d["audio_ext"] - self.audio_sampling_rate = d["asr"] - else: - del self.audio_codec - - def audio(self) -> str: - return f"{self.uid} \ -→ {self.audio_sampling_rate} × {self.audio_channels} \ -@ {self.audio_bit_rate} × {self.audio_codec}" - - def video(self) -> str: - return f"{self.uid} \ -→ {self.video_width} × {self.video_height} × {self.video_fps} \ -@ {self.video_bit_rate} × {self.video_codec}" - - -# TODO Playlist/extra -class Playlist(Object): - """YouTube playlist.""" - - def __init__(self, d: dict) -> None: - """Set playlist info. - - :param d: playlist info - :type d: dict - """ - self.uid = d["id"] - self.title = d["title"] - - -class Subtitles(Object): - """YouTube subtitles.""" - - def __init__(self, uid: str, d: dict) -> None: - """Set subtitles info. - - :param d: subtitles info - :type d: dict - """ - self.uid = uid - self.extension = d["ext"] - self.name = d["name"] - self.url = d["url"] - - -# TODO Thumbnail - - -class Video(Object): - """YouTube video.""" - - def __init__(self, d: dict) -> None: - """Set video info. - - :param d: video info - :type d: dict - """ - self.description_cut = d["description"] - self.uid = d["id"] - self.title = d["title"] - self.duration = int(d["duration"]) - self.thumbnail = d["thumbnails"][-1]["url"] - - def load_extra(self): - self.at = datetime.now().strftime(TIMESTAMP) - d = extract_video(self.uid) - self.audio_formats = [] - self.video_formats = [] - for entry in d["formats"]: - f = Format(entry) - if hasattr(f, "video_codec"): - self.video_format = f - self.video_formats.append(f) - elif hasattr(f, "audio_codec"): - self.audio_format = f - self.audio_formats.append(f) - thumbnail = d["thumbnails"][-1]["url"] - # TODO compare existing thumbnail - self.description = d["description"] - self.channel_id = d["channel_id"] - self.duration = int(d["duration"]) - self.views = int(d["view_count"]) - self.categories = d["categories"] - self.tags = d["tags"] - self.automatic_captions = [] - for uid, entries in d["automatic_captions"].items(): - for entry in entries: - subtitles = Subtitles(uid, entry) - if subtitles.extension in SUBTITLES_EXTENSIONS: - self.automatic_captions.append(subtitles) - self.subtitles = [] - for uid, entries in d["subtitles"].items(): - for entry in entries: - subtitles = Subtitles(uid, entry) - if subtitles.extension in SUBTITLES_EXTENSIONS: - self.subtitles.append(subtitles) - self.chapters = d["chapters"] - self.likes = d["like_count"] - self.timestamp = datetime.fromtimestamp(d["timestamp"]).strftime( - TIMESTAMP - ) - self.fulltitle = d["fulltitle"] - - -# ╭──────────╮ -# │ download │ -# ╰──────────╯ - - -def download_video(video_id: str | None) -> None: - if video_id: - ytdl( - { - "format": "bestvideo[ext=webm]+bestaudio[ext=webm]", - "outtmpl": "%(id)s.%(ext)s", - "postprocessors": [ - { - "key": "SponsorBlock", - "categories": ["sponsor"], - }, - { - "key": "ModifyChapters", - "remove_sponsor_segments": ["sponsor"], - }, - ], - "writesubtitles": True, - "writethumbnail": True, - }, - ).download([url_video(video_id)]) - - -# ╭─────────╮ -# │ extract │ -# ╰─────────╯ - - -def extract(url: str) -> dict[str, Any]: - """Return extracted dict. - - :rtype: dict - """ - d = ytdl( - { - "extract_flat": True, - "skip_download": True, - }, - ).extract_info(url, download=False) - log.debug(d) - return d - - -def extract_playlist(playlist_id: str) -> dict: - """Return extracted playlist dict. - - :param playlist_id: playlist identifier - :type playlist_id: str - :rtype: dict - """ - return extract(url_playlist(playlist_id)) - - -def extract_playlists(channel_id: str) -> dict: - """Return extracted playlists dict. - - :param channel_id: channel identifier - :type channel_id: str - :rtype: dict - """ - return extract(url_playlists(channel_id)) - - -def extract_video(video_id: str) -> dict: - """Return extracted video dict. - - :param video_id: video identifier - :type video_id: str - :rtype: dict - """ - return extract(url_video(video_id)) - - -def extract_videos(channel_id: str) -> dict: - """Return extracted videos dict. - - :param channel_id: channel identifier - :type channel_id: str - :rtype: dict - """ - return extract(url_videos(channel_id)) - - -# ╭──────╮ -# │ next │ -# ╰──────╯ - - -def next_download(videos: list[str]) -> str | None: - for index, video_id in enumerate(videos): - if not Path(f"{video_id}.mp4").exists(): - log.info(f"{index} ∕ {len(videos)}") - return video_id - return None - - -# ╭─────╮ -# │ url │ -# ╰─────╯ - - -def url_channel(channel_id: str) -> str: - """Return channel URL. - - :param channel_id: channel identifier - :type channel_id: str - :rtype: str - """ - return f"{URL}/channel/{channel_id}" - - -def url_playlist(playlist_id: str) -> str: - """Return playlist URL. - - :param playlist_id: playlist identifier - :type playlist_id: str - :rtype: str - """ - return f"{URL}/playlist?list={playlist_id}" - - -def url_playlists(channel_id: str) -> str: - """Return playlists URL. - - :param channel_id: channel identifier - :type channel_id: str - :rtype: str - """ - return f"{url_channel(channel_id)}/playlists" - - -def url_video(video_id: str) -> str: - """Return video URL. - - :param video_id: video identifier - :type video_id: str - :rtype: str - """ - return f"{URL}/watch?v={video_id}" - - -def url_videos(channel_id: str) -> str: - """Return videos URL. - - :param channel_id: channel identifier - :type channel_id: str - :rtype: str - """ - return f"{url_channel(channel_id)}/videos" - - -# ╭──────╮ -# │ ytdl │ -# ╰──────╯ - - -def ytdl(opt: dict) -> YoutubeDL: - options = { - **opt, - "ignoreerrors": False, - "quiet": False, - } - log.info(options) - return YoutubeDL(options) diff --git a/rwx/sw/ytdlp/video.py b/rwx/sw/ytdlp/video.py deleted file mode 100644 index e69de29..0000000 diff --git a/rwx/web/__init__.py b/rwx/web/__init__.py deleted file mode 100644 index e746872..0000000 --- a/rwx/web/__init__.py +++ /dev/null @@ -1,32 +0,0 @@ -import requests - -from rwx import Object, txt -from rwx.txt import CHARSET - - -def fetch(url: str) -> str: - response = requests.get(url) - response.raise_for_status() - return response.text - - -class Page(Object): - def __init__(self): - self.charset = CHARSET - self.description = "" - self.title = "" - - def render(self) -> str: - return f"""\ - - - - - - -{self.title} - - - - -""" diff --git a/sh/alias/git.sh b/sh/alias/git.sh index fe5380a..cadd10b 100644 --- a/sh/alias/git.sh +++ b/sh/alias/git.sh @@ -307,14 +307,6 @@ a__git_log_all() { "${@}" } -# log all history as oneline -glao() { a__git_log_all_oneline "${@}"; } -a__git_log_all_oneline() { - a__git_log_all \ - --oneline \ - "${@}" -} - # log all history with patches glap() { a__git_log_all_patch "${@}"; } a__git_log_all_patch() { @@ -324,14 +316,6 @@ a__git_log_all_patch() { "${@}" } -# log history as oneline -glo() { a__git_log_oneline "${@}"; } -a__git_log_oneline() { - a__git_log \ - --oneline \ - "${@}" -} - # log history with patches glp() { a__git_log_patch "${@}"; } a__git_log_patch() { diff --git a/sh/cryptsetup.sh b/sh/cryptsetup.sh index fd20a64..900083d 100644 --- a/sh/cryptsetup.sh +++ b/sh/cryptsetup.sh @@ -1,127 +1,6 @@ -_rwx_cmd_cs() { rwx_crypt "${@}"; } +_rwx_cmd_cs() { rwx_crypt_setup "${@}"; } -RWX_CRYPT_ROOT="/data/home/user/crypt" -RWX_CRYPT_VAR="/var/lib/crypt" - -rwx_crypt_device() { - local device size - local index=0 - while [ -z "${device}" ]; do - device="/dev/nbd${index}" - if [ -b "${device}" ]; then - size="$(cat /sys/block/nbd"${index}/size")" - [ "${size}" -eq 0 ] || - device="" - else - device="" - break - fi - index=$((index + 1)) - done - if [ -n "${device}" ]; then - echo "${device}" - else - rwx_log_error 1 "No device available" - fi -} - -rwx_crypt() { +rwx_crypt_setup() { local action="${1}" - local action_close="close" - local action_open="open" - local mapper="/dev/mapper" - local mount_root="/media" - local crypt_arg crypt_file crypt_map crypt_mount pass_phrase - case "${action}" in - "${action_close}" | "${action_open}") - shift - local user_id - user_id="$(id --user)" - [ "${user_id}" -eq 0 ] || - rwx_log_error 1 "Not root" - [ -n "${1}" ] || - rwx_log_error 2 "No files" - [ "${action}" = "${action_open}" ] && - pass_phrase="$(rwx_read_passphrase)" - for crypt_arg in "${@}"; do - rwx_log_info - crypt_file="${RWX_CRYPT_ROOT}/${crypt_arg}.qcow2" - if [ -f "${crypt_file}" ]; then - crypt_map="${mapper}/${crypt_arg}" - crypt_mount="${mount_root}/${crypt_arg}" - local device - case "${action}" in - "${action_open}") - # find device - if ! device="$(rwx_crypt_device)"; then - rwx_log_error 4 "No device available" - fi - # make directory - if ! mkdir --parents "${RWX_CRYPT_VAR}"; then - rwx_log_error 5 "Making failure: ${RWX_CRYPT_VAR}" - fi - # record device - if ! rwx_file_write \ - "${RWX_CRYPT_VAR}/${crypt_arg}" "${device}"; then - rwx_log_error 6 "Writing failure: ${device}" - fi - # connect device - if ! qemu-nbd --connect "${device}" "${crypt_file}"; then - rwx_log_error 7 "Connection failure: ${device}" - fi - # open device - if ! echo "${pass_phrase}" | - cryptsetup luksOpen "${device}" "${crypt_arg}"; then - rwx_log_error 8 "Opening failure: ${device}" - fi - # make mount directory - if ! mkdir --parents "${crypt_mount}"; then - rwx_log_error 9 "Making failure: ${crypt_mount}" - fi - # mount file system - if ! mount \ - --options "autodefrag,compress-force=zstd" \ - "${crypt_map}" "${crypt_mount}"; then - rwx_log_error 10 "Mounting failure: ${crypt_map}" - fi - ;; - "${action_close}") - # unmount file system - if ! umount "${crypt_mount}"; then - rwx_log_error 4 "Unmounting failure: ${crypt_mount}" - fi - # remove mount directory - if ! rmdir "${crypt_mount}"; then - rwx_log_error 5 "Removal failure: ${crypt_mount}" - fi - # close device - if ! cryptsetup luksClose "${crypt_arg}"; then - rwx_log_error 6 "Closing failure: ${crypt_arg}" - fi - # load device - if ! device="$(cat "${RWX_CRYPT_VAR}/${crypt_arg}")"; then - rwx_log_error 7 "Loading failure: ${crypt_arg}" - fi - # disconnect device - if ! qemu-nbd --disconnect "${device}"; then - rwx_log_error 8 "Disconnection failure: ${device}" - fi - # remove record - if ! rm "${RWX_CRYPT_VAR}/${crypt_arg}"; then - rwx_log_error 9 "Removal failure: ${crypt_arg}" - fi - ;; - *) ;; - esac - else - rwx_log_error 3 "Not a file: ${crypt_file}" - fi - done - ;; - *) - rwx_log_info "Usage:" - rwx_log_info "${action_close}|${action_open}" - # TODO list - ;; - esac + echo "cs: ${action}" } diff --git a/sh/ffmpeg.sh b/sh/ffmpeg.sh index c32ad06..bcb3a62 100644 --- a/sh/ffmpeg.sh +++ b/sh/ffmpeg.sh @@ -72,9 +72,8 @@ rwx_ffmpeg_input_hdmi() { set -- \ -f "v4l2" \ -video_size "1920x1080" \ - -framerate "120" \ - -input_format "yuv420p" \ - -pix_fmt "yuv420p" \ + -framerate "60" \ + -input_format "yuyv422" \ -i "${device}" local argument for argument in "${@}"; do echo "${argument}"; done diff --git a/sh/fs.sh b/sh/fs.sh index 0db5696..ac46f6d 100644 --- a/sh/fs.sh +++ b/sh/fs.sh @@ -5,7 +5,7 @@ rwx_fs_make_btrfs() { if [ -b "${device}" ]; then set -- \ --force \ - --checksum "blake2" + --checksum "sha256" if [ -n "${label}" ]; then set -- "${@}" \ --label "${label}" diff --git a/sh/python.sh b/sh/python.sh index 59773c7..44ea181 100644 --- a/sh/python.sh +++ b/sh/python.sh @@ -9,6 +9,6 @@ rwx_python_venv() { local path="${1}" [ -d "${path}" ] || return 1 - export VIRTUAL_ENV="${path}" && + export VIRTUAL_ENV="${path}" && \ export PATH="${VIRTUAL_ENV}/bin:${PATH}" }