Fix off-by-one error in SegmentTemplate segment enumeration when startNumber is 0. Previously, the code would request one extra segment beyond what exists, causing 404 errors on the final segment. The issue was that end_number was calculated as a segment count via math.ceil(), but then used incorrectly with range(start_number, end_number + 1), treating it as both a count and an inclusive endpoint. Changed to explicitly calculate segment_count first, then derive end_number as: start_number + segment_count - 1 Example: - Duration: 3540.996s, segment duration: 4s - Before: segments 0-886 (887 segments) - segment 886 doesn't exist - After: segments 0-885 (886 segments) - correct
807 lines
34 KiB
Python
807 lines
34 KiB
Python
from __future__ import annotations
|
|
|
|
import base64
|
|
import html
|
|
import logging
|
|
import math
|
|
import re
|
|
import sys
|
|
from copy import copy
|
|
from functools import partial
|
|
from pathlib import Path
|
|
from typing import Any, Callable, Optional, Union
|
|
from urllib.parse import urljoin, urlparse
|
|
from uuid import UUID
|
|
from zlib import crc32
|
|
|
|
import requests
|
|
from curl_cffi.requests import Session as CurlSession
|
|
from langcodes import Language, tag_is_valid
|
|
from lxml.etree import Element, ElementTree
|
|
from pyplayready.system.pssh import PSSH as PR_PSSH
|
|
from pywidevine.cdm import Cdm as WidevineCdm
|
|
from pywidevine.pssh import PSSH
|
|
from requests import Session
|
|
|
|
from unshackle.core.constants import DOWNLOAD_CANCELLED, DOWNLOAD_LICENCE_ONLY, AnyTrack
|
|
from unshackle.core.downloaders import requests as requests_downloader
|
|
from unshackle.core.drm import DRM_T, PlayReady, Widevine
|
|
from unshackle.core.events import events
|
|
from unshackle.core.tracks import Audio, Subtitle, Tracks, Video
|
|
from unshackle.core.utilities import is_close_match, try_ensure_utf8
|
|
from unshackle.core.utils.xml import load_xml
|
|
|
|
|
|
class DASH:
|
|
def __init__(self, manifest, url: str):
|
|
if manifest is None:
|
|
raise ValueError("DASH manifest must be provided.")
|
|
if manifest.tag != "MPD":
|
|
raise TypeError(f"Expected 'MPD' document, but received a '{manifest.tag}' document instead.")
|
|
|
|
if not url:
|
|
raise requests.URLRequired("DASH manifest URL must be provided for relative path computations.")
|
|
if not isinstance(url, str):
|
|
raise TypeError(f"Expected url to be a {str}, not {url!r}")
|
|
|
|
self.manifest = manifest
|
|
self.url = url
|
|
|
|
@classmethod
|
|
def from_url(cls, url: str, session: Optional[Union[Session, CurlSession]] = None, **args: Any) -> DASH:
|
|
if not url:
|
|
raise requests.URLRequired("DASH manifest URL must be provided for relative path computations.")
|
|
if not isinstance(url, str):
|
|
raise TypeError(f"Expected url to be a {str}, not {url!r}")
|
|
|
|
if not session:
|
|
session = Session()
|
|
elif not isinstance(session, (Session, CurlSession)):
|
|
raise TypeError(f"Expected session to be a {Session} or {CurlSession}, not {session!r}")
|
|
|
|
res = session.get(url, **args)
|
|
if res.url != url:
|
|
url = res.url
|
|
|
|
if not res.ok:
|
|
raise requests.ConnectionError("Failed to request the MPD document.", response=res)
|
|
|
|
return DASH.from_text(res.text, url)
|
|
|
|
@classmethod
|
|
def from_text(cls, text: str, url: str) -> DASH:
|
|
if not text:
|
|
raise ValueError("DASH manifest Text must be provided.")
|
|
if not isinstance(text, str):
|
|
raise TypeError(f"Expected text to be a {str}, not {text!r}")
|
|
|
|
if not url:
|
|
raise requests.URLRequired("DASH manifest URL must be provided for relative path computations.")
|
|
if not isinstance(url, str):
|
|
raise TypeError(f"Expected url to be a {str}, not {url!r}")
|
|
|
|
manifest = load_xml(text)
|
|
|
|
return cls(manifest, url)
|
|
|
|
def to_tracks(
|
|
self, language: Optional[Union[str, Language]] = None, period_filter: Optional[Callable] = None
|
|
) -> Tracks:
|
|
"""
|
|
Convert an MPEG-DASH document to Video, Audio and Subtitle Track objects.
|
|
|
|
Parameters:
|
|
language: The Title's Original Recorded Language. It will also be used as a fallback
|
|
track language value if the manifest does not list language information.
|
|
period_filter: Filter out period's within the manifest.
|
|
|
|
All Track URLs will be a list of segment URLs.
|
|
"""
|
|
tracks = Tracks()
|
|
|
|
for period in self.manifest.findall("Period"):
|
|
if callable(period_filter) and period_filter(period):
|
|
continue
|
|
if next(iter(period.xpath("SegmentType/@value")), "content") != "content":
|
|
continue
|
|
if "urn:amazon:primevideo:cachingBreadth" in [
|
|
x.get("schemeIdUri") for x in period.findall("SupplementalProperty")
|
|
]:
|
|
continue
|
|
|
|
for adaptation_set in period.findall("AdaptationSet"):
|
|
if self.is_trick_mode(adaptation_set):
|
|
# we don't want trick mode streams (they are only used for fast-forward/rewind)
|
|
continue
|
|
|
|
for rep in adaptation_set.findall("Representation"):
|
|
get = partial(self._get, adaptation_set=adaptation_set, representation=rep)
|
|
findall = partial(self._findall, adaptation_set=adaptation_set, representation=rep, both=True)
|
|
segment_base = rep.find("SegmentBase")
|
|
|
|
codecs = get("codecs")
|
|
content_type = get("contentType")
|
|
mime_type = get("mimeType")
|
|
|
|
if not content_type and mime_type:
|
|
content_type = mime_type.split("/")[0]
|
|
if not content_type and not mime_type:
|
|
raise ValueError("Unable to determine the format of a Representation, cannot continue...")
|
|
|
|
if mime_type == "application/mp4" or content_type == "application":
|
|
# likely mp4-boxed subtitles
|
|
# TODO: It may not actually be subtitles
|
|
try:
|
|
real_codec = Subtitle.Codec.from_mime(codecs)
|
|
content_type = "text"
|
|
mime_type = f"application/mp4; codecs='{real_codec.value.lower()}'"
|
|
except ValueError:
|
|
raise ValueError(f"Unsupported content type '{content_type}' with codecs of '{codecs}'")
|
|
|
|
if content_type == "text" and mime_type and "/mp4" not in mime_type:
|
|
# mimeType likely specifies the subtitle codec better than `codecs`
|
|
codecs = mime_type.split("/")[1]
|
|
|
|
if content_type == "video":
|
|
track_type = Video
|
|
track_codec = Video.Codec.from_codecs(codecs)
|
|
track_fps = get("frameRate")
|
|
if not track_fps and segment_base is not None:
|
|
track_fps = segment_base.get("timescale")
|
|
|
|
track_args = dict(
|
|
range_=self.get_video_range(
|
|
codecs, findall("SupplementalProperty"), findall("EssentialProperty")
|
|
),
|
|
bitrate=get("bandwidth") or None,
|
|
width=get("width") or 0,
|
|
height=get("height") or 0,
|
|
fps=track_fps or None,
|
|
)
|
|
elif content_type == "audio":
|
|
track_type = Audio
|
|
track_codec = Audio.Codec.from_codecs(codecs)
|
|
track_args = dict(
|
|
bitrate=get("bandwidth") or None,
|
|
channels=next(
|
|
iter(
|
|
rep.xpath("AudioChannelConfiguration/@value")
|
|
or adaptation_set.xpath("AudioChannelConfiguration/@value")
|
|
),
|
|
None,
|
|
),
|
|
joc=self.get_ddp_complexity_index(adaptation_set, rep),
|
|
descriptive=self.is_descriptive(adaptation_set),
|
|
)
|
|
elif content_type == "text":
|
|
track_type = Subtitle
|
|
track_codec = Subtitle.Codec.from_codecs(codecs or "vtt")
|
|
track_args = dict(
|
|
cc=self.is_closed_caption(adaptation_set),
|
|
sdh=self.is_sdh(adaptation_set),
|
|
forced=self.is_forced(adaptation_set),
|
|
)
|
|
elif content_type == "image":
|
|
# we don't want what's likely thumbnails for the seekbar
|
|
continue
|
|
else:
|
|
raise ValueError(f"Unknown Track Type '{content_type}'")
|
|
|
|
track_lang = self.get_language(adaptation_set, rep, fallback=language)
|
|
if not track_lang:
|
|
msg = "Language information could not be derived from a Representation."
|
|
if language is None:
|
|
msg += " No fallback language was provided when calling DASH.to_tracks()."
|
|
elif not tag_is_valid((str(language) or "").strip()) or str(language).startswith("und"):
|
|
msg += f" The fallback language provided is also invalid: {language}"
|
|
raise ValueError(msg)
|
|
|
|
# for some reason it's incredibly common for services to not provide
|
|
# a good and actually unique track ID, sometimes because of the lang
|
|
# dialect not being represented in the id, or the bitrate, or such.
|
|
# this combines all of them as one and hashes it to keep it small(ish).
|
|
track_id = hex(
|
|
crc32(
|
|
"{codec}-{lang}-{bitrate}-{base_url}-{ids}-{track_args}".format(
|
|
codec=codecs,
|
|
lang=track_lang,
|
|
bitrate=get("bitrate"),
|
|
base_url=(rep.findtext("BaseURL") or "").split("?")[0],
|
|
ids=[get("audioTrackId"), get("id"), period.get("id")],
|
|
track_args=track_args,
|
|
).encode()
|
|
)
|
|
)[2:]
|
|
|
|
tracks.add(
|
|
track_type(
|
|
id_=track_id,
|
|
url=self.url,
|
|
codec=track_codec,
|
|
language=track_lang,
|
|
is_original_lang=bool(language and is_close_match(track_lang, [language])),
|
|
descriptor=Video.Descriptor.DASH,
|
|
data={
|
|
"dash": {
|
|
"manifest": self.manifest,
|
|
"period": period,
|
|
"adaptation_set": adaptation_set,
|
|
"representation": rep,
|
|
}
|
|
},
|
|
**track_args,
|
|
)
|
|
)
|
|
|
|
# only get tracks from the first main-content period
|
|
break
|
|
|
|
return tracks
|
|
|
|
@staticmethod
|
|
def download_track(
|
|
track: AnyTrack,
|
|
save_path: Path,
|
|
save_dir: Path,
|
|
progress: partial,
|
|
session: Optional[Session] = None,
|
|
proxy: Optional[str] = None,
|
|
max_workers: Optional[int] = None,
|
|
license_widevine: Optional[Callable] = None,
|
|
*,
|
|
cdm: Optional[object] = None,
|
|
):
|
|
if not session:
|
|
session = Session()
|
|
elif not isinstance(session, (Session, CurlSession)):
|
|
raise TypeError(f"Expected session to be a {Session} or {CurlSession}, not {session!r}")
|
|
|
|
if proxy:
|
|
session.proxies.update({"all": proxy})
|
|
|
|
log = logging.getLogger("DASH")
|
|
|
|
manifest: ElementTree = track.data["dash"]["manifest"]
|
|
period: Element = track.data["dash"]["period"]
|
|
adaptation_set: Element = track.data["dash"]["adaptation_set"]
|
|
representation: Element = track.data["dash"]["representation"]
|
|
|
|
# Preserve existing DRM if it was set by the service, especially when service set Widevine
|
|
# but manifest only contains PlayReady protection (common scenario for some services)
|
|
existing_drm = track.drm
|
|
manifest_drm = DASH.get_drm(
|
|
representation.findall("ContentProtection") + adaptation_set.findall("ContentProtection")
|
|
)
|
|
|
|
# Only override existing DRM if:
|
|
# 1. No existing DRM was set, OR
|
|
# 2. Existing DRM contains same type as manifest DRM, OR
|
|
# 3. Existing DRM is not Widevine (preserve Widevine when service explicitly set it)
|
|
should_override_drm = (
|
|
not existing_drm
|
|
or (
|
|
existing_drm
|
|
and manifest_drm
|
|
and any(isinstance(existing, type(manifest)) for existing in existing_drm for manifest in manifest_drm)
|
|
)
|
|
or (existing_drm and not any(isinstance(drm, Widevine) for drm in existing_drm))
|
|
)
|
|
|
|
if should_override_drm:
|
|
track.drm = manifest_drm
|
|
else:
|
|
track.drm = existing_drm
|
|
|
|
manifest_base_url = manifest.findtext("BaseURL")
|
|
if not manifest_base_url:
|
|
manifest_base_url = track.url
|
|
elif not re.match("^https?://", manifest_base_url, re.IGNORECASE):
|
|
manifest_base_url = urljoin(track.url, f"./{manifest_base_url}")
|
|
period_base_url = urljoin(manifest_base_url, period.findtext("BaseURL"))
|
|
rep_base_url = urljoin(period_base_url, representation.findtext("BaseURL"))
|
|
|
|
period_duration = period.get("duration") or manifest.get("mediaPresentationDuration")
|
|
init_data: Optional[bytes] = None
|
|
|
|
segment_template = representation.find("SegmentTemplate")
|
|
if segment_template is None:
|
|
segment_template = adaptation_set.find("SegmentTemplate")
|
|
|
|
segment_list = representation.find("SegmentList")
|
|
if segment_list is None:
|
|
segment_list = adaptation_set.find("SegmentList")
|
|
|
|
segment_base = representation.find("SegmentBase")
|
|
if segment_base is None:
|
|
segment_base = adaptation_set.find("SegmentBase")
|
|
|
|
segments: list[tuple[str, Optional[str]]] = []
|
|
segment_timescale: float = 0
|
|
segment_durations: list[int] = []
|
|
track_kid: Optional[UUID] = None
|
|
|
|
if segment_template is not None:
|
|
segment_template = copy(segment_template)
|
|
start_number = int(segment_template.get("startNumber") or 1)
|
|
end_number = int(segment_template.get("endNumber") or 0) or None
|
|
segment_timeline = segment_template.find("SegmentTimeline")
|
|
segment_timescale = float(segment_template.get("timescale") or 1)
|
|
|
|
for item in ("initialization", "media"):
|
|
value = segment_template.get(item)
|
|
if not value:
|
|
continue
|
|
if not re.match("^https?://", value, re.IGNORECASE):
|
|
if not rep_base_url:
|
|
raise ValueError("Resolved Segment URL is not absolute, and no Base URL is available.")
|
|
value = urljoin(rep_base_url, value)
|
|
if not urlparse(value).query:
|
|
manifest_url_query = urlparse(track.url).query
|
|
if manifest_url_query:
|
|
value += f"?{manifest_url_query}"
|
|
segment_template.set(item, value)
|
|
|
|
init_url = segment_template.get("initialization")
|
|
if init_url:
|
|
res = session.get(
|
|
DASH.replace_fields(
|
|
init_url, Bandwidth=representation.get("bandwidth"), RepresentationID=representation.get("id")
|
|
)
|
|
)
|
|
res.raise_for_status()
|
|
init_data = res.content
|
|
track_kid = track.get_key_id(init_data)
|
|
|
|
if segment_timeline is not None:
|
|
current_time = 0
|
|
for s in segment_timeline.findall("S"):
|
|
if s.get("t"):
|
|
current_time = int(s.get("t"))
|
|
for _ in range(1 + (int(s.get("r") or 0))):
|
|
segment_durations.append(current_time)
|
|
current_time += int(s.get("d"))
|
|
|
|
if not end_number:
|
|
end_number = len(segment_durations)
|
|
|
|
for t, n in zip(segment_durations, range(start_number, end_number + 1)):
|
|
segments.append(
|
|
(
|
|
DASH.replace_fields(
|
|
segment_template.get("media"),
|
|
Bandwidth=representation.get("bandwidth"),
|
|
Number=n,
|
|
RepresentationID=representation.get("id"),
|
|
Time=t,
|
|
),
|
|
None,
|
|
)
|
|
)
|
|
else:
|
|
if not period_duration:
|
|
raise ValueError("Duration of the Period was unable to be determined.")
|
|
period_duration = DASH.pt_to_sec(period_duration)
|
|
segment_duration = float(segment_template.get("duration")) or 1
|
|
|
|
if not end_number:
|
|
segment_count = math.ceil(period_duration / (segment_duration / segment_timescale))
|
|
end_number = start_number + segment_count - 1
|
|
|
|
for s in range(start_number, end_number + 1):
|
|
segments.append(
|
|
(
|
|
DASH.replace_fields(
|
|
segment_template.get("media"),
|
|
Bandwidth=representation.get("bandwidth"),
|
|
Number=s,
|
|
RepresentationID=representation.get("id"),
|
|
Time=s,
|
|
),
|
|
None,
|
|
)
|
|
)
|
|
# TODO: Should we floor/ceil/round, or is int() ok?
|
|
segment_durations.append(int(segment_duration))
|
|
elif segment_list is not None:
|
|
segment_timescale = float(segment_list.get("timescale") or 1)
|
|
|
|
init_data = None
|
|
initialization = segment_list.find("Initialization")
|
|
if initialization is not None:
|
|
source_url = initialization.get("sourceURL")
|
|
if not source_url:
|
|
source_url = rep_base_url
|
|
elif not re.match("^https?://", source_url, re.IGNORECASE):
|
|
source_url = urljoin(rep_base_url, f"./{source_url}")
|
|
|
|
if initialization.get("range"):
|
|
init_range_header = {"Range": f"bytes={initialization.get('range')}"}
|
|
else:
|
|
init_range_header = None
|
|
|
|
res = session.get(url=source_url, headers=init_range_header)
|
|
res.raise_for_status()
|
|
init_data = res.content
|
|
track_kid = track.get_key_id(init_data)
|
|
|
|
segment_urls = segment_list.findall("SegmentURL")
|
|
for segment_url in segment_urls:
|
|
media_url = segment_url.get("media")
|
|
if not media_url:
|
|
media_url = rep_base_url
|
|
elif not re.match("^https?://", media_url, re.IGNORECASE):
|
|
media_url = urljoin(rep_base_url, f"./{media_url}")
|
|
|
|
segments.append((media_url, segment_url.get("mediaRange")))
|
|
segment_durations.append(int(segment_url.get("duration") or 1))
|
|
elif segment_base is not None:
|
|
media_range = None
|
|
init_data = None
|
|
initialization = segment_base.find("Initialization")
|
|
if initialization is not None:
|
|
if initialization.get("range"):
|
|
init_range_header = {"Range": f"bytes={initialization.get('range')}"}
|
|
else:
|
|
init_range_header = None
|
|
|
|
res = session.get(url=rep_base_url, headers=init_range_header)
|
|
res.raise_for_status()
|
|
init_data = res.content
|
|
track_kid = track.get_key_id(init_data)
|
|
total_size = res.headers.get("Content-Range", "").split("/")[-1]
|
|
if total_size:
|
|
media_range = f"{len(init_data)}-{total_size}"
|
|
|
|
segments.append((rep_base_url, media_range))
|
|
elif rep_base_url:
|
|
segments.append((rep_base_url, None))
|
|
else:
|
|
log.error("Could not find a way to get segments from this MPD manifest.")
|
|
log.debug(track.url)
|
|
sys.exit(1)
|
|
|
|
# TODO: Should we floor/ceil/round, or is int() ok?
|
|
track.data["dash"]["timescale"] = int(segment_timescale)
|
|
track.data["dash"]["segment_durations"] = segment_durations
|
|
|
|
if not track.drm and isinstance(track, (Video, Audio)):
|
|
try:
|
|
track.drm = [Widevine.from_init_data(init_data)]
|
|
except Widevine.Exceptions.PSSHNotFound:
|
|
# it might not have Widevine DRM, or might not have found the PSSH
|
|
log.warning("No Widevine PSSH was found for this track, is it DRM free?")
|
|
|
|
if track.drm:
|
|
track_kid = track_kid or track.get_key_id(url=segments[0][0], session=session)
|
|
drm = track.get_drm_for_cdm(cdm)
|
|
if isinstance(drm, (Widevine, PlayReady)):
|
|
# license and grab content keys
|
|
try:
|
|
if not license_widevine:
|
|
raise ValueError("license_widevine func must be supplied to use DRM")
|
|
progress(downloaded="LICENSING")
|
|
license_widevine(drm, track_kid=track_kid)
|
|
progress(downloaded="[yellow]LICENSED")
|
|
except Exception: # noqa
|
|
DOWNLOAD_CANCELLED.set() # skip pending track downloads
|
|
progress(downloaded="[red]FAILED")
|
|
raise
|
|
else:
|
|
drm = None
|
|
|
|
if DOWNLOAD_LICENCE_ONLY.is_set():
|
|
progress(downloaded="[yellow]SKIPPED")
|
|
return
|
|
|
|
progress(total=len(segments))
|
|
|
|
downloader = track.downloader
|
|
if downloader.__name__ == "aria2c" and any(bytes_range is not None for url, bytes_range in segments):
|
|
# aria2(c) is shit and doesn't support the Range header, fallback to the requests downloader
|
|
downloader = requests_downloader
|
|
log.warning("Falling back to the requests downloader as aria2(c) doesn't support the Range header")
|
|
|
|
downloader_args = dict(
|
|
urls=[
|
|
{"url": url, "headers": {"Range": f"bytes={bytes_range}"} if bytes_range else {}}
|
|
for url, bytes_range in segments
|
|
],
|
|
output_dir=save_dir,
|
|
filename="{i:0%d}.mp4" % (len(str(len(segments)))),
|
|
headers=session.headers,
|
|
cookies=session.cookies,
|
|
proxy=proxy,
|
|
max_workers=max_workers,
|
|
)
|
|
|
|
if downloader.__name__ == "n_m3u8dl_re":
|
|
downloader_args.update({"filename": track.id, "track": track})
|
|
|
|
for status_update in downloader(**downloader_args):
|
|
file_downloaded = status_update.get("file_downloaded")
|
|
if file_downloaded:
|
|
events.emit(events.Types.SEGMENT_DOWNLOADED, track=track, segment=file_downloaded)
|
|
else:
|
|
downloaded = status_update.get("downloaded")
|
|
if downloaded and downloaded.endswith("/s"):
|
|
status_update["downloaded"] = f"DASH {downloaded}"
|
|
progress(**status_update)
|
|
|
|
# see https://github.com/devine-dl/devine/issues/71
|
|
for control_file in save_dir.glob("*.aria2__temp"):
|
|
control_file.unlink()
|
|
|
|
segments_to_merge = [x for x in sorted(save_dir.iterdir()) if x.is_file()]
|
|
with open(save_path, "wb") as f:
|
|
if init_data:
|
|
f.write(init_data)
|
|
if len(segments_to_merge) > 1:
|
|
progress(downloaded="Merging", completed=0, total=len(segments_to_merge))
|
|
for segment_file in segments_to_merge:
|
|
segment_data = segment_file.read_bytes()
|
|
# TODO: fix encoding after decryption?
|
|
if (
|
|
not drm
|
|
and isinstance(track, Subtitle)
|
|
and track.codec not in (Subtitle.Codec.fVTT, Subtitle.Codec.fTTML)
|
|
):
|
|
segment_data = try_ensure_utf8(segment_data)
|
|
segment_data = (
|
|
segment_data.decode("utf8")
|
|
.replace("‎", html.unescape("‎"))
|
|
.replace("‏", html.unescape("‏"))
|
|
.encode("utf8")
|
|
)
|
|
f.write(segment_data)
|
|
f.flush()
|
|
segment_file.unlink()
|
|
progress(advance=1)
|
|
|
|
track.path = save_path
|
|
events.emit(events.Types.TRACK_DOWNLOADED, track=track)
|
|
|
|
if drm:
|
|
progress(downloaded="Decrypting", completed=0, total=100)
|
|
drm.decrypt(save_path)
|
|
track.drm = None
|
|
events.emit(events.Types.TRACK_DECRYPTED, track=track, drm=drm, segment=None)
|
|
progress(downloaded="Decrypting", advance=100)
|
|
|
|
save_dir.rmdir()
|
|
|
|
progress(downloaded="Downloaded")
|
|
|
|
@staticmethod
|
|
def _get(item: str, adaptation_set: Element, representation: Optional[Element] = None) -> Optional[Any]:
|
|
"""Helper to get a requested item from the Representation, otherwise from the AdaptationSet."""
|
|
adaptation_set_item = adaptation_set.get(item)
|
|
if representation is None:
|
|
return adaptation_set_item
|
|
|
|
representation_item = representation.get(item)
|
|
if representation_item is not None:
|
|
return representation_item
|
|
|
|
return adaptation_set_item
|
|
|
|
@staticmethod
|
|
def _findall(
|
|
item: str, adaptation_set: Element, representation: Optional[Element] = None, both: bool = False
|
|
) -> list[Any]:
|
|
"""
|
|
Helper to get all requested items from the Representation, otherwise from the AdaptationSet.
|
|
Optionally, you may pass both=True to keep both values (where available).
|
|
"""
|
|
adaptation_set_items = adaptation_set.findall(item)
|
|
if representation is None:
|
|
return adaptation_set_items
|
|
|
|
representation_items = representation.findall(item)
|
|
|
|
if both:
|
|
return representation_items + adaptation_set_items
|
|
|
|
if representation_items:
|
|
return representation_items
|
|
|
|
return adaptation_set_items
|
|
|
|
@staticmethod
|
|
def get_language(
|
|
adaptation_set: Element,
|
|
representation: Optional[Element] = None,
|
|
fallback: Optional[Union[str, Language]] = None,
|
|
) -> Optional[Language]:
|
|
"""
|
|
Get Language (if any) from the AdaptationSet or Representation.
|
|
|
|
A fallback language may be provided if no language information could be
|
|
retrieved.
|
|
"""
|
|
options = []
|
|
|
|
if representation is not None:
|
|
options.append(representation.get("lang"))
|
|
# derive language from somewhat common id string format
|
|
# the format is typically "{rep_id}_{lang}={bitrate}" or similar
|
|
rep_id = representation.get("id")
|
|
if rep_id:
|
|
m = re.match(r"\w+_(\w+)=\d+", rep_id)
|
|
if m:
|
|
options.append(m.group(1))
|
|
|
|
options.append(adaptation_set.get("lang"))
|
|
|
|
if fallback:
|
|
options.append(fallback)
|
|
|
|
for option in options:
|
|
option = (str(option) or "").strip()
|
|
if not tag_is_valid(option) or option.startswith("und"):
|
|
continue
|
|
return Language.get(option)
|
|
|
|
@staticmethod
|
|
def get_video_range(
|
|
codecs: str, all_supplemental_props: list[Element], all_essential_props: list[Element]
|
|
) -> Video.Range:
|
|
if codecs.startswith(("dva1", "dvav", "dvhe", "dvh1")):
|
|
return Video.Range.DV
|
|
|
|
return Video.Range.from_cicp(
|
|
primaries=next(
|
|
(
|
|
int(x.get("value"))
|
|
for x in all_supplemental_props + all_essential_props
|
|
if x.get("schemeIdUri") == "urn:mpeg:mpegB:cicp:ColourPrimaries"
|
|
),
|
|
0,
|
|
),
|
|
transfer=next(
|
|
(
|
|
int(x.get("value"))
|
|
for x in all_supplemental_props + all_essential_props
|
|
if x.get("schemeIdUri") == "urn:mpeg:mpegB:cicp:TransferCharacteristics"
|
|
),
|
|
0,
|
|
),
|
|
matrix=next(
|
|
(
|
|
int(x.get("value"))
|
|
for x in all_supplemental_props + all_essential_props
|
|
if x.get("schemeIdUri") == "urn:mpeg:mpegB:cicp:MatrixCoefficients"
|
|
),
|
|
0,
|
|
),
|
|
)
|
|
|
|
@staticmethod
|
|
def is_trick_mode(adaptation_set: Element) -> bool:
|
|
"""Check if contents of Adaptation Set is a Trick-Mode stream."""
|
|
essential_props = adaptation_set.findall("EssentialProperty")
|
|
supplemental_props = adaptation_set.findall("SupplementalProperty")
|
|
|
|
return any(
|
|
prop.get("schemeIdUri") == "http://dashif.org/guidelines/trickmode"
|
|
for prop in essential_props + supplemental_props
|
|
)
|
|
|
|
@staticmethod
|
|
def is_descriptive(adaptation_set: Element) -> bool:
|
|
"""Check if contents of Adaptation Set is Descriptive."""
|
|
return any(
|
|
(x.get("schemeIdUri"), x.get("value"))
|
|
in (("urn:mpeg:dash:role:2011", "descriptive"), ("urn:tva:metadata:cs:AudioPurposeCS:2007", "1"))
|
|
for x in adaptation_set.findall("Accessibility")
|
|
)
|
|
|
|
@staticmethod
|
|
def is_forced(adaptation_set: Element) -> bool:
|
|
"""Check if contents of Adaptation Set is a Forced Subtitle."""
|
|
return any(
|
|
x.get("schemeIdUri") == "urn:mpeg:dash:role:2011"
|
|
and x.get("value") in ("forced-subtitle", "forced_subtitle")
|
|
for x in adaptation_set.findall("Role")
|
|
)
|
|
|
|
@staticmethod
|
|
def is_sdh(adaptation_set: Element) -> bool:
|
|
"""Check if contents of Adaptation Set is for the Hearing Impaired."""
|
|
return any(
|
|
(x.get("schemeIdUri"), x.get("value")) == ("urn:tva:metadata:cs:AudioPurposeCS:2007", "2")
|
|
for x in adaptation_set.findall("Accessibility")
|
|
)
|
|
|
|
@staticmethod
|
|
def is_closed_caption(adaptation_set: Element) -> bool:
|
|
"""Check if contents of Adaptation Set is a Closed Caption Subtitle."""
|
|
return any(
|
|
(x.get("schemeIdUri"), x.get("value")) == ("urn:mpeg:dash:role:2011", "caption")
|
|
for x in adaptation_set.findall("Role")
|
|
)
|
|
|
|
@staticmethod
|
|
def get_ddp_complexity_index(adaptation_set: Element, representation: Optional[Element]) -> Optional[int]:
|
|
"""Get the DD+ Complexity Index (if any) from the AdaptationSet or Representation."""
|
|
return next(
|
|
(
|
|
int(x.get("value"))
|
|
for x in DASH._findall("SupplementalProperty", adaptation_set, representation, both=True)
|
|
if x.get("schemeIdUri") == "tag:dolby.com,2018:dash:EC3_ExtensionComplexityIndex:2018"
|
|
),
|
|
None,
|
|
)
|
|
|
|
@staticmethod
|
|
def get_drm(protections: list[Element]) -> list[DRM_T]:
|
|
drm: list[DRM_T] = []
|
|
|
|
for protection in protections:
|
|
urn = (protection.get("schemeIdUri") or "").lower()
|
|
|
|
if urn == WidevineCdm.urn:
|
|
pssh_text = protection.findtext("pssh")
|
|
if not pssh_text:
|
|
continue
|
|
pssh = PSSH(pssh_text)
|
|
|
|
kid = protection.get("kid")
|
|
if kid:
|
|
kid = UUID(bytes=base64.b64decode(kid))
|
|
|
|
default_kid = protection.get("default_KID")
|
|
if default_kid:
|
|
kid = UUID(default_kid)
|
|
|
|
if not pssh.key_ids and not kid:
|
|
kid = next((UUID(p.get("default_KID")) for p in protections if p.get("default_KID")), None)
|
|
|
|
drm.append(Widevine(pssh=pssh, kid=kid))
|
|
|
|
elif urn in ("urn:uuid:9a04f079-9840-4286-ab92-e65be0885f95", "urn:microsoft:playready"):
|
|
pr_pssh_b64 = (
|
|
protection.findtext("pssh")
|
|
or protection.findtext("pro")
|
|
or protection.findtext("{urn:microsoft:playready}pro")
|
|
)
|
|
if not pr_pssh_b64:
|
|
continue
|
|
pr_pssh = PR_PSSH(pr_pssh_b64)
|
|
kid_b64 = protection.findtext("kid")
|
|
kid = None
|
|
if kid_b64:
|
|
try:
|
|
kid = UUID(bytes=base64.b64decode(kid_b64))
|
|
except Exception:
|
|
kid = None
|
|
|
|
drm.append(PlayReady(pssh=pr_pssh, kid=kid, pssh_b64=pr_pssh_b64))
|
|
|
|
return drm
|
|
|
|
@staticmethod
|
|
def pt_to_sec(d: Union[str, float]) -> float:
|
|
if isinstance(d, float):
|
|
return d
|
|
has_ymd = d[0:8] == "P0Y0M0DT"
|
|
if d[0:2] != "PT" and not has_ymd:
|
|
raise ValueError("Input data is not a valid time string.")
|
|
if has_ymd:
|
|
d = d[6:].upper() # skip `P0Y0M0DT`
|
|
else:
|
|
d = d[2:].upper() # skip `PT`
|
|
m = re.findall(r"([\d.]+.)", d)
|
|
return sum(float(x[0:-1]) * {"H": 60 * 60, "M": 60, "S": 1}[x[-1].upper()] for x in m)
|
|
|
|
@staticmethod
|
|
def replace_fields(url: str, **kwargs: Any) -> str:
|
|
for field, value in kwargs.items():
|
|
url = url.replace(f"${field}$", str(value))
|
|
m = re.search(rf"\${re.escape(field)}%([a-z0-9]+)\$", url, flags=re.I)
|
|
if m:
|
|
url = url.replace(m.group(), f"{value:{m.group(1)}}")
|
|
return url
|
|
|
|
|
|
__all__ = ("DASH",)
|