Files
unshackle-SeFree/unshackle/core/tracks/hybrid.py
Andy 829ae01000 fix(hybrid): accept HDR10+ tracks as valid base layer for HYBRID mode
HYBRID mode previously required a plain HDR10 track, rejecting HDR10+ (HDR10P) even though it's a perfectly valid (and superior) base layer.
HDR10+ is now preferred over HDR10 when both are available, preserving dynamic metadata in the final DV Profile 8 output.
2026-02-18 15:56:55 -07:00

713 lines
29 KiB
Python

import json
import logging
import os
import random
import re
import subprocess
import sys
from pathlib import Path
from rich.padding import Padding
from rich.rule import Rule
from unshackle.core.binaries import FFMPEG, DoviTool, FFProbe, HDR10PlusTool
from unshackle.core.config import config
from unshackle.core.console import console
from unshackle.core.utilities import get_debug_logger
class Hybrid:
    """Build a Dolby Vision + HDR10(+) hybrid HEVC stream from downloaded video tracks.

    All of the work is driven from ``__init__``: the streams are extracted with ffmpeg, a
    Dolby Vision RPU is obtained (extracted from the DV track, or generated from HDR10+
    metadata), its Level 6 and Level 5 metadata are edited, and the RPU is injected into the
    HDR10(+) stream. Every intermediate and output file lives in ``config.directories.temp``.
    """

    def __init__(self, videos, source) -> None:
        """
        Takes the Dolby Vision and HDR10(+) streams out of the VideoTracks.
        It will then attempt to inject the Dolby Vision metadata layer to the HDR10(+) stream.
        If no DV track is available but HDR10+ is present, it will convert HDR10+ to DV.

        Parameters:
            videos: Downloaded video tracks. Each one is read for ``id``, ``path``, ``range``
                and ``height``.
            source: Service identifier. For "itunes" and "appletvplus" the intermediate
                ``hdr10.mkv`` and ``dv.mkv`` files are removed as well once injection is done.

        Raises:
            ValueError: If a track was not downloaded, if no usable HDR10/HDR10+ or DV/HDR10+
                combination is available, or if one of the processing steps fails.
        """
        # NOTE(review): imported locally, presumably to avoid a circular import - confirm.
        from unshackle.core.tracks import Video

        self.log = logging.getLogger("hybrid")
        self.debug_logger = get_debug_logger()

        self.videos = videos
        self.source = source
        self.rpu_file = "RPU.bin"
        self.hdr_type = "HDR10"
        # The output name is fixed here; it does not change when hdr_type later becomes "HDR10+".
        self.hevc_file = f"{self.hdr_type}-DV.hevc"
        self.hdr10plus_to_dv = False
        self.hdr10plus_file = "HDR10Plus.json"

        # Get resolution info from the HDR10 track (HDR10+ as a fallback) for display
        hdr10_track = next((v for v in videos if v.range == Video.Range.HDR10), None)
        hdr10p_track = next((v for v in videos if v.range == Video.Range.HDR10P), None)
        track_for_res = hdr10_track or hdr10p_track
        self.resolution = f"{track_for_res.height}p" if track_for_res and track_for_res.height else "Unknown"

        console.print(Padding(Rule(f"[rule.text]HDR10+DV Hybrid ({self.resolution})"), (1, 2)))

        if self.debug_logger:
            self.debug_logger.log(
                level="DEBUG",
                operation="hybrid_init",
                message="Starting HDR10+DV hybrid processing",
                context={
                    "source": source,
                    "resolution": self.resolution,
                    "video_count": len(videos),
                    "video_ranges": [str(v.range) for v in videos],
                },
            )

        for video in self.videos:
            if not video.path or not os.path.exists(video.path):
                raise ValueError(f"Video track {video.id} was not downloaded before injection.")

        # Check which ranges are available
        has_dv = any(video.range == Video.Range.DV for video in self.videos)
        has_hdr10 = any(video.range == Video.Range.HDR10 for video in self.videos)
        has_hdr10p = any(video.range == Video.Range.HDR10P for video in self.videos)

        if not has_hdr10 and not has_hdr10p:
            raise ValueError("No HDR10 or HDR10+ track available for hybrid processing.")

        if not has_dv:
            if not has_hdr10p:
                raise ValueError("No DV track available and no HDR10+ to convert.")
            # If we have HDR10+ but no DV, we can convert HDR10+ to DV.
            # A console.status() that is never entered displays nothing, so log the message instead.
            self.log.info("No DV track found, but HDR10+ is available. Will convert HDR10+ to DV.")
            self.hdr10plus_to_dv = True

        if os.path.isfile(config.directories.temp / self.hevc_file):
            self.log.info("Already Injected")
            return

        for video in videos:
            # Paths were validated above; use the actual path from the video track.
            # Both HDR10 and HDR10+ are written to HDR10.hevc, the file the later steps read.
            if video.range == Video.Range.HDR10:
                self.extract_stream(video.path, "HDR10")
            elif video.range == Video.Range.HDR10P:
                self.extract_stream(video.path, "HDR10")
                self.hdr_type = "HDR10+"
            elif video.range == Video.Range.DV:
                self.extract_stream(video.path, "DV")

        if self.hdr10plus_to_dv:
            # Extract HDR10+ metadata and convert to DV
            hdr10p_video = next(v for v in videos if v.range == Video.Range.HDR10P)
            self.extract_hdr10plus(hdr10p_video)
            self.convert_hdr10plus_to_dv()
        else:
            # Regular DV extraction. extract_rpu() falls back to an untouched RPU when the
            # mode 3 conversion is rejected, in which case RPU_UNT.bin is the file to use.
            dv_video = next(v for v in videos if v.range == Video.Range.DV)
            self.extract_rpu(dv_video)
            if os.path.isfile(config.directories.temp / "RPU_UNT.bin"):
                self.rpu_file = "RPU_UNT.bin"

        # Edit L6 with actual luminance values from RPU, then L5 active area
        self.level_6()
        base_video = next((v for v in videos if v.range in (Video.Range.HDR10, Video.Range.HDR10P)), None)
        if base_video and base_video.path:
            self.level_5(base_video.path)

        self.injecting()

        if self.debug_logger:
            self.debug_logger.log(
                level="INFO",
                operation="hybrid_complete",
                message="Injection Completed",
                context={
                    "hdr_type": self.hdr_type,
                    "resolution": self.resolution,
                    "hdr10plus_to_dv": self.hdr10plus_to_dv,
                    "rpu_file": self.rpu_file,
                    "output_file": self.hevc_file,
                },
            )
        self.log.info("✓ Injection Completed")

        # `== ("itunes" or "appletvplus")` only ever compared against "itunes".
        if self.source in ("itunes", "appletvplus"):
            Path.unlink(config.directories.temp / "hdr10.mkv", missing_ok=True)
            Path.unlink(config.directories.temp / "dv.mkv", missing_ok=True)

        # self.rpu_file names the last edited RPU by now, so the originally extracted or
        # generated RPU has to be listed explicitly; otherwise it is left behind and the
        # "already exists" shortcuts would reuse it on a later run.
        leftovers = (
            "HDR10.hevc",
            "DV.hevc",
            self.rpu_file,
            "RPU.bin",
            "RPU_UNT.bin",
            "RPU_L6.bin",
            "RPU_L5.bin",
            "L5.json",
            "L6.json",
        )
        for name in leftovers:
            Path.unlink(config.directories.temp / name, missing_ok=True)

    def ffmpeg_simple(self, save_path, output):
        """Simple ffmpeg execution without progress tracking.

        Copies the video stream of ``save_path`` into ``output`` without re-encoding and
        returns the ``subprocess.CompletedProcess`` (stdout and stderr captured as bytes).
        """
        p = subprocess.run(
            [
                str(FFMPEG) if FFMPEG else "ffmpeg",
                "-nostdin",
                "-i",
                str(save_path),
                "-c:v",
                "copy",
                str(output),
                "-y",  # overwrite output
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        return p

    def extract_stream(self, save_path, type_):
        """Extract the video stream of ``save_path`` to ``<type_>.hevc`` in the temp directory.

        On failure the partial output is removed, the error is logged and the process exits
        with status 1.
        """
        output = Path(config.directories.temp / f"{type_}.hevc")

        with console.status(f"Extracting {type_} stream...", spinner="dots"):
            result = self.ffmpeg_simple(save_path, output)

        if result.returncode:
            output.unlink(missing_ok=True)
            if self.debug_logger:
                self.debug_logger.log(
                    level="ERROR",
                    operation="hybrid_extract_stream",
                    message=f"Failed extracting {type_} stream",
                    context={
                        "type": type_,
                        "input": str(save_path),
                        "output": str(output),
                        "returncode": result.returncode,
                        "stderr": (result.stderr or b"").decode(errors="replace"),
                        "stdout": (result.stdout or b"").decode(errors="replace"),
                    },
                )
            self.log.error(f"x Failed extracting {type_} stream")
            sys.exit(1)

        if self.debug_logger:
            self.debug_logger.log(
                level="DEBUG",
                operation="hybrid_extract_stream",
                message=f"Extracted {type_} stream",
                context={"type": type_, "input": str(save_path), "output": str(output)},
                success=True,
            )

    def extract_rpu(self, video, untouched=False):
        """Extract the Dolby Vision RPU from ``DV.hevc`` into ``RPU.bin`` (or ``RPU_UNT.bin``).

        Parameters:
            video: The DV track; only passed through to the untouched retry.
            untouched: When False the RPU is converted with dovi_tool mode 3 during
                extraction; when True it is extracted as-is into ``RPU_UNT.bin``.

        Raises:
            ValueError: If extraction fails for a reason other than the MAX_PQ_LUMINANCE
                error, which triggers one retry in untouched mode.
        """
        if os.path.isfile(config.directories.temp / "RPU.bin") or os.path.isfile(
            config.directories.temp / "RPU_UNT.bin"
        ):
            return

        rpu_name = "RPU" if not untouched else "RPU_UNT"

        with console.status(
            f"Extracting{' untouched ' if untouched else ' '}RPU from Dolby Vision stream...", spinner="dots"
        ):
            extraction_args = [str(DoviTool)]
            if not untouched:
                extraction_args += ["-m", "3"]
            extraction_args += [
                "extract-rpu",
                config.directories.temp / "DV.hevc",
                "-o",
                config.directories.temp / f"{rpu_name}.bin",
            ]

            rpu_extraction = subprocess.run(
                extraction_args,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )

        if rpu_extraction.returncode:
            # The tool may fail before creating its output; a missing file must not hide
            # the real error or block the untouched retry below.
            Path.unlink(config.directories.temp / f"{rpu_name}.bin", missing_ok=True)
            stderr_bytes = rpu_extraction.stderr or b""
            stderr_text = stderr_bytes.decode(errors="replace")
            if self.debug_logger:
                self.debug_logger.log(
                    level="ERROR",
                    operation="hybrid_extract_rpu",
                    message=f"Failed extracting{' untouched ' if untouched else ' '}RPU",
                    context={
                        "untouched": untouched,
                        "returncode": rpu_extraction.returncode,
                        "stderr": stderr_text,
                        "args": [str(a) for a in extraction_args],
                    },
                )
            if b"MAX_PQ_LUMINANCE" in stderr_bytes:
                self.extract_rpu(video, untouched=True)
            elif b"Invalid PPS index" in stderr_bytes:
                raise ValueError("Dolby Vision VideoTrack seems to be corrupt")
            else:
                raise ValueError(f"Failed extracting{' untouched ' if untouched else ' '}RPU from Dolby Vision stream")
        elif self.debug_logger:
            self.debug_logger.log(
                level="DEBUG",
                operation="hybrid_extract_rpu",
                message=f"Extracted{' untouched ' if untouched else ' '}RPU from Dolby Vision stream",
                context={"untouched": untouched, "output": f"{rpu_name}.bin"},
                success=True,
            )

    @staticmethod
    def _parse_crop_borders(output, original_width, original_height):
        """Return ``(left, top, right, bottom)`` borders from the first ``crop=w:h:x:y`` in ``output``.

        Returns None when no crop line is present or when the crop does not fit inside the
        given frame size (which would yield a negative border).
        """
        crop_match = re.search(r"crop=(\d+):(\d+):(\d+):(\d+)", output)
        if not crop_match:
            return None
        w, h, x, y = (int(value) for value in crop_match.groups())
        # Calculate actual border sizes from crop geometry
        borders = (x, y, original_width - w - x, original_height - h - y)
        if min(borders) < 0:
            return None
        return borders

    def level_5(self, input_video):
        """Generate Level 5 active area metadata via crop detection on the HDR10 stream.

        This resolves mismatches where DV has no black bars but HDR10 does (or vice versa)
        by telling the display the correct active area.

        Detection is best-effort: if the video cannot be probed or no crop is detected, a
        warning is logged and the RPU is left unchanged. On success ``self.rpu_file`` is
        switched to ``RPU_L5.bin``.

        Raises:
            ValueError: If dovi_tool fails to apply the Level 5 edit.
        """
        if os.path.isfile(config.directories.temp / "RPU_L5.bin"):
            return

        ffprobe_bin = str(FFProbe) if FFProbe else "ffprobe"
        ffmpeg_bin = str(FFMPEG) if FFMPEG else "ffmpeg"

        with console.status("Detecting active area (crop detection)...", spinner="dots"):
            # Get video duration for random sampling
            result_duration = subprocess.run(
                [ffprobe_bin, "-v", "error", "-show_entries", "format=duration", "-of", "json", str(input_video)],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
            if result_duration.returncode != 0:
                if self.debug_logger:
                    self.debug_logger.log(
                        level="WARNING",
                        operation="hybrid_level5",
                        message="Could not probe video duration",
                        context={"returncode": result_duration.returncode, "stderr": (result_duration.stderr or "")},
                    )
                self.log.warning("Could not probe video duration, skipping L5 crop detection")
                return

            # Get video resolution for proper border calculation
            result_streams = subprocess.run(
                [
                    ffprobe_bin,
                    "-v",
                    "error",
                    "-select_streams",
                    "v:0",
                    "-show_entries",
                    "stream=width,height",
                    "-of",
                    "json",
                    str(input_video),
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
            if result_streams.returncode != 0:
                if self.debug_logger:
                    self.debug_logger.log(
                        level="WARNING",
                        operation="hybrid_level5",
                        message="Could not probe video resolution",
                        context={"returncode": result_streams.returncode, "stderr": (result_streams.stderr or "")},
                    )
                self.log.warning("Could not probe video resolution, skipping L5 crop detection")
                return

            # ffprobe can exit successfully yet omit the requested fields; treat that like
            # any other probe failure instead of crashing this optional step.
            try:
                duration = float(json.loads(result_duration.stdout)["format"]["duration"])
                stream_info = json.loads(result_streams.stdout)["streams"][0]
                original_width = int(stream_info["width"])
                original_height = int(stream_info["height"])
            except (ValueError, KeyError, IndexError, TypeError) as exc:
                if self.debug_logger:
                    self.debug_logger.log(
                        level="WARNING",
                        operation="hybrid_level5",
                        message="Could not parse ffprobe output",
                        context={"error": repr(exc)},
                    )
                self.log.warning("Could not parse ffprobe output, skipping L5 crop detection")
                return

            # Sample 10 random timestamps and run cropdetect on each
            random_times = sorted(random.uniform(0, duration) for _ in range(10))
            crop_results = []
            for t in random_times:
                result_cropdetect = subprocess.run(
                    [
                        ffmpeg_bin,
                        "-y",
                        "-nostdin",
                        "-loglevel",
                        "info",
                        "-ss",
                        f"{t:.2f}",
                        "-i",
                        str(input_video),
                        "-vf",
                        "cropdetect=round=2",
                        "-vframes",
                        "10",
                        "-f",
                        "null",
                        "-",
                    ],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True,
                )
                # cropdetect outputs crop=w:h:x:y
                borders = self._parse_crop_borders(
                    (result_cropdetect.stdout or "") + (result_cropdetect.stderr or ""),
                    original_width,
                    original_height,
                )
                if borders is not None:
                    crop_results.append(borders)

        if not crop_results:
            if self.debug_logger:
                self.debug_logger.log(
                    level="WARNING",
                    operation="hybrid_level5",
                    message="No crop data detected, skipping L5",
                    context={"samples": len(random_times)},
                )
            self.log.warning("No crop data detected, skipping L5")
            return

        # Find the most common crop values (first seen wins on a tie)
        crop_counts = {}
        for crop in crop_results:
            crop_counts[crop] = crop_counts.get(crop, 0) + 1
        left, top, right, bottom = max(crop_counts, key=crop_counts.get)

        # If all borders are 0 there's nothing to correct
        if left == 0 and top == 0 and right == 0 and bottom == 0:
            return

        l5_json = {
            "active_area": {
                "crop": False,
                "presets": [{"id": 0, "left": left, "right": right, "top": top, "bottom": bottom}],
                "edits": {"all": 0},
            }
        }

        l5_path = config.directories.temp / "L5.json"
        with open(l5_path, "w") as f:
            json.dump(l5_json, f, indent=4)

        with console.status("Editing RPU Level 5 active area...", spinner="dots"):
            result = subprocess.run(
                [
                    str(DoviTool),
                    "editor",
                    "-i",
                    str(config.directories.temp / self.rpu_file),
                    "-j",
                    str(l5_path),
                    "-o",
                    str(config.directories.temp / "RPU_L5.bin"),
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )

        if result.returncode:
            if self.debug_logger:
                self.debug_logger.log(
                    level="ERROR",
                    operation="hybrid_level5",
                    message="Failed editing RPU Level 5 values",
                    context={"returncode": result.returncode, "stderr": (result.stderr or b"").decode(errors="replace")},
                )
            Path.unlink(config.directories.temp / "RPU_L5.bin", missing_ok=True)
            raise ValueError("Failed editing RPU Level 5 values")

        if self.debug_logger:
            self.debug_logger.log(
                level="DEBUG",
                operation="hybrid_level5",
                message="Edited RPU Level 5 active area",
                context={"crop": {"left": left, "right": right, "top": top, "bottom": bottom}, "samples": len(crop_results)},
                success=True,
            )
        self.rpu_file = "RPU_L5.bin"

    @staticmethod
    def _parse_level6_summary(summary):
        """Parse luminance values out of a ``dovi_tool info -s`` summary.

        Returns ``(max_cll, max_fall, max_mdl, min_mdl)``; a value is None when its line is
        missing or malformed. ``min_mdl`` is the minimum mastering luminance multiplied by
        10000 and rounded, the other values are truncated to whole numbers.
        """
        max_cll = max_fall = max_mdl = min_mdl = None
        for line in summary.splitlines():
            try:
                if "RPU content light level (L1):" in line:
                    parts = line.split("MaxCLL:")[1].split(",")
                    max_cll = int(float(parts[0].strip().split()[0]))
                    if len(parts) > 1 and "MaxFALL:" in parts[1]:
                        max_fall = int(float(parts[1].split("MaxFALL:")[1].strip().split()[0]))
                elif "RPU mastering display:" in line:
                    fields = line.split(":", 1)[1].strip().split("/")
                    # round() rather than int(): the float product can land just below the
                    # intended integer (e.g. 0.0003 * 10000) and would be truncated down.
                    parsed_min = round(float(fields[0]) * 10000)
                    parsed_max = int(float(fields[1].split(" ")[0]))
                    min_mdl, max_mdl = parsed_min, parsed_max
            except (IndexError, ValueError):
                # Malformed line: leave the values unset so the caller reports it.
                continue
        return max_cll, max_fall, max_mdl, min_mdl

    def level_6(self):
        """Edit RPU Level 6 values using actual luminance data from the RPU.

        Reads the summary of the current RPU, writes ``L6.json`` and produces
        ``RPU_L6.bin``, which then becomes ``self.rpu_file``.

        Raises:
            ValueError: If the RPU summary cannot be read, the luminance values cannot be
                extracted from it, or dovi_tool fails to apply the edit.
        """
        if os.path.isfile(config.directories.temp / "RPU_L6.bin"):
            return

        with console.status("Reading RPU luminance metadata...", spinner="dots"):
            result = subprocess.run(
                [str(DoviTool), "info", "-i", str(config.directories.temp / self.rpu_file), "-s"],
                capture_output=True,
                text=True,
            )

        if result.returncode != 0:
            if self.debug_logger:
                self.debug_logger.log(
                    level="ERROR",
                    operation="hybrid_level6",
                    message="Failed reading RPU metadata for Level 6 values",
                    context={"returncode": result.returncode, "stderr": (result.stderr or "")},
                )
            raise ValueError("Failed reading RPU metadata for Level 6 values")

        max_cll, max_fall, max_mdl, min_mdl = self._parse_level6_summary(result.stdout or "")

        if any(v is None for v in (max_cll, max_fall, max_mdl, min_mdl)):
            if self.debug_logger:
                self.debug_logger.log(
                    level="ERROR",
                    operation="hybrid_level6",
                    message="Could not extract Level 6 luminance data from RPU",
                    context={"max_cll": max_cll, "max_fall": max_fall, "max_mdl": max_mdl, "min_mdl": min_mdl},
                )
            raise ValueError("Could not extract Level 6 luminance data from RPU")

        level6_data = {
            "level6": {
                "remove_cmv4": False,
                "remove_mapping": False,
                "max_display_mastering_luminance": max_mdl,
                "min_display_mastering_luminance": min_mdl,
                "max_content_light_level": max_cll,
                "max_frame_average_light_level": max_fall,
            }
        }

        l6_path = config.directories.temp / "L6.json"
        with open(l6_path, "w") as f:
            json.dump(level6_data, f, indent=4)

        with console.status("Editing RPU Level 6 values...", spinner="dots"):
            result = subprocess.run(
                [
                    str(DoviTool),
                    "editor",
                    "-i",
                    str(config.directories.temp / self.rpu_file),
                    "-j",
                    str(l6_path),
                    "-o",
                    str(config.directories.temp / "RPU_L6.bin"),
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )

        if result.returncode:
            if self.debug_logger:
                self.debug_logger.log(
                    level="ERROR",
                    operation="hybrid_level6",
                    message="Failed editing RPU Level 6 values",
                    context={"returncode": result.returncode, "stderr": (result.stderr or b"").decode(errors="replace")},
                )
            Path.unlink(config.directories.temp / "RPU_L6.bin", missing_ok=True)
            raise ValueError("Failed editing RPU Level 6 values")

        if self.debug_logger:
            self.debug_logger.log(
                level="DEBUG",
                operation="hybrid_level6",
                message="Edited RPU Level 6 luminance values",
                context={
                    "max_cll": max_cll,
                    "max_fall": max_fall,
                    "max_mdl": max_mdl,
                    "min_mdl": min_mdl,
                },
                success=True,
            )
        self.rpu_file = "RPU_L6.bin"

    def injecting(self):
        """Inject the current RPU (``self.rpu_file``) into ``HDR10.hevc``.

        The result is written to ``self.hevc_file``; nothing is done if it already exists.

        Raises:
            ValueError: If dovi_tool fails to inject the RPU.
        """
        if os.path.isfile(config.directories.temp / self.hevc_file):
            return

        with console.status(f"Injecting Dolby Vision metadata into {self.hdr_type} stream...", spinner="dots"):
            inject_cmd = [
                str(DoviTool),
                "inject-rpu",
                "-i",
                config.directories.temp / "HDR10.hevc",
                "--rpu-in",
                config.directories.temp / self.rpu_file,
            ]
            inject_cmd.extend(["-o", config.directories.temp / self.hevc_file])

            inject = subprocess.run(
                inject_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )

        if inject.returncode:
            if self.debug_logger:
                self.debug_logger.log(
                    level="ERROR",
                    operation="hybrid_inject_rpu",
                    message="Failed injecting Dolby Vision metadata into HDR10 stream",
                    context={
                        "returncode": inject.returncode,
                        "stderr": (inject.stderr or b"").decode(errors="replace"),
                        "stdout": (inject.stdout or b"").decode(errors="replace"),
                        "cmd": [str(a) for a in inject_cmd],
                    },
                )
            # Remove a partial output if there is one; a missing file must not replace the
            # ValueError below with FileNotFoundError.
            Path.unlink(config.directories.temp / self.hevc_file, missing_ok=True)
            raise ValueError("Failed injecting Dolby Vision metadata into HDR10 stream")

        if self.debug_logger:
            self.debug_logger.log(
                level="DEBUG",
                operation="hybrid_inject_rpu",
                message=f"Injected Dolby Vision metadata into {self.hdr_type} stream",
                context={
                    "hdr_type": self.hdr_type,
                    "rpu_file": self.rpu_file,
                    "output": self.hevc_file,
                    "drop_hdr10plus": self.hdr10plus_to_dv,
                },
                success=True,
            )

    def extract_hdr10plus(self, _video):
        """Extract HDR10+ metadata from the video stream.

        Reads ``HDR10.hevc`` and writes ``self.hdr10plus_file``. ``_video`` is unused.

        Raises:
            ValueError: If HDR10Plus_tool is unavailable, extraction fails, or the stream
                carries no HDR10+ metadata.
        """
        hdr10plus_path = config.directories.temp / self.hdr10plus_file
        if os.path.isfile(hdr10plus_path):
            return

        if not HDR10PlusTool:
            raise ValueError("HDR10Plus_tool not found. Please install it to use HDR10+ to DV conversion.")

        with console.status("Extracting HDR10+ metadata...", spinner="dots"):
            # HDR10Plus_tool needs raw HEVC stream
            extraction = subprocess.run(
                [
                    str(HDR10PlusTool),
                    "extract",
                    str(config.directories.temp / "HDR10.hevc"),
                    "-o",
                    str(hdr10plus_path),
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )

        if extraction.returncode:
            if self.debug_logger:
                self.debug_logger.log(
                    level="ERROR",
                    operation="hybrid_extract_hdr10plus",
                    message="Failed extracting HDR10+ metadata",
                    context={
                        "returncode": extraction.returncode,
                        "stderr": (extraction.stderr or b"").decode(errors="replace"),
                        "stdout": (extraction.stdout or b"").decode(errors="replace"),
                    },
                )
            # Do not leave a partial file for the "already extracted" check above.
            Path.unlink(hdr10plus_path, missing_ok=True)
            raise ValueError("Failed extracting HDR10+ metadata")

        # Check if the extracted file has content
        file_size = os.path.getsize(hdr10plus_path) if os.path.isfile(hdr10plus_path) else 0
        if file_size == 0:
            if self.debug_logger:
                self.debug_logger.log(
                    level="ERROR",
                    operation="hybrid_extract_hdr10plus",
                    message="No HDR10+ metadata found in the stream",
                    context={"file_size": 0},
                )
            # An empty file would otherwise be treated as valid metadata on the next run.
            Path.unlink(hdr10plus_path, missing_ok=True)
            raise ValueError("No HDR10+ metadata found in the stream")

        if self.debug_logger:
            self.debug_logger.log(
                level="DEBUG",
                operation="hybrid_extract_hdr10plus",
                message="Extracted HDR10+ metadata",
                context={"output": self.hdr10plus_file, "file_size": file_size},
                success=True,
            )

    def convert_hdr10plus_to_dv(self):
        """Convert HDR10+ metadata to Dolby Vision RPU.

        Generates ``RPU.bin`` from ``self.hdr10plus_file`` and then removes the HDR10+ JSON
        and the temporary ``extra.json``.

        Raises:
            ValueError: If dovi_tool fails to generate the RPU.
        """
        rpu_path = config.directories.temp / "RPU.bin"
        if os.path.isfile(rpu_path):
            return

        extra_path = config.directories.temp / "extra.json"

        with console.status("Converting HDR10+ metadata to Dolby Vision...", spinner="dots"):
            # First create the extra metadata JSON for dovi_tool
            extra_metadata = {
                "cm_version": "V29",
                "length": 0,  # dovi_tool will figure this out
                "level6": {
                    "max_display_mastering_luminance": 1000,
                    "min_display_mastering_luminance": 1,
                    "max_content_light_level": 0,
                    "max_frame_average_light_level": 0,
                },
            }

            with open(extra_path, "w") as f:
                json.dump(extra_metadata, f, indent=2)

            # Generate DV RPU from HDR10+ metadata
            conversion = subprocess.run(
                [
                    str(DoviTool),
                    "generate",
                    "-j",
                    str(extra_path),
                    "--hdr10plus-json",
                    str(config.directories.temp / self.hdr10plus_file),
                    "-o",
                    str(rpu_path),
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )

        if conversion.returncode:
            if self.debug_logger:
                self.debug_logger.log(
                    level="ERROR",
                    operation="hybrid_convert_hdr10plus",
                    message="Failed converting HDR10+ to Dolby Vision",
                    context={
                        "returncode": conversion.returncode,
                        "stderr": (conversion.stderr or b"").decode(errors="replace"),
                        "stdout": (conversion.stdout or b"").decode(errors="replace"),
                    },
                )
            # Do not leave a partial RPU for the "already converted" check above.
            Path.unlink(rpu_path, missing_ok=True)
            Path.unlink(extra_path, missing_ok=True)
            raise ValueError("Failed converting HDR10+ to Dolby Vision")

        if self.debug_logger:
            self.debug_logger.log(
                level="DEBUG",
                operation="hybrid_convert_hdr10plus",
                message="Converted HDR10+ metadata to Dolby Vision Profile 8",
                success=True,
            )

        # Clean up temporary files
        Path.unlink(extra_path, missing_ok=True)
        Path.unlink(config.directories.temp / self.hdr10plus_file, missing_ok=True)