Compare commits

..

58 commits

Author SHA1 Message Date
7eae3bed9f
render
Some checks failed
/ job (push) Failing after 5m45s
2025-06-07 23:14:21 +02:00
5efa62dd40
build→render 2025-06-07 23:11:01 +02:00
d1a3de55ab
hdmi/yuv420p 2025-06-07 17:40:29 +02:00
5c3f75caa1
hdmi/yuv420p 2025-06-07 17:40:28 +02:00
ce64258fce
web/fetch 2025-06-07 17:40:28 +02:00
510371ef24
glao,glo 2025-06-07 17:40:28 +02:00
a7219cf7fa
cs/fix 2025-06-07 17:40:27 +02:00
715708dc3d
cs/mkdir 2025-06-07 17:40:27 +02:00
930f296dc8
cs/rm 2025-06-07 17:40:27 +02:00
40d6d79394
cs/wip 2025-06-07 17:40:27 +02:00
8d869993a7
cs/home 2025-06-07 17:40:26 +02:00
d034a99da7
crypt/disconnect 2025-06-07 17:40:26 +02:00
36a0479882
crypt/open,close 2025-06-07 17:40:26 +02:00
aa2ed7014b
crypt/device 2025-06-07 17:40:26 +02:00
f4f1aeaccf
cs/device 2025-06-07 17:40:25 +02:00
e0ba419f2a
cs/root 2025-06-07 17:40:25 +02:00
c6f6d42af6
dirs 2025-06-07 17:40:25 +02:00
bb89404513
2025-06-07 17:40:24 +02:00
f4136fd5a9
cs/draft 2025-06-07 17:40:24 +02:00
fd4184c6a8
descut 2025-06-07 17:40:24 +02:00
cd3a584ef1
url 2025-06-07 17:40:24 +02:00
1cf50619fd
captions 2025-06-07 17:40:23 +02:00
12ac6cad39
subs 2025-06-07 17:40:23 +02:00
b4dfb3e597
wip 2025-06-07 17:40:23 +02:00
59ee8ed161
lint 2025-06-07 17:40:22 +02:00
c3fad738b3
wip 2025-06-07 17:40:22 +02:00
7ea1f10c89
formats 2025-06-07 17:40:22 +02:00
b7e4add332
ext 2025-06-07 17:40:22 +02:00
aaeb9e6e47
webm 2025-06-07 17:40:21 +02:00
a23e0eb12c
yaml 2025-06-07 17:40:21 +02:00
e43409d40c
toml 2025-06-07 17:40:21 +02:00
49dd4728f6
pl 2025-06-07 17:40:20 +02:00
5a40282ac3
channel 2025-06-07 17:40:20 +02:00
35ab34c67d
thumb 2025-06-07 17:40:20 +02:00
f180a07564
thumbs 2025-06-07 17:40:20 +02:00
a1584c4f2e
wip 2025-06-07 17:40:19 +02:00
4fd1a87f7a
timestamp 2025-06-07 17:40:19 +02:00
f73667a5d3
wip 2025-06-07 17:40:19 +02:00
979aaf1299
fix 2025-06-07 17:40:18 +02:00
3e04daeb8b
useless 2025-06-07 17:40:18 +02:00
cc4eb0e686
useless 2025-06-07 17:40:18 +02:00
e81cd5b745
wip/cache 2025-06-07 17:40:18 +02:00
75d54d3dbf
lint 2025-06-07 17:40:17 +02:00
899b1383de
lint 2025-06-07 17:40:17 +02:00
e75692ee26
ytdlp 2025-06-07 17:40:17 +02:00
3f41a52e56
filters 2025-06-07 17:40:16 +02:00
1c44e0cc7f
next 2025-06-07 17:40:16 +02:00
46073e6e3a
sponsors 2025-06-07 17:40:16 +02:00
5b091397d2
download,next 2025-06-07 17:40:16 +02:00
1f046e4ba9
any 2025-06-07 17:40:15 +02:00
9e37e3be95
extract 2025-06-07 17:40:15 +02:00
6a2421922b
wip 2025-06-07 17:40:15 +02:00
61ea62a272
wip 2025-06-07 17:40:15 +02:00
0b5d96116c
wip 2025-06-07 17:40:14 +02:00
02cf8f83e7
cp 2025-06-07 17:40:14 +02:00
a167190d6c
lint 2025-06-07 17:40:14 +02:00
268f68aeec
btrfs/blake2 2025-06-07 17:37:51 +02:00
32703df110
runs-on & project.build
All checks were successful
/ job (push) Successful in 3m17s
2025-05-25 15:55:51 +02:00
12 changed files with 578 additions and 10 deletions

View file

@ -1,6 +1,7 @@
on: [push]
jobs:
job:
runs-on: ubuntu-latest
container:
image: ${{vars.DOCKER}}debian:bookworm
steps:

View file

View file

@ -2,9 +2,9 @@
import os
import shutil
from pathlib import Path
import tomllib
import yaml
from pathlib import Path
from rwx import ps
@ -133,6 +133,19 @@ def read_file_text(file_path: Path, charset: str = CHARSET) -> str:
return read_file_bytes(file_path).decode(charset)
def read_file_yaml(file_path: Path, charset: str = CHARSET) -> dict | list:
"""Read whole file as yaml object.
:param file_path: source input file
:type file_path: Path
:param charset: charset to use for decoding input
:type charset: str
:rtype: dict
"""
text = read_file_text(file_path, charset)
return yaml.safe_load(text)
def wipe(path: Path) -> None:
"""Wipe provided path, whether directory or file.

View file

@ -3,6 +3,7 @@
from pathlib import Path
from rwx import Object
from rwx.ps import run
class Project(Object):
@ -18,3 +19,7 @@ class Project(Object):
self.file = self.raw.resolve()
self.root: Path = self.file.parent
self.name: str = self.root.name
def build(self) -> None:
"""Build the project."""
run(str(self.root / "render.py"))

379
rwx/sw/ytdlp/__init__.py Normal file
View file

@ -0,0 +1,379 @@
"""YouTube DownLoad."""
from datetime import datetime
from pathlib import Path
from typing import Any
from yt_dlp import YoutubeDL
from rwx import Object
from rwx.fs import read_file_yaml
from rwx.log import stream as log
SUBTITLES_EXTENSIONS = ["vtt"]
TIMESTAMP = "%Y%m%d%H%M%S"
URL = "https://youtube.com"
# ╭─────────╮
# │ classes │
# ╰─────────╯
class Cache(Object):
"""YouTube local cache."""
def __init__(self, root_file: Path) -> None:
self.root_file = root_file.resolve()
self.root_directory = self.root_file.parent
self.load()
def load(self) -> None:
d = read_file_yaml(self.root_file)
log.info(d)
class Channel(Object):
"""YouTube channel."""
def __init__(self, channel_id: str) -> None:
"""Set objects tree.
:param channel_id: channel identifier
:type channel_id: str
"""
d = extract_videos(channel_id)
# channel
self.uid = d["channel_id"]
self.title = d["channel"]
self.followers = int(d["channel_follower_count"])
self.description = d["description"]
self.tags = d["tags"]
self.thumbnails = [thumbnail["url"] for thumbnail in d["thumbnails"]]
self.thumbnail = self.thumbnails[-1]
self.uploader_id = d["uploader_id"]
self.uploader = d["uploader"]
# videos
self.videos = [
Video(entry)
for entry in reversed(d["entries"])
if entry["availability"] != "subscriber_only"
]
# playlists
d = extract_playlists(channel_id)
self.playlists = [Playlist(entry) for entry in reversed(d["entries"])]
# TODO Format
class Format(Object):
"""YouTube format."""
@staticmethod
def get(d: dict, key: str) -> str | None:
value = d.get(key)
match value:
case "none":
return None
case _:
return value
def __init__(self, d: dict) -> None:
"""Set format info.
:param d: format info
:type d: dict
"""
self.uid = d["format_id"]
self.extension = d["ext"]
self.filesize = d.get("filesize")
self.filesize_approx = d.get("filesize_approx")
self.language = d.get("language")
self.quality = d.get("quality")
# video
self.video_codec = Format.get(d, "vcodec")
if self.video_codec:
self.video_bit_rate = d["vbr"]
self.video_dynamic_range = d["dynamic_range"]
self.video_extension = d["video_ext"]
self.video_fps = d["fps"]
self.video_height = int(d["height"])
self.video_width = int(d["width"])
else:
del self.video_codec
# audio
self.audio_codec = Format.get(d, "acodec")
if self.audio_codec:
self.audio_bit_rate = d["abr"]
self.audio_channels = int(d["audio_channels"])
self.audio_extension = d["audio_ext"]
self.audio_sampling_rate = d["asr"]
else:
del self.audio_codec
def audio(self) -> str:
return f"{self.uid} \
{self.audio_sampling_rate} × {self.audio_channels} \
@ {self.audio_bit_rate} × {self.audio_codec}"
def video(self) -> str:
return f"{self.uid} \
{self.video_width} × {self.video_height} × {self.video_fps} \
@ {self.video_bit_rate} × {self.video_codec}"
# TODO Playlist/extra
class Playlist(Object):
"""YouTube playlist."""
def __init__(self, d: dict) -> None:
"""Set playlist info.
:param d: playlist info
:type d: dict
"""
self.uid = d["id"]
self.title = d["title"]
class Subtitles(Object):
"""YouTube subtitles."""
def __init__(self, uid: str, d: dict) -> None:
"""Set subtitles info.
:param d: subtitles info
:type d: dict
"""
self.uid = uid
self.extension = d["ext"]
self.name = d["name"]
self.url = d["url"]
# TODO Thumbnail
class Video(Object):
"""YouTube video."""
def __init__(self, d: dict) -> None:
"""Set video info.
:param d: video info
:type d: dict
"""
self.description_cut = d["description"]
self.uid = d["id"]
self.title = d["title"]
self.duration = int(d["duration"])
self.thumbnail = d["thumbnails"][-1]["url"]
def load_extra(self):
self.at = datetime.now().strftime(TIMESTAMP)
d = extract_video(self.uid)
self.audio_formats = []
self.video_formats = []
for entry in d["formats"]:
f = Format(entry)
if hasattr(f, "video_codec"):
self.video_format = f
self.video_formats.append(f)
elif hasattr(f, "audio_codec"):
self.audio_format = f
self.audio_formats.append(f)
thumbnail = d["thumbnails"][-1]["url"]
# TODO compare existing thumbnail
self.description = d["description"]
self.channel_id = d["channel_id"]
self.duration = int(d["duration"])
self.views = int(d["view_count"])
self.categories = d["categories"]
self.tags = d["tags"]
self.automatic_captions = []
for uid, entries in d["automatic_captions"].items():
for entry in entries:
subtitles = Subtitles(uid, entry)
if subtitles.extension in SUBTITLES_EXTENSIONS:
self.automatic_captions.append(subtitles)
self.subtitles = []
for uid, entries in d["subtitles"].items():
for entry in entries:
subtitles = Subtitles(uid, entry)
if subtitles.extension in SUBTITLES_EXTENSIONS:
self.subtitles.append(subtitles)
self.chapters = d["chapters"]
self.likes = d["like_count"]
self.timestamp = datetime.fromtimestamp(d["timestamp"]).strftime(
TIMESTAMP
)
self.fulltitle = d["fulltitle"]
# ╭──────────╮
# │ download │
# ╰──────────╯
def download_video(video_id: str | None) -> None:
if video_id:
ytdl(
{
"format": "bestvideo[ext=webm]+bestaudio[ext=webm]",
"outtmpl": "%(id)s.%(ext)s",
"postprocessors": [
{
"key": "SponsorBlock",
"categories": ["sponsor"],
},
{
"key": "ModifyChapters",
"remove_sponsor_segments": ["sponsor"],
},
],
"writesubtitles": True,
"writethumbnail": True,
},
).download([url_video(video_id)])
# ╭─────────╮
# │ extract │
# ╰─────────╯
def extract(url: str) -> dict[str, Any]:
"""Return extracted dict.
:rtype: dict
"""
d = ytdl(
{
"extract_flat": True,
"skip_download": True,
},
).extract_info(url, download=False)
log.debug(d)
return d
def extract_playlist(playlist_id: str) -> dict:
"""Return extracted playlist dict.
:param playlist_id: playlist identifier
:type playlist_id: str
:rtype: dict
"""
return extract(url_playlist(playlist_id))
def extract_playlists(channel_id: str) -> dict:
"""Return extracted playlists dict.
:param channel_id: channel identifier
:type channel_id: str
:rtype: dict
"""
return extract(url_playlists(channel_id))
def extract_video(video_id: str) -> dict:
"""Return extracted video dict.
:param video_id: video identifier
:type video_id: str
:rtype: dict
"""
return extract(url_video(video_id))
def extract_videos(channel_id: str) -> dict:
"""Return extracted videos dict.
:param channel_id: channel identifier
:type channel_id: str
:rtype: dict
"""
return extract(url_videos(channel_id))
# ╭──────╮
# │ next │
# ╰──────╯
def next_download(videos: list[str]) -> str | None:
for index, video_id in enumerate(videos):
if not Path(f"{video_id}.mp4").exists():
log.info(f"{index} ∕ {len(videos)}")
return video_id
return None
# ╭─────╮
# │ url │
# ╰─────╯
def url_channel(channel_id: str) -> str:
"""Return channel URL.
:param channel_id: channel identifier
:type channel_id: str
:rtype: str
"""
return f"{URL}/channel/{channel_id}"
def url_playlist(playlist_id: str) -> str:
"""Return playlist URL.
:param playlist_id: playlist identifier
:type playlist_id: str
:rtype: str
"""
return f"{URL}/playlist?list={playlist_id}"
def url_playlists(channel_id: str) -> str:
"""Return playlists URL.
:param channel_id: channel identifier
:type channel_id: str
:rtype: str
"""
return f"{url_channel(channel_id)}/playlists"
def url_video(video_id: str) -> str:
"""Return video URL.
:param video_id: video identifier
:type video_id: str
:rtype: str
"""
return f"{URL}/watch?v={video_id}"
def url_videos(channel_id: str) -> str:
"""Return videos URL.
:param channel_id: channel identifier
:type channel_id: str
:rtype: str
"""
return f"{url_channel(channel_id)}/videos"
# ╭──────╮
# │ ytdl │
# ╰──────╯
def ytdl(opt: dict) -> YoutubeDL:
options = {
**opt,
"ignoreerrors": False,
"quiet": False,
}
log.info(options)
return YoutubeDL(options)

0
rwx/sw/ytdlp/video.py Normal file
View file

32
rwx/web/__init__.py Normal file
View file

@ -0,0 +1,32 @@
import requests
from rwx import Object, txt
from rwx.txt import CHARSET
def fetch(url: str) -> str:
response = requests.get(url)
response.raise_for_status()
return response.text
class Page(Object):
def __init__(self):
self.charset = CHARSET
self.description = ""
self.title = ""
def render(self) -> str:
return f"""\
<!DOCTYPE html>
<html>
<head>
<meta charset="{self.charset}">
<meta name="description" content="{self.description}">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>{self.title}</title>
</head>
<body>
</body>
</html>
"""

View file

@ -307,6 +307,14 @@ a__git_log_all() {
"${@}"
}
# log all history as oneline
glao() { a__git_log_all_oneline "${@}"; }
a__git_log_all_oneline() {
a__git_log_all \
--oneline \
"${@}"
}
# log all history with patches
glap() { a__git_log_all_patch "${@}"; }
a__git_log_all_patch() {
@ -316,6 +324,14 @@ a__git_log_all_patch() {
"${@}"
}
# log history as oneline
glo() { a__git_log_oneline "${@}"; }
a__git_log_oneline() {
a__git_log \
--oneline \
"${@}"
}
# log history with patches
glp() { a__git_log_patch "${@}"; }
a__git_log_patch() {

View file

@ -1,6 +1,127 @@
_rwx_cmd_cs() { rwx_crypt_setup "${@}"; }
_rwx_cmd_cs() { rwx_crypt "${@}"; }
rwx_crypt_setup() {
local action="${1}"
echo "cs: ${action}"
RWX_CRYPT_ROOT="/data/home/user/crypt"
RWX_CRYPT_VAR="/var/lib/crypt"
rwx_crypt_device() {
local device size
local index=0
while [ -z "${device}" ]; do
device="/dev/nbd${index}"
if [ -b "${device}" ]; then
size="$(cat /sys/block/nbd"${index}/size")"
[ "${size}" -eq 0 ] ||
device=""
else
device=""
break
fi
index=$((index + 1))
done
if [ -n "${device}" ]; then
echo "${device}"
else
rwx_log_error 1 "No device available"
fi
}
rwx_crypt() {
local action="${1}"
local action_close="close"
local action_open="open"
local mapper="/dev/mapper"
local mount_root="/media"
local crypt_arg crypt_file crypt_map crypt_mount pass_phrase
case "${action}" in
"${action_close}" | "${action_open}")
shift
local user_id
user_id="$(id --user)"
[ "${user_id}" -eq 0 ] ||
rwx_log_error 1 "Not root"
[ -n "${1}" ] ||
rwx_log_error 2 "No files"
[ "${action}" = "${action_open}" ] &&
pass_phrase="$(rwx_read_passphrase)"
for crypt_arg in "${@}"; do
rwx_log_info
crypt_file="${RWX_CRYPT_ROOT}/${crypt_arg}.qcow2"
if [ -f "${crypt_file}" ]; then
crypt_map="${mapper}/${crypt_arg}"
crypt_mount="${mount_root}/${crypt_arg}"
local device
case "${action}" in
"${action_open}")
# find device
if ! device="$(rwx_crypt_device)"; then
rwx_log_error 4 "No device available"
fi
# make directory
if ! mkdir --parents "${RWX_CRYPT_VAR}"; then
rwx_log_error 5 "Making failure: ${RWX_CRYPT_VAR}"
fi
# record device
if ! rwx_file_write \
"${RWX_CRYPT_VAR}/${crypt_arg}" "${device}"; then
rwx_log_error 6 "Writing failure: ${device}"
fi
# connect device
if ! qemu-nbd --connect "${device}" "${crypt_file}"; then
rwx_log_error 7 "Connection failure: ${device}"
fi
# open device
if ! echo "${pass_phrase}" |
cryptsetup luksOpen "${device}" "${crypt_arg}"; then
rwx_log_error 8 "Opening failure: ${device}"
fi
# make mount directory
if ! mkdir --parents "${crypt_mount}"; then
rwx_log_error 9 "Making failure: ${crypt_mount}"
fi
# mount file system
if ! mount \
--options "autodefrag,compress-force=zstd" \
"${crypt_map}" "${crypt_mount}"; then
rwx_log_error 10 "Mounting failure: ${crypt_map}"
fi
;;
"${action_close}")
# unmount file system
if ! umount "${crypt_mount}"; then
rwx_log_error 4 "Unmounting failure: ${crypt_mount}"
fi
# remove mount directory
if ! rmdir "${crypt_mount}"; then
rwx_log_error 5 "Removal failure: ${crypt_mount}"
fi
# close device
if ! cryptsetup luksClose "${crypt_arg}"; then
rwx_log_error 6 "Closing failure: ${crypt_arg}"
fi
# load device
if ! device="$(cat "${RWX_CRYPT_VAR}/${crypt_arg}")"; then
rwx_log_error 7 "Loading failure: ${crypt_arg}"
fi
# disconnect device
if ! qemu-nbd --disconnect "${device}"; then
rwx_log_error 8 "Disconnection failure: ${device}"
fi
# remove record
if ! rm "${RWX_CRYPT_VAR}/${crypt_arg}"; then
rwx_log_error 9 "Removal failure: ${crypt_arg}"
fi
;;
*) ;;
esac
else
rwx_log_error 3 "Not a file: ${crypt_file}"
fi
done
;;
*)
rwx_log_info "Usage:"
rwx_log_info "${action_close}|${action_open}"
# TODO list
;;
esac
}

View file

@ -72,8 +72,9 @@ rwx_ffmpeg_input_hdmi() {
set -- \
-f "v4l2" \
-video_size "1920x1080" \
-framerate "60" \
-input_format "yuyv422" \
-framerate "120" \
-input_format "yuv420p" \
-pix_fmt "yuv420p" \
-i "${device}"
local argument
for argument in "${@}"; do echo "${argument}"; done

View file

@ -5,7 +5,7 @@ rwx_fs_make_btrfs() {
if [ -b "${device}" ]; then
set -- \
--force \
--checksum "sha256"
--checksum "blake2"
if [ -n "${label}" ]; then
set -- "${@}" \
--label "${label}"

View file

@ -9,6 +9,6 @@
rwx_python_venv() {
local path="${1}"
[ -d "${path}" ] || return 1
export VIRTUAL_ENV="${path}" && \
export VIRTUAL_ENV="${path}" &&
export PATH="${VIRTUAL_ENV}/bin:${PATH}"
}