from __future__ import annotations import logging import math import re import subprocess from enum import Enum from pathlib import Path from typing import Any, Optional, Union from langcodes import Language from unshackle.core import binaries from unshackle.core.config import config from unshackle.core.tracks.subtitle import Subtitle from unshackle.core.tracks.track import Track from unshackle.core.utilities import FPS, get_boxes class Video(Track): class Codec(str, Enum): AVC = "H.264" HEVC = "H.265" VC1 = "VC-1" VP8 = "VP8" VP9 = "VP9" AV1 = "AV1" @property def extension(self) -> str: return self.value.lower().replace(".", "").replace("-", "") @staticmethod def from_mime(mime: str) -> Video.Codec: mime = mime.lower().strip().split(".")[0] if mime in ( "avc1", "avc2", "avc3", "dva1", "dvav", # Dolby Vision ): return Video.Codec.AVC if mime in ( "hev1", "hev2", "hev3", "hvc1", "hvc2", "hvc3", "dvh1", "dvhe", # Dolby Vision "lhv1", "lhe1", # Layered ): return Video.Codec.HEVC if mime == "vc-1": return Video.Codec.VC1 if mime in ("vp08", "vp8"): return Video.Codec.VP8 if mime in ("vp09", "vp9"): return Video.Codec.VP9 if mime == "av01": return Video.Codec.AV1 raise ValueError(f"The MIME '{mime}' is not a supported Video Codec") @staticmethod def from_codecs(codecs: str) -> Video.Codec: for codec in codecs.lower().split(","): codec = codec.strip() mime = codec.split(".")[0] try: return Video.Codec.from_mime(mime) except ValueError: pass raise ValueError(f"No MIME types matched any supported Video Codecs in '{codecs}'") @staticmethod def from_netflix_profile(profile: str) -> Video.Codec: profile = profile.lower().strip() if profile.startswith(("h264", "playready-h264")): return Video.Codec.AVC if profile.startswith("hevc"): return Video.Codec.HEVC if profile.startswith("vp9"): return Video.Codec.VP9 if profile.startswith("av1"): return Video.Codec.AV1 raise ValueError(f"The Content Profile '{profile}' is not a supported Video Codec") class Range(str, Enum): SDR = "SDR" # No Dynamic Range HLG = "HLG" # https://en.wikipedia.org/wiki/Hybrid_log%E2%80%93gamma HDR10 = "HDR10" # https://en.wikipedia.org/wiki/HDR10 HDR10P = "HDR10+" # https://en.wikipedia.org/wiki/HDR10%2B DV = "DV" # https://en.wikipedia.org/wiki/Dolby_Vision HYBRID = "HYBRID" # Selects both HDR10 and DV tracks for hybrid processing with DoviTool @staticmethod def from_cicp(primaries: int, transfer: int, matrix: int) -> Video.Range: """ Convert CICP (Coding-Independent Code Points) values to Video Range. CICP is defined in ITU-T H.273 and ISO/IEC 23091-2 for signaling video color properties independently of the compression codec. These values are used across AVC (H.264), HEVC (H.265), VVC, AV1, and other modern codecs. The enum values (Primaries, Transfer, Matrix) match the official specifications: - ITU-T H.273: Coding-independent code points for video signal type identification - ISO/IEC 23091-2: Information technology — Coding-independent code points — Part 2: Video - H.264 Table E-3 (Colour Primaries) and Table E-4 (Transfer Characteristics) - H.265 Table E.3 and E.4 (identical to H.264) Note: Value 0 = "Reserved" and Value 2 = "Unspecified" per specification. While both effectively mean "unknown" in practice, the distinction matters for spec compliance. Value 2 was added based on user feedback (GitHub issue) and verified against FFmpeg's AVColorPrimaries/AVColorTransferCharacteristic enums. Sources: - https://www.itu.int/rec/T-REC-H.273 - https://www.itu.int/rec/T-REC-H.Sup19-202104-I - https://github.com/FFmpeg/FFmpeg/blob/master/libavutil/pixfmt.h """ class Primaries(Enum): Reserved = 0 BT_709 = 1 Unspecified = 2 BT_601_625 = 5 BT_601_525 = 6 BT_2020_and_2100 = 9 SMPTE_ST_2113_and_EG_4321 = 12 # P3D65 class Transfer(Enum): Reserved = 0 BT_709 = 1 Unspecified = 2 BT_601 = 6 BT_2020 = 14 BT_2100 = 15 BT_2100_PQ = 16 BT_2100_HLG = 18 class Matrix(Enum): RGB = 0 YCbCr_BT_709 = 1 YCbCr_BT_601_625 = 5 YCbCr_BT_601_525 = 6 YCbCr_BT_2020_and_2100 = 9 # YCbCr BT.2100 shares the same CP ICtCp_BT_2100 = 14 if transfer == 5: # While not part of any standard, it is typically used as a PAL variant of Transfer.BT_601=6. # i.e. where Transfer 6 would be for BT.601-NTSC and Transfer 5 would be for BT.601-PAL. # The codebase is currently agnostic to either, so a manual conversion to 6 is done. transfer = 6 primaries = Primaries(primaries) transfer = Transfer(transfer) matrix = Matrix(matrix) # primaries and matrix does not strictly correlate to a range if (primaries, transfer, matrix) == (Primaries.Reserved, Transfer.Reserved, Matrix.RGB): return Video.Range.SDR elif primaries in (Primaries.BT_601_625, Primaries.BT_601_525): return Video.Range.SDR elif transfer == Transfer.BT_2100_PQ: return Video.Range.HDR10 elif transfer == Transfer.BT_2100_HLG: return Video.Range.HLG else: return Video.Range.SDR @staticmethod def from_m3u_range_tag(tag: str) -> Optional[Video.Range]: tag = (tag or "").upper().replace('"', "").strip() if not tag: return None if tag == "SDR": return Video.Range.SDR elif tag == "PQ": return Video.Range.HDR10 # technically could be any PQ-transfer range elif tag == "HLG": return Video.Range.HLG # for some reason there's no Dolby Vision info tag raise ValueError(f"The M3U Range Tag '{tag}' is not a supported Video Range") class ScanType(str, Enum): PROGRESSIVE = "progressive" INTERLACED = "interlaced" def __init__( self, *args: Any, codec: Optional[Video.Codec] = None, range_: Optional[Video.Range] = None, bitrate: Optional[Union[str, int, float]] = None, width: Optional[int] = None, height: Optional[int] = None, fps: Optional[Union[str, int, float]] = None, scan_type: Optional[Video.ScanType] = None, closed_captions: Optional[list[dict[str, Any]]] = None, **kwargs: Any, ) -> None: """ Create a new Video track object. Parameters: codec: A Video.Codec enum representing the video codec. If not specified, MediaInfo will be used to retrieve the codec once the track has been downloaded. range_: A Video.Range enum representing the video color range. Defaults to SDR if not specified. bitrate: A number or float representing the average bandwidth in bytes/s. Float values are rounded up to the nearest integer. width: The horizontal resolution of the video. height: The vertical resolution of the video. fps: A number, float, or string representing the frames/s of the video. Strings may represent numbers, floats, or a fraction (num/den). All strings will be cast to either a number or float. Note: If codec, bitrate, width, height, or fps is not specified some checks may be skipped or assume a value. Specifying as much information as possible is highly recommended. """ super().__init__(*args, **kwargs) if not isinstance(codec, (Video.Codec, type(None))): raise TypeError(f"Expected codec to be a {Video.Codec}, not {codec!r}") if not isinstance(range_, (Video.Range, type(None))): raise TypeError(f"Expected range_ to be a {Video.Range}, not {range_!r}") if not isinstance(bitrate, (str, int, float, type(None))): raise TypeError(f"Expected bitrate to be a {str}, {int}, or {float}, not {bitrate!r}") if not isinstance(width, (int, str, type(None))): raise TypeError(f"Expected width to be a {int}, not {width!r}") if not isinstance(height, (int, str, type(None))): raise TypeError(f"Expected height to be a {int}, not {height!r}") if not isinstance(fps, (str, int, float, type(None))): raise TypeError(f"Expected fps to be a {str}, {int}, or {float}, not {fps!r}") if not isinstance(scan_type, (Video.ScanType, type(None))): raise TypeError(f"Expected scan_type to be a {Video.ScanType}, not {scan_type!r}") self.codec = codec self.range = range_ or Video.Range.SDR try: self.bitrate = int(math.ceil(float(bitrate))) if bitrate else None except (ValueError, TypeError) as e: raise ValueError(f"Expected bitrate to be a number or float, {e}") try: self.width = int(width or 0) or None except ValueError as e: raise ValueError(f"Expected width to be a number, not {width!r}, {e}") try: self.height = int(height or 0) or None except ValueError as e: raise ValueError(f"Expected height to be a number, not {height!r}, {e}") try: self.fps = (FPS.parse(str(fps)) or None) if fps else None except Exception as e: raise ValueError("Expected fps to be a number, float, or a string as numerator/denominator form, " + str(e)) self.scan_type = scan_type self.closed_captions: list[dict[str, Any]] = closed_captions or [] self.needs_duration_fix = False def __str__(self) -> str: return " | ".join( filter( bool, [ "VID", "[" + (", ".join(filter(bool, [self.codec.value if self.codec else None, self.range.name]))) + "]", str(self.language), ", ".join( filter( bool, [ " @ ".join( filter( bool, [ f"{self.width}x{self.height}" if self.width and self.height else None, f"{self.bitrate // 1000} kb/s" if self.bitrate else None, ], ) ), f"{self.fps:.3f} FPS" if self.fps else None, ], ) ), ", ".join(self.edition) if self.edition else None, ], ) ) def change_color_range(self, range_: int) -> None: """Change the Video's Color Range to Limited (0) or Full (1).""" if not self.path or not self.path.exists(): raise ValueError("Cannot change the color range flag on a Video that has not been downloaded.") if not self.codec: raise ValueError("Cannot change the color range flag on a Video that has no codec specified.") if self.codec not in (Video.Codec.AVC, Video.Codec.HEVC): raise NotImplementedError( "Cannot change the color range flag on this Video as " f"it's codec, {self.codec.value}, is not yet supported." ) if not binaries.FFMPEG: raise EnvironmentError('FFmpeg executable "ffmpeg" was not found but is required for this call.') filter_key = {Video.Codec.AVC: "h264_metadata", Video.Codec.HEVC: "hevc_metadata"}[self.codec] original_path = self.path output_path = original_path.with_stem(f"{original_path.stem}_{['limited', 'full'][range_]}_range") subprocess.run( [ binaries.FFMPEG, "-hide_banner", "-loglevel", "panic", "-i", original_path, "-codec", "copy", "-bsf:v", f"{filter_key}=video_full_range_flag={range_}", str(output_path), ], check=True, ) self.path = output_path original_path.unlink() def ccextractor( self, track_id: Any, out_path: Union[Path, str], language: Language, original: bool = False ) -> Optional[Subtitle]: """Return a TextTrack object representing CC track extracted by CCExtractor.""" if not self.path: raise ValueError("You must download the track first.") if not binaries.CCExtractor: raise EnvironmentError("ccextractor executable was not found.") out_path = Path(out_path) def _run_ccextractor() -> bool: try: subprocess.run( [binaries.CCExtractor, "-trim", "-nobom", "-noru", "-ru1", "-o", out_path, self.path], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) except subprocess.CalledProcessError as e: out_path.unlink(missing_ok=True) if e.returncode != 10: # 10 = No captions found raise return out_path.exists() # Try on the original file first (preserves container-level CC data like c608 boxes), # then fall back to repacked file (ccextractor can fail on some container formats). if not _run_ccextractor(): self.repackage() _run_ccextractor() if out_path.exists(): cc_track = Subtitle( id_=track_id, url="", # doesn't need to be downloaded codec=Subtitle.Codec.SubRip, language=language, is_original_lang=original, cc=True, ) cc_track.path = out_path return cc_track return None def extract_c608(self) -> list[Subtitle]: """ Extract Apple-Style c608 box (CEA-608) subtitle using ccextractor. This isn't much more than a wrapper to the track.ccextractor function. All this does, is actually check if a c608 box exists and only if so does it actually call ccextractor. Even though there is a possibility of more than one c608 box, only one can actually be extracted. Not only that but it's very possible this needs to be done before any decryption as the decryption may destroy some of the metadata. TODO: Need a test file with more than one c608 box to add support for more than one CEA-608 extraction. """ if not self.path: raise ValueError("You must download the track first.") with self.path.open("rb") as f: # assuming 20KB is enough to contain the c608 box. # ffprobe will fail, so a c608 box check must be done. c608_count = len(list(get_boxes(f.read(20000), b"c608"))) if c608_count > 0: # TODO: Figure out the real language, it might be different # CEA-608 boxes doesnt seem to carry language information :( # TODO: Figure out if the CC language is original lang or not. # Will need to figure out above first to do so. track_id = f"ccextractor-{self.id}" cc_lang = self.language cc_track = self.ccextractor( track_id=track_id, out_path=config.directories.temp / config.filenames.subtitle.format(id=track_id, language=cc_lang), language=cc_lang, original=False, ) if not cc_track: return [] return [cc_track] return [] def remove_eia_cc(self) -> bool: """ Remove EIA-CC data from Bitstream while keeping SEI data. This works by removing all NAL Unit's with the Type of 6 from the bistream and then re-adding SEI data (effectively a new NAL Unit with just the SEI data). Only bitstreams with x264 encoding information is currently supported due to the obscurity on the MDAT mp4 box structure. Therefore, we need to use hacky regex. """ if not self.path or not self.path.exists(): raise ValueError("Cannot clean a Track that has not been downloaded.") if not binaries.FFMPEG: raise EnvironmentError('FFmpeg executable "ffmpeg" was not found but is required for this call.') log = logging.getLogger("x264-clean") log.info("Removing EIA-CC from Video Track with FFMPEG") with open(self.path, "rb") as f: file = f.read(60000) x264 = re.search(rb"(.{16})(x264)", file) if not x264: log.info(" - No x264 encode settings were found, unsupported...") return False uuid = x264.group(1).hex() i = file.index(b"x264") encoding_settings = file[i : i + file[i:].index(b"\x00")].replace(b":", rb"\\:").replace(b",", rb"\,").decode() original_path = self.path cleaned_path = original_path.with_suffix(f".cleaned{original_path.suffix}") subprocess.run( [ binaries.FFMPEG, "-hide_banner", "-loglevel", "panic", "-i", original_path, "-map_metadata", "-1", "-fflags", "bitexact", "-bsf:v", f"filter_units=remove_types=6,h264_metadata=sei_user_data={uuid}+{encoding_settings}", "-codec", "copy", str(cleaned_path), ], check=True, ) log.info(" + Removed") self.path = cleaned_path original_path.unlink() return True __all__ = ("Video",)