# Source: usk_schedule_downloader/lib/ScreenShot.py
# Last commit: c899eb5ca6 by panitan103, 2026-03-31 10:09:17 +07:00
#   - add limit ratio
#   - add limit ratio and super seed parameter to upload
# (Repository viewer listing: 339 lines, 14 KiB, Python)

import os
import re
import asyncio
import subprocess
import cv2
import numpy as np
import shutil
import time
from PIL import Image, ImageDraw, ImageFont
from urllib.parse import urljoin
import aiohttp
from dotenv import dotenv_values
from datetime import timedelta
from lib.logging_data import logger
import json
class ScreenShot:
    """Build a screenshot contact sheet for a video and upload it to imgbb.ws.

    The workflow is: ``run()`` probes the video with ffprobe, grabs
    ``GRID_COLS * GRID_ROWS`` frames with ffmpeg, stamps each frame with its
    timestamp, and stitches them under a metadata banner.  ``login()`` and
    ``upload_to_imgbb()`` then push the resulting JPEG to the image host.
    """

    FONT_PATH = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"

    def __init__(self, OUTPUT_DIR="temp_screenshots", console: "logger | None" = None):
        """Set up defaults.

        Args:
            OUTPUT_DIR: Scratch directory for the individual frames; it is
                removed again by ``cleanup_screenshots()``.
            console: Project logger instance.  When omitted, a new ``logger``
                writing to ``./log`` is created.
        """
        self.VIDEO_PATH = None
        self.OUTPUT_DIR = OUTPUT_DIR
        self.OUTPUT_IMAGE = None
        self.GRID_COLS = 3
        self.GRID_ROWS = 5
        self.TOTAL_FRAMES = self.GRID_COLS * self.GRID_ROWS
        # Per-frame size; filled in by get_metadata() from the real resolution.
        self.WIDTH = None
        self.HEIGHT = None
        self.HEADER_HEIGHT = 90
        self.base_url = "https://imgbb.ws"
        self.imgbb_token = None
        self.session = None
        self.console = console or logger(app_name="torrent_uploader", log_dir="./log")

    @staticmethod
    def _first_line(text):
        """Return the first line of *text* stripped, or "" when *text* is empty."""
        lines = text.splitlines()
        return lines[0].strip() if lines else ""

    @staticmethod
    def _join_languages(raw_output):
        """Collapse ffprobe's one-language-per-line output into "a,b,c".

        Duplicates and blank lines are dropped and the result is sorted so the
        banner text is deterministic.  Returns "und" when nothing is tagged.
        """
        langs = sorted({line.strip() for line in raw_output.splitlines() if line.strip()})
        return ",".join(langs) if langs else "und"

    @staticmethod
    def _audio_codec_label(codec, profile):
        """Append DDP / DD / Atmos markers to *codec* based on its *profile*."""
        label = codec
        if "Dolby Digital Plus" in profile:
            label += " DDP"
        elif "Dolby Digital" in profile:
            label += " DD"
        if "Atmos" in profile:
            label += " Atmos"
        return label

    @staticmethod
    def _frame_timestamps(duration, total_frames):
        """Return *total_frames* evenly spaced capture times (whole seconds).

        A timestamp of 0 is replaced so the very first frame is skipped:
        5 s for normal videos, or the midpoint for clips of 5 s or less so
        the seek never lands past the end of the file.
        """
        interval = duration / total_frames
        zero_replacement = 5 if duration > 5 else int(duration / 2)
        return [int(i * interval) or zero_replacement for i in range(total_frames)]

    def get_metadata(self):
        """Probe ``self.VIDEO_PATH`` with ffprobe.

        Side effect: sets ``self.WIDTH`` / ``self.HEIGHT`` to the video
        resolution divided by ``GRID_COLS`` (both axes use the same divisor so
        the aspect ratio is kept).

        Returns:
            Tuple ``(duration, vcodec, acodec, audio_channels, resolution,
            size_mb, audio_lang, subtitle_lang)``.

        Raises:
            ValueError: if no duration or no video resolution can be read.
        """
        def ffprobe_entry(stream, entry):
            cmd = [
                "ffprobe", "-v", "error", "-select_streams", stream,
                "-show_entries", f"stream={entry}",
                "-of", "default=noprint_wrappers=1:nokey=1", self.VIDEO_PATH
            ]
            result = subprocess.run(cmd, stdout=subprocess.PIPE)
            # Missing streams/fields yield empty output; return "" instead of
            # failing with IndexError.
            return self._first_line(result.stdout.decode().strip())

        def ffprobe_tag_languages(stream_type):
            cmd = [
                "ffprobe", "-v", "error", "-select_streams", stream_type,
                "-show_entries", "stream_tags=language",
                "-of", "csv=p=0", self.VIDEO_PATH
            ]
            result = subprocess.run(cmd, stdout=subprocess.PIPE)
            return self._join_languages(result.stdout.decode())

        duration = self.get_duration(self.VIDEO_PATH)

        video_parts = (ffprobe_entry("v:0", "codec_name"), ffprobe_entry("v:0", "profile"))
        vcodec = " ".join(part for part in video_parts if part)

        acodec = self._audio_codec_label(
            ffprobe_entry("a:0", "codec_name"),
            ffprobe_entry("a:0", "profile"),
        )
        audio_channels = ffprobe_entry("a:0", "channel_layout")

        width = ffprobe_entry("v:0", "width")
        height = ffprobe_entry("v:0", "height")
        if not (width.isdigit() and height.isdigit()):
            raise ValueError(f"No usable video stream found in {self.VIDEO_PATH!r}")
        resolution = f"{width}x{height}"
        # Integer division keeps the ffmpeg scale arguments whole numbers.
        self.WIDTH = int(width) // self.GRID_COLS
        self.HEIGHT = int(height) // self.GRID_COLS

        size_mb = os.path.getsize(self.VIDEO_PATH) / (1024 * 1024)
        audio_lang = ffprobe_tag_languages("a").upper()
        subtitle_lang = ffprobe_tag_languages("s").upper()
        return duration, vcodec.upper(), acodec.upper(), audio_channels, resolution, size_mb, audio_lang, subtitle_lang

    @staticmethod
    def get_duration(filename):
        """Return the container duration of *filename* in seconds (float).

        Raises:
            ValueError: if ffprobe does not print a parseable duration; the
                message carries ffprobe's own error output.
        """
        result = subprocess.run(
            ["ffprobe", "-v", "error", "-show_entries", "format=duration",
             "-of", "default=noprint_wrappers=1:nokey=1", filename],
            # Keep stderr separate so warnings cannot corrupt the number.
            stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        raw = result.stdout.decode(errors="replace").strip()
        try:
            return float(raw)
        except ValueError as err:
            detail = result.stderr.decode(errors="replace").strip() or raw or "no output"
            raise ValueError(
                f"ffprobe could not determine the duration of {filename!r}: {detail}"
            ) from err

    async def extract_screenshots(self, duration):
        """Run the blocking frame extraction in a worker thread."""
        return await asyncio.to_thread(self._extract_screenshots_blocking, duration)

    def _extract_screenshots_blocking(self, duration):
        """Grab ``TOTAL_FRAMES`` frames into ``OUTPUT_DIR`` and stamp each one.

        Frames that ffmpeg fails to produce are logged and skipped.

        Returns:
            List of the timestamps (seconds) whose frames were written.
        """
        os.makedirs(self.OUTPUT_DIR, exist_ok=True)
        # Load the font once instead of once per frame.
        font = ImageFont.truetype(self.FONT_PATH, 32)
        timestamps = []
        for i, timestamp in enumerate(self._frame_timestamps(duration, self.TOTAL_FRAMES)):
            output_file = os.path.join(self.OUTPUT_DIR, f"shot_{i:02d}.jpg")
            cmd = [
                "ffmpeg", "-y", "-ss", str(timestamp), "-i", self.VIDEO_PATH,
                "-frames:v", "1", "-q:v", "2",
                "-vf",
                f"scale={self.WIDTH}:{self.HEIGHT}",
                output_file
            ]
            result = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            if result.returncode != 0 or not os.path.exists(output_file):
                self.console.error(f"ffmpeg could not extract frame {i} at {timestamp}s")
                continue

            # Draw timestamp with Pillow
            timestamp_str = str(timedelta(seconds=timestamp)).split(".")[0]
            with Image.open(output_file) as img:
                draw = ImageDraw.Draw(img)
                draw.text((10, 10), timestamp_str, font=font, fill="white", stroke_width=2, stroke_fill="black")
                img.save(output_file)
            timestamps.append(timestamp)
        return timestamps

    def stitch_images(self, metadata_text, timestamps):
        """Assemble the frames into one sheet with a metadata banner on top.

        Writes ``self.OUTPUT_IMAGE`` and then deletes the frame directory.
        If any frame is missing or unreadable, an error is logged and nothing
        is written.

        Args:
            metadata_text: Text drawn into the banner.
            timestamps: Accepted for interface compatibility; not used here.
        """
        images = []
        for i in range(self.TOTAL_FRAMES):
            img_path = os.path.join(self.OUTPUT_DIR, f"shot_{i:02d}.jpg")
            img = cv2.imread(img_path)
            if img is not None:
                # OpenCV loads BGR; Pillow expects RGB.
                images.append(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
        if len(images) != self.TOTAL_FRAMES:
            self.console.error("Not enough images. Exiting.")
            return

        rows = [
            np.hstack(images[row * self.GRID_COLS:(row + 1) * self.GRID_COLS])
            for row in range(self.GRID_ROWS)
        ]
        sheet_img = Image.fromarray(np.vstack(rows))

        banner = Image.new("RGB", (sheet_img.width, self.HEADER_HEIGHT), color=(40, 40, 40))
        draw = ImageDraw.Draw(banner)
        font = ImageFont.truetype(self.FONT_PATH, 20)
        draw.text((10, 10), metadata_text, font=font, fill="white")

        final_image = Image.new("RGB", (sheet_img.width, sheet_img.height + self.HEADER_HEIGHT))
        final_image.paste(banner, (0, 0))
        final_image.paste(sheet_img, (0, self.HEADER_HEIGHT))
        final_image.save(self.OUTPUT_IMAGE, quality=95, subsampling=0)
        self.console.log(f"📷 Saved to {self.OUTPUT_IMAGE}")
        self.cleanup_screenshots()

    def cleanup_screenshots(self):
        """Delete the scratch frame directory if it exists."""
        if os.path.exists(self.OUTPUT_DIR):
            shutil.rmtree(self.OUTPUT_DIR)
            self.console.log("✅ All extracted screenshots deleted.")

    async def run(self, VIDEO_PATH, is_movie=False):
        """Create the contact sheet for *VIDEO_PATH*.

        The image is named ``<video file name without '#'>_screenshot.jpg`` and
        is placed next to the video when *is_movie* is true, otherwise one
        directory above it.

        Returns:
            The target image path.
            NOTE(review): the path is returned even when stitching logged
            "Not enough images" and wrote nothing; callers should check that
            the file exists.
        """
        self.VIDEO_PATH = VIDEO_PATH
        base = os.path.dirname(self.VIDEO_PATH)
        parent = os.path.dirname(base)
        filename = os.path.basename(self.VIDEO_PATH).replace("#", "") + "_screenshot.jpg"
        self.OUTPUT_IMAGE = os.path.join(base if is_movie else parent, filename)

        duration, vcodec, acodec, audio_channels, resolution, size_mb, audio_lang, subtitle_lang = self.get_metadata()
        metadata_text = (
            f"{os.path.basename(self.VIDEO_PATH)}\n"
            f"{vcodec} | {acodec} {audio_channels} \n"
            f"{resolution} | {size_mb:.2f} MB | {duration/60:.2f} min | Audio: {audio_lang} | Subtitles: {subtitle_lang}"
        )
        self.console.log(f"🎬 Metadata: {metadata_text}")

        timestamps = await self.extract_screenshots(duration)
        self.stitch_images(metadata_text, timestamps)
        return self.OUTPUT_IMAGE

    async def upload_to_imgbb(self, image_path):
        """Upload *image_path* and return its public URL.

        Tries up to five times, waiting 10 s after each failed attempt
        (non-2xx status or connection-level error).  On success the local
        file is deleted.  Requires ``login()`` to have been called so that
        ``self.session`` and ``self.imgbb_token`` are set.

        Returns:
            The image URL, or ``None`` once all attempts have failed.
        """
        timestamp = str(int(time.time() * 1000))
        # Read once and close the file; the bytes are reused on every retry
        # instead of leaking a new open handle per attempt.
        with open(image_path, 'rb') as image_file:
            image_bytes = image_file.read()
        filename = os.path.basename(image_path)
        upload_url = urljoin(self.base_url, '/json')

        for _attempt in range(5):
            # A FormData object is consumed by the request, so build a new one
            # for each attempt.
            form = aiohttp.FormData()
            form.add_field('source', image_bytes, filename=filename, content_type='image/jpeg')
            form.add_field('type', 'file')
            form.add_field('action', 'upload')
            form.add_field('timestamp', timestamp)
            form.add_field('auth_token', self.imgbb_token)
            form.add_field('nsfw', '0')
            form.add_field('mimetype', 'image/jpeg')
            try:
                async with self.session.post(upload_url, data=form) as response:
                    await response.text()  # drain the response
                    if 200 <= response.status < 300:
                        self.console.log("✅ Upload successful")
                        await asyncio.sleep(5)
                        data = await response.json()
                        os.remove(image_path)
                        return data['image']['url']
                    self.console.warn(f"❌ Upload failed ({response.status})")
            except (aiohttp.ClientConnectionError, aiohttp.ClientPayloadError, asyncio.TimeoutError) as err:
                # Transient network failures are retried like bad statuses.
                self.console.warn(f"❌ Upload failed ({err})")
            await asyncio.sleep(10)

        self.console.error("❌ Max retries reached")
        return None

    async def login(self):
        """Open a session, scrape the auth token and log in to the image host.

        Credentials are read from ``imgbb_id`` / ``imgbb_password`` in ``.env``.

        Raises:
            SystemExit: with code 1 when the landing page does not answer 200.
        """
        if not self.session:
            self.session = aiohttp.ClientSession(headers={
                'accept': 'application/json',
                'accept-language': 'en-US,en;q=0.9,th;q=0.8',
                'cache-control': 'no-cache',
                'origin': 'https://imgbb.ws',
                'pragma': 'no-cache',
                'referer': 'https://imgbb.ws/',
                'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36 Edg/145.0.0.0',
            })

        async with self.session.get(self.base_url) as response:
            html = await response.text()
            if response.status != 200:
                self.console.error(f"❌ Failed to connect to {self.base_url}: {response.status}")
                # Same effect as exit(1) without relying on the site module.
                raise SystemExit(1)

        match = re.search(r'auth_token\s*=\s*"([a-f0-9]{40})"', html)
        if not match:
            self.console.error("Auth token not found.")
            # Without a token the login form cannot be submitted meaningfully.
            return
        self.imgbb_token = match.group(1)
        self.console.log("Auth token:", self.imgbb_token)

        creds = dotenv_values(".env")
        data = {
            'login-subject': creds.get("imgbb_id"),
            'password': creds.get("imgbb_password"),
            'auth_token': self.imgbb_token,
        }
        async with self.session.post(urljoin(self.base_url, '/login'), data=data) as response:
            self.console.log(f"Login status: {response.status}")

    async def close(self):
        """Close the HTTP session opened by ``login()``, if any."""
        if self.session is not None:
            await self.session.close()
            self.session = None
if __name__ == "__main__":
    # Manual smoke test: dump the ffprobe metadata of a sample file as JSON
    # and as BBCode into the current working directory.
    from lib.torrent_creator import TorrentCreator, ffprobe_streams_to_bbcode

    torrent = TorrentCreator()
    # VIDEO_PATH='/Entertainment_1/Anime/Series/365 Days to the Wedding (2024) [tvdbid-433584]/S01/365.Days.to.the.Wedding.2024.S01E01.Why.Dont.We.Get.Married.CR.WEBDL-1080P.X264.AAC.[SeFree].mkv'
    VIDEO_PATH = '/Entertainment_1/Anime/Movie/KPop Demon Hunters (2025) [tmdbid-803796]/KPop.Demon.Hunters.2025.NF.WEBDL-1080P.AV1.EAC3.ATMOS.[SeFree].mkv'

    async def main():
        # Only the trailing JSON metadata is needed; the leading fields
        # (duration, codecs, resolution, ...) are ignored here.
        *_unused, json_metadata = torrent.get_metadata(VIDEO_PATH)

        with open("output.json", "w", encoding="utf-8") as f:
            json.dump(json_metadata, f, indent=4, ensure_ascii=True)

        bb = ffprobe_streams_to_bbcode(json_metadata, os.path.basename(VIDEO_PATH))
        # Explicit UTF-8: the BBCode is built from file names and may contain
        # non-ASCII text that the platform default encoding cannot write.
        with open("output_bb.txt", "w", encoding="utf-8") as f:
            f.write(bb)

    asyncio.run(main())