HYBRID mode previously required a plain HDR10 track, rejecting HDR10+ (HDR10P) even though it's a perfectly valid (and superior) base layer. HDR10+ is now preferred over HDR10 when both are available, preserving dynamic metadata in the final DV Profile 8 output.
713 lines
29 KiB
Python
713 lines
29 KiB
Python
import json
|
|
import logging
|
|
import os
|
|
import random
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
from rich.padding import Padding
|
|
from rich.rule import Rule
|
|
|
|
from unshackle.core.binaries import FFMPEG, DoviTool, FFProbe, HDR10PlusTool
|
|
from unshackle.core.config import config
|
|
from unshackle.core.console import console
|
|
from unshackle.core.utilities import get_debug_logger
|
|
|
|
|
|
class Hybrid:
|
|
def __init__(self, videos, source) -> None:
|
|
self.log = logging.getLogger("hybrid")
|
|
self.debug_logger = get_debug_logger()
|
|
|
|
"""
|
|
Takes the Dolby Vision and HDR10(+) streams out of the VideoTracks.
|
|
It will then attempt to inject the Dolby Vision metadata layer to the HDR10(+) stream.
|
|
If no DV track is available but HDR10+ is present, it will convert HDR10+ to DV.
|
|
"""
|
|
global directories
|
|
from unshackle.core.tracks import Video
|
|
|
|
self.videos = videos
|
|
self.source = source
|
|
self.rpu_file = "RPU.bin"
|
|
self.hdr_type = "HDR10"
|
|
self.hevc_file = f"{self.hdr_type}-DV.hevc"
|
|
self.hdr10plus_to_dv = False
|
|
self.hdr10plus_file = "HDR10Plus.json"
|
|
|
|
# Get resolution info from HDR10 track for display
|
|
hdr10_track = next((v for v in videos if v.range == Video.Range.HDR10), None)
|
|
hdr10p_track = next((v for v in videos if v.range == Video.Range.HDR10P), None)
|
|
track_for_res = hdr10_track or hdr10p_track
|
|
self.resolution = f"{track_for_res.height}p" if track_for_res and track_for_res.height else "Unknown"
|
|
|
|
console.print(Padding(Rule(f"[rule.text]HDR10+DV Hybrid ({self.resolution})"), (1, 2)))
|
|
|
|
if self.debug_logger:
|
|
self.debug_logger.log(
|
|
level="DEBUG",
|
|
operation="hybrid_init",
|
|
message="Starting HDR10+DV hybrid processing",
|
|
context={
|
|
"source": source,
|
|
"resolution": self.resolution,
|
|
"video_count": len(videos),
|
|
"video_ranges": [str(v.range) for v in videos],
|
|
},
|
|
)
|
|
|
|
for video in self.videos:
|
|
if not video.path or not os.path.exists(video.path):
|
|
raise ValueError(f"Video track {video.id} was not downloaded before injection.")
|
|
|
|
# Check if we have DV track available
|
|
has_dv = any(video.range == Video.Range.DV for video in self.videos)
|
|
has_hdr10 = any(video.range == Video.Range.HDR10 for video in self.videos)
|
|
has_hdr10p = any(video.range == Video.Range.HDR10P for video in self.videos)
|
|
|
|
if not has_hdr10 and not has_hdr10p:
|
|
raise ValueError("No HDR10 or HDR10+ track available for hybrid processing.")
|
|
|
|
# If we have HDR10+ but no DV, we can convert HDR10+ to DV
|
|
if not has_dv and has_hdr10p:
|
|
console.status("No DV track found, but HDR10+ is available. Will convert HDR10+ to DV.")
|
|
self.hdr10plus_to_dv = True
|
|
elif not has_dv:
|
|
raise ValueError("No DV track available and no HDR10+ to convert.")
|
|
|
|
if os.path.isfile(config.directories.temp / self.hevc_file):
|
|
console.status("Already Injected")
|
|
return
|
|
|
|
for video in videos:
|
|
# Use the actual path from the video track
|
|
save_path = video.path
|
|
if not save_path or not os.path.exists(save_path):
|
|
raise ValueError(f"Video track {video.id} was not downloaded or path not found: {save_path}")
|
|
|
|
if video.range == Video.Range.HDR10:
|
|
self.extract_stream(save_path, "HDR10")
|
|
elif video.range == Video.Range.HDR10P:
|
|
self.extract_stream(save_path, "HDR10")
|
|
self.hdr_type = "HDR10+"
|
|
elif video.range == Video.Range.DV:
|
|
self.extract_stream(save_path, "DV")
|
|
|
|
if self.hdr10plus_to_dv:
|
|
# Extract HDR10+ metadata and convert to DV
|
|
hdr10p_video = next(v for v in videos if v.range == Video.Range.HDR10P)
|
|
self.extract_hdr10plus(hdr10p_video)
|
|
self.convert_hdr10plus_to_dv()
|
|
else:
|
|
# Regular DV extraction
|
|
dv_video = next(v for v in videos if v.range == Video.Range.DV)
|
|
self.extract_rpu(dv_video)
|
|
if os.path.isfile(config.directories.temp / "RPU_UNT.bin"):
|
|
self.rpu_file = "RPU_UNT.bin"
|
|
# Mode 3 conversion already done during extraction when not untouched
|
|
elif os.path.isfile(config.directories.temp / "RPU.bin"):
|
|
# RPU already extracted with mode 3
|
|
pass
|
|
|
|
# Edit L6 with actual luminance values from RPU, then L5 active area
|
|
self.level_6()
|
|
base_video = next(
|
|
(v for v in videos if v.range in (Video.Range.HDR10, Video.Range.HDR10P)), None
|
|
)
|
|
if base_video and base_video.path:
|
|
self.level_5(base_video.path)
|
|
|
|
self.injecting()
|
|
|
|
if self.debug_logger:
|
|
self.debug_logger.log(
|
|
level="INFO",
|
|
operation="hybrid_complete",
|
|
message="Injection Completed",
|
|
context={
|
|
"hdr_type": self.hdr_type,
|
|
"resolution": self.resolution,
|
|
"hdr10plus_to_dv": self.hdr10plus_to_dv,
|
|
"rpu_file": self.rpu_file,
|
|
"output_file": self.hevc_file,
|
|
},
|
|
)
|
|
self.log.info("✓ Injection Completed")
|
|
if self.source == ("itunes" or "appletvplus"):
|
|
Path.unlink(config.directories.temp / "hdr10.mkv")
|
|
Path.unlink(config.directories.temp / "dv.mkv")
|
|
Path.unlink(config.directories.temp / "HDR10.hevc", missing_ok=True)
|
|
Path.unlink(config.directories.temp / "DV.hevc", missing_ok=True)
|
|
Path.unlink(config.directories.temp / f"{self.rpu_file}", missing_ok=True)
|
|
Path.unlink(config.directories.temp / "RPU_L6.bin", missing_ok=True)
|
|
Path.unlink(config.directories.temp / "RPU_L5.bin", missing_ok=True)
|
|
Path.unlink(config.directories.temp / "L5.json", missing_ok=True)
|
|
Path.unlink(config.directories.temp / "L6.json", missing_ok=True)
|
|
|
|
def ffmpeg_simple(self, save_path, output):
|
|
"""Simple ffmpeg execution without progress tracking"""
|
|
p = subprocess.run(
|
|
[
|
|
str(FFMPEG) if FFMPEG else "ffmpeg",
|
|
"-nostdin",
|
|
"-i",
|
|
str(save_path),
|
|
"-c:v",
|
|
"copy",
|
|
str(output),
|
|
"-y", # overwrite output
|
|
],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
)
|
|
return p
|
|
|
|
def extract_stream(self, save_path, type_):
|
|
output = Path(config.directories.temp / f"{type_}.hevc")
|
|
|
|
with console.status(f"Extracting {type_} stream...", spinner="dots"):
|
|
result = self.ffmpeg_simple(save_path, output)
|
|
|
|
if result.returncode:
|
|
output.unlink(missing_ok=True)
|
|
if self.debug_logger:
|
|
self.debug_logger.log(
|
|
level="ERROR",
|
|
operation="hybrid_extract_stream",
|
|
message=f"Failed extracting {type_} stream",
|
|
context={
|
|
"type": type_,
|
|
"input": str(save_path),
|
|
"output": str(output),
|
|
"returncode": result.returncode,
|
|
"stderr": (result.stderr or b"").decode(errors="replace"),
|
|
"stdout": (result.stdout or b"").decode(errors="replace"),
|
|
},
|
|
)
|
|
self.log.error(f"x Failed extracting {type_} stream")
|
|
sys.exit(1)
|
|
|
|
if self.debug_logger:
|
|
self.debug_logger.log(
|
|
level="DEBUG",
|
|
operation="hybrid_extract_stream",
|
|
message=f"Extracted {type_} stream",
|
|
context={"type": type_, "input": str(save_path), "output": str(output)},
|
|
success=True,
|
|
)
|
|
|
|
def extract_rpu(self, video, untouched=False):
|
|
if os.path.isfile(config.directories.temp / "RPU.bin") or os.path.isfile(
|
|
config.directories.temp / "RPU_UNT.bin"
|
|
):
|
|
return
|
|
|
|
with console.status(
|
|
f"Extracting{' untouched ' if untouched else ' '}RPU from Dolby Vision stream...", spinner="dots"
|
|
):
|
|
extraction_args = [str(DoviTool)]
|
|
if not untouched:
|
|
extraction_args += ["-m", "3"]
|
|
extraction_args += [
|
|
"extract-rpu",
|
|
config.directories.temp / "DV.hevc",
|
|
"-o",
|
|
config.directories.temp / f"{'RPU' if not untouched else 'RPU_UNT'}.bin",
|
|
]
|
|
|
|
rpu_extraction = subprocess.run(
|
|
extraction_args,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
)
|
|
|
|
rpu_name = "RPU" if not untouched else "RPU_UNT"
|
|
if rpu_extraction.returncode:
|
|
Path.unlink(config.directories.temp / f"{rpu_name}.bin")
|
|
stderr_text = rpu_extraction.stderr.decode(errors="replace") if rpu_extraction.stderr else ""
|
|
if self.debug_logger:
|
|
self.debug_logger.log(
|
|
level="ERROR",
|
|
operation="hybrid_extract_rpu",
|
|
message=f"Failed extracting{' untouched ' if untouched else ' '}RPU",
|
|
context={
|
|
"untouched": untouched,
|
|
"returncode": rpu_extraction.returncode,
|
|
"stderr": stderr_text,
|
|
"args": [str(a) for a in extraction_args],
|
|
},
|
|
)
|
|
if b"MAX_PQ_LUMINANCE" in rpu_extraction.stderr:
|
|
self.extract_rpu(video, untouched=True)
|
|
elif b"Invalid PPS index" in rpu_extraction.stderr:
|
|
raise ValueError("Dolby Vision VideoTrack seems to be corrupt")
|
|
else:
|
|
raise ValueError(f"Failed extracting{' untouched ' if untouched else ' '}RPU from Dolby Vision stream")
|
|
elif self.debug_logger:
|
|
self.debug_logger.log(
|
|
level="DEBUG",
|
|
operation="hybrid_extract_rpu",
|
|
message=f"Extracted{' untouched ' if untouched else ' '}RPU from Dolby Vision stream",
|
|
context={"untouched": untouched, "output": f"{rpu_name}.bin"},
|
|
success=True,
|
|
)
|
|
|
|
def level_5(self, input_video):
|
|
"""Generate Level 5 active area metadata via crop detection on the HDR10 stream.
|
|
|
|
This resolves mismatches where DV has no black bars but HDR10 does (or vice versa)
|
|
by telling the display the correct active area.
|
|
"""
|
|
if os.path.isfile(config.directories.temp / "RPU_L5.bin"):
|
|
return
|
|
|
|
ffprobe_bin = str(FFProbe) if FFProbe else "ffprobe"
|
|
ffmpeg_bin = str(FFMPEG) if FFMPEG else "ffmpeg"
|
|
|
|
# Get video duration for random sampling
|
|
with console.status("Detecting active area (crop detection)...", spinner="dots"):
|
|
result_duration = subprocess.run(
|
|
[ffprobe_bin, "-v", "error", "-show_entries", "format=duration", "-of", "json", str(input_video)],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
)
|
|
|
|
if result_duration.returncode != 0:
|
|
if self.debug_logger:
|
|
self.debug_logger.log(
|
|
level="WARNING",
|
|
operation="hybrid_level5",
|
|
message="Could not probe video duration",
|
|
context={"returncode": result_duration.returncode, "stderr": (result_duration.stderr or "")},
|
|
)
|
|
self.log.warning("Could not probe video duration, skipping L5 crop detection")
|
|
return
|
|
|
|
duration_info = json.loads(result_duration.stdout)
|
|
duration = float(duration_info["format"]["duration"])
|
|
|
|
# Get video resolution for proper border calculation
|
|
result_streams = subprocess.run(
|
|
[
|
|
ffprobe_bin,
|
|
"-v",
|
|
"error",
|
|
"-select_streams",
|
|
"v:0",
|
|
"-show_entries",
|
|
"stream=width,height",
|
|
"-of",
|
|
"json",
|
|
str(input_video),
|
|
],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
)
|
|
|
|
if result_streams.returncode != 0:
|
|
if self.debug_logger:
|
|
self.debug_logger.log(
|
|
level="WARNING",
|
|
operation="hybrid_level5",
|
|
message="Could not probe video resolution",
|
|
context={"returncode": result_streams.returncode, "stderr": (result_streams.stderr or "")},
|
|
)
|
|
self.log.warning("Could not probe video resolution, skipping L5 crop detection")
|
|
return
|
|
|
|
stream_info = json.loads(result_streams.stdout)
|
|
original_width = int(stream_info["streams"][0]["width"])
|
|
original_height = int(stream_info["streams"][0]["height"])
|
|
|
|
# Sample 10 random timestamps and run cropdetect on each
|
|
random_times = sorted(random.uniform(0, duration) for _ in range(10))
|
|
|
|
crop_results = []
|
|
for t in random_times:
|
|
result_cropdetect = subprocess.run(
|
|
[
|
|
ffmpeg_bin,
|
|
"-y",
|
|
"-nostdin",
|
|
"-loglevel",
|
|
"info",
|
|
"-ss",
|
|
f"{t:.2f}",
|
|
"-i",
|
|
str(input_video),
|
|
"-vf",
|
|
"cropdetect=round=2",
|
|
"-vframes",
|
|
"10",
|
|
"-f",
|
|
"null",
|
|
"-",
|
|
],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
)
|
|
|
|
# cropdetect outputs crop=w:h:x:y
|
|
crop_match = re.search(
|
|
r"crop=(\d+):(\d+):(\d+):(\d+)",
|
|
(result_cropdetect.stdout or "") + (result_cropdetect.stderr or ""),
|
|
)
|
|
if crop_match:
|
|
w, h = int(crop_match.group(1)), int(crop_match.group(2))
|
|
x, y = int(crop_match.group(3)), int(crop_match.group(4))
|
|
# Calculate actual border sizes from crop geometry
|
|
left = x
|
|
top = y
|
|
right = original_width - w - x
|
|
bottom = original_height - h - y
|
|
crop_results.append((left, top, right, bottom))
|
|
|
|
if not crop_results:
|
|
if self.debug_logger:
|
|
self.debug_logger.log(
|
|
level="WARNING",
|
|
operation="hybrid_level5",
|
|
message="No crop data detected, skipping L5",
|
|
context={"samples": len(random_times)},
|
|
)
|
|
self.log.warning("No crop data detected, skipping L5")
|
|
return
|
|
|
|
# Find the most common crop values
|
|
crop_counts = {}
|
|
for crop in crop_results:
|
|
crop_counts[crop] = crop_counts.get(crop, 0) + 1
|
|
most_common = max(crop_counts, key=crop_counts.get)
|
|
left, top, right, bottom = most_common
|
|
|
|
# If all borders are 0 there's nothing to correct
|
|
if left == 0 and top == 0 and right == 0 and bottom == 0:
|
|
return
|
|
|
|
l5_json = {
|
|
"active_area": {
|
|
"crop": False,
|
|
"presets": [{"id": 0, "left": left, "right": right, "top": top, "bottom": bottom}],
|
|
"edits": {"all": 0},
|
|
}
|
|
}
|
|
|
|
l5_path = config.directories.temp / "L5.json"
|
|
with open(l5_path, "w") as f:
|
|
json.dump(l5_json, f, indent=4)
|
|
|
|
with console.status("Editing RPU Level 5 active area...", spinner="dots"):
|
|
result = subprocess.run(
|
|
[
|
|
str(DoviTool),
|
|
"editor",
|
|
"-i",
|
|
str(config.directories.temp / self.rpu_file),
|
|
"-j",
|
|
str(l5_path),
|
|
"-o",
|
|
str(config.directories.temp / "RPU_L5.bin"),
|
|
],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
)
|
|
|
|
if result.returncode:
|
|
if self.debug_logger:
|
|
self.debug_logger.log(
|
|
level="ERROR",
|
|
operation="hybrid_level5",
|
|
message="Failed editing RPU Level 5 values",
|
|
context={"returncode": result.returncode, "stderr": (result.stderr or b"").decode(errors="replace")},
|
|
)
|
|
Path.unlink(config.directories.temp / "RPU_L5.bin", missing_ok=True)
|
|
raise ValueError("Failed editing RPU Level 5 values")
|
|
|
|
if self.debug_logger:
|
|
self.debug_logger.log(
|
|
level="DEBUG",
|
|
operation="hybrid_level5",
|
|
message="Edited RPU Level 5 active area",
|
|
context={"crop": {"left": left, "right": right, "top": top, "bottom": bottom}, "samples": len(crop_results)},
|
|
success=True,
|
|
)
|
|
self.rpu_file = "RPU_L5.bin"
|
|
|
|
def level_6(self):
|
|
"""Edit RPU Level 6 values using actual luminance data from the RPU."""
|
|
if os.path.isfile(config.directories.temp / "RPU_L6.bin"):
|
|
return
|
|
|
|
with console.status("Reading RPU luminance metadata...", spinner="dots"):
|
|
result = subprocess.run(
|
|
[str(DoviTool), "info", "-i", str(config.directories.temp / self.rpu_file), "-s"],
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
if self.debug_logger:
|
|
self.debug_logger.log(
|
|
level="ERROR",
|
|
operation="hybrid_level6",
|
|
message="Failed reading RPU metadata for Level 6 values",
|
|
context={"returncode": result.returncode, "stderr": (result.stderr or "")},
|
|
)
|
|
raise ValueError("Failed reading RPU metadata for Level 6 values")
|
|
|
|
max_cll = None
|
|
max_fall = None
|
|
max_mdl = None
|
|
min_mdl = None
|
|
|
|
for line in result.stdout.splitlines():
|
|
if "RPU content light level (L1):" in line:
|
|
parts = line.split("MaxCLL:")[1].split(",")
|
|
max_cll = int(float(parts[0].strip().split()[0]))
|
|
if len(parts) > 1 and "MaxFALL:" in parts[1]:
|
|
max_fall = int(float(parts[1].split("MaxFALL:")[1].strip().split()[0]))
|
|
elif "RPU mastering display:" in line:
|
|
mastering = line.split(":", 1)[1].strip()
|
|
min_lum, max_lum = mastering.split("/")[0], mastering.split("/")[1].split(" ")[0]
|
|
min_mdl = int(float(min_lum) * 10000)
|
|
max_mdl = int(float(max_lum))
|
|
|
|
if any(v is None for v in (max_cll, max_fall, max_mdl, min_mdl)):
|
|
if self.debug_logger:
|
|
self.debug_logger.log(
|
|
level="ERROR",
|
|
operation="hybrid_level6",
|
|
message="Could not extract Level 6 luminance data from RPU",
|
|
context={"max_cll": max_cll, "max_fall": max_fall, "max_mdl": max_mdl, "min_mdl": min_mdl},
|
|
)
|
|
raise ValueError("Could not extract Level 6 luminance data from RPU")
|
|
|
|
level6_data = {
|
|
"level6": {
|
|
"remove_cmv4": False,
|
|
"remove_mapping": False,
|
|
"max_display_mastering_luminance": max_mdl,
|
|
"min_display_mastering_luminance": min_mdl,
|
|
"max_content_light_level": max_cll,
|
|
"max_frame_average_light_level": max_fall,
|
|
}
|
|
}
|
|
|
|
l6_path = config.directories.temp / "L6.json"
|
|
with open(l6_path, "w") as f:
|
|
json.dump(level6_data, f, indent=4)
|
|
|
|
with console.status("Editing RPU Level 6 values...", spinner="dots"):
|
|
result = subprocess.run(
|
|
[
|
|
str(DoviTool),
|
|
"editor",
|
|
"-i",
|
|
str(config.directories.temp / self.rpu_file),
|
|
"-j",
|
|
str(l6_path),
|
|
"-o",
|
|
str(config.directories.temp / "RPU_L6.bin"),
|
|
],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
)
|
|
|
|
if result.returncode:
|
|
if self.debug_logger:
|
|
self.debug_logger.log(
|
|
level="ERROR",
|
|
operation="hybrid_level6",
|
|
message="Failed editing RPU Level 6 values",
|
|
context={"returncode": result.returncode, "stderr": (result.stderr or b"").decode(errors="replace")},
|
|
)
|
|
Path.unlink(config.directories.temp / "RPU_L6.bin", missing_ok=True)
|
|
raise ValueError("Failed editing RPU Level 6 values")
|
|
|
|
if self.debug_logger:
|
|
self.debug_logger.log(
|
|
level="DEBUG",
|
|
operation="hybrid_level6",
|
|
message="Edited RPU Level 6 luminance values",
|
|
context={
|
|
"max_cll": max_cll,
|
|
"max_fall": max_fall,
|
|
"max_mdl": max_mdl,
|
|
"min_mdl": min_mdl,
|
|
},
|
|
success=True,
|
|
)
|
|
self.rpu_file = "RPU_L6.bin"
|
|
|
|
def injecting(self):
|
|
if os.path.isfile(config.directories.temp / self.hevc_file):
|
|
return
|
|
|
|
with console.status(f"Injecting Dolby Vision metadata into {self.hdr_type} stream...", spinner="dots"):
|
|
inject_cmd = [
|
|
str(DoviTool),
|
|
"inject-rpu",
|
|
"-i",
|
|
config.directories.temp / "HDR10.hevc",
|
|
"--rpu-in",
|
|
config.directories.temp / self.rpu_file,
|
|
]
|
|
|
|
inject_cmd.extend(["-o", config.directories.temp / self.hevc_file])
|
|
|
|
inject = subprocess.run(
|
|
inject_cmd,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
)
|
|
|
|
if inject.returncode:
|
|
if self.debug_logger:
|
|
self.debug_logger.log(
|
|
level="ERROR",
|
|
operation="hybrid_inject_rpu",
|
|
message="Failed injecting Dolby Vision metadata into HDR10 stream",
|
|
context={
|
|
"returncode": inject.returncode,
|
|
"stderr": (inject.stderr or b"").decode(errors="replace"),
|
|
"stdout": (inject.stdout or b"").decode(errors="replace"),
|
|
"cmd": [str(a) for a in inject_cmd],
|
|
},
|
|
)
|
|
Path.unlink(config.directories.temp / self.hevc_file)
|
|
raise ValueError("Failed injecting Dolby Vision metadata into HDR10 stream")
|
|
|
|
if self.debug_logger:
|
|
self.debug_logger.log(
|
|
level="DEBUG",
|
|
operation="hybrid_inject_rpu",
|
|
message=f"Injected Dolby Vision metadata into {self.hdr_type} stream",
|
|
context={"hdr_type": self.hdr_type, "rpu_file": self.rpu_file, "output": self.hevc_file, "drop_hdr10plus": self.hdr10plus_to_dv},
|
|
success=True,
|
|
)
|
|
|
|
def extract_hdr10plus(self, _video):
|
|
"""Extract HDR10+ metadata from the video stream"""
|
|
if os.path.isfile(config.directories.temp / self.hdr10plus_file):
|
|
return
|
|
|
|
if not HDR10PlusTool:
|
|
raise ValueError("HDR10Plus_tool not found. Please install it to use HDR10+ to DV conversion.")
|
|
|
|
with console.status("Extracting HDR10+ metadata...", spinner="dots"):
|
|
# HDR10Plus_tool needs raw HEVC stream
|
|
extraction = subprocess.run(
|
|
[
|
|
str(HDR10PlusTool),
|
|
"extract",
|
|
str(config.directories.temp / "HDR10.hevc"),
|
|
"-o",
|
|
str(config.directories.temp / self.hdr10plus_file),
|
|
],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
)
|
|
|
|
if extraction.returncode:
|
|
if self.debug_logger:
|
|
self.debug_logger.log(
|
|
level="ERROR",
|
|
operation="hybrid_extract_hdr10plus",
|
|
message="Failed extracting HDR10+ metadata",
|
|
context={
|
|
"returncode": extraction.returncode,
|
|
"stderr": (extraction.stderr or b"").decode(errors="replace"),
|
|
"stdout": (extraction.stdout or b"").decode(errors="replace"),
|
|
},
|
|
)
|
|
raise ValueError("Failed extracting HDR10+ metadata")
|
|
|
|
# Check if the extracted file has content
|
|
file_size = os.path.getsize(config.directories.temp / self.hdr10plus_file)
|
|
if file_size == 0:
|
|
if self.debug_logger:
|
|
self.debug_logger.log(
|
|
level="ERROR",
|
|
operation="hybrid_extract_hdr10plus",
|
|
message="No HDR10+ metadata found in the stream",
|
|
context={"file_size": 0},
|
|
)
|
|
raise ValueError("No HDR10+ metadata found in the stream")
|
|
|
|
if self.debug_logger:
|
|
self.debug_logger.log(
|
|
level="DEBUG",
|
|
operation="hybrid_extract_hdr10plus",
|
|
message="Extracted HDR10+ metadata",
|
|
context={"output": self.hdr10plus_file, "file_size": file_size},
|
|
success=True,
|
|
)
|
|
|
|
def convert_hdr10plus_to_dv(self):
|
|
"""Convert HDR10+ metadata to Dolby Vision RPU"""
|
|
if os.path.isfile(config.directories.temp / "RPU.bin"):
|
|
return
|
|
|
|
with console.status("Converting HDR10+ metadata to Dolby Vision...", spinner="dots"):
|
|
# First create the extra metadata JSON for dovi_tool
|
|
extra_metadata = {
|
|
"cm_version": "V29",
|
|
"length": 0, # dovi_tool will figure this out
|
|
"level6": {
|
|
"max_display_mastering_luminance": 1000,
|
|
"min_display_mastering_luminance": 1,
|
|
"max_content_light_level": 0,
|
|
"max_frame_average_light_level": 0,
|
|
},
|
|
}
|
|
|
|
with open(config.directories.temp / "extra.json", "w") as f:
|
|
json.dump(extra_metadata, f, indent=2)
|
|
|
|
# Generate DV RPU from HDR10+ metadata
|
|
conversion = subprocess.run(
|
|
[
|
|
str(DoviTool),
|
|
"generate",
|
|
"-j",
|
|
str(config.directories.temp / "extra.json"),
|
|
"--hdr10plus-json",
|
|
str(config.directories.temp / self.hdr10plus_file),
|
|
"-o",
|
|
str(config.directories.temp / "RPU.bin"),
|
|
],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
)
|
|
|
|
if conversion.returncode:
|
|
if self.debug_logger:
|
|
self.debug_logger.log(
|
|
level="ERROR",
|
|
operation="hybrid_convert_hdr10plus",
|
|
message="Failed converting HDR10+ to Dolby Vision",
|
|
context={
|
|
"returncode": conversion.returncode,
|
|
"stderr": (conversion.stderr or b"").decode(errors="replace"),
|
|
"stdout": (conversion.stdout or b"").decode(errors="replace"),
|
|
},
|
|
)
|
|
raise ValueError("Failed converting HDR10+ to Dolby Vision")
|
|
|
|
if self.debug_logger:
|
|
self.debug_logger.log(
|
|
level="DEBUG",
|
|
operation="hybrid_convert_hdr10plus",
|
|
message="Converted HDR10+ metadata to Dolby Vision Profile 8",
|
|
success=True,
|
|
)
|
|
|
|
# Clean up temporary files
|
|
Path.unlink(config.directories.temp / "extra.json")
|
|
Path.unlink(config.directories.temp / self.hdr10plus_file)
|