Files
Andy 15acaea208 feat(dl): extract closed captions from HLS manifests and improve CC extraction
- Parse CLOSED-CAPTIONS entries from HLS manifests and attach CC metadata (language, name, instream_id) to video tracks
- Move CC extraction to run after decryption instead of before, fixing extraction failures on encrypted streams
- Extract CCs even when other subtitle tracks exist, using manifest CC language info instead of guessing
- Try ccextractor on the original file before repacking to preserve container-level CC data (e.g. c608 boxes) that ffmpeg remux strips
- Display deduplicated closed captions in --list output and download progress, positioned after subtitles
- Add closed_captions field to Video track class
2026-03-05 15:57:29 -07:00

489 lines
19 KiB
Python

from __future__ import annotations
import logging
import math
import re
import subprocess
from enum import Enum
from pathlib import Path
from typing import Any, Optional, Union
from langcodes import Language
from unshackle.core import binaries
from unshackle.core.config import config
from unshackle.core.tracks.subtitle import Subtitle
from unshackle.core.tracks.track import Track
from unshackle.core.utilities import FPS, get_boxes
class Video(Track):
class Codec(str, Enum):
    """Video codecs known to the track model; each value is the codec's display name."""

    AVC = "H.264"
    HEVC = "H.265"
    VC1 = "VC-1"
    VP8 = "VP8"
    VP9 = "VP9"
    AV1 = "AV1"

    @property
    def extension(self) -> str:
        """Lower-cased codec name with dots and dashes removed, e.g. ``"H.264"`` -> ``"h264"``."""
        return "".join(char for char in self.value.lower() if char not in ".-")

    @staticmethod
    def from_mime(mime: str) -> Video.Codec:
        """
        Resolve a codec identifier (e.g. ``avc1.64001F``) to a Codec.

        Only the text before the first ``.`` is used, compared in lower case
        with surrounding whitespace stripped.

        Raises:
            ValueError: If the identifier is not one of the supported codecs.
        """
        identifier = mime.lower().strip().split(".")[0]
        # (accepted identifiers, name of the matching Codec member)
        families = (
            (("avc1", "avc2", "avc3", "dva1", "dvav"), "AVC"),  # dva1/dvav: Dolby Vision
            (
                # dvh1/dvhe: Dolby Vision, lhv1/lhe1: Layered
                ("hev1", "hev2", "hev3", "hvc1", "hvc2", "hvc3", "dvh1", "dvhe", "lhv1", "lhe1"),
                "HEVC",
            ),
            (("vc-1",), "VC1"),
            (("vp08", "vp8"), "VP8"),
            (("vp09", "vp9"), "VP9"),
            (("av01",), "AV1"),
        )
        for accepted, member in families:
            if identifier in accepted:
                return Video.Codec[member]
        raise ValueError(f"The MIME '{identifier}' is not a supported Video Codec")

    @staticmethod
    def from_codecs(codecs: str) -> Video.Codec:
        """
        Return the Codec of the first recognised entry in a comma-separated codecs string.

        Raises:
            ValueError: If none of the entries maps to a supported codec.
        """
        for entry in codecs.lower().split(","):
            identifier = entry.strip().split(".")[0]
            try:
                return Video.Codec.from_mime(identifier)
            except ValueError:
                # Not a video codec we know (e.g. an audio entry); try the next one.
                continue
        raise ValueError(f"No MIME types matched any supported Video Codecs in '{codecs}'")

    @staticmethod
    def from_netflix_profile(profile: str) -> Video.Codec:
        """
        Resolve a Netflix content profile name to a Codec by its prefix.

        Raises:
            ValueError: If the profile does not start with a known codec prefix.
        """
        normalized = profile.lower().strip()
        # (accepted prefixes, name of the matching Codec member)
        prefix_table = (
            (("h264", "playready-h264"), "AVC"),
            (("hevc",), "HEVC"),
            (("vp9",), "VP9"),
            (("av1",), "AV1"),
        )
        for prefixes, member in prefix_table:
            if normalized.startswith(prefixes):
                return Video.Codec[member]
        raise ValueError(f"The Content Profile '{normalized}' is not a supported Video Codec")
class Range(str, Enum):
    """Dynamic range categories a video track can be tagged with."""

    SDR = "SDR"  # No Dynamic Range
    HLG = "HLG"  # https://en.wikipedia.org/wiki/Hybrid_log%E2%80%93gamma
    HDR10 = "HDR10"  # https://en.wikipedia.org/wiki/HDR10
    HDR10P = "HDR10+"  # https://en.wikipedia.org/wiki/HDR10%2B
    DV = "DV"  # https://en.wikipedia.org/wiki/Dolby_Vision
    HYBRID = "HYBRID"  # Selects both HDR10 and DV tracks for hybrid processing with DoviTool

    @staticmethod
    def from_cicp(primaries: int, transfer: int, matrix: int) -> Video.Range:
        """
        Convert CICP (Coding-Independent Code Points) values to Video Range.

        CICP is defined in ITU-T H.273 and ISO/IEC 23091-2 for signaling video
        color properties independently of the compression codec. These values are
        used across AVC (H.264), HEVC (H.265), VVC, AV1, and other modern codecs.

        The enum values (Primaries, Transfer, Matrix) match the official specifications:
        - ITU-T H.273: Coding-independent code points for video signal type identification
        - ISO/IEC 23091-2: Information technology — Coding-independent code points — Part 2: Video
        - H.264 Table E-3 (Colour Primaries) and Table E-4 (Transfer Characteristics)
        - H.265 Table E.3 and E.4 (identical to H.264)

        Note: For primaries and transfer, Value 0 = "Reserved" and Value 2 = "Unspecified"
        per specification. While both effectively mean "unknown" in practice, the
        distinction matters for spec compliance. Value 2 is likewise "Unspecified" for the
        matrix coefficients and is accepted here so streams that leave the matrix
        unspecified are not rejected.

        Parameters:
            primaries: Colour primaries code point.
            transfer: Transfer characteristics code point. A value of 5 is treated as 6.
            matrix: Matrix coefficients code point.

        Returns:
            HDR10 for a PQ transfer, HLG for an HLG transfer, otherwise SDR. BT.601
            primaries always yield SDR.

        Raises:
            ValueError: If any value is not one of the code points enumerated below.

        Sources:
        - https://www.itu.int/rec/T-REC-H.273
        - https://www.itu.int/rec/T-REC-H.Sup19-202104-I
        - https://github.com/FFmpeg/FFmpeg/blob/master/libavutil/pixfmt.h
        """

        class Primaries(Enum):
            Reserved = 0
            BT_709 = 1
            Unspecified = 2
            BT_601_625 = 5
            BT_601_525 = 6
            BT_2020_and_2100 = 9
            SMPTE_ST_2113_and_EG_4321 = 12  # P3D65

        class Transfer(Enum):
            Reserved = 0
            BT_709 = 1
            Unspecified = 2
            BT_601 = 6
            BT_2020 = 14
            BT_2100 = 15
            BT_2100_PQ = 16
            BT_2100_HLG = 18

        class Matrix(Enum):
            RGB = 0
            YCbCr_BT_709 = 1
            Unspecified = 2  # same "Unspecified" code point as in Primaries/Transfer
            YCbCr_BT_601_625 = 5
            YCbCr_BT_601_525 = 6
            YCbCr_BT_2020_and_2100 = 9  # YCbCr BT.2100 shares the same CP
            ICtCp_BT_2100 = 14

        if transfer == 5:
            # While not part of any standard, it is typically used as a PAL variant of Transfer.BT_601=6.
            # i.e. where Transfer 6 would be for BT.601-NTSC and Transfer 5 would be for BT.601-PAL.
            # The codebase is currently agnostic to either, so a manual conversion to 6 is done.
            transfer = 6

        # Converting to the enums validates the code points (unknown values raise ValueError).
        primaries = Primaries(primaries)
        transfer = Transfer(transfer)
        matrix = Matrix(matrix)

        # primaries and matrix do not strictly correlate to a range
        if (primaries, transfer, matrix) == (Primaries.Reserved, Transfer.Reserved, Matrix.RGB):
            return Video.Range.SDR
        elif primaries in (Primaries.BT_601_625, Primaries.BT_601_525):
            return Video.Range.SDR
        elif transfer == Transfer.BT_2100_PQ:
            return Video.Range.HDR10
        elif transfer == Transfer.BT_2100_HLG:
            return Video.Range.HLG
        else:
            return Video.Range.SDR

    @staticmethod
    def from_m3u_range_tag(tag: str) -> Optional[Video.Range]:
        """
        Convert an M3U(8) VIDEO-RANGE attribute value to a Video Range.

        The tag is upper-cased and stripped of double quotes and whitespace first.

        Returns:
            The matching range, or None when the tag is empty or None.

        Raises:
            ValueError: If the tag is non-empty but not SDR, PQ or HLG.
        """
        tag = (tag or "").upper().replace('"', "").strip()
        if not tag:
            return None
        if tag == "SDR":
            return Video.Range.SDR
        elif tag == "PQ":
            return Video.Range.HDR10  # technically could be any PQ-transfer range
        elif tag == "HLG":
            return Video.Range.HLG
        # for some reason there's no Dolby Vision info tag
        raise ValueError(f"The M3U Range Tag '{tag}' is not a supported Video Range")
class ScanType(str, Enum):
    """Scan type a video track can be tagged with; values are the lower-case names."""

    PROGRESSIVE = "progressive"
    INTERLACED = "interlaced"
def __init__(
    self,
    *args: Any,
    codec: Optional[Video.Codec] = None,
    range_: Optional[Video.Range] = None,
    bitrate: Optional[Union[str, int, float]] = None,
    width: Optional[int] = None,
    height: Optional[int] = None,
    fps: Optional[Union[str, int, float]] = None,
    scan_type: Optional[Video.ScanType] = None,
    closed_captions: Optional[list[dict[str, Any]]] = None,
    **kwargs: Any,
) -> None:
    """
    Create a new Video track object.

    Parameters:
        codec: A Video.Codec enum representing the video codec.
            If not specified, MediaInfo will be used to retrieve the codec
            once the track has been downloaded.
        range_: A Video.Range enum representing the video color range.
            Defaults to SDR if not specified.
        bitrate: A number or float representing the average bandwidth in bytes/s.
            Float values are rounded up to the nearest integer.
            NOTE(review): __str__ renders this value as kb/s — confirm the unit.
        width: The horizontal resolution of the video. Numeric strings are accepted.
        height: The vertical resolution of the video. Numeric strings are accepted.
        fps: A number, float, or string representing the frames/s of the video.
            Strings may represent numbers, floats, or a fraction (num/den).
            All strings will be cast to either a number or float.
        scan_type: A Video.ScanType enum, stored as given.
        closed_captions: A list of dicts describing closed captions attached to this
            video; stored as given, or an empty list when omitted. The dict schema is
            not defined here — verify against the code that builds it.

    Any other positional or keyword arguments are forwarded to the parent Track.

    Note: If codec, bitrate, width, height, or fps is not specified some checks
    may be skipped or assume a value. Specifying as much information as possible
    is highly recommended.

    Raises:
        TypeError: If an argument has an unsupported type.
        ValueError: If bitrate, width, height or fps cannot be parsed as a number.
    """
    super().__init__(*args, **kwargs)

    if not isinstance(codec, (Video.Codec, type(None))):
        raise TypeError(f"Expected codec to be a {Video.Codec}, not {codec!r}")
    if not isinstance(range_, (Video.Range, type(None))):
        raise TypeError(f"Expected range_ to be a {Video.Range}, not {range_!r}")
    if not isinstance(bitrate, (str, int, float, type(None))):
        raise TypeError(f"Expected bitrate to be a {str}, {int}, or {float}, not {bitrate!r}")
    if not isinstance(width, (int, str, type(None))):
        raise TypeError(f"Expected width to be a {int}, not {width!r}")
    if not isinstance(height, (int, str, type(None))):
        raise TypeError(f"Expected height to be a {int}, not {height!r}")
    if not isinstance(fps, (str, int, float, type(None))):
        raise TypeError(f"Expected fps to be a {str}, {int}, or {float}, not {fps!r}")
    if not isinstance(scan_type, (Video.ScanType, type(None))):
        raise TypeError(f"Expected scan_type to be a {Video.ScanType}, not {scan_type!r}")

    self.codec = codec
    self.range = range_ or Video.Range.SDR

    # Each conversion re-raises as ValueError with the original error chained as
    # the cause, so the traceback shows the parsing failure explicitly.
    try:
        self.bitrate = int(math.ceil(float(bitrate))) if bitrate else None
    except (ValueError, TypeError) as e:
        raise ValueError(f"Expected bitrate to be a number or float, {e}") from e

    try:
        # A missing or zero dimension is normalised to None.
        self.width = int(width or 0) or None
    except ValueError as e:
        raise ValueError(f"Expected width to be a number, not {width!r}, {e}") from e

    try:
        self.height = int(height or 0) or None
    except ValueError as e:
        raise ValueError(f"Expected height to be a number, not {height!r}, {e}") from e

    try:
        self.fps = (FPS.parse(str(fps)) or None) if fps else None
    except Exception as e:
        raise ValueError(
            "Expected fps to be a number, float, or a string as numerator/denominator form, " + str(e)
        ) from e

    self.scan_type = scan_type
    self.closed_captions: list[dict[str, Any]] = closed_captions or []
    self.needs_duration_fix = False
def __str__(self) -> str:
    """
    Build the one-line summary of the track.

    The summary is ``VID | [codec, range] | language | resolution @ bitrate, fps | editions``
    with every empty part left out.
    """
    # "[codec, RANGE]" — the codec is omitted while it is still unknown.
    tags = [self.codec.value] if self.codec else []
    tags.append(self.range.name)
    bracketed = "[" + ", ".join(tag for tag in tags if tag) + "]"

    language = str(self.language)

    # "WxH @ N kb/s" — either half may be missing.
    size_and_rate = []
    if self.width and self.height:
        size_and_rate.append(f"{self.width}x{self.height}")
    if self.bitrate:
        size_and_rate.append(f"{self.bitrate // 1000} kb/s")

    quality = []
    joined_size_and_rate = " @ ".join(size_and_rate)
    if joined_size_and_rate:
        quality.append(joined_size_and_rate)
    if self.fps:
        quality.append(f"{self.fps:.3f} FPS")

    editions = ", ".join(self.edition) if self.edition else None

    sections = ["VID", bracketed, language, ", ".join(quality), editions]
    return " | ".join(section for section in sections if section)
def change_color_range(self, range_: int) -> None:
    """
    Change the Video's Color Range to Limited (0) or Full (1).

    The file is stream-copied through FFmpeg with the codec's ``*_metadata``
    bitstream filter setting ``video_full_range_flag``. The result is written next
    to the original as ``<stem>_limited_range`` or ``<stem>_full_range``; the track's
    path is then switched to the new file and the original file is deleted.

    Parameters:
        range_: 0 for Limited range, 1 for Full range.

    Raises:
        ValueError: If range_ is not 0 or 1, the track has not been downloaded,
            or the track has no codec set.
        NotImplementedError: If the codec is neither AVC nor HEVC.
        EnvironmentError: If the FFmpeg executable is not available.
        subprocess.CalledProcessError: If FFmpeg exits with a non-zero status.
    """
    # Validate up front: a negative value would otherwise silently pick a label via
    # negative indexing while being passed to FFmpeg verbatim, and values above 1
    # would surface as an unrelated IndexError.
    if not isinstance(range_, int) or range_ not in (0, 1):
        raise ValueError(f"Expected range_ to be 0 (Limited) or 1 (Full), not {range_!r}")

    if not self.path or not self.path.exists():
        raise ValueError("Cannot change the color range flag on a Video that has not been downloaded.")
    if not self.codec:
        raise ValueError("Cannot change the color range flag on a Video that has no codec specified.")
    if self.codec not in (Video.Codec.AVC, Video.Codec.HEVC):
        raise NotImplementedError(
            "Cannot change the color range flag on this Video as "
            f"it's codec, {self.codec.value}, is not yet supported."
        )

    if not binaries.FFMPEG:
        raise EnvironmentError('FFmpeg executable "ffmpeg" was not found but is required for this call.')

    # Bitstream filter that can rewrite the flag for each supported codec.
    filter_key = {Video.Codec.AVC: "h264_metadata", Video.Codec.HEVC: "hevc_metadata"}[self.codec]

    original_path = self.path
    output_path = original_path.with_stem(f"{original_path.stem}_{['limited', 'full'][range_]}_range")

    subprocess.run(
        [
            binaries.FFMPEG,
            "-hide_banner",
            "-loglevel",
            "panic",
            "-i",
            original_path,
            "-codec",
            "copy",
            "-bsf:v",
            f"{filter_key}=video_full_range_flag={range_}",
            str(output_path),
        ],
        check=True,
    )

    # Only swap paths and drop the original once FFmpeg has succeeded.
    self.path = output_path
    original_path.unlink()
def ccextractor(
    self, track_id: Any, out_path: Union[Path, str], language: Language, original: bool = False
) -> Optional[Subtitle]:
    """
    Extract closed captions with CCExtractor and wrap the result as a Subtitle track.

    CCExtractor is first run on the file as it is; if that yields no output file,
    the track is repackaged and CCExtractor is run once more.

    Parameters:
        track_id: ID to give the resulting Subtitle track.
        out_path: Where CCExtractor should write the extracted captions.
        language: Language to assign to the resulting Subtitle track.
        original: Whether the captions are in the title's original language.

    Returns:
        A SubRip Subtitle flagged as CC whose path is out_path, or None if no
        caption file was produced.

    Raises:
        ValueError: If the track has not been downloaded yet.
        EnvironmentError: If the ccextractor executable is not available.
        subprocess.CalledProcessError: If CCExtractor fails with an exit code other than 10.
    """
    if not self.path:
        raise ValueError("You must download the track first.")

    if not binaries.CCExtractor:
        raise EnvironmentError("ccextractor executable was not found.")

    out_path = Path(out_path)

    def _extract() -> bool:
        """Run CCExtractor on the current self.path; report whether out_path now exists."""
        command = [binaries.CCExtractor, "-trim", "-nobom", "-noru", "-ru1", "-o", out_path, self.path]
        try:
            subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except subprocess.CalledProcessError as error:
            # Never keep output from a failed run.
            out_path.unlink(missing_ok=True)
            if error.returncode != 10:  # 10 = No captions found
                raise
        return out_path.exists()

    # Try on the original file first (preserves container-level CC data like c608 boxes),
    # then fall back to repacked file (ccextractor can fail on some container formats).
    if not _extract():
        self.repackage()
        _extract()

    if not out_path.exists():
        return None

    cc_track = Subtitle(
        id_=track_id,
        url="",  # doesn't need to be downloaded
        codec=Subtitle.Codec.SubRip,
        language=language,
        is_original_lang=original,
        cc=True,
    )
    cc_track.path = out_path
    return cc_track
def extract_c608(self) -> list[Subtitle]:
    """
    Extract an Apple-style c608 box (CEA-608) subtitle using ccextractor.

    This is a thin wrapper around the track's ccextractor method: it looks for a
    c608 box in the first 20000 bytes of the file and only calls ccextractor when
    one is present.

    Even though there may be more than one c608 box, only one can actually be
    extracted. It is also possible that this needs to run before any decryption,
    as decryption may destroy some of the metadata.

    TODO: Need a test file with more than one c608 box to add support for
          more than one CEA-608 extraction.

    Returns:
        A list with the extracted CC Subtitle, or an empty list when no c608 box
        was found or nothing could be extracted.

    Raises:
        ValueError: If the track has not been downloaded yet.
    """
    if not self.path:
        raise ValueError("You must download the track first.")

    with self.path.open("rb") as stream:
        # assuming 20KB is enough to contain the c608 box.
        # ffprobe will fail, so a c608 box check must be done.
        header = stream.read(20000)
        box_total = sum(1 for _ in get_boxes(header, b"c608"))

    if box_total <= 0:
        return []

    # TODO: Figure out the real language, it might be different
    #       CEA-608 boxes don't seem to carry language information :(
    # TODO: Figure out if the CC language is original lang or not.
    #       Will need to figure out above first to do so.
    track_id = f"ccextractor-{self.id}"
    cc_lang = self.language
    destination = config.directories.temp / config.filenames.subtitle.format(id=track_id, language=cc_lang)
    cc_track = self.ccextractor(
        track_id=track_id,
        out_path=destination,
        language=cc_lang,
        original=False,
    )
    return [cc_track] if cc_track else []
def remove_eia_cc(self) -> bool:
    """
    Remove EIA-CC data from the bitstream while keeping the x264 SEI data.

    This works by removing all NAL units of type 6 from the bitstream and then
    re-adding the SEI user data (effectively a new NAL unit with just that data).

    Only bitstreams carrying x264 encoding information are currently supported due to
    the obscurity of the MDAT mp4 box structure, so the settings are located with a
    byte-level search over the first 60000 bytes of the file.

    On success the cleaned stream is written next to the original with a
    ``.cleaned`` infix before the suffix, the track's path is switched to it and
    the original file is deleted.

    Returns:
        True if the stream was cleaned, False if no usable x264 encoding settings
        were found in the scanned part of the file.

    Raises:
        ValueError: If the track has not been downloaded.
        EnvironmentError: If the FFmpeg executable is not available.
        subprocess.CalledProcessError: If FFmpeg exits with a non-zero status.
    """
    if not self.path or not self.path.exists():
        raise ValueError("Cannot clean a Track that has not been downloaded.")

    if not binaries.FFMPEG:
        raise EnvironmentError('FFmpeg executable "ffmpeg" was not found but is required for this call.')

    log = logging.getLogger("x264-clean")
    log.info("Removing EIA-CC from Video Track with FFMPEG")

    with open(self.path, "rb") as f:
        file = f.read(60000)

    # The 16 bytes in front of the "x264" marker are taken as the SEI user-data UUID.
    # They are raw binary, so DOTALL is required for "." to also match a 0x0A byte.
    x264 = re.search(rb"(.{16})(x264)", file, flags=re.DOTALL)
    if not x264:
        log.info(" - No x264 encode settings were found, unsupported...")
        return False

    uuid = x264.group(1).hex()

    # Read the settings from the very occurrence the UUID was taken from, up to
    # (not including) the terminating NUL byte.
    settings_start = x264.start(2)
    settings_end = file.find(b"\x00", settings_start)
    if settings_end == -1:
        log.info(" - x264 encode settings are not terminated within the scanned data, unsupported...")
        return False

    # ":" and "," are separators in FFmpeg's filter syntax and must be escaped.
    encoding_settings = (
        file[settings_start:settings_end].replace(b":", rb"\\:").replace(b",", rb"\,").decode()
    )

    original_path = self.path
    cleaned_path = original_path.with_suffix(f".cleaned{original_path.suffix}")
    subprocess.run(
        [
            binaries.FFMPEG,
            "-hide_banner",
            "-loglevel",
            "panic",
            "-i",
            original_path,
            "-map_metadata",
            "-1",
            "-fflags",
            "bitexact",
            "-bsf:v",
            f"filter_units=remove_types=6,h264_metadata=sei_user_data={uuid}+{encoding_settings}",
            "-codec",
            "copy",
            str(cleaned_path),
        ],
        check=True,
    )

    log.info(" + Removed")

    # Only swap paths and drop the original once FFmpeg has succeeded.
    self.path = cleaned_path
    original_path.unlink()

    return True
# Names exported by `from <this module> import *`: only the Video class.
__all__ = ("Video",)