Files
Unshackle-Service-SeFree/TID/__init__.py
sefree d464d0fac8 - Change default from WV to AES
- Change way to select title lang
2026-03-31 22:52:52 +07:00

636 lines
24 KiB
Python

import re
import time
import json
from http.cookiejar import CookieJar
from typing import Optional, Union
import click
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.manifests import HLS
from unshackle.core.service import Service
from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from unshackle.core.tracks import Chapter, Subtitle, Tracks, Video, Audio
import requests
from typing import Callable, Any
from Crypto.Cipher import AES
from Crypto.Hash import MD5
import base64
from bs4 import BeautifulSoup
class TID(Service):
    """
    Service code for TrueID streaming service (https://www.trueid.net/).
    \b
    Original-Author: [SeFree]
    Version: 1.0.0
    Authorization: Cookies
    Security: FHD@L3, and can be bypass using HLS instead of DASH.
    \b
    """
    # Regex applied by extract_id(); it captures the named groups `type`
    # ("series" or "movie"), `id` and the optional `season_id` from a title URL.
    TITLE_RE = r'https://(?:www|movie)\.trueid\.net/(?:watch/)?(?:[a-z]{2}-[a-z]{2}/)?(?P<type>series|movie)/(?P<id>[a-zA-Z0-9]+)(?:/(?P<season_id>[a-zA-Z0-9]+))?'
    # Video range name -> lowercase identifier.
    # Not referenced by the code visible in this file.
    VIDEO_RANGE_MAP = {
        "SDR": "sdr",
        "HDR10": "hdr10",
        "DV": "dolby_vision",
    }
    # Two-letter code -> English language name.
    # Only referenced from commented-out code in this file.
    LANGUAGE_MAP = {
        "en": "English",
        "th": "Thai",
        "jp": "Japanese",
        "ko": "Korean",
        "zh": "Chinese",
    }
    # --drm CLI value -> the DRM identifier string that get_titles() compares
    # against the "drm" field of the API response.
    _DRM_MAP={
        "wv":"WV_FPS",
        "aes":"AES_128",
    }
@staticmethod
@click.command(name="TID", short_help="https://www.trueid.net")
@click.argument("title", type=str)
@click.option("-SE", "--season", default=None, required=False, type=int,
              help="TrueID sometimes does not provide the season number in its metadata, so specify it manually.")
@click.option("-d", "--drm", default="aes", type=click.Choice(["aes", "wv", "None"]), required=False,
              help='Stream type to request: "aes" downloads the HLS manifest, "wv" the DASH manifest (default: aes).')
@click.option("-tl", "--title_lang", default="ja", required=False, type=str,
              help="Language code assigned to the episodes of a series, e.g. the original audio language (default: ja).")
@click.pass_context
def cli(ctx, **kwargs):
    # Click entry point: forwards the parsed argument/options to the service
    # constructor. Deliberately documented with a comment rather than a
    # docstring, because click would otherwise use the docstring as the
    # command's help text.
    return TID(ctx, **kwargs)
def extract_id(self, url: str):
    """
    Split a TrueID title URL into its named parts.

    Returns the dict of named groups defined by ``TITLE_RE``
    (``type``, ``id``, ``season_id``), or ``None`` when the pattern is not
    found anywhere in ``url``.
    """
    found = re.search(self.TITLE_RE, url)
    return found.groupdict() if found else None
def __init__(self, ctx, title, drm, season, title_lang):
    """
    Store the CLI arguments and the parts parsed out of the title URL.

    Parameters:
        ctx: click context, handed to the base ``Service``.
        title: title URL (or a bare title id when it does not match ``TITLE_RE``).
        drm: "aes", "wv", or "None"/None to use the DRM type reported by the API.
        season: optional season number override.
        title_lang: language assigned to series episodes.
    """
    super().__init__(ctx)
    # When the input is not a recognised URL, fall back to treating it as a
    # bare series id; previously this path crashed with AttributeError on
    # `None.get("id")` even though the type lookup already guarded for None.
    self.data = self.extract_id(title) or {"type": "series", "id": title, "season_id": None}
    self.title = self.data.get("id")
    # click delivers the choice "None" as a (truthy) string; normalise it so
    # the "no override" checks elsewhere in the class can actually trigger.
    self.drm = None if drm in (None, "None") else drm
    self.type = self.data.get("type")
    self.season = season
    self.title_lang = title_lang
    self.license_api = None
    self.auth_token = None
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
    """
    Refresh the cookies for the TrueID service.

    This is necessary to maintain a valid session. The steps are:
      1. scrape the site-wide Basic authorization token (``get_auth_token``),
      2. call the ``check_sso`` endpoint to obtain an access token,
      3. post that token to the ``me`` endpoint to fetch the user profile,
      4. post the assembled login payload to the ``storelogin`` endpoint.

    Returns ``True`` on success. On any failure an error is logged and
    ``SystemExit(1)`` is raised.
    """
    super().authenticate(cookies, credential)
    self.auth_token = self.retry(self.get_auth_token, args=(self.data,))

    params = {
        'client_id': '893',
        'browser_id': '*****.*****',
        'ver': '1.5.0',
        'scope': 'public_profile,mobile,email,references,identity_account_read,identity_token_bridge',
    }
    headers = {
        'origin': 'https://www.trueid.net',
        'referer': 'https://www.trueid.net/',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36 Edg/145.0.0.0',
    }
    response = self.session.get(self.config['endpoints']['check_sso'], params=params, headers=headers)
    # Parse the body once instead of re-decoding it for every field.
    sso = response.json()
    access_token = sso.get('access_token')
    self.log.debug(f"SSO Check Response: {sso}")
    if sso.get('status') != 'connected':
        self.log.error("SSO check failed, cannot refresh cookies.")
        # Explicit non-zero status: the bare exit() used before reported success.
        raise SystemExit(1)

    # From here on every session request carries the scraped Basic token.
    headers = {
        'authorization': self.auth_token,
    }
    self.session.headers.update(headers)
    json_data = {
        'accessToken': access_token,
        'ssoId': self.session.cookies.get_dict().get("sso_u"),
    }
    response = self.session.post(self.config['endpoints']['me'], json=json_data)
    # Loose `== True` comparison kept as in the original check.
    if not (200 <= response.status_code < 300 and response.json().get('success') == True):  # noqa: E712
        self.log.error("Failed to fetch the user profile, cannot refresh cookies.")
        raise SystemExit(1)

    data = response.json()['data']
    json_data = {
        'USER_PROFILE': {
            'avatar': data['avatar'],
            'displayName': data['display_name'],
            'uid': data['ssoid'],
            'user_profile_id': data['id'],
            'trueid': '',
        },
        'isLogin': True,
        'avatar': data['avatar'],
        'displayName': data['display_name'],
        'AAA_PROFILE': {
            'access_token': access_token,
            'status': 'connected',
            'income': 'trueid',
            'hashed': self.config.get('check_sso_params', {}).get('hashed'),
            'uid': data['ssoid'],
        },
        'SUB_RES': [],
        'uniqueUserId': '*****.*****',
        'PPID': '',
        'UID_ads': '',
    }
    response = self.session.post(self.config['endpoints']['storelogin'], headers=headers, json=json_data)
    if response.text != 'Successfully':
        self.log.error("Failed to refresh cookies.")
        raise SystemExit(1)
    return True
def get_titles(self) -> Titles_T:
    """
    Fetch the metadata for the requested title.

    For a movie, one ``getDetailById`` request is made and a single-item
    ``Movies`` collection is returned. For a series, the ``seasonsSerise``
    endpoint is queried once for the season list and once per season, and
    every sub-episode that has an ``episode_no`` becomes an ``Episode``.

    The per-title ``data`` dict carries the DRM choice (``"aes"``/``"wv"``)
    later read by ``get_tracks`` and the chapter candidates read by
    ``get_chapters``.
    """
    # The CLI offers the literal choice "None"; treat it like an unset value
    # so the DRM type reported by the API is used instead.
    forced_drm = None if self.drm in (None, "None") else self.drm
    # _DRM_MAP is keyed by CLI value; invert it to look up by API identifier.
    drm_by_api_name = {api_name: cli_name for cli_name, api_name in self._DRM_MAP.items()}

    def resolve_drm(api_name):
        # A user override wins; unknown or missing API values fall back to "aes".
        if forced_drm:
            return forced_drm
        if isinstance(api_name, str):
            return drm_by_api_name.get(api_name, "aes")
        return "aes"

    if self.type == "movie":
        self.session.headers.update({'authorization': self.auth_token})
        response = self.session.post(
            url=self.config['endpoints']['getDetailById'],
            json={"id": self.title, "lang": "en"},
        )
        response.raise_for_status()
        res_json = response.json()
        chapters = self.find_hhmmss_in_dict(res_json)
        return Movies(
            [
                Movie(
                    id_=res_json.get('id', ''),
                    service=self.__class__,
                    name=res_json.get('title', ''),
                    year=res_json.get('release_year', ''),
                    data={
                        "title": res_json.get('title', ''),
                        "url": self.title,
                        "drm": resolve_drm(res_json.get("drm")),
                        "season_no": res_json.get('season_no', ''),
                        "episode_no": res_json.get('episode_no', ''),
                        "audio": res_json.get('audio', ''),
                        "subtitle": res_json.get('subtitle', ''),
                        "id": res_json.get('id', ''),
                        "year": res_json.get('release_year', ''),
                        # Resolved lazily in get_tracks().
                        'manifest': None,
                        'license': None,
                        'chapters': chapters,
                    }
                )
            ]
        )

    endpoint = self.config['endpoints']['seasonsSerise']
    headers = {
        'authorization': self.auth_token,
        "referer": f'https://www.trueid.net/watch/th-th/series/{self.title}/{self.data.get("season_id")}',
    }
    # Season overview: try English first, fall back to Thai on an empty body.
    overview = self.session.get(url=endpoint, headers=headers, params={"id": self.title, "lang": "en"}).json()
    if not overview:
        overview = self.session.get(url=endpoint, headers=headers, params={"id": self.title, "lang": "th"}).json()
    season_items = overview['seasonItems']
    # `or {}` guards both a missing and a null "seasonShelf".
    series_title = (overview.get('seasonShelf') or {}).get('title', '')

    titles = []
    previous_season_id = self.data.get('season_id', '')
    for season in season_items:
        params = {
            "id": self.title,
            "lang": "en",
            "seasonId": season['id'],
        }
        season_json = self.session.get(url=endpoint, params=params).json()
        if not season_json:
            self.log.warning("Season not found")
            params["lang"] = "th"
            season_json = self.session.get(url=endpoint, params=params).json()
        shelf = (season_json or {}).get("seasonShelf") or {}

        for ss in shelf.get('season', []):
            for s in ss.get('episode', []):
                for ep_item in s['subEpisode']:
                    if ep_item.get("episode_no") is None:
                        continue
                    # Each time the vod season id changes, bump the season
                    # counter (or seed it from the item when no season was
                    # given). Note this updates self.season as it goes.
                    if ep_item.get('vod_season_id', '') != previous_season_id:
                        self.season = int(self.season) + 1 if self.season else ep_item.get('season_no', '')
                        previous_season_id = ep_item.get('vod_season_id', '')
                    titles.append({
                        "title": ep_item.get('title', ''),
                        "series_title": series_title,
                        "drm": resolve_drm(ep_item.get('drm', 'AES_128')),
                        "season_no": ep_item.get('season_no', '') if not self.season else self.season,
                        "episode_no": ep_item.get('episode_no', ''),
                        "sub_ep_no": ep_item.get('sub_ep_no', ''),
                        "audio": ep_item.get('audio', ''),
                        'sub_ep_included': ep_item.get('sub_ep_included', False),
                        "subtitle": ep_item.get('subtitle', ''),
                        "id": ep_item.get('id', ''),
                        # Resolved lazily in get_tracks().
                        'manifest': None,
                        'license': None,
                        'chapters': self.find_hhmmss_in_dict(ep_item),
                    })

    return Series([Episode(
        id_=x["id"],
        service=self.__class__,
        title=x["series_title"],
        season=x.get("season_no"),
        number=x.get("episode_no"),
        name=x.get("title"),
        data=x,
        language=x.get("language", self.title_lang),
    ) for x in titles])
# DASH Example: Service requires separate API calls per codec/range.
# Uses _get_tracks_for_variants() which iterates codecs x ranges,
# handles HYBRID (HDR10+DV), and best_available fallback.
def get_tracks(self, title: Title_T) -> Tracks:
    """
    Download and parse the manifest for ``title``.

    The manifest URL and licence URL come from ``_get_manifest``; the licence
    URL is stored in ``title.data["license_api"]`` for ``get_widevine_license``.
    A DRM value of ``"aes"`` is parsed as HLS, anything else as DASH.

    Raises:
        ValueError: when no manifest URL was returned for the requested DRM type.
    """
    drm = title.data['drm']
    manifest, title.data["license_api"] = self._get_manifest(title.id, drm)
    if not manifest:
        # Fail with a message that names the cause instead of letting the
        # HTTP client reject a None URL.
        raise ValueError(f"No manifest URL returned for title {title.id!r} (drm={drm!r}).")

    res = self.session.get(url=manifest).text
    tracks = Tracks()
    if drm == "aes":
        tracks.add(HLS.from_text(res, manifest).to_tracks(title.language))
    else:
        tracks.add(DASH.from_text(
            text=res,
            url=manifest
        ).to_tracks(title.language))
    return tracks
# HLS Example: Service returns all codecs/ranges in one master playlist.
# No need for _get_tracks_for_variants, dl.py filters by user selection.
#
# def get_tracks(self, title: Title_T) -> Tracks:
# playback = self.session.get(
# url=self.config["endpoints"]["playback"].format(title_id=title.id),
# params={"token": self.token},
# ).json()
# return HLS.from_url(
# url=playback["manifest_url"],
# session=self.session,
# ).to_tracks(title.language)
def get_chapters(self, title: Title_T) -> list[Chapter]:
    """
    Build chapter markers from the ``chapters`` entries stored in ``title.data``.

    Each entry is a one-key dict of ``{field_name: "HH:MM:SS"}``. Zero or
    malformed timestamps and timestamps already emitted are skipped. The
    chapter name is the field name with underscores turned into spaces and
    the word "skip" removed.
    """
    seen_timestamps = []
    result = []
    for entry in title.data.get('chapters', []):
        for label, stamp in entry.items():
            if stamp in seen_timestamps or not self.is_valid_nonzero_time(stamp):
                continue
            display_name = label.replace('_', ' ').replace('skip', '').strip()
            result.append(Chapter(name=display_name, timestamp=stamp))
            seen_timestamps.append(stamp)
    return result
def get_widevine_service_certificate(self, **_: Any) -> str:
    """
    Return the ``certificate`` entry of the service config.

    Any keyword arguments are accepted and ignored. The result is ``None``
    when the config has no ``certificate`` key.
    """
    return self.config.get("certificate")
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
    """
    Post the Widevine challenge to the licence URL stored by ``get_tracks``.

    Returns the raw response body.

    Raises:
        requests.HTTPError: when the licence server answers with an error
            status, matching the behaviour of ``get_playready_license``.
    """
    response = self.session.post(
        url=title.data["license_api"],
        data=challenge  # expects bytes
    )
    # Surface HTTP failures here instead of handing an error page to the CDM.
    response.raise_for_status()
    return response.content
def get_playready_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
    """
    Post the PlayReady challenge to the configured licence endpoint.

    Returns the raw response body.

    Raises:
        ValueError: when the config has no ``playready_license`` endpoint.
        requests.HTTPError: when the server answers with an error status.
    """
    endpoint = self.config["endpoints"].get("playready_license")
    if endpoint:
        # The user agent is looked up per device in the service config.
        user_agent = self.config["client"][self.device]["license_user_agent"]
        reply = self.session.post(
            url=endpoint,
            data=challenge,
            headers={"user-agent": user_agent},
        )
        reply.raise_for_status()
        return reply.content
    raise ValueError("PlayReady license endpoint not configured")
@staticmethod
def find_hhmmss_in_dict(data) -> list[dict]:
hhmmss_pattern = re.compile(r'^\d{2}:\d{2}:\d{2}$')
matches = []
def search(d):
if isinstance(d, dict):
for k, v in d.items():
if isinstance(v, (dict, list)):
search(v)
elif isinstance(v, str) and hhmmss_pattern.match(v):
matches.append({k: v})
elif isinstance(d, list):
for item in d:
search(item)
search(data)
# Sorting by time value
def to_seconds(hms):
h, m, s = map(int, hms.split(':'))
return h * 3600 + m * 60 + s
matches.sort(key=lambda d: to_seconds(list(d.values())[0]))
return matches
@staticmethod
def is_valid_nonzero_time(value: str) -> bool:
# Match HH:MM:SS where each part is two digits
if not re.match(r'^\d{2}:\d{2}:\d{2}$', value):
return False
# Check that it's not 00:00:00
return value != "00:00:00"
def _get_manifest(self, cmsId: str, drm: str = "wv"):
    """
    Ask the ``stream`` endpoint for the playback URLs of ``cmsId``.

    The request is made in English first and repeated in Thai when the
    answer is not HTTP 200. If every language fails, an HTTP error is raised
    inside the retried function so that ``self.retry`` repeats the whole
    sequence.

    Returns:
        ``(manifest, license)``. For ``drm == "wv"`` these are the stream's
        ``streamurl`` and ``license`` fields; for ``drm == "aes"`` the
        manifest is the ``result`` field and the licence is ``None``. Any
        other ``drm`` value yields ``(None, None)``.

    Raises:
        requests.HTTPError: when no language produced an HTTP 200 answer.
        ValueError: when the answer contains no ``stream`` object.
    """
    def get_stream():
        payload = {
            "drm": drm,
            "cmsId": cmsId,
            "contentType": "movie",
            "lang": "en"
        }
        response = self.session.post(self.config['endpoints']['stream'], json=payload)
        for lang in ("th",):
            if response.status_code == 200:
                break
            self.log.warning(f"Request for English lang fail. Retry with {lang}")
            payload["lang"] = lang
            response = self.session.post(self.config['endpoints']['stream'], json=payload)
        if response.status_code != 200:
            # Raising is what lets self.retry() try again; returning the
            # failed response made the retry wrapper a no-op.
            raise requests.exceptions.HTTPError(
                f"Stream request for {cmsId} failed with HTTP {response.status_code}",
                response=response,
            )
        return response

    response = self.retry(get_stream)
    stream = response.json().get("stream")
    if not stream:
        raise ValueError(f"Stream response for {cmsId} contains no 'stream' object.")

    manifest = None
    license = None
    if drm == "wv":
        manifest = stream.get("streamurl")
        license = stream.get("license")
    elif drm == "aes":
        manifest = stream.get("result")
    return manifest, license
def retry(self, func: Callable, args=None, kwargs=None, count: int = 5, delay: float = 5.0) -> Any:
    """
    Call ``func(*args, **kwargs)`` up to ``count`` times.

    The first successful result is returned. After a failed attempt the
    method sleeps ``delay`` seconds before trying again; the exception of
    the final attempt is re-raised. With ``count`` below 1 nothing is called
    and ``None`` is returned.
    """
    call_args = args if args else ()
    call_kwargs = kwargs if kwargs else {}
    attempt = 0
    while attempt < count:
        attempt += 1
        try:
            return func(*call_args, **call_kwargs)
        except Exception:
            # Out of attempts: let the last failure propagate.
            if attempt >= count:
                raise
            time.sleep(delay)
    return None
def get_auth_token(self, title_data):
    """
    Scrape the Basic authorization header value used for the TrueID API.

    The title page is fetched to find a playable item, then that video page
    is fetched and its ``__NEXT_DATA__`` JSON is read. The encrypted
    ``runtimeConfig.baseAuth`` username/password are decrypted with
    ``decrypt_cryptojs`` and joined into a ``"Basic <base64>"`` string.

    Parameters:
        title_data: dict with at least ``id`` and ``season_id``, as produced
            by ``extract_id``.

    Raises:
        SystemExit: when ``__NEXT_DATA__`` cannot be located or decoded.
        ValueError: when a series page lists no episode to open.
    """
    authen = None
    self.log.warning("Getting authorization token")
    r = self.session.get(self.config["endpoints"]["title"].format(id=title_data.get("id"), season_id=title_data.get("season_id")))
    data = json.loads(
        BeautifulSoup(
            r.content, features="lxml"
        ).find(
            "script",
            {
                "id": "__NEXT_DATA__"
            }).text
    )
    if self.type == "movie":
        # Fall back to the id parsed from the URL when the page JSON has no
        # top-level "id", instead of formatting the literal "None" into the URL.
        url = self.config["endpoints"]["movie_video"].format(id=data.get("id") or title_data.get("id"))
    else:
        def get_id(d):
            # Return the first (sub-season id, episode id) pair with both ids set.
            for i in d:
                for j in i["vod_items"]:
                    sub_season_id = j.get("id")
                    if sub_season_id:
                        for k in j["ep_items"]:
                            ep_id = k.get("id")
                            if ep_id:
                                return sub_season_id, ep_id
            raise ValueError("No episode found on the title page to extract the token from.")

        sub_season_id, ep_id = get_id(data["props"]["pageProps"]["resultTitleDetail"]["seriesData"]["vod_items"])
        url = self.config["endpoints"]["series_video"].format(id=title_data.get("id"), season_id=title_data.get("season_id"), sub_season_id=sub_season_id, ep_id=ep_id)

    r = self.session.get(url)
    match = re.search(r'__NEXT_DATA__\s*=\s*(\{.*?\});', r.text, re.DOTALL)
    if not match:
        self.log.error("Cannot parsing __NEXT_DATA__ from title page.")
        raise SystemExit(1)

    try:
        data = json.loads(match.group(1))
    except json.JSONDecodeError as e:
        self.log.error(f"JSON parsing error: {e}")
    else:
        # Passphrase the site uses to encrypt its baseAuth credentials.
        secret = "ads-top-position"
        username = decrypt_cryptojs(data["runtimeConfig"]["baseAuth"]['username'], secret)
        password = decrypt_cryptojs(data["runtimeConfig"]["baseAuth"]['password'], secret)
        data_string = username + ":" + password
        authen = "Basic " + base64.b64encode(data_string.encode('utf-8')).decode()
        self.log.warning(f"Got token : {authen}")

    if not authen:
        self.log.error("Cannot extract token which require.")
        raise SystemExit(1)
    return authen
def evp_bytes_to_key(password, salt, key_len=32, iv_len=16):
    """
    Derive a key and IV from ``password`` and ``salt`` by chained MD5 hashing.

    Each round hashes the previous digest followed by the password and the
    salt; digests are concatenated until ``key_len + iv_len`` bytes exist.
    Returns ``(key, iv)`` as two byte strings cut from that material.
    """
    needed = key_len + iv_len
    digests = []
    produced = 0
    previous = b""
    while produced < needed:
        previous = MD5.new(previous + password + salt).digest()
        digests.append(previous)
        produced += len(previous)
    material = b"".join(digests)
    return material[:key_len], material[key_len:needed]
def decrypt_cryptojs(ciphertext_b64, passphrase):
    """
    Decrypt a base64 string in the ``Salted__`` + salt + ciphertext layout.

    The key and IV are derived from ``passphrase`` and the 8-byte salt with
    ``evp_bytes_to_key``; the payload is decrypted with AES-CBC and its
    PKCS#7 padding is removed.

    Returns the plaintext decoded as UTF-8.

    Raises:
        ValueError: when the header is missing, the ciphertext is empty or
            not a whole number of AES blocks, or the padding is invalid
            (typically a wrong passphrase).
    """
    data = base64.b64decode(ciphertext_b64)
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if data[:8] != b"Salted__":
        raise ValueError("Ciphertext does not start with the 'Salted__' header.")
    salt = data[8:16]
    ciphertext = data[16:]
    if not ciphertext or len(ciphertext) % AES.block_size:
        raise ValueError("Ciphertext length is not a positive multiple of the AES block size.")

    key, iv = evp_bytes_to_key(passphrase.encode(), salt)
    cipher = AES.new(key, AES.MODE_CBC, iv)
    decrypted = cipher.decrypt(ciphertext)

    # Remove PKCS#7 padding, verifying it first: a pad byte of 0 would
    # otherwise slice the whole plaintext away via `decrypted[:-0]`.
    pad_len = decrypted[-1]
    if not 1 <= pad_len <= AES.block_size or decrypted[-pad_len:] != bytes([pad_len]) * pad_len:
        raise ValueError("Invalid PKCS#7 padding; the passphrase is probably wrong.")
    return decrypted[:-pad_len].decode("utf-8")