diff --git a/.forgejo/workflows/main.yaml b/.forgejo/workflows/main.yaml index bcc505a..123fb8c 100644 --- a/.forgejo/workflows/main.yaml +++ b/.forgejo/workflows/main.yaml @@ -1,6 +1,7 @@ on: [push] jobs: job: + runs-on: ubuntu-latest container: image: ${{vars.DOCKER}}debian:bookworm steps: diff --git a/build.py b/render.py similarity index 100% rename from build.py rename to render.py diff --git a/rwx/fs/__init__.py b/rwx/fs/__init__.py index 8a45288..b55713a 100644 --- a/rwx/fs/__init__.py +++ b/rwx/fs/__init__.py @@ -2,9 +2,9 @@ import os import shutil -from pathlib import Path - import tomllib +import yaml +from pathlib import Path from rwx import ps @@ -133,6 +133,19 @@ def read_file_text(file_path: Path, charset: str = CHARSET) -> str: return read_file_bytes(file_path).decode(charset) +def read_file_yaml(file_path: Path, charset: str = CHARSET) -> dict | list: + """Read whole file as yaml object. + + :param file_path: source input file + :type file_path: Path + :param charset: charset to use for decoding input + :type charset: str + :rtype: dict + """ + text = read_file_text(file_path, charset) + return yaml.safe_load(text) + + def wipe(path: Path) -> None: """Wipe provided path, whether directory or file. diff --git a/rwx/prj/__init__.py b/rwx/prj/__init__.py index 62d91d9..f545f73 100644 --- a/rwx/prj/__init__.py +++ b/rwx/prj/__init__.py @@ -3,6 +3,7 @@ from pathlib import Path from rwx import Object +from rwx.ps import run class Project(Object): @@ -18,3 +19,7 @@ class Project(Object): self.file = self.raw.resolve() self.root: Path = self.file.parent self.name: str = self.root.name + + def build(self) -> None: + """Build the project.""" + run(str(self.root / "render.py")) diff --git a/rwx/sw/ytdlp/__init__.py b/rwx/sw/ytdlp/__init__.py new file mode 100644 index 0000000..9045781 --- /dev/null +++ b/rwx/sw/ytdlp/__init__.py @@ -0,0 +1,379 @@ +"""YouTube DownLoad.""" + +from datetime import datetime +from pathlib import Path +from typing import Any + +from yt_dlp import YoutubeDL + +from rwx import Object +from rwx.fs import read_file_yaml +from rwx.log import stream as log + +SUBTITLES_EXTENSIONS = ["vtt"] +TIMESTAMP = "%Y%m%d%H%M%S" +URL = "https://youtube.com" + + +# ╭─────────╮ +# │ classes │ +# ╰─────────╯ + + +class Cache(Object): + """YouTube local cache.""" + + def __init__(self, root_file: Path) -> None: + self.root_file = root_file.resolve() + self.root_directory = self.root_file.parent + self.load() + + def load(self) -> None: + d = read_file_yaml(self.root_file) + log.info(d) + + +class Channel(Object): + """YouTube channel.""" + + def __init__(self, channel_id: str) -> None: + """Set objects tree. + + :param channel_id: channel identifier + :type channel_id: str + """ + d = extract_videos(channel_id) + # channel + self.uid = d["channel_id"] + self.title = d["channel"] + self.followers = int(d["channel_follower_count"]) + self.description = d["description"] + self.tags = d["tags"] + self.thumbnails = [thumbnail["url"] for thumbnail in d["thumbnails"]] + self.thumbnail = self.thumbnails[-1] + self.uploader_id = d["uploader_id"] + self.uploader = d["uploader"] + # videos + self.videos = [ + Video(entry) + for entry in reversed(d["entries"]) + if entry["availability"] != "subscriber_only" + ] + # playlists + d = extract_playlists(channel_id) + self.playlists = [Playlist(entry) for entry in reversed(d["entries"])] + + +# TODO Format +class Format(Object): + """YouTube format.""" + + @staticmethod + def get(d: dict, key: str) -> str | None: + value = d.get(key) + match value: + case "none": + return None + case _: + return value + + def __init__(self, d: dict) -> None: + """Set format info. + + :param d: format info + :type d: dict + """ + self.uid = d["format_id"] + self.extension = d["ext"] + self.filesize = d.get("filesize") + self.filesize_approx = d.get("filesize_approx") + self.language = d.get("language") + self.quality = d.get("quality") + # video + self.video_codec = Format.get(d, "vcodec") + if self.video_codec: + self.video_bit_rate = d["vbr"] + self.video_dynamic_range = d["dynamic_range"] + self.video_extension = d["video_ext"] + self.video_fps = d["fps"] + self.video_height = int(d["height"]) + self.video_width = int(d["width"]) + else: + del self.video_codec + # audio + self.audio_codec = Format.get(d, "acodec") + if self.audio_codec: + self.audio_bit_rate = d["abr"] + self.audio_channels = int(d["audio_channels"]) + self.audio_extension = d["audio_ext"] + self.audio_sampling_rate = d["asr"] + else: + del self.audio_codec + + def audio(self) -> str: + return f"{self.uid} \ +→ {self.audio_sampling_rate} × {self.audio_channels} \ +@ {self.audio_bit_rate} × {self.audio_codec}" + + def video(self) -> str: + return f"{self.uid} \ +→ {self.video_width} × {self.video_height} × {self.video_fps} \ +@ {self.video_bit_rate} × {self.video_codec}" + + +# TODO Playlist/extra +class Playlist(Object): + """YouTube playlist.""" + + def __init__(self, d: dict) -> None: + """Set playlist info. + + :param d: playlist info + :type d: dict + """ + self.uid = d["id"] + self.title = d["title"] + + +class Subtitles(Object): + """YouTube subtitles.""" + + def __init__(self, uid: str, d: dict) -> None: + """Set subtitles info. + + :param d: subtitles info + :type d: dict + """ + self.uid = uid + self.extension = d["ext"] + self.name = d["name"] + self.url = d["url"] + + +# TODO Thumbnail + + +class Video(Object): + """YouTube video.""" + + def __init__(self, d: dict) -> None: + """Set video info. + + :param d: video info + :type d: dict + """ + self.description_cut = d["description"] + self.uid = d["id"] + self.title = d["title"] + self.duration = int(d["duration"]) + self.thumbnail = d["thumbnails"][-1]["url"] + + def load_extra(self): + self.at = datetime.now().strftime(TIMESTAMP) + d = extract_video(self.uid) + self.audio_formats = [] + self.video_formats = [] + for entry in d["formats"]: + f = Format(entry) + if hasattr(f, "video_codec"): + self.video_format = f + self.video_formats.append(f) + elif hasattr(f, "audio_codec"): + self.audio_format = f + self.audio_formats.append(f) + thumbnail = d["thumbnails"][-1]["url"] + # TODO compare existing thumbnail + self.description = d["description"] + self.channel_id = d["channel_id"] + self.duration = int(d["duration"]) + self.views = int(d["view_count"]) + self.categories = d["categories"] + self.tags = d["tags"] + self.automatic_captions = [] + for uid, entries in d["automatic_captions"].items(): + for entry in entries: + subtitles = Subtitles(uid, entry) + if subtitles.extension in SUBTITLES_EXTENSIONS: + self.automatic_captions.append(subtitles) + self.subtitles = [] + for uid, entries in d["subtitles"].items(): + for entry in entries: + subtitles = Subtitles(uid, entry) + if subtitles.extension in SUBTITLES_EXTENSIONS: + self.subtitles.append(subtitles) + self.chapters = d["chapters"] + self.likes = d["like_count"] + self.timestamp = datetime.fromtimestamp(d["timestamp"]).strftime( + TIMESTAMP + ) + self.fulltitle = d["fulltitle"] + + +# ╭──────────╮ +# │ download │ +# ╰──────────╯ + + +def download_video(video_id: str | None) -> None: + if video_id: + ytdl( + { + "format": "bestvideo[ext=webm]+bestaudio[ext=webm]", + "outtmpl": "%(id)s.%(ext)s", + "postprocessors": [ + { + "key": "SponsorBlock", + "categories": ["sponsor"], + }, + { + "key": "ModifyChapters", + "remove_sponsor_segments": ["sponsor"], + }, + ], + "writesubtitles": True, + "writethumbnail": True, + }, + ).download([url_video(video_id)]) + + +# ╭─────────╮ +# │ extract │ +# ╰─────────╯ + + +def extract(url: str) -> dict[str, Any]: + """Return extracted dict. + + :rtype: dict + """ + d = ytdl( + { + "extract_flat": True, + "skip_download": True, + }, + ).extract_info(url, download=False) + log.debug(d) + return d + + +def extract_playlist(playlist_id: str) -> dict: + """Return extracted playlist dict. + + :param playlist_id: playlist identifier + :type playlist_id: str + :rtype: dict + """ + return extract(url_playlist(playlist_id)) + + +def extract_playlists(channel_id: str) -> dict: + """Return extracted playlists dict. + + :param channel_id: channel identifier + :type channel_id: str + :rtype: dict + """ + return extract(url_playlists(channel_id)) + + +def extract_video(video_id: str) -> dict: + """Return extracted video dict. + + :param video_id: video identifier + :type video_id: str + :rtype: dict + """ + return extract(url_video(video_id)) + + +def extract_videos(channel_id: str) -> dict: + """Return extracted videos dict. + + :param channel_id: channel identifier + :type channel_id: str + :rtype: dict + """ + return extract(url_videos(channel_id)) + + +# ╭──────╮ +# │ next │ +# ╰──────╯ + + +def next_download(videos: list[str]) -> str | None: + for index, video_id in enumerate(videos): + if not Path(f"{video_id}.mp4").exists(): + log.info(f"{index} ∕ {len(videos)}") + return video_id + return None + + +# ╭─────╮ +# │ url │ +# ╰─────╯ + + +def url_channel(channel_id: str) -> str: + """Return channel URL. + + :param channel_id: channel identifier + :type channel_id: str + :rtype: str + """ + return f"{URL}/channel/{channel_id}" + + +def url_playlist(playlist_id: str) -> str: + """Return playlist URL. + + :param playlist_id: playlist identifier + :type playlist_id: str + :rtype: str + """ + return f"{URL}/playlist?list={playlist_id}" + + +def url_playlists(channel_id: str) -> str: + """Return playlists URL. + + :param channel_id: channel identifier + :type channel_id: str + :rtype: str + """ + return f"{url_channel(channel_id)}/playlists" + + +def url_video(video_id: str) -> str: + """Return video URL. + + :param video_id: video identifier + :type video_id: str + :rtype: str + """ + return f"{URL}/watch?v={video_id}" + + +def url_videos(channel_id: str) -> str: + """Return videos URL. + + :param channel_id: channel identifier + :type channel_id: str + :rtype: str + """ + return f"{url_channel(channel_id)}/videos" + + +# ╭──────╮ +# │ ytdl │ +# ╰──────╯ + + +def ytdl(opt: dict) -> YoutubeDL: + options = { + **opt, + "ignoreerrors": False, + "quiet": False, + } + log.info(options) + return YoutubeDL(options) diff --git a/rwx/sw/ytdlp/video.py b/rwx/sw/ytdlp/video.py new file mode 100644 index 0000000..e69de29 diff --git a/rwx/web/__init__.py b/rwx/web/__init__.py new file mode 100644 index 0000000..e746872 --- /dev/null +++ b/rwx/web/__init__.py @@ -0,0 +1,32 @@ +import requests + +from rwx import Object, txt +from rwx.txt import CHARSET + + +def fetch(url: str) -> str: + response = requests.get(url) + response.raise_for_status() + return response.text + + +class Page(Object): + def __init__(self): + self.charset = CHARSET + self.description = "" + self.title = "" + + def render(self) -> str: + return f"""\ + + + + + + +{self.title} + + + + +""" diff --git a/sh/alias/git.sh b/sh/alias/git.sh index cadd10b..fe5380a 100644 --- a/sh/alias/git.sh +++ b/sh/alias/git.sh @@ -307,6 +307,14 @@ a__git_log_all() { "${@}" } +# log all history as oneline +glao() { a__git_log_all_oneline "${@}"; } +a__git_log_all_oneline() { + a__git_log_all \ + --oneline \ + "${@}" +} + # log all history with patches glap() { a__git_log_all_patch "${@}"; } a__git_log_all_patch() { @@ -316,6 +324,14 @@ a__git_log_all_patch() { "${@}" } +# log history as oneline +glo() { a__git_log_oneline "${@}"; } +a__git_log_oneline() { + a__git_log \ + --oneline \ + "${@}" +} + # log history with patches glp() { a__git_log_patch "${@}"; } a__git_log_patch() { diff --git a/sh/cryptsetup.sh b/sh/cryptsetup.sh index 900083d..fd20a64 100644 --- a/sh/cryptsetup.sh +++ b/sh/cryptsetup.sh @@ -1,6 +1,127 @@ -_rwx_cmd_cs() { rwx_crypt_setup "${@}"; } +_rwx_cmd_cs() { rwx_crypt "${@}"; } -rwx_crypt_setup() { - local action="${1}" - echo "cs: ${action}" +RWX_CRYPT_ROOT="/data/home/user/crypt" +RWX_CRYPT_VAR="/var/lib/crypt" + +rwx_crypt_device() { + local device size + local index=0 + while [ -z "${device}" ]; do + device="/dev/nbd${index}" + if [ -b "${device}" ]; then + size="$(cat /sys/block/nbd"${index}/size")" + [ "${size}" -eq 0 ] || + device="" + else + device="" + break + fi + index=$((index + 1)) + done + if [ -n "${device}" ]; then + echo "${device}" + else + rwx_log_error 1 "No device available" + fi +} + +rwx_crypt() { + local action="${1}" + local action_close="close" + local action_open="open" + local mapper="/dev/mapper" + local mount_root="/media" + local crypt_arg crypt_file crypt_map crypt_mount pass_phrase + case "${action}" in + "${action_close}" | "${action_open}") + shift + local user_id + user_id="$(id --user)" + [ "${user_id}" -eq 0 ] || + rwx_log_error 1 "Not root" + [ -n "${1}" ] || + rwx_log_error 2 "No files" + [ "${action}" = "${action_open}" ] && + pass_phrase="$(rwx_read_passphrase)" + for crypt_arg in "${@}"; do + rwx_log_info + crypt_file="${RWX_CRYPT_ROOT}/${crypt_arg}.qcow2" + if [ -f "${crypt_file}" ]; then + crypt_map="${mapper}/${crypt_arg}" + crypt_mount="${mount_root}/${crypt_arg}" + local device + case "${action}" in + "${action_open}") + # find device + if ! device="$(rwx_crypt_device)"; then + rwx_log_error 4 "No device available" + fi + # make directory + if ! mkdir --parents "${RWX_CRYPT_VAR}"; then + rwx_log_error 5 "Making failure: ${RWX_CRYPT_VAR}" + fi + # record device + if ! rwx_file_write \ + "${RWX_CRYPT_VAR}/${crypt_arg}" "${device}"; then + rwx_log_error 6 "Writing failure: ${device}" + fi + # connect device + if ! qemu-nbd --connect "${device}" "${crypt_file}"; then + rwx_log_error 7 "Connection failure: ${device}" + fi + # open device + if ! echo "${pass_phrase}" | + cryptsetup luksOpen "${device}" "${crypt_arg}"; then + rwx_log_error 8 "Opening failure: ${device}" + fi + # make mount directory + if ! mkdir --parents "${crypt_mount}"; then + rwx_log_error 9 "Making failure: ${crypt_mount}" + fi + # mount file system + if ! mount \ + --options "autodefrag,compress-force=zstd" \ + "${crypt_map}" "${crypt_mount}"; then + rwx_log_error 10 "Mounting failure: ${crypt_map}" + fi + ;; + "${action_close}") + # unmount file system + if ! umount "${crypt_mount}"; then + rwx_log_error 4 "Unmounting failure: ${crypt_mount}" + fi + # remove mount directory + if ! rmdir "${crypt_mount}"; then + rwx_log_error 5 "Removal failure: ${crypt_mount}" + fi + # close device + if ! cryptsetup luksClose "${crypt_arg}"; then + rwx_log_error 6 "Closing failure: ${crypt_arg}" + fi + # load device + if ! device="$(cat "${RWX_CRYPT_VAR}/${crypt_arg}")"; then + rwx_log_error 7 "Loading failure: ${crypt_arg}" + fi + # disconnect device + if ! qemu-nbd --disconnect "${device}"; then + rwx_log_error 8 "Disconnection failure: ${device}" + fi + # remove record + if ! rm "${RWX_CRYPT_VAR}/${crypt_arg}"; then + rwx_log_error 9 "Removal failure: ${crypt_arg}" + fi + ;; + *) ;; + esac + else + rwx_log_error 3 "Not a file: ${crypt_file}" + fi + done + ;; + *) + rwx_log_info "Usage:" + rwx_log_info "${action_close}|${action_open}" + # TODO list + ;; + esac } diff --git a/sh/ffmpeg.sh b/sh/ffmpeg.sh index bcb3a62..c32ad06 100644 --- a/sh/ffmpeg.sh +++ b/sh/ffmpeg.sh @@ -72,8 +72,9 @@ rwx_ffmpeg_input_hdmi() { set -- \ -f "v4l2" \ -video_size "1920x1080" \ - -framerate "60" \ - -input_format "yuyv422" \ + -framerate "120" \ + -input_format "yuv420p" \ + -pix_fmt "yuv420p" \ -i "${device}" local argument for argument in "${@}"; do echo "${argument}"; done diff --git a/sh/fs.sh b/sh/fs.sh index ac46f6d..0db5696 100644 --- a/sh/fs.sh +++ b/sh/fs.sh @@ -5,7 +5,7 @@ rwx_fs_make_btrfs() { if [ -b "${device}" ]; then set -- \ --force \ - --checksum "sha256" + --checksum "blake2" if [ -n "${label}" ]; then set -- "${@}" \ --label "${label}" diff --git a/sh/python.sh b/sh/python.sh index 44ea181..59773c7 100644 --- a/sh/python.sh +++ b/sh/python.sh @@ -9,6 +9,6 @@ rwx_python_venv() { local path="${1}" [ -d "${path}" ] || return 1 - export VIRTUAL_ENV="${path}" && \ + export VIRTUAL_ENV="${path}" && export PATH="${VIRTUAL_ENV}/bin:${PATH}" }