from __future__ import annotations

import atexit
import click
import gzip
import hashlib
import json
import re
import subprocess
import sys
import time
import urllib.parse

from bs4 import BeautifulSoup
from click import Context
from Cryptodome.Hash import MD5
from langcodes import Language
from pathlib import Path
from requests import Request
from typing import Any, Optional, Union, List, Tuple
from urllib import parse
from urllib.request import url2pathname
from requests.adapters import BaseAdapter
from requests.models import Response

from unshackle.core.binaries import FFMPEG
from unshackle.core.cdm.monalisa import MonaLisaCDM
from unshackle.core.config import config
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.downloaders import requests
from unshackle.core.drm import MonaLisa
from unshackle.core.service import Service
from unshackle.core.titles import Title_T, Titles_T, Episode, Movie, Movies, Series
from unshackle.core.tracks import Chapters, Tracks, Video, Audio, Subtitle, Chapter
from unshackle.core.utilities import get_ip_info

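# A note on local playlists (illustrative): some iQIYI responses return raw m3u8
# text rather than an HTTP URL. _save_temp_m3u8() writes that text to a temp file
# and tracks expose it as a file:// URI (Path.as_uri()), so the session needs an
# adapter for the file:// scheme, e.g.:
#     session.mount("file://", LocalFileAdapter())
#     session.get(Path("/tmp/Video_abc123.m3u8").as_uri())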
class LocalFileAdapter(BaseAdapter):
    """Adapter to handle file:// URLs in requests."""

    def send(self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None):
        # Parse the file:// URL to extract the path component
        parsed = urllib.parse.urlparse(request.url)
        # url2pathname handles platform-specific path conversion
        # Windows: file:///C:/path -> C:\path
        # Linux: file:///path -> /path
        path = url2pathname(parsed.path)

        resp = Response()
        resp.status_code = 200
        resp.url = request.url

        try:
            file_path = Path(path)
            file_size = file_path.stat().st_size
            resp.raw = open(file_path, "rb")
            resp.headers["Content-Length"] = str(file_size)
        except Exception as e:
            resp.status_code = 404
            resp._content = str(e).encode()

        return resp

    def close(self):
        pass

class iQ(Service):
    """
    Service code for iQIYI Streaming Service (https://www.iq.com).

    Author: CodeName393, with special thanks to narakama; DRM module by Hugov, improved by sp4rk.y\n
    Authorization: Cookies\n
    Security: UHD@ML, FHD@ML
    """

    ALIASES = ("iQ", "iQIYI", "IQ", "IQIYI")
    TITLE_RE = (
        r"^(?:https?://(?:www\.)?iq\.com/(?:play|album)/)?(?P<id>[^/?]+)(?:\?.*)?$",
    )
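    # The title argument may be a full https://www.iq.com/play/... or /album/...
    # URL, or just the trailing slug/ID; TITLE_RE above extracts the "id" group
    # either way (illustrative: "https://www.iq.com/album/<slug>" and "<slug>"
    # resolve to the same title ID).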

    # Path to MonaLisa CDM device file (relative to this service)
    CDM_PATH = Path(__file__).parent / "CDM" / "monalisa.mld"

    ORIG_LANG_MAP = {
        "Mandarin": "zh-Hans",
        "Cantonese": "zh-Hant",
        "English": "en",
        "Korean": "ko",
        "Japanese": "ja",
        "Thai": "th",
        "Vietnamese": "vi",
        "Indonesian": "id",
        "Malay": "ms",
        "Spanish": "es-419",
        "Portuguese": "pt-BR",
        "Arabic": "ar",
        "French": "fr",
        "German": "de"
    }

    LANG_MAP = {
        1: "zh-Hans",  # Mandarin (Simplified Chinese)
        2: "zh-Hant",  # Cantonese (Traditional Chinese)
        3: "en",  # English
        5: "ko",  # Korean
        143: "pt-BR",  # Portuguese
        157: "th",  # Thai
        161: "vi"  # Vietnamese
    }

    SUB_LANG_MAP = {
        1: "zh-Hans",  # Mandarin (Simplified Chinese)
        2: "zh-Hant",  # Cantonese (Traditional Chinese)
        3: "en",  # English
        4: "ko",  # Korean
        5: "ja",  # Japanese
        6: "fr",  # French
        18: "th",  # Thai
        21: "ms",  # Malay
        23: "vi",  # Vietnamese
        24: "id",  # Indonesian
        26: "es-419",  # Spanish
        27: "pt-BR",  # Portuguese
        28: "ar",  # Arabic
        30: "de"  # German
    }

    BID_QUALITY = {
        4320: ["1020"],
        2160: ["2048", "860", "800"],
        1080: ["650", "600"],
        720: ["500"],
        480: ["300"],
        360: ["200"]
    }
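    # BID_QUALITY maps a requested resolution to iQIYI "bid" codes, tried in
    # order by _collect_media_sources(); e.g. asking for 2160p requests bids
    # "2048", "860" and "800". Resolutions not listed fall back to the closest
    # key (a request for 1440p would use the 1080p bids).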

    @staticmethod
    @click.command(name="iQIYI", short_help="https://www.iq.com", help=__doc__)
    @click.argument("title", type=str)
    @click.pass_context
    def cli(ctx: Context, **kwargs: Any) -> iQ:
        return iQ(ctx, **kwargs)

    def __init__(self, ctx: Context, title: str):
        self.title = title
        super().__init__(ctx)

        self.title_id = self.title
        for pattern in self.TITLE_RE:
            match = re.match(pattern, self.title)
            if match:
                self.title_id = match.group("id")
                break

        self.session.mount("file://", LocalFileAdapter())
        self._temp_files: list[Path] = []
        atexit.register(self._cleanup_temp_files)
        self.decrypt_tool_path = MonaLisaCDM.get_worker_path()

        self.vcodec = ctx.parent.params.get("vcodec")
        self.acodec: Audio.Codec = ctx.parent.params.get("acodec")
        self.range = ctx.parent.params.get("range_") or [Video.Range.SDR]
        self.quality: List[int] = ctx.parent.params.get("quality") or [1080]
        self.wanted = ctx.parent.params.get("wanted")
        self.audio_only = ctx.parent.params.get("audio_only")
        self.subs_only = ctx.parent.params.get("subs_only")
        self.chapters_only = ctx.parent.params.get("chapters_only")
        self.list_ = ctx.parent.params.get("list_")

        self.active_session = {}
        self.playback_data = {
            "has_external_audio": False,
            "svp": None
        }

        self.log.info("Preparing...")

        if max(self.quality) > 2160:
            self.log.info(" + Requesting 8K video may get the account banned.")
        if not self.vcodec:
            if max(self.quality) > 1080:
                self.vcodec = Video.Codec.HEVC
                self.log.info(" + Switched video codec to H265.")
            else:
                self.vcodec = Video.Codec.AVC
                self.log.info(" + Switched video codec to H264.")

        self.session.headers.update({
            "User-Agent": self.config["device"]["user_agent"],
            "Referer": "https://www.iq.com/",
            "Origin": "https://www.iq.com"
        })

        ip_info = get_ip_info(self.session)
        country_key = None
        possible_keys = ["countryCode", "country", "country_code", "country-code"]
        for key in possible_keys:
            if key in ip_info:
                country_key = key
                break
        if country_key:
            region = str(ip_info[country_key]).upper()
            self.log.info(f" + IP Region: {region}")
        else:
            self.log.warning(f" - The region could not be determined from IP information: {ip_info}")
            region = "US"
            self.log.info(f" + IP Region: {region} (By Default)")

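    # Authentication relies on browser cookies from iq.com; _login() reads
    # QC005 (device ID), I00001 (passport token), __dfp (device fingerprint),
    # QCVtype, mod and lang, and falls back to API lookups for anything missing.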
    def authenticate(self, cookies: Optional[Any] = None, credential: Optional[Credential] = None) -> None:
        super().authenticate(cookies, credential)
        self.cookies = cookies
        if not self.cookies:
            raise EnvironmentError("Service requires Cookies for Authentication.")

        self.log.info("Logging into iQIYI...")
        self._login()

    def _login(self) -> None:
        cookie_dict = {c.name: c.value for c in self.cookies} if self.cookies else {}
        self.active_session["uid"] = cookie_dict.get("pspStatusUid", "0")  # User ID
        self.active_session["qc005"] = cookie_dict.get("QC005", "")
        self.active_session["pck"] = cookie_dict.get("I00001", "")  # Passport Cookie
        self.active_session["dfp"] = cookie_dict.get("__dfp", "").split("@")[0]  # Device Fingerprint
        self.active_session["type_code"] = cookie_dict.get("QCVtype", "")
        self.active_session["mode_code"] = cookie_dict.get("mod", "")
        self.active_session["lang_code"] = cookie_dict.get("lang", "en_us")

        if not self.active_session["qc005"]:
            self.log.warning(" - QC005 not found. Playback might fail.")
            self.active_session["qc005"] = "0"

        if not self.active_session["pck"]:
            self.log.debug(" + Fetching PCK token...")
            self.active_session["pck"] = self._fetch_pck()

        if not self.active_session["type_code"]:
            self.log.warning(" - Type code not found. Playback might fail.")
            self.active_session["type_code"] = "100013"

        if not self.active_session["mode_code"]:
            self.active_session["mode_code"] = self._get_mode_code()

        self.active_session["ptid"] = self._get_ptid()

        data = self._get_vip_info()
        is_vip = False
        if data["code"] == "0":
            if "userinfo" not in data["data"]:
                self.log.error(f" - {data['data']['msg']}")
                sys.exit(1)
            elif "vip_list" in data["data"]:
                is_vip = True
        else:
            self.log.error(" - Could not resolve cookie session.")
            sys.exit(1)

        if not is_vip:
            self.log.warning(" - Account is not subscribed to iQiyi. Playback might fail.", exc_info=False)

        self.log.info(f" + Account ID: {self.active_session['uid']}")
        self.log.info(f" + Type Code: {self.active_session['type_code']}")
        self.log.info(f" + Mode Code: {self.active_session['mode_code']}")
        self.log.info(f" + Subscribed: {is_vip}")
        self.log.debug(f" + Platform Type ID: {self.active_session['ptid']}")

    def get_titles(self) -> Titles_T:
        video_info, lang_info = self._get_album_info(self.title_id, self.active_session["lang_code"])
        tvid = video_info.get("tvId") or video_info.get("defaultTvId")
        albumid = video_info["albumId"]
        qipuid = video_info["qipuId"]
        name = video_info["name"]
        year = video_info.get("year") or video_info.get("publishTime", "")[:4]

        video_info_en, _ = self._get_album_info(self.title_id, "en_us")
        orig_lang = video_info_en.get("categoryTagMap", {}).get("Language", [{}])[0].get("name", "Mandarin")  # album original language

        if not tvid and (albumid == qipuid):  # Movie
            content_type = "movie"
        else:
            content_type = "series"
        self.log.debug(f" + Content Type: {content_type.upper()}")

        if content_type == "movie":
            return Movies([
                Movie(
                    id_=hashlib.md5(str(qipuid).encode()).hexdigest()[0:6],
                    service=self.__class__,
                    name=name,
                    year=year,
                    data={"tvid": qipuid, "orig_lang": orig_lang}
                )
            ])

        elif content_type == "series":
            return Series(self._get_series(albumid, name, year, orig_lang, lang_info["episode"]))

    def _get_series(self, albumid: str, name: str, year: str, lang: str, episode_lang: str) -> List[Episode]:
        episodes: List[Episode] = []
        raw_episodes = []
        block_size = 50

        first_batch = self._get_episode_list(albumid, 1, block_size)
        total_count = first_batch["data"]["total"]

        if scripts := first_batch["data"]["epg"]:
            raw_episodes.extend(scripts)

        if total_count > block_size:
            for start_order in range(block_size + 1, total_count + 1, block_size):
                end_order = min(start_order + block_size - 1, total_count)

                batch_data = self._get_episode_list(albumid, start_order, end_order)
                if scripts := batch_data["data"]["epg"]:
                    raw_episodes.extend(scripts)

        for ep in raw_episodes:
            episodes.append(
                Episode(
                    id_=hashlib.md5(str(ep["qipuId"]).encode()).hexdigest()[0:6],
                    service=self.__class__,
                    title=name,
                    season=1 if ep["contentType"] == 1 else 0,
                    number=ep["order"],
                    name=episode_lang.format(ep["order"]) if ep["contentType"] == 1 else ep["extraName"],
                    year=year,
                    data={"tvid": ep["qipuId"], "orig_lang": lang}
                )
            )

        return episodes

    def get_tracks(self, title: Title_T) -> Tracks:
        tvid = title.data["tvid"]
        self.log.debug(f" + TVID: {tvid}")
        videos, audios, subs = self._collect_media_sources(tvid)

        if not videos:
            self.log.error("No playable streams found.")
            sys.exit(1)

        # Set original language
        orig_lang_name = title.data.get("orig_lang")
        orig_lang_code = self.ORIG_LANG_MAP.get(orig_lang_name)
        title.language = Language.get(orig_lang_code if orig_lang_code else "zh-Hans")
        self.log.debug(f" + Original Language: {title.language}")

        unique_video_lids = set(v.get("lid") for v in videos if "lid" in v)
        unique_audio_lids = set(a.get("lid") for a in audios if "lid" in a)

        force_video_lang = orig_lang_code if (len(unique_video_lids) <= 1 and orig_lang_code) else None
        force_audio_lang = orig_lang_code if (len(unique_audio_lids) <= 1 and orig_lang_code) else None

        tracks = Tracks()

        for video in videos:
            try:
                track = self._parse_video_track(video, title, force_video_lang)
                if track:
                    tracks.add(track, warn_only=True)
            except Exception as e:
                self.log.info(f"Failed to parse video track: {e}")

        for aud_data in audios:
            try:
                track = self._parse_audio_track(aud_data, title, force_audio_lang)
                if track:
                    tracks.add(track, warn_only=True)
                    self.playback_data["has_external_audio"] = True
            except Exception as e:
                self.log.info(f"Failed to parse audio track: {e}")

        for sub_data in subs:
            try:
                track = self._parse_subtitle_track(sub_data, title)
                if track:
                    tracks.add(track, warn_only=True)
            except Exception as e:
                self.log.info(f"Failed to parse subtitle track: {e}")

        return tracks

    def _collect_media_sources(self, tvid: str) -> Tuple[List[dict], List[dict], List[dict]]:
        all_videos = []
        all_audios = []
        all_subs = []

        if self.vcodec == Video.Codec.HEVC:
            target_codec_keys = {"h265", "h265_edr"}
        else:
            target_codec_keys = {"h264"}

        if Video.Range.HYBRID in self.range:
            target_codec_keys.update(["dv_edr", "hdr_edr", "dv", "hdr"])
        elif Video.Range.DV in self.range:
            target_codec_keys.update(["dv_edr", "dv"])
        elif Video.Range.HDR10 in self.range or Video.Range.HDR10P in self.range:
            target_codec_keys.update(["hdr_edr", "hdr"])

        # Request video
        target_bids = []
        available_qualities = sorted(self.BID_QUALITY.keys())

        for q in self.quality:
            if q in self.BID_QUALITY:
                target_bids.extend(self.BID_QUALITY[q])
            else:
                closest_q = min(available_qualities, key=lambda x: abs(x - q))
                target_bids.extend(self.BID_QUALITY[closest_q])
            if q > 2160:
                target_codec_keys.add("8k")

        for c_key in target_codec_keys:
            for bid in target_bids:
                try:
                    v_list, a_list, s_list = self._get_media_data(tvid, c_key, bid)
                    if v_list:
                        all_videos.extend(v_list)
                    if a_list:
                        all_audios.extend(a_list)
                    if s_list:
                        all_subs.extend(s_list)
                except Exception:
                    continue

        # Request audio
        found_lids = set()
        for aud in all_audios:
            if "lid" in aud:
                found_lids.add(str(aud["lid"]))

        additional_audio_types = ["dolby", "aac"]

        for lid in found_lids:
            for audio_type in additional_audio_types:
                try:
                    _, a_list, _ = self._get_media_data(tvid, "h265_edr", "800", audio_type, lid)  # edr includes Dolby surround audio
                    if a_list:
                        all_audios.extend(a_list)
                except Exception:
                    continue

        return all_videos, all_audios, all_subs

    def _parse_video_track(self, video: dict, title: Title_T, forced_lang: str = None) -> Optional[Video]:
        m3u8_url = video.get("url") or video.get("fs") or video.get("m3u8")
        if not m3u8_url:
            return None

        scrsz = video.get("scrsz", "0x0")
        width, height = map(int, scrsz.split("x")) if "x" in scrsz else (0, 0)
        duration = video.get("duration", 1)
        vsize = video.get("vsize", 0)
        bitrate = int((vsize / duration * 8) / 1000 * 1024) if duration else 0
        fps = video.get("fr", 0)

        codec_code = video.get("code")
        actual_codec = Video.Codec.AVC if codec_code == 2 else Video.Codec.HEVC

        dr = video.get("dr")  # Dynamic Range
        if dr == 1:
            range_type = Video.Range.DV
        elif dr == 2:
            range_type = Video.Range.HDR10  # HDR Vivid
        else:
            range_type = Video.Range.SDR

        if forced_lang:
            lang_code = forced_lang
        else:
            lang_code = self.LANG_MAP.get(video.get("lid"), "und")
        if video.get("duration"):
            title.data["duration"] = video.get("duration")

        video_info = f"{video.get('bid')}_{lang_code}_{height}_{fps}_{codec_code}_{dr}_{bitrate}"
        video_id = f"Video_{hashlib.md5(video_info.encode()).hexdigest()[0:6]}"

        # If m3u8 is raw content (not an HTTP URL), save it to a local file
        from_file = None
        track_url = m3u8_url
        if not m3u8_url.strip().startswith("http"):
            from_file = self._save_temp_m3u8(m3u8_url, video_id)
            if not from_file:
                return None
            track_url = from_file.as_uri()  # N_m3u8DL-RE can read file paths

        track = Video(
            id_=video_id,
            url=track_url,
            codec=actual_codec,
            bitrate=bitrate,
            fps=fps,
            width=width,
            height=height,
            language=Language.get(lang_code),
            descriptor=Video.Descriptor.HLS,
            range_=range_type,
            from_file=from_file,
        )

        if drm := video.get("drm"):
            self._handle_monalisa_drm(track, drm.get("ticket"))  # DRM PSSH
            track.downloader = requests  # DRM requires segment decryption

        return track

    def _parse_audio_track(self, audio: dict, title: Title_T, forced_lang: str = None) -> Optional[Audio]:
        m3u8_url = audio.get("m3u8Url") or audio.get("url") or audio.get("mpdUrl")
        fs = audio.get("fs")
        if not m3u8_url and not fs:
            return None

        if forced_lang:
            lang_code = forced_lang
            lang_name = title.data["orig_lang"]
        else:
            lang_code = self.LANG_MAP.get(audio.get("lid"), "und")
            lang_name = audio.get("name", "Unknown")

        cf = audio.get("cf", "aac")  # Codec Format
        ct = audio.get("ct", 1)  # Codec Type

        if cf == "dolby":
            if ct == 1:
                acodec, channels, bitrate = Audio.Codec.EC3, 2.0, 96_000
            elif ct == 2:
                acodec, channels, bitrate = Audio.Codec.EC3, 5.1, 256_000
            elif ct == 4:
                acodec, channels, bitrate = Audio.Codec.EC3, 16, 448_000
            else:
                self.log.debug(f" - Unknown audio codec type: {cf} - {ct}")
                acodec, channels, bitrate = Audio.Codec.EC3, 2.0, 0
        elif cf == "aac":
            if ct == 1:
                acodec, channels, bitrate = Audio.Codec.AAC, 2.0, 128_000
            elif ct == 6:
                acodec, channels, bitrate = Audio.Codec.AAC, 2.0, 192_000
            else:
                self.log.debug(f" - Unknown audio codec type: {cf} - {ct}")
                acodec, channels, bitrate = Audio.Codec.AAC, 2.0, 0
        else:
            self.log.debug(f" - Unknown audio codec: {cf} - {ct}")
            acodec, channels, bitrate = None, 2.0, 0

        display_name = lang_name
        is_original = Language.get(lang_code) == title.language
        if is_original:
            display_name += " [Original]"

        audio_info = f"{audio.get('bid')}_{lang_code}_{cf}_{ct}"
        audio_id = f"Audio_{hashlib.md5(audio_info.encode()).hexdigest()[0:6]}"

        # Determine the m3u8 source: stitch segments or use the provided URL
        from_file = None
        track_url = m3u8_url

        if not m3u8_url and fs:  # Stitch from "fs" segments
            from_file = self._stitch_audio_segments(audio, audio_id)
            if not from_file:
                return None
            track_url = from_file.as_uri()
        elif m3u8_url:
            if not isinstance(m3u8_url, str):
                return None
            if not m3u8_url.strip().startswith("http"):
                from_file = self._save_temp_m3u8(m3u8_url, audio_id)
                if not from_file:
                    return None
                track_url = from_file.as_uri()
        else:
            return None

        track = Audio(
            id_=audio_id,
            url=track_url,
            codec=acodec,
            bitrate=bitrate,
            channels=channels,
            language=Language.get(lang_code),
            is_original_lang=is_original,
            descriptor=Audio.Descriptor.HLS,
            name=display_name,
            downloader=requests,  # Audio segments may be gzip-compressed and need post-processing
            from_file=from_file,
        )

        # Atmos (16ch) -> 5.1 JOC 16
        if track.channels == 16:
            track.channels = 5.1
            track.joc = 16

        if drm := audio.get("drm"):
            self._handle_monalisa_drm(track, drm.get("ticket"))

        return track

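    # _stitch_audio_segments() resolves each "fs" entry through the audio segment
    # API and writes a minimal media playlist for the downloader, roughly
    # (illustrative):
    #     #EXTM3U
    #     #EXT-X-VERSION:3
    #     #EXT-X-TARGETDURATION:6
    #     #EXTINF:10.0,
    #     https://.../segment_1.aac
    #     #EXT-X-ENDLIST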
    def _stitch_audio_segments(self, audio: dict, audio_id: str) -> Optional[Path]:
        try:
            segments = audio["fs"]
            m3u8_lines = [
                "#EXTM3U",
                "#EXT-X-VERSION:3",
                "#EXT-X-TARGETDURATION:6"
            ]

            resolved_count = 0
            for seg in segments:
                api_path = seg.get("l")
                if not api_path:
                    continue

                json_data = self._get_audio_segment_info(api_path)

                real_media_url = json_data.get("l") if isinstance(json_data, dict) else None

                if real_media_url:
                    m3u8_lines.append("#EXTINF:10.0,")
                    m3u8_lines.append(real_media_url)
                    resolved_count += 1

            if resolved_count > 0:
                m3u8_lines.append("#EXT-X-ENDLIST")
                raw_m3u8 = "\n".join(m3u8_lines)
                return self._save_temp_m3u8(raw_m3u8, audio_id)

        except Exception as e:
            self.log.error(f"Failed to stitch audio segments: {e}")

        return None

    def _parse_subtitle_track(self, subtitle: dict, title: Title_T) -> Optional[Subtitle]:
        path = subtitle.get("webvtt")
        codec = Subtitle.Codec.WebVTT
        if not path:
            path = subtitle.get("xml")
            codec = Subtitle.Codec.TimedTextMarkupLang
        if not path:
            path = subtitle.get("srt")
            codec = Subtitle.Codec.SubRip
        if not path:
            return None

        lang_code = self.SUB_LANG_MAP.get(subtitle.get("lid"), "und")  # Lang ID
        lang_obj = Language.get(lang_code)
        is_original = lang_obj == title.language

        base_name = subtitle.get("_name", "Unknown")
        name_parts = [base_name]
        is_ai = int(subtitle.get("ss", 0)) == 1
        if is_ai:
            name_parts.append("(AI)")
        if is_original:
            name_parts.append("[Original]")

        return Subtitle(
            id_=lang_code,
            url=self.config["endpoint"]["subtitle"].format(path=path),
            codec=codec,
            language=lang_obj,
            is_original_lang=is_original,
            name=" ".join(name_parts)
        )

    def get_chapters(self, title: Title_T) -> Chapters:
        # self.log.info(title.data["duration"])
        # self.log.info(self.playback_data.get("svp"))
        svp_data = self.playback_data["svp"]
        if not svp_data:
            return Chapters()

        valid_segments = []
        for item in svp_data:
            for segment in item.get("vl", []):
                start = float(segment.get("sp", 0))
                end = float(segment.get("ep", 0))

                if start <= 0 and end <= 0:
                    continue

                valid_segments.append((start, end))

        duration = float(title.data["duration"])
        pre_chapter = []

        if valid_segments and duration > 0:
            min_start = min(s for s, e in valid_segments)
            max_end = max(e for s, e in valid_segments)

            if min_start > 0:
                pre_chapter.append(("Intro", 0.0))
                pre_chapter.append(("Scene", min_start))
            else:
                pre_chapter.append(("Scene", 0.0))

            if max_end < (duration - 1.0):
                pre_chapter.append(("Credits", max_end))

        else:
            pre_chapter.append(("Scene", 0.0))
            if valid_segments:
                for start, end in valid_segments:
                    if start < 600:
                        pre_chapter.append(("Intro", 0.0))
                        pre_chapter.append(("Scene", start))
                        break

        pre_chapter.sort(key=lambda x: x[1])

        unique_chapters_data = []
        if pre_chapter:
            curr_time = -1.0
            last_name = None

            for name, timestamp in pre_chapter:
                if abs(timestamp - curr_time) <= 1.0:
                    continue

                if name == last_name:
                    continue

                unique_chapters_data.append((name, timestamp))
                curr_time = timestamp
                last_name = name

        if not unique_chapters_data:
            unique_chapters_data.insert(0, ("Scene", 0.0))
        elif unique_chapters_data[0][1] > 1.0:
            if unique_chapters_data[0][0] == "Scene":
                unique_chapters_data.insert(0, ("Intro", 0.0))
            else:
                unique_chapters_data.insert(0, ("Scene", 0.0))

        chapters = Chapters()
        for name, timestamp in unique_chapters_data:
            c_name = name if name != "Scene" else None
            chapters.add(
                Chapter(
                    timestamp=timestamp,
                    name=c_name
                )
            )

        return chapters

    def get_widevine_service_certificate(self, **kwargs) -> Union[bytes, str]:
        return None

    def get_widevine_license(self, **kwargs) -> Optional[Union[bytes, str]]:
        return None

    def get_playready_license(self, **kwargs) -> Optional[bytes]:
        return None

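    # Widevine/PlayReady are not used here; protected streams carry a MonaLisa
    # "ticket" instead, which _handle_monalisa_drm() turns into a MonaLisa DRM
    # object (static AES key from config plus the bundled monalisa.mld device).
    # Segments are then decrypted locally in on_segment_downloaded().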
    def _handle_monalisa_drm(self, track: AnyTrack, ticket: str) -> None:
        if not ticket:
            return

        if not self.decrypt_tool_path or not self.decrypt_tool_path.exists():
            self.log.error("ML-Worker not found. Place it in unshackle/binaries/")
            sys.exit(1)

        if not self.CDM_PATH.exists():
            self.log.error(f"MonaLisa CDM not found at: {self.CDM_PATH}")
            sys.exit(1)

        try:
            drm = MonaLisa(
                ticket=ticket,
                aes_key=self.config["key"]["ml"],
                device_path=self.CDM_PATH,
            )
            # Store DRM on the track like Widevine/PlayReady
            track.drm = [drm]

        except Exception as e:
            self.log.error(f"MonaLisa Key challenge failed: {e}")

    def on_segment_downloaded(self, track: AnyTrack, segment: Path) -> None:
        # Video segment - decrypt MonaLisa DRM
        if isinstance(track, Video):
            if hasattr(track, "drm") and track.drm:
                for drm in track.drm:
                    if isinstance(drm, MonaLisa):
                        try:
                            drm.decrypt_segment(segment)
                        except MonaLisa.Exceptions.WorkerNotFound:
                            self.log.error("ML-Worker not found. Place it in unshackle/binaries/")
                        except MonaLisa.Exceptions.DecryptionFailed as e:
                            self.log.error(str(e))
                        except Exception as e:
                            self.log.error(f"Failed to decrypt segment {segment.name}: {e}")
                        break

        # Audio segment - decompress gzip if needed
        if isinstance(track, Audio):
            try:
                if segment.exists():
                    with open(segment, "rb") as f:
                        data = f.read()
                    if data.startswith(b"\x1f\x8b"):
                        decompressed_data = gzip.decompress(data)
                        with open(segment, "wb") as f_out:
                            f_out.write(decompressed_data)
            except Exception as e:
                self.log.warning(f"Failed to decompress gzip segment {segment.name}: {e}")

    def on_track_downloaded(self, track: AnyTrack) -> None:
        # Use FFmpeg to remux video MPEG-TS to MKV
        if isinstance(track, Video):
            if track.path.suffix.lower() == ".mkv":
                return

            mkv_path = track.path.with_suffix(".mkv")
            cmd = [str(FFMPEG), "-y", "-i", str(track.path)]
            if self.playback_data["has_external_audio"]:
                # External audio exists -> drop audio from the TS, keep video only
                cmd.extend(["-c:v", "copy", "-an"])
            else:
                # No external audio -> keep everything (audio + video) from the TS
                cmd.extend(["-c", "copy"])
            cmd.append(str(mkv_path))

            try:
                process = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding="utf-8")
                if process.returncode == 0 and mkv_path.exists():
                    if track.path.exists():
                        track.path.unlink()
                    track.path = mkv_path
                else:
                    self.log.warning(f"FFmpeg failed: {process.stderr or 'unknown error'}")
                    if mkv_path.exists():
                        mkv_path.unlink()
            except FileNotFoundError:
                self.log.warning("FFmpeg not found, skipping remux")
            except Exception as e:
                self.log.error(f"Error processing video track: {e}")
                if mkv_path.exists():
                    try:
                        mkv_path.unlink()
                    except Exception:
                        pass

    def _save_temp_m3u8(self, content: str, prefix: str) -> Optional[Path]:
        """Save m3u8 content to a temp file and return its Path (exposed to tracks as a file:// URI)."""
        temp_dir = config.directories.temp
        temp_dir.mkdir(parents=True, exist_ok=True)
        safe_prefix = re.sub(r'[\\/*?:"<>|]', "", str(prefix))
        temp_path = temp_dir / f"{safe_prefix}_{int(time.time())}.m3u8"

        try:
            temp_path = temp_path.resolve()

            if self.list_ or self.chapters_only:
                return temp_path

            filtered_lines = [
                line for line in content.splitlines()
                if "#EM" not in line and line.strip() != "#EXT-X-DISCONTINUITY"
            ]
            content_to_save = "\n".join(filtered_lines)

            temp_path.write_text(content_to_save, encoding="utf-8")
            self._temp_files.append(temp_path)

            # Callers convert this Path to a file:// URI for the downloaders
            return temp_path
        except Exception as e:
            self.log.warning(f"Failed to save temp m3u8: {e}")
            return None

    def _cleanup_temp_files(self) -> None:
        """Clean up all tracked temp files."""
        for path in self._temp_files:
            try:
                if path.exists():
                    path.unlink()
            except Exception:
                pass

    def _get_album_info(self, title_id: str, lang_code: str) -> Tuple[dict, dict]:
        endpoint = self.config["endpoint"]["album"].format(id=title_id, lang_code=lang_code)
        cookies_dict = {c.name: c.value for c in self.cookies}
        if "lang" not in cookies_dict or cookies_dict["lang"] != lang_code:
            cookies_dict["lang"] = lang_code
        cookie_header = "; ".join(f"{k}={v}" for k, v in cookies_dict.items())
        if "lang" not in cookies_dict:
            cookie_header += "; lang=en_us"
        headers = {
            "User-Agent": self.config["device"]["user_agent_bws"],
            "Cookie": cookie_header
        }
        try:
            res = self.session.get(endpoint, headers=headers)
            res.raise_for_status()
            soup = BeautifulSoup(res.text, "html.parser")
            script_tag = soup.find("script", id="__NEXT_DATA__", type="application/json")
            if not script_tag:
                self.log.error(" - Failed to parse page data.")
                sys.exit(1)

            data = json.loads(script_tag.string)
            initial_state = data["props"]["initialState"]
            play_info = initial_state.get("play", {}).get("videoInfo", {})
            album_info = initial_state.get("album", {}).get("videoAlbumInfo", {})
            language_info = initial_state.get("language", {}).get("langPkg", {})
            if play_info and (play_info.get("qipuId")) and language_info:
                return play_info, language_info
            elif album_info and language_info:
                return album_info, language_info
            else:
                raise ValueError("No content information found.")
        except Exception as e:
            self.log.error(f"Content info not found: {e}", exc_info=False)
            sys.exit(1)

    def _get_episode_list(self, episode_list_id: str, start_order: int, end_order: int) -> dict:
        headers = {
            "User-Agent": self.config["device"]["user_agent_bws"],
            "Cookie": "; ".join([f"{cookie.name}={cookie.value}" for cookie in self.cookies])
        }
        params = {
            "platformId": "3",
            "modeCode": self.active_session["mode_code"],
            "langCode": self.active_session["lang_code"],
            "deviceId": self.active_session["qc005"],
            "startOrder": str(start_order),
            "endOrder": str(end_order),
            "isVip": "true"
        }
        try:
            url = self.config["endpoint"]["episode"].format(list_id=episode_list_id)
            data = self._request("GET", url, params=params, headers=headers)
            return data
        except Exception:
            return {}

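    # The /dash endpoint requires a "vf" request signature: the MD5 hex digest
    # of the request path (including the query string) concatenated with the
    # static dash key from the service config, e.g. (illustrative)
    #     vf = MD5.new(("/dash?tvid=..." + dash_key).encode()).hexdigest()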
    def _get_dash_stream(self, data: dict) -> dict:
        query = parse.urlencode(data)
        path = f"/dash?{query}"
        vf = MD5.new((path + self.config["key"]["dash"]).encode()).hexdigest()
        url = self.config["endpoint"]["stream"].format(path=path, vf=vf)
        headers = {
            "accept": "*/*",
            "qyid": self.active_session["qc005"],
            "bop": f'{{"b_ft1":"3","dfp":"{str(self.active_session["dfp"])}","version":"9.0"}}',
            "pck": self.active_session["pck"],
            "User-Agent": self.config["device"]["user_agent"]
        }
        data = self._request("GET", url, headers=headers)
        return data

    def _get_media_data(
        self, tvid: str, codec_key: str, bid: str, audio_codec_key: str = "dolby", lid: str = "1"
    ) -> Tuple[List[dict], List[dict], List[dict]]:
        video_config = self.config["quality"]["video"]
        audio_config = self.config["quality"]["audio"]

        video_params = video_config.get(codec_key, video_config["h264"]).copy()
        audio_params = audio_config.get(audio_codec_key, audio_config["dolby"]).copy()

        data = {
            "tvid": tvid,
            "uid": self.active_session["uid"],  # User ID
            "k_uid": self.active_session["qc005"],  # Client ID
            "tm": str(int(time.time() * 1000)),  # Timestamp
            "bid": str(bid),  # Resolution
            "ut": self.active_session["type_code"],  # User Type
            "src": self.active_session["ptid"],  # Platform Type ID
            "ps": "1",  # ?
            "d": "0",
            "pm": "0",
            "fr": "25",  # Frame Rate
            "pt": "0",  # ?
            "s": "0",
            "rs": "1",
            "sr": "1",
            "sver": "2",
            "k_ver": "7.12.0",  # Client Version
            "k_tag": "1",
            "atype": "0",
            "vid": "",
            "lid": lid,  # Requested Audio Language
            "dcdv": "3",  # DRM Type (3 = MonaLisa, 5 = ?, 7 = Widevine, 8 = ?, 9 = ?)
            "ccsn": self.config["key"]["ccsn"],
            "agent_type": "366",
            "su": "2",
            "applang": "en_us",  # Display Language
            "ds": "0",
            "from_type": "1",
            "hdcp": "22",  # Client Display Content Protection version
            "cc_site": self.active_session["mode_code"],  # User mode code
            "cc_business": "1",
            "pano264": "800",
            "pano265": "800",
            "pre": "0",
            "ap": "1",
            "qd_v": "1",
            "fv": "2",
            "rt": "1",
            "dcv": "6",
            "ori": "puma",
            "X-USER-MODE": self.active_session["mode_code"],  # User mode code
            "ff": "ts"  # Segment Type
        }
        data.update(video_params)
        data.update(audio_params)

        # Using different parameters here risks an account ban...
        if codec_key == "8k":
            data.update({
                "ps": "0",  # ?
                "pt": "28000",  # ?
                "fr": "60",  # Frame Rate (HFR)
            })

        try:
            json_data = self._get_dash_stream(data)
        except SystemExit:
            return [], [], []

        program = json_data.get("data", {}).get("program", {})
        self.playback_data["svp"] = json_data.get("data", {}).get("svp", {})  # Chapter markers

        return program.get("video", []), program.get("audio", []), program.get("stl", [])

    def _get_audio_segment_info(self, path: str) -> dict:
        url = self.config["endpoint"]["audio"].format(path=path)
        headers = {
            "Accept": "*/*",
            "User-Agent": self.config["device"]["user_agent"]
        }
        try:
            res = self.session.get(url, headers=headers)
            return res.json() if res.status_code == 200 else {}
        except Exception:
            return {}

    def _get_mode_code(self) -> str:
        url = self.config["endpoint"]["mode"]
        params = {"format": "json", "scene": "4"}
        headers = {"Accept": "*/*"}
        data = self._request("GET", url, params=params, headers=headers)
        return data.get("data", {}).get("country", "br").lower() if data else "br"

    def _fetch_pck(self) -> Optional[str]:
        endpoint = self.config["endpoint"]["pck"]
        params = {
            "platformId": "3",
            "modeCode": self.active_session["mode_code"],
            "langCode": self.active_session["lang_code"],
            "deviceId": self.active_session["qc005"],
            "uid": self.active_session["uid"],
            "interfaceCode": "indexnav_layer"
        }
        headers = {
            "User-Agent": self.config["device"]["user_agent_bws"],
            "Cookie": "; ".join([f"{cookie.name}={cookie.value}" for cookie in self.cookies])
        }
        res_data = self._request("GET", endpoint, params=params, headers=headers)

        for item in res_data.get("data", []):
            url = item.get("apiUrl")
            if url:
                parsed = urllib.parse.urlparse(url)
                qs = urllib.parse.parse_qs(parsed.query)
                return qs.get("P00001", [None])[0]
        return None

    def _get_ptid(self) -> str:
        endpoint = self.config["endpoint"]["ptid"]
        params = {
            "platformId": "3",
            "modeCode": self.active_session["mode_code"],
            "langCode": self.active_session["lang_code"],
            "deviceId": self.active_session["qc005"]
        }
        headers = {
            "User-Agent": self.config["device"]["user_agent_bws"],
            "Cookie": "; ".join([f"{cookie.name}={cookie.value}" for cookie in self.cookies])
        }
        data = self._request("GET", endpoint, params=params, headers=headers)
        ptid = data.get("data", {}).get("ptid", "") if data else ""
        if ptid.startswith("0101003"):
            ptid = ptid.replace("0101003", "0202200", 1)
        return ptid

    def _get_vip_info(self) -> dict:
        endpoint = self.config["endpoint"]["vip"]
        params = {
            "platformId": "3",
            "modeCode": self.active_session["mode_code"],
            "langCode": self.active_session["lang_code"],
            "deviceId": self.active_session["qc005"],
            "fields": "userinfo",
            "version": "1.0",
            "vipInfoVersion": "5.0",
        }
        headers = {
            "User-Agent": self.config["device"]["user_agent_bws"],
            "Cookie": "; ".join([f"{cookie.name}={cookie.value}" for cookie in self.cookies])
        }
        data = self._request("GET", endpoint, params=params, headers=headers)
        return data

    def _request(self, method: str, endpoint: str, params: dict = None, headers: dict = None, payload: dict = None) -> Any:
        _headers = self.session.headers.copy()
        if headers:
            _headers.update(headers)

        req = Request(method, endpoint, headers=_headers, params=params, json=payload)
        prepped = self.session.prepare_request(req)

        try:
            res = self.session.send(prepped)
            res.raise_for_status()
            data = res.json() if res.text else {}
            return data
        except Exception as e:
            ignore_keys = ["episode", "stream", "audio"]
            if any(self.config["endpoint"][key] in endpoint for key in ignore_keys):
                raise e
            else:
                self.log.error(f"API Request failed: {e}", exc_info=False)
                sys.exit(1)