#!/usr/bin/env python3
"""
Batch-shift text subtitles (ASS, SRT, TTML, VTT) by N seconds.

Primary method:
  - FFmpeg with `-itsoffset` to create a shifted external subtitle file,
    preserving the original format/extension.

Fallback:
  - Python per-format shifters for SRT, VTT, ASS, TTML. These clamp
    timestamps at zero, so they are tried first for negative offsets.

Outputs:
  - Writes to <folder>/output/ keeping the same file names (and, with
    --recursive, the same relative sub-folders).

Usage:
  python shift_subtitles_batch.py /path/to/folder 1
  python shift_subtitles_batch.py "/subs_folder" 1 --recursive --skip-existing
"""

import argparse
import io
import re
import sys
import shutil
import subprocess
from pathlib import Path
from typing import List, Tuple, Optional
import xml.etree.ElementTree as ET

SUPPORTED_EXTS = [".srt", ".ass", ".vtt", ".ttml"]

FFMPEG_CODEC_BY_EXT = {
    ".srt": None,      # copy is fine
    ".ass": "ass",     # be explicit if needed
    ".vtt": "webvtt",  # FFmpeg supports webvtt muxer/codec
    ".ttml": "ttml",   # may not be available in all builds; fallback if fails
}


def check_binary(name: str):
    """Exit the program with status 1 if executable *name* is not on PATH."""
    if shutil.which(name) is None:
        print(f"Error: '{name}' not found on PATH. Install it and try again.")
        sys.exit(1)


def ffmpeg_shift(input_sub: Path, output_sub: Path, seconds: float) -> int:
    """
    Try to shift a text subtitle with FFmpeg using -itsoffset.

    Uses `-c:s <codec>` when a codec is listed in FFMPEG_CODEC_BY_EXT for the
    input extension, otherwise `-c copy`.

    Returns the FFmpeg process return code (0 on success).
    """
    ext = input_sub.suffix.lower()
    codec = FFMPEG_CODEC_BY_EXT.get(ext)
    cmd = [
        "ffmpeg", "-hide_banner",
        "-nostdin",  # never let FFmpeg read the terminal during a batch run
        "-loglevel", "error",
        "-y",
        "-itsoffset", str(seconds),
        "-i", str(input_sub),
    ]
    cmd += ["-c:s", codec] if codec else ["-c", "copy"]
    cmd.append(str(output_sub))
    print("FFmpeg shift:\n " + " ".join(cmd))
    return subprocess.run(cmd).returncode


# ---------- Python fallback shifters ----------

def clamp_ms(ms: int) -> int:
    """Clamp a millisecond value so it is never negative."""
    return max(ms, 0)


def _split_ms(ms: int) -> Tuple[int, int, int, int]:
    """Clamp *ms* at zero and break it into (hours, minutes, seconds, millis)."""
    total_s, millis = divmod(clamp_ms(ms), 1000)
    total_min, s = divmod(total_s, 60)
    h, mi = divmod(total_min, 60)
    return h, mi, s, millis


def _shift_timing_line(line: str, pattern, to_ms, fmt, offset_ms: int) -> str:
    """
    Shift the two timestamps around `-->` in *line*.

    Only the matched timestamp text is replaced, so surrounding whitespace and
    anything after the end timestamp (cue settings, coordinates, a trailing
    carriage return) are preserved. Lines that are not timing lines are
    returned unchanged.
    """
    if "-->" not in line:
        return line
    left, right = line.split("-->", 1)
    lm = pattern.search(left)
    rm = pattern.search(right)
    if not (lm and rm):
        return line
    left_new = left[:lm.start()] + fmt(to_ms(lm) + offset_ms) + left[lm.end():]
    right_new = right[:rm.start()] + fmt(to_ms(rm) + offset_ms) + right[rm.end():]
    return f"{left_new}-->{right_new}"


# SRT: 00:00:05,123 --> 00:00:08,456
# Hours may exceed two digits; some writers use '.' instead of ','.
SRT_TIME = re.compile(r"(\d{2,}):(\d{2}):(\d{2})[,.](\d{3})")


def srt_to_ms(m: re.Match) -> int:
    """Convert an SRT_TIME match to milliseconds."""
    h, mi, s, ms = map(int, m.groups())
    return ((h * 3600 + mi * 60 + s) * 1000) + ms


def ms_to_srt(ms: int) -> str:
    """Format milliseconds as an SRT timestamp (HH:MM:SS,mmm), clamped at 0."""
    h, mi, s, millis = _split_ms(ms)
    return f"{h:02}:{mi:02}:{s:02},{millis:03}"


def shift_srt_text(text: str, offset_ms: int) -> str:
    """
    Return SRT *text* with every cue timing shifted by *offset_ms*.

    Splitting on "\\n" (rather than splitlines) keeps the trailing newline and
    leaves any other characters in the cue text untouched.
    """
    return "\n".join(
        _shift_timing_line(line, SRT_TIME, srt_to_ms, ms_to_srt, offset_ms)
        for line in text.split("\n")
    )


# VTT: WEBVTT header; times use '.' separator: 00:00:05.123 --> ...
# The hours component is optional in WebVTT (MM:SS.mmm is valid).
VTT_TIME = re.compile(r"(?:(\d{2,}):)?(\d{2}):(\d{2})\.(\d{3})")


def vtt_to_ms(m: re.Match) -> int:
    """Convert a VTT_TIME match to milliseconds (missing hours count as 0)."""
    h, mi, s, ms = m.groups()
    return ((int(h or 0) * 3600 + int(mi) * 60 + int(s)) * 1000) + int(ms)


def ms_to_vtt(ms: int) -> str:
    """Format milliseconds as a WebVTT timestamp (HH:MM:SS.mmm), clamped at 0."""
    h, mi, s, millis = _split_ms(ms)
    return f"{h:02}:{mi:02}:{s:02}.{millis:03}"


def shift_vtt_text(text: str, offset_ms: int) -> str:
    """
    Return WebVTT *text* with every cue timing shifted by *offset_ms*.

    Cue settings such as "line:-1 align:right" after the end timestamp are
    preserved. Short timestamps (MM:SS.mmm) are rewritten in the long form.
    """
    return "\n".join(
        _shift_timing_line(line, VTT_TIME, vtt_to_ms, ms_to_vtt, offset_ms)
        for line in text.split("\n")
    )


# ASS: times appear in Dialogue events; format line defines field order.
# Typical: "Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text"
# Dialogue: 0,0:00:05.12,0:00:08.34,Default,...
# Standard ASS writes a single hour digit, so hours are matched as 1+ digits.
ASS_TIME = re.compile(r"(\d+):(\d{2}):(\d{2})\.(\d{2})")


def ass_to_cs(m: re.Match) -> int:
    """Convert an ASS_TIME match to centiseconds."""
    h, mi, s, cs = map(int, m.groups())
    return ((h * 3600 + mi * 60 + s) * 100) + cs  # centiseconds


def cs_to_ass(cs: int, hour_width: int = 2) -> str:
    """
    Format centiseconds as an ASS timestamp, clamped at 0.

    *hour_width* is the minimum number of hour digits; pass 1 for the
    standard `H:MM:SS.cc` form.
    """
    total_s, cs = divmod(max(cs, 0), 100)
    total_min, s = divmod(total_s, 60)
    h, mi = divmod(total_min, 60)
    return f"{h:0{hour_width}d}:{mi:02}:{s:02}.{cs:02}"


def _shift_ass_dialogue(line: str, start_idx: int, end_idx: int, offset_cs: int) -> str:
    """Shift the Start/End fields of one `Dialogue:` line; return it unchanged if they cannot be found."""
    prefix, payload = line.split(":", 1)
    last_idx = max(start_idx, end_idx)
    # Split no further than needed: the trailing Text field may contain commas.
    parts = payload.split(",", last_idx + 1)
    if last_idx >= len(parts):
        return line
    sm = ASS_TIME.search(parts[start_idx])
    em = ASS_TIME.search(parts[end_idx])
    if not (sm and em):
        return line
    for idx, m in ((start_idx, sm), (end_idx, em)):
        field = parts[idx]
        # Keep the hour width the file already uses (1 digit in standard ASS).
        shifted = cs_to_ass(ass_to_cs(m) + offset_cs, hour_width=len(m.group(1)))
        parts[idx] = field[:m.start()] + shifted + field[m.end():]
    return prefix + ":" + ",".join(parts)


def shift_ass_text(text: str, offset_ms: int) -> str:
    """
    Return ASS *text* with the Start/End of every Dialogue line shifted.

    The field positions come from the most recent `Format:` line that names
    both "Start" and "End"; until one is seen (or if the file has none) the
    conventional positions 1 and 2 are used. Other `Format:` lines, such as
    the one in the styles section, are ignored for this purpose.
    """
    offset_cs = int(round(offset_ms / 10.0))
    start_idx, end_idx = 1, 2
    out_lines = []
    for line in text.split("\n"):
        if line.startswith("Format:"):
            fields = [f.strip() for f in line.split(":", 1)[1].split(",")]
            if "Start" in fields and "End" in fields:
                start_idx, end_idx = fields.index("Start"), fields.index("End")
        elif line.startswith("Dialogue:"):
            line = _shift_ass_dialogue(line, start_idx, end_idx, offset_cs)
        out_lines.append(line)
    return "\n".join(out_lines)


# TTML: XML; adjust begin/end attributes when present.
_TTML_CLOCK = re.compile(r"^(\d{2,}):(\d{2}):(\d{2})(?:\.(\d+))?$")
_TTML_OFFSET = re.compile(r"^(\d+(?:\.\d+)?)(h|ms|m|s)$")
_TTML_METRIC_MS = {"h": 3600000, "m": 60000, "s": 1000, "ms": 1}


def parse_time_to_ms(value: str) -> Optional[int]:
    """
    Parse a TTML time expression to milliseconds.

    Supported: clock-time `HH:MM:SS` / `HH:MM:SS.fraction` and offset-time
    with the metrics h, m, s, ms (e.g. `1.5s`, `250ms`).
    Frame- and tick-based expressions need document parameters that are not
    available here, so they return None (as does anything else unrecognised).
    """
    value = value.strip()
    m = _TTML_CLOCK.match(value)
    if m:
        h, mi, s = (int(g) for g in m.groups()[:3])
        ms = int(round(float("0." + (m.group(4) or "0")) * 1000))
        return ((h * 3600 + mi * 60 + s) * 1000) + ms
    m = _TTML_OFFSET.match(value)
    if m:
        return int(round(float(m.group(1)) * _TTML_METRIC_MS[m.group(2)]))
    return None


def ms_to_ttml(ms: int) -> str:
    """Format milliseconds as a TTML clock-time (HH:MM:SS.mmm), clamped at 0."""
    h, mi, s, millis = _split_ms(ms)
    return f"{h:02}:{mi:02}:{s:02}.{millis:03}"


def _parse_ttml(text: str) -> ET.Element:
    """
    Parse *text* and return the root element.

    Namespace prefixes declared in the document are registered with
    ElementTree so that serialising keeps them instead of emitting
    generated `ns0:`-style prefixes. Raises ET.ParseError on malformed XML.
    """
    root = None
    for event, payload in ET.iterparse(io.StringIO(text), events=("start-ns", "start")):
        if event == "start-ns":
            prefix, uri = payload
            try:
                ET.register_namespace(prefix, uri)
            except ValueError:
                # Prefixes shaped like "ns0" are reserved by ElementTree.
                pass
        elif root is None:
            root = payload
    if root is None:
        raise ET.ParseError("no root element")
    return root


def _shift_ttml_subtree(elem: ET.Element, offset_ms: int) -> None:
    """
    Shift begin/end on *elem*, descending only while no `begin` was met.

    Timings of nested elements are relative to their timed ancestor, so once
    an element's own `begin` has moved, its descendants must stay as they are
    or they would be shifted twice. `dur` is a length and is never changed.
    """
    begin = elem.get("begin")
    if begin is not None:
        begin_ms = parse_time_to_ms(begin)
        if begin_ms is None:
            # Unsupported expression: leave this whole branch untouched
            # rather than shifting only part of it.
            return
        elem.set("begin", ms_to_ttml(begin_ms + offset_ms))
    end = elem.get("end")
    if end is not None:
        end_ms = parse_time_to_ms(end)
        if end_ms is not None:
            elem.set("end", ms_to_ttml(end_ms + offset_ms))
    if begin is not None:
        return
    for child in elem:
        _shift_ttml_subtree(child, offset_ms)


def shift_ttml_text(text: str, offset_ms: int) -> str:
    """
    Return TTML *text* with its timings shifted by *offset_ms*.

    If the XML cannot be parsed, a warning is printed to stderr and the
    original text is returned unchanged.
    """
    try:
        root = _parse_ttml(text)
    except ET.ParseError as exc:
        print(f"Warning: could not parse TTML ({exc}); content left unchanged.",
              file=sys.stderr)
        return text
    _shift_ttml_subtree(root, offset_ms)
    out = ET.tostring(root, encoding="unicode")
    if text.lstrip().startswith("<?xml"):
        # tostring(encoding="unicode") omits the declaration; the file is
        # written as UTF-8 by python_shift, so declare exactly that.
        out = '<?xml version="1.0" encoding="UTF-8"?>\n' + out
    return out


_PYTHON_SHIFTERS = {
    ".srt": shift_srt_text,
    ".vtt": shift_vtt_text,
    ".ass": shift_ass_text,
    ".ttml": shift_ttml_text,
}


def python_shift(input_sub: Path, output_sub: Path, seconds: float) -> bool:
    """
    Format-aware shifting when FFmpeg fails or for negative offset clamping.

    Reads *input_sub* as UTF-8 (undecodable bytes are replaced), shifts it
    with the shifter matching its extension and writes *output_sub* as UTF-8.
    Returns False, without writing anything, for unsupported extensions.
    """
    shifter = _PYTHON_SHIFTERS.get(input_sub.suffix.lower())
    if shifter is None:
        return False
    text = input_sub.read_text(encoding="utf-8", errors="replace")
    offset_ms = int(round(seconds * 1000))
    output_sub.write_text(shifter(text, offset_ms), encoding="utf-8")
    return True


# ---------- batch ----------

def find_sub_files(folder: Path, recursive: bool, exts: List[str]) -> List[Path]:
    """
    Return the sorted files under *folder* whose suffix is in *exts*.

    Extensions are compared case-insensitively and may be given with or
    without the leading dot.
    """
    pattern = "**/*" if recursive else "*"
    exts_norm = {
        e.lower() if e.startswith(".") else "." + e.lower()
        for e in exts
    }
    return sorted(
        p for p in folder.glob(pattern)
        if p.is_file() and p.suffix.lower() in exts_norm
    )


def process_one(file: Path, out_dir: Path, seconds: float, skip_existing: bool,
                root: Optional[Path] = None) -> bool:
    """
    Shift one subtitle file into *out_dir*.

    When *root* is given and *file* lies beneath it, the file's path relative
    to *root* is mirrored under *out_dir*, so same-named files from different
    sub-folders do not overwrite each other. Otherwise only the file name is
    used.

    Non-negative offsets try FFmpeg first and fall back to the Python
    shifters. Negative offsets try the Python shifters first, because those
    clamp timestamps at zero, and use FFmpeg only for formats they do not
    support.

    Returns True if the output was produced or skipped, False on failure.
    """
    rel = Path(file.name)
    if root is not None:
        try:
            rel = file.relative_to(root)
        except ValueError:
            pass  # not under root: fall back to the bare file name
    out_path = out_dir / rel

    if skip_existing and out_path.exists():
        print(f"Skip (exists): {out_path}")
        return True
    out_path.parent.mkdir(parents=True, exist_ok=True)

    prefer_python = seconds < 0
    if prefer_python and python_shift(file, out_path, seconds):
        print(f"OK (python): {file.name} -> {out_path.name}")
        return True

    if ffmpeg_shift(file, out_path, seconds) == 0:
        print(f"OK (ffmpeg): {file.name} -> {out_path.name}")
        return True

    # A failed FFmpeg run can leave a partial file behind.
    try:
        out_path.unlink()
    except FileNotFoundError:
        pass

    if not prefer_python:
        print(f"FFmpeg failed; using Python fallback for {file.name} …")
        if python_shift(file, out_path, seconds):
            print(f"OK (python): {file.name} -> {out_path.name}")
            return True

    print(f"FAILED: {file}")
    return False


def main():
    """Parse the command line and shift every matching subtitle file."""
    parser = argparse.ArgumentParser(
        description="Batch shift text subtitles by N seconds (ASS/SRT/TTML/VTT)."
    )
    parser.add_argument("folder", help="Folder containing subtitle files")
    parser.add_argument("seconds", nargs="?", type=float, default=1.0,
                        help="Constant time shift in seconds (default: +1.0)")
    parser.add_argument("--recursive", action="store_true", help="Process subfolders")
    parser.add_argument("--skip-existing", action="store_true",
                        help="Skip if output already exists")
    parser.add_argument("--exts", nargs="+", default=SUPPORTED_EXTS,
                        help="Extensions to include")
    parser.add_argument("--output-dir-name", default="output",
                        help="Name of the output subfolder")
    args = parser.parse_args()

    check_binary("ffmpeg")

    root = Path(args.folder).expanduser().resolve()
    if not root.exists() or not root.is_dir():
        print(f"Error: '{root}' is not a folder.")
        sys.exit(1)

    out_dir = root / args.output_dir_name
    out_dir.mkdir(parents=True, exist_ok=True)

    files = find_sub_files(root, args.recursive, args.exts)
    # Never re-process files this script (or an earlier run) wrote.
    files = [f for f in files if out_dir not in f.parents]
    if not files:
        print("No matching subtitle files found.")
        sys.exit(0)

    print(f"Found {len(files)} file(s). Output: {out_dir}")
    failures = 0
    for f in files:
        try:
            if not process_one(f, out_dir, args.seconds, args.skip_existing, root=root):
                failures += 1
        except KeyboardInterrupt:
            print("\nInterrupted.")
            sys.exit(130)
        except Exception as e:
            # Top-level boundary: report and carry on with the next file.
            print(f"Error on '{f}': {e}")
            failures += 1
    print("Done.")
    if failures:
        sys.exit(1)


if __name__ == "__main__":
    main()