336 lines
8.2 KiB
Python
336 lines
8.2 KiB
Python
"""YouTube DownLoad."""
|
|
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from yt_dlp import YoutubeDL
|
|
|
|
from rwx import Object
|
|
from rwx.fs import read_file_yaml
|
|
from rwx.log import stream as log
|
|
|
|
EXT = "webm"
|
|
TIMESTAMP = "%Y%m%d%H%M%S"
|
|
URL = "https://youtube.com"
|
|
|
|
|
|
# ╭─────────╮
|
|
# │ classes │
|
|
# ╰─────────╯
|
|
|
|
|
|
class Cache(Object):
|
|
"""YouTube local cache."""
|
|
|
|
def __init__(self, root_file: Path) -> None:
|
|
self.root_file = root_file.resolve()
|
|
self.root_directory = self.root_file.parent
|
|
self.load()
|
|
|
|
def load(self) -> None:
|
|
d = read_file_yaml(self.root_file)
|
|
log.info(d)
|
|
|
|
|
|
class Channel(Object):
|
|
"""YouTube channel."""
|
|
|
|
def __init__(self, channel_id: str) -> None:
|
|
"""Set objects tree.
|
|
|
|
:param channel_id: channel identifier
|
|
:type channel_id: str
|
|
"""
|
|
d = extract_videos(channel_id)
|
|
# channel
|
|
self.uid = d["channel_id"]
|
|
self.title = d["channel"]
|
|
self.followers = int(d["channel_follower_count"])
|
|
self.description = d["description"]
|
|
self.tags = d["tags"]
|
|
self.thumbnails = [thumbnail["url"] for thumbnail in d["thumbnails"]]
|
|
self.thumbnail = self.thumbnails[-1]
|
|
self.uploader_id = d["uploader_id"]
|
|
self.uploader = d["uploader"]
|
|
# videos
|
|
self.videos = [
|
|
Video(entry)
|
|
for entry in reversed(d["entries"])
|
|
if entry["availability"] != "subscriber_only"
|
|
]
|
|
# playlists
|
|
d = extract_playlists(channel_id)
|
|
self.playlists = [Playlist(entry) for entry in reversed(d["entries"])]
|
|
|
|
|
|
# TODO Format
|
|
class Format(Object):
|
|
"""YouTube format."""
|
|
|
|
@staticmethod
|
|
def get(d: dict, key: str) -> str | None:
|
|
value = d.get(key)
|
|
match value:
|
|
case "none":
|
|
return None
|
|
case _:
|
|
return value
|
|
|
|
def __init__(self, d: dict) -> None:
|
|
"""Set format info.
|
|
|
|
:param d: format info
|
|
:type d: dict
|
|
"""
|
|
self.format_id = d["format_id"]
|
|
self.format_note = d.get("format_note")
|
|
self.quality = d.get("quality")
|
|
self.language = d.get("language")
|
|
self.ext = d["ext"]
|
|
# video
|
|
self.video_codec = Format.get(d, "vcodec")
|
|
if self.video_codec:
|
|
self.video_dynamic_range = d["dynamic_range"]
|
|
self.video_fps = d["fps"]
|
|
self.video_height = int(d["height"])
|
|
self.video_bit_rate = d["vbr"]
|
|
self.video_ext = d["video_ext"]
|
|
self.video_width = int(d["width"])
|
|
# audio
|
|
self.audio_codec = Format.get(d, "acodec")
|
|
if self.audio_codec:
|
|
self.audio_bit_rate = d["abr"]
|
|
self.audio_sampling_rate = d["asr"]
|
|
self.audio_ext = d["audio_ext"]
|
|
|
|
|
|
# TODO Playlist/extra
|
|
class Playlist(Object):
|
|
"""YouTube playlist."""
|
|
|
|
def __init__(self, d: dict) -> None:
|
|
"""Set playlist info.
|
|
|
|
:param d: playlist info
|
|
:type d: dict
|
|
"""
|
|
self.uid = d["id"]
|
|
self.title = d["title"]
|
|
|
|
|
|
# TODO Thumbnail
|
|
|
|
|
|
class Video(Object):
|
|
"""YouTube video."""
|
|
|
|
def __init__(self, d: dict) -> None:
|
|
"""Set video info.
|
|
|
|
:param d: video info
|
|
:type d: dict
|
|
"""
|
|
self.uid = d["id"]
|
|
self.title = d["title"]
|
|
self.description_cut = d["description"]
|
|
self.duration = int(d["duration"])
|
|
self.thumbnail = d["thumbnails"][-1]["url"]
|
|
|
|
def load_extra(self):
|
|
self.at = datetime.now().strftime(TIMESTAMP)
|
|
d = extract_video(self.uid)
|
|
self.audio_formats = []
|
|
self.video_formats = []
|
|
for entry in d["formats"]:
|
|
f = Format(entry)
|
|
if f.video_codec:
|
|
self.video_format = f
|
|
self.video_formats.append(f)
|
|
elif f.audio_codec:
|
|
self.audio_format = f
|
|
self.audio_formats.append(f)
|
|
thumbnail = d["thumbnails"][-1]["url"]
|
|
# TODO compare existing thumbnail
|
|
self.description = d["description"]
|
|
self.channel_id = d["channel_id"]
|
|
self.duration = int(d["duration"])
|
|
self.views = int(d["view_count"])
|
|
self.categories = d["categories"]
|
|
self.tags = d["tags"]
|
|
# TODO automatic_captions
|
|
# TODO subtitles
|
|
self.chapters = d["chapters"]
|
|
self.likes = d["like_count"]
|
|
self.timestamp = datetime.fromtimestamp(d["timestamp"]).strftime(TIMESTAMP)
|
|
self.fulltitle = d["fulltitle"]
|
|
|
|
|
|
# ╭──────────╮
|
|
# │ download │
|
|
# ╰──────────╯
|
|
|
|
|
|
def download_video(video_id: str | None) -> None:
|
|
if video_id:
|
|
ytdl(
|
|
{
|
|
"format": "+".join([f"best{av}[ext={EXT}]" for av in ["video", "audio"]]),
|
|
"outtmpl": "%(id)s.%(ext)s",
|
|
"postprocessors": [
|
|
{
|
|
"key": "SponsorBlock",
|
|
"categories": ["sponsor"],
|
|
},
|
|
{
|
|
"key": "ModifyChapters",
|
|
"remove_sponsor_segments": ["sponsor"],
|
|
},
|
|
],
|
|
"writesubtitles": True,
|
|
"writethumbnail": True,
|
|
},
|
|
).download([url_video(video_id)])
|
|
|
|
|
|
# ╭─────────╮
|
|
# │ extract │
|
|
# ╰─────────╯
|
|
|
|
|
|
def extract(url: str) -> dict[str, Any]:
|
|
"""Return extracted dict.
|
|
|
|
:rtype: dict
|
|
"""
|
|
d = ytdl(
|
|
{
|
|
"extract_flat": True,
|
|
"skip_download": True,
|
|
},
|
|
).extract_info(url, download=False)
|
|
log.debug(d)
|
|
return d
|
|
|
|
|
|
def extract_playlist(playlist_id: str) -> dict:
|
|
"""Return extracted playlist dict.
|
|
|
|
:param playlist_id: playlist identifier
|
|
:type playlist_id: str
|
|
:rtype: dict
|
|
"""
|
|
return extract(url_playlist(playlist_id))
|
|
|
|
|
|
def extract_playlists(channel_id: str) -> dict:
|
|
"""Return extracted playlists dict.
|
|
|
|
:param channel_id: channel identifier
|
|
:type channel_id: str
|
|
:rtype: dict
|
|
"""
|
|
return extract(url_playlists(channel_id))
|
|
|
|
|
|
def extract_video(video_id: str) -> dict:
|
|
"""Return extracted video dict.
|
|
|
|
:param video_id: video identifier
|
|
:type video_id: str
|
|
:rtype: dict
|
|
"""
|
|
return extract(url_video(video_id))
|
|
|
|
|
|
def extract_videos(channel_id: str) -> dict:
|
|
"""Return extracted videos dict.
|
|
|
|
:param channel_id: channel identifier
|
|
:type channel_id: str
|
|
:rtype: dict
|
|
"""
|
|
return extract(url_videos(channel_id))
|
|
|
|
|
|
# ╭──────╮
|
|
# │ next │
|
|
# ╰──────╯
|
|
|
|
|
|
def next_download(videos: list[str]) -> str | None:
|
|
for index, video_id in enumerate(videos):
|
|
if not Path(f"{video_id}.mp4").exists():
|
|
log.info(f"{index} ∕ {len(videos)}")
|
|
return video_id
|
|
return None
|
|
|
|
|
|
# ╭─────╮
|
|
# │ url │
|
|
# ╰─────╯
|
|
|
|
|
|
def url_channel(channel_id: str) -> str:
|
|
"""Return channel URL.
|
|
|
|
:param channel_id: channel identifier
|
|
:type channel_id: str
|
|
:rtype: str
|
|
"""
|
|
return f"{URL}/channel/{channel_id}"
|
|
|
|
|
|
def url_playlist(playlist_id: str) -> str:
|
|
"""Return playlist URL.
|
|
|
|
:param playlist_id: playlist identifier
|
|
:type playlist_id: str
|
|
:rtype: str
|
|
"""
|
|
return f"{URL}/playlist?list={playlist_id}"
|
|
|
|
|
|
def url_playlists(channel_id: str) -> str:
|
|
"""Return playlists URL.
|
|
|
|
:param channel_id: channel identifier
|
|
:type channel_id: str
|
|
:rtype: str
|
|
"""
|
|
return f"{url_channel(channel_id)}/playlists"
|
|
|
|
|
|
def url_video(video_id: str) -> str:
|
|
"""Return video URL.
|
|
|
|
:param video_id: video identifier
|
|
:type video_id: str
|
|
:rtype: str
|
|
"""
|
|
return f"{URL}/watch?v={video_id}"
|
|
|
|
|
|
def url_videos(channel_id: str) -> str:
|
|
"""Return videos URL.
|
|
|
|
:param channel_id: channel identifier
|
|
:type channel_id: str
|
|
:rtype: str
|
|
"""
|
|
return f"{url_channel(channel_id)}/videos"
|
|
|
|
|
|
# ╭──────╮
|
|
# │ ytdl │
|
|
# ╰──────╯
|
|
|
|
|
|
def ytdl(opt: dict) -> YoutubeDL:
|
|
options = {
|
|
**opt,
|
|
"ignoreerrors": False,
|
|
"quiet": False,
|
|
}
|
|
log.info(options)
|
|
return YoutubeDL(options)
|