Compare commits

6 Commits
main ... dev

Author SHA1 Message Date
sefree
8fed8325ca . 2026-03-31 12:57:52 +07:00
sefree
ac89cbf545 add SeFree-Custom-Script 2026-03-31 12:57:14 +07:00
sefree
99bacaff3f . 2026-03-31 12:56:13 +07:00
sefree
853878f38b test-dev-branch 2026-03-31 12:51:16 +07:00
sefree
7de68e5c2a . 2026-03-31 12:50:16 +07:00
panitan103
2eb1d91987 . 2026-03-31 12:49:39 +07:00
13 changed files with 1479 additions and 2 deletions

3
.gitignore vendored
View File

@@ -249,4 +249,5 @@ bot_logs/
test*.py
Unshackle-Service-SeFree/
SeFree-Custom-Script/
SeFree-Custom-Script/example_tid_decrypt.js
SeFree-Custom-Script/example_tid_decrypt.py

View File

@@ -0,0 +1,151 @@
import os
import glob
import subprocess
import sys
import langcodes
import pycountry
from ass_editor import ASS_Editor, attach_font
def find_files(folder_path):
"""Find all video (MP4, TS, MKV), AAC, and subtitle files in the folder."""
video_files = (glob.glob(os.path.join(folder_path, "*.mkv")) +
glob.glob(os.path.join(folder_path, "*.mp4")) +
glob.glob(os.path.join(folder_path, "*.ts")))
base_video_files = [f for f in video_files if not f.endswith("_DUB.mkv") and
not f.endswith("_DUB.mp4") and not f.endswith("_DUB.ts")]
dub_video_files = [f for f in video_files if f.endswith("_DUB.mkv") or
f.endswith("_DUB.mp4") or f.endswith("_DUB.ts")]
aac_files = (glob.glob(os.path.join(folder_path, "*.aac")) +
glob.glob(os.path.join(folder_path, "*.m4a")) +
glob.glob(os.path.join(folder_path, "*.mka")))
subtitle_files = (glob.glob(os.path.join(folder_path, "*.srt")) +
glob.glob(os.path.join(folder_path, "*.ass")) +
glob.glob(os.path.join(folder_path, "*.vtt")))
return base_video_files, dub_video_files, aac_files, subtitle_files
def get_base_name(file_path):
"""Extract base name by removing '_DUB' and file extension for video files."""
file_name = os.path.basename(file_path)
for ext in ["_DUB.mkv", "_DUB.mp4", "_DUB.ts"]:
if file_name.endswith(ext):
return file_name[:-len(ext)]
return os.path.splitext(file_name)[0]
def get_lang_code(file_path):
"""Helper to extract language code from filename."""
parts = os.path.splitext(os.path.basename(file_path))[0].split('.')
lang = parts[-1]
if len(lang) == 2:
try:
return langcodes.Language.make(lang).to_alpha3()
except:
return "und"
elif len(lang) == 3:
return lang
return "und"
def group_files(video_files, dub_video_files, aac_files, subtitle_files):
"""Group video, AAC, and subtitle files by base name."""
file_groups = {}
for video_file in video_files:
base_name = get_base_name(video_file)
file_groups[base_name] = {'video': video_file, 'dub_video': None, 'audio': [], 'subtitles': []}
for dub_video_file in dub_video_files:
base_name = get_base_name(dub_video_file)
if base_name in file_groups:
file_groups[base_name]['dub_video'] = dub_video_file
for aac_file in aac_files:
base_name = os.path.splitext(os.path.basename(aac_file))[0].split('.')[0]
lang = get_lang_code(aac_file)
if base_name in file_groups:
file_groups[base_name]['audio'].append((aac_file, lang))
for sub_file in subtitle_files:
base_name = os.path.splitext(os.path.basename(sub_file))[0].split('.')[0]
lang = get_lang_code(sub_file)
if base_name in file_groups:
file_groups[base_name]['subtitles'].append((sub_file, lang))
return file_groups
def embed_files(folder_path):
"""Embed audio and subtitles using mkvmerge."""
output_folder = os.path.join(folder_path, "Output")
os.makedirs(output_folder, exist_ok=True)
video_files, dub_video_files, aac_files, subtitle_files = find_files(folder_path)
file_groups = group_files(video_files, dub_video_files, aac_files, subtitle_files)
for base_name, files in file_groups.items():
video_file = files['video']
dub_video_file = files['dub_video']
audio_inputs = files['audio']
subtitle_inputs = files['subtitles']
output_file = os.path.join(output_folder, base_name + ".mkv")
if not video_file:
continue
# Start mkvmerge command
cmd = ["mkvmerge", "-o", output_file]
# 1. Base Video File
# Set a title for the first video track if desired
cmd.extend(["--track-name", "0:SeFree", video_file])
# 2. Dubbed Video File (if exists, add its audio/subs)
if dub_video_file:
# Assume track 0 is video (skipped), track 1 is Japanese, track 2 is Thai
# This logic depends on the internal track IDs of the dub file
cmd.extend([
"--no-video",
"--language", "2:jpn", "--track-name", "2:Japanese",
"--language", "3:tha", "--track-name", "3:Thai",
dub_video_file
])
# 3. External Audio Files
if not dub_video_file:
for aac_file, lang in audio_inputs:
# mkvmerge track ID for external single-track files is usually 0
cmd.extend([
"--language", f"0:{lang}",
"--track-name", f"0:{pycountry.languages.get(alpha_3=lang).name if lang != 'und' else 'Unknown'}",
aac_file
])
# 4. External Subtitle Files
for sub_file, lang in subtitle_inputs:
cmd.extend([
"--language", f"0:{lang}",
"--track-name", f"0:{pycountry.languages.get(alpha_3=lang).name if lang != 'und' else 'Unknown'}_BLBL",
sub_file
])
# cmd=attach_font(cmd, "BLBL")
print(f"Processing {base_name} with mkvmerge...")
try:
subprocess.run(cmd, check=True, capture_output=True, text=True)
print(f"✅ Successfully created {os.path.basename(output_file)}")
except subprocess.CalledProcessError as e:
print(f"❌ Error processing {base_name}: {e.stderr}")
def main():
os.umask(0o000)
if len(sys.argv) != 2:
print("Usage: python add_subtitles_to_mkv.py <folder_path>")
sys.exit(1)
folder_path = sys.argv[1].strip()
if not os.path.exists(folder_path):
print(f"Error: Folder '{folder_path}' does not exist!")
sys.exit(1)
embed_files(folder_path)
print("🎉 Processing complete!")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,87 @@
import pysubs2
import shutil
import os
class ASS_Editor:
def __init__(self,ass_file):
base_name, extension = os.path.splitext(ass_file)
if extension.lower() == ".mp4":
shutil.move(ass_file, base_name + ".ass")
ass_file = base_name + ".ass"
self.input=ass_file
self.subs = pysubs2.load(self.input)
def batch_custom_style(self,font_name=None, font_size=64,output=None):
# Change font and size for every existing style
for style in self.subs.styles.values():
if font_name is not None:
style.fontname = font_name # <-- put your font family here
style.fontsize = font_size
self.save_out(self.input, output)
def save_out(self,input_file, output=None):
if output:
self.subs.save(input_file)
base_name, extension = os.path.splitext(input_file)
if extension.lower() == ".ass":
shutil.move(input_file, base_name + ".mp4")
self.input = base_name + ".mp4"
else:
self.subs.save(os.path.join(os.path.dirname(input_file), os.path.basename(input_file)+".modified.ass"))
def attach_font(cl,service,FONT_DIR="/root/VT.PR.WV/assets/fonts/{Service}"):
FONT_DIR = FONT_DIR.format(Service=service)
for font_file in os.listdir(FONT_DIR):
if font_file.lower().endswith((".ttf")):
cl.extend(["--attach-file", os.path.join(FONT_DIR, font_file),
"--attachment-mime-type", "font/ttf"])
elif font_file.lower().endswith((".otf")):
cl.extend(["--attach-file", os.path.join(FONT_DIR, font_file),
"--attachment-mime-type", "font/otf"])
return cl
def encode_uu(data: bytes, filename: str) -> str:
import io
out = io.StringIO()
out.write(f"begin 644 {filename}\n")
# encode in 45-byte chunks
for i in range(0, len(data), 45):
chunk = data[i:i+45]
# output length char
out.write(chr(32 + len(chunk)))
# process every 3 bytes
for j in range(0, len(chunk), 3):
triple = chunk[j:j+3]
# pad to 3 bytes
while len(triple) < 3:
triple += b"\0"
# 24 bits
b1, b2, b3 = triple
c1 = (b1 >> 2) & 0x3F
c2 = ((b1 << 4) & 0x30) | ((b2 >> 4) & 0xF)
c3 = ((b2 << 2) & 0x3C) | ((b3 >> 6) & 0x3)
c4 = b3 & 0x3F
for c in (c1, c2, c3, c4):
out.write(chr(32 + (c & 0x3F)))
out.write("\n")
out.write("`\nend\n")
return out.getvalue()
def main():
ass_file= "/root/VT.PR.WV/test.ass"
ass_editor= ASS_Editor(ass_file)
# print(ass_editor.subs.fonts_opaque.values())
font_path="/root/VT.PR.WV/assets/fonts/BLBL/NotoSansThai-Regular.ttf"
with open(font_path, "rb") as f:
font_bytes = f.read()
uue_text = encode_uu(font_bytes, "NotoSansThai-Regular.ttf")
print(uue_text[:200])
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,251 @@
#!/usr/bin/env python3
"""
Prepend N seconds of silence to audio files using ffmpeg as a subprocess.
- Primary path: concat demuxer + -c copy (avoids re-encoding main audio)
- Fallback: filter_complex concat (re-encodes output) when stream-copy can't be used
- Writes outputs to <folder>/output/
- Optional recursion, skip-existing, and extension filtering
Usage:
python extend_audio.py /path/to/folder [duration_seconds]
Examples:
python extend_audio.py "/folder_that_contain_many_audio" 1
python extend_audio.py "/folder_that_contain_many_audio" 1 --recursive --skip-existing
"""
import os
import sys
import json
import shlex
import subprocess
from pathlib import Path
# ------------------- helpers -------------------
def check_binary(name: str):
if shutil.which(name) is None:
print(f"Error: '{name}' not found on PATH. Install it and try again.")
sys.exit(1)
def run_probe(args):
return json.loads(subprocess.run(args, capture_output=True, text=True, check=True).stdout)
def get_audio_info(file_path):
"""
Probe first audio stream; return codec_name, sample_rate, channels, channel_layout.
Handle files without audio gracefully.
"""
try:
data = run_probe([
"ffprobe", "-v", "error",
"-select_streams", "a:0",
"-show_entries", "stream=codec_name,channels,sample_rate,channel_layout,bit_rate",
"-of", "json", file_path
])
s = data["streams"][0]
codec = s.get("codec_name")
channels = int(s.get("channels", 2))
sample_rate = int(s.get("sample_rate", 48000))
layout = s.get("channel_layout") or ("mono" if channels == 1 else "stereo")
bitrate = s.get("bit_rate")
return {
"codec": codec,
"channels": channels,
"sample_rate": sample_rate,
"layout": layout,
"bitrate": bitrate,
"has_audio": True
}
except Exception:
# If the container has no audio stream, still allow generating silence only.
return {
"codec": "aac",
"channels": 2,
"sample_rate": 48000,
"layout": "stereo",
"bitrate": None,
"has_audio": False
}
# ------------------- encoders -------------------
AUDIO_ENCODER_MAP = {
"aac": ("aac", ["-b:a", "192k"]),
"mp3": ("libmp3lame", ["-b:a", "192k"]),
"libmp3lame": ("libmp3lame", ["-b:a", "192k"]),
"opus": ("libopus", ["-b:a", "160k"]),
"vorbis": ("libvorbis", ["-q:a", "4"]),
"ac3": ("ac3", ["-b:a", "384k"]),
"eac3": ("eac3", ["-b:a", "384k"]),
"flac": ("flac", []),
"alac": ("alac", []),
"pcm_s16le": ("pcm_s16le", []),
"wav": ("pcm_s16le", []), # convenience
}
def pick_audio_encoder(codec_name):
# Normalize codec name to the encoder we want to use
if not codec_name:
return "aac", ["-b:a", "192k"]
enc, extra = AUDIO_ENCODER_MAP.get(codec_name, ("aac", ["-b:a", "192k"]))
return enc, extra
# ------------------- build intro (silence) -------------------
def build_silence_cmd(info, seconds, intro_path):
"""
Create a short silent file encoded with the SAME codec + params as the input,
so we can try concat demuxer + -c copy.
"""
enc, extra = pick_audio_encoder(info["codec"])
ch = info["channels"]
sr = info["sample_rate"]
layout = info["layout"]
# Important: tell ffmpeg that anullsrc is a filter input (-f lavfi). [3](https://stackoverflow.com/questions/42147512/ffmpeg-adding-silence-struggling-to-use-i-anullsrc-option)
cmd = [
"ffmpeg",
"-hide_banner", "-loglevel", "error", "-y",
"-f", "lavfi", "-t", str(seconds),
"-i", f"anullsrc=channel_layout={layout}:sample_rate={sr}:d={seconds}",
"-c:a", enc, *extra,
"-ac", str(ch), "-ar", str(sr),
intro_path
]
return cmd
# ------------------- concat methods -------------------
def concat_demuxer_copy(intro_path, input_path, output_path):
"""
Concatenate via the concat demuxer and stream copy. Requires same codec & params. [1](https://trac.ffmpeg.org/wiki/Concatenate)
"""
# Build a temporary list file
list_file = Path(output_path).with_suffix(".concat.txt")
list_file.write_text(f"file '{intro_path}'\nfile '{input_path}'\n", encoding="utf-8")
cmd = [
"ffmpeg", "-hide_banner", "-loglevel", "error", "-y",
"-f", "concat", "-safe", "0",
"-i", str(list_file),
"-c", "copy",
output_path
]
print("Concat demuxer command:\n " + " ".join(shlex.quote(x) for x in cmd))
proc = subprocess.run(cmd)
try:
list_file.unlink()
except Exception:
pass
return proc.returncode
def concat_filter_reencode(input_path, seconds, output_path, info):
"""
Fallback: use filter_complex concat to prepend the silent intro and re-encode output. [1](https://trac.ffmpeg.org/wiki/Concatenate)
"""
enc, extra = pick_audio_encoder(info["codec"])
ch = info["channels"]
sr = info["sample_rate"]
layout = info["layout"]
# Build filter path: [silence][input] concat to 1 audio stream. [4](https://superuser.com/questions/579008/add-1-second-of-silence-to-audio-through-ffmpeg)
cmd = [
"ffmpeg", "-hide_banner", "-loglevel", "error", "-y",
"-f", "lavfi", "-t", str(seconds),
"-i", f"anullsrc=channel_layout={layout}:sample_rate={sr}:d={seconds}",
"-i", input_path,
"-filter_complex", "[0:a][1:a]concat=n=2:v=0:a=1[a]", # concat filter for audio
"-map", "[a]",
"-c:a", enc, *extra,
"-ac", str(ch), "-ar", str(sr),
output_path
]
print("Concat filter (fallback) command:\n " + " ".join(shlex.quote(x) for x in cmd))
return subprocess.run(cmd).returncode
# ------------------- batch logic -------------------
import shutil
SUPPORTED_EXTS = [".wav", ".mp3", ".m4a", ".aac", ".flac", ".ogg", ".opus"]
def find_audio_files(folder: Path, recursive: bool, exts):
pattern = "**/*" if recursive else "*"
exts_norm = {e.lower() for e in exts}
return sorted([p for p in folder.glob(pattern) if p.is_file() and p.suffix.lower() in exts_norm])
def process_one(input_file: Path, out_dir: Path, seconds: float, skip_existing: bool):
info = get_audio_info(str(input_file))
output_file = out_dir / input_file.name
intro_file = out_dir / (input_file.stem + "_intro" + input_file.suffix)
if skip_existing and output_file.exists():
print(f"Skip (exists): {output_file}")
return
# 1) Create silence intro encoded like the source
intro_cmd = build_silence_cmd(info, seconds, str(intro_file))
print("Intro command:\n " + " ".join(shlex.quote(x) for x in intro_cmd))
rc = subprocess.run(intro_cmd).returncode
if rc != 0:
print(f"Failed to make intro for {input_file.name}")
return
# 2) Try demuxer (stream copy) first
rc = concat_demuxer_copy(str(intro_file), str(input_file), str(output_file))
if rc == 0:
print(f"OK (copy): {input_file.name} -> {output_file.name}")
else:
print(f"Demuxer concat failed; trying filter fallback…")
rc = concat_filter_reencode(str(input_file), seconds, str(output_file), info)
if rc == 0:
print(f"OK (re-encode): {input_file.name} -> {output_file.name}")
else:
print(f"FAILED: {input_file}")
# Cleanup the intro file
try:
intro_file.unlink()
except Exception:
pass
def process_folder(root: Path, seconds: float, recursive: bool, skip_existing: bool, output_dir_name: str, exts):
out_dir = root / output_dir_name
out_dir.mkdir(parents=True, exist_ok=True)
files = find_audio_files(root, recursive, exts)
# Don't process anything in output/
files = [f for f in files if out_dir not in f.parents]
if not files:
print("No matching audio files found.")
return
print(f"Found {len(files)} file(s). Output dir: {out_dir}")
for f in files:
try:
process_one(f, out_dir, seconds, skip_existing)
except KeyboardInterrupt:
print("\nInterrupted.")
sys.exit(130)
except Exception as e:
print(f"Error on '{f}': {e}")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(
description="Prepend N seconds of silence to audio files. Uses concat demuxer when possible; falls back to re-encode."
)
parser.add_argument("folder", help="Folder containing audio files")
parser.add_argument("seconds", nargs="?", type=float, default=1.0, help="Silence duration in seconds (default: 1.0)")
parser.add_argument("--recursive", action="store_true", help="Process subfolders")
parser.add_argument("--skip-existing", action="store_true", help="Skip if output already exists")
parser.add_argument("--output-dir-name", default="output", help="Subfolder name for outputs (default: output)")
parser.add_argument("--exts", nargs="+", default=SUPPORTED_EXTS, help="Extensions to include")
args = parser.parse_args()
check_binary("ffmpeg")
check_binary("ffprobe")
root = Path(args.folder).expanduser().resolve()
if not root.exists() or not root.is_dir():
print(f"Error: '{root}' is not a folder.")
sys.exit(1)
print(f"Extending audio in: {root} | silence: {args.seconds}s\n")
process_folder(root, args.seconds, args.recursive, args.skip_existing, args.output_dir_name, args.exts)

View File

@@ -0,0 +1,307 @@
#!/usr/bin/env python3
"""
Batch-shift text subtitles (ASS, SRT, TTML, VTT) by +N seconds.
Primary method:
- FFmpeg with `-itsoffset` to create a shifted external subtitle file,
preserving the original format/extension.
Fallback:
- Python per-format shifters for SRT, VTT, ASS, TTML (handles negative-time clamping).
Outputs:
- Writes to <folder>/output/ keeping the same file names.
Usage:
python shift_subtitles_batch.py /path/to/folder 1
python shift_subtitles_batch.py "/subs_folder" 1 --recursive --skip-existing
"""
import argparse
import re
import sys
import shutil
import subprocess
from pathlib import Path
from typing import List, Tuple, Optional
import xml.etree.ElementTree as ET
SUPPORTED_EXTS = [".srt", ".ass", ".vtt", ".ttml"]
FFMPEG_CODEC_BY_EXT = {
".srt": None, # copy is fine
".ass": "ass", # be explicit if needed
".vtt": "webvtt", # FFmpeg supports webvtt muxer/codec
".ttml": "ttml", # may not be available in all builds; fallback if fails
}
def check_binary(name: str):
if shutil.which(name) is None:
print(f"Error: '{name}' not found on PATH. Install it and try again.")
sys.exit(1)
def ffmpeg_shift(input_sub: Path, output_sub: Path, seconds: float) -> int:
"""
Try to shift a text subtitle with FFmpeg using -itsoffset.
Use -c:s <codec> when known; otherwise -c copy.
"""
ext = input_sub.suffix.lower()
codec = FFMPEG_CODEC_BY_EXT.get(ext)
cmd = [
"ffmpeg", "-hide_banner", "-loglevel", "error", "-y",
"-itsoffset", str(seconds),
"-i", str(input_sub),
]
if codec:
cmd += ["-c:s", codec]
else:
cmd += ["-c", "copy"]
cmd += [str(output_sub)]
print("FFmpeg shift:\n " + " ".join(map(str, cmd)))
return subprocess.run(cmd).returncode
# ---------- Python fallback shifters ----------
def clamp_ms(ms: int) -> int:
return max(ms, 0)
# SRT: 00:00:05,123 --> 00:00:08,456
SRT_TIME = re.compile(r"(\d{2}):(\d{2}):(\d{2}),(\d{3})")
def srt_to_ms(m: re.Match) -> int:
h, mi, s, ms = map(int, m.groups())
return ((h * 3600 + mi * 60 + s) * 1000) + ms
def ms_to_srt(ms: int) -> str:
ms = clamp_ms(ms)
h = ms // 3600000; ms %= 3600000
mi = ms // 60000; ms %= 60000
s = ms // 1000; ms %= 1000
return f"{h:02}:{mi:02}:{s:02},{ms:03}"
def shift_srt_text(text: str, offset_ms: int) -> str:
out_lines = []
for line in text.splitlines():
if "-->" in line:
parts = line.split("-->")
left = SRT_TIME.search(parts[0])
right = SRT_TIME.search(parts[1])
if left and right:
l_ms = srt_to_ms(left) + offset_ms
r_ms = srt_to_ms(right) + offset_ms
new_line = f"{ms_to_srt(l_ms)} --> {ms_to_srt(r_ms)}"
out_lines.append(new_line)
continue
out_lines.append(line)
return "\n".join(out_lines)
# VTT: WEBVTT header; times use '.' separator: 00:00:05.123 --> ...
VTT_TIME = re.compile(r"(\d{2}):(\d{2}):(\d{2})\.(\d{3})")
def vtt_to_ms(m: re.Match) -> int:
h, mi, s, ms = map(int, m.groups())
return ((h * 3600 + mi * 60 + s) * 1000) + ms
def ms_to_vtt(ms: int) -> str:
ms = clamp_ms(ms)
h = ms // 3600000; ms %= 3600000
mi = ms // 60000; ms %= 60000
s = ms // 1000; ms %= 1000
return f"{h:02}:{mi:02}:{s:02}.{ms:03}"
def shift_vtt_text(text: str, offset_ms: int) -> str:
out_lines = []
for i, line in enumerate(text.splitlines()):
if "-->" in line:
# Preserve cue settings like "line:-1 align:right" if they exist.
left, right = line.split("-->", 1)
# Left timestamp may have trailing settings; isolate the time token
lm = VTT_TIME.search(left)
rm = VTT_TIME.search(right)
if lm and rm:
l_ms = vtt_to_ms(lm) + offset_ms
r_ms = vtt_to_ms(rm) + offset_ms
# Replace only the matched portions; keep extra cue settings
left_new = VTT_TIME.sub(ms_to_vtt(l_ms), left, count=1)
right_new = VTT_TIME.sub(ms_to_vtt(r_ms), right, count=1)
out_lines.append(f"{left_new}-->{right_new}")
continue
out_lines.append(line)
return "\n".join(out_lines)
# ASS: times appear in Dialogue events; format line defines field order.
# Typical: "Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text"
# Dialogue: 0,00:00:05.12,00:00:08.34,Default,...
ASS_TIME = re.compile(r"(\d{2}):(\d{2}):(\d{2})\.(\d{2})")
def ass_to_cs(m: re.Match) -> int:
h, mi, s, cs = map(int, m.groups())
return ((h * 3600 + mi * 60 + s) * 100) + cs # centiseconds
def cs_to_ass(cs: int) -> str:
cs = max(cs, 0)
h = cs // (3600 * 100); cs %= (3600 * 100)
mi = cs // (60 * 100); cs %= (60 * 100)
s = cs // 100; cs %= 100
return f"{h:02}:{mi:02}:{s:02}.{cs:02}"
def shift_ass_text(text: str, offset_ms: int) -> str:
offset_cs = int(round(offset_ms / 10.0))
out_lines = []
fmt_fields: Optional[List[str]] = None
for line in text.splitlines():
if line.startswith("Format:"):
# Capture field order for reference
fmt_fields = [f.strip() for f in line.split(":", 1)[1].split(",")]
out_lines.append(line)
continue
if line.startswith("Dialogue:"):
parts = line.split(":", 1)[1].split(",", maxsplit=len(fmt_fields) or 10)
# Heuristic: Start = field named "Start" or position 1; End = "End" or position 2
try:
if fmt_fields:
start_idx = fmt_fields.index("Start")
end_idx = fmt_fields.index("End")
else:
start_idx, end_idx = 1, 2
sm = ASS_TIME.search(parts[start_idx])
em = ASS_TIME.search(parts[end_idx])
if sm and em:
s_cs = ass_to_cs(sm) + offset_cs
e_cs = ass_to_cs(em) + offset_cs
parts[start_idx] = ASS_TIME.sub(cs_to_ass(s_cs), parts[start_idx], count=1)
parts[end_idx] = ASS_TIME.sub(cs_to_ass(e_cs), parts[end_idx], count=1)
out_lines.append("Dialogue:" + ",".join(parts))
continue
except Exception:
pass
out_lines.append(line)
return "\n".join(out_lines)
# TTML: XML; adjust begin/end/dur attributes when present.
def parse_time_to_ms(value: str) -> Optional[int]:
"""
Accept forms like 'HH:MM:SS.mmm' or 'HH:MM:SS:FF' (rare) or 'XmYsZms'
Keep to simplest: HH:MM:SS.mmm and HH:MM:SS for typical TTML.
"""
m = re.match(r"^(\d{2}):(\d{2}):(\d{2})(?:\.(\d{1,3}))?$", value)
if m:
h, mi, s = map(int, m.groups()[:3])
ms = int((m.group(4) or "0").ljust(3, "0"))
return ((h * 3600 + mi * 60 + s) * 1000) + ms
return None
def ms_to_ttml(ms: int) -> str:
ms = clamp_ms(ms)
h = ms // 3600000; ms %= 3600000
mi = ms // 60000; ms %= 60000
s = ms // 1000; ms %= 1000
return f"{h:02}:{mi:02}:{s:02}.{ms:03}"
def shift_ttml_text(text: str, offset_ms: int) -> str:
try:
root = ET.fromstring(text)
# Common TTML namespaces vary; try to adjust attributes on any element
for elem in root.iter():
for attr in ("begin", "end", "dur"):
if attr in elem.attrib:
val = elem.attrib[attr]
ms = parse_time_to_ms(val)
if ms is not None:
if attr == "dur":
# duration stays the same when prepending silence
continue
elem.attrib[attr] = ms_to_ttml(ms + offset_ms)
return ET.tostring(root, encoding="unicode")
except Exception:
# If parsing fails, return original text
return text
def python_shift(input_sub: Path, output_sub: Path, seconds: float) -> bool:
"""
Format-aware shifting when FFmpeg fails or for negative offset clamping.
"""
ext = input_sub.suffix.lower()
text = input_sub.read_text(encoding="utf-8", errors="replace")
offset_ms = int(round(seconds * 1000))
if ext == ".srt":
out = shift_srt_text(text, offset_ms)
elif ext == ".vtt":
out = shift_vtt_text(text, offset_ms)
# Ensure WEBVTT header remains if present
if not out.lstrip().startswith("WEBVTT") and text.lstrip().startswith("WEBVTT"):
out = "WEBVTT\n\n" + out
elif ext == ".ass":
out = shift_ass_text(text, offset_ms)
elif ext == ".ttml":
out = shift_ttml_text(text, offset_ms)
else:
return False
output_sub.write_text(out, encoding="utf-8")
return True
# ---------- batch ----------
def find_sub_files(folder: Path, recursive: bool, exts: List[str]) -> List[Path]:
pattern = "**/*" if recursive else "*"
exts_norm = {e.lower() for e in exts}
return sorted([p for p in folder.glob(pattern) if p.is_file() and p.suffix.lower() in exts_norm])
def process_one(file: Path, out_dir: Path, seconds: float, skip_existing: bool):
out_path = out_dir / file.name
if skip_existing and out_path.exists():
print(f"Skip (exists): {out_path}")
return
# 1) Try FFmpeg first
rc = ffmpeg_shift(file, out_path, seconds)
if rc == 0:
print(f"OK (ffmpeg): {file.name} -> {out_path.name}")
return
print(f"FFmpeg failed; using Python fallback for {file.name}")
ok = python_shift(file, out_path, seconds)
if ok:
print(f"OK (python): {file.name} -> {out_path.name}")
else:
print(f"FAILED: {file}")
def main():
parser = argparse.ArgumentParser(
description="Batch shift text subtitles by N seconds (ASS/SRT/TTML/VTT)."
)
parser.add_argument("folder", help="Folder containing subtitle files")
parser.add_argument("seconds", nargs="?", type=float, default=1.0,
help="Constant time shift in seconds (default: +1.0)")
parser.add_argument("--recursive", action="store_true", help="Process subfolders")
parser.add_argument("--skip-existing", action="store_true", help="Skip if output already exists")
parser.add_argument("--exts", nargs="+", default=SUPPORTED_EXTS, help="Extensions to include")
parser.add_argument("--output-dir-name", default="output", help="Name of the output subfolder")
args = parser.parse_args()
check_binary("ffmpeg")
root = Path(args.folder).expanduser().resolve()
if not root.exists() or not root.is_dir():
print(f"Error: '{root}' is not a folder.")
sys.exit(1)
out_dir = root / args.output_dir_name
out_dir.mkdir(parents=True, exist_ok=True)
files = find_sub_files(root, args.recursive, args.exts)
files = [f for f in files if out_dir not in f.parents]
if not files:
print("No matching subtitle files found.")
sys.exit(0)
print(f"Found {len(files)} file(s). Output: {out_dir}")
for f in files:
try:
process_one(f, out_dir, args.seconds, args.skip_existing)
except KeyboardInterrupt:
print("\nInterrupted.")
sys.exit(130)
except Exception as e:
print(f"Error on '{f}': {e}")
print("Done.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,217 @@
import os
import subprocess
import glob
import sys
import json
import shlex
# ---------- ffprobe helpers ----------
def _run_probe(args):
return json.loads(
subprocess.run(args, capture_output=True, text=True, check=True).stdout
)
def get_media_info(file_path):
"""
Detect container-agnostic media properties required to reproduce the same
encoding format for an intro clip that can be safely appended.
"""
# Video stream (include codec, pix_fmt, color info, fps as *string* rational)
vprobe = _run_probe([
"ffprobe","-v","error",
"-select_streams","v:0",
"-show_entries","stream=codec_name,pix_fmt,width,height,r_frame_rate,color_range,color_space,color_transfer,color_primaries",
"-of","json", file_path
])
v = vprobe["streams"][0]
width = int(v["width"])
height = int(v["height"])
fps_rational = v.get("r_frame_rate","24000/1001") # keep rational string for exact match
vcodec = v.get("codec_name","h264")
pix_fmt = v.get("pix_fmt","yuv420p")
color_range = v.get("color_range") # "tv" or "pc" typically
color_space = v.get("color_space") # e.g. "bt709", "bt2020nc"
color_trc = v.get("color_transfer") # e.g. "smpte2084", "bt709"
color_primaries = v.get("color_primaries") # e.g. "bt2020", "bt709"
# Audio stream (channels, sample_rate, codec, layout if present)
# Some inputs have no audio; handle gracefully.
try:
aprobe = _run_probe([
"ffprobe","-v","error",
"-select_streams","a:0",
"-show_entries","stream=codec_name,channels,sample_rate,channel_layout",
"-of","json", file_path
])
a = aprobe["streams"][0]
achannels = int(a.get("channels", 2))
arate = int(a.get("sample_rate", 48000))
acodec = a.get("codec_name", "aac")
alayout = a.get("channel_layout") # may be None
has_audio = True
except Exception:
achannels = 0
arate = 0
acodec = None
alayout = None
has_audio = False
return {
"width": width, "height": height, "fps_rational": fps_rational,
"vcodec": vcodec, "pix_fmt": pix_fmt,
"color_range": color_range, "color_space": color_space,
"color_trc": color_trc, "color_primaries": color_primaries,
"has_audio": has_audio, "achannels": achannels,
"arate": arate, "acodec": acodec, "alayout": alayout
}
# ---------- encoder selection ----------
VIDEO_ENCODER_MAP = {
"av1": ("libaom-av1", ["-crf","30","-b:v","0","-cpu-used","6"]),
"hevc": ("libx265", ["-crf","20","-preset","medium"]),
"h264": ("libx264", ["-crf","20","-preset","medium"]),
"vp9": ("libvpx-vp9", ["-crf","32","-b:v","0","-row-mt","1"]),
"mpeg2video": ("mpeg2video", []),
"mpeg4": ("mpeg4", []),
# fall back handled below
}
AUDIO_ENCODER_MAP = {
"aac": ("aac", ["-b:a","192k"]),
"opus": ("libopus", ["-b:a","160k"]),
"ac3": ("ac3", ["-b:a","384k"]),
"eac3": ("eac3", ["-b:a","384k"]),
"flac": ("flac", []),
"vorbis":("libvorbis", ["-q:a","4"]),
"mp3": ("libmp3lame", ["-b:a","192k"]),
"pcm_s16le": ("pcm_s16le", []),
# more can be added as needed
}
def pick_video_encoder(vcodec):
enc, extra = VIDEO_ENCODER_MAP.get(vcodec, ("libx264", ["-crf","20","-preset","medium"]))
return enc, extra
def pick_audio_encoder(acodec):
if acodec is None:
return None, [] # no audio in input
enc, extra = AUDIO_ENCODER_MAP.get(acodec, ("aac", ["-b:a","192k"]))
return enc, extra
# ---------- intro generation ----------
def build_intro_cmd(info, duration, intro_file):
"""
Build an ffmpeg command that creates a silent (black) intro with *matching*
A/V format to the input.
"""
w, h = info["width"], info["height"]
fps = info["fps_rational"] # keep as rational string
venc, venc_extra = pick_video_encoder(info["vcodec"])
pix_fmt = info["pix_fmt"]
cmd = [
"ffmpeg",
"-f","lavfi","-i", f"color=black:s={w}x{h}:r={fps}:d={duration}",
]
# Audio input (only if input has audio; otherwise omit to keep track counts aligned)
if info["has_audio"]:
ch = info["achannels"]
sr = info["arate"]
layout = info["alayout"] or ( "stereo" if ch == 2 else f"{ch}c" )
cmd += ["-f","lavfi","-i", f"anullsrc=channel_layout={layout}:sample_rate={sr}"]
else:
# still add a null audio to ensure mkv track counts match? no — input had no audio.
pass
# Map streams explicitly
# If there is audio in the input, produce both v+a; else only v.
# (FFmpeg will auto-create stream 0:v:0 and 1:a:0 if two inputs present.)
# Codec settings
cmd += ["-c:v", venc, "-pix_fmt", pix_fmt]
cmd += venc_extra
# Preserve basic color tags when present (helps SDR/HDR tagging)
if info["color_primaries"]:
cmd += ["-color_primaries", info["color_primaries"]]
if info["color_trc"]:
cmd += ["-color_trc", info["color_trc"]]
if info["color_space"]:
cmd += ["-colorspace", info["color_space"]]
if info["color_range"]:
cmd += ["-color_range", info["color_range"]] # "pc" or "tv"
if info["has_audio"]:
aenc, aenc_extra = pick_audio_encoder(info["acodec"])
cmd += ["-c:a", aenc] + aenc_extra
# Ensure channels & rate match exactly
cmd += ["-ac", str(info["achannels"]), "-ar", str(info["arate"])]
# Keep duration tight and avoid stuck encodes
cmd += ["-shortest", "-y", intro_file]
return cmd
# ---------- main pipeline ----------
def extend_with_intro(input_file, output_folder, duration=0.5):
base_name = os.path.basename(input_file)
intro_file = os.path.join(output_folder, f"{base_name}_intro.mkv")
output_file = os.path.join(output_folder, base_name)
info = get_media_info(input_file)
# Step 1: generate intro clip
ffmpeg_intro = build_intro_cmd(info, duration, intro_file)
print("FFmpeg intro command:\n " + " ".join(shlex.quote(x) for x in ffmpeg_intro))
subprocess.run(ffmpeg_intro, check=True)
# Step 2: mkvmerge intro + main (VA) + main (subs+attachments+chapters)
mkvmerge_cmd = [
"mkvmerge",
"-o", output_file,
"--append-to", "1:0:0:0,1:1:0:1",
intro_file,
"+",
"--no-subtitles", "--no-chapters", "--no-attachments", # VA only from main
input_file,
"--no-video", "--no-audio", # subs+attachments+chapters only
input_file,
]
print("mkvmerge command:\n " + " ".join(shlex.quote(x) for x in mkvmerge_cmd))
subprocess.run(mkvmerge_cmd, check=True)
os.remove(intro_file)
print(f"Extended {base_name} -> {output_file}")
def process_folder(input_folder, duration=0.5):
output_folder = os.path.join(input_folder, "output")
os.makedirs(output_folder, exist_ok=True)
mkv_files = glob.glob(os.path.join(input_folder, "*.mkv"))
if not mkv_files:
print("No MKV files found in the specified folder.")
return
for mkv_file in mkv_files:
try:
extend_with_intro(mkv_file, output_folder, duration=duration)
except subprocess.CalledProcessError as e:
# Surface stderr if present to help debug codec/param mismatches
print(f"Error processing {mkv_file}: {getattr(e, 'stderr', e)}")
if __name__ == "__main__":
if len(sys.argv) < 2 or len(sys.argv) > 3:
print("Usage: python extend_video.py /path/to/folder [duration_seconds]")
sys.exit(1)
input_folder = sys.argv[1]
if not os.path.isdir(input_folder):
print(f"Error: '{input_folder}' is not a valid folder path.")
sys.exit(1)
duration = float(sys.argv[2]) if len(sys.argv) == 3 else 1
print(f"Extending videos in folder: {input_folder} with intro duration: {duration} seconds\n")
process_folder(input_folder, duration=duration)

View File

@@ -0,0 +1,50 @@
import os
import sys
import ffmpeg
def extract_audio_from_videos(input_folder, output_folder):
# Create output folder if it doesn't exist
if not os.path.exists(output_folder):
os.makedirs(output_folder)
# Common video extensions (you can add more if needed)
video_extensions = ('.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm')
# Iterate through all files in the input folder
for filename in os.listdir(input_folder):
if filename.lower().endswith(video_extensions):
video_path = os.path.join(input_folder, filename)
try:
# Create output filename (replace video extension with .m4a for AAC)
output_filename = os.path.splitext(filename)[0] + '.m4a'
output_path = os.path.join(output_folder, output_filename)
# Use ffmpeg to extract audio as AAC
stream = ffmpeg.input(video_path)
stream = ffmpeg.output(stream, output_path, acodec='aac', vn=True, format='ipod')
ffmpeg.run(stream)
print(f"Extracted audio from {filename} to {output_filename}")
except ffmpeg.Error as e:
print(f"Error processing {filename}: {str(e)}")
if __name__ == "__main__":
os.umask(0)
# Check if input folder is provided as command-line argument
if len(sys.argv) != 2:
print("Usage: python3 extract_audio_ffmpeg.py <input_folder>")
sys.exit(1)
# Get input folder from command-line argument
input_folder = sys.argv[1]
# Validate input folder
if not os.path.isdir(input_folder):
print(f"Error: {input_folder} is not a valid directory")
sys.exit(1)
# Set output folder (same level as input folder, named 'audio_output')
output_folder = os.path.join(os.path.dirname(input_folder), ".")
extract_audio_from_videos(input_folder, output_folder)

View File

@@ -0,0 +1,124 @@
import subprocess
import json
import os
import sys
import glob
def get_subtitle_streams(video_path):
"""Get information about subtitle streams in the video file."""
try:
cmd = [
'ffprobe',
'-v', 'error',
'-print_format', 'json',
'-show_streams',
'-select_streams', 's',
video_path
]
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
streams = json.loads(result.stdout).get('streams', [])
return streams
except subprocess.CalledProcessError as e:
print(f"Error probing video file '{video_path}': {e.stderr}")
return []
except json.JSONDecodeError as e:
print(f"Error parsing ffprobe output for '{video_path}': {e}")
return []
def extract_subtitles(video_path):
"""Extract all subtitle streams from a single video file in their original format."""
# Get the directory of the input video
output_dir = os.path.dirname(video_path) or '.'
# Get subtitle streams
subtitle_streams = get_subtitle_streams(video_path)
if not subtitle_streams:
print(f"No subtitle streams found in '{video_path}'.")
return
# Get the base name of the video file (without extension)
video_name = os.path.splitext(os.path.basename(video_path))[0]
# Map codec names to standard file extensions
codec_to_extension = {
'subrip': 'srt',
'ass': 'ass',
'webvtt': 'vtt',
'srt': 'srt', # In case codec is already named srt
# Add more mappings as needed
}
# Extract each subtitle stream
for index, stream in enumerate(subtitle_streams):
codec = stream.get('codec_name', 'unknown')
lang = stream.get('tags', {}).get('language', 'unknown')
# Use mapped extension if available, otherwise use codec name
extension = codec_to_extension.get(codec, codec)
output_file = os.path.join(output_dir, f"{video_name}.{lang}.{extension}")
try:
cmd = [
'ffmpeg',
'-i', video_path,
'-map', f'0:s:{index}',
'-c:s', 'copy',
output_file
]
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
print(f"Extracted subtitle stream {index} ({lang}, {codec}) to {output_file}")
except subprocess.CalledProcessError as e:
print(f"Error extracting subtitle stream {index} from '{video_path}' with copy: {e.stderr}")
# Fallback: Try extracting without copy
try:
cmd = [
'ffmpeg',
'-i', video_path,
'-map', f'0:s:{index}',
output_file
]
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
print(f"Fallback: Extracted subtitle stream {index} ({lang}, {codec}) to {output_file} without copy")
except subprocess.CalledProcessError as e:
print(f"Fallback failed for subtitle stream {index} from '{video_path}': {e.stderr}")
def process_input(input_path):
"""Process a single file or a folder containing video files."""
# Supported video extensions
video_extensions = ['*.mp4', '*.mkv', '*.avi', '*.mov', '*.wmv', '*.flv']
if os.path.isfile(input_path):
# Process single video file
if any(input_path.lower().endswith(ext[1:]) for ext in video_extensions):
extract_subtitles(input_path)
else:
print(f"Skipping '{input_path}': Not a recognized video file extension.")
elif os.path.isdir(input_path):
# Process all video files in the folder
video_files = []
for ext in video_extensions:
video_files.extend(glob.glob(os.path.join(input_path, ext)))
if not video_files:
print(f"No video files found in folder '{input_path}'.")
return
for video_file in video_files:
print(f"\nProcessing '{video_file}'...")
extract_subtitles(video_file)
else:
print(f"Error: '{input_path}' is neither a valid file nor a directory.")
def main():
if len(sys.argv) < 2:
print("Usage: python extract_subtitles.py <video_file_or_folder>")
sys.exit(1)
input_path = sys.argv[1]
if not os.path.exists(input_path):
print(f"Error: Path '{input_path}' does not exist.")
sys.exit(1)
process_input(input_path)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,67 @@
import subprocess
import json
from pathlib import Path
INPUT_DIR = Path("/Entertainment_1/Downloads/USCK/Series/Release.that.Witch.2026.S01.1080p.CR.WEB-DL.AAC2.0.H.264-[SeFree]")
OUTPUT_DIR = Path.joinpath(INPUT_DIR,"output")
SHIFT_MS = 28500 # +15 sec (negative = backward)
# choose one mode
TARGET_TRACK_IDS = None # example: [1,2]
TARGET_LANGS = ["tha","eng"] # example: ["eng", "jpn", "tha"]
# track types to shift
TRACK_TYPES = ["subtitles"] # ["audio", "subtitles"]
OUTPUT_DIR.mkdir(exist_ok=True)
def get_tracks(file):
cmd = ["mkvmerge", "-J", str(file)]
result = subprocess.run(cmd, capture_output=True, text=True)
return json.loads(result.stdout)["tracks"]
def select_tracks(tracks):
selected = []
for t in tracks:
if t["type"] not in TRACK_TYPES:
continue
track_id = t["id"]
lang = t["properties"].get("language")
if TARGET_TRACK_IDS:
if track_id in TARGET_TRACK_IDS:
selected.append(track_id)
elif TARGET_LANGS:
if lang in TARGET_LANGS:
selected.append(track_id)
return selected
for mkv_file in INPUT_DIR.glob("*.mkv"):
print(f"Processing: {mkv_file.name}")
tracks = get_tracks(mkv_file)
selected_ids = select_tracks(tracks)
if not selected_ids:
print(" No matching tracks")
continue
output_file = OUTPUT_DIR / mkv_file.name
cmd = ["mkvmerge", "-o", str(output_file)]
for track_id in selected_ids:
cmd.extend(["--sync", f"{track_id}:{SHIFT_MS}"])
cmd.append(str(mkv_file))
subprocess.run(cmd)
print("Done")

View File

@@ -0,0 +1,105 @@
import os
import re
import subprocess
import sys
import json
def get_duration(file):
"""Get duration in seconds using ffprobe"""
result = subprocess.run(
[
"ffprobe",
"-v", "error",
"-select_streams", "v:0",
"-show_entries", "format=duration",
"-of", "json",
file
],
capture_output=True,
text=True,
check=True
)
data = json.loads(result.stdout)
return float(data["format"]["duration"])
def seconds_to_timestamp(seconds):
"""Convert seconds to MKV chapter timestamp"""
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
s = seconds % 60
return f"{h:02}:{m:02}:{s:06.3f}"
def create_chapter_file(files, chapter_file):
current = 0.0
with open(chapter_file, "w", encoding="utf-8") as f:
f.write("<?xml version=\"1.0\"?>\n")
f.write("<!DOCTYPE Chapters SYSTEM \"matroskachapters.dtd\">\n")
f.write("<Chapters>\n <EditionEntry>\n")
for i, file in enumerate(files):
start = seconds_to_timestamp(current)
title = f"Part {i+1}"
f.write(" <ChapterAtom>\n")
f.write(f" <ChapterTimeStart>{start}</ChapterTimeStart>\n")
f.write(" <ChapterDisplay>\n")
f.write(f" <ChapterString>{title}</ChapterString>\n")
f.write(" <ChapterLanguage>eng</ChapterLanguage>\n")
f.write(" </ChapterDisplay>\n")
f.write(" </ChapterAtom>\n")
current += get_duration(file)
f.write(" </EditionEntry>\n</Chapters>\n")
def append_videos_in_folder(folder_path):
episode_pattern = re.compile(r"(S\d+E\d+)")
video_groups = {}
for root, dirs, files in os.walk(folder_path):
for file in files:
match = episode_pattern.search(file)
if match:
episode = match.group(1)
full_path = os.path.join(root, file)
video_groups.setdefault(episode, []).append(full_path)
for episode, files in video_groups.items():
if len(files) > 1:
files.sort()
output_file = os.path.join(folder_path, f"{episode}.mkv")
chapter_file = os.path.join(folder_path, f"{episode}_chapters.xml")
print(f"Processing {episode}...")
create_chapter_file(files, chapter_file)
# mkvmerge append syntax
cmd = ["mkvmerge", "-o", output_file]
for i, f in enumerate(files):
if i == 0:
cmd.append(f)
else:
cmd.extend(["+", f])
cmd.extend(["--chapters", chapter_file])
try:
subprocess.run(cmd, check=True)
print(f"Created {output_file}")
except subprocess.CalledProcessError as e:
print(f"Error processing {episode}: {e}")
if __name__ == "__main__":
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} <folder_path>")
sys.exit(1)
append_videos_in_folder(sys.argv[1])

View File

@@ -0,0 +1,60 @@
import os
import subprocess
import glob
import sys
def trim_mkv_files(input_folder, trim_duration=0.375):
# Ensure FFmpeg is installed and ffmpeg is in PATH
ffmpeg_path = "ffmpeg" # Adjust path if ffmpeg is not in PATH
# Create output folder if it doesn't exist
output_folder = os.path.join(input_folder, "trimmed")
if not os.path.exists(output_folder):
os.makedirs(output_folder)
# Find all MKV files in the input folder
mkv_files = glob.glob(os.path.join(input_folder, "*.mkv"))
if not mkv_files:
print("No MKV files found in the specified folder.")
return
for mkv_file in mkv_files:
# Get the base filename and create output filename
base_name = os.path.basename(mkv_file)
output_file = os.path.join(output_folder, f"{base_name}")
# Construct ffmpeg command to trim first second using stream copy
command = [
ffmpeg_path,
"-i", mkv_file,
"-ss", str(trim_duration),
"-c", "copy",
"-map", "0",
output_file
]
try:
# Execute the command
result = subprocess.run(command, capture_output=True, text=True, check=True)
print(f"Successfully trimmed {base_name} -> {output_file}")
except subprocess.CalledProcessError as e:
print(f"Error processing {base_name}: {e.stderr}")
except FileNotFoundError:
print("Error: ffmpeg not found. Ensure FFmpeg is installed and in PATH.")
break
if __name__ == "__main__":
# Check if folder path is provided as command-line argument
if len(sys.argv) != 2:
print("Usage: python trim_video.py /path/to/folder")
sys.exit(1)
input_folder = sys.argv[1]
# Validate folder path
if not os.path.isdir(input_folder):
print(f"Error: '{input_folder}' is not a valid folder path.")
sys.exit(1)
trim_mkv_files(input_folder)

View File

@@ -0,0 +1,57 @@
import os
import re
import subprocess
import sys
def get_episode_code(filename):
"""Extract episode code like S01E01 from filename."""
match = re.search(r"S\d+E\d+", filename)
return match.group(0) if match else None
def append_videos_in_chunks(folder_path, chunk_size=4):
video_files = []
# Collect all video files
for root, dirs, files in os.walk(folder_path):
for file in sorted(files):
if file.lower().endswith(('.mp4', '.mkv', '.mov', '.ts')):
full_path = os.path.join(root, file)
video_files.append(full_path)
# Process files in chunks of 4
for i in range(0, len(video_files), chunk_size):
chunk = video_files[i:i + chunk_size]
if not chunk:
continue
# Use the episode code of the first file in the chunk for output name
base_filename = os.path.basename(chunk[0])
episode_code = get_episode_code(base_filename) or f"group_{i//chunk_size + 1}"
output_file = os.path.join(folder_path, f"{episode_code}.mkv")
# Create the temporary list file
temp_list_file = os.path.join(folder_path, f"{episode_code}_list.txt")
with open(temp_list_file, "w", encoding="utf-8") as f:
for video in chunk:
f.write(f"file '{video}'\n")
# Run ffmpeg to concatenate the files
try:
print(f"Processing chunk starting with {episode_code}...")
subprocess.run([
"ffmpeg", "-f", "concat", "-safe", "0", "-i", temp_list_file,
"-map", "0", "-c", "copy", output_file
], check=True)
print(f"Created {output_file}")
except subprocess.CalledProcessError as e:
print(f"Error processing {episode_code}: {e}")
# finally:
# os.remove(temp_list_file)
if __name__ == "__main__":
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} <folder_path>")
sys.exit(1)
folder_path = sys.argv[1]
append_videos_in_chunks(folder_path)

View File

@@ -2475,7 +2475,7 @@ class dl:
final_dir = self.output_dir or config.directories.downloads
final_filename = title.get_filename(media_info, show_service=not no_source,season_overwrite=int(season_overwrite) if season_overwrite else None,episode_overwrite=int(episode_overwrite) if episode_overwrite else None)
audio_codec_suffix = muxed_audio_codecs.get(muxed_path)
if isinstance(title, Movie):
final_dir = Path.joinpath(Path(final_dir),"Movie")
elif isinstance(title, Episode):