Compare commits

..

21 Commits

Author SHA1 Message Date
sefree
16000bc576 . 2026-03-31 22:53:33 +07:00
sefree
edd918cb5c . 2026-03-31 21:39:22 +07:00
sefree
8fed8325ca . 2026-03-31 12:57:52 +07:00
sefree
ac89cbf545 add SeFree-Custom-Script 2026-03-31 12:57:14 +07:00
sefree
99bacaff3f . 2026-03-31 12:56:13 +07:00
sefree
853878f38b test-dev-branch 2026-03-31 12:51:16 +07:00
sefree
7de68e5c2a . 2026-03-31 12:50:16 +07:00
panitan103
2eb1d91987 . 2026-03-31 12:49:39 +07:00
panitan103
fcc9ccd74a . 2026-03-31 12:28:01 +07:00
panitan103
9fd5e50805 . 2026-03-31 12:27:05 +07:00
panitan103
cfcde1e587 . 2026-03-31 11:51:59 +07:00
panitan103
bbeb93efa1 . 2026-03-30 23:04:48 +07:00
panitan103
0d93afb4af . 2026-03-30 23:04:04 +07:00
panitan103
58db1935ea set forced-subs default to True 2026-03-30 22:03:47 +07:00
panitan103
fea97880ab remove android for BLBL in discord downloader 2026-03-30 11:48:06 +07:00
panitan103
e9ca391575 - handle season,episode overwrite for song,movie
- Make folder for each type of title
- Fix bug for discord downloader
2026-03-30 11:46:33 +07:00
panitan103
c2fafcd406 - add season_overwrite and episode_overwrite for Schedule work
- print File path at the end of file for Schedule work
- add discord downloader
2026-03-30 11:05:08 +07:00
Andy
fe1ccd085c Revert "fix(drm): add track ID fallback for mp4decrypt CBCS zero-KID content"
This reverts commit 23466cae8b.
2026-03-25 14:39:08 -06:00
Andy
23466cae8b fix(drm): add track ID fallback for mp4decrypt CBCS zero-KID content
Some CBCS-encrypted content has an all-zeros default_KID in the tenc box while the real KID is only in the PSSH boxes. mp4decrypt matches keys against the tenc KID, so it silently skips decryption when the provided KID doesn't match. This adds a track ID-based key fallback when a zero KID is detected, matching the existing shaka-packager zero-KID fallback behavior.
2026-03-25 14:36:26 -06:00
Andy
d4bc095f96 fix: update actions/checkout to v5 in release workflow 2026-03-17 09:16:46 -06:00
Andy
79e8184474 ci: enable manual triggering of release workflow 2026-03-17 09:10:50 -06:00
23 changed files with 2505 additions and 16 deletions

View File

@@ -1,6 +1,7 @@
name: Release
on:
workflow_dispatch:
push:
branches: [main]
paths:
@@ -16,7 +17,7 @@ jobs:
should_release: ${{ steps.version_check.outputs.should_release }}
new_version: ${{ steps.version_check.outputs.new_version }}
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
fetch-depth: 0
@@ -56,7 +57,7 @@ jobs:
if: needs.check-version.outputs.should_release == 'true'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
- name: Install uv
uses: astral-sh/setup-uv@v4

13
.gitignore vendored
View File

@@ -238,3 +238,16 @@ CLAUDE.md
marimo/_static/
marimo/_lsp/
__marimo__/
WVDs/
PRDs/
Logs/
Cookies/
Cache/
Temp/
bot_logs/
test*.py
Unshackle-Service-SeFree/
SeFree-Custom-Script/example_tid_decrypt.js
SeFree-Custom-Script/example_tid_decrypt.py

View File

@@ -0,0 +1,151 @@
import os
import glob
import subprocess
import sys
import langcodes
import pycountry
from ass_editor import ASS_Editor, attach_font
def find_files(folder_path):
"""Find all video (MP4, TS, MKV), AAC, and subtitle files in the folder."""
video_files = (glob.glob(os.path.join(folder_path, "*.mkv")) +
glob.glob(os.path.join(folder_path, "*.mp4")) +
glob.glob(os.path.join(folder_path, "*.ts")))
base_video_files = [f for f in video_files if not f.endswith("_DUB.mkv") and
not f.endswith("_DUB.mp4") and not f.endswith("_DUB.ts")]
dub_video_files = [f for f in video_files if f.endswith("_DUB.mkv") or
f.endswith("_DUB.mp4") or f.endswith("_DUB.ts")]
aac_files = (glob.glob(os.path.join(folder_path, "*.aac")) +
glob.glob(os.path.join(folder_path, "*.m4a")) +
glob.glob(os.path.join(folder_path, "*.mka")))
subtitle_files = (glob.glob(os.path.join(folder_path, "*.srt")) +
glob.glob(os.path.join(folder_path, "*.ass")) +
glob.glob(os.path.join(folder_path, "*.vtt")))
return base_video_files, dub_video_files, aac_files, subtitle_files
def get_base_name(file_path):
"""Extract base name by removing '_DUB' and file extension for video files."""
file_name = os.path.basename(file_path)
for ext in ["_DUB.mkv", "_DUB.mp4", "_DUB.ts"]:
if file_name.endswith(ext):
return file_name[:-len(ext)]
return os.path.splitext(file_name)[0]
def get_lang_code(file_path):
"""Helper to extract language code from filename."""
parts = os.path.splitext(os.path.basename(file_path))[0].split('.')
lang = parts[-1]
if len(lang) == 2:
try:
return langcodes.Language.make(lang).to_alpha3()
except:
return "und"
elif len(lang) == 3:
return lang
return "und"
def group_files(video_files, dub_video_files, aac_files, subtitle_files):
"""Group video, AAC, and subtitle files by base name."""
file_groups = {}
for video_file in video_files:
base_name = get_base_name(video_file)
file_groups[base_name] = {'video': video_file, 'dub_video': None, 'audio': [], 'subtitles': []}
for dub_video_file in dub_video_files:
base_name = get_base_name(dub_video_file)
if base_name in file_groups:
file_groups[base_name]['dub_video'] = dub_video_file
for aac_file in aac_files:
base_name = os.path.splitext(os.path.basename(aac_file))[0].split('.')[0]
lang = get_lang_code(aac_file)
if base_name in file_groups:
file_groups[base_name]['audio'].append((aac_file, lang))
for sub_file in subtitle_files:
base_name = os.path.splitext(os.path.basename(sub_file))[0].split('.')[0]
lang = get_lang_code(sub_file)
if base_name in file_groups:
file_groups[base_name]['subtitles'].append((sub_file, lang))
return file_groups
def embed_files(folder_path):
"""Embed audio and subtitles using mkvmerge."""
output_folder = os.path.join(folder_path, "Output")
os.makedirs(output_folder, exist_ok=True)
video_files, dub_video_files, aac_files, subtitle_files = find_files(folder_path)
file_groups = group_files(video_files, dub_video_files, aac_files, subtitle_files)
for base_name, files in file_groups.items():
video_file = files['video']
dub_video_file = files['dub_video']
audio_inputs = files['audio']
subtitle_inputs = files['subtitles']
output_file = os.path.join(output_folder, base_name + ".mkv")
if not video_file:
continue
# Start mkvmerge command
cmd = ["mkvmerge", "-o", output_file]
# 1. Base Video File
# Set a title for the first video track if desired
cmd.extend(["--track-name", "0:SeFree", video_file])
# 2. Dubbed Video File (if exists, add its audio/subs)
if dub_video_file:
# Assume track 0 is video (skipped), track 1 is Japanese, track 2 is Thai
# This logic depends on the internal track IDs of the dub file
cmd.extend([
"--no-video",
"--language", "2:jpn", "--track-name", "2:Japanese",
"--language", "3:tha", "--track-name", "3:Thai",
dub_video_file
])
# 3. External Audio Files
if not dub_video_file:
for aac_file, lang in audio_inputs:
# mkvmerge track ID for external single-track files is usually 0
cmd.extend([
"--language", f"0:{lang}",
"--track-name", f"0:{pycountry.languages.get(alpha_3=lang).name if lang != 'und' else 'Unknown'}",
aac_file
])
# 4. External Subtitle Files
for sub_file, lang in subtitle_inputs:
cmd.extend([
"--language", f"0:{lang}",
"--track-name", f"0:{pycountry.languages.get(alpha_3=lang).name if lang != 'und' else 'Unknown'}_BLBL",
sub_file
])
# cmd=attach_font(cmd, "BLBL")
print(f"Processing {base_name} with mkvmerge...")
try:
subprocess.run(cmd, check=True, capture_output=True, text=True)
print(f"✅ Successfully created {os.path.basename(output_file)}")
except subprocess.CalledProcessError as e:
print(f"❌ Error processing {base_name}: {e.stderr}")
def main():
os.umask(0o000)
if len(sys.argv) != 2:
print("Usage: python add_subtitles_to_mkv.py <folder_path>")
sys.exit(1)
folder_path = sys.argv[1].strip()
if not os.path.exists(folder_path):
print(f"Error: Folder '{folder_path}' does not exist!")
sys.exit(1)
embed_files(folder_path)
print("🎉 Processing complete!")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,87 @@
import pysubs2
import shutil
import os
class ASS_Editor:
def __init__(self,ass_file):
base_name, extension = os.path.splitext(ass_file)
if extension.lower() == ".mp4":
shutil.move(ass_file, base_name + ".ass")
ass_file = base_name + ".ass"
self.input=ass_file
self.subs = pysubs2.load(self.input)
def batch_custom_style(self,font_name=None, font_size=64,output=None):
# Change font and size for every existing style
for style in self.subs.styles.values():
if font_name is not None:
style.fontname = font_name # <-- put your font family here
style.fontsize = font_size
self.save_out(self.input, output)
def save_out(self,input_file, output=None):
if output:
self.subs.save(input_file)
base_name, extension = os.path.splitext(input_file)
if extension.lower() == ".ass":
shutil.move(input_file, base_name + ".mp4")
self.input = base_name + ".mp4"
else:
self.subs.save(os.path.join(os.path.dirname(input_file), os.path.basename(input_file)+".modified.ass"))
def attach_font(cl,service,FONT_DIR="/root/VT.PR.WV/assets/fonts/{Service}"):
FONT_DIR = FONT_DIR.format(Service=service)
for font_file in os.listdir(FONT_DIR):
if font_file.lower().endswith((".ttf")):
cl.extend(["--attach-file", os.path.join(FONT_DIR, font_file),
"--attachment-mime-type", "font/ttf"])
elif font_file.lower().endswith((".otf")):
cl.extend(["--attach-file", os.path.join(FONT_DIR, font_file),
"--attachment-mime-type", "font/otf"])
return cl
def encode_uu(data: bytes, filename: str) -> str:
import io
out = io.StringIO()
out.write(f"begin 644 {filename}\n")
# encode in 45-byte chunks
for i in range(0, len(data), 45):
chunk = data[i:i+45]
# output length char
out.write(chr(32 + len(chunk)))
# process every 3 bytes
for j in range(0, len(chunk), 3):
triple = chunk[j:j+3]
# pad to 3 bytes
while len(triple) < 3:
triple += b"\0"
# 24 bits
b1, b2, b3 = triple
c1 = (b1 >> 2) & 0x3F
c2 = ((b1 << 4) & 0x30) | ((b2 >> 4) & 0xF)
c3 = ((b2 << 2) & 0x3C) | ((b3 >> 6) & 0x3)
c4 = b3 & 0x3F
for c in (c1, c2, c3, c4):
out.write(chr(32 + (c & 0x3F)))
out.write("\n")
out.write("`\nend\n")
return out.getvalue()
def main():
ass_file= "/root/VT.PR.WV/test.ass"
ass_editor= ASS_Editor(ass_file)
# print(ass_editor.subs.fonts_opaque.values())
font_path="/root/VT.PR.WV/assets/fonts/BLBL/NotoSansThai-Regular.ttf"
with open(font_path, "rb") as f:
font_bytes = f.read()
uue_text = encode_uu(font_bytes, "NotoSansThai-Regular.ttf")
print(uue_text[:200])
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,251 @@
#!/usr/bin/env python3
"""
Prepend N seconds of silence to audio files using ffmpeg as a subprocess.
- Primary path: concat demuxer + -c copy (avoids re-encoding main audio)
- Fallback: filter_complex concat (re-encodes output) when stream-copy can't be used
- Writes outputs to <folder>/output/
- Optional recursion, skip-existing, and extension filtering
Usage:
python extend_audio.py /path/to/folder [duration_seconds]
Examples:
python extend_audio.py "/folder_that_contain_many_audio" 1
python extend_audio.py "/folder_that_contain_many_audio" 1 --recursive --skip-existing
"""
import os
import sys
import json
import shlex
import subprocess
from pathlib import Path
# ------------------- helpers -------------------
def check_binary(name: str):
if shutil.which(name) is None:
print(f"Error: '{name}' not found on PATH. Install it and try again.")
sys.exit(1)
def run_probe(args):
return json.loads(subprocess.run(args, capture_output=True, text=True, check=True).stdout)
def get_audio_info(file_path):
"""
Probe first audio stream; return codec_name, sample_rate, channels, channel_layout.
Handle files without audio gracefully.
"""
try:
data = run_probe([
"ffprobe", "-v", "error",
"-select_streams", "a:0",
"-show_entries", "stream=codec_name,channels,sample_rate,channel_layout,bit_rate",
"-of", "json", file_path
])
s = data["streams"][0]
codec = s.get("codec_name")
channels = int(s.get("channels", 2))
sample_rate = int(s.get("sample_rate", 48000))
layout = s.get("channel_layout") or ("mono" if channels == 1 else "stereo")
bitrate = s.get("bit_rate")
return {
"codec": codec,
"channels": channels,
"sample_rate": sample_rate,
"layout": layout,
"bitrate": bitrate,
"has_audio": True
}
except Exception:
# If the container has no audio stream, still allow generating silence only.
return {
"codec": "aac",
"channels": 2,
"sample_rate": 48000,
"layout": "stereo",
"bitrate": None,
"has_audio": False
}
# ------------------- encoders -------------------
AUDIO_ENCODER_MAP = {
"aac": ("aac", ["-b:a", "192k"]),
"mp3": ("libmp3lame", ["-b:a", "192k"]),
"libmp3lame": ("libmp3lame", ["-b:a", "192k"]),
"opus": ("libopus", ["-b:a", "160k"]),
"vorbis": ("libvorbis", ["-q:a", "4"]),
"ac3": ("ac3", ["-b:a", "384k"]),
"eac3": ("eac3", ["-b:a", "384k"]),
"flac": ("flac", []),
"alac": ("alac", []),
"pcm_s16le": ("pcm_s16le", []),
"wav": ("pcm_s16le", []), # convenience
}
def pick_audio_encoder(codec_name):
# Normalize codec name to the encoder we want to use
if not codec_name:
return "aac", ["-b:a", "192k"]
enc, extra = AUDIO_ENCODER_MAP.get(codec_name, ("aac", ["-b:a", "192k"]))
return enc, extra
# ------------------- build intro (silence) -------------------
def build_silence_cmd(info, seconds, intro_path):
"""
Create a short silent file encoded with the SAME codec + params as the input,
so we can try concat demuxer + -c copy.
"""
enc, extra = pick_audio_encoder(info["codec"])
ch = info["channels"]
sr = info["sample_rate"]
layout = info["layout"]
# Important: tell ffmpeg that anullsrc is a filter input (-f lavfi). [3](https://stackoverflow.com/questions/42147512/ffmpeg-adding-silence-struggling-to-use-i-anullsrc-option)
cmd = [
"ffmpeg",
"-hide_banner", "-loglevel", "error", "-y",
"-f", "lavfi", "-t", str(seconds),
"-i", f"anullsrc=channel_layout={layout}:sample_rate={sr}:d={seconds}",
"-c:a", enc, *extra,
"-ac", str(ch), "-ar", str(sr),
intro_path
]
return cmd
# ------------------- concat methods -------------------
def concat_demuxer_copy(intro_path, input_path, output_path):
"""
Concatenate via the concat demuxer and stream copy. Requires same codec & params. [1](https://trac.ffmpeg.org/wiki/Concatenate)
"""
# Build a temporary list file
list_file = Path(output_path).with_suffix(".concat.txt")
list_file.write_text(f"file '{intro_path}'\nfile '{input_path}'\n", encoding="utf-8")
cmd = [
"ffmpeg", "-hide_banner", "-loglevel", "error", "-y",
"-f", "concat", "-safe", "0",
"-i", str(list_file),
"-c", "copy",
output_path
]
print("Concat demuxer command:\n " + " ".join(shlex.quote(x) for x in cmd))
proc = subprocess.run(cmd)
try:
list_file.unlink()
except Exception:
pass
return proc.returncode
def concat_filter_reencode(input_path, seconds, output_path, info):
"""
Fallback: use filter_complex concat to prepend the silent intro and re-encode output. [1](https://trac.ffmpeg.org/wiki/Concatenate)
"""
enc, extra = pick_audio_encoder(info["codec"])
ch = info["channels"]
sr = info["sample_rate"]
layout = info["layout"]
# Build filter path: [silence][input] concat to 1 audio stream. [4](https://superuser.com/questions/579008/add-1-second-of-silence-to-audio-through-ffmpeg)
cmd = [
"ffmpeg", "-hide_banner", "-loglevel", "error", "-y",
"-f", "lavfi", "-t", str(seconds),
"-i", f"anullsrc=channel_layout={layout}:sample_rate={sr}:d={seconds}",
"-i", input_path,
"-filter_complex", "[0:a][1:a]concat=n=2:v=0:a=1[a]", # concat filter for audio
"-map", "[a]",
"-c:a", enc, *extra,
"-ac", str(ch), "-ar", str(sr),
output_path
]
print("Concat filter (fallback) command:\n " + " ".join(shlex.quote(x) for x in cmd))
return subprocess.run(cmd).returncode
# ------------------- batch logic -------------------
import shutil
SUPPORTED_EXTS = [".wav", ".mp3", ".m4a", ".aac", ".flac", ".ogg", ".opus"]
def find_audio_files(folder: Path, recursive: bool, exts):
pattern = "**/*" if recursive else "*"
exts_norm = {e.lower() for e in exts}
return sorted([p for p in folder.glob(pattern) if p.is_file() and p.suffix.lower() in exts_norm])
def process_one(input_file: Path, out_dir: Path, seconds: float, skip_existing: bool):
info = get_audio_info(str(input_file))
output_file = out_dir / input_file.name
intro_file = out_dir / (input_file.stem + "_intro" + input_file.suffix)
if skip_existing and output_file.exists():
print(f"Skip (exists): {output_file}")
return
# 1) Create silence intro encoded like the source
intro_cmd = build_silence_cmd(info, seconds, str(intro_file))
print("Intro command:\n " + " ".join(shlex.quote(x) for x in intro_cmd))
rc = subprocess.run(intro_cmd).returncode
if rc != 0:
print(f"Failed to make intro for {input_file.name}")
return
# 2) Try demuxer (stream copy) first
rc = concat_demuxer_copy(str(intro_file), str(input_file), str(output_file))
if rc == 0:
print(f"OK (copy): {input_file.name} -> {output_file.name}")
else:
print(f"Demuxer concat failed; trying filter fallback…")
rc = concat_filter_reencode(str(input_file), seconds, str(output_file), info)
if rc == 0:
print(f"OK (re-encode): {input_file.name} -> {output_file.name}")
else:
print(f"FAILED: {input_file}")
# Cleanup the intro file
try:
intro_file.unlink()
except Exception:
pass
def process_folder(root: Path, seconds: float, recursive: bool, skip_existing: bool, output_dir_name: str, exts):
out_dir = root / output_dir_name
out_dir.mkdir(parents=True, exist_ok=True)
files = find_audio_files(root, recursive, exts)
# Don't process anything in output/
files = [f for f in files if out_dir not in f.parents]
if not files:
print("No matching audio files found.")
return
print(f"Found {len(files)} file(s). Output dir: {out_dir}")
for f in files:
try:
process_one(f, out_dir, seconds, skip_existing)
except KeyboardInterrupt:
print("\nInterrupted.")
sys.exit(130)
except Exception as e:
print(f"Error on '{f}': {e}")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(
description="Prepend N seconds of silence to audio files. Uses concat demuxer when possible; falls back to re-encode."
)
parser.add_argument("folder", help="Folder containing audio files")
parser.add_argument("seconds", nargs="?", type=float, default=1.0, help="Silence duration in seconds (default: 1.0)")
parser.add_argument("--recursive", action="store_true", help="Process subfolders")
parser.add_argument("--skip-existing", action="store_true", help="Skip if output already exists")
parser.add_argument("--output-dir-name", default="output", help="Subfolder name for outputs (default: output)")
parser.add_argument("--exts", nargs="+", default=SUPPORTED_EXTS, help="Extensions to include")
args = parser.parse_args()
check_binary("ffmpeg")
check_binary("ffprobe")
root = Path(args.folder).expanduser().resolve()
if not root.exists() or not root.is_dir():
print(f"Error: '{root}' is not a folder.")
sys.exit(1)
print(f"Extending audio in: {root} | silence: {args.seconds}s\n")
process_folder(root, args.seconds, args.recursive, args.skip_existing, args.output_dir_name, args.exts)

View File

@@ -0,0 +1,307 @@
#!/usr/bin/env python3
"""
Batch-shift text subtitles (ASS, SRT, TTML, VTT) by +N seconds.
Primary method:
- FFmpeg with `-itsoffset` to create a shifted external subtitle file,
preserving the original format/extension.
Fallback:
- Python per-format shifters for SRT, VTT, ASS, TTML (handles negative-time clamping).
Outputs:
- Writes to <folder>/output/ keeping the same file names.
Usage:
python shift_subtitles_batch.py /path/to/folder 1
python shift_subtitles_batch.py "/subs_folder" 1 --recursive --skip-existing
"""
import argparse
import re
import sys
import shutil
import subprocess
from pathlib import Path
from typing import List, Tuple, Optional
import xml.etree.ElementTree as ET
SUPPORTED_EXTS = [".srt", ".ass", ".vtt", ".ttml"]
FFMPEG_CODEC_BY_EXT = {
".srt": None, # copy is fine
".ass": "ass", # be explicit if needed
".vtt": "webvtt", # FFmpeg supports webvtt muxer/codec
".ttml": "ttml", # may not be available in all builds; fallback if fails
}
def check_binary(name: str):
if shutil.which(name) is None:
print(f"Error: '{name}' not found on PATH. Install it and try again.")
sys.exit(1)
def ffmpeg_shift(input_sub: Path, output_sub: Path, seconds: float) -> int:
"""
Try to shift a text subtitle with FFmpeg using -itsoffset.
Use -c:s <codec> when known; otherwise -c copy.
"""
ext = input_sub.suffix.lower()
codec = FFMPEG_CODEC_BY_EXT.get(ext)
cmd = [
"ffmpeg", "-hide_banner", "-loglevel", "error", "-y",
"-itsoffset", str(seconds),
"-i", str(input_sub),
]
if codec:
cmd += ["-c:s", codec]
else:
cmd += ["-c", "copy"]
cmd += [str(output_sub)]
print("FFmpeg shift:\n " + " ".join(map(str, cmd)))
return subprocess.run(cmd).returncode
# ---------- Python fallback shifters ----------
def clamp_ms(ms: int) -> int:
return max(ms, 0)
# SRT: 00:00:05,123 --> 00:00:08,456
SRT_TIME = re.compile(r"(\d{2}):(\d{2}):(\d{2}),(\d{3})")
def srt_to_ms(m: re.Match) -> int:
h, mi, s, ms = map(int, m.groups())
return ((h * 3600 + mi * 60 + s) * 1000) + ms
def ms_to_srt(ms: int) -> str:
ms = clamp_ms(ms)
h = ms // 3600000; ms %= 3600000
mi = ms // 60000; ms %= 60000
s = ms // 1000; ms %= 1000
return f"{h:02}:{mi:02}:{s:02},{ms:03}"
def shift_srt_text(text: str, offset_ms: int) -> str:
out_lines = []
for line in text.splitlines():
if "-->" in line:
parts = line.split("-->")
left = SRT_TIME.search(parts[0])
right = SRT_TIME.search(parts[1])
if left and right:
l_ms = srt_to_ms(left) + offset_ms
r_ms = srt_to_ms(right) + offset_ms
new_line = f"{ms_to_srt(l_ms)} --> {ms_to_srt(r_ms)}"
out_lines.append(new_line)
continue
out_lines.append(line)
return "\n".join(out_lines)
# VTT: WEBVTT header; times use '.' separator: 00:00:05.123 --> ...
VTT_TIME = re.compile(r"(\d{2}):(\d{2}):(\d{2})\.(\d{3})")
def vtt_to_ms(m: re.Match) -> int:
h, mi, s, ms = map(int, m.groups())
return ((h * 3600 + mi * 60 + s) * 1000) + ms
def ms_to_vtt(ms: int) -> str:
ms = clamp_ms(ms)
h = ms // 3600000; ms %= 3600000
mi = ms // 60000; ms %= 60000
s = ms // 1000; ms %= 1000
return f"{h:02}:{mi:02}:{s:02}.{ms:03}"
def shift_vtt_text(text: str, offset_ms: int) -> str:
out_lines = []
for i, line in enumerate(text.splitlines()):
if "-->" in line:
# Preserve cue settings like "line:-1 align:right" if they exist.
left, right = line.split("-->", 1)
# Left timestamp may have trailing settings; isolate the time token
lm = VTT_TIME.search(left)
rm = VTT_TIME.search(right)
if lm and rm:
l_ms = vtt_to_ms(lm) + offset_ms
r_ms = vtt_to_ms(rm) + offset_ms
# Replace only the matched portions; keep extra cue settings
left_new = VTT_TIME.sub(ms_to_vtt(l_ms), left, count=1)
right_new = VTT_TIME.sub(ms_to_vtt(r_ms), right, count=1)
out_lines.append(f"{left_new}-->{right_new}")
continue
out_lines.append(line)
return "\n".join(out_lines)
# ASS: times appear in Dialogue events; format line defines field order.
# Typical: "Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text"
# Dialogue: 0,00:00:05.12,00:00:08.34,Default,...
ASS_TIME = re.compile(r"(\d{2}):(\d{2}):(\d{2})\.(\d{2})")
def ass_to_cs(m: re.Match) -> int:
h, mi, s, cs = map(int, m.groups())
return ((h * 3600 + mi * 60 + s) * 100) + cs # centiseconds
def cs_to_ass(cs: int) -> str:
cs = max(cs, 0)
h = cs // (3600 * 100); cs %= (3600 * 100)
mi = cs // (60 * 100); cs %= (60 * 100)
s = cs // 100; cs %= 100
return f"{h:02}:{mi:02}:{s:02}.{cs:02}"
def shift_ass_text(text: str, offset_ms: int) -> str:
offset_cs = int(round(offset_ms / 10.0))
out_lines = []
fmt_fields: Optional[List[str]] = None
for line in text.splitlines():
if line.startswith("Format:"):
# Capture field order for reference
fmt_fields = [f.strip() for f in line.split(":", 1)[1].split(",")]
out_lines.append(line)
continue
if line.startswith("Dialogue:"):
parts = line.split(":", 1)[1].split(",", maxsplit=len(fmt_fields) or 10)
# Heuristic: Start = field named "Start" or position 1; End = "End" or position 2
try:
if fmt_fields:
start_idx = fmt_fields.index("Start")
end_idx = fmt_fields.index("End")
else:
start_idx, end_idx = 1, 2
sm = ASS_TIME.search(parts[start_idx])
em = ASS_TIME.search(parts[end_idx])
if sm and em:
s_cs = ass_to_cs(sm) + offset_cs
e_cs = ass_to_cs(em) + offset_cs
parts[start_idx] = ASS_TIME.sub(cs_to_ass(s_cs), parts[start_idx], count=1)
parts[end_idx] = ASS_TIME.sub(cs_to_ass(e_cs), parts[end_idx], count=1)
out_lines.append("Dialogue:" + ",".join(parts))
continue
except Exception:
pass
out_lines.append(line)
return "\n".join(out_lines)
# TTML: XML; adjust begin/end/dur attributes when present.
def parse_time_to_ms(value: str) -> Optional[int]:
"""
Accept forms like 'HH:MM:SS.mmm' or 'HH:MM:SS:FF' (rare) or 'XmYsZms'
Keep to simplest: HH:MM:SS.mmm and HH:MM:SS for typical TTML.
"""
m = re.match(r"^(\d{2}):(\d{2}):(\d{2})(?:\.(\d{1,3}))?$", value)
if m:
h, mi, s = map(int, m.groups()[:3])
ms = int((m.group(4) or "0").ljust(3, "0"))
return ((h * 3600 + mi * 60 + s) * 1000) + ms
return None
def ms_to_ttml(ms: int) -> str:
ms = clamp_ms(ms)
h = ms // 3600000; ms %= 3600000
mi = ms // 60000; ms %= 60000
s = ms // 1000; ms %= 1000
return f"{h:02}:{mi:02}:{s:02}.{ms:03}"
def shift_ttml_text(text: str, offset_ms: int) -> str:
try:
root = ET.fromstring(text)
# Common TTML namespaces vary; try to adjust attributes on any element
for elem in root.iter():
for attr in ("begin", "end", "dur"):
if attr in elem.attrib:
val = elem.attrib[attr]
ms = parse_time_to_ms(val)
if ms is not None:
if attr == "dur":
# duration stays the same when prepending silence
continue
elem.attrib[attr] = ms_to_ttml(ms + offset_ms)
return ET.tostring(root, encoding="unicode")
except Exception:
# If parsing fails, return original text
return text
def python_shift(input_sub: Path, output_sub: Path, seconds: float) -> bool:
"""
Format-aware shifting when FFmpeg fails or for negative offset clamping.
"""
ext = input_sub.suffix.lower()
text = input_sub.read_text(encoding="utf-8", errors="replace")
offset_ms = int(round(seconds * 1000))
if ext == ".srt":
out = shift_srt_text(text, offset_ms)
elif ext == ".vtt":
out = shift_vtt_text(text, offset_ms)
# Ensure WEBVTT header remains if present
if not out.lstrip().startswith("WEBVTT") and text.lstrip().startswith("WEBVTT"):
out = "WEBVTT\n\n" + out
elif ext == ".ass":
out = shift_ass_text(text, offset_ms)
elif ext == ".ttml":
out = shift_ttml_text(text, offset_ms)
else:
return False
output_sub.write_text(out, encoding="utf-8")
return True
# ---------- batch ----------
def find_sub_files(folder: Path, recursive: bool, exts: List[str]) -> List[Path]:
pattern = "**/*" if recursive else "*"
exts_norm = {e.lower() for e in exts}
return sorted([p for p in folder.glob(pattern) if p.is_file() and p.suffix.lower() in exts_norm])
def process_one(file: Path, out_dir: Path, seconds: float, skip_existing: bool):
out_path = out_dir / file.name
if skip_existing and out_path.exists():
print(f"Skip (exists): {out_path}")
return
# 1) Try FFmpeg first
rc = ffmpeg_shift(file, out_path, seconds)
if rc == 0:
print(f"OK (ffmpeg): {file.name} -> {out_path.name}")
return
print(f"FFmpeg failed; using Python fallback for {file.name}")
ok = python_shift(file, out_path, seconds)
if ok:
print(f"OK (python): {file.name} -> {out_path.name}")
else:
print(f"FAILED: {file}")
def main():
parser = argparse.ArgumentParser(
description="Batch shift text subtitles by N seconds (ASS/SRT/TTML/VTT)."
)
parser.add_argument("folder", help="Folder containing subtitle files")
parser.add_argument("seconds", nargs="?", type=float, default=1.0,
help="Constant time shift in seconds (default: +1.0)")
parser.add_argument("--recursive", action="store_true", help="Process subfolders")
parser.add_argument("--skip-existing", action="store_true", help="Skip if output already exists")
parser.add_argument("--exts", nargs="+", default=SUPPORTED_EXTS, help="Extensions to include")
parser.add_argument("--output-dir-name", default="output", help="Name of the output subfolder")
args = parser.parse_args()
check_binary("ffmpeg")
root = Path(args.folder).expanduser().resolve()
if not root.exists() or not root.is_dir():
print(f"Error: '{root}' is not a folder.")
sys.exit(1)
out_dir = root / args.output_dir_name
out_dir.mkdir(parents=True, exist_ok=True)
files = find_sub_files(root, args.recursive, args.exts)
files = [f for f in files if out_dir not in f.parents]
if not files:
print("No matching subtitle files found.")
sys.exit(0)
print(f"Found {len(files)} file(s). Output: {out_dir}")
for f in files:
try:
process_one(f, out_dir, args.seconds, args.skip_existing)
except KeyboardInterrupt:
print("\nInterrupted.")
sys.exit(130)
except Exception as e:
print(f"Error on '{f}': {e}")
print("Done.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,217 @@
import os
import subprocess
import glob
import sys
import json
import shlex
# ---------- ffprobe helpers ----------
def _run_probe(args):
return json.loads(
subprocess.run(args, capture_output=True, text=True, check=True).stdout
)
def get_media_info(file_path):
"""
Detect container-agnostic media properties required to reproduce the same
encoding format for an intro clip that can be safely appended.
"""
# Video stream (include codec, pix_fmt, color info, fps as *string* rational)
vprobe = _run_probe([
"ffprobe","-v","error",
"-select_streams","v:0",
"-show_entries","stream=codec_name,pix_fmt,width,height,r_frame_rate,color_range,color_space,color_transfer,color_primaries",
"-of","json", file_path
])
v = vprobe["streams"][0]
width = int(v["width"])
height = int(v["height"])
fps_rational = v.get("r_frame_rate","24000/1001") # keep rational string for exact match
vcodec = v.get("codec_name","h264")
pix_fmt = v.get("pix_fmt","yuv420p")
color_range = v.get("color_range") # "tv" or "pc" typically
color_space = v.get("color_space") # e.g. "bt709", "bt2020nc"
color_trc = v.get("color_transfer") # e.g. "smpte2084", "bt709"
color_primaries = v.get("color_primaries") # e.g. "bt2020", "bt709"
# Audio stream (channels, sample_rate, codec, layout if present)
# Some inputs have no audio; handle gracefully.
try:
aprobe = _run_probe([
"ffprobe","-v","error",
"-select_streams","a:0",
"-show_entries","stream=codec_name,channels,sample_rate,channel_layout",
"-of","json", file_path
])
a = aprobe["streams"][0]
achannels = int(a.get("channels", 2))
arate = int(a.get("sample_rate", 48000))
acodec = a.get("codec_name", "aac")
alayout = a.get("channel_layout") # may be None
has_audio = True
except Exception:
achannels = 0
arate = 0
acodec = None
alayout = None
has_audio = False
return {
"width": width, "height": height, "fps_rational": fps_rational,
"vcodec": vcodec, "pix_fmt": pix_fmt,
"color_range": color_range, "color_space": color_space,
"color_trc": color_trc, "color_primaries": color_primaries,
"has_audio": has_audio, "achannels": achannels,
"arate": arate, "acodec": acodec, "alayout": alayout
}
# ---------- encoder selection ----------
VIDEO_ENCODER_MAP = {
"av1": ("libaom-av1", ["-crf","30","-b:v","0","-cpu-used","6"]),
"hevc": ("libx265", ["-crf","20","-preset","medium"]),
"h264": ("libx264", ["-crf","20","-preset","medium"]),
"vp9": ("libvpx-vp9", ["-crf","32","-b:v","0","-row-mt","1"]),
"mpeg2video": ("mpeg2video", []),
"mpeg4": ("mpeg4", []),
# fall back handled below
}
AUDIO_ENCODER_MAP = {
"aac": ("aac", ["-b:a","192k"]),
"opus": ("libopus", ["-b:a","160k"]),
"ac3": ("ac3", ["-b:a","384k"]),
"eac3": ("eac3", ["-b:a","384k"]),
"flac": ("flac", []),
"vorbis":("libvorbis", ["-q:a","4"]),
"mp3": ("libmp3lame", ["-b:a","192k"]),
"pcm_s16le": ("pcm_s16le", []),
# more can be added as needed
}
def pick_video_encoder(vcodec):
enc, extra = VIDEO_ENCODER_MAP.get(vcodec, ("libx264", ["-crf","20","-preset","medium"]))
return enc, extra
def pick_audio_encoder(acodec):
if acodec is None:
return None, [] # no audio in input
enc, extra = AUDIO_ENCODER_MAP.get(acodec, ("aac", ["-b:a","192k"]))
return enc, extra
# ---------- intro generation ----------
def build_intro_cmd(info, duration, intro_file):
"""
Build an ffmpeg command that creates a silent (black) intro with *matching*
A/V format to the input.
"""
w, h = info["width"], info["height"]
fps = info["fps_rational"] # keep as rational string
venc, venc_extra = pick_video_encoder(info["vcodec"])
pix_fmt = info["pix_fmt"]
cmd = [
"ffmpeg",
"-f","lavfi","-i", f"color=black:s={w}x{h}:r={fps}:d={duration}",
]
# Audio input (only if input has audio; otherwise omit to keep track counts aligned)
if info["has_audio"]:
ch = info["achannels"]
sr = info["arate"]
layout = info["alayout"] or ( "stereo" if ch == 2 else f"{ch}c" )
cmd += ["-f","lavfi","-i", f"anullsrc=channel_layout={layout}:sample_rate={sr}"]
else:
# still add a null audio to ensure mkv track counts match? no — input had no audio.
pass
# Map streams explicitly
# If there is audio in the input, produce both v+a; else only v.
# (FFmpeg will auto-create stream 0:v:0 and 1:a:0 if two inputs present.)
# Codec settings
cmd += ["-c:v", venc, "-pix_fmt", pix_fmt]
cmd += venc_extra
# Preserve basic color tags when present (helps SDR/HDR tagging)
if info["color_primaries"]:
cmd += ["-color_primaries", info["color_primaries"]]
if info["color_trc"]:
cmd += ["-color_trc", info["color_trc"]]
if info["color_space"]:
cmd += ["-colorspace", info["color_space"]]
if info["color_range"]:
cmd += ["-color_range", info["color_range"]] # "pc" or "tv"
if info["has_audio"]:
aenc, aenc_extra = pick_audio_encoder(info["acodec"])
cmd += ["-c:a", aenc] + aenc_extra
# Ensure channels & rate match exactly
cmd += ["-ac", str(info["achannels"]), "-ar", str(info["arate"])]
# Keep duration tight and avoid stuck encodes
cmd += ["-shortest", "-y", intro_file]
return cmd
# ---------- main pipeline ----------
def extend_with_intro(input_file, output_folder, duration=0.5):
base_name = os.path.basename(input_file)
intro_file = os.path.join(output_folder, f"{base_name}_intro.mkv")
output_file = os.path.join(output_folder, base_name)
info = get_media_info(input_file)
# Step 1: generate intro clip
ffmpeg_intro = build_intro_cmd(info, duration, intro_file)
print("FFmpeg intro command:\n " + " ".join(shlex.quote(x) for x in ffmpeg_intro))
subprocess.run(ffmpeg_intro, check=True)
# Step 2: mkvmerge intro + main (VA) + main (subs+attachments+chapters)
mkvmerge_cmd = [
"mkvmerge",
"-o", output_file,
"--append-to", "1:0:0:0,1:1:0:1",
intro_file,
"+",
"--no-subtitles", "--no-chapters", "--no-attachments", # VA only from main
input_file,
"--no-video", "--no-audio", # subs+attachments+chapters only
input_file,
]
print("mkvmerge command:\n " + " ".join(shlex.quote(x) for x in mkvmerge_cmd))
subprocess.run(mkvmerge_cmd, check=True)
os.remove(intro_file)
print(f"Extended {base_name} -> {output_file}")
def process_folder(input_folder, duration=0.5):
output_folder = os.path.join(input_folder, "output")
os.makedirs(output_folder, exist_ok=True)
mkv_files = glob.glob(os.path.join(input_folder, "*.mkv"))
if not mkv_files:
print("No MKV files found in the specified folder.")
return
for mkv_file in mkv_files:
try:
extend_with_intro(mkv_file, output_folder, duration=duration)
except subprocess.CalledProcessError as e:
# Surface stderr if present to help debug codec/param mismatches
print(f"Error processing {mkv_file}: {getattr(e, 'stderr', e)}")
if __name__ == "__main__":
if len(sys.argv) < 2 or len(sys.argv) > 3:
print("Usage: python extend_video.py /path/to/folder [duration_seconds]")
sys.exit(1)
input_folder = sys.argv[1]
if not os.path.isdir(input_folder):
print(f"Error: '{input_folder}' is not a valid folder path.")
sys.exit(1)
duration = float(sys.argv[2]) if len(sys.argv) == 3 else 1
print(f"Extending videos in folder: {input_folder} with intro duration: {duration} seconds\n")
process_folder(input_folder, duration=duration)

View File

@@ -0,0 +1,50 @@
import os
import sys
import ffmpeg
def extract_audio_from_videos(input_folder, output_folder):
# Create output folder if it doesn't exist
if not os.path.exists(output_folder):
os.makedirs(output_folder)
# Common video extensions (you can add more if needed)
video_extensions = ('.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm')
# Iterate through all files in the input folder
for filename in os.listdir(input_folder):
if filename.lower().endswith(video_extensions):
video_path = os.path.join(input_folder, filename)
try:
# Create output filename (replace video extension with .m4a for AAC)
output_filename = os.path.splitext(filename)[0] + '.m4a'
output_path = os.path.join(output_folder, output_filename)
# Use ffmpeg to extract audio as AAC
stream = ffmpeg.input(video_path)
stream = ffmpeg.output(stream, output_path, acodec='aac', vn=True, format='ipod')
ffmpeg.run(stream)
print(f"Extracted audio from {filename} to {output_filename}")
except ffmpeg.Error as e:
print(f"Error processing {filename}: {str(e)}")
if __name__ == "__main__":
os.umask(0)
# Check if input folder is provided as command-line argument
if len(sys.argv) != 2:
print("Usage: python3 extract_audio_ffmpeg.py <input_folder>")
sys.exit(1)
# Get input folder from command-line argument
input_folder = sys.argv[1]
# Validate input folder
if not os.path.isdir(input_folder):
print(f"Error: {input_folder} is not a valid directory")
sys.exit(1)
# Set output folder (same level as input folder, named 'audio_output')
output_folder = os.path.join(os.path.dirname(input_folder), ".")
extract_audio_from_videos(input_folder, output_folder)

View File

@@ -0,0 +1,124 @@
import subprocess
import json
import os
import sys
import glob
def get_subtitle_streams(video_path):
"""Get information about subtitle streams in the video file."""
try:
cmd = [
'ffprobe',
'-v', 'error',
'-print_format', 'json',
'-show_streams',
'-select_streams', 's',
video_path
]
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
streams = json.loads(result.stdout).get('streams', [])
return streams
except subprocess.CalledProcessError as e:
print(f"Error probing video file '{video_path}': {e.stderr}")
return []
except json.JSONDecodeError as e:
print(f"Error parsing ffprobe output for '{video_path}': {e}")
return []
def extract_subtitles(video_path):
"""Extract all subtitle streams from a single video file in their original format."""
# Get the directory of the input video
output_dir = os.path.dirname(video_path) or '.'
# Get subtitle streams
subtitle_streams = get_subtitle_streams(video_path)
if not subtitle_streams:
print(f"No subtitle streams found in '{video_path}'.")
return
# Get the base name of the video file (without extension)
video_name = os.path.splitext(os.path.basename(video_path))[0]
# Map codec names to standard file extensions
codec_to_extension = {
'subrip': 'srt',
'ass': 'ass',
'webvtt': 'vtt',
'srt': 'srt', # In case codec is already named srt
# Add more mappings as needed
}
# Extract each subtitle stream
for index, stream in enumerate(subtitle_streams):
codec = stream.get('codec_name', 'unknown')
lang = stream.get('tags', {}).get('language', 'unknown')
# Use mapped extension if available, otherwise use codec name
extension = codec_to_extension.get(codec, codec)
output_file = os.path.join(output_dir, f"{video_name}.{lang}.{extension}")
try:
cmd = [
'ffmpeg',
'-i', video_path,
'-map', f'0:s:{index}',
'-c:s', 'copy',
output_file
]
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
print(f"Extracted subtitle stream {index} ({lang}, {codec}) to {output_file}")
except subprocess.CalledProcessError as e:
print(f"Error extracting subtitle stream {index} from '{video_path}' with copy: {e.stderr}")
# Fallback: Try extracting without copy
try:
cmd = [
'ffmpeg',
'-i', video_path,
'-map', f'0:s:{index}',
output_file
]
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
print(f"Fallback: Extracted subtitle stream {index} ({lang}, {codec}) to {output_file} without copy")
except subprocess.CalledProcessError as e:
print(f"Fallback failed for subtitle stream {index} from '{video_path}': {e.stderr}")
def process_input(input_path):
"""Process a single file or a folder containing video files."""
# Supported video extensions
video_extensions = ['*.mp4', '*.mkv', '*.avi', '*.mov', '*.wmv', '*.flv']
if os.path.isfile(input_path):
# Process single video file
if any(input_path.lower().endswith(ext[1:]) for ext in video_extensions):
extract_subtitles(input_path)
else:
print(f"Skipping '{input_path}': Not a recognized video file extension.")
elif os.path.isdir(input_path):
# Process all video files in the folder
video_files = []
for ext in video_extensions:
video_files.extend(glob.glob(os.path.join(input_path, ext)))
if not video_files:
print(f"No video files found in folder '{input_path}'.")
return
for video_file in video_files:
print(f"\nProcessing '{video_file}'...")
extract_subtitles(video_file)
else:
print(f"Error: '{input_path}' is neither a valid file nor a directory.")
def main():
if len(sys.argv) < 2:
print("Usage: python extract_subtitles.py <video_file_or_folder>")
sys.exit(1)
input_path = sys.argv[1]
if not os.path.exists(input_path):
print(f"Error: Path '{input_path}' does not exist.")
sys.exit(1)
process_input(input_path)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,91 @@
import subprocess
import json
from pathlib import Path
INPUT_DIR = Path("/Entertainment_1/Downloads/USCK/Series/Hokkaido.Gals.Are.Super.Adorable.2024.S01.1080p.CR.WEB-DL.DUAL.AAC2.0.H.264-[SeFree]")
OUTPUT_DIR = INPUT_DIR / "output"
# remove only forced thai
TARGET_LANGS = ["tha"]
REMOVE_FORCED = True
# track types to apply
TRACK_TYPES = ["subtitles"]
OUTPUT_DIR.mkdir(exist_ok=True)
def get_tracks(file):
cmd = ["mkvmerge", "-J", str(file)]
result = subprocess.run(cmd, capture_output=True, text=True)
return json.loads(result.stdout)["tracks"]
def split_tracks(tracks):
keep = {
"video": [],
"audio": [],
"subtitles": [],
}
removed = []
for t in tracks:
track_id = t["id"]
track_type = t["type"]
props = t["properties"]
lang = props.get("language")
forced = props.get("forced_track", False)
should_remove = False
# remove ONLY forced thai subtitles
if (
track_type in TRACK_TYPES
and TARGET_LANGS
and lang in TARGET_LANGS
and REMOVE_FORCED
and forced
):
should_remove = True
if should_remove:
removed.append(track_id)
else:
if track_type in keep:
keep[track_type].append(track_id)
return keep, removed
for mkv_file in INPUT_DIR.glob("*.mkv"):
print(f"Processing: {mkv_file.name}")
tracks = get_tracks(mkv_file)
keep, removed = split_tracks(tracks)
if not removed:
print(" No forced Thai tracks")
continue
output_file = OUTPUT_DIR / mkv_file.name
cmd = ["mkvmerge", "-o", str(output_file)]
if keep["video"]:
cmd.extend(["--video-tracks", ",".join(map(str, keep["video"]))])
if keep["audio"]:
cmd.extend(["--audio-tracks", ",".join(map(str, keep["audio"]))])
if keep["subtitles"]:
cmd.extend(["--subtitle-tracks", ",".join(map(str, keep["subtitles"]))])
cmd.append(str(mkv_file))
subprocess.run(cmd)
print(f" Removed forced Thai tracks: {removed}")
print("Done")

View File

@@ -0,0 +1,67 @@
import subprocess
import json
from pathlib import Path
INPUT_DIR = Path("/Entertainment_1/Downloads/USCK/Series/Release.that.Witch.2026.S01.1080p.CR.WEB-DL.AAC2.0.H.264-[SeFree]")
OUTPUT_DIR = Path.joinpath(INPUT_DIR,"output")
SHIFT_MS = 28500 # +15 sec (negative = backward)
# choose one mode
TARGET_TRACK_IDS = None # example: [1,2]
TARGET_LANGS = ["tha","eng"] # example: ["eng", "jpn", "tha"]
# track types to shift
TRACK_TYPES = ["subtitles"] # ["audio", "subtitles"]
OUTPUT_DIR.mkdir(exist_ok=True)
def get_tracks(file):
cmd = ["mkvmerge", "-J", str(file)]
result = subprocess.run(cmd, capture_output=True, text=True)
return json.loads(result.stdout)["tracks"]
def select_tracks(tracks):
selected = []
for t in tracks:
if t["type"] not in TRACK_TYPES:
continue
track_id = t["id"]
lang = t["properties"].get("language")
if TARGET_TRACK_IDS:
if track_id in TARGET_TRACK_IDS:
selected.append(track_id)
elif TARGET_LANGS:
if lang in TARGET_LANGS:
selected.append(track_id)
return selected
for mkv_file in INPUT_DIR.glob("*.mkv"):
print(f"Processing: {mkv_file.name}")
tracks = get_tracks(mkv_file)
selected_ids = select_tracks(tracks)
if not selected_ids:
print(" No matching tracks")
continue
output_file = OUTPUT_DIR / mkv_file.name
cmd = ["mkvmerge", "-o", str(output_file)]
for track_id in selected_ids:
cmd.extend(["--sync", f"{track_id}:{SHIFT_MS}"])
cmd.append(str(mkv_file))
subprocess.run(cmd)
print("Done")

View File

@@ -0,0 +1,105 @@
import os
import re
import subprocess
import sys
import json
def get_duration(file):
"""Get duration in seconds using ffprobe"""
result = subprocess.run(
[
"ffprobe",
"-v", "error",
"-select_streams", "v:0",
"-show_entries", "format=duration",
"-of", "json",
file
],
capture_output=True,
text=True,
check=True
)
data = json.loads(result.stdout)
return float(data["format"]["duration"])
def seconds_to_timestamp(seconds):
"""Convert seconds to MKV chapter timestamp"""
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
s = seconds % 60
return f"{h:02}:{m:02}:{s:06.3f}"
def create_chapter_file(files, chapter_file):
current = 0.0
with open(chapter_file, "w", encoding="utf-8") as f:
f.write("<?xml version=\"1.0\"?>\n")
f.write("<!DOCTYPE Chapters SYSTEM \"matroskachapters.dtd\">\n")
f.write("<Chapters>\n <EditionEntry>\n")
for i, file in enumerate(files):
start = seconds_to_timestamp(current)
title = f"Part {i+1}"
f.write(" <ChapterAtom>\n")
f.write(f" <ChapterTimeStart>{start}</ChapterTimeStart>\n")
f.write(" <ChapterDisplay>\n")
f.write(f" <ChapterString>{title}</ChapterString>\n")
f.write(" <ChapterLanguage>eng</ChapterLanguage>\n")
f.write(" </ChapterDisplay>\n")
f.write(" </ChapterAtom>\n")
current += get_duration(file)
f.write(" </EditionEntry>\n</Chapters>\n")
def append_videos_in_folder(folder_path):
episode_pattern = re.compile(r"(S\d+E\d+)")
video_groups = {}
for root, dirs, files in os.walk(folder_path):
for file in files:
match = episode_pattern.search(file)
if match:
episode = match.group(1)
full_path = os.path.join(root, file)
video_groups.setdefault(episode, []).append(full_path)
for episode, files in video_groups.items():
if len(files) > 1:
files.sort()
output_file = os.path.join(folder_path, f"{episode}.mkv")
chapter_file = os.path.join(folder_path, f"{episode}_chapters.xml")
print(f"Processing {episode}...")
create_chapter_file(files, chapter_file)
# mkvmerge append syntax
cmd = ["mkvmerge", "-o", output_file]
for i, f in enumerate(files):
if i == 0:
cmd.append(f)
else:
cmd.extend(["+", f])
cmd.extend(["--chapters", chapter_file])
try:
subprocess.run(cmd, check=True)
print(f"Created {output_file}")
except subprocess.CalledProcessError as e:
print(f"Error processing {episode}: {e}")
if __name__ == "__main__":
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} <folder_path>")
sys.exit(1)
append_videos_in_folder(sys.argv[1])

View File

@@ -0,0 +1,60 @@
import os
import subprocess
import glob
import sys
def trim_mkv_files(input_folder, trim_duration=0.375):
# Ensure FFmpeg is installed and ffmpeg is in PATH
ffmpeg_path = "ffmpeg" # Adjust path if ffmpeg is not in PATH
# Create output folder if it doesn't exist
output_folder = os.path.join(input_folder, "trimmed")
if not os.path.exists(output_folder):
os.makedirs(output_folder)
# Find all MKV files in the input folder
mkv_files = glob.glob(os.path.join(input_folder, "*.mkv"))
if not mkv_files:
print("No MKV files found in the specified folder.")
return
for mkv_file in mkv_files:
# Get the base filename and create output filename
base_name = os.path.basename(mkv_file)
output_file = os.path.join(output_folder, f"{base_name}")
# Construct ffmpeg command to trim first second using stream copy
command = [
ffmpeg_path,
"-i", mkv_file,
"-ss", str(trim_duration),
"-c", "copy",
"-map", "0",
output_file
]
try:
# Execute the command
result = subprocess.run(command, capture_output=True, text=True, check=True)
print(f"Successfully trimmed {base_name} -> {output_file}")
except subprocess.CalledProcessError as e:
print(f"Error processing {base_name}: {e.stderr}")
except FileNotFoundError:
print("Error: ffmpeg not found. Ensure FFmpeg is installed and in PATH.")
break
if __name__ == "__main__":
# Check if folder path is provided as command-line argument
if len(sys.argv) != 2:
print("Usage: python trim_video.py /path/to/folder")
sys.exit(1)
input_folder = sys.argv[1]
# Validate folder path
if not os.path.isdir(input_folder):
print(f"Error: '{input_folder}' is not a valid folder path.")
sys.exit(1)
trim_mkv_files(input_folder)

View File

@@ -0,0 +1,57 @@
import os
import re
import subprocess
import sys
def get_episode_code(filename):
"""Extract episode code like S01E01 from filename."""
match = re.search(r"S\d+E\d+", filename)
return match.group(0) if match else None
def append_videos_in_chunks(folder_path, chunk_size=4):
video_files = []
# Collect all video files
for root, dirs, files in os.walk(folder_path):
for file in sorted(files):
if file.lower().endswith(('.mp4', '.mkv', '.mov', '.ts')):
full_path = os.path.join(root, file)
video_files.append(full_path)
# Process files in chunks of 4
for i in range(0, len(video_files), chunk_size):
chunk = video_files[i:i + chunk_size]
if not chunk:
continue
# Use the episode code of the first file in the chunk for output name
base_filename = os.path.basename(chunk[0])
episode_code = get_episode_code(base_filename) or f"group_{i//chunk_size + 1}"
output_file = os.path.join(folder_path, f"{episode_code}.mkv")
# Create the temporary list file
temp_list_file = os.path.join(folder_path, f"{episode_code}_list.txt")
with open(temp_list_file, "w", encoding="utf-8") as f:
for video in chunk:
f.write(f"file '{video}'\n")
# Run ffmpeg to concatenate the files
try:
print(f"Processing chunk starting with {episode_code}...")
subprocess.run([
"ffmpeg", "-f", "concat", "-safe", "0", "-i", temp_list_file,
"-map", "0", "-c", "copy", output_file
], check=True)
print(f"Created {output_file}")
except subprocess.CalledProcessError as e:
print(f"Error processing {episode_code}: {e}")
# finally:
# os.remove(temp_list_file)
if __name__ == "__main__":
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} <folder_path>")
sys.exit(1)
folder_path = sys.argv[1]
append_videos_in_chunks(folder_path)

Submodule Unshackle-Service-SeFree added at d464d0fac8

View File

@@ -68,6 +68,8 @@ dependencies = [
"language-data>=1.4.0",
"wasmtime>=41.0.0",
"animeapi-py>=0.6.0",
"discord-py>=2.7.1",
"dotenv>=0.9.9",
]
[project.urls]

View File

@@ -392,7 +392,7 @@ class dl:
default=[],
help="Required subtitle languages. Downloads all subtitles only if these languages exist. Cannot be used with --s-lang.",
)
@click.option("-fs", "--forced-subs", is_flag=True, default=False, help="Include forced subtitle tracks.")
@click.option("-fs", "--forced-subs", is_flag=True, default=True, help="Include forced subtitle tracks.")
@click.option(
"--exact-lang",
is_flag=True,
@@ -519,6 +519,11 @@ class dl:
default=False,
help="Continue with best available quality if requested resolutions are not available.",
)
@click.option("-so", "--season-overwrite",type=int, default=None,
help="Overwrite season number")
@click.option("-eo", "--episode-overwrite",type=int, default=None,
help="Overwrite episode number")
@click.pass_context
def cli(ctx: click.Context, **kwargs: Any) -> dl:
return dl(ctx, **kwargs)
@@ -1000,6 +1005,10 @@ class dl:
worst: bool,
best_available: bool,
split_audio: Optional[bool] = None,
season_overwrite: Optional[int] = None,
episode_overwrite: Optional[int] = None,
*_: Any,
**__: Any,
) -> None:
@@ -2464,12 +2473,18 @@ class dl:
for muxed_path in muxed_paths:
media_info = MediaInfo.parse(muxed_path)
final_dir = self.output_dir or config.directories.downloads
final_filename = title.get_filename(media_info, show_service=not no_source)
final_filename = title.get_filename(media_info, show_service=not no_source,season_overwrite=int(season_overwrite) if season_overwrite else None,episode_overwrite=int(episode_overwrite) if episode_overwrite else None)
audio_codec_suffix = muxed_audio_codecs.get(muxed_path)
if not no_folder and isinstance(title, (Episode, Song)):
final_dir /= title.get_filename(media_info, show_service=not no_source, folder=True)
if isinstance(title, Movie):
final_dir = Path.joinpath(Path(final_dir),"Movie")
elif isinstance(title, Episode):
final_dir = Path.joinpath(Path(final_dir),"Series")
elif isinstance(title, Song):
final_dir = Path.joinpath(Path(final_dir),"Song")
if not no_folder and isinstance(title, (Episode, Song)):
final_dir /= title.get_filename(media_info, show_service=not no_source, folder=True,season_overwrite=int(season_overwrite) if season_overwrite else None)
final_dir.mkdir(parents=True, exist_ok=True)
final_path = final_dir / f"{final_filename}{muxed_path.suffix}"
template_type = (
@@ -2500,6 +2515,9 @@ class dl:
console.print(
Padding(f":tada: Title downloaded in [progress.elapsed]{title_dl_time}[/]!", (0, 5, 1, 5))
)
console.print(
Padding(f"File path - {final_path}", (0, 5, 1, 5))
)
# update cookies
cookie_file = self.get_cookie_path(self.service, self.profile)
@@ -2510,6 +2528,7 @@ class dl:
console.print(Padding(f"Processed all titles in [progress.elapsed]{dl_time}", (0, 5, 1, 5)))
def prepare_drm(
self,
drm: DRM_T,

View File

@@ -78,14 +78,25 @@ class Episode(Title):
self.year = year
self.description = description
def _build_template_context(self, media_info: MediaInfo, show_service: bool = True) -> dict:
def _build_template_context(self, media_info: MediaInfo, show_service: bool = True,season_overwrite=None,episode_overwrite=None) -> dict:
"""Build template context dictionary from MediaInfo."""
context = self._build_base_template_context(media_info, show_service)
context["title"] = self.title.replace("$", "S")
context["year"] = self.year or ""
context["season"] = f"S{self.season:02}"
context["episode"] = f"E{self.number:02}"
context["season_episode"] = f"S{self.season:02}E{self.number:02}"
if season_overwrite is not None:
season = season_overwrite
else:
season = self.season
if episode_overwrite is not None:
episode = episode_overwrite
else:
episode = self.number
context["season"] = f"S{season:02}"
context["episode"] = f"E{episode:02}"
context["season_episode"] = f"S{season:02}E{episode:02}"
context["episode_name"] = self.name or ""
return context
@@ -98,7 +109,7 @@ class Episode(Title):
name=self.name or "",
).strip()
def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True) -> str:
def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True,season_overwrite=None,episode_overwrite=None) -> str:
if folder:
series_template = config.output_template.get("series")
if series_template:
@@ -114,7 +125,7 @@ class Episode(Title):
formatter = TemplateFormatter(folder_template)
context = self._build_template_context(media_info, show_service)
context['season'] = f"S{self.season:02}"
context['season'] = f"S{self.season:02}" if not season_overwrite else f"S{season_overwrite:02}"
folder_name = formatter.format(context)
@@ -130,7 +141,7 @@ class Episode(Title):
return sanitize_filename(name, " ")
formatter = TemplateFormatter(config.output_template["series"])
context = self._build_template_context(media_info, show_service)
context = self._build_template_context(media_info, show_service,season_overwrite,episode_overwrite)
return formatter.format(context)

View File

@@ -57,7 +57,7 @@ class Movie(Title):
return f"{self.name} ({self.year})"
return self.name
def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True) -> str:
def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True,season_overwrite=None,episode_overwrite=None) -> str:
if folder:
name = f"{self.name}"
if self.year:

View File

@@ -92,7 +92,7 @@ class Song(Title):
context["disc"] = f"{self.disc:02}" if self.disc > 1 else ""
return context
def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True) -> str:
def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True,season_overwrite=None,episode_overwrite=None) -> str:
if folder:
name = f"{self.artist} - {self.album}"
if self.year:

View File

@@ -175,7 +175,7 @@ class Title:
return context
@abstractmethod
def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True) -> str:
def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True,season_overwrite=None,episode_overwrite=None) -> str:
"""
Get a Filename for this Title with the provided Media Info.
All filenames should be sanitized with the sanitize_filename() utility function.

839
usk_downloader_discord.py Executable file
View File

@@ -0,0 +1,839 @@
import discord
from discord.ext import commands
from discord import app_commands
import os
import json
import asyncio
from datetime import datetime
from dotenv import load_dotenv
from typing import Optional
import subprocess
# Load environment variables
load_dotenv()
# Bot configuration
intents = discord.Intents.default()
intents.message_content = True
intents.members = True
class DownloadBot(commands.Bot):
def __init__(self):
super().__init__(
command_prefix='!',
intents=intents,
help_command=None
)
# Initialize data storage (in-memory for this example)
# In production, you'd want to use a database
self.download_queue = []
self.download_history = []
self.authorized_users = []
# Load data from files if they exist
self.load_data()
def load_data(self):
"""Load persistent bot_logs from JSON files"""
try:
if os.path.exists('bot_logs/download_history.json'):
with open('bot_logs/download_history.json', 'r') as f:
self.download_history = json.load(f)
if os.path.exists('bot_logs/authorized_users.json'):
with open('bot_logs/authorized_users.json', 'r') as f:
self.authorized_users = set(json.load(f))
except Exception as e:
print(f"Error loading data: {e}")
def save_data(self):
"""Save persistent data to JSON files"""
try:
os.makedirs('bot_logs', exist_ok=True)
with open('bot_logs/download_history.json', 'w') as f:
json.dump(self.download_history, f, indent=2)
with open('bot_logs/authorized_users.json', 'w') as f:
json.dump(list(self.authorized_users), f, indent=2)
except Exception as e:
print(f"Error saving bot_logs: {e}")
async def setup_hook(self):
"""Called when the bot is starting up"""
print(f"Logged in as {self.user} (ID: {self.user.id})")
print("------")
# Sync slash commands
try:
synced = await self.tree.sync()
print(f"Synced {len(synced)} command(s)")
# threading.Thread(target=vt_worker).start() # Start the download worker in the background
except Exception as e:
print(f"Failed to sync commands: {e}")
bot = DownloadBot()
# Helper function to check if user is authorized
def is_authorized():
def predicate(interaction: discord.Interaction):
return interaction.user.id in bot.authorized_users or interaction.user.guild_permissions.administrator
return app_commands.check(predicate)
@bot.event
async def on_ready():
print(f'{bot.user} has connected to Discord!')
activity = discord.Game(name="Managing downloads | /help")
await bot.change_presence(activity=activity)
asyncio.create_task(usk_worker())
# /download command
@bot.tree.command(name="download", description="Download a file from a URL|ID")
@app_commands.describe(
service="Service to use for downloading (e.g., AMZN, NF, HS, VIU, TID, MMAX, BLBL)",
url="The URL|ID to download from",
keys="Get keys only (default: False, True to get keys only)",
quality="Desired video quality (default: 1080)",
codec="Video codec to use (default: h265)",
range_="Dynamic range to use (default: SDR)",
bitrate="Video bitrate to use (default: Max)",
start_season="Season to download (optional, e.g., 1)",
start_episode="Specific episodes to download (e.g., 1)",
end_season="Season to download (optional, e.g., 2)",
end_episode="Specific episodes to download (e.g., 2)",
video_language="Video language(s) to use (default: orig)",
audio_language="Audio language(s) to use (default: orig,th)",
subtitle_language="Subtitle language(s) to use (default: th,en)",
audio_channel="Audio channel(s) to use (default: 2.0,5.1,Best)",
worst="Download worst quality available (default: False, True to download worst)",
proxy="Proxy to use (optional, e.g., http://username:password@proxyserver:port or nordvpn country code/id)",
### Unshackle options
no_cache="Disable vault cache (default: False, True to disable cache)",
## for iTunes
store_front="For iTunes: Store front to use (default: 143475)",
### for BiliBili or Other
season="For BiliBili: Season to download (optional, e.g., 1)",
title_language="For BiliBili | Laftel: Title language(s) to use (default: ja)",
original_url="For BiliBili: Original URL to download from (optional, e.g., https://www.bilibili.com/video/BV1xxxxxx)",
original_language="For BiliBili: Original language(s) to use (default: ja)",
movie="For BiliBili | Laftel: Is this a movie? (default: False, True for movies, False for series)", # New parameter to indicate if it's a movie
)
@app_commands.choices(keys=[
app_commands.Choice(name="True", value='True'),
app_commands.Choice(name="False", value='False'),
])
@app_commands.choices(service=[
app_commands.Choice(name="Amazon Prime", value="AMZN"),
app_commands.Choice(name="Netflix", value="NF"),
app_commands.Choice(name="Hotstar", value="HS"),
app_commands.Choice(name="VIU", value="VIU"),
app_commands.Choice(name="TrueID", value="TID"),
app_commands.Choice(name="Mono Max", value="MMAX"),
app_commands.Choice(name="BiliBili", value="BLBL"),
app_commands.Choice(name="FutureSkill", value="FSK"),
app_commands.Choice(name="HBO Max", value="HMAX"),
app_commands.Choice(name="iQIYI", value="IQ"),
app_commands.Choice(name="WeTV", value="WTV"),
app_commands.Choice(name="Crunchyroll", value="CR"),
app_commands.Choice(name="Laftel", value="LT"),
app_commands.Choice(name="Flixer", value="FLX"),
app_commands.Choice(name="iTune", value="IT"),
app_commands.Choice(name="Apple TV+", value="ATVP"),
app_commands.Choice(name="TrueVisionNow", value="TVN"),
app_commands.Choice(name="OneD", value="OND"),
app_commands.Choice(name="HIDIVE", value="HIDI"),
])
# @app_commands.choices(quality=[
# app_commands.Choice(name="2160p", value="2160"),
# app_commands.Choice(name="1440p", value="1440"),
# app_commands.Choice(name="1080p", value="1080"),
# app_commands.Choice(name="720p", value="720"),
# app_commands.Choice(name="480p", value="480"),
# app_commands.Choice(name="Best", value="Best"),
# ])
@app_commands.choices(codec=[
app_commands.Choice(name="H264", value="H.264"),
app_commands.Choice(name="H265", value="H.265"),
app_commands.Choice(name="AV1", value="AV1"),
app_commands.Choice(name="VP9", value="VP9"),
])
@app_commands.choices(range_=[
app_commands.Choice(name="HDR", value="HDR"),
app_commands.Choice(name="SDR", value="SDR"),
app_commands.Choice(name="DV", value="DV"),
app_commands.Choice(name="DV+HDR", value="DV+HDR"),
])
@app_commands.choices(audio_channel=[
app_commands.Choice(name="2.0", value="2.0"),
app_commands.Choice(name="5.1", value="5.1"),
app_commands.Choice(name="Best", value= "Best"),
])
@app_commands.choices(worst=[
app_commands.Choice(name="True", value='True'),
app_commands.Choice(name="False", value='False'),
])
@app_commands.choices(movie=[
app_commands.Choice(name="True", value='True'),
app_commands.Choice(name="False", value='False'),
])
@app_commands.choices(no_cache=[
app_commands.Choice(name="True", value=1),
app_commands.Choice(name="False", value=0),
])
async def download_command(
interaction: discord.Interaction,
service: str,
url: str,
keys: Optional[str] = 'False',
quality: Optional[str] = '1080',
codec: Optional[str] = "h.265",
range_: Optional[str] = "SDR",
bitrate: Optional[str] = "Max",
start_season: Optional[int] = None,
start_episode: Optional[int] = None,
end_season: Optional[int] = None,
end_episode: Optional[int] = None,
video_language: Optional[str] = "all",
audio_language: Optional[str] = "orig,th",
subtitle_language: Optional[str] = "th,en",
audio_channel: Optional[str] = "Best",
worst: Optional[str] = 'False',
proxy: Optional[str] = None,
no_cache: Optional[int] = 0, # 1 for True and 0 for False
# title_cache: Optional[int] = 0,
# iTunes specific parameters
store_front: Optional[str] = "143475",
# BiliBili specific parameters
season: Optional[int] = None,
title_language: Optional[str] = "ja",
original_url: Optional[str] = None,
original_language: Optional[str] = "ja",
movie: Optional[str] = 'False',
):
# Check if user has permission
if not (interaction.user.id in bot.authorized_users or interaction.user.guild_permissions.administrator):
embed = discord.Embed(
title="❌ Access Denied",
description="You don't have permission to use this command.",
color=0xff0000
)
await interaction.response.send_message(embed=embed, ephemeral=True)
return
try:
bitrate = int(bitrate)
except ValueError:
if bitrate.lower() == 'max':
bitrate = None
else:
embed = discord.Embed(
title="❌ Invalid Bitrate",
description="Bitrate must be an integer or 'Max'.",
color=0xff0000
)
await interaction.response.send_message(embed=embed, ephemeral=True)
return
# Create download entry
download_id = len(bot.download_history) + 1
download_entry = {
'interaction': interaction,
'data': {
'id': download_id,
'url': url,
'user': interaction.user.display_name,
'user_id': interaction.user.id,
'channel_id': interaction.channel_id,
'timestamp': datetime.now().isoformat(),
'status': 'queued',
'service': service.upper(),
'keys': keys == 'True', # Convert to boolean
'quality': quality.upper() if quality else None,
'codec': codec.upper() if codec else None,
'range': range_.upper() if range_ else None,
'bitrate': bitrate,
'start_season': f'{start_season:02}' if start_season is not None else None,
'start_episode': f'{start_episode:02}' if start_episode is not None else None,
'end_season': f'{end_season:02}' if end_season is not None else None,
'end_episode': f'{end_episode:02}' if end_episode is not None else None,
'video_language': video_language.lower() if video_language else None,
'audio_language': audio_language.lower() if audio_language else None,
'subtitle_language': subtitle_language.lower() if subtitle_language else None,
'audio_channel': audio_channel if audio_channel != "Best" else None,
'worst': worst == 'True', # Convert to boolean
'no_cache': no_cache == 1, # Convert to boolean
# 'title_cache': title_cache == 1,
'proxy': proxy,
### iTunes specific parameters
'store_front': store_front if store_front else "143475",
### BiliBili specific parameters
'season': season,
'title_language': title_language.lower() if title_language else None,
'original_url': original_url,
'original_language': original_language.lower() if original_language else None,
'movie': movie if movie is not None else 'False',
}
}
embed = discord.Embed(
title="📥 Download Queued",
description="Download request has been added to the queue.",
color=0x00ff00
)
embed.add_field(name="🛠 Service", value=service, inline=True)
embed.add_field(name="🆔 Download ID", value=download_id, inline=True)
embed.add_field(name="🔗 URL", value=url, inline=False)
embed.add_field(name="🔑 Keys", value=keys, inline=True)
embed.add_field(name="🎥 Quality", value=quality, inline=True)
embed.add_field(name="🎞 Codec", value=codec, inline=True)
embed.add_field(name="🌈 Range", value=range_, inline=True)
embed.add_field(name="🎥 Bitrate", value=bitrate if bitrate else "Max", inline=True)
embed.add_field(name="🎯 Start Season", value=start_season or "None", inline=True)
embed.add_field(name="🎯 End Season", value=end_season or "None", inline=True)
embed.add_field(name="📺 Start Episode", value=start_episode or "None", inline=True)
embed.add_field(name="📺 End Episode", value=end_episode or "None", inline=True)
embed.add_field(name="🔊 Audio Language", value=audio_language, inline=False)
embed.add_field(name="📜 Subtitle Language", value=subtitle_language, inline=False)
embed.add_field(name="🔊 Audio Channel", value=audio_channel, inline=True)
embed.add_field(name="📊 Queue Position", value=len(bot.download_queue), inline=False)
embed.set_footer(text=f"Requested by {interaction.user.display_name}")
await interaction.response.send_message(embed=embed)
# Add to queue and history
bot.download_queue.append(download_entry)
bot.download_history.append(download_entry['data'])
bot.save_data()
async def usk_worker():
"""Continuously process the download queue in the background"""
while True:
if bot.download_queue:
entry = bot.download_queue.pop(0)
await process_download(entry['data'])
else:
await asyncio.sleep(5) # Sleep briefly if queue is empty
async def process_download(entry):
"""Background worker to process download queue"""
entry['status'] = 'in_progress'
bot.save_data()
channel = bot.get_channel(entry['channel_id'])
cmd=['/root/unshackle-SeFree/.venv/bin/unshackle','dl']
if entry['proxy'] and entry['service'] not in ['HIDI']:
cmd += ['--proxy', entry['proxy']]
elif entry['service'] in ['HIDI']:
cmd += ['--proxy',"ca"]
if entry['keys']:
cmd.append('--skip-dl')
# if entry['service'] in ['AMZN'] and not entry['keys']:
# cmd += ['--delay', '30']
# elif entry['service'] in ['CR'] and not entry['keys']:
# cmd += ['--delay', '15']
# elif entry['keys']:
# cmd += ['--delay', '3']
# else:
# cmd += ['--delay', '10']
if entry['no_cache'] or entry['service'] in ['HMAX'] :
cmd.append('--cdm-only')
if entry['quality'].lower() != 'best':
cmd += ['--quality', entry['quality']]
cmd += ['--range', entry['range']]
cmd += ['--vcodec', entry['codec']]
if entry['bitrate'] is not None:
cmd += ['--vbitrate', str(entry['bitrate'])]
if entry['worst']:
cmd += ['--worst']
cmd += ['--v-lang',entry["video_language"]]
cmd += ['--a-lang', f"{entry['audio_language'] if entry['service'] not in [ 'MMAX'] else 'all'}"]
cmd += ['--s-lang', f"{entry['subtitle_language'] if entry['service'] not in [ 'MMAX'] else 'all'}"]
if entry['service'] in ['BLBL'] and not entry['audio_channel']:
cmd += ['--channels', '2.0']
# else:
# cmd += ['--channels', entry['audio_channel']]
if entry['start_season'] or entry['start_episode'] or entry['end_season'] or entry['end_episode']:
cmd += ['--wanted']
wanted=None
if entry['start_season']:
wanted = 's'+entry['start_season']
else:
wanted = "s01"
if entry['start_episode']:
if wanted:
wanted += ('e'+entry['start_episode'])
else:
wanted = ('e'+entry['start_episode'])
if entry['end_season']:
if wanted:
wanted += '-s'+entry['end_season']
else:
wanted = 's'+entry['end_season']
if entry['end_episode']:
if entry['end_season']:
if wanted:
wanted += ('e'+entry['end_episode'])
else:
wanted = ('e'+entry['end_episode'])
else:
if wanted:
wanted += ('-s01e'+entry['end_episode'])
else:
wanted = ('s01e'+entry['end_episode'])
cmd += [wanted]
# if entry["title_cache"]:
# cmd.append('--title-cache')
cmd += [entry['service']]
if entry['service'] in ['AMZN']:
cmd += ["https://www.primevideo.com/detail/"+entry['url']]
else:
cmd += [entry['url']]
# if entry['service'] == 'HS':
# cmd += ['--all']
if entry['service'] == 'LT':
if entry['title_language']:
cmd += ['--title_lang', entry['title_language']]
if entry['movie'] and entry['movie'].lower() == 'true':
cmd += ['--movie']
if entry['service'] == 'OND':
if entry['title_language']:
cmd += ['--title_lang', entry['title_language']]
if entry['service'] in ['FLX','IT','TVN','BLBL','CR']:
if entry['movie'] and entry['movie'].lower() == 'true':
cmd += ['--movie']
if entry['service'] == 'IT':
if entry['store_front']:
cmd += ['--storefront', entry['store_front']]
if entry['service'] == 'TID':
if entry['season']:
cmd += ['--season', str(entry['season'])]
# if entry['title_cache']:
# cmd += ['--title-cache']
cmd += ['--drm','wv']
if entry['service'] == 'BLBL':
if entry['season']:
cmd += ['--season', str(entry['season'])]
if entry['title_language']:
cmd += ['--title_lang', entry['title_language']]
if entry['original_url']:
cmd += ['--original_url', entry['original_url']]
if entry['original_language']:
cmd += ['--original_lang', entry['original_language']]
# if entry['android'] and entry['android'].lower() == 'true':
# cmd += ['--android']
if entry['service'] == 'TVN':
if entry['original_language']:
cmd += ['--original_lang', entry['original_language']]
print(f"Running command: {cmd}")
# print(f"Running command:\n{' '.join(cmd)}")
embed = discord.Embed(
title="🖹 Download Command",
description=' '.join(cmd),
color=0x0000ff
)
await channel.send(embed=embed)
result = await asyncio.to_thread(subprocess.run, cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
try:
if '[E]' in result.stdout.decode() or "Processed all titles" not in result.stdout.decode():
embed = discord.Embed(
title="❌ Download Failed",
description="Download request has been failed.",
color=0xff0000
)
embed.add_field(name="🛠 Service", value=entry['service'], inline=True)
embed.add_field(name="🔗 URL", value=entry['url'], inline=False)
embed.add_field(name="🎥 Quality", value=entry['quality'], inline=True)
embed.add_field(name="🎞 Codec", value=entry['codec'], inline=True)
embed.add_field(name="🌈 Range", value=entry['range'], inline=True)
embed.add_field(name="🎯 Start Season", value=entry['start_season'] or "None", inline=True)
embed.add_field(name="🎯 End Season", value=entry['end_season'] or "None", inline=True)
embed.add_field(name="📺 Start Episode", value=entry['start_episode'] or "None", inline=True)
embed.add_field(name="📺 End Episode", value=entry['end_episode'] or "None", inline=True)
embed.add_field(name="🔊 Audio Language", value=entry['audio_language'], inline=False)
embed.add_field(name="📜 Subtitle Language", value=entry['subtitle_language'], inline=False)
embed.add_field(name="📊 Queue Position", value=len(bot.download_queue), inline=False)
embed.add_field(name="📅 Timestamp", value=entry['timestamp'], inline=False)
embed.set_footer(text=f"Requested by {entry['user']}")
print(result.stdout.decode())
print(f"Error downloading {entry['url']}: ")
entry['error'] = result.stdout.decode()
entry['status'] = 'failed'
await channel.send(embed=embed)
else:
embed = discord.Embed(
title="✅ Download Complete",
description="Download request has been completed.",
color=0x00ff00
)
embed.add_field(name="🛠 Service", value=entry['service'], inline=True)
embed.add_field(name="🔗 URL", value=entry['url'], inline=False)
embed.add_field(name="🎥 Quality", value=entry['quality'], inline=True)
embed.add_field(name="🎞 Codec", value=entry['codec'], inline=True)
embed.add_field(name="🌈 Range", value=entry['range'], inline=True)
embed.add_field(name="🎯 Start Season", value=entry['start_season'] or "None", inline=True)
embed.add_field(name="🎯 End Season", value=entry['end_season'] or "None", inline=True)
embed.add_field(name="📺 Start Episode", value=entry['start_episode'] or "None", inline=True)
embed.add_field(name="📺 End Episode", value=entry['end_episode'] or "None", inline=True)
embed.add_field(name="🔊 Audio Language", value=entry['audio_language'], inline=False)
embed.add_field(name="📜 Subtitle Language", value=entry['subtitle_language'], inline=False)
embed.add_field(name="📊 Queue Position", value=len(bot.download_queue), inline=False)
embed.add_field(name="📅 Timestamp", value=entry['timestamp'], inline=False)
embed.set_footer(text=f"Requested by {entry['user']}")
entry['status'] = 'completed'
print(f"Download {entry['url']} completed")
await channel.send(embed=embed)
except Exception as e:
print(f"Error processing download {entry['url']}: {e}")
embed = discord.Embed(
title="❌ Download Error",
description="An error occurred while processing the download.",
color=0xff0000
)
embed.add_field(name="🛠 Service", value=entry['service'], inline=True)
embed.add_field(name="🔗 URL", value=entry['url'], inline=False)
embed.add_field(name="🎥 Quality", value=entry['quality'], inline=True)
embed.add_field(name="🎞 Codec", value=entry['codec'], inline=True)
embed.add_field(name="🌈 Range", value=entry['range'], inline=True)
embed.add_field(name="🎯 Start Season", value=entry['start_season'] or "None", inline=True)
embed.add_field(name="🎯 End Season", value=entry['end_season'] or "None", inline=True)
embed.add_field(name="📺 Start Episode", value=entry['start_episode'] or "None", inline=True)
embed.add_field(name="📺 End Episode", value=entry['end_episode'] or "None", inline=True)
embed.add_field(name="🔊 Audio Language", value=entry['audio_language'], inline=False)
embed.add_field(name="📜 Subtitle Language", value=entry['subtitle_language'], inline=False)
embed.add_field(name="📊 Queue Position", value=len(bot.download_queue), inline=False)
embed.add_field(name="📅 Timestamp", value=entry['timestamp'], inline=False)
embed.set_footer(text=f"Requested by {entry['user']}")
await channel.send(embed=embed)
bot.save_data()
# /check H265 command
@bot.tree.command(name="check_codec", description="Check if codec is available")
@app_commands.describe(
service="Service to use for downloading (e.g., AMZN, NF, HS, VIU, TID, MMAX, BLBL)",
url="The URL|ID to check for codec support",
codec="Video codec to check (default: H265)",
range_="Dynamic range to use (default: SDR)",
)
@app_commands.choices(service=[
app_commands.Choice(name="Amazon Prime", value="AMZN"),
app_commands.Choice(name="Netflix", value="NF"),
app_commands.Choice(name="Hotstar", value="HS"),
app_commands.Choice(name="VIU", value="VIU"),
app_commands.Choice(name="TrueID", value="TID"),
app_commands.Choice(name="Mono Max", value="MMAX"),
app_commands.Choice(name="BiliBili", value="BLBL"),
])
@app_commands.choices(codec=[
app_commands.Choice(name="H264", value="H.264"),
app_commands.Choice(name="H265", value="H.265"),
app_commands.Choice(name="AV1", value="AV1"),
app_commands.Choice(name="VP9", value="VP9"),
])
@app_commands.choices(range_=[
app_commands.Choice(name="HDR", value="HDR"),
app_commands.Choice(name="SDR", value="SDR"),
app_commands.Choice(name="DV", value="DV"),
])
async def check_codec_command(
interaction: discord.Interaction,
service: str,
url: str,
codec: str = "H265",
range_: Optional[str] = "SDR",
):
embed = discord.Embed(
title="🛠 H265 Codec Check",
description=f"Checking H265 codec availability for URL: {url}",
color=0x0000ff
)
embed.add_field(name="🛠 Service", value=service, inline=True)
embed.add_field(name="🌈 Range", value=range_, inline=True)
await interaction.response.send_message(embed=embed)
# Check if H265 codec is available for the given URL
cmd, codec_available, range_available = check_codec_support(url, codec, service, range_)
if codec_available == 'error':
embed = discord.Embed(
title="❌ Error Checking Codec",
description=f"An error occurred while checking codec support for URL: {url}",
color=0xff0000
)
await interaction.followup.send(embed=embed)
return
embed = discord.Embed(
title=f"🛠 {codec} Codec Check",
description=f"{codec} codec is {'available' if codec_available else 'not available'} for URL: {url}",
color=0x00ff00 if codec_available else 0xff0000 if codec_available or codec_available else 0xffa500
)
embed.add_field(name=range_, value='available' if range_available else 'not available', inline=True)
embed.add_field(name="Command", value=cmd, inline=False)
await interaction.followup.send(embed=embed)
# /check H265 command
@bot.tree.command(name="clear_temp", description="Clear temporary files")
async def clear_temp_command(
interaction: discord.Interaction,
):
embed = discord.Embed(
title="🛠 Clear Temporary Files",
description="Clearing temporary files...",
color=0x0000ff
)
await interaction.response.send_message(embed=embed)
# Check if H265 codec is available for the given URL
os.removedirs("/root/unshackle-SeFree/Temp")
embed = discord.Embed(
title="🛠 Temporary Files Cleared",
description="Temporary files have been successfully cleared.",
color=0x00ff00
)
await interaction.followup.send(embed=embed)
def check_codec_support(url: str, codec: str, service: str, range_: str):
"""Check if H265 codec is available for the given URL"""
h264_alias=['h264', 'H264', 'H.264', 'H.264', 'AVC', 'avc', 'AVC1', 'avc1']
h265_alias=['h265', 'H265', 'H.265', 'H.265', 'HEVC', 'hevc', 'HEVC1', 'hevc1']
error_alias=['error', 'Error', 'ERROR', '[E]', '[e]','No tracks returned']
av1_alias=['av1', 'AV1', 'AV1.0', 'av1.0']
vp9_alias=['vp9', 'VP9', 'VP9.0', 'vp9.0']
cmd = ['/root/unshackle-SeFree/.venv/bin/unshackle','dl', '--list',
'--wanted','s01e01',
'--vcodec', codec, '--range', range_]
cmd += [service,url] # Always disable cache for codec checks
# if service == 'NF' or service == 'HS':
# cmd += ['--all']
try:
print(f"Running command: {' '.join(cmd)}")
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if ("Processed all titles" not in result.stdout.decode() or any(alias in result.stdout.decode() for alias in error_alias)):
print(f"Error checking codec support for {url}: {result.stdout.decode()}")
return ' '.join(cmd),'error','error'
# codec check
codec_available = False
if codec.lower() in h264_alias:
if any(alias in result.stdout.decode() for alias in h264_alias):
codec_available = True
elif codec.lower() in h265_alias:
if any(alias in result.stdout.decode() for alias in h265_alias):
codec_available = True
elif codec.lower() in av1_alias:
if any(alias in result.stdout.decode() for alias in av1_alias):
codec_available = True
elif codec.lower() in vp9_alias:
if any(alias in result.stdout.decode() for alias in vp9_alias):
codec_available = True
if not codec_available:
print(f"{codec} codec is not available for {url}")
return ' '.join(cmd), codec_available, False
print(f"{codec} codec {'is' if codec_available else 'is not'} available for {url}")
# Check if HDR is available
range_available = False
print(f"Checking {range_} support for {url}")
if range_ not in result.stdout.decode():
print(f"HDR support not available for {url}")
else:
print(f"{range_} support available for {url}")
range_available = True
return ' '.join(cmd), codec_available, range_available
except Exception as e:
print(f"Exception while checking codec support: {e}")
return ' '.join(cmd),'error','error'
# /history command
@bot.tree.command(name="history", description="List download history")
@app_commands.describe(
user="Filter by specific user (optional)"
)
async def history_command(
interaction: discord.Interaction,
user: Optional[discord.Member] = None
):
embed = discord.Embed(color=0x0099ff)
embed.title = "📚 Download History"
history = bot.download_history
if user:
history = [item for item in history if item['user_id'] == user.id]
embed.title += f" - {user.display_name}"
if not history:
embed.description = "No download history found."
else:
history_list = []
for item in history[-20:]: # Show last 20
status_emoji = "" if item['status'] == 'completed' else "" if item['status'] == 'failed' else "" if item['status'] == 'in_progress' else "🕔"
timestamp = datetime.fromisoformat(item['timestamp']).strftime("%m/%d %H:%M")
history_list.append(f"{status_emoji} **{item['id']} **{item['service']} **{item['url']} **{item['quality']} **{item['codec']} **{item['range']}")
history_list.append(f" └── {timestamp} • by {item['user']}")
embed.description = "\n".join(history_list)
if len(history) > 20:
embed.set_footer(text=f"Showing last 20 of {len(history)} downloads")
await interaction.response.send_message(embed=embed)
# Help command
@bot.tree.command(name="help", description="Show bot commands and usage")
async def help_command(interaction: discord.Interaction):
embed = discord.Embed(
title="🤖 Bot Commands Help",
description="Here are all available commands:",
color=0x0099ff
)
embed.add_field(
name="📥 /download",
value="`/download <service> <url> [quality] [codec] [want] [audio_language] [subtitle_language]`\n"
"Download a file from the specified URL|ID\n",
inline=False
)
embed.add_field(
name="📋 /history",
value="`/history <user>`\n"
"List download history for a specific user (or all users if not specified)\n",
inline=False
)
embed.add_field(
name="❓ /help",
value="Show this help message",
inline=False
)
# embed.set_footer(text="Use /keys list to see authorized users and API keys")
await interaction.response.send_message(embed=embed)
# Error handling
# @bot.tree.error
# async def on_app_command_error(interaction: discord.Interaction, error: app_commands.AppCommandError):
# channel = bot.get_channel(interaction.channel_id)
# if isinstance(error, app_commands.CheckFailure):
# embed = discord.Embed(
# title="❌ Permission Denied",
# description="You don't have permission to use this command.",
# color=0xff0000
# )
# if not interaction.response.is_done():
# await interaction.response.send_message(embed=embed, ephemeral=True)
# return
# embed = discord.Embed(
# title="❌ Error",
# description=f"An error occurred: {str(error)}",
# color=0xff0000
# )
# try:
# if interaction.response.is_done():
# await interaction.followup.send(embed=embed, ephemeral=True)
# else:
# await interaction.response.send_message(embed=embed, ephemeral=True)
# except discord.HTTPException:
# # If the interaction response is already sent, send a follow-up message
# await channel.send(embed=embed)
@bot.tree.command(name="my_roles", description="List all roles the bot has in this server")
async def my_roles(interaction: discord.Interaction):
# Get the bot's Member object in this guild
bot_member: discord.Member = interaction.guild.get_member(bot.user.id)
if not bot_member:
await interaction.response.send_message("Couldn't find myself in this guild.", ephemeral=True)
return
roles = [role.mention for role in bot_member.roles if role.name != "@everyone"]
if roles:
await interaction.response.send_message(f"My roles are: {' '.join(roles)}")
else:
await interaction.response.send_message("I have no roles besides @everyone.")
# Run the bot
if __name__ == "__main__":
token = os.getenv('DISCORD_TOKEN')
if not token:
print("❌ DISCORD_TOKEN not found in environment variables!")
print("Make sure you have a .env file with your bot token.")
else:
try:
bot.run(token)
except discord.LoginFailure:
print("❌ Invalid bot token! Please check your DISCORD_TOKEN in the .env file.")
except Exception as e:
print(f"❌ An error occurred: {e}")

36
uv.lock generated
View File

@@ -472,6 +472,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/94/35/386550fd60316d1e37eccdda609b074113298f23cef5bddb2049823fe666/dacite-1.9.2-py3-none-any.whl", hash = "sha256:053f7c3f5128ca2e9aceb66892b1a3c8936d02c686e707bee96e19deef4bc4a0", size = 16600, upload-time = "2025-02-05T09:27:24.345Z" },
]
[[package]]
name = "discord-py"
version = "2.7.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "aiohttp" },
]
sdist = { url = "https://files.pythonhosted.org/packages/ef/57/9a2d9abdabdc9db8ef28ce0cf4129669e1c8717ba28d607b5ba357c4de3b/discord_py-2.7.1.tar.gz", hash = "sha256:24d5e6a45535152e4b98148a9dd6b550d25dc2c9fb41b6d670319411641249da", size = 1106326, upload-time = "2026-03-03T18:40:46.24Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/f7/a7/17208c3b3f92319e7fad259f1c6d5a5baf8fd0654c54846ced329f83c3eb/discord_py-2.7.1-py3-none-any.whl", hash = "sha256:849dca2c63b171146f3a7f3f8acc04248098e9e6203412ce3cf2745f284f7439", size = 1227550, upload-time = "2026-03-03T18:40:44.492Z" },
]
[[package]]
name = "distlib"
version = "0.4.0"
@@ -481,6 +493,17 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/33/6b/e0547afaf41bf2c42e52430072fa5658766e3d65bd4b03a563d1b6336f57/distlib-0.4.0-py2.py3-none-any.whl", hash = "sha256:9659f7d87e46584a30b5780e43ac7a2143098441670ff0a49d5f9034c54a6c16", size = 469047, upload-time = "2025-07-17T16:51:58.613Z" },
]
[[package]]
name = "dotenv"
version = "0.9.9"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "python-dotenv" },
]
wheels = [
{ url = "https://files.pythonhosted.org/packages/b2/b7/545d2c10c1fc15e48653c91efde329a790f2eecfbbf2bd16003b5db2bab0/dotenv-0.9.9-py2.py3-none-any.whl", hash = "sha256:29cf74a087b31dafdb5a446b6d7e11cbce8ed2741540e2339c69fbef92c94ce9", size = 1892, upload-time = "2025-02-19T22:15:01.647Z" },
]
[[package]]
name = "ecpy"
version = "1.2.5"
@@ -1318,6 +1341,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/99/09/0fc0719162e5ad723f71d41cf336f18b6b5054d70dc0fe42ace6b4d2bdc9/pysubs2-1.8.0-py3-none-any.whl", hash = "sha256:05716f5039a9ebe32cd4d7673f923cf36204f3a3e99987f823ab83610b7035a0", size = 43516, upload-time = "2024-12-24T12:39:44.469Z" },
]
[[package]]
name = "python-dotenv"
version = "1.2.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/82/ed/0301aeeac3e5353ef3d94b6ec08bbcabd04a72018415dcb29e588514bba8/python_dotenv-1.2.2.tar.gz", hash = "sha256:2c371a91fbd7ba082c2c1dc1f8bf89ca22564a087c2c287cd9b662adde799cf3", size = 50135, upload-time = "2026-03-01T16:00:26.196Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/0b/d7/1959b9648791274998a9c3526f6d0ec8fd2233e4d4acce81bbae76b44b2a/python_dotenv-1.2.2-py3-none-any.whl", hash = "sha256:1d8214789a24de455a8b8bd8ae6fe3c6b69a5e3d64aa8a8e5d68e694bbcb285a", size = 22101, upload-time = "2026-03-01T16:00:25.09Z" },
]
[[package]]
name = "pywidevine"
version = "1.9.0"
@@ -1664,6 +1696,8 @@ dependencies = [
{ name = "crccheck" },
{ name = "cryptography" },
{ name = "curl-cffi" },
{ name = "discord-py" },
{ name = "dotenv" },
{ name = "filelock" },
{ name = "fonttools" },
{ name = "httpx" },
@@ -1723,6 +1757,8 @@ requires-dist = [
{ name = "crccheck", specifier = ">=1.3.0,<2" },
{ name = "cryptography", specifier = ">=45.0.0,<47" },
{ name = "curl-cffi", specifier = ">=0.7.0b4,<0.14" },
{ name = "discord-py", specifier = ">=2.7.1" },
{ name = "dotenv", specifier = ">=0.9.9" },
{ name = "filelock", specifier = ">=3.20.3,<4" },
{ name = "fonttools", specifier = ">=4.60.2,<5" },
{ name = "httpx", specifier = ">=0.28.1,<0.29" },