rwx/rwx/sw/ytdlp/__init__.py
2025-06-07 17:40:24 +02:00

379 lines
9.6 KiB
Python

"""YouTube DownLoad."""
from datetime import datetime
from pathlib import Path
from typing import Any
from yt_dlp import YoutubeDL
from rwx import Object
from rwx.fs import read_file_yaml
from rwx.log import stream as log
# Subtitle file extensions kept when collecting the caption tracks of a video.
SUBTITLES_EXTENSIONS = ["vtt"]
# strftime pattern used for the timestamps stored on videos.
TIMESTAMP = "%Y%m%d%H%M%S"
# Base URL from which the channel, playlist and video URLs are built.
URL = "https://youtube.com"
# ╭─────────╮
# │ classes │
# ╰─────────╯
class Cache(Object):
    """YouTube local cache."""

    def __init__(self, root_file: Path) -> None:
        """Remember where the cache lives, then load it.

        :param root_file: file at the root of the cache, read as YAML
        :type root_file: Path
        """
        resolved = root_file.resolve()
        self.root_file = resolved
        self.root_directory = resolved.parent
        self.load()

    def load(self) -> None:
        """Read the root file as YAML and log what was read."""
        content = read_file_yaml(self.root_file)
        log.info(content)
class Channel(Object):
    """YouTube channel."""

    def __init__(self, channel_id: str) -> None:
        """Build the channel with its videos and playlists.

        The videos listing is extracted first and also provides the
        channel attributes; the playlists listing is extracted afterwards.

        :param channel_id: channel identifier
        :type channel_id: str
        """
        info = extract_videos(channel_id)
        # channel
        self.uid = info["channel_id"]
        self.title = info["channel"]
        self.followers = int(info["channel_follower_count"])
        self.description = info["description"]
        self.tags = info["tags"]
        self.thumbnails = [item["url"] for item in info["thumbnails"]]
        # the last listed thumbnail is the one kept as the channel thumbnail
        self.thumbnail = self.thumbnails[-1]
        self.uploader_id = info["uploader_id"]
        self.uploader = info["uploader"]
        # videos: walk the entries backwards, leaving out subscriber-only ones
        videos = []
        for entry in reversed(info["entries"]):
            if entry["availability"] == "subscriber_only":
                continue
            videos.append(Video(entry))
        self.videos = videos
        # playlists: same reversed order as the videos
        listing = extract_playlists(channel_id)
        self.playlists = [
            Playlist(entry) for entry in reversed(listing["entries"])
        ]
# TODO Format
class Format(Object):
    """YouTube format."""

    @staticmethod
    def get(d: dict, key: str) -> str | None:
        """Return the value of a key, mapping the "none" marker to None.

        :param d: format info
        :type d: dict
        :param key: key to look up
        :type key: str
        :return: None when the key is absent or its value equals "none"
        :rtype: str | None
        """
        value = d.get(key)
        return None if value == "none" else value

    def __init__(self, d: dict) -> None:
        """Set format info.

        The ``video_*`` attributes (``video_codec`` included) only exist
        when the format has a video codec, and the ``audio_*`` attributes
        only when it has an audio codec, so that callers can probe a
        format with ``hasattr``.

        :param d: format info
        :type d: dict
        """
        self.uid = d["format_id"]
        self.extension = d["ext"]
        self.filesize = d.get("filesize")
        self.filesize_approx = d.get("filesize_approx")
        self.language = d.get("language")
        self.quality = d.get("quality")
        # video
        video_codec = Format.get(d, "vcodec")
        if video_codec:
            self.video_codec = video_codec
            self.video_bit_rate = d["vbr"]
            self.video_dynamic_range = d["dynamic_range"]
            self.video_extension = d["video_ext"]
            self.video_fps = d["fps"]
            self.video_height = int(d["height"])
            self.video_width = int(d["width"])
        # audio
        audio_codec = Format.get(d, "acodec")
        if audio_codec:
            self.audio_codec = audio_codec
            self.audio_bit_rate = d["abr"]
            self.audio_channels = int(d["audio_channels"])
            self.audio_extension = d["audio_ext"]
            self.audio_sampling_rate = d["asr"]

    def audio(self) -> str:
        """Return a one-line summary of the audio part of the format.

        :raises AttributeError: when the format has no audio attributes
        :rtype: str
        """
        # adjacent literals instead of backslash continuations, so that
        # indenting this code can never leak whitespace into the result
        return (
            f"{self.uid} "
            f"{self.audio_sampling_rate} × {self.audio_channels} "
            f"@ {self.audio_bit_rate} × {self.audio_codec}"
        )

    def video(self) -> str:
        """Return a one-line summary of the video part of the format.

        :raises AttributeError: when the format has no video attributes
        :rtype: str
        """
        return (
            f"{self.uid} "
            f"{self.video_width} × {self.video_height} × {self.video_fps} "
            f"@ {self.video_bit_rate} × {self.video_codec}"
        )
# TODO Playlist/extra
class Playlist(Object):
    """YouTube playlist."""

    def __init__(self, d: dict) -> None:
        """Keep the identifier and the title of a playlist entry.

        :param d: playlist info
        :type d: dict
        """
        self.uid, self.title = d["id"], d["title"]
class Subtitles(Object):
    """YouTube subtitles."""

    def __init__(self, uid: str, d: dict) -> None:
        """Keep the identifier, extension, name and URL of a track.

        :param uid: subtitles identifier
        :type uid: str
        :param d: subtitles info
        :type d: dict
        """
        self.uid = uid
        # attribute name → key in the subtitles info
        mapping = (
            ("extension", "ext"),
            ("name", "name"),
            ("url", "url"),
        )
        for attribute, key in mapping:
            setattr(self, attribute, d[key])
# TODO Thumbnail
class Video(Object):
    """YouTube video."""

    def __init__(self, d: dict) -> None:
        """Set video info.

        :param d: video info
        :type d: dict
        """
        self.description_cut = d["description"]
        self.uid = d["id"]
        self.title = d["title"]
        self.duration = int(d["duration"])
        # the last listed thumbnail is the one kept
        self.thumbnail = d["thumbnails"][-1]["url"]

    @staticmethod
    def _filter_subtitles(tracks: dict) -> list:
        """Return the tracks whose extension is in SUBTITLES_EXTENSIONS.

        :param tracks: lists of subtitles info, by subtitles identifier
        :type tracks: dict
        :rtype: list
        """
        kept = []
        for uid, entries in tracks.items():
            for entry in entries:
                subtitles = Subtitles(uid, entry)
                if subtitles.extension in SUBTITLES_EXTENSIONS:
                    kept.append(subtitles)
        return kept

    def load_extra(self) -> None:
        """Extract the video itself and set the extra attributes.

        ``video_format`` and ``audio_format`` end up holding the last
        format of their kind, and are not set when there is none.
        A format having a video codec counts as a video format, even
        if it has an audio codec too.
        """
        # moment of the extraction
        self.at = datetime.now().strftime(TIMESTAMP)
        d = extract_video(self.uid)
        # formats
        self.audio_formats = []
        self.video_formats = []
        for entry in d["formats"]:
            f = Format(entry)
            if hasattr(f, "video_codec"):
                self.video_format = f
                self.video_formats.append(f)
            elif hasattr(f, "audio_codec"):
                self.audio_format = f
                self.audio_formats.append(f)
        # TODO compare existing thumbnail with d["thumbnails"][-1]["url"]
        self.description = d["description"]
        self.channel_id = d["channel_id"]
        self.duration = int(d["duration"])
        self.views = int(d["view_count"])
        self.categories = d["categories"]
        self.tags = d["tags"]
        # captions & subtitles
        self.automatic_captions = Video._filter_subtitles(
            d["automatic_captions"]
        )
        self.subtitles = Video._filter_subtitles(d["subtitles"])
        self.chapters = d["chapters"]
        self.likes = d["like_count"]
        # naive local time, formatted like the extraction moment
        self.timestamp = datetime.fromtimestamp(d["timestamp"]).strftime(
            TIMESTAMP
        )
        self.fulltitle = d["fulltitle"]
# ╭──────────╮
# │ download │
# ╰──────────╯
def download_video(video_id: str | None) -> None:
    """Download a video, or do nothing without an identifier.

    The download asks for webm video and audio streams, names the
    output after the video identifier, removes the sponsor segments
    and also writes the subtitles and the thumbnail.

    :param video_id: video identifier
    :type video_id: str | None
    """
    if not video_id:
        return
    postprocessors = [
        {
            "key": "SponsorBlock",
            "categories": ["sponsor"],
        },
        {
            "key": "ModifyChapters",
            "remove_sponsor_segments": ["sponsor"],
        },
    ]
    options = {
        "format": "bestvideo[ext=webm]+bestaudio[ext=webm]",
        "outtmpl": "%(id)s.%(ext)s",
        "postprocessors": postprocessors,
        "writesubtitles": True,
        "writethumbnail": True,
    }
    ytdl(options).download([url_video(video_id)])
# ╭─────────╮
# │ extract │
# ╰─────────╯
def extract(url: str) -> dict[str, Any]:
    """Return the info extracted from a URL, without downloading.

    The extracted info is logged at debug level before being returned.

    :param url: URL to extract the info from
    :type url: str
    :rtype: dict
    """
    options = {
        "extract_flat": True,
        "skip_download": True,
    }
    info = ytdl(options).extract_info(url, download=False)
    log.debug(info)
    return info
def extract_playlist(playlist_id: str) -> dict:
    """Extract the info of a playlist.

    :param playlist_id: playlist identifier
    :type playlist_id: str
    :rtype: dict
    """
    url = url_playlist(playlist_id)
    return extract(url)
def extract_playlists(channel_id: str) -> dict:
    """Extract the playlists listing of a channel.

    :param channel_id: channel identifier
    :type channel_id: str
    :rtype: dict
    """
    url = url_playlists(channel_id)
    return extract(url)
def extract_video(video_id: str) -> dict:
    """Extract the info of a video.

    :param video_id: video identifier
    :type video_id: str
    :rtype: dict
    """
    url = url_video(video_id)
    return extract(url)
def extract_videos(channel_id: str) -> dict:
    """Extract the videos listing of a channel.

    :param channel_id: channel identifier
    :type channel_id: str
    :rtype: dict
    """
    url = url_videos(channel_id)
    return extract(url)
# ╭──────╮
# │ next │
# ╰──────╯
def next_download(videos: list[str]) -> str | None:
for index, video_id in enumerate(videos):
if not Path(f"{video_id}.mp4").exists():
log.info(f"{index} ∕ {len(videos)}")
return video_id
return None
# ╭─────╮
# │ url │
# ╰─────╯
def url_channel(channel_id: str) -> str:
    """Build the URL of a channel from the base URL.

    :param channel_id: channel identifier
    :type channel_id: str
    :rtype: str
    """
    channel_url = f"{URL}/channel/{channel_id}"
    return channel_url
def url_playlist(playlist_id: str) -> str:
    """Build the URL of a playlist from the base URL.

    :param playlist_id: playlist identifier
    :type playlist_id: str
    :rtype: str
    """
    playlist_url = f"{URL}/playlist?list={playlist_id}"
    return playlist_url
def url_playlists(channel_id: str) -> str:
    """Build the URL of the playlists page of a channel.

    :param channel_id: channel identifier
    :type channel_id: str
    :rtype: str
    """
    channel_url = url_channel(channel_id)
    return f"{channel_url}/playlists"
def url_video(video_id: str) -> str:
    """Build the URL of a video from the base URL.

    :param video_id: video identifier
    :type video_id: str
    :rtype: str
    """
    video_url = f"{URL}/watch?v={video_id}"
    return video_url
def url_videos(channel_id: str) -> str:
    """Build the URL of the videos page of a channel.

    :param channel_id: channel identifier
    :type channel_id: str
    :rtype: str
    """
    channel_url = url_channel(channel_id)
    return f"{channel_url}/videos"
# ╭──────╮
# │ ytdl │
# ╰──────╯
def ytdl(opt: dict) -> YoutubeDL:
    """Return a YoutubeDL set up with the given options.

    The "ignoreerrors" and "quiet" options are always set to False,
    whatever the caller passed; the final options are logged.

    :param opt: options for YoutubeDL
    :type opt: dict
    :rtype: YoutubeDL
    """
    forced = {
        "ignoreerrors": False,
        "quiet": False,
    }
    options = {**opt, **forced}
    log.info(options)
    return YoutubeDL(options)