Compare commits

..

No commits in common. "7eae3bed9fd04c5730477d8e4c28b947e1142ae5" and "32703df11054e5dd0b072f195b322222cacce05b" have entirely different histories.

11 changed files with 10 additions and 572 deletions

View file

View file

@ -2,10 +2,10 @@
import os import os
import shutil import shutil
import tomllib
import yaml
from pathlib import Path from pathlib import Path
import tomllib
from rwx import ps from rwx import ps
CHARSET = "UTF-8" CHARSET = "UTF-8"
@ -133,19 +133,6 @@ def read_file_text(file_path: Path, charset: str = CHARSET) -> str:
return read_file_bytes(file_path).decode(charset) return read_file_bytes(file_path).decode(charset)
def read_file_yaml(file_path: Path, charset: str = CHARSET) -> dict | list:
"""Read whole file as yaml object.
:param file_path: source input file
:type file_path: Path
:param charset: charset to use for decoding input
:type charset: str
:rtype: dict
"""
text = read_file_text(file_path, charset)
return yaml.safe_load(text)
def wipe(path: Path) -> None: def wipe(path: Path) -> None:
"""Wipe provided path, whether directory or file. """Wipe provided path, whether directory or file.

View file

@ -22,4 +22,4 @@ class Project(Object):
def build(self) -> None: def build(self) -> None:
"""Build the project.""" """Build the project."""
run(str(self.root / "render.py")) run(str(self.root / "build.py"))

View file

@ -1,379 +0,0 @@
"""YouTube DownLoad."""
from datetime import datetime
from pathlib import Path
from typing import Any
from yt_dlp import YoutubeDL
from rwx import Object
from rwx.fs import read_file_yaml
from rwx.log import stream as log
SUBTITLES_EXTENSIONS = ["vtt"]
TIMESTAMP = "%Y%m%d%H%M%S"
URL = "https://youtube.com"
# ╭─────────╮
# │ classes │
# ╰─────────╯
class Cache(Object):
"""YouTube local cache."""
def __init__(self, root_file: Path) -> None:
self.root_file = root_file.resolve()
self.root_directory = self.root_file.parent
self.load()
def load(self) -> None:
d = read_file_yaml(self.root_file)
log.info(d)
class Channel(Object):
"""YouTube channel."""
def __init__(self, channel_id: str) -> None:
"""Set objects tree.
:param channel_id: channel identifier
:type channel_id: str
"""
d = extract_videos(channel_id)
# channel
self.uid = d["channel_id"]
self.title = d["channel"]
self.followers = int(d["channel_follower_count"])
self.description = d["description"]
self.tags = d["tags"]
self.thumbnails = [thumbnail["url"] for thumbnail in d["thumbnails"]]
self.thumbnail = self.thumbnails[-1]
self.uploader_id = d["uploader_id"]
self.uploader = d["uploader"]
# videos
self.videos = [
Video(entry)
for entry in reversed(d["entries"])
if entry["availability"] != "subscriber_only"
]
# playlists
d = extract_playlists(channel_id)
self.playlists = [Playlist(entry) for entry in reversed(d["entries"])]
# TODO Format
class Format(Object):
"""YouTube format."""
@staticmethod
def get(d: dict, key: str) -> str | None:
value = d.get(key)
match value:
case "none":
return None
case _:
return value
def __init__(self, d: dict) -> None:
"""Set format info.
:param d: format info
:type d: dict
"""
self.uid = d["format_id"]
self.extension = d["ext"]
self.filesize = d.get("filesize")
self.filesize_approx = d.get("filesize_approx")
self.language = d.get("language")
self.quality = d.get("quality")
# video
self.video_codec = Format.get(d, "vcodec")
if self.video_codec:
self.video_bit_rate = d["vbr"]
self.video_dynamic_range = d["dynamic_range"]
self.video_extension = d["video_ext"]
self.video_fps = d["fps"]
self.video_height = int(d["height"])
self.video_width = int(d["width"])
else:
del self.video_codec
# audio
self.audio_codec = Format.get(d, "acodec")
if self.audio_codec:
self.audio_bit_rate = d["abr"]
self.audio_channels = int(d["audio_channels"])
self.audio_extension = d["audio_ext"]
self.audio_sampling_rate = d["asr"]
else:
del self.audio_codec
def audio(self) -> str:
return f"{self.uid} \
{self.audio_sampling_rate} × {self.audio_channels} \
@ {self.audio_bit_rate} × {self.audio_codec}"
def video(self) -> str:
return f"{self.uid} \
{self.video_width} × {self.video_height} × {self.video_fps} \
@ {self.video_bit_rate} × {self.video_codec}"
# TODO Playlist/extra
class Playlist(Object):
"""YouTube playlist."""
def __init__(self, d: dict) -> None:
"""Set playlist info.
:param d: playlist info
:type d: dict
"""
self.uid = d["id"]
self.title = d["title"]
class Subtitles(Object):
"""YouTube subtitles."""
def __init__(self, uid: str, d: dict) -> None:
"""Set subtitles info.
:param d: subtitles info
:type d: dict
"""
self.uid = uid
self.extension = d["ext"]
self.name = d["name"]
self.url = d["url"]
# TODO Thumbnail
class Video(Object):
"""YouTube video."""
def __init__(self, d: dict) -> None:
"""Set video info.
:param d: video info
:type d: dict
"""
self.description_cut = d["description"]
self.uid = d["id"]
self.title = d["title"]
self.duration = int(d["duration"])
self.thumbnail = d["thumbnails"][-1]["url"]
def load_extra(self):
self.at = datetime.now().strftime(TIMESTAMP)
d = extract_video(self.uid)
self.audio_formats = []
self.video_formats = []
for entry in d["formats"]:
f = Format(entry)
if hasattr(f, "video_codec"):
self.video_format = f
self.video_formats.append(f)
elif hasattr(f, "audio_codec"):
self.audio_format = f
self.audio_formats.append(f)
thumbnail = d["thumbnails"][-1]["url"]
# TODO compare existing thumbnail
self.description = d["description"]
self.channel_id = d["channel_id"]
self.duration = int(d["duration"])
self.views = int(d["view_count"])
self.categories = d["categories"]
self.tags = d["tags"]
self.automatic_captions = []
for uid, entries in d["automatic_captions"].items():
for entry in entries:
subtitles = Subtitles(uid, entry)
if subtitles.extension in SUBTITLES_EXTENSIONS:
self.automatic_captions.append(subtitles)
self.subtitles = []
for uid, entries in d["subtitles"].items():
for entry in entries:
subtitles = Subtitles(uid, entry)
if subtitles.extension in SUBTITLES_EXTENSIONS:
self.subtitles.append(subtitles)
self.chapters = d["chapters"]
self.likes = d["like_count"]
self.timestamp = datetime.fromtimestamp(d["timestamp"]).strftime(
TIMESTAMP
)
self.fulltitle = d["fulltitle"]
# ╭──────────╮
# │ download │
# ╰──────────╯
def download_video(video_id: str | None) -> None:
if video_id:
ytdl(
{
"format": "bestvideo[ext=webm]+bestaudio[ext=webm]",
"outtmpl": "%(id)s.%(ext)s",
"postprocessors": [
{
"key": "SponsorBlock",
"categories": ["sponsor"],
},
{
"key": "ModifyChapters",
"remove_sponsor_segments": ["sponsor"],
},
],
"writesubtitles": True,
"writethumbnail": True,
},
).download([url_video(video_id)])
# ╭─────────╮
# │ extract │
# ╰─────────╯
def extract(url: str) -> dict[str, Any]:
"""Return extracted dict.
:rtype: dict
"""
d = ytdl(
{
"extract_flat": True,
"skip_download": True,
},
).extract_info(url, download=False)
log.debug(d)
return d
def extract_playlist(playlist_id: str) -> dict:
"""Return extracted playlist dict.
:param playlist_id: playlist identifier
:type playlist_id: str
:rtype: dict
"""
return extract(url_playlist(playlist_id))
def extract_playlists(channel_id: str) -> dict:
"""Return extracted playlists dict.
:param channel_id: channel identifier
:type channel_id: str
:rtype: dict
"""
return extract(url_playlists(channel_id))
def extract_video(video_id: str) -> dict:
"""Return extracted video dict.
:param video_id: video identifier
:type video_id: str
:rtype: dict
"""
return extract(url_video(video_id))
def extract_videos(channel_id: str) -> dict:
"""Return extracted videos dict.
:param channel_id: channel identifier
:type channel_id: str
:rtype: dict
"""
return extract(url_videos(channel_id))
# ╭──────╮
# │ next │
# ╰──────╯
def next_download(videos: list[str]) -> str | None:
for index, video_id in enumerate(videos):
if not Path(f"{video_id}.mp4").exists():
log.info(f"{index} ∕ {len(videos)}")
return video_id
return None
# ╭─────╮
# │ url │
# ╰─────╯
def url_channel(channel_id: str) -> str:
"""Return channel URL.
:param channel_id: channel identifier
:type channel_id: str
:rtype: str
"""
return f"{URL}/channel/{channel_id}"
def url_playlist(playlist_id: str) -> str:
"""Return playlist URL.
:param playlist_id: playlist identifier
:type playlist_id: str
:rtype: str
"""
return f"{URL}/playlist?list={playlist_id}"
def url_playlists(channel_id: str) -> str:
"""Return playlists URL.
:param channel_id: channel identifier
:type channel_id: str
:rtype: str
"""
return f"{url_channel(channel_id)}/playlists"
def url_video(video_id: str) -> str:
"""Return video URL.
:param video_id: video identifier
:type video_id: str
:rtype: str
"""
return f"{URL}/watch?v={video_id}"
def url_videos(channel_id: str) -> str:
"""Return videos URL.
:param channel_id: channel identifier
:type channel_id: str
:rtype: str
"""
return f"{url_channel(channel_id)}/videos"
# ╭──────╮
# │ ytdl │
# ╰──────╯
def ytdl(opt: dict) -> YoutubeDL:
options = {
**opt,
"ignoreerrors": False,
"quiet": False,
}
log.info(options)
return YoutubeDL(options)

View file

View file

@ -1,32 +0,0 @@
import requests
from rwx import Object, txt
from rwx.txt import CHARSET
def fetch(url: str) -> str:
response = requests.get(url)
response.raise_for_status()
return response.text
class Page(Object):
def __init__(self):
self.charset = CHARSET
self.description = ""
self.title = ""
def render(self) -> str:
return f"""\
<!DOCTYPE html>
<html>
<head>
<meta charset="{self.charset}">
<meta name="description" content="{self.description}">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>{self.title}</title>
</head>
<body>
</body>
</html>
"""

View file

@ -307,14 +307,6 @@ a__git_log_all() {
"${@}" "${@}"
} }
# log all history as oneline
glao() { a__git_log_all_oneline "${@}"; }
a__git_log_all_oneline() {
a__git_log_all \
--oneline \
"${@}"
}
# log all history with patches # log all history with patches
glap() { a__git_log_all_patch "${@}"; } glap() { a__git_log_all_patch "${@}"; }
a__git_log_all_patch() { a__git_log_all_patch() {
@ -324,14 +316,6 @@ a__git_log_all_patch() {
"${@}" "${@}"
} }
# log history as oneline
glo() { a__git_log_oneline "${@}"; }
a__git_log_oneline() {
a__git_log \
--oneline \
"${@}"
}
# log history with patches # log history with patches
glp() { a__git_log_patch "${@}"; } glp() { a__git_log_patch "${@}"; }
a__git_log_patch() { a__git_log_patch() {

View file

@ -1,127 +1,6 @@
_rwx_cmd_cs() { rwx_crypt "${@}"; } _rwx_cmd_cs() { rwx_crypt_setup "${@}"; }
RWX_CRYPT_ROOT="/data/home/user/crypt" rwx_crypt_setup() {
RWX_CRYPT_VAR="/var/lib/crypt"
rwx_crypt_device() {
local device size
local index=0
while [ -z "${device}" ]; do
device="/dev/nbd${index}"
if [ -b "${device}" ]; then
size="$(cat /sys/block/nbd"${index}/size")"
[ "${size}" -eq 0 ] ||
device=""
else
device=""
break
fi
index=$((index + 1))
done
if [ -n "${device}" ]; then
echo "${device}"
else
rwx_log_error 1 "No device available"
fi
}
rwx_crypt() {
local action="${1}" local action="${1}"
local action_close="close" echo "cs: ${action}"
local action_open="open"
local mapper="/dev/mapper"
local mount_root="/media"
local crypt_arg crypt_file crypt_map crypt_mount pass_phrase
case "${action}" in
"${action_close}" | "${action_open}")
shift
local user_id
user_id="$(id --user)"
[ "${user_id}" -eq 0 ] ||
rwx_log_error 1 "Not root"
[ -n "${1}" ] ||
rwx_log_error 2 "No files"
[ "${action}" = "${action_open}" ] &&
pass_phrase="$(rwx_read_passphrase)"
for crypt_arg in "${@}"; do
rwx_log_info
crypt_file="${RWX_CRYPT_ROOT}/${crypt_arg}.qcow2"
if [ -f "${crypt_file}" ]; then
crypt_map="${mapper}/${crypt_arg}"
crypt_mount="${mount_root}/${crypt_arg}"
local device
case "${action}" in
"${action_open}")
# find device
if ! device="$(rwx_crypt_device)"; then
rwx_log_error 4 "No device available"
fi
# make directory
if ! mkdir --parents "${RWX_CRYPT_VAR}"; then
rwx_log_error 5 "Making failure: ${RWX_CRYPT_VAR}"
fi
# record device
if ! rwx_file_write \
"${RWX_CRYPT_VAR}/${crypt_arg}" "${device}"; then
rwx_log_error 6 "Writing failure: ${device}"
fi
# connect device
if ! qemu-nbd --connect "${device}" "${crypt_file}"; then
rwx_log_error 7 "Connection failure: ${device}"
fi
# open device
if ! echo "${pass_phrase}" |
cryptsetup luksOpen "${device}" "${crypt_arg}"; then
rwx_log_error 8 "Opening failure: ${device}"
fi
# make mount directory
if ! mkdir --parents "${crypt_mount}"; then
rwx_log_error 9 "Making failure: ${crypt_mount}"
fi
# mount file system
if ! mount \
--options "autodefrag,compress-force=zstd" \
"${crypt_map}" "${crypt_mount}"; then
rwx_log_error 10 "Mounting failure: ${crypt_map}"
fi
;;
"${action_close}")
# unmount file system
if ! umount "${crypt_mount}"; then
rwx_log_error 4 "Unmounting failure: ${crypt_mount}"
fi
# remove mount directory
if ! rmdir "${crypt_mount}"; then
rwx_log_error 5 "Removal failure: ${crypt_mount}"
fi
# close device
if ! cryptsetup luksClose "${crypt_arg}"; then
rwx_log_error 6 "Closing failure: ${crypt_arg}"
fi
# load device
if ! device="$(cat "${RWX_CRYPT_VAR}/${crypt_arg}")"; then
rwx_log_error 7 "Loading failure: ${crypt_arg}"
fi
# disconnect device
if ! qemu-nbd --disconnect "${device}"; then
rwx_log_error 8 "Disconnection failure: ${device}"
fi
# remove record
if ! rm "${RWX_CRYPT_VAR}/${crypt_arg}"; then
rwx_log_error 9 "Removal failure: ${crypt_arg}"
fi
;;
*) ;;
esac
else
rwx_log_error 3 "Not a file: ${crypt_file}"
fi
done
;;
*)
rwx_log_info "Usage:"
rwx_log_info "${action_close}|${action_open}"
# TODO list
;;
esac
} }

View file

@ -72,9 +72,8 @@ rwx_ffmpeg_input_hdmi() {
set -- \ set -- \
-f "v4l2" \ -f "v4l2" \
-video_size "1920x1080" \ -video_size "1920x1080" \
-framerate "120" \ -framerate "60" \
-input_format "yuv420p" \ -input_format "yuyv422" \
-pix_fmt "yuv420p" \
-i "${device}" -i "${device}"
local argument local argument
for argument in "${@}"; do echo "${argument}"; done for argument in "${@}"; do echo "${argument}"; done

View file

@ -5,7 +5,7 @@ rwx_fs_make_btrfs() {
if [ -b "${device}" ]; then if [ -b "${device}" ]; then
set -- \ set -- \
--force \ --force \
--checksum "blake2" --checksum "sha256"
if [ -n "${label}" ]; then if [ -n "${label}" ]; then
set -- "${@}" \ set -- "${@}" \
--label "${label}" --label "${label}"

View file

@ -9,6 +9,6 @@
rwx_python_venv() { rwx_python_venv() {
local path="${1}" local path="${1}"
[ -d "${path}" ] || return 1 [ -d "${path}" ] || return 1
export VIRTUAL_ENV="${path}" && export VIRTUAL_ENV="${path}" && \
export PATH="${VIRTUAL_ENV}/bin:${PATH}" export PATH="${VIRTUAL_ENV}/bin:${PATH}"
} }