Merge branch 'main' into main

This commit is contained in:
CodeName393
2026-01-22 02:29:08 +09:00
committed by GitHub
24 changed files with 1010 additions and 141 deletions

View File

@@ -91,6 +91,12 @@ class Subtitle(Track):
return Subtitle.Codec.TimedTextMarkupLang
raise ValueError(f"The Content Profile '{profile}' is not a supported Subtitle Codec")
# WebVTT sanitization patterns (compiled once for performance)
# A whole line made of ASCII letters followed by digits (e.g. "Q0", "Q12").
_CUE_ID_PATTERN = re.compile(r"^[A-Za-z]+\d+$")
# A line that begins like a timestamp: digits, ":", digits, then ":" or ".".
_TIMING_START_PATTERN = re.compile(r"^\d+:\d+[:\.]")
# A full cue timing line. Group 1 = start timestamp, group 2 = end timestamp
# (hours optional, "." or "," accepted before the fraction), group 3 = whatever
# follows the end timestamp on the same line (the cue settings, possibly empty).
_TIMING_LINE_PATTERN = re.compile(r"^((?:\d+:)?\d+:\d+[.,]\d+)\s*-->\s*((?:\d+:)?\d+:\d+[.,]\d+)(.*)$")
# The "line:" cue setting; group 1 is the non-negative number with an optional "%".
_LINE_POS_PATTERN = re.compile(r"line:(\d+(?:\.\d+)?%?)")
def __init__(
self,
*args: Any,
@@ -239,6 +245,11 @@ class Subtitle(Track):
# Sanitize WebVTT timestamps before parsing
text = Subtitle.sanitize_webvtt_timestamps(text)
# Remove cue identifiers that confuse parsers like pysubs2
text = Subtitle.sanitize_webvtt_cue_identifiers(text)
# Merge overlapping cues with line positioning into single multi-line cues
text = Subtitle.merge_overlapping_webvtt_cues(text)
preserve_formatting = config.subtitle.get("preserve_formatting", True)
if preserve_formatting:
@@ -277,6 +288,240 @@ class Subtitle(Track):
# Replace negative timestamps with 00:00:00.000
return re.sub(r"(-\d+:\d+:\d+\.\d+)", "00:00:00.000", text)
@staticmethod
def has_webvtt_cue_identifiers(text: str) -> bool:
    """
    Report whether the WebVTT text contains at least one removable cue identifier.

    A line counts as a cue identifier when, once stripped, it matches
    ``_CUE_ID_PATTERN`` and the next non-blank line either contains "-->"
    or starts like a timestamp.

    Parameters:
        text: The WebVTT content as string

    Returns:
        True if such an identifier line exists, False otherwise
    """
    rows = text.split("\n")
    total = len(rows)

    for idx, raw in enumerate(rows):
        if not Subtitle._CUE_ID_PATTERN.match(raw.strip()):
            continue

        # First non-blank line after the candidate, if there is one.
        follower = next((rows[k] for k in range(idx + 1, total) if rows[k].strip()), None)
        if follower is None:
            continue

        if "-->" in follower or Subtitle._TIMING_START_PATTERN.match(follower.strip()):
            return True

    return False
@staticmethod
def sanitize_webvtt_cue_identifiers(text: str) -> str:
    """
    Strip cue identifier lines from WebVTT content.

    Some services emit identifiers such as "Q0", "Q1", ... on their own line
    ahead of the timing line. Certain parsers (pysubs2, for example) may treat
    those lines as text belonging to the previous cue, so they are dropped here.
    A line is only dropped when it matches ``_CUE_ID_PATTERN`` and the next
    non-blank line looks like a timing line; every other line is kept verbatim.

    Parameters:
        text: The WebVTT content as string

    Returns:
        The content with cue identifier lines removed, or the input unchanged
        when no identifiers are detected
    """
    if not Subtitle.has_webvtt_cue_identifiers(text):
        return text

    rows = text.split("\n")
    total = len(rows)
    kept = []

    for idx, raw in enumerate(rows):
        if Subtitle._CUE_ID_PATTERN.match(raw.strip()):
            # First non-blank line after the candidate, if there is one.
            follower = next((rows[k] for k in range(idx + 1, total) if rows[k].strip()), None)
            if follower is not None and (
                "-->" in follower or Subtitle._TIMING_START_PATTERN.match(follower.strip())
            ):
                # Identifier directly ahead of a timing line: leave it out.
                continue
        kept.append(raw)

    return "\n".join(kept)
@staticmethod
def _parse_vtt_time(t: str) -> int:
    """
    Convert a WebVTT timestamp ("MM:SS.mmm" or "HH:MM:SS.mmm") to milliseconds.

    A "," is accepted in place of "." before the fraction. The fraction is
    right-padded / truncated to three digits, so ".5", ".50" and ".500" all
    mean 500 ms. Anything that cannot be parsed yields 0.
    """
    fields = t.replace(",", ".").split(":")
    if len(fields) < 2:
        return 0
    if len(fields) == 2:
        # No hour component present.
        fields = ["0", *fields]
    hours_txt, minutes_txt, seconds_txt = fields[:3]

    pieces = seconds_txt.split(".")
    try:
        millis = int(pieces[1].ljust(3, "0")[:3]) if len(pieces) > 1 else 0
        whole_seconds = (int(hours_txt) * 60 + int(minutes_txt)) * 60 + int(pieces[0])
    except (ValueError, IndexError):
        return 0
    return whole_seconds * 1000 + millis
@staticmethod
def has_overlapping_webvtt_cues(text: str) -> bool:
    """
    Report whether the WebVTT text contains consecutive cues that should be merged.

    Two neighbouring timing lines qualify when their start times differ by at
    most 50ms and their end times are equal, which is how some services encode
    one multi-line subtitle as several cues.

    Parameters:
        text: The WebVTT content as string

    Returns:
        True if at least one such pair of adjacent cues exists, False otherwise
    """
    spans = [
        (Subtitle._parse_vtt_time(found.group(1)), Subtitle._parse_vtt_time(found.group(2)))
        for found in map(Subtitle._TIMING_LINE_PATTERN.match, text.split("\n"))
        if found
    ]

    # Compare each timing with the one that immediately follows it.
    return any(
        abs(first_start - second_start) <= 50 and first_end == second_end
        for (first_start, first_end), (second_start, second_end) in zip(spans, spans[1:])
    )
@staticmethod
def merge_overlapping_webvtt_cues(text: str) -> str:
    """
    Collapse runs of near-simultaneous WebVTT cues into single multi-line cues.

    Some services emit one cue per displayed line, with start times a few
    milliseconds apart, identical end times and different ``line:`` settings.
    Consecutive cues whose start is within 50ms of the first cue of the run and
    whose end time equals it are joined into one cue. Inside a merged cue the
    texts are ordered by ``line:`` value (lower = higher on screen = first;
    cues without a ``line:`` setting count as 100), the start timestamp is
    taken from the earliest cue of the run, and all cue settings are dropped.
    Cues that are not merged keep their settings.

    Everything before the first line containing "-->" is kept as the header;
    after that, only timing lines and their text lines are carried over.

    Parameters:
        text: The WebVTT content as string

    Returns:
        WebVTT content with overlapping cues merged, or the input unchanged
        when no overlapping cues are detected
    """
    if not Subtitle.has_overlapping_webvtt_cues(text):
        return text

    rows = text.split("\n")
    total = len(rows)

    # Header = every line ahead of the first one that contains "-->".
    first_timing = next((k for k, row in enumerate(rows) if "-->" in row), total)
    preamble = rows[:first_timing]

    # Pass 1: collect each timing line together with its text lines.
    cues = []
    cursor = first_timing
    while cursor < total:
        timing = Subtitle._TIMING_LINE_PATTERN.match(rows[cursor])
        cursor += 1
        if timing is None:
            continue

        start_str, end_str, settings = timing.groups()

        position = 100.0  # Default to bottom
        found = Subtitle._LINE_POS_PATTERN.search(settings)
        if found:
            position = float(found.group(1).rstrip("%"))

        # Text runs until a blank line or the next timing line.
        body = []
        while cursor < total and rows[cursor].strip() and "-->" not in rows[cursor]:
            body.append(rows[cursor])
            cursor += 1

        cues.append(
            {
                "start_ms": Subtitle._parse_vtt_time(start_str),
                "end_ms": Subtitle._parse_vtt_time(end_str),
                "start_str": start_str,
                "end_str": end_str,
                "line_pos": position,
                "content": "\n".join(body),
                "settings": settings,
            }
        )

    # Pass 2: cluster consecutive cues around the first cue of each run.
    rendered = []  # (start_str, end_str, settings, content)
    cursor = 0
    while cursor < len(cues):
        anchor = cues[cursor]
        stop = cursor + 1
        while (
            stop < len(cues)
            and abs(anchor["start_ms"] - cues[stop]["start_ms"]) <= 50
            and anchor["end_ms"] == cues[stop]["end_ms"]
        ):
            stop += 1
        cluster = cues[cursor:stop]
        cursor = stop

        if len(cluster) == 1:
            rendered.append((anchor["start_str"], anchor["end_str"], anchor["settings"], anchor["content"]))
            continue

        # Lower line position = higher on screen = earlier in the merged text.
        ordered = sorted(cluster, key=lambda cue: cue["line_pos"])
        first_shown = min(ordered, key=lambda cue: cue["start_ms"])
        rendered.append(
            (
                first_shown["start_str"],
                ordered[0]["end_str"],
                "",
                "\n".join(cue["content"] for cue in ordered),
            )
        )

    # Pass 3: write the header, a separating blank line if needed, then the cues.
    out = list(preamble)
    if out and out[-1].strip():
        out.append("")
    for start_str, end_str, settings, content in rendered:
        out.extend((f"{start_str} --> {end_str}{settings}", content, ""))

    return "\n".join(out)
@staticmethod
def sanitize_webvtt(text: str) -> str:
"""
@@ -565,13 +810,18 @@ class Subtitle(Track):
if binaries.SubtitleEdit and self.codec not in (Subtitle.Codec.fTTML, Subtitle.Codec.fVTT):
sub_edit_format = {
Subtitle.Codec.SubStationAlphav4: "AdvancedSubStationAlpha",
Subtitle.Codec.TimedTextMarkupLang: "TimedText1.0",
}.get(codec, codec.name)
Subtitle.Codec.SubRip: "subrip",
Subtitle.Codec.SubStationAlpha: "substationalpha",
Subtitle.Codec.SubStationAlphav4: "advancedsubstationalpha",
Subtitle.Codec.TimedTextMarkupLang: "timedtext1.0",
Subtitle.Codec.WebVTT: "webvtt",
Subtitle.Codec.SAMI: "sami",
Subtitle.Codec.MicroDVD: "microdvd",
}.get(codec, codec.name.lower())
sub_edit_args = [
binaries.SubtitleEdit,
"/Convert",
self.path,
str(binaries.SubtitleEdit),
"/convert",
str(self.path),
sub_edit_format,
f"/outputfilename:{output_path.name}",
"/encoding:utf8",
@@ -631,7 +881,7 @@ class Subtitle(Track):
text = try_ensure_utf8(data).decode("utf8")
text = text.replace("tt:", "")
# negative size values aren't allowed in TTML/DFXP spec, replace with 0
text = re.sub(r'"(-\d+(\.\d+)?(px|em|%|c|pt))"', '"0"', text)
text = re.sub(r"-(\d+(?:\.\d+)?)(px|em|%|c|pt)", r"0\2", text)
caption_set = pycaption.DFXPReader().read(text)
elif codec == Subtitle.Codec.fVTT:
caption_lists: dict[str, pycaption.CaptionList] = defaultdict(pycaption.CaptionList)
@@ -962,18 +1212,26 @@ class Subtitle(Track):
except Exception:
pass # Fall through to other methods
if binaries.SubtitleEdit:
if self.codec == Subtitle.Codec.SubStationAlphav4:
output_format = "AdvancedSubStationAlpha"
elif self.codec == Subtitle.Codec.TimedTextMarkupLang:
output_format = "TimedText1.0"
else:
output_format = self.codec.name
conversion_method = config.subtitle.get("conversion_method", "auto")
use_subtitleedit = sdh_method == "subtitleedit" or (
sdh_method == "auto" and conversion_method in ("auto", "subtitleedit")
)
if binaries.SubtitleEdit and use_subtitleedit:
output_format = {
Subtitle.Codec.SubRip: "subrip",
Subtitle.Codec.SubStationAlpha: "substationalpha",
Subtitle.Codec.SubStationAlphav4: "advancedsubstationalpha",
Subtitle.Codec.TimedTextMarkupLang: "timedtext1.0",
Subtitle.Codec.WebVTT: "webvtt",
Subtitle.Codec.SAMI: "sami",
Subtitle.Codec.MicroDVD: "microdvd",
}.get(self.codec, self.codec.name.lower())
subprocess.run(
[
binaries.SubtitleEdit,
"/Convert",
self.path,
str(binaries.SubtitleEdit),
"/convert",
str(self.path),
output_format,
"/encoding:utf8",
"/overwrite",
@@ -981,6 +1239,7 @@ class Subtitle(Track):
],
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
else:
if config.subtitle.get("convert_before_strip", True) and self.codec != Subtitle.Codec.SubRip:
@@ -1022,18 +1281,21 @@ class Subtitle(Track):
if not binaries.SubtitleEdit:
raise EnvironmentError("SubtitleEdit executable not found...")
if self.codec == Subtitle.Codec.SubStationAlphav4:
output_format = "AdvancedSubStationAlpha"
elif self.codec == Subtitle.Codec.TimedTextMarkupLang:
output_format = "TimedText1.0"
else:
output_format = self.codec.name
output_format = {
Subtitle.Codec.SubRip: "subrip",
Subtitle.Codec.SubStationAlpha: "substationalpha",
Subtitle.Codec.SubStationAlphav4: "advancedsubstationalpha",
Subtitle.Codec.TimedTextMarkupLang: "timedtext1.0",
Subtitle.Codec.WebVTT: "webvtt",
Subtitle.Codec.SAMI: "sami",
Subtitle.Codec.MicroDVD: "microdvd",
}.get(self.codec, self.codec.name.lower())
subprocess.run(
[
binaries.SubtitleEdit,
"/Convert",
self.path,
str(binaries.SubtitleEdit),
"/convert",
str(self.path),
output_format,
"/ReverseRtlStartEnd",
"/encoding:utf8",
@@ -1041,6 +1303,7 @@ class Subtitle(Track):
],
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)

View File

@@ -295,12 +295,23 @@ class Track:
try:
if not self.drm and track_type in ("Video", "Audio"):
# the service might not have explicitly defined the `drm` property
# try find widevine DRM information from the init data of URL
try:
self.drm = [Widevine.from_track(self, session)]
except Widevine.Exceptions.PSSHNotFound:
# it might not have Widevine DRM, or might not have found the PSSH
log.warning("No Widevine PSSH was found for this track, is it DRM free?")
# try find DRM information from the init data of URL based on CDM type
if isinstance(cdm, PlayReadyCdm):
try:
self.drm = [PlayReady.from_track(self, session)]
except PlayReady.Exceptions.PSSHNotFound:
try:
self.drm = [Widevine.from_track(self, session)]
except Widevine.Exceptions.PSSHNotFound:
log.warning("No PlayReady or Widevine PSSH was found for this track, is it DRM free?")
else:
try:
self.drm = [Widevine.from_track(self, session)]
except Widevine.Exceptions.PSSHNotFound:
try:
self.drm = [PlayReady.from_track(self, session)]
except PlayReady.Exceptions.PSSHNotFound:
log.warning("No Widevine or PlayReady PSSH was found for this track, is it DRM free?")
if self.drm:
track_kid = self.get_key_id(session=session)

View File

@@ -22,7 +22,7 @@ from unshackle.core.tracks.chapters import Chapter, Chapters
from unshackle.core.tracks.subtitle import Subtitle
from unshackle.core.tracks.track import Track
from unshackle.core.tracks.video import Video
from unshackle.core.utilities import is_close_match, sanitize_filename
from unshackle.core.utilities import get_debug_logger, is_close_match, sanitize_filename
from unshackle.core.utils.collections import as_list, flatten
@@ -507,6 +507,35 @@ class Tracks:
if not output_path:
raise ValueError("No tracks provided, at least one track must be provided.")
debug_logger = get_debug_logger()
if debug_logger:
debug_logger.log(
level="DEBUG",
operation="mux_start",
message="Starting mkvmerge muxing",
context={
"title": title,
"output_path": str(output_path),
"video_count": len(self.videos),
"audio_count": len(self.audio),
"subtitle_count": len(self.subtitles),
"attachment_count": len(self.attachments),
"has_chapters": bool(self.chapters),
"video_tracks": [
{"id": v.id, "codec": getattr(v, "codec", None), "language": str(v.language)}
for v in self.videos
],
"audio_tracks": [
{"id": a.id, "codec": getattr(a, "codec", None), "language": str(a.language)}
for a in self.audio
],
"subtitle_tracks": [
{"id": s.id, "codec": getattr(s, "codec", None), "language": str(s.language)}
for s in self.subtitles
],
},
)
# let potential failures go to caller, caller should handle
try:
errors = []
@@ -516,7 +545,33 @@ class Tracks:
errors.append(line)
if "progress" in line:
progress(total=100, completed=int(line.strip()[14:-1]))
return output_path, p.wait(), errors
returncode = p.wait()
if debug_logger:
if returncode != 0 or errors:
debug_logger.log(
level="ERROR",
operation="mux_failed",
message=f"mkvmerge exited with code {returncode}",
context={
"returncode": returncode,
"output_path": str(output_path),
"errors": errors,
},
)
else:
debug_logger.log(
level="DEBUG",
operation="mux_complete",
message="mkvmerge muxing completed successfully",
context={
"output_path": str(output_path),
"output_exists": output_path.exists() if output_path else False,
},
)
return output_path, returncode, errors
finally:
if chapters_path:
chapters_path.unlink()