import base64
import json
import re
import sys
import time
from http.cookiejar import CookieJar
from typing import Any, Callable, Optional, Union

import click
import requests
from bs4 import BeautifulSoup
from Crypto.Cipher import AES
from Crypto.Hash import MD5

from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.manifests import HLS
from unshackle.core.service import Service
from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from unshackle.core.tracks import Chapter, Subtitle, Tracks, Video, Audio


class TID(Service):
    """
    Service code for TrueID streaming service (https://www.trueid.net/).

    \b
    Original-Author: [SeFree]
    Version: 1.0.0
    Authorization: Cookies
    Security: FHD@L3, and can be bypass using HLS instead of DASH.

    \b
    """

    # Named groups are read by key ("type", "id", "season_id") throughout the class.
    TITLE_RE = r'https://(?:www|movie)\.trueid\.net/(?:watch/)?(?:[a-z]{2}-[a-z]{2}/)?(?P<type>series|movie)/(?P<id>[a-zA-Z0-9]+)(?:/(?P<season_id>[a-zA-Z0-9]+))?'

    VIDEO_RANGE_MAP = {
        "SDR": "sdr",
        "HDR10": "hdr10",
        "DV": "dolby_vision",
    }

    LANGUAGE_MAP = {
        "en": "English",
        "th": "Thai",
        "jp": "Japanese",
        "ko": "Korean",
        "zh": "Chinese",
    }

    # Short DRM code (CLI / title data) -> DRM identifier found in API payloads.
    _DRM_MAP = {
        "wv": "WV_FPS",
        "aes": "AES_128",
    }

    @staticmethod
    @click.command(name="TID", short_help="https://www.trueid.net")
    @click.argument("title", type=str)
    @click.option("-SE", "--season", default=None, required=False, type=int,
                  help="TrueID sometime not provide Season in info, so specify it manually.")
    @click.option("-d", "--drm", default="wv", type=click.Choice(["aes", "wv", "None"]), required=False,
                  help="TrueID can be force to use DASH or HLS, this option will force the use of DASH (default: wv).")
    @click.option("-tl", "--title_lang", default="ja", required=False, type=str,
                  help="If the title is foreigner audio language, specify the foreigner language.")
    @click.pass_context
    def cli(ctx, **kwargs):
        """Click entry point: build the service from the parsed CLI arguments."""
        return TID(ctx, **kwargs)

    def extract_id(self, url: str):
        """
        Parse a TrueID URL with TITLE_RE.

        Returns the regex group dict (keys "type", "id", "season_id"; unmatched
        optional groups are None), or None when the URL does not match.
        """
        match = re.compile(self.TITLE_RE).search(url)
        if not match:
            return None
        return match.groupdict()

    def __init__(self, ctx, title, drm, season, title_lang):
        """
        Store the CLI arguments and the ids parsed from ``title``.

        Parameters:
            ctx: click context, forwarded to the base Service.
            title: TrueID URL; when it does not match TITLE_RE it is used as a
                bare series id.
            drm: "wv", "aes", or "None"/None to take the DRM type from the API data.
            season: optional season number override.
            title_lang: language passed to the manifest parsers in get_tracks.
        """
        super().__init__(ctx)
        # A non-matching input previously crashed on ``None.get``; treat it as a
        # bare series id instead so the later ``.get`` calls keep working.
        self.data = self.extract_id(title) or {"type": "series", "id": title, "season_id": None}
        self.title = self.data.get("id")
        # click delivers the literal string "None" for that choice; normalise it
        # so the "no override" checks below actually trigger.
        self.drm = None if drm in (None, "None") else drm
        self.type = self.data.get('type') or "series"
        self.season = season
        self.title_lang = title_lang
        self.license_api = None
        self.auth_token = None

    def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
        """
        Refreshes the cookies for the TrueID service.
        This is necessary to maintain a valid session.

        Steps performed: obtain the Basic authorization token, call the
        ``check_sso`` endpoint, post the access token to the ``me`` endpoint and
        finally post the resulting profile to the ``storelogin`` endpoint.
        Exits the process with status 1 when any of these steps fails.
        """
        super().authenticate(cookies, credential)
        self.auth_token = self.retry(self.get_auth_token, args=(self.data,))

        params = {
            'client_id': '893',
            'browser_id': '*****.*****',
            'ver': '1.5.0',
            'scope': 'public_profile,mobile,email,references,identity_account_read,identity_token_bridge',
        }
        headers = {
            'origin': 'https://www.trueid.net',
            'referer': 'https://www.trueid.net/',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36 Edg/145.0.0.0',
        }
        response = self.session.get(self.config['endpoints']['check_sso'], params=params, headers=headers)
        sso = response.json()
        access_token = sso.get('access_token')
        self.log.debug(f"SSO Check Response: {sso}")
        if sso.get('status') != 'connected':
            self.log.error("SSO check failed, cannot refresh cookies.")
            sys.exit(1)

        headers = {
            'authorization': self.auth_token,
        }
        # From here on every session request carries the Basic token.
        self.session.headers.update(headers)

        json_data = {
            'accessToken': access_token,
            'ssoId': self.session.cookies.get_dict().get("sso_u"),
        }
        response = self.session.post(self.config['endpoints']['me'], json=json_data)
        # Status is checked first so the body is only parsed for 2xx replies.
        if not (200 <= response.status_code < 300 and response.json().get('success') == True):  # noqa: E712
            # NOTE(review): the original ended with a bare exit() whose nesting was
            # lost in formatting; it is read here as the failure path of this request.
            self.log.error("Profile request failed, cannot refresh cookies.")
            sys.exit(1)

        data = response.json()['data']
        json_data = {
            'USER_PROFILE': {
                'avatar': data['avatar'],
                'displayName': data['display_name'],
                'uid': data['ssoid'],
                'user_profile_id': data['id'],
                'trueid': '',
            },
            'isLogin': True,
            'avatar': data['avatar'],
            'displayName': data['display_name'],
            'AAA_PROFILE': {
                'access_token': access_token,
                'status': 'connected',
                'income': 'trueid',
                'hashed': self.config.get('check_sso_params', {}).get('hashed'),
                'uid': data['ssoid'],
            },
            'SUB_RES': [],
            'uniqueUserId': '*****.*****',
            'PPID': '',
            'UID_ads': '',
        }
        response = self.session.post(self.config['endpoints']['storelogin'], headers=headers, json=json_data)
        if response.text != 'Successfully':
            self.log.error("Failed to refresh cookies.")
            sys.exit(1)

    def _resolve_drm(self, api_drm: Optional[str]) -> str:
        """Return the short DRM code: the CLI override if set, else the API value mapped back through _DRM_MAP ("aes" when unknown)."""
        if self.drm:
            return self.drm
        api_to_short = {api_name: short for short, api_name in self._DRM_MAP.items()}
        return api_to_short.get(api_drm, "aes")

    def _fetch_seasons(self, lang: str, season_id: Optional[str] = None, headers: Optional[dict] = None):
        """GET the ``seasonsSerise`` endpoint for this title in ``lang`` and return the decoded JSON."""
        params = {
            "id": self.title,
            "lang": lang,
        }
        if season_id is not None:
            params["seasonId"] = season_id
        return self.session.get(
            url=self.config['endpoints']['seasonsSerise'], headers=headers, params=params
        ).json()

    def get_titles(self) -> Titles_T:
        """
        Build the title list for the parsed URL.

        For a movie, one ``getDetailById`` request yields a single Movie. For a
        series, the season list is fetched and every sub-episode carrying an
        ``episode_no`` becomes an Episode. Each title's ``data`` dict holds the
        DRM code and chapter candidates used later by get_tracks/get_chapters.
        """
        if self.type == "movie":
            return self._get_movie()
        return self._get_series()

    def _get_movie(self) -> Movies:
        """Fetch movie details and wrap them in a one-item Movies collection."""
        self.session.headers.update({'authorization': self.auth_token})
        response = self.session.post(
            url=self.config['endpoints']['getDetailById'],
            json={"id": self.title, "lang": "en"},
        )
        response.raise_for_status()
        res_json = response.json()

        return Movies([
            Movie(
                id_=res_json.get('id', ''),
                service=self.__class__,
                name=res_json.get('title', ''),
                year=res_json.get('release_year', ''),
                data={
                    "title": res_json.get('title', ''),
                    "url": self.title,
                    "drm": self._resolve_drm(res_json.get("drm")),
                    "season_no": res_json.get('season_no', ''),
                    "episode_no": res_json.get('episode_no', ''),
                    "audio": res_json.get('audio', ''),
                    "subtitle": res_json.get('subtitle', ''),
                    "id": res_json.get('id', ''),
                    "year": res_json.get('release_year', ''),
                    # Manifest and license are resolved lazily in get_tracks.
                    'manifest': None,
                    'license': None,
                    'chapters': self.find_hhmmss_in_dict(res_json),
                },
            )
        ])

    def _get_series(self) -> Series:
        """Walk every season of the series and collect its sub-episodes as Episodes."""
        season_id = self.data.get("season_id")
        headers = {
            'authorization': self.auth_token,
            "referer": f'https://www.trueid.net/watch/th-th/series/{self.title}/{season_id}',
        }
        # English first; an empty reply is retried in Thai.
        overview = self._fetch_seasons("en", headers=headers)
        if not overview:
            overview = self._fetch_seasons("th", headers=headers)
        overview = overview or {}

        season_items = overview.get('seasonItems') or []
        if not season_items:
            self.log.warning("Season not found")
        series_title = (overview.get('seasonShelf') or {}).get('title', '')

        titles = []
        previous_season_id = self.data.get('season_id', '')
        for season in season_items:
            season_data = self._fetch_seasons("en", season_id=season['id'])
            if not season_data:
                self.log.warning("Season not found")
                season_data = self._fetch_seasons("th", season_id=season['id'])
            shelf = (season_data or {}).get("seasonShelf") or {}

            for shelf_season in shelf.get('season') or []:
                for episode in shelf_season.get('episode') or []:
                    for ep_item in episode.get('subEpisode') or []:
                        if ep_item.get("episode_no") is None:
                            continue
                        # Each time the upstream season id changes, advance the
                        # running season number (or seed it from the item when
                        # no season has been set yet).
                        if ep_item.get('vod_season_id', '') != previous_season_id:
                            self.season = int(self.season) + 1 if self.season else ep_item.get('season_no', '')
                            previous_season_id = ep_item.get('vod_season_id', '')

                        titles.append({
                            "title": ep_item.get('title', ''),
                            "series_title": series_title,
                            "drm": self._resolve_drm(ep_item.get('drm', 'AES_128')),
                            "season_no": ep_item.get('season_no', '') if not self.season else self.season,
                            "episode_no": ep_item.get('episode_no', ''),
                            "sub_ep_no": ep_item.get('sub_ep_no', ''),
                            "audio": ep_item.get('audio', ''),
                            'sub_ep_included': ep_item.get('sub_ep_included', False),
                            "subtitle": ep_item.get('subtitle', ''),
                            "id": ep_item.get('id', ''),
                            # Manifest and license are resolved lazily in get_tracks.
                            'manifest': None,
                            'license': None,
                            'chapters': self.find_hhmmss_in_dict(ep_item),
                        })

        return Series([
            Episode(
                id_=x["id"],
                service=self.__class__,
                title=x["series_title"],
                season=x.get("season_no"),
                number=x.get("episode_no"),
                name=x.get("title"),
                data=x,
            )
            for x in titles
        ])

    def get_tracks(self, title: Title_T) -> Tracks:
        """
        Resolve the manifest for ``title`` and parse it into Tracks.

        The license URL returned with the manifest is stored in
        ``title.data["license_api"]`` for get_widevine_license. "aes" titles are
        parsed as HLS, everything else as DASH.

        Raises:
            ValueError: when the stream endpoint returned no manifest URL.
        """
        manifest, title.data["license_api"] = self._get_manifest(title.id, title.data['drm'])
        if not manifest:
            raise ValueError(f"No manifest URL returned for title {title.id}")
        res = self.session.get(url=manifest).text

        tracks = Tracks()
        if title.data['drm'] == "aes":
            tracks.add(HLS.from_text(res, manifest).to_tracks(self.title_lang))
        else:
            tracks.add(DASH.from_text(
                text=res,
                url=manifest
            ).to_tracks(self.title_lang))
        return tracks

    def get_chapters(self, title: Title_T) -> list[Chapter]:
        """
        Turn the HH:MM:SS candidates stored in ``title.data['chapters']`` into Chapters.

        Zero or malformed timestamps and repeated timestamps are skipped. The
        chapter name is the source key with underscores replaced by spaces and
        the word "skip" removed.
        """
        chapters = []
        seen = set()
        for chapter in title.data.get('chapters') or []:
            for key, value in chapter.items():
                if not self.is_valid_nonzero_time(value) or value in seen:
                    continue
                chapters.append(Chapter(
                    name=key.replace('_', ' ').replace('skip', '').strip(),
                    timestamp=value,
                ))
                seen.add(value)
        return chapters

    def get_widevine_service_certificate(self, **_: Any) -> str:
        """Return the ``certificate`` value from the service config (None when not configured)."""
        return self.config.get("certificate")

    def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
        """POST the raw challenge bytes to the license URL stored by get_tracks and return the response body."""
        return self.session.post(
            url=title.data["license_api"],
            data=challenge  # expects bytes
        ).content

    def get_playready_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
        """
        POST the challenge to the configured ``playready_license`` endpoint.

        Raises:
            ValueError: when the endpoint is not configured.
            requests.HTTPError: when the license server answers with an error status.
        """
        license_url = self.config["endpoints"].get("playready_license")
        if not license_url:
            raise ValueError("PlayReady license endpoint not configured")

        response = self.session.post(
            url=license_url,
            data=challenge,
            headers={
                # NOTE(review): self.device is not set anywhere in this file - confirm the base class provides it.
                "user-agent": self.config["client"][self.device]["license_user_agent"],
            },
        )
        response.raise_for_status()
        return response.content

    @staticmethod
    def find_hhmmss_in_dict(data) -> list[dict]:
        """
        Recursively collect every string value shaped like ``HH:MM:SS``.

        Walks nested dicts and lists and returns one ``{key: value}`` dict per
        match, sorted ascending by the time value.
        """
        hhmmss_pattern = re.compile(r'^\d{2}:\d{2}:\d{2}$')
        matches = []

        def search(d):
            if isinstance(d, dict):
                for k, v in d.items():
                    if isinstance(v, (dict, list)):
                        search(v)
                    elif isinstance(v, str) and hhmmss_pattern.match(v):
                        matches.append({k: v})
            elif isinstance(d, list):
                for item in d:
                    search(item)

        def to_seconds(hms):
            h, m, s = map(int, hms.split(':'))
            return h * 3600 + m * 60 + s

        search(data)
        matches.sort(key=lambda d: to_seconds(next(iter(d.values()))))
        return matches

    @staticmethod
    def is_valid_nonzero_time(value: str) -> bool:
        """Return True when ``value`` is an ``HH:MM:SS`` string other than ``00:00:00``."""
        if not isinstance(value, str) or not re.match(r'^\d{2}:\d{2}:\d{2}$', value):
            return False
        return value != "00:00:00"

    def _get_manifest(self, cmsId: str, drm: str = "wv"):
        """
        Ask the ``stream`` endpoint for the manifest (and license URL) of ``cmsId``.

        The request is made in English and repeated in Thai when the first
        answer is not HTTP 200; HTTP errors are retried through ``retry``.

        Returns:
            ``(manifest, license)``. For "wv" these are the ``streamurl`` and
            ``license`` fields, for "aes" the manifest is the ``result`` field
            and the license is None; any other ``drm`` gives ``(None, None)``.

        Raises:
            ValueError: when the response carries no ``stream`` object.
        """
        endpoint = self.config['endpoints']['stream']

        def get_stream():
            data = {
                "drm": drm,
                "cmsId": cmsId,
                "contentType": "movie",
                "lang": "en"
            }
            response = self.session.post(endpoint, json=data)
            for lang in ("th",):
                if response.status_code == 200:
                    break
                self.log.warning(f"Request for English lang fail. Retry with {lang}")
                data["lang"] = lang
                response = self.session.post(endpoint, json=data)
            # retry() only reacts to exceptions, so surface HTTP errors as one.
            response.raise_for_status()
            return response

        response: Optional[requests.Response] = self.retry(get_stream)
        stream = response.json().get("stream")
        if not stream:
            raise ValueError(f"Stream endpoint returned no stream data for {cmsId}")

        manifest = None
        license = None
        if drm == "wv":
            manifest = stream.get("streamurl")
            license = stream.get("license")
        elif drm == "aes":
            manifest = stream.get("result")
        return manifest, license

    def retry(self, func: Callable, args=None, kwargs=None, count: int = 5, delay: float = 5.0) -> Any:
        """
        Call ``func(*args, **kwargs)`` up to ``count`` times.

        Sleeps ``delay`` seconds between attempts and re-raises the last
        exception once the attempts are exhausted. Only ``Exception`` subclasses
        are retried, so ``SystemExit`` passes straight through. Returns None
        without calling ``func`` when ``count`` is below 1.
        """
        args = args or ()
        kwargs = kwargs or {}
        for attempt in range(1, count + 1):
            try:
                return func(*args, **kwargs)
            except Exception:
                if attempt >= count:
                    raise
                time.sleep(delay)

    def get_auth_token(self, title_data):
        """
        Build the ``Basic ...`` authorization header value used for API calls.

        Loads the title page, locates a playable video page from its
        ``__NEXT_DATA__`` JSON, then decrypts the ``runtimeConfig.baseAuth``
        username/password found on that video page.

        Parameters:
            title_data: dict with the "id" and "season_id" parsed from the URL.

        Raises:
            ValueError: when the title page lacks the ``__NEXT_DATA__`` script or
                no season/episode id can be found.
        Exits with status 1 when the video page cannot be parsed or no token results.
        """
        authen = None
        self.log.warning("Getting authorization token")
        r = self.session.get(self.config["endpoints"]["title"].format(
            id=title_data.get("id"), season_id=title_data.get("season_id")))
        script = BeautifulSoup(r.content, features="lxml").find("script", {"id": "__NEXT_DATA__"})
        if script is None:
            raise ValueError("Title page does not contain a __NEXT_DATA__ script")
        data = json.loads(script.text)

        if self.type == "movie":
            # Fall back to the id from the URL when the page JSON has no top-level "id".
            url = self.config["endpoints"]["movie_video"].format(id=data.get("id") or title_data.get("id"))
        else:
            def get_id(d):
                # First (sub-season id, episode id) pair where both ids are present.
                for i in d:
                    for j in i["vod_items"]:
                        sub_season_id = j.get("id")
                        if sub_season_id:
                            for k in j["ep_items"]:
                                ep_id = k.get("id")
                                if ep_id:
                                    return sub_season_id, ep_id
                return None

            ids = get_id(data["props"]["pageProps"]["resultTitleDetail"]["seriesData"]["vod_items"])
            if ids is None:
                raise ValueError("No season/episode id found on the title page")
            sub_season_id, ep_id = ids
            url = self.config["endpoints"]["series_video"].format(
                id=title_data.get("id"), season_id=title_data.get("season_id"),
                sub_season_id=sub_season_id, ep_id=ep_id)

        r = self.session.get(url)
        match = re.search(r'__NEXT_DATA__\s*=\s*(\{.*?\});', r.text, re.DOTALL)
        if match:
            json_str = match.group(1)
            try:
                data = json.loads(json_str)
                secret = "ads-top-position"
                username = decrypt_cryptojs(data["runtimeConfig"]["baseAuth"]['username'], secret)
                password = decrypt_cryptojs(data["runtimeConfig"]["baseAuth"]['password'], secret)
                data_string = username + ":" + password
                authen = "Basic " + base64.b64encode(data_string.encode('utf-8')).decode()
                self.log.warning(f"Got token : {authen}")
            except json.JSONDecodeError as e:
                self.log.error(f"JSON parsing error: {e}", )
            except requests.exceptions.HTTPError as e:
                self.log.error("HTTP error: %s", e)
        else:
            self.log.error("Cannot parsing __NEXT_DATA__ from title page.")
            sys.exit(1)

        if not authen:
            self.log.error("Cannot extract token which require.")
            sys.exit(1)
        return authen


def evp_bytes_to_key(password, salt, key_len=32, iv_len=16):
    """
    Derive ``(key, iv)`` from ``password`` and ``salt`` by chained MD5 digests.

    Each round hashes the previous digest + password + salt; digests are
    concatenated until ``key_len + iv_len`` bytes are available.
    """
    dtot = b""
    d = b""
    while len(dtot) < (key_len + iv_len):
        d = MD5.new(d + password + salt).digest()
        dtot += d
    return dtot[:key_len], dtot[key_len:key_len + iv_len]


def decrypt_cryptojs(ciphertext_b64, passphrase):
    """
    Decrypt a base64 ``Salted__`` + salt + AES-CBC payload with ``passphrase``.

    Returns the plaintext decoded as UTF-8 with the trailing padding removed.

    Raises:
        ValueError: when the ``Salted__`` header is missing or the padding
            length is not within one AES block.
    """
    data = base64.b64decode(ciphertext_b64)
    # Explicit check instead of ``assert`` so it survives ``python -O``.
    if data[:8] != b"Salted__":
        raise ValueError("Ciphertext is missing the 'Salted__' header")
    salt = data[8:16]
    ciphertext = data[16:]

    key, iv = evp_bytes_to_key(passphrase.encode(), salt)
    cipher = AES.new(key, AES.MODE_CBC, iv)
    decrypted = cipher.decrypt(ciphertext)

    # remove PKCS7 padding
    pad_len = decrypted[-1] if decrypted else 0
    if not 1 <= pad_len <= AES.block_size:
        raise ValueError("Invalid padding in decrypted data")
    return decrypted[:-pad_len].decode("utf-8")