import asyncio
import json
import os
import re
import shutil
import subprocess
import time
from datetime import timedelta
from urllib.parse import urljoin

import aiohttp
import cv2
import numpy as np
from dotenv import dotenv_values
from PIL import Image, ImageDraw, ImageFont

from lib.logging_data import logger


class ScreenShot:
    """Build a contact sheet (grid of timestamped frames) for a video.

    The sheet is a GRID_COLS x GRID_ROWS grid of frames grabbed with ffmpeg,
    topped by a banner holding the metadata text. The class can also log in
    to the image host at ``base_url`` and upload the resulting image.
    """

    FONT_PATH = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"

    def __init__(self, OUTPUT_DIR="temp_screenshots", console: logger = None):
        """Set up grid geometry, upload settings and the logger.

        Args:
            OUTPUT_DIR: Directory that receives the individual frame files.
            console: Logger to report to; a default one is created if omitted.
        """
        self.VIDEO_PATH = None
        self.OUTPUT_DIR = OUTPUT_DIR
        self.OUTPUT_IMAGE = None
        self.GRID_COLS = 3
        self.GRID_ROWS = 5
        self.TOTAL_FRAMES = self.GRID_COLS * self.GRID_ROWS
        # self.WIDTH = 1600
        # self.HEIGHT = 900
        # Per-frame size; filled in by get_metadata() from the source video.
        self.WIDTH = None
        self.HEIGHT = None
        self.HEADER_HEIGHT = 90
        self.base_url = "https://imgbb.ws"
        self.imgbb_token = None
        self.session = None
        self.console = console or logger(app_name="torrent_uploader", log_dir="./log")

    def get_metadata(self):
        """Probe ``self.VIDEO_PATH`` with ffprobe and collect its metadata.

        As a side effect, ``self.WIDTH`` and ``self.HEIGHT`` are set to the
        source dimensions divided by ``GRID_COLS`` (both axes use the same
        divisor, so the aspect ratio is kept).

        Returns:
            Tuple ``(duration, vcodec, acodec, audio_channels, resolution,
            size_mb, audio_lang, subtitle_lang)``.
        """

        def ffprobe_entry(stream, entry):
            cmd = [
                "ffprobe", "-v", "error",
                "-select_streams", stream,
                "-show_entries", f"stream={entry}",
                "-of", "default=noprint_wrappers=1:nokey=1",
                self.VIDEO_PATH,
            ]
            result = subprocess.run(cmd, stdout=subprocess.PIPE)
            return result.stdout.decode().strip()

        def first_value(stream, entry):
            # ffprobe prints nothing when the stream or the entry is missing
            # (e.g. a file without audio); fall back to "" instead of raising
            # IndexError on the empty list.
            lines = ffprobe_entry(stream, entry).splitlines()
            return lines[0] if lines else ""

        def ffprobe_tag_languages(stream_type):
            cmd = [
                "ffprobe", "-v", "error",
                "-select_streams", stream_type,
                "-show_entries", "stream_tags=language",
                "-of", "csv=p=0",
                self.VIDEO_PATH,
            ]
            result = subprocess.run(cmd, stdout=subprocess.PIPE)
            langs = {
                lang.strip()
                for lang in result.stdout.decode().splitlines()
                if lang.strip()
            }
            # Sorted so the banner text is stable from run to run.
            return ",".join(sorted(langs)) if langs else "und"

        duration = float(self.get_duration(self.VIDEO_PATH))

        vcodec = (first_value("v:0", "codec_name") + " " + first_value("v:0", "profile")).strip()

        acodec_profile = first_value("a:0", "profile")
        acodec = first_value("a:0", "codec_name")
        if "Dolby Digital Plus" in acodec_profile:
            acodec += " DDP"
        elif "Dolby Digital" in acodec_profile:
            acodec += " DD"
        if "Atmos" in acodec_profile:
            acodec += " Atmos"
        audio_channels = first_value("a:0", "channel_layout")

        resolution = f"{first_value('v:0', 'width')}x{first_value('v:0', 'height')}"
        full_width, full_height = map(int, resolution.split('x'))
        # Whole pixels: these values are interpolated into ffmpeg's scale filter.
        self.WIDTH = full_width // self.GRID_COLS
        self.HEIGHT = full_height // self.GRID_COLS

        size_mb = os.path.getsize(self.VIDEO_PATH) / (1024 * 1024)
        audio_lang = ffprobe_tag_languages("a").upper()
        subtitle_lang = ffprobe_tag_languages("s").upper()

        return duration, vcodec.upper(), acodec.upper(), audio_channels, resolution, size_mb, audio_lang, subtitle_lang

    # Disabled alternative implementation based on `mkvmerge -J`, kept for reference.
    # def get_metadata(self):
    #     def get_mkv_json():
    #         cmd = ["mkvmerge", "-J", self.VIDEO_PATH]
    #         result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    #         return json.loads(result.stdout.decode())
    #     data = get_mkv_json()
    #     video_track = next(t for t in data["tracks"] if t["type"] == "video")
    #     audio_track = next(t for t in data["tracks"] if t["type"] == "audio")
    #     # duration (ns -> sec)
    #     duration = data["container"]["properties"]["duration"] / 1_000_000_000
    #     # video codec
    #     vcodec = video_track["codec"]
    #     profile = video_track.get("properties", {}).get("profile")
    #     if profile:
    #         vcodec = f"{vcodec} {profile}"
    #     # audio codec
    #     acodec = audio_track["codec"]
    #     acodec_profile = audio_track.get("properties", {}).get("profile", "")
    #     if "Dolby Digital Plus" in acodec_profile:
    #         acodec += " DDP"
    #     elif "Dolby Digital" in acodec_profile:
    #         acodec += " DD"
    #     if "Atmos" in acodec_profile:
    #         acodec += " Atmos"
    #     # channels
    #     audio_channels = audio_track.get("properties", {}).get("audio_channels")
    #     # resolution
    #     width = video_track["properties"]["pixel_dimensions"].split("x")[0]
    #     height = video_track["properties"]["pixel_dimensions"].split("x")[1]
    #     resolution = f"{width}x{height}"
    #     self.WIDTH, self.HEIGHT = int(width), int(height)
    #     self.WIDTH, self.HEIGHT = self.WIDTH/self.GRID_COLS, self.HEIGHT/self.GRID_COLS
    #     # size
    #     size_mb = os.path.getsize(self.VIDEO_PATH) / (1024 * 1024)
    #     # audio languages
    #     audio_langs = {
    #         t.get("properties", {}).get("language", "und")
    #         for t in data["tracks"] if t["type"] == "audio"
    #     }
    #     subtitle_langs = {
    #         t.get("properties", {}).get("language", "und")
    #         for t in data["tracks"] if t["type"] == "subtitles"
    #     }
    #     audio_lang = ",".join(sorted(audio_langs)).upper()
    #     subtitle_lang = ",".join(sorted(subtitle_langs)).upper()
    #     return (
    #         duration,
    #         vcodec.upper(),
    #         acodec.upper(),
    #         audio_channels,
    #         resolution,
    #         size_mb,
    #         audio_lang,
    #         subtitle_lang
    #     )

    @staticmethod
    def get_duration(filename):
        """Return the container duration of ``filename`` in seconds.

        stderr is merged into stdout, so when ffprobe reports an error the
        captured text is not numeric and ``float()`` raises ``ValueError``.
        """
        result = subprocess.run(
            ["ffprobe", "-v", "error", "-show_entries", "format=duration",
             "-of", "default=noprint_wrappers=1:nokey=1", filename],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT
        )
        return float(result.stdout)

    async def extract_screenshots(self, duration):
        """Run the blocking frame extraction in a worker thread."""
        return await asyncio.to_thread(self._extract_screenshots_blocking, duration)

    def _extract_screenshots_blocking(self, duration):
        """Grab ``TOTAL_FRAMES`` evenly spaced frames and stamp each with its time.

        Args:
            duration: Video duration in seconds.

        Returns:
            List of the timestamps (whole seconds) that were captured.
        """
        os.makedirs(self.OUTPUT_DIR, exist_ok=True)
        interval = duration / self.TOTAL_FRAMES
        # Loaded once; the same font is used for every frame.
        font = ImageFont.truetype(self.FONT_PATH, 32)
        timestamps = []

        for i in range(self.TOTAL_FRAMES):
            timestamp = int(i * interval)
            if timestamp == 0:
                # Use second 5 instead of the very first frame.
                timestamp = 5

            output_file = os.path.join(self.OUTPUT_DIR, f"shot_{i:02d}.jpg")

            # drawtext = (
            #     f"drawtext=fontfile={self.FONT_PATH}:"
            #     f"text='%{{pts\\:hms}}':"
            #     f"x=10:y=10:fontsize=18:fontcolor=white:borderw=2"
            # )

            cmd = [
                "ffmpeg",
                "-y",  # overwrite a leftover frame from an earlier run
                "-ss", str(timestamp),
                "-i", self.VIDEO_PATH,
                "-frames:v", "1",
                "-q:v", "2",
                "-vf", f"scale={self.WIDTH}:{self.HEIGHT}",
                output_file,
            ]

            result = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            self.console.debug("result : ", result)

            # Draw timestamp with Pillow
            timestamp_str = str(timedelta(seconds=timestamp)).split(".")[0]
            with Image.open(output_file) as img:
                draw = ImageDraw.Draw(img)
                draw.text((10, 10), timestamp_str, font=font, fill="white", stroke_width=2, stroke_fill="black")
                img.save(output_file)

            timestamps.append(timestamp)

        return timestamps

    def stitch_images(self, metadata_text, timestamps):
        """Assemble the extracted frames and a metadata banner into one image.

        Writes the result to ``self.OUTPUT_IMAGE`` and then deletes the frame
        directory. If any frame is missing, an error is logged and nothing is
        written.

        Args:
            metadata_text: Text drawn on the banner above the grid.
            timestamps: Accepted for interface compatibility; not used here.
        """
        images = []
        for i in range(self.TOTAL_FRAMES):
            img_path = os.path.join(self.OUTPUT_DIR, f"shot_{i:02d}.jpg")
            img = cv2.imread(img_path)
            if img is not None:
                # OpenCV loads BGR; Pillow expects RGB.
                img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                images.append(img_rgb)

        if len(images) != self.TOTAL_FRAMES:
            self.console.error("Not enough images. Exiting.")
            return

        rows = [
            np.hstack(images[i * self.GRID_COLS:(i + 1) * self.GRID_COLS])
            for i in range(self.GRID_ROWS)
        ]
        sheet = np.vstack(rows)
        sheet_img = Image.fromarray(sheet)

        banner = Image.new("RGB", (sheet_img.width, self.HEADER_HEIGHT), color=(40, 40, 40))
        draw = ImageDraw.Draw(banner)
        font = ImageFont.truetype(self.FONT_PATH, 20)
        draw.text((10, 10), metadata_text, font=font, fill="white")

        final_image = Image.new("RGB", (sheet_img.width, sheet_img.height + self.HEADER_HEIGHT))
        final_image.paste(banner, (0, 0))
        final_image.paste(sheet_img, (0, self.HEADER_HEIGHT))
        final_image.save(self.OUTPUT_IMAGE, quality=95, subsampling=0)

        self.console.log(f"📷 Saved to {self.OUTPUT_IMAGE}")
        self.cleanup_screenshots()

    def cleanup_screenshots(self):
        """Delete the frame directory, if it exists."""
        if os.path.exists(self.OUTPUT_DIR):
            shutil.rmtree(self.OUTPUT_DIR)
            self.console.log("✅ All extracted screenshots deleted.")

    async def run(self, VIDEO_PATH, is_movie=False):
        """Create the contact sheet for ``VIDEO_PATH``.

        Args:
            VIDEO_PATH: Path of the video file.
            is_movie: When true the sheet is saved next to the video;
                otherwise it is saved one directory level higher.

        Returns:
            The path the contact sheet is written to.
        """
        self.VIDEO_PATH = VIDEO_PATH
        base = os.path.dirname(self.VIDEO_PATH)
        parent = os.path.dirname(base)
        filename = os.path.basename(self.VIDEO_PATH).replace("#", "") + "_screenshot.jpg"
        self.OUTPUT_IMAGE = os.path.join(base if is_movie else parent, filename)

        duration, vcodec, acodec, audio_channels, resolution, size_mb, audio_lang, subtitle_lang = self.get_metadata()
        metadata_text = (
            f"{os.path.basename(self.VIDEO_PATH)}\n"
            f"{vcodec} | {acodec} {audio_channels} \n"
            f"{resolution} | {size_mb:.2f} MB | {duration/60:.2f} min | Audio: {audio_lang} | Subtitles: {subtitle_lang}"
        )
        self.console.log(f"🎬 Metadata: {metadata_text}")

        timestamps = await self.extract_screenshots(duration)
        self.stitch_images(metadata_text, timestamps)
        return self.OUTPUT_IMAGE

    async def upload_to_imgbb(self, image_path):
        """Upload ``image_path`` through ``self.session``, retrying on HTTP errors.

        Up to five attempts are made, ten seconds apart. On success the local
        file is deleted.

        Args:
            image_path: Path of the JPEG to upload.

        Returns:
            The uploaded image URL, or ``None`` once all attempts have failed.
        """
        timestamp = str(int(time.time() * 1000))
        retry = 0
        while retry < 5:
            uploaded = False
            data = None
            # A fresh form and file handle per attempt; the handle is closed
            # when the attempt ends, whatever its outcome.
            with open(image_path, 'rb') as image_file:
                form = aiohttp.FormData()
                form.add_field('source', image_file,
                               filename=os.path.basename(image_path),
                               content_type='image/jpeg')
                form.add_field('type', 'file')
                form.add_field('action', 'upload')
                form.add_field('timestamp', timestamp)
                form.add_field('auth_token', self.imgbb_token)
                form.add_field('nsfw', '0')
                form.add_field('mimetype', 'image/jpeg')

                async with self.session.post(urljoin(self.base_url, '/json'), data=form) as response:
                    await response.text()  # drain the response
                    if 200 <= response.status < 300:
                        self.console.log("✅ Upload successful")
                        await asyncio.sleep(5)
                        data = await response.json()
                        uploaded = True
                    else:
                        self.console.warn(f"❌ Upload failed ({response.status})")

            if uploaded:
                # Removed only after the handle above has been released.
                os.remove(image_path)
                return data['image']['url']

            retry += 1
            await asyncio.sleep(10)

        self.console.error("❌ Max retries reached")
        return None

    async def login(self):
        """Open a session on ``base_url``, read its auth token and log in.

        Credentials are read from the ``imgbb_id`` and ``imgbb_password``
        entries of the local ``.env`` file.

        Raises:
            SystemExit: With status 1 when the landing page does not answer 200.
        """
        if not self.session:
            self.session = aiohttp.ClientSession(headers={
                'accept': 'application/json',
                'accept-language': 'en-US,en;q=0.9,th;q=0.8',
                'cache-control': 'no-cache',
                'origin': 'https://imgbb.ws',
                'pragma': 'no-cache',
                'referer': 'https://imgbb.ws/',
                'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36 Edg/145.0.0.0',
            })

        async with self.session.get(self.base_url) as response:
            html = await response.text()
            if response.status != 200:
                self.console.error(f"❌ Failed to connect to {self.base_url}: {response.status}")
                # Same exception the bare `exit(1)` raised, without relying on
                # the helper that the `site` module injects.
                raise SystemExit(1)

        # The token is embedded in the landing page as a 40-character hex string.
        match = re.search(r'auth_token\s*=\s*"([a-f0-9]{40})"', html)
        if match:
            self.imgbb_token = match.group(1)
            self.console.log("Auth token:", self.imgbb_token)
        else:
            self.console.error("Auth token not found.")

        creds = dotenv_values(".env")
        data = {
            'login-subject': creds.get("imgbb_id"),
            'password': creds.get("imgbb_password"),
            'auth_token': self.imgbb_token,
        }
        async with self.session.post(urljoin(self.base_url, '/login'), data=data) as response:
            self.console.log(f"Login status: {response.status}")


if __name__ == "__main__":
    from lib.torrent_creator import TorrentCreator, ffprobe_streams_to_bbcode

    torrent = TorrentCreator()
    # VIDEO_PATH='/Entertainment_1/Anime/Series/365 Days to the Wedding (2024) [tvdbid-433584]/S01/365.Days.to.the.Wedding.2024.S01E01.Why.Dont.We.Get.Married.CR.WEBDL-1080P.X264.AAC.[SeFree].mkv'
    VIDEO_PATH = '/Entertainment_1/Anime/Movie/KPop Demon Hunters (2025) [tmdbid-803796]/KPop.Demon.Hunters.2025.NF.WEBDL-1080P.AV1.EAC3.ATMOS.[SeFree].mkv'

    async def main():
        # Dump the probed metadata as JSON and as BBCode for manual inspection.
        duration, video_codec, audio_codec, audio_channels, resolution, size_mb, audio_lang, subtitle_lang, json_metadata = torrent.get_metadata(VIDEO_PATH)
        with open("output.json", "w") as f:
            json.dump(json_metadata, f, indent=4, ensure_ascii=True)

        bb = ffprobe_streams_to_bbcode(json_metadata, os.path.basename(VIDEO_PATH))
        with open("output_bb.txt", "w") as f:
            # json.dump(bb,f,indent=4,ensure_ascii=True)
            f.write(bb)

    asyncio.run(main())