Files
Unshackle-Service-SeFree/iQ/__init__.py
2026-03-30 10:46:39 +07:00

1124 lines
43 KiB
Python

from __future__ import annotations
import atexit
import click
import gzip
import hashlib
import json
import re
import subprocess
import sys
import time
import urllib.parse
from bs4 import BeautifulSoup
from click import Context
from Cryptodome.Hash import MD5
from langcodes import Language
from pathlib import Path
from requests import Request
from typing import Any, Optional, Union, List, Tuple
from urllib import parse
from urllib.request import url2pathname
from requests.adapters import BaseAdapter
from requests.models import Response
from unshackle.core.binaries import FFMPEG
from unshackle.core.cdm.monalisa import MonaLisaCDM
from unshackle.core.config import config
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.downloaders import requests
from unshackle.core.drm import MonaLisa
from unshackle.core.service import Service
from unshackle.core.titles import Title_T, Titles_T, Episode, Movie, Movies, Series
from unshackle.core.tracks import Chapters, Tracks, Video, Audio, Subtitle, Chapter
from unshackle.core.utilities import get_ip_info
class LocalFileAdapter(BaseAdapter):
"""Adapter to handle file:// URLs in requests."""
def send(self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None):
# Parse the file:// URL to extract the path component
parsed = urllib.parse.urlparse(request.url)
# url2pathname handles platform-specific path conversion
# Windows: file:///C:/path -> C:\path
# Linux: file:///path -> /path
path = url2pathname(parsed.path)
resp = Response()
resp.status_code = 200
resp.url = request.url
try:
file_path = Path(path)
file_size = file_path.stat().st_size
resp.raw = open(file_path, "rb")
resp.headers["Content-Length"] = str(file_size)
except Exception as e:
resp.status_code = 404
resp._content = str(e).encode()
return resp
def close(self):
pass
class iQ(Service):
"""
Service code for iQIYI Streaming Service (https://www.iq.com).
Author: Service made by CodeName393 with Special Thanks to narakama and DRM module made by Hugov and Improvement by sp4rk.y\n
Authorization: Cookies\n
Security: UHD@ML, FHD@ML
"""
ALIASES = ("iQ", "iQIYI", "IQ", "IQIYI")
TITLE_RE = (
r"^(?:https?://(?:www\.)?iq\.com/(?:play|album)/)?(?P<id>[^/?]+)(?:\?.*)?$",
)
# Path to MonaLisa CDM device file (relative to this service)
CDM_PATH = Path(__file__).parent / "CDM" / "monalisa.mld"
ORIG_LANG_MAP = {
"Mandarin": "zh-Hans",
"Cantonese": "zh-Hant",
"English": "en",
"Korean": "ko",
"Japanese": "ja",
"Thai": "th",
"Vietnamese": "vi",
"Indonesian": "id",
"Malay": "ms",
"Spanish": "es-419",
"Portuguese": "pt-BR",
"Arabic": "ar",
"French": "fr",
"German": "de"
}
LANG_MAP = {
1: "zh-Hans", # 표준 중국어(중국어 간체)
2: "zh-Hant", # 광동어(중국어 번체)
3: "en", # 영어
5: "ko", # 한국어
143: "pt-BR", # 포르투갈어
157: "th", # 태국어
161: "vi" # 베트남어
}
SUB_LANG_MAP = {
1: "zh-Hans", # 표준 중국어(중국어 간체)
2: "zh-Hant", # 광동어(중국어 번체)
3: "en", # 영어
4: "ko", # 한국어
5: "ja", # 일본어
6: "fr", # 불어
18: "th", # 태국어
21: "ms", # 말레이어
23: "vi", # 베트남어
24: "id", # 인도네시아어
26: "es-419", # 스페인어
27: "pt-BR", # 포르투갈어
28: "ar", # 아랍어
30: "de" # 독일어
}
BID_QUALITY = {
4320: ["1020"],
2160: ["2048", "860", "800"],
1080: ["650", "600"],
720: ["500"],
480: ["300"],
360: ["200"]
}
@staticmethod
@click.command(name="iQIYI", short_help="https://www.iq.com", help=__doc__)
@click.argument("title", type=str)
@click.pass_context
def cli(ctx: Context, **kwargs: Any) -> iQ:
return iQ(ctx, **kwargs)
def __init__(self, ctx: Context, title: str):
self.title = title
super().__init__(ctx)
self.title_id = self.title
for pattern in self.TITLE_RE:
match = re.match(pattern, self.title)
if match:
self.title_id = match.group("id")
break
self.session.mount("file://", LocalFileAdapter())
self._temp_files: list[Path] = []
atexit.register(self._cleanup_temp_files)
self.decrypt_tool_path = MonaLisaCDM.get_worker_path()
self.vcodec = ctx.parent.params.get("vcodec")
self.acodec : Audio.Codec = ctx.parent.params.get("acodec")
self.range = ctx.parent.params.get("range_") or [Video.Range.SDR]
self.quality: List[int] = ctx.parent.params.get("quality") or [1080]
self.wanted = ctx.parent.params.get("wanted")
self.audio_only = ctx.parent.params.get("audio_only")
self.subs_only = ctx.parent.params.get("subs_only")
self.chapters_only = ctx.parent.params.get("chapters_only")
self.list_ = ctx.parent.params.get("list_")
self.active_session = {}
self.playback_data = {
"has_external_audio": False,
"svp": None
}
self.log.info("Preparing...")
if self.quality > [2160]:
self.log.info(" + 8K video maybe banned from account.")
if not self.vcodec:
if self.quality > [1080]:
self.vcodec = Video.Codec.HEVC
self.log.info(f" + Switched video codec to H265.")
else:
self.vcodec = Video.Codec.AVC
self.log.info(f" + Switched video codec to H264.")
self.session.headers.update({
"User-Agent": self.config["device"]["user_agent"],
"Referer": "https://www.iq.com/",
"Origin": "https://www.iq.com"
})
ip_info = get_ip_info(self.session)
country_key = None
possible_keys = ["countryCode", "country", "country_code", "country-code"]
for key in possible_keys:
if key in ip_info:
country_key = key
break
if country_key:
region = str(ip_info[country_key]).upper()
self.log.info(f" + IP Region: {region}")
else:
self.log.warning(f" - The region could not be determined from IP information: {ip_info}")
region = "US"
self.log.info(f" + IP Region: {region} (By Default)")
def authenticate(self, cookies: Optional[Any] = None, credential: Optional[Credential] = None) -> None:
super().authenticate(cookies, credential)
self.cookies = cookies
if not self.cookies:
raise EnvironmentError("Service requires Cookies for Authentication.")
self.log.info("Logging into iQIYI...")
self._login()
def _login(self) -> None:
cookie_dict = {c.name: c.value for c in self.cookies} if self.cookies else {}
self.active_session["uid"] = cookie_dict.get("pspStatusUid", "0") # User ID
self.active_session["qc005"] = cookie_dict.get("QC005", "")
self.active_session["pck"] = cookie_dict.get("I00001", "") # Passport Cookie
self.active_session["dfp"] = cookie_dict.get("__dfp", "").split("@")[0] # Device Fingerprint
self.active_session["type_code"] = cookie_dict.get("QCVtype", "")
self.active_session["mode_code"] = cookie_dict.get("mod", "")
self.active_session["lang_code"] = cookie_dict.get("lang", "en_us")
if not self.active_session["qc005"]:
self.log.warning(" - QC005 not found. Playback might fail.")
self.active_session["qc005"] = "0"
if not self.active_session["pck"]:
self.log.debug(" + Fetching PCK token...")
self.active_session["pck"] = self._fetch_pck()
if not self.active_session["type_code"]:
self.log.warning(" - Type code not found. Playback might fail.")
self.active_session["qc005"] = "100013"
if not self.active_session["mode_code"]:
self.active_session["mode_code"] = self._get_mode_code()
self.active_session["ptid"] = self._get_ptid()
data = self._get_vip_info()
is_vip = False
if data["code"] == "0":
if "userinfo" not in data["data"]:
self.log.error(f" - {data['data']['msg']}")
sys.exit(1)
elif "vip_list" in data["data"]:
is_vip = True
else:
self.log.error(" - Could not resolve cookie session.")
sys.exit(1)
if not is_vip:
self.log.warning(" - Account is not subscribed to iQiyi. Playback might fail.", exc_info=False)
self.log.info(f" + Account ID: {self.active_session['uid']}")
self.log.info(f" + Type Code: {self.active_session['type_code']}")
self.log.info(f" + Mode Code: {self.active_session['mode_code']}")
self.log.info(f" + Subscribed: {is_vip}")
self.log.debug(f" + Platform Type ID: {self.active_session['ptid']}")
def get_titles(self) -> Titles_T:
video_info, lang_info = self._get_album_info(self.title_id, self.active_session["lang_code"])
tvid = video_info.get("tvId") or video_info.get("defaultTvId")
albumid = video_info["albumId"]
qipuid = video_info["qipuId"]
name = video_info["name"]
year = video_info.get("year") or video_info.get("publishTime", "")[:4]
video_info_en, _ = self._get_album_info(self.title_id, "en_us")
orig_lang = video_info_en.get("categoryTagMap", {}).get("Language", [{}])[0].get("name", "Mandarin") # album orig Lang
if not tvid and (albumid == qipuid): # Movie
content_type = "movie"
else:
content_type = "series"
self.log.debug(f" + Content Type: {content_type.upper()}")
if content_type == "movie":
return Movies([
Movie(
id_=hashlib.md5(str(qipuid).encode()).hexdigest()[0:6],
service=self.__class__,
name=name,
year=year,
data={"tvid": qipuid, "orig_lang": orig_lang}
)
])
elif content_type == "series":
return Series(self._get_series(albumid, name, year, orig_lang, lang_info["episode"]))
def _get_series(self, albumid: str, name: str, year: str, lang: str, episode_lang: str) -> Series:
episodes: List[Episode] = []
raw_episodes = []
block_size = 50
first_batch = self._get_episode_list(albumid, 1, block_size)
total_count = first_batch["data"]["total"]
if scripts := first_batch["data"]["epg"]:
raw_episodes.extend(scripts)
if total_count > block_size:
for start_order in range(block_size + 1, total_count + 1, block_size):
end_order = min(start_order + block_size - 1, total_count)
batch_data = self._get_episode_list(albumid, start_order, end_order)
if scripts := batch_data["data"]["epg"]:
raw_episodes.extend(scripts)
for ep in raw_episodes:
episodes.append(
Episode(
id_=hashlib.md5(str(ep["qipuId"]).encode()).hexdigest()[0:6],
service=self.__class__,
title=name,
season=1 if ep["contentType"] == 1 else 0,
number=ep["order"],
name=episode_lang.format(ep["order"]) if ep["contentType"] == 1 else ep["extraName"],
year=year,
data={"tvid": ep["qipuId"], "orig_lang": lang}
)
)
return episodes
def get_tracks(self, title: Title_T) -> Tracks:
tvid = title.data["tvid"]
self.log.debug(f" + TVID: {tvid}")
videos, audios, subs = self._collect_media_sources(tvid)
if not videos:
self.log.error("No playable streams found.")
sys.exit(1)
# Set Original Lang
orig_lang_name = title.data.get("orig_lang")
orig_lang_code = self.ORIG_LANG_MAP.get(orig_lang_name)
title.language = Language.get(orig_lang_code if orig_lang_code else "zh-Hans")
self.log.debug(f" + Original Language : {title.language}")
unique_video_lids = set(v.get("lid") for v in videos if "lid" in v)
unique_audio_lids = set(a.get("lid") for a in audios if "lid" in a)
force_video_lang = orig_lang_code if (len(unique_video_lids) <= 1 and orig_lang_code) else None
force_audio_lang = orig_lang_code if (len(unique_audio_lids) <= 1 and orig_lang_code) else None
tracks = Tracks()
for video in videos:
try:
track = self._parse_video_track(video, title, force_video_lang)
if track:
tracks.add(track, warn_only=True)
except Exception as e:
self.log.info(f"Failed to parse video track: {e}")
for aud_data in audios:
try:
track = self._parse_audio_track(aud_data, title, force_audio_lang)
if track:
tracks.add(track, warn_only=True)
self.playback_data["has_external_audio"] = True
except Exception as e:
self.log.info(f"Failed to parse audio track: {e}")
for sub_data in subs:
try:
track = self._parse_subtitle_track(sub_data, title)
if track:
tracks.add(track, warn_only=True)
except Exception as e:
self.log.info(f"Failed to parse subtitle track: {e}")
return tracks
def _collect_media_sources(self, tvid: str) -> Tuple[List[dict], List[dict], List[dict]]:
all_videos = []
all_audios = []
all_subs = []
if self.vcodec == Video.Codec.HEVC:
target_codec_keys = {"h265", "h265_edr"}
else:
target_codec_keys = {"h264"}
if Video.Range.HYBRID in self.range[0]:
target_codec_keys.update(["dv_edr", "hdr_edr", "dv", "hdr"])
elif Video.Range.DV in self.range[0]:
target_codec_keys.update(["dv_edr", "dv"])
elif Video.Range.HDR10 in self.range[0] or Video.Range.HDR10P in self.range[0]:
target_codec_keys.update(["hdr_edr", "hdr"])
# Request video
target_bids = []
available_qualities = sorted(self.BID_QUALITY.keys())
for q in self.quality:
if q in self.BID_QUALITY:
target_bids.extend(self.BID_QUALITY[q])
else:
closest_q = min(available_qualities, key=lambda x: abs(x - q))
target_bids.extend(self.BID_QUALITY[closest_q])
if q > 2160:
target_codec_keys.add("8k")
for c_key in target_codec_keys:
for bid in target_bids:
try:
v_list, a_list, s_list = self._get_media_data(tvid, c_key, bid)
if v_list:
all_videos.extend(v_list)
if a_list:
all_audios.extend(a_list)
if s_list:
all_subs.extend(s_list)
except Exception as e:
continue
# Request audio
found_lids = set()
for aud in all_audios:
if "lid" in aud:
found_lids.add(str(aud["lid"]))
additional_audio_types = ["dolby", "aac"]
for lid in found_lids:
for audio_type in additional_audio_types:
try:
_, a_list, _ = self._get_media_data(tvid, "h265_edr", "800", audio_type, lid) # edr include dolby surround audio
if a_list:
all_audios.extend(a_list)
except Exception as e:
continue
return all_videos, all_audios, all_subs
def _parse_video_track(self, video: dict, title: Title_T, forced_lang: str = None) -> Optional[Video]:
m3u8_url = video.get("url") or video.get("fs") or video.get("m3u8")
if not m3u8_url:
return None
scrsz = video.get("scrsz", "0x0")
width, height = map(int, scrsz.split("x")) if "x" in scrsz else (0, 0)
duration = video.get("duration", 1)
vsize = video.get("vsize", 0)
bitrate = int((vsize / duration * 8) / 1000 * 1024) if duration else 0
fps = video.get("fr", 0)
codec_code = video.get("code")
actual_codec = Video.Codec.AVC if codec_code == 2 else Video.Codec.HEVC
dr = video.get("dr") # Dynamic Range
if dr == 1:
range_type = Video.Range.DV
elif dr == 2:
range_type = Video.Range.HDR10 # HDR Vivid
else:
range_type = Video.Range.SDR
if forced_lang:
lang_code = forced_lang
else:
lang_code = self.LANG_MAP.get(video.get("lid"), "und")
if video.get("duration"):
title.data["duration"] = video.get("duration")
video_info = f"{video.get('bid')}_{lang_code}_{height}_{fps}_{codec_code}_{dr}_{bitrate}"
video_id = f"Video_{hashlib.md5(video_info.encode()).hexdigest()[0:6]}"
# If m3u8 is raw content (not HTTP URL), save to local file
from_file = None
track_url = m3u8_url
if not m3u8_url.strip().startswith("http"):
from_file = self._save_temp_m3u8(m3u8_url, video_id)
if not from_file:
return None
track_url = from_file.as_uri() # n_m3u8dl_re can read file paths
track = Video(
id_=video_id,
url=track_url,
codec=actual_codec,
bitrate=bitrate,
fps=fps,
width=width,
height=height,
language=Language.get(lang_code),
descriptor=Video.Descriptor.HLS,
range_=range_type,
from_file=from_file,
)
if drm := video.get("drm"):
self._handle_monalisa_drm(track, drm.get("ticket")) # DRM PSSH
track.downloader=requests # DRM requires segment decryption
return track
def _parse_audio_track(self, audio: dict, title: Title_T, forced_lang: str = None) -> Optional[Audio]:
m3u8_url = audio.get("m3u8Url") or audio.get("url") or audio.get("mpdUrl")
fs = audio.get("fs")
if not m3u8_url and not fs:
return None
if forced_lang:
lang_code = forced_lang
lang_name = title.data["orig_lang"]
else:
lang_code = self.LANG_MAP.get(audio.get("lid"), "und")
lang_name = audio.get("name", "Unknown")
cf = audio.get("cf", "aac") # Codec Format
ct = audio.get("ct", 1) # Codec Type
if cf == "dolby":
if ct == 1:
acodec, channels, bitrate = Audio.Codec.EC3, 2.0, 96_000
elif ct == 2:
acodec, channels, bitrate = Audio.Codec.EC3, 5.1, 256_000
elif ct == 4:
acodec, channels, bitrate = Audio.Codec.EC3, 16, 448_000
else:
self.log.debug(f" - Unknown audio codec type.{cf} - {ct}")
acodec, channels, bitrate = Audio.Codec.EC3, 2.0, 0
elif cf == "aac":
if ct == 1:
acodec, channels, bitrate = Audio.Codec.AAC, 2.0, 128_000
elif ct == 6:
acodec, channels, bitrate = Audio.Codec.AAC, 2.0, 192_000
else:
self.log.debug(f" - Unknown audio codec type.{cf} - {ct}")
acodec, channels, bitrate = Audio.Codec.AAC, 2.0, 0
else:
self.log.debug(f" - Unknown audio codec.{cf} - {ct}")
acodec, channels, bitrate = None, 2.0, 0
display_name = lang_name
is_original = Language.get(lang_code) == title.language
if is_original:
display_name += " [Original]"
audio_info = f"{audio.get('bid')}_{lang_code}_{cf}_{ct}"
audio_id = f"Audio_{hashlib.md5(audio_info.encode()).hexdigest()[0:6]}"
# Determine m3u8 source: stitch segments or use provided URL
from_file = None
track_url = m3u8_url
if not m3u8_url and fs: # Stitch from flag segments
from_file = self._stitch_audio_segments(audio, audio_id)
if not from_file:
return None
track_url = from_file.as_uri()
elif m3u8_url:
if not isinstance(m3u8_url, str):
return None
if not m3u8_url.strip().startswith("http"):
from_file = self._save_temp_m3u8(m3u8_url, audio_id)
if not from_file:
return None
track_url = from_file.as_uri()
else:
return None
track = Audio(
id_=audio_id,
url=track_url,
codec=acodec,
bitrate=bitrate,
channels=channels,
language=Language.get(lang_code),
is_original_lang=is_original,
descriptor=Audio.Descriptor.HLS,
name=display_name,
downloader=requests, # Audio needs segment processing due to gzip processing
from_file=from_file,
)
# Atmos(16ch) -> 5.1 JOC 16
if track.channels == 16:
track.channels = 5.1
track.joc = 16
if drm := audio.get("drm"):
self._handle_monalisa_drm(track, drm.get("ticket"))
return track
def _stitch_audio_segments(self, audio: dict, audio_id: str) -> Optional[Path]:
try:
segments = audio["fs"]
m3u8_lines = [
"#EXTM3U",
"#EXT-X-VERSION:3",
"#EXT-X-TARGETDURATION:6"
]
resolved_count = 0
for seg in segments:
api_path = seg.get("l")
if not api_path:
continue
json_data = self._get_audio_segment_info(api_path)
real_media_url = json_data.get("l") if isinstance(json_data, dict) else None
if real_media_url:
m3u8_lines.append("#EXTINF:10.0,")
m3u8_lines.append(real_media_url)
resolved_count += 1
if resolved_count > 0:
m3u8_lines.append("#EXT-X-ENDLIST")
raw_m3u8 = "\n".join(m3u8_lines)
return self._save_temp_m3u8(raw_m3u8, audio_id)
except Exception as e:
self.log.error(f"Failed to stitch audio segments: {e}")
return None
def _parse_subtitle_track(self, subtitle: dict, title: Title_T) -> Optional[Subtitle]:
path = subtitle.get("webvtt")
codec = Subtitle.Codec.WebVTT
if not path:
path = subtitle.get("xml")
codec = Subtitle.Codec.TimedTextMarkupLang
if not path:
path = subtitle.get("srt")
codec = Subtitle.Codec.SubRip
if not path:
return None
lang_code = self.SUB_LANG_MAP.get(subtitle.get("lid"), "und") # Lang ID
lang_obj = Language.get(lang_code)
is_original = lang_obj == title.language
base_name = subtitle.get("_name", "Unknown")
name_parts = [base_name]
is_ai = int(subtitle.get("ss", 0)) == 1
if is_ai:
name_parts.append("(AI)")
if is_original:
name_parts.append("[Original]")
return Subtitle(
id_=lang_code,
url=self.config["endpoint"]["subtitle"].format(path=path),
codec=codec,
language=lang_obj,
is_original_lang=is_original,
name=" ".join(name_parts)
)
def get_chapters(self, title: Title_T) -> Chapters:
# self.log.info(title.data["duration"])
# self.log.info(self.playback_data.get("svp"))
svp_data = self.playback_data["svp"]
if not svp_data:
return Chapters()
valid_segments = []
for item in svp_data:
for segment in item.get("vl", []):
start = float(segment.get("sp", 0))
end = float(segment.get("ep", 0))
if start <= 0 and end <= 0:
continue
valid_segments.append((start, end))
duration = float(title.data["duration"])
pre_chapter = []
if valid_segments and duration > 0:
min_start = min(s for s, e in valid_segments)
max_end = max(e for s, e in valid_segments)
if min_start > 0:
pre_chapter.append(("Intro", 0.0))
pre_chapter.append(("Scene", min_start))
else:
pre_chapter.append(("Scene", 0.0))
if max_end < (duration - 1.0):
pre_chapter.append(("Credits", max_end))
else:
pre_chapter.append(("Scene", 0.0))
if valid_segments:
for start, end in valid_segments:
if start < 600:
pre_chapter.append(("Intro", 0.0))
pre_chapter.append(("Scene", start))
break
pre_chapter.sort(key=lambda x: x[1])
unique_chapters_data = []
if pre_chapter:
curr_time = -1.0
last_name = None
for name, timestamp in pre_chapter:
if abs(timestamp - curr_time) <= 1.0:
continue
if name == last_name:
continue
unique_chapters_data.append((name, timestamp))
curr_time = timestamp
last_name = name
if not unique_chapters_data:
unique_chapters_data.insert(0, ("Scene", 0.0))
elif unique_chapters_data[0][1] > 1.0:
if unique_chapters_data[0][0] == "Scene":
unique_chapters_data.insert(0, ("Intro", 0.0))
else:
unique_chapters_data.insert(0, ("Scene", 0.0))
chapters = Chapters()
for name, timestamp in unique_chapters_data:
c_name = name if name != "Scene" else None
chapters.add(
Chapter(
timestamp=timestamp,
name=c_name
)
)
return chapters
def get_widevine_service_certificate(self, **kwargs) -> Union[bytes, str]:
return None
def get_widevine_license(self, **kwargs) -> Optional[Union[bytes, str]]:
return None
def get_playready_license(self, **kwargs) -> Optional[bytes]:
return None
def _handle_monalisa_drm(self, track: AnyTrack, ticket: str) -> None:
if not ticket:
return
if not self.decrypt_tool_path or not self.decrypt_tool_path.exists():
self.log.error("ML-Worker not found. Place it in unshackle/binaries/")
sys.exit(1)
if not self.CDM_PATH.exists():
self.log.error(f"MonaLisa CDM not found at: {self.CDM_PATH}")
sys.exit(1)
try:
drm = MonaLisa(
ticket=ticket,
aes_key=self.config["key"]["ml"],
device_path=self.CDM_PATH,
)
# Store DRM on track like Widevine/PlayReady
track.drm = [drm]
except Exception as e:
self.log.error(f"MonaLisa Key challenge failed: {e}")
def on_segment_downloaded(self, track: AnyTrack, segment: Path) -> None:
# Video Segment - decrypt MonaLisa DRM
if isinstance(track, Video):
if hasattr(track, "drm") and track.drm:
for drm in track.drm:
if isinstance(drm, MonaLisa):
try:
drm.decrypt_segment(segment)
except MonaLisa.Exceptions.WorkerNotFound:
self.log.error("ML-Worker not found. Place it in unshackle/binaries/")
except MonaLisa.Exceptions.DecryptionFailed as e:
self.log.error(str(e))
except Exception as e:
self.log.error(f"Failed to decrypt segment {segment.name}: {e}")
break
# Audio Segment - decompress gzip if needed
if isinstance(track, Audio):
try:
if segment.exists():
with open(segment, "rb") as f:
data = f.read()
if data.startswith(b"\x1f\x8b"):
decompressed_data = gzip.decompress(data)
with open(segment, "wb") as f_out:
f_out.write(decompressed_data)
except Exception as e:
self.log.warning(f"Failed to decompress gzip segment {segment.name}: {e}")
def on_track_downloaded(self, track: AnyTrack) -> None:
# Use FFmpeg to remux Video MPEG-TS to MKV
if isinstance(track, Video):
if track.path.suffix.lower() == ".mkv":
return
mkv_path = track.path.with_suffix(".mkv")
cmd = [str(FFMPEG), "-y", "-i", str(track.path)]
if self.playback_data["has_external_audio"]:
# External audio exists -> Remove audio from TS, keep video only
cmd.extend(["-c:v", "copy", "-an"])
else:
# No external audio -> Keep everything (Audio + Video) from TS
cmd.extend(["-c", "copy"])
cmd.append(str(mkv_path))
try:
process = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding="utf-8")
if process.returncode == 0 and mkv_path.exists():
if track.path.exists():
track.path.unlink()
track.path = mkv_path
else:
self.log.warning(f"FFmpeg failed: {process.stderr or 'unknown error'}")
if mkv_path.exists():
mkv_path.unlink()
except FileNotFoundError:
self.log.warning("FFmpeg not found, skipping remux")
except Exception as e:
self.log.error(f"Error processing video track: {e}")
if mkv_path.exists():
try:
mkv_path.unlink()
except Exception:
pass
def _save_temp_m3u8(self, content: str, prefix: str) -> Optional[Path]:
"""Save m3u8 content to a temp file and return as file:// URL."""
temp_dir = config.directories.temp
temp_dir.mkdir(parents=True, exist_ok=True)
safe_prefix = re.sub(r'[\\/*?:"<>|]', "", str(prefix))
temp_path = temp_dir / f"{safe_prefix}_{int(time.time())}.m3u8"
try:
temp_path = temp_path.resolve()
if self.list_ or self.chapters_only:
return temp_path
filtered_lines = [
line for line in content.splitlines()
if "#EM" not in line and line.strip() != "#EXT-X-DISCONTINUITY"
]
content_to_save = "\n".join(filtered_lines)
temp_path.write_text(content_to_save, encoding="utf-8")
self._temp_files.append(temp_path)
# Return as file:// URL for compatibility with downloaders
return temp_path
except Exception as e:
self.log.warning(f"Failed to save temp m3u8: {e}")
return None
def _cleanup_temp_files(self) -> None:
"""Clean up all tracked temp files."""
for path in self._temp_files:
try:
if path.exists():
path.unlink()
except Exception:
pass
def _get_album_info(self, title_id: str, lang_code: str) -> Tuple[dict, dict]:
endpoint = self.config["endpoint"]["album"].format(id=title_id, lang_code=lang_code)
cookies_dict = {c.name: c.value for c in self.cookies}
if "lang" not in cookies_dict or cookies_dict["lang"] != lang_code:
cookies_dict["lang"] = lang_code
cookie_header = "; ".join(f"{k}={v}" for k, v in cookies_dict.items())
if "lang" not in cookies_dict:
cookie_header += "; lang=en_us"
headers = {
"User-Agent": self.config["device"]["user_agent_bws"],
"Cookie": cookie_header
}
try:
res = self.session.get(endpoint, headers=headers)
res.raise_for_status()
soup = BeautifulSoup(res.text, "html.parser")
script_tag = soup.find("script", id="__NEXT_DATA__", type="application/json")
if not script_tag:
self.log.error(" - Failed to parse page data.")
sys.exit(1)
data = json.loads(script_tag.string)
initial_state = data["props"]["initialState"]
play_info = initial_state.get("play", {}).get("videoInfo", {})
album_info = initial_state.get("album", {}).get("videoAlbumInfo", {})
language_info = initial_state.get("language", {}).get("langPkg", {})
if play_info and (play_info.get("qipuId")) and language_info:
return play_info, language_info
elif album_info and language_info:
return album_info, language_info
else:
raise ValueError("No content information found.")
except Exception as e:
self.log.error(f"Content info not found: {e}", exc_info=False)
sys.exit(1)
def _get_episode_list(self, episode_list_id: str, start_order: int, end_order: int) -> dict:
headers = {
"User-Agent": self.config["device"]["user_agent_bws"],
"Cookie": "; ".join([f"{cookie.name}={cookie.value}" for cookie in self.cookies])
}
params = {
"platformId": "3",
"modeCode": self.active_session["mode_code"],
"langCode": self.active_session["lang_code"],
"deviceId": self.active_session["qc005"],
"startOrder": str(start_order),
"endOrder": str(end_order),
"isVip": "true"
}
try:
url = self.config["endpoint"]["episode"].format(list_id=episode_list_id)
data = self._request("GET", url, params=params, headers=headers)
return data
except Exception:
return {}
def _get_dash_stream(self, data: dict) -> dict:
query = parse.urlencode(data)
path = f"/dash?{query}"
vf = MD5.new((path + self.config["key"]["dash"]).encode()).hexdigest()
url = self.config["endpoint"]["stream"].format(path=path, vf=vf)
headers = {
"accept": "*/*",
"qyid": self.active_session["qc005"],
"bop": f'{{"b_ft1":"3","dfp":"{str(self.active_session["dfp"])}","version":"9.0"}}',
"pck": self.active_session["pck"],
"User-Agent": self.config["device"]["user_agent"]
}
data = self._request("GET", url, headers=headers)
return data
def _get_media_data(
self, tvid: str, codec_key: str, bid: str, audio_codec_key: str = "dolby", lid: str = "1"
) -> Tuple[List[dict], List[dict], List[dict]]:
video_config = self.config["quality"]["video"]
audio_config = self.config["quality"]["audio"]
video_params = video_config.get(codec_key, video_config["h264"]).copy()
audio_params = audio_config.get(audio_codec_key, audio_config["dolby"]).copy()
data = {
"tvid": tvid,
"uid": self.active_session["uid"], # User ID
"k_uid": self.active_session["qc005"], # Client ID
"tm": str(int(time.time() * 1000)), # Timestemp
"bid": str(bid), # Resolution
"ut": self.active_session["type_code"], # User Type
"src": self.active_session["ptid"], # Platform Typr ID
"ps": "1", # ?
"d": "0",
"pm": "0",
"fr": "25", # Frame Rate
"pt": "0", # ?
"s": "0",
"rs": "1",
"sr": "1",
"sver": "2",
"k_ver": "7.12.0", # Client Version
"k_tag": "1",
"atype": "0",
"vid": "",
"lid": lid, # Request Lang(Audio)
"dcdv": "3", # DRM Type (3 = Monalisa, 5 = ?, 7 = Widevine, 8 = ?, 9 = ?)
"ccsn": self.config["key"]["ccsn"],
"agent_type": "366",
"su": "2",
"applang": "en_us", # Display Lang
"ds": "0",
"from_type": "1",
"hdcp": "22", # Client Display Content Protection version
"cc_site": self.active_session["mode_code"], # User mode code
"cc_business": "1",
"pano264": "800",
"pano265": "800",
"pre": "0",
"ap": "1",
"qd_v": "1",
"fv": "2",
"rt": "1",
"dcv": "6",
"ori": "puma",
"X-USER-MODE": self.active_session["mode_code"], # User mode code
"ff": "ts" # Segement Type
}
data.update(video_params)
data.update(audio_params)
# If the parameters are different, account ban...
if codec_key == "8k":
data.update({
"ps": "0", # ?
"pt": "28000", # ?
"fr": "60", # Frame Rate(HFR)
})
try:
json_data = self._get_dash_stream(data)
except SystemExit:
return [], [], []
program = json_data.get("data", {}).get("program", {})
self.playback_data["svp"] = json_data.get("data", {}).get("svp", {}) # Chapter
return program.get("video", []), program.get("audio", []), program.get("stl", [])
def _get_audio_segment_info(self, path: str) -> dict:
url = self.config["endpoint"]["audio"].format(path=path)
headers = {
"Accept": "*/*",
"User-Agent": self.config["device"]["user_agent"]
}
try:
res = self.session.get(url, headers=headers)
return res.json() if res.status_code == 200 else {}
except Exception:
return {}
def _get_mode_code(self) -> str:
url = self.config["endpoint"]["mode"]
params = {"format": "json", "scene": "4"}
headers = {"Accept": "*/*"}
data = self._request("GET", url, params=params, headers=headers)
return data.get("data", {}).get("country", "br").lower() if data else "br"
def _fetch_pck(self) -> str:
endpoint = self.config["endpoint"]["pck"]
params = {
"platformId": "3",
"modeCode": self.active_session["mode_code"],
"langCode": self.active_session["lang_code"],
"deviceId": self.active_session["qc005"],
"uid": self.active_session["uid"],
"interfaceCode": "indexnav_layer"
}
headers = {
"User-Agent": self.config["device"]["user_agent_bws"],
"Cookie": "; ".join([f"{cookie.name}={cookie.value}" for cookie in self.cookies])
}
res_data = self._request("GET", endpoint, params=params, headers=headers)
for item in res_data.get("data", []):
url = item.get("apiUrl")
if url:
parsed = urllib.parse.urlparse(url)
qs = urllib.parse.parse_qs(parsed.query)
return qs.get("P00001", [None])[0]
return None
def _get_ptid(self) -> str:
endpoint = self.config["endpoint"]["ptid"]
params = {
"platformId": "3",
"modeCode": self.active_session["mode_code"],
"langCode": self.active_session["lang_code"],
"deviceId": self.active_session["qc005"]
}
headers = {
"User-Agent": self.config["device"]["user_agent_bws"],
"Cookie": "; ".join([f"{cookie.name}={cookie.value}" for cookie in self.cookies])
}
data = self._request("GET", endpoint, params=params, headers=headers)
ptid = data.get("data", {}).get("ptid", "") if data else ""
if ptid.startswith("0101003"):
ptid = ptid.replace("0101003", "0202200", 1)
return ptid
def _get_vip_info(self) -> dict:
endpoint = self.config["endpoint"]["vip"]
params = {
"platformId": "3",
"modeCode": self.active_session["mode_code"],
"langCode": self.active_session["lang_code"],
"deviceId": self.active_session["qc005"],
"fields": "userinfo",
"version": "1.0",
"vipInfoVersion": "5.0",
}
headers = {
"User-Agent": self.config["device"]["user_agent_bws"],
"Cookie": "; ".join([f"{cookie.name}={cookie.value}" for cookie in self.cookies])
}
data = self._request("GET", endpoint, params=params, headers=headers)
return data
def _request(self, method: str, endpoint: str, params: dict = None, headers: dict = None, payload: dict = None) -> Any:
_headers = self.session.headers.copy()
if headers:
_headers.update(headers)
req = Request(method, endpoint, headers=_headers, params=params, json=payload)
prepped = self.session.prepare_request(req)
try:
res = self.session.send(prepped)
res.raise_for_status()
data = res.json() if res.text else {}
return data
except Exception as e:
ignore_keys = ["episode", "stream", "audio"]
if any(self.config["endpoints"][key] in endpoint for key in ignore_keys):
raise e
else:
self.log.error(f"API Request failed: {e}", exc_info=False)
sys.exit(1)