diff --git a/build.py b/render.py
similarity index 100%
rename from build.py
rename to render.py
diff --git a/rwx/fs/__init__.py b/rwx/fs/__init__.py
index 8a45288..b55713a 100644
--- a/rwx/fs/__init__.py
+++ b/rwx/fs/__init__.py
@@ -2,9 +2,9 @@
 
 import os
 import shutil
-from pathlib import Path
-
 import tomllib
+import yaml
+from pathlib import Path
 
 from rwx import ps
 
@@ -133,6 +133,19 @@ def read_file_text(file_path: Path, charset: str = CHARSET) -> str:
     return read_file_bytes(file_path).decode(charset)
 
 
+def read_file_yaml(file_path: Path, charset: str = CHARSET) -> dict | list:
+    """Read whole file as yaml object.
+
+    :param file_path: source input file
+    :type file_path: Path
+    :param charset: charset to use for decoding input
+    :type charset: str
+    :rtype: dict | list
+    """
+    text = read_file_text(file_path, charset)
+    return yaml.safe_load(text)
+
+
 def wipe(path: Path) -> None:
     """Wipe provided path, whether directory or file.
 
diff --git a/rwx/prj/__init__.py b/rwx/prj/__init__.py
index ea6a67e..f545f73 100644
--- a/rwx/prj/__init__.py
+++ b/rwx/prj/__init__.py
@@ -22,4 +22,4 @@ class Project(Object):
 
     def build(self) -> None:
         """Build the project."""
-        run(str(self.root / "build.py"))
+        run(str(self.root / "render.py"))
diff --git a/rwx/sw/ytdlp/__init__.py b/rwx/sw/ytdlp/__init__.py
new file mode 100644
index 0000000..9045781
--- /dev/null
+++ b/rwx/sw/ytdlp/__init__.py
@@ -0,0 +1,411 @@
+"""YouTube DownLoad."""
+
+from datetime import datetime
+from pathlib import Path
+from typing import Any
+
+from yt_dlp import YoutubeDL
+
+from rwx import Object
+from rwx.fs import read_file_yaml
+from rwx.log import stream as log
+
+SUBTITLES_EXTENSIONS = ["vtt"]
+TIMESTAMP = "%Y%m%d%H%M%S"
+URL = "https://youtube.com"
+
+
+# ╭─────────╮
+# │ classes │
+# ╰─────────╯
+
+
+class Cache(Object):
+    """YouTube local cache."""
+
+    def __init__(self, root_file: Path) -> None:
+        """Set cache paths and load the root file.
+
+        :param root_file: cache root file
+        :type root_file: Path
+        """
+        self.root_file = root_file.resolve()
+        self.root_directory = self.root_file.parent
+        self.load()
+
+    def load(self) -> None:
+        """Read the root file as YAML and log its content."""
+        d = read_file_yaml(self.root_file)
+        log.info(d)
+
+
+class Channel(Object):
+    """YouTube channel."""
+
+    def __init__(self, channel_id: str) -> None:
+        """Set objects tree.
+
+        :param channel_id: channel identifier
+        :type channel_id: str
+        """
+        d = extract_videos(channel_id)
+        # channel
+        self.uid = d["channel_id"]
+        self.title = d["channel"]
+        self.followers = int(d["channel_follower_count"])
+        self.description = d["description"]
+        self.tags = d["tags"]
+        self.thumbnails = [thumbnail["url"] for thumbnail in d["thumbnails"]]
+        self.thumbnail = self.thumbnails[-1]
+        self.uploader_id = d["uploader_id"]
+        self.uploader = d["uploader"]
+        # videos
+        self.videos = [
+            Video(entry)
+            for entry in reversed(d["entries"])
+            if entry["availability"] != "subscriber_only"
+        ]
+        # playlists
+        d = extract_playlists(channel_id)
+        self.playlists = [Playlist(entry) for entry in reversed(d["entries"])]
+
+
+# TODO Format
+class Format(Object):
+    """YouTube format."""
+
+    @staticmethod
+    def get(d: dict, key: str) -> str | None:
+        """Return value for key, with "none" mapped to None.
+
+        :param d: format info
+        :type d: dict
+        :param key: key to look up
+        :type key: str
+        :rtype: str | None
+        """
+        value = d.get(key)
+        return None if value == "none" else value
+
+    def __init__(self, d: dict) -> None:
+        """Set format info.
+
+        :param d: format info
+        :type d: dict
+        """
+        self.uid = d["format_id"]
+        self.extension = d["ext"]
+        self.filesize = d.get("filesize")
+        self.filesize_approx = d.get("filesize_approx")
+        self.language = d.get("language")
+        self.quality = d.get("quality")
+        # video
+        self.video_codec = Format.get(d, "vcodec")
+        if self.video_codec:
+            self.video_bit_rate = d["vbr"]
+            self.video_dynamic_range = d["dynamic_range"]
+            self.video_extension = d["video_ext"]
+            self.video_fps = d["fps"]
+            self.video_height = int(d["height"])
+            self.video_width = int(d["width"])
+        else:
+            del self.video_codec
+        # audio
+        self.audio_codec = Format.get(d, "acodec")
+        if self.audio_codec:
+            self.audio_bit_rate = d["abr"]
+            self.audio_channels = int(d["audio_channels"])
+            self.audio_extension = d["audio_ext"]
+            self.audio_sampling_rate = d["asr"]
+        else:
+            del self.audio_codec
+
+    def audio(self) -> str:
+        """Return audio summary line (requires an audio codec)."""
+        return f"{self.uid} \
+→ {self.audio_sampling_rate} × {self.audio_channels} \
+@ {self.audio_bit_rate} × {self.audio_codec}"
+
+    def video(self) -> str:
+        """Return video summary line (requires a video codec)."""
+        return f"{self.uid} \
+→ {self.video_width} × {self.video_height} × {self.video_fps} \
+@ {self.video_bit_rate} × {self.video_codec}"
+
+
+# TODO Playlist/extra
+class Playlist(Object):
+    """YouTube playlist."""
+
+    def __init__(self, d: dict) -> None:
+        """Set playlist info.
+
+        :param d: playlist info
+        :type d: dict
+        """
+        self.uid = d["id"]
+        self.title = d["title"]
+
+
+class Subtitles(Object):
+    """YouTube subtitles."""
+
+    def __init__(self, uid: str, d: dict) -> None:
+        """Set subtitles info.
+
+        :param uid: subtitles identifier
+        :type uid: str
+        :param d: subtitles info
+        :type d: dict
+        """
+        self.uid = uid
+        self.extension = d["ext"]
+        self.name = d["name"]
+        self.url = d["url"]
+
+
+# TODO Thumbnail
+
+
+class Video(Object):
+    """YouTube video."""
+
+    def __init__(self, d: dict) -> None:
+        """Set video info.
+
+        :param d: video info
+        :type d: dict
+        """
+        self.description_cut = d["description"]
+        self.uid = d["id"]
+        self.title = d["title"]
+        self.duration = int(d["duration"])
+        self.thumbnail = d["thumbnails"][-1]["url"]
+
+    def load_extra(self) -> None:
+        """Extract full video info and set detailed attributes."""
+        self.at = datetime.now().strftime(TIMESTAMP)
+        d = extract_video(self.uid)
+        self.audio_formats = []
+        self.video_formats = []
+        for entry in d["formats"]:
+            f = Format(entry)
+            if hasattr(f, "video_codec"):
+                self.video_format = f
+                self.video_formats.append(f)
+            elif hasattr(f, "audio_codec"):
+                self.audio_format = f
+                self.audio_formats.append(f)
+        thumbnail = d["thumbnails"][-1]["url"]
+        # TODO compare existing thumbnail
+        self.description = d["description"]
+        self.channel_id = d["channel_id"]
+        self.duration = int(d["duration"])
+        self.views = int(d["view_count"])
+        self.categories = d["categories"]
+        self.tags = d["tags"]
+        self.automatic_captions = []
+        for uid, entries in d["automatic_captions"].items():
+            for entry in entries:
+                subtitles = Subtitles(uid, entry)
+                if subtitles.extension in SUBTITLES_EXTENSIONS:
+                    self.automatic_captions.append(subtitles)
+        self.subtitles = []
+        for uid, entries in d["subtitles"].items():
+            for entry in entries:
+                subtitles = Subtitles(uid, entry)
+                if subtitles.extension in SUBTITLES_EXTENSIONS:
+                    self.subtitles.append(subtitles)
+        self.chapters = d["chapters"]
+        self.likes = d["like_count"]
+        self.timestamp = datetime.fromtimestamp(d["timestamp"]).strftime(
+            TIMESTAMP
+        )
+        self.fulltitle = d["fulltitle"]
+
+
+# ╭──────────╮
+# │ download │
+# ╰──────────╯
+
+
+def download_video(video_id: str | None) -> None:
+    """Download video, if an identifier is provided.
+
+    :param video_id: video identifier, or None to do nothing
+    :type video_id: str | None
+    """
+    if video_id:
+        ytdl(
+            {
+                "format": "bestvideo[ext=webm]+bestaudio[ext=webm]",
+                "outtmpl": "%(id)s.%(ext)s",
+                "postprocessors": [
+                    {
+                        "key": "SponsorBlock",
+                        "categories": ["sponsor"],
+                    },
+                    {
+                        "key": "ModifyChapters",
+                        "remove_sponsor_segments": ["sponsor"],
+                    },
+                ],
+                "writesubtitles": True,
+                "writethumbnail": True,
+            },
+        ).download([url_video(video_id)])
+
+
+# ╭─────────╮
+# │ extract │
+# ╰─────────╯
+
+
+def extract(url: str) -> dict[str, Any]:
+    """Return extracted dict.
+
+    :rtype: dict
+    """
+    d = ytdl(
+        {
+            "extract_flat": True,
+            "skip_download": True,
+        },
+    ).extract_info(url, download=False)
+    log.debug(d)
+    return d
+
+
+def extract_playlist(playlist_id: str) -> dict:
+    """Return extracted playlist dict.
+
+    :param playlist_id: playlist identifier
+    :type playlist_id: str
+    :rtype: dict
+    """
+    return extract(url_playlist(playlist_id))
+
+
+def extract_playlists(channel_id: str) -> dict:
+    """Return extracted playlists dict.
+
+    :param channel_id: channel identifier
+    :type channel_id: str
+    :rtype: dict
+    """
+    return extract(url_playlists(channel_id))
+
+
+def extract_video(video_id: str) -> dict:
+    """Return extracted video dict.
+
+    :param video_id: video identifier
+    :type video_id: str
+    :rtype: dict
+    """
+    return extract(url_video(video_id))
+
+
+def extract_videos(channel_id: str) -> dict:
+    """Return extracted videos dict.
+
+    :param channel_id: channel identifier
+    :type channel_id: str
+    :rtype: dict
+    """
+    return extract(url_videos(channel_id))
+
+
+# ╭──────╮
+# │ next │
+# ╰──────╯
+
+
+def next_download(videos: list[str]) -> str | None:
+    """Return first video identifier without a local mp4 file.
+
+    :param videos: video identifiers
+    :type videos: list[str]
+    :rtype: str | None
+    """
+    for index, video_id in enumerate(videos):
+        if not Path(f"{video_id}.mp4").exists():
+            log.info(f"{index} ∕ {len(videos)}")
+            return video_id
+    return None
+
+
+# ╭─────╮
+# │ url │
+# ╰─────╯
+
+
+def url_channel(channel_id: str) -> str:
+    """Return channel URL.
+
+    :param channel_id: channel identifier
+    :type channel_id: str
+    :rtype: str
+    """
+    return f"{URL}/channel/{channel_id}"
+
+
+def url_playlist(playlist_id: str) -> str:
+    """Return playlist URL.
+
+    :param playlist_id: playlist identifier
+    :type playlist_id: str
+    :rtype: str
+    """
+    return f"{URL}/playlist?list={playlist_id}"
+
+
+def url_playlists(channel_id: str) -> str:
+    """Return playlists URL.
+
+    :param channel_id: channel identifier
+    :type channel_id: str
+    :rtype: str
+    """
+    return f"{url_channel(channel_id)}/playlists"
+
+
+def url_video(video_id: str) -> str:
+    """Return video URL.
+
+    :param video_id: video identifier
+    :type video_id: str
+    :rtype: str
+    """
+    return f"{URL}/watch?v={video_id}"
+
+
+def url_videos(channel_id: str) -> str:
+    """Return videos URL.
+
+    :param channel_id: channel identifier
+    :type channel_id: str
+    :rtype: str
+    """
+    return f"{url_channel(channel_id)}/videos"
+
+
+# ╭──────╮
+# │ ytdl │
+# ╰──────╯
+
+
+def ytdl(opt: dict) -> YoutubeDL:
+    """Return YoutubeDL with ignoreerrors and quiet forced to False.
+
+    :param opt: downloader options
+    :type opt: dict
+    :rtype: YoutubeDL
+    """
+    options = {
+        **opt,
+        "ignoreerrors": False,
+        "quiet": False,
+    }
+    log.info(options)
+    return YoutubeDL(options)
diff --git a/rwx/sw/ytdlp/video.py b/rwx/sw/ytdlp/video.py
new file mode 100644
index 0000000..e69de29
diff --git a/rwx/web/__init__.py b/rwx/web/__init__.py
new file mode 100644
index 0000000..e746872
--- /dev/null
+++ b/rwx/web/__init__.py
@@ -0,0 +1,40 @@
+import requests
+
+from rwx import Object, txt
+from rwx.txt import CHARSET
+
+
+def fetch(url: str, timeout: float = 30.0) -> str:
+    """Return text fetched from URL.
+
+    :param url: address to fetch
+    :type url: str
+    :param timeout: seconds to wait for the server
+    :type timeout: float
+    :rtype: str
+    """
+    response = requests.get(url, timeout=timeout)
+    response.raise_for_status()
+    return response.text
+
+
+class Page(Object):
+    def __init__(self):
+        self.charset = CHARSET
+        self.description = ""
+        self.title = ""
+
+    def render(self) -> str:
+        return f"""\
+
+
+
+
+
+
+