#!/usr/bin/env python3
"""
Batch-shift text subtitles (ASS, SRT, TTML, VTT) by +N seconds.

Primary method:
  - FFmpeg with `-itsoffset` to create a shifted external subtitle file,
    preserving the original format/extension.

Fallback:
  - Python per-format shifters for SRT, VTT, ASS, TTML (handles negative-time clamping).

Outputs:
  - Writes to <folder>/output/ keeping the same file names.

Usage:
  python shift_subtitles_batch.py /path/to/folder 1
  python shift_subtitles_batch.py "/subs_folder" 1 --recursive --skip-existing
"""
|
|
|
|
import argparse
|
|
import re
|
|
import sys
|
|
import shutil
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import List, Tuple, Optional
|
|
import xml.etree.ElementTree as ET
|
|
|
|
# Subtitle extensions (lower-case, leading dot) that are processed by default.
SUPPORTED_EXTS = [
    ".srt",
    ".ass",
    ".vtt",
    ".ttml",
]

# Codec handed to FFmpeg's `-c:s` per extension; None means plain `-c copy`.
FFMPEG_CODEC_BY_EXT = {
    ".srt": None,      # copy is fine
    ".ass": "ass",     # be explicit if needed
    ".vtt": "webvtt",  # FFmpeg supports webvtt muxer/codec
    ".ttml": "ttml",   # may not be available in all builds; fallback if fails
}
|
|
|
|
def check_binary(name: str):
    """Terminate the program with exit status 1 unless *name* resolves on PATH.

    Returns None when ``shutil.which`` finds the executable.
    """
    if shutil.which(name) is not None:
        return
    print(f"Error: '{name}' not found on PATH. Install it and try again.")
    sys.exit(1)
|
|
|
|
def ffmpeg_shift(input_sub: Path, output_sub: Path, seconds: float) -> int:
    """
    Shift a text subtitle by running FFmpeg with `-itsoffset`.

    The codec comes from FFMPEG_CODEC_BY_EXT, keyed by the lower-cased input
    suffix: a known codec is passed as `-c:s <codec>`, anything else falls
    back to `-c copy`. The command line is echoed before it runs.

    Returns FFmpeg's exit status (0 on success).
    """
    codec = FFMPEG_CODEC_BY_EXT.get(input_sub.suffix.lower())
    codec_args = ["-c:s", codec] if codec else ["-c", "copy"]
    cmd = [
        "ffmpeg", "-hide_banner", "-loglevel", "error", "-y",
        "-itsoffset", str(seconds),
        "-i", str(input_sub),
        *codec_args,
        str(output_sub),
    ]
    print("FFmpeg shift:\n " + " ".join(cmd))
    completed = subprocess.run(cmd)
    return completed.returncode
|
|
|
|
# ---------- Python fallback shifters ----------
|
|
def clamp_ms(ms: int) -> int:
    """Return *ms* unchanged unless it is negative, in which case return 0."""
    return 0 if ms < 0 else ms
|
|
|
|
# SRT: 00:00:05,123 --> 00:00:08,456
|
|
SRT_TIME = re.compile(r"(\d{2}):(\d{2}):(\d{2}),(\d{3})")


def srt_to_ms(m: re.Match) -> int:
    """Convert a ``SRT_TIME`` match (HH, MM, SS, mmm groups) to milliseconds."""
    hours, minutes, secs, millis = (int(group) for group in m.groups())
    whole_seconds = hours * 3600 + minutes * 60 + secs
    return whole_seconds * 1000 + millis
|
|
|
|
def ms_to_srt(ms: int) -> str:
    """Format milliseconds as an SRT timestamp ``HH:MM:SS,mmm``.

    Negative input is clamped to zero via ``clamp_ms`` first.
    """
    remaining = clamp_ms(ms)
    hours, remaining = divmod(remaining, 3600000)
    minutes, remaining = divmod(remaining, 60000)
    secs, millis = divmod(remaining, 1000)
    return f"{hours:02}:{minutes:02}:{secs:02},{millis:03}"
|
|
|
|
def shift_srt_text(text: str, offset_ms: int) -> str:
    """Shift every SRT timing line in *text* by *offset_ms* milliseconds.

    A timing line is any line containing ``-->`` with an ``SRT_TIME`` match on
    each side of the arrow. Only the two timestamps are rewritten; everything
    else on the line (spacing, optional ``X1:.. Y1:..`` coordinates) and every
    other line is passed through untouched. Line endings, including the final
    newline, are preserved exactly as they appear in *text*.

    Timestamps that would become negative are clamped to zero by ``ms_to_srt``.
    """
    out_lines = []
    # keepends=True lets us re-join with "" so no line boundary is rewritten
    # and a trailing newline is not lost.
    for line in text.splitlines(keepends=True):
        left, arrow, right = line.partition("-->")
        if arrow:
            start = SRT_TIME.search(left)
            end = SRT_TIME.search(right)
            if start and end:
                new_start = ms_to_srt(srt_to_ms(start) + offset_ms)
                new_end = ms_to_srt(srt_to_ms(end) + offset_ms)
                # Splice in place instead of rebuilding the line, so any text
                # around the timestamps survives.
                left = left[:start.start()] + new_start + left[start.end():]
                right = right[:end.start()] + new_end + right[end.end():]
                line = f"{left}-->{right}"
        out_lines.append(line)
    return "".join(out_lines)
|
|
|
|
# VTT: WEBVTT header; times use '.' separator: 00:00:05.123 --> ...
|
|
# WebVTT allows the hours component to be omitted ("MM:SS.mmm") and to be
# wider than two digits, so it is captured as an optional group.
VTT_TIME = re.compile(r"(?:(\d{2,}):)?(\d{2}):(\d{2})\.(\d{3})")


def vtt_to_ms(m: re.Match) -> int:
    """Convert a ``VTT_TIME`` match to milliseconds.

    The hours group is None for the short ``MM:SS.mmm`` form and counts as 0.
    """
    h, mi, s, ms = (int(group or 0) for group in m.groups())
    return ((h * 3600 + mi * 60 + s) * 1000) + ms


def ms_to_vtt(ms: int) -> str:
    """Format milliseconds as ``HH:MM:SS.mmm``; negative input is clamped to 0."""
    ms = max(ms, 0)
    h, ms = divmod(ms, 3600000)
    mi, ms = divmod(ms, 60000)
    s, ms = divmod(ms, 1000)
    return f"{h:02}:{mi:02}:{s:02}.{ms:03}"


def shift_vtt_text(text: str, offset_ms: int) -> str:
    """Shift every WebVTT cue timing line in *text* by *offset_ms* milliseconds.

    A cue timing line is any line containing ``-->`` with a ``VTT_TIME`` match
    on each side. Only the two timestamps are replaced (always written in the
    long ``HH:MM:SS.mmm`` form); cue settings such as ``line:-1 align:right``
    and all other lines are kept as they are. Line endings, including the
    final newline, are preserved.
    """
    out_lines = []
    for line in text.splitlines(keepends=True):
        if "-->" in line:
            left, right = line.split("-->", 1)
            lm = VTT_TIME.search(left)
            rm = VTT_TIME.search(right)
            if lm and rm:
                l_ms = vtt_to_ms(lm) + offset_ms
                r_ms = vtt_to_ms(rm) + offset_ms
                # Replace only the matched timestamps; keep extra cue settings.
                left_new = VTT_TIME.sub(ms_to_vtt(l_ms), left, count=1)
                right_new = VTT_TIME.sub(ms_to_vtt(r_ms), right, count=1)
                line = f"{left_new}-->{right_new}"
        out_lines.append(line)
    return "".join(out_lines)
|
|
|
|
# ASS: times appear in Dialogue events; format line defines field order.
|
|
# Typical: "Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text"
|
|
# Dialogue: 0,00:00:05.12,00:00:08.34,Default,...
|
|
# Canonical ASS timestamps are "H:MM:SS.cc" with a single-digit hour, so the
# hours group must accept one or more digits.
ASS_TIME = re.compile(r"(\d+):(\d{2}):(\d{2})\.(\d{2})")


def ass_to_cs(m: re.Match) -> int:
    """Convert an ``ASS_TIME`` match to centiseconds."""
    h, mi, s, cs = map(int, m.groups())
    return ((h * 3600 + mi * 60 + s) * 100) + cs  # centiseconds


def cs_to_ass(cs: int) -> str:
    """Format centiseconds as ``H:MM:SS.cc``; negative input is clamped to 0."""
    cs = max(cs, 0)
    h, cs = divmod(cs, 3600 * 100)
    mi, cs = divmod(cs, 60 * 100)
    s, cs = divmod(cs, 100)
    return f"{h}:{mi:02}:{s:02}.{cs:02}"


def shift_ass_text(text: str, offset_ms: int) -> str:
    """Shift the Start/End times of every ``Dialogue:`` line in *text*.

    *offset_ms* is rounded to centiseconds, the resolution of ASS timestamps.
    The positions of the Start and End fields are taken from the most recent
    ``Format:`` line that names both; until one is seen, the standard layout
    (Start at index 1, End at index 2, ten fields) is assumed. ``Format:``
    lines without those names, such as the one in the styles section, are
    ignored. Lines that cannot be parsed are passed through unchanged, and
    line endings, including the final newline, are preserved.
    """
    offset_cs = int(round(offset_ms / 10.0))
    start_idx, end_idx = 1, 2
    field_count = 10
    out_lines = []
    for line in text.splitlines(keepends=True):
        if line.startswith("Format:"):
            fields = [f.strip() for f in line.split(":", 1)[1].split(",")]
            if "Start" in fields and "End" in fields:
                start_idx = fields.index("Start")
                end_idx = fields.index("End")
                field_count = len(fields)
        elif line.startswith("Dialogue:"):
            # Split into at most `field_count` parts so commas inside the
            # final Text field are left alone.
            parts = line.split(":", 1)[1].split(",", maxsplit=field_count - 1)
            if len(parts) > max(start_idx, end_idx):
                sm = ASS_TIME.search(parts[start_idx])
                em = ASS_TIME.search(parts[end_idx])
                if sm and em:
                    s_cs = ass_to_cs(sm) + offset_cs
                    e_cs = ass_to_cs(em) + offset_cs
                    parts[start_idx] = ASS_TIME.sub(cs_to_ass(s_cs), parts[start_idx], count=1)
                    parts[end_idx] = ASS_TIME.sub(cs_to_ass(e_cs), parts[end_idx], count=1)
                    line = "Dialogue:" + ",".join(parts)
        out_lines.append(line)
    return "".join(out_lines)
|
|
|
|
# TTML: XML; adjust begin/end/dur attributes when present.
|
|
# TTML clock-time without frames: HH:MM:SS with an optional fraction of any length.
_TTML_CLOCK_TIME = re.compile(r"^(\d{2,}):(\d{2}):(\d{2})(?:\.(\d+))?$")
# TTML offset-time with a wall-clock metric, e.g. "5.2s", "100ms", "2m", "1h".
_TTML_OFFSET_TIME = re.compile(r"^(\d+(?:\.\d+)?)(h|ms|m|s)$")
_TTML_METRIC_MS = {"h": 3600000, "m": 60000, "s": 1000, "ms": 1}


def parse_time_to_ms(value: str) -> Optional[int]:
    """
    Parse a TTML time expression into milliseconds.

    Accepted forms (surrounding whitespace is ignored):
      - clock-time ``HH:MM:SS`` or ``HH:MM:SS.fraction``; hours may be wider
        than two digits and the fraction is truncated to milliseconds
      - offset-time with metric ``h``, ``m``, ``s`` or ``ms`` (e.g. ``5.2s``),
        rounded to the nearest millisecond

    Returns None for anything else, including frame-based (``HH:MM:SS:FF``,
    ``10f``) and tick-based (``10t``) values, which cannot be converted
    without the document's frame or tick rate.
    """
    value = value.strip()

    m = _TTML_CLOCK_TIME.match(value)
    if m:
        h, mi, s = map(int, m.group(1, 2, 3))
        # Keep the first three fraction digits, right-padding short fractions
        # so ".5" means 500 ms.
        ms = int((m.group(4) or "")[:3].ljust(3, "0"))
        return ((h * 3600 + mi * 60 + s) * 1000) + ms

    m = _TTML_OFFSET_TIME.match(value)
    if m:
        return int(round(float(m.group(1)) * _TTML_METRIC_MS[m.group(2)]))

    return None
|
|
|
|
def ms_to_ttml(ms: int) -> str:
    """Format milliseconds as a TTML clock-time ``HH:MM:SS.mmm``.

    Negative input is clamped to zero via ``clamp_ms`` first.
    """
    remaining = clamp_ms(ms)
    hours, remaining = divmod(remaining, 3600000)
    minutes, remaining = divmod(remaining, 60000)
    secs, millis = divmod(remaining, 1000)
    return f"{hours:02}:{minutes:02}:{secs:02}.{millis:03}"
|
|
|
|
def shift_ttml_text(text: str, offset_ms: int) -> str:
    """Shift the ``begin``/``end`` attributes of a TTML document by *offset_ms*.

    Every element is visited; an attribute is rewritten only when
    ``parse_time_to_ms`` understands its value. ``dur`` is never touched
    because a duration does not change under a constant shift.

    Namespace prefixes declared in the document are registered with
    ElementTree before serializing, so ``tts:``/``ttp:`` and the default
    namespace are written back under their original names instead of
    ``ns0:``/``ns1:``. Registration is process-wide (``ET.register_namespace``).

    If *text* is not well-formed XML, a warning is printed to stderr and
    *text* is returned unchanged.

    NOTE(review): every timed element is shifted independently; documents
    that rely on child times being relative to a timed parent may need
    different handling -- confirm against real inputs.
    """
    try:
        parser = ET.XMLPullParser(events=("start", "start-ns"))
        parser.feed(text)
        parser.close()
    except ET.ParseError as exc:
        print(f"Warning: TTML parse failed ({exc}); timings left unchanged.", file=sys.stderr)
        return text

    root = None
    for event, payload in parser.read_events():
        if event == "start-ns":
            prefix, uri = payload
            try:
                ET.register_namespace(prefix, uri)
            except ValueError:
                # Prefixes shaped like "ns0" are reserved by ElementTree;
                # let it choose a prefix for that namespace.
                pass
        elif root is None:
            # The first "start" event carries the document's root element.
            root = payload
    if root is None:
        return text

    for elem in root.iter():
        for attr in ("begin", "end"):
            val = elem.get(attr)
            if val is None:
                continue
            ms = parse_time_to_ms(val)
            if ms is not None:
                elem.set(attr, ms_to_ttml(ms + offset_ms))
    return ET.tostring(root, encoding="unicode")
|
|
|
|
def python_shift(input_sub: Path, output_sub: Path, seconds: float) -> bool:
    """
    Shift *input_sub* with the pure-Python shifter for its extension.

    The file is read as UTF-8 (undecodable bytes are replaced), the offset is
    converted to whole milliseconds, and the result is written to *output_sub*
    as UTF-8.

    Returns True after writing the output, or False (writing nothing) when the
    extension has no Python shifter.
    """
    suffix = input_sub.suffix.lower()
    source = input_sub.read_text(encoding="utf-8", errors="replace")
    delta_ms = int(round(seconds * 1000))

    shifters = {
        ".srt": shift_srt_text,
        ".vtt": shift_vtt_text,
        ".ass": shift_ass_text,
        ".ttml": shift_ttml_text,
    }
    shifter = shifters.get(suffix)
    if shifter is None:
        return False

    shifted = shifter(source, delta_ms)
    if suffix == ".vtt":
        # Ensure WEBVTT header remains if present
        had_header = source.lstrip().startswith("WEBVTT")
        has_header = shifted.lstrip().startswith("WEBVTT")
        if had_header and not has_header:
            shifted = "WEBVTT\n\n" + shifted

    output_sub.write_text(shifted, encoding="utf-8")
    return True
|
|
|
|
# ---------- batch ----------
|
|
def find_sub_files(folder: Path, recursive: bool, exts: List[str]) -> List[Path]:
    """Return the sorted files under *folder* whose suffix is in *exts*.

    Extensions are matched case-insensitively and may be given with or
    without the leading dot (``"srt"`` and ``".srt"`` are equivalent); blank
    entries are ignored. With *recursive* set, subfolders are searched too.
    """
    wanted = set()
    for ext in exts:
        ext = ext.strip().lower()
        if not ext:
            continue
        # Path.suffix always includes the dot, so normalize user input to match.
        wanted.add(ext if ext.startswith(".") else "." + ext)

    pattern = "**/*" if recursive else "*"
    return sorted(
        p for p in folder.glob(pattern)
        if p.suffix.lower() in wanted and p.is_file()
    )
|
|
|
|
def process_one(file: Path, out_dir: Path, seconds: float, skip_existing: bool):
    """Shift one subtitle file into *out_dir*, keeping its file name.

    For a non-negative offset FFmpeg is tried first and the Python shifter is
    the fallback. For a negative offset the order is reversed, because the
    Python shifters clamp timestamps at zero; FFmpeg is then only used when no
    Python shifter exists for the extension.

    When *skip_existing* is set and the output already exists, nothing is
    done. If every method fails, an output file created by this call is
    removed so a partial result is not mistaken for a finished one by a later
    ``--skip-existing`` run; an output that existed beforehand is left alone.
    """
    out_path = out_dir / file.name
    existed_before = out_path.exists()
    if skip_existing and existed_before:
        print(f"Skip (exists): {out_path}")
        return

    if seconds >= 0:
        # 1) Try FFmpeg first
        if ffmpeg_shift(file, out_path, seconds) == 0:
            print(f"OK (ffmpeg): {file.name} -> {out_path.name}")
            return

        print(f"FFmpeg failed; using Python fallback for {file.name} …")
        if python_shift(file, out_path, seconds):
            print(f"OK (python): {file.name} -> {out_path.name}")
            return
    else:
        # Negative shift: prefer the Python shifters, which clamp at zero.
        if python_shift(file, out_path, seconds):
            print(f"OK (python): {file.name} -> {out_path.name}")
            return

        print(f"No Python shifter for {file.name}; trying FFmpeg …")
        if ffmpeg_shift(file, out_path, seconds) == 0:
            print(f"OK (ffmpeg): {file.name} -> {out_path.name}")
            return

    if not existed_before:
        try:
            out_path.unlink()
        except OSError:
            # Nothing was written, or it cannot be removed; cleanup is best-effort.
            pass
    print(f"FAILED: {file}")
|
|
|
|
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the batch shifter."""
    parser = argparse.ArgumentParser(
        description="Batch shift text subtitles by N seconds (ASS/SRT/TTML/VTT)."
    )
    parser.add_argument("folder", help="Folder containing subtitle files")
    parser.add_argument(
        "seconds",
        nargs="?",
        type=float,
        default=1.0,
        help="Constant time shift in seconds (default: +1.0)",
    )
    parser.add_argument("--recursive", action="store_true", help="Process subfolders")
    parser.add_argument("--skip-existing", action="store_true", help="Skip if output already exists")
    parser.add_argument("--exts", nargs="+", default=SUPPORTED_EXTS, help="Extensions to include")
    parser.add_argument("--output-dir-name", default="output", help="Name of the output subfolder")
    return parser


def main():
    """Command-line entry point: shift every matching subtitle in a folder.

    Exits with status 1 when ffmpeg is missing or the folder is invalid, 0
    when no files match, and 130 on Ctrl-C. An error on one file is reported
    and the remaining files are still processed.
    """
    args = _build_arg_parser().parse_args()

    check_binary("ffmpeg")

    root = Path(args.folder).expanduser().resolve()
    if not root.is_dir():
        print(f"Error: '{root}' is not a folder.")
        sys.exit(1)

    out_dir = root / args.output_dir_name
    out_dir.mkdir(parents=True, exist_ok=True)

    # Leave out anything already inside the output folder so a recursive run
    # does not re-process its own results.
    files = [
        candidate
        for candidate in find_sub_files(root, args.recursive, args.exts)
        if out_dir not in candidate.parents
    ]
    if not files:
        print("No matching subtitle files found.")
        sys.exit(0)

    print(f"Found {len(files)} file(s). Output: {out_dir}")
    for f in files:
        try:
            process_one(f, out_dir, args.seconds, args.skip_existing)
        except KeyboardInterrupt:
            print("\nInterrupted.")
            sys.exit(130)
        except Exception as e:
            print(f"Error on '{f}': {e}")
    print("Done.")


if __name__ == "__main__":
    main()
|