# 339 lines, 14 KiB, Python (scraper metadata header)
import os
|
|
import re
|
|
import asyncio
|
|
import subprocess
|
|
import cv2
|
|
import numpy as np
|
|
import shutil
|
|
import time
|
|
from PIL import Image, ImageDraw, ImageFont
|
|
from urllib.parse import urljoin
|
|
import aiohttp
|
|
from dotenv import dotenv_values
|
|
from datetime import timedelta
|
|
|
|
from lib.logging_data import logger
|
|
import json
|
|
|
|
class ScreenShot:
    """Build a screenshot contact sheet for a video file and upload it to imgbb.

    ``run()`` drives the pipeline: probe metadata via ffprobe, grab
    ``GRID_COLS x GRID_ROWS`` evenly spaced frames via ffmpeg, stitch them into
    a single sheet topped with a metadata banner, and save the result next to
    the video.  ``login()`` / ``upload_to_imgbb()`` handle the imgbb.ws upload.
    """

    # Font used for the per-frame timestamp overlay and the metadata banner.
    FONT_PATH = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"

    def __init__(self, OUTPUT_DIR="temp_screenshots", console: "logger" = None):
        """
        Args:
            OUTPUT_DIR: working directory for the individual frame grabs
                (removed again by ``cleanup_screenshots``).
            console: project logger instance; a default one is created when
                omitted.
        """
        self.VIDEO_PATH = None        # set per call in run()
        self.OUTPUT_DIR = OUTPUT_DIR
        self.OUTPUT_IMAGE = None      # final sheet path, set in run()

        self.GRID_COLS = 3
        self.GRID_ROWS = 5
        self.TOTAL_FRAMES = self.GRID_COLS * self.GRID_ROWS

        # Per-cell pixel size of the sheet; derived from the source
        # resolution in get_metadata().
        self.WIDTH = None
        self.HEIGHT = None
        self.HEADER_HEIGHT = 90       # banner height in pixels

        self.base_url = "https://imgbb.ws"
        self.imgbb_token = None       # scraped auth token, set in login()
        self.session = None           # aiohttp session, created in login()

        self.console = console or logger(app_name="torrent_uploader", log_dir="./log")

    def get_metadata(self):
        """Probe ``self.VIDEO_PATH`` with ffprobe.

        Side effect: sets ``self.WIDTH`` / ``self.HEIGHT`` to the per-cell
        size used when extracting frames.

        Returns:
            tuple: (duration_seconds, vcodec, acodec, audio_channels,
            resolution, size_mb, audio_lang, subtitle_lang)
        """

        def first_line(text):
            # ffprobe may print nothing for a missing stream/field; avoid an
            # IndexError from ``splitlines()[0]`` on empty output.
            lines = text.splitlines()
            return lines[0] if lines else ""

        def ffprobe_entry(stream, entry):
            # Query a single stream property; one value per output line.
            cmd = [
                "ffprobe", "-v", "error", "-select_streams", stream,
                "-show_entries", f"stream={entry}",
                "-of", "default=noprint_wrappers=1:nokey=1", self.VIDEO_PATH,
            ]
            result = subprocess.run(cmd, stdout=subprocess.PIPE)
            return result.stdout.decode().strip()

        def ffprobe_tag_languages(stream_type):
            # Collect the distinct language tags of all streams of a type.
            cmd = [
                "ffprobe", "-v", "error", "-select_streams", stream_type,
                "-show_entries", "stream_tags=language",
                "-of", "csv=p=0", self.VIDEO_PATH,
            ]
            result = subprocess.run(cmd, stdout=subprocess.PIPE)
            langs = {lang.strip() for lang in result.stdout.decode().splitlines() if lang.strip()}
            return ",".join(langs) if langs else "und"

        duration = float(self.get_duration(self.VIDEO_PATH))
        vcodec = first_line(ffprobe_entry("v:0", "codec_name")) + " " + first_line(ffprobe_entry("v:0", "profile"))
        acodec_profile = first_line(ffprobe_entry("a:0", "profile"))
        acodec = first_line(ffprobe_entry("a:0", "codec_name"))
        acodec += " DDP" if "Dolby Digital Plus" in acodec_profile else " DD" if "Dolby Digital" in acodec_profile else ""
        acodec += " Atmos" if "Atmos" in acodec_profile else ""
        audio_channels = first_line(ffprobe_entry("a:0", "channel_layout"))
        resolution = f"{first_line(ffprobe_entry('v:0', 'width'))}x{first_line(ffprobe_entry('v:0', 'height'))}"

        self.WIDTH, self.HEIGHT = map(int, resolution.split("x"))
        # Scale both dimensions down by GRID_COLS so one row of cells matches
        # the source width while keeping the aspect ratio.  Integer division:
        # the values are interpolated into ffmpeg's scale filter, which
        # expects integral dimensions (the original '/' produced floats).
        self.WIDTH, self.HEIGHT = self.WIDTH // self.GRID_COLS, self.HEIGHT // self.GRID_COLS

        size_mb = os.path.getsize(self.VIDEO_PATH) / (1024 * 1024)
        audio_lang = ffprobe_tag_languages("a").upper()
        subtitle_lang = ffprobe_tag_languages("s").upper()

        return duration, vcodec.upper(), acodec.upper(), audio_channels, resolution, size_mb, audio_lang, subtitle_lang

    @staticmethod
    def get_duration(filename):
        """Return the container duration of *filename* in seconds as a float.

        Raises ValueError if ffprobe printed an error instead of a number —
        a loud failure is preferable to silently bogus metadata.
        """
        result = subprocess.run(
            ["ffprobe", "-v", "error", "-show_entries", "format=duration",
             "-of", "default=noprint_wrappers=1:nokey=1", filename],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
        )
        return float(result.stdout)

    async def extract_screenshots(self, duration):
        """Async wrapper: run the blocking frame extraction in a worker thread."""
        return await asyncio.to_thread(self._extract_screenshots_blocking, duration)

    def _extract_screenshots_blocking(self, duration):
        """Grab TOTAL_FRAMES evenly spaced frames and stamp each with its timestamp.

        Args:
            duration: video duration in seconds.

        Returns:
            list[int]: the timestamps (seconds) that were captured.
        """
        os.makedirs(self.OUTPUT_DIR, exist_ok=True)
        interval = duration / self.TOTAL_FRAMES
        timestamps = []

        for i in range(self.TOTAL_FRAMES):
            timestamp = int(i * interval)
            if timestamp == 0:
                # Skip the very first frames (often black / studio logos),
                # but never seek past the end of a very short clip.  For any
                # normal-length video this still evaluates to 5.
                timestamp = min(5, max(int(duration) // 2, 1))
            output_file = os.path.join(self.OUTPUT_DIR, f"shot_{i:02d}.jpg")

            # -ss before -i: fast keyframe seek; -q:v 2: high JPEG quality.
            cmd = [
                "ffmpeg", "-ss", str(timestamp), "-i", self.VIDEO_PATH,
                "-frames:v", "1", "-q:v", "2",
                "-vf",
                f"scale={self.WIDTH}:{self.HEIGHT}",
                output_file, "-y",
            ]
            subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)

            # Burn the timestamp in with Pillow instead of ffmpeg drawtext
            # (avoids font/escaping issues in the filter string).
            timestamp_str = str(timedelta(seconds=timestamp)).split(".")[0]
            img = Image.open(output_file)
            draw = ImageDraw.Draw(img)
            font = ImageFont.truetype(self.FONT_PATH, 32)
            draw.text((10, 10), timestamp_str, font=font, fill="white", stroke_width=2, stroke_fill="black")
            img.save(output_file)

            timestamps.append(timestamp)

        return timestamps

    def stitch_images(self, metadata_text, timestamps):
        """Assemble the extracted frames into one sheet with a metadata banner.

        ``timestamps`` is accepted for interface compatibility but unused —
        the timestamps are already burned into the individual frames.
        Writes the result to ``self.OUTPUT_IMAGE`` and deletes the working
        directory on success.
        """
        images = []
        for i in range(self.TOTAL_FRAMES):
            img_path = os.path.join(self.OUTPUT_DIR, f"shot_{i:02d}.jpg")
            img = cv2.imread(img_path)
            if img is not None:
                # OpenCV loads BGR; Pillow expects RGB.
                images.append(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))

        if len(images) != self.TOTAL_FRAMES:
            self.console.error("Not enough images. Exiting.")
            return

        rows = [np.hstack(images[r * self.GRID_COLS:(r + 1) * self.GRID_COLS]) for r in range(self.GRID_ROWS)]
        sheet = np.vstack(rows)

        sheet_img = Image.fromarray(sheet)
        banner = Image.new("RGB", (sheet_img.width, self.HEADER_HEIGHT), color=(40, 40, 40))
        draw = ImageDraw.Draw(banner)
        font = ImageFont.truetype(self.FONT_PATH, 20)
        draw.text((10, 10), metadata_text, font=font, fill="white")

        final_image = Image.new("RGB", (sheet_img.width, sheet_img.height + self.HEADER_HEIGHT))
        final_image.paste(banner, (0, 0))
        final_image.paste(sheet_img, (0, self.HEADER_HEIGHT))
        # subsampling=0 keeps full chroma detail in the JPEG.
        final_image.save(self.OUTPUT_IMAGE, quality=95, subsampling=0)

        self.console.log(f"📷 Saved to {self.OUTPUT_IMAGE}")
        self.cleanup_screenshots()

    def cleanup_screenshots(self):
        """Remove the working directory holding the individual frame grabs."""
        if os.path.exists(self.OUTPUT_DIR):
            shutil.rmtree(self.OUTPUT_DIR)
            self.console.log("✅ All extracted screenshots deleted.")

    async def run(self, VIDEO_PATH, is_movie=False):
        """Full pipeline: probe metadata, extract frames, stitch the sheet.

        Args:
            VIDEO_PATH: path to the video file.
            is_movie: when True the sheet is written next to the video;
                otherwise one directory up (series layout: .../Show/S01/ep.mkv).

        Returns:
            str: path of the saved sheet image.
        """
        self.VIDEO_PATH = VIDEO_PATH
        base = os.path.dirname(self.VIDEO_PATH)
        parent = os.path.dirname(base)
        # '#' breaks URLs on some image hosts, so strip it from the filename.
        filename = os.path.basename(self.VIDEO_PATH).replace("#", "") + "_screenshot.jpg"
        self.OUTPUT_IMAGE = os.path.join(base if is_movie else parent, filename)

        duration, vcodec, acodec, audio_channels, resolution, size_mb, audio_lang, subtitle_lang = self.get_metadata()
        metadata_text = (
            f"{os.path.basename(self.VIDEO_PATH)}\n"
            f"{vcodec} | {acodec} {audio_channels} \n"
            f"{resolution} | {size_mb:.2f} MB | {duration/60:.2f} min | Audio: {audio_lang} | Subtitles: {subtitle_lang}"
        )
        self.console.log(f"🎬 Metadata: {metadata_text}")

        timestamps = await self.extract_screenshots(duration)
        self.stitch_images(metadata_text, timestamps)
        return self.OUTPUT_IMAGE

    async def upload_to_imgbb(self, image_path):
        """Upload *image_path* to imgbb (call ``login()`` first).

        Retries up to 5 times on non-2xx responses.  On success the local
        file is deleted and the hosted image URL returned; returns None after
        exhausting the retries.
        """
        timestamp = str(int(time.time() * 1000))

        # Read the file once up front: the previous version handed an open
        # file object to FormData on every retry and never closed it,
        # leaking a handle per attempt.
        with open(image_path, "rb") as fh:
            payload = fh.read()

        retry = 0
        while retry < 5:
            form = aiohttp.FormData()
            form.add_field('source', payload, filename=os.path.basename(image_path), content_type='image/jpeg')
            form.add_field('type', 'file')
            form.add_field('action', 'upload')
            form.add_field('timestamp', timestamp)
            form.add_field('auth_token', self.imgbb_token)
            form.add_field('nsfw', '0')
            form.add_field('mimetype', 'image/jpeg')

            async with self.session.post(urljoin(self.base_url, '/json'), data=form) as response:
                body = await response.text()  # drain the response
                if 200 <= response.status < 300:
                    self.console.log("✅ Upload successful")
                    await asyncio.sleep(5)
                    data = json.loads(body)
                    os.remove(image_path)
                    return data['image']['url']
                else:
                    self.console.warn(f"❌ Upload failed ({response.status})")
                    retry += 1
                    await asyncio.sleep(10)

        self.console.error("❌ Max retries reached")
        return None

    async def login(self):
        """Create the aiohttp session, scrape the imgbb auth token and log in.

        Credentials are read from ``.env`` (keys ``imgbb_id`` /
        ``imgbb_password``).  Raises RuntimeError when the site is
        unreachable.
        """
        if not self.session:
            self.session = aiohttp.ClientSession(headers={
                'accept': 'application/json',
                'accept-language': 'en-US,en;q=0.9,th;q=0.8',
                'cache-control': 'no-cache',
                'origin': 'https://imgbb.ws',
                'pragma': 'no-cache',
                'referer': 'https://imgbb.ws/',
                'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36 Edg/145.0.0.0',
            })

        async with self.session.get(self.base_url) as response:
            html = await response.text()
            if response.status != 200:
                self.console.error(f"❌ Failed to connect to {self.base_url}: {response.status}")
                # Fail loudly but catchably instead of exit()-ing the whole
                # process from library code.
                raise RuntimeError(f"Failed to connect to {self.base_url}: {response.status}")

        # The 40-hex-char auth_token is embedded in the landing page's JS.
        match = re.search(r'auth_token\s*=\s*"([a-f0-9]{40})"', html)
        if match:
            self.imgbb_token = match.group(1)
            self.console.log(f"Auth token: {self.imgbb_token}")
        else:
            self.console.error("Auth token not found.")

        creds = dotenv_values(".env")
        data = {
            'login-subject': creds.get("imgbb_id"),
            'password': creds.get("imgbb_password"),
            'auth_token': self.imgbb_token,
        }
        async with self.session.post(urljoin(self.base_url, '/login'), data=data) as response:
            self.console.log(f"Login status: {response.status}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Ad-hoc smoke test: probe one file with TorrentCreator, then dump the
    # raw ffprobe JSON and the rendered BBCode to disk for inspection.
    from lib.torrent_creator import TorrentCreator, ffprobe_streams_to_bbcode

    torrent = TorrentCreator()
    VIDEO_PATH = '/Entertainment_1/Anime/Movie/KPop Demon Hunters (2025) [tmdbid-803796]/KPop.Demon.Hunters.2025.NF.WEBDL-1080P.AV1.EAC3.ATMOS.[SeFree].mkv'

    async def main():
        # Only json_metadata is used below; the scalar fields are ignored.
        (duration, video_codec, audio_codec, audio_channels, resolution,
         size_mb, audio_lang, subtitle_lang, json_metadata) = torrent.get_metadata(VIDEO_PATH)

        with open("output.json", "w") as json_file:
            json.dump(json_metadata, json_file, indent=4, ensure_ascii=True)

        bbcode = ffprobe_streams_to_bbcode(json_metadata, os.path.basename(VIDEO_PATH))
        with open("output_bb.txt", "w") as bb_file:
            bb_file.write(bbcode)

    asyncio.run(main())
|