import re
import sys
import time
from hashlib import md5
from http.cookiejar import CookieJar
from typing import Optional
from urllib.parse import urlparse

import click
import pysubs2
import requests
from bs4 import BeautifulSoup
from langcodes import Language
from subby import BilibiliJSONConverter, CommonIssuesFixer

from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.service import Service
from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from unshackle.core.tracks import Audio, Chapter, Subtitle, Tracks, Video


class BLBL(Service):
    """
    Service code for BiliBili streaming service (https://www.bilibili.tv/).

    Author: SeFree
    Version: 1.0.0
    Authorization: Cookies or android access_key
    Security: No CDM required. Just direct download

    Note: BiliBili has a lot of different regions, so you may need to specify the region in the config.
          The default region is "th".
          Cookies are required for authentication, and you can get them from the browser.
          Cookies seem not to expire, so you can use them for a long time.
    """

    # Both patterns expose the matched identifier as the named group "id",
    # which is what extract_id() reads.
    TITLE_RE = [
        r"^https?://(?:www\.)?bilibili\.tv/(?:[^/]+/)?play/(?P<id>\d+)/?",
        r"^https?://(?:www\.)?bili\.im/(?P<id>[A-Za-z0-9]+)",
    ]
    # Captures the season id and, when present, the episode id of a play URL.
    PLAY_URL_RE = re.compile(
        r"bilibili\.tv/(?:[^/]+/)?play/(?P<season>\d+)(?:/(?P<episode>\d+))?"
    )
    # Labels accepted by getEpisodes(); anything else (trailers, PVs, ...) is dropped.
    EPISODE_LABEL_RE = re.compile(r"(E[0-9]+|\d+[AB]|SP|SP\d|OVA*|OAD)")
    # A plain integer or decimal episode number such as "12" or "2.5".
    EPISODE_NUMBER_RE = re.compile(r"[0-9]+(?:\.[0-9]+)?")

    VIDEO_RANGE_MAP = {
        "SDR": "sdr",
        "HDR10": "hdr10",
        "DV": "dolby_vision",
    }
    LANGUAGE_MAP = {
        'Japan': 'ja',
        'Chinese Mainland': 'zh',
        'Thai': 'th',
        'CN': 'zh',
    }
    CHAPTER_TITLE_MAP = {
        'opening_start_time': 'intro',
        'opening_end_time': 'Chapter {chapter}',
        'ending_start_time': 'ending',
        'ending_end_time': 'Chapter {chapter}',
    }

    # Lookup tables for the android play-url response.
    ANDROID_VIDEO_CODEC_MAP = {
        7: Video.Codec.AVC,
        12: Video.Codec.HEVC,
    }
    ANDROID_AUDIO_CODEC_MAP = {
        30280: "mp4a.40.2",
        30232: "mp4a.40.2",
        30216: "mp4a.40.5",
    }
    ANDROID_QUALITY_MAP = {
        120: [3840, 2160],
        112: [1920, 1080],
        80: [1920, 1080],
        64: [1280, 720],
        32: [852, 480],
        16: [640, 360],
        6: [426, 240],
        5: [256, 144],
    }

    # Values of the API "message" field that get a friendlier error text.
    LOGIN_REQUIRED_CODES = ("10004004", "10004005", "10023006")
    GEO_BLOCKED_CODES = ("10004001",)

    WEB_USER_AGENT = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0"
    )
    # Seconds; applied to the plain `requests` calls, which otherwise wait forever.
    REQUEST_TIMEOUT = 30

    @staticmethod
    @click.command(name="BLBL", short_help="https://www.bilibili.tv/")
    @click.argument("title", type=str)
    @click.option("-SE", "--season", default=1, required=False, type=int,
                  help="BiliBili not provide Season in info, so specify it manually.")
    @click.option("-tl", "--title_lang", default=None, required=False, type=str,
                  help="If the title is foreigner audio language, specify the foreigner language.")
    @click.option("-m", "--movie", default=False, is_flag=True, required=False, type=bool,
                  help="Treat the title as a movie instead of a series.")
    @click.option("-o", "--original_url", default=None, required=False, type=str,
                  help="If the title is foreigner audio language, specify the original title.")
    @click.option("-ol", "--original_lang", default=None, required=False, type=str,
                  help="If the title is foreigner audio language, specify the original language.")
    @click.pass_context
    def cli(ctx, **kwargs):
        """Click entry point: build the service from the parsed command line."""
        return BLBL(ctx, **kwargs)

    def extract_id(self, url: str) -> Optional[str]:
        """Return the "id" group of the first TITLE_RE pattern matching *url*, or None."""
        for pattern in self.TITLE_RE:
            match = re.search(pattern, url)
            if match:
                return match.group("id")
        return None

    def __init__(self, ctx, title, season, original_url, title_lang, original_lang, movie):
        """
        Normalise the requested URLs and read the service configuration.

        Parameters:
            ctx: Click context; the parent context's "skip_dl" parameter is read.
            title: BiliBili play URL (or bili.im short URL) of the title to download.
            season: Season number to assign to regular episodes.
            original_url: Optional play URL of the original-audio release.
            title_lang: Optional audio language of *title*.
            original_lang: Optional original language of the show.
            movie: When true, the title is returned as a movie.

        Raises:
            ValueError: If a URL is not a recognisable BiliBili play/short URL.
        """
        super().__init__(ctx)

        self.title = self._resolve_play_url(title)
        self.skip_dl = ctx.parent.params.get("skip_dl")
        self.Season = season
        self.title_lang = title_lang
        self.movie = movie
        self.android_access_key = self.config.get("access_key")

        self.original_url = self._resolve_play_url(original_url) if original_url else None
        self.original_lang = original_lang

        defaults = self.config['default_config']
        self.appbuild = defaults['appbuild']
        self.lang = defaults['lang']
        self.region = defaults['region']

        # Names of downloaded ".srt" files already converted by on_track_downloaded().
        self.already_json_to_srt = []

    def _resolve_play_url(self, url: str) -> str:
        """
        Return the canonical ``https://www.bilibili.tv/play/<season>[/<episode>]`` URL.

        Short bili.im links are followed first. The episode segment is kept so
        get_titles() can tell an episode link from a season link.
        """
        if "bili.im" in url:
            short_id = self.extract_id(url)
            if short_id is None:
                raise ValueError(f"Unrecognised BiliBili short URL: {url}")
            self.log.warning("Short URL detected")
            url = self.session.get("https://bili.im/" + short_id, allow_redirects=True).url

        match = self.PLAY_URL_RE.search(url)
        if not match:
            raise ValueError(f"Unrecognised BiliBili play URL: {url}")

        canonical = f"https://www.bilibili.tv/play/{match.group('season')}"
        if match.group("episode"):
            canonical += f"/{match.group('episode')}"
        return canonical

    def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
        """Run the base authentication, then insist on cookies being present."""
        super().authenticate(cookies, credential)
        if not cookies:
            raise EnvironmentError("Service requires Cookies for Authentication.")

    def get_titles(self) -> Titles_T:
        """
        Fetch the season(s) and build the title objects.

        Returns a one-item Movies for a single-episode URL or when the movie
        flag is set, otherwise a Series with one Episode per listed episode.
        When an original URL was given, only episodes present in both releases
        are kept, and each title's data carries the paired original episode.
        """
        title_parts = urlparse(self.title).path.split('/')
        original_parts = urlparse(self.original_url).path.split('/') if self.original_url else None

        if original_parts is not None and len(title_parts) != len(original_parts):
            self.log.error("Original URL is provided, but the title and original URL do not match in structure.")
            self.log.error(f"Foreigner URL : {self.title}")
            self.log.error(f"Original URL : {self.original_url}")
            sys.exit(1)

        wanted_episode = None
        if title_parts[-2] != 'play':
            # .../play/<season>/<episode>
            self.log.info("Title is a single Episode")
            content_type = "SINGLE"
            season_id = title_parts[-2]
            original_season_id = original_parts[-2] if original_parts else None
            wanted_episode = title_parts[-1]
        else:
            # .../play/<season>
            self.log.info("Title is a Series, will fetch all episodes")
            content_type = "SERIES"
            season_id = title_parts[-1]
            original_season_id = original_parts[-1] if original_parts else None

        if not self.original_lang:
            self.original_lang = self.get_original_language(season_id)
            if not self.original_lang:
                self.log.warning("Could not detect the original language, consider passing --original_lang.")

        season_info = self.getSeasonInfo(season_id)
        original_info = self.getSeasonInfo(original_season_id) if original_season_id else None

        if self.movie:
            content_type = "MOVIE"

        if original_info:
            # Keep only the episodes both releases have in common.
            shared_labels = (
                {item['short_title_display'] for item in season_info['episode_info']}
                & {item['short_title_display'] for item in original_info['episode_info']}
            )
            season_info['episode_info'] = [
                item for item in season_info['episode_info']
                if item['short_title_display'] in shared_labels
            ]
            original_info['episode_info'] = [
                item for item in original_info['episode_info']
                if item['short_title_display'] in shared_labels
            ]

        media_title = original_info['detail']['title'] if original_info else season_info['detail']['title']

        # An explicit --title_lang always wins; otherwise guess from the title text.
        if not self.title_lang:
            self.title_lang = (
                self._guess_title_language(season_info['detail']['title'])
                or self.original_lang
            )

        episode_list = [
            {
                "ep": item['short_title_display'].split('E')[-1],
                "short_title_display": item['short_title_display'],
                "ep_name": item['long_title_display'],
                "id": item['episode_id'],
            }
            for item in season_info['episode_info']
        ]
        # Pair by episode label rather than list position, so differently
        # ordered listings cannot mismatch.
        originals_by_label = {}
        if original_info:
            originals_by_label = {
                item['short_title_display']: {
                    "ep": item['short_title_display'].split('E')[-1],
                    "id": item['episode_id'],
                    "name": item['long_title_display'],
                }
                for item in original_info['episode_info']
            }

        titles = []
        special_count = 0
        for ep in episode_list:
            original_ep = originals_by_label.get(ep['short_title_display'])

            if content_type in ("SINGLE", "MOVIE"):
                if content_type == "SINGLE":
                    # The API id and the URL segment may differ in type; compare as text.
                    if str(ep.get('id')) != str(wanted_episode):
                        self.log.info(f"Skipping episode {ep.get('id')} as it does not match the requested episode {wanted_episode}.")
                        continue
                    if self._parse_episode_number(ep.get('ep', '')) is None:
                        self.log.info(f"Skipping episode {ep.get('id')} as it is not a valid episode number.")
                        continue
                return Movies([
                    Movie(
                        id_=ep.get('id', ''),
                        service=self.__class__,
                        name=media_title,
                        language=self.original_lang,
                        data={
                            "Episode": ep,
                            "Original": original_ep,
                        },
                    )
                ])

            # SERIES: whole-number episodes keep their number; everything else
            # (2.5, 12A, SP1, OVA, ...) is filed as a season-0 special.
            episode_name = ep.get('ep_name', None)
            number = self._parse_episode_number(ep['ep'])
            is_special = number is None or not number.is_integer()
            if number is not None:
                ep['ep'] = number
            if is_special:
                special_count += 1
                if number is not None:
                    episode_name = f"special_{ep['ep']}"

            titles.append(Episode(
                id_=ep.get('id', ''),
                service=self.__class__,
                title=self.shorten_filename(media_title),
                season=0 if is_special else self.Season,
                number=special_count if is_special else int(number),
                name=episode_name,
                data={
                    "Episode": ep,
                    "Original": original_ep,
                },
                language=self.original_lang,
            ))

        return Series(titles)

    @classmethod
    def _guess_title_language(cls, title: str) -> Optional[str]:
        """Guess the audio language from a title such as "Name (Thai Dub)"; None if unknown."""
        if 'Dub' in title:
            match = re.search(r'\((.*?)\)', title)
            if match:
                words = [word for word in match.group(1).split(' ') if word and word != 'Dub']
                if words:
                    # Try the whole phrase first ("Chinese Mainland"), then its first word.
                    return cls.LANGUAGE_MAP.get(' '.join(words)) or cls.LANGUAGE_MAP.get(words[0])
            return None
        if 'Thai' in title:
            return 'th'
        return None

    @classmethod
    def _parse_episode_number(cls, label) -> Optional[float]:
        """Return *label* as a float when it is a plain number ("12", "2.5"), else None."""
        text = str(label).strip()
        if cls.EPISODE_NUMBER_RE.fullmatch(text):
            return float(text)
        return None

    def get_tracks(self, title: Title_T) -> Tracks:
        """
        Build the video, audio and subtitle tracks for *title*.

        Video and audio from the requested release are tagged with
        ``self.title_lang``. When the title carries a paired original episode,
        its audio and subtitles are added as well and flagged as original
        language.
        """
        tracks = Tracks()
        time.sleep(1)

        original = title.data.get('Original')
        media = self.getMediaURL(title.id)
        original_media = self.getMediaURL(original['id']) if original else None

        for video in media['videos']:
            tracks.add(Video(
                id_=md5(video["url"].encode()).hexdigest()[0:6],
                drm=None,
                url=video["url"],
                width=video["width"],
                height=video["height"],
                fps=video.get("framerate", None),
                language=Language.get(self.title_lang),
                bitrate=video["bandwidth"],
                codec=video["codec"],
            ))

        audio_tracks = self._build_audio_tracks(media['audios'], self.title_lang, False)
        if original_media:
            audio_tracks += self._build_audio_tracks(original_media['audios'], self.original_lang, True)
        tracks.add(audio_tracks)

        # Without a separate original release, these subtitles are the original ones.
        subtitle_tracks = self._build_subtitle_tracks(
            self._get_video_subtitles(title.id),
            not original_media,
        )
        if original_media:
            subtitle_tracks += self._build_subtitle_tracks(
                self._get_video_subtitles(original['id']),
                True,
            )
        tracks.add(subtitle_tracks)

        return tracks

    @staticmethod
    def _build_audio_tracks(audios, language, is_original: bool) -> list:
        """Turn getMediaURL() audio entries into Audio tracks."""
        return [
            Audio(
                id_=md5(audio["url"].encode()).hexdigest()[0:6],
                drm=None,
                url=audio["url"],
                bitrate=audio["bandwidth"],
                # Codec strings look like "mp4a.40.2": family first, last field used as channels.
                codec=Audio.Codec.from_mime(audio['codec'].split(".")[0]),
                channels=audio['codec'].split(".")[-1],
                is_original_lang=is_original,
                language=Language.get(language),
            )
            for audio in audios
        ]

    def _get_video_subtitles(self, episode_id) -> list:
        """Return the "video_subtitle" list of an episode, or an empty list."""
        return (self.getSubtitle(episode_id) or {}).get('video_subtitle') or []

    @staticmethod
    def _build_subtitle_tracks(subtitles, is_original: bool) -> list:
        """Create a SubRip track, plus an ASS track when offered, per subtitle entry."""
        built = []
        for subtitle in subtitles:
            variants = [(subtitle['srt'], Subtitle.Codec.SubRip)]
            if subtitle.get('ass'):
                variants.append((subtitle['ass'], Subtitle.Codec.SubStationAlphav4))
            for resource, codec in variants:
                track = Subtitle(
                    id_=md5(resource["url"].encode()).hexdigest()[0:6],
                    drm=None,
                    url=resource["url"],
                    codec=codec,
                    language=subtitle["lang_key"],
                )
                track.is_original_lang = is_original
                built.append(track)
        return built

    def get_chapters(self, title: Title_T) -> list[Chapter]:
        """
        Build chapters from the episode's opening/ending time stamps.

        "*_start_*" entries become named chapters ("intro", "ending");
        "*_end_*" entries become numbered "Chapter N" markers. Zero or missing
        time stamps are skipped.
        """
        params = {
            'episode_id': title.data['Episode']['id'],
        }
        res = self.session.get(self.config['endpoints']['time_stampdetail'], params=params).json()

        chapters = []
        chapter_number = 1
        for group in (res.get("data") or {}).values():
            if not isinstance(group, dict):
                continue
            for key, time_stamp in group.items():
                if not time_stamp:
                    continue
                name = self.CHAPTER_TITLE_MAP.get(key, key)
                if "_start_" in key:
                    chapters.append(Chapter(name=name, timestamp=time_stamp))
                elif "_end_" in key:
                    chapters.append(Chapter(name=name.format(chapter=chapter_number), timestamp=time_stamp))
                    chapter_number += 1

        return chapters

    @staticmethod
    def _ensure_ok(response: dict) -> None:
        """Raise ValueError unless the API response's "code" is 0."""
        if response['code'] != 0:
            raise ValueError(f"API Error {response.get('message')}")

    def getEpisodes(self, season_id: int):
        """Return the season's episodes whose short title looks like a real episode label."""
        url = self.config['endpoints']['gateway_play'].format(
            season_id=season_id, lang=self.lang.lower(), region=self.region.upper()
        )
        response = self.session.get(url).json()
        self._ensure_ok(response)

        return [
            episode
            for section in response['data']['sections']
            for episode in section['episodes']
            if self.EPISODE_LABEL_RE.match(episode['short_title_display'])
        ]

    def getSeasonInfo(self, season_id):
        """
        Fetch season details and the episode list.

        Returns:
            ``{'detail': <season data>, 'episode_info': <list from getEpisodes()>}``

        Raises:
            ValueError: If the API reports a non-zero code.
        """
        agent = f"bilibili/{self.appbuild} CFNetwork/1.0 Darwin/23.0.0 os/ios model/iPhone 12 Pro Max mobi_app/bstar_i build/2450100 osVer/17.0 network/2 channel/AppStore"
        url = self.config['endpoints']['gateway_view'].format(
            appbuild=self.appbuild, season_id=season_id, lang=self.lang.lower(), region=self.region.upper()
        )
        # Sent with plain `requests` (not the service session), using the iOS app user agent.
        response = requests.get(url, headers={'User-Agent': agent}, timeout=self.REQUEST_TIMEOUT).json()
        self._ensure_ok(response)

        return {
            'detail': response['data'],
            'episode_info': self.getEpisodes(season_id=int(season_id)),
        }

    @classmethod
    def _raise_media_error(cls, response: dict) -> None:
        """Raise ValueError describing a failed play-url response."""
        message = response.get('message')
        if str(message) in cls.LOGIN_REQUIRED_CODES:
            raise ValueError(f"API Code {message}: This video is only available for registered users")
        if str(message) in cls.GEO_BLOCKED_CODES:
            raise ValueError(f"API Code {message}: This video is not available from your location due to geo restriction")
        raise ValueError(f"API Error {message}")

    @staticmethod
    def _parse_frame_rate(value) -> Optional[float]:
        """Convert "24000/1001" or "25" into frames per second; None when unusable."""
        if not value:
            return None
        numerator, _, denominator = str(value).partition("/")
        try:
            top = float(numerator)
            bottom = float(denominator) if denominator else 1.0
        except ValueError:
            return None
        if bottom == 0:
            return None
        return top / bottom

    def getMediaURL(self, episode_id: int):
        """
        Collect stream descriptions for an episode.

        The web endpoint is always queried. When an android access key is
        configured the android endpoint is queried too and its streams are
        appended; in that case a failing web response is tolerated.

        Returns:
            ``{"videos": [...], "audios": [...]}`` of plain dicts with at least
            "url", "bandwidth" and "codec" (videos also "width"/"height").

        Raises:
            ValueError: If the API rejects the request.
        """
        resolution = self.config['default_config']['res']
        responses = []

        url = self.config['endpoints']['get_media_url'].format(episode_id=episode_id, res=resolution)
        web = self.session.get(url, headers={'user-agent': self.WEB_USER_AGENT}).json()
        if web['code'] == 0:
            responses.append(("web", web['data']))
        elif not self.android_access_key:
            self._raise_media_error(web)

        if self.android_access_key:
            params = {
                'access_key': self.android_access_key,
                'ep_id': episode_id,
                'platform': 'android',
                'prefer_code_type': '1',
            }
            android = self.session.get(self.config['endpoints']['get_media_url_android'], params=params).json()
            if android['code'] != 0:
                self._raise_media_error(android)
            responses.append(("android", android['data']))

        videos = []
        audios = []
        for manifest, data in responses:
            if not data:
                continue
            if manifest == "web":
                parsed_videos, parsed_audios = self._parse_web_playurl(data)
            else:
                parsed_videos, parsed_audios = self._parse_android_playurl(data)
            videos.extend(parsed_videos)
            audios.extend(parsed_audios)

        return {
            "videos": videos,
            "audios": audios,
        }

    def _parse_web_playurl(self, data: dict):
        """Extract (videos, audios) from a web play-url payload."""
        videos = []
        for video in data['playurl']['video']:
            resource = video['video_resource']
            if not resource['url']:
                # Qualities the account cannot play come back without a URL.
                continue
            codecs = resource['codecs']
            videos.append({
                "url": resource['url'],
                "size": resource['size'],
                "bandwidth": resource['bandwidth'],
                "codec": Video.Codec.HEVC if ("hev" in codecs or "hvc" in codecs) else Video.Codec.AVC,
                "width": resource['width'],
                "height": resource['height'],
                "framerate": self._parse_frame_rate(resource['frame_rate']),
            })

        audios = [
            {
                "url": audio['url'],
                "backup_url": audio.get('backup_url'),
                "size": audio['size'],
                "bandwidth": audio['bandwidth'],
                "codec": audio['codecs'],
            }
            for audio in data['playurl']['audio_resource'] or []
        ]
        return videos, audios

    def _parse_android_playurl(self, data: dict):
        """Extract (videos, audios) from an android play-url payload."""
        if data.get('video_info') is None:
            dialog = data.get('dialog') or {}
            raise ValueError(f"API Error : {dialog.get('title')}")

        videos = []
        for video in data['video_info']['stream_list']:
            dash = video.get('dash_video')
            if not dash or not dash.get('base_url'):
                continue
            quality = int(video['stream_info']['quality'])
            # This payload is only given a quality id here; map it to a nominal size.
            width, height = self.ANDROID_QUALITY_MAP.get(quality, [None, None])
            videos.append({
                "url": dash['base_url'],
                "size": dash['size'],
                "bandwidth": dash['bandwidth'],
                "codec": self.ANDROID_VIDEO_CODEC_MAP.get(dash['codecid'], Video.Codec.AVC),
                "width": width,
                "height": height,
            })

        audios = [
            {
                "url": audio['base_url'],
                "backup_url": (audio.get('backup_url') or [None])[0],
                "size": audio['size'],
                "bandwidth": audio['bandwidth'],
                "codec": self.ANDROID_AUDIO_CODEC_MAP.get(audio['id'], "mp4a.40.2"),
            }
            for audio in data['video_info']['dash_audio'] or []
        ]
        return videos, audios

    def getSubtitle(self, episode_id: int):
        """Return the "data" object of the subtitle endpoint for an episode."""
        url = self.config['endpoints']['get_subtitle_url'].format(
            episode_id=episode_id, lang=self.lang.lower(), region=self.region.upper()
        )
        response = requests.get(url, cookies=self.session.cookies, timeout=self.REQUEST_TIMEOUT).json()
        self._ensure_ok(response)
        return response['data']

    def get_original_language(self, id):
        """
        Detect the show's original language from its media page.

        The page's "detail-table__text" spans are scanned and the first one
        whose text is a LANGUAGE_MAP key decides the result.

        Returns:
            The mapped language code, or None when no span matches.

        Raises:
            ValueError: If the page does not answer with HTTP 200.
        """
        url = self.config['endpoints']['get_homepage'].format(media_id=id)
        response = requests.get(url, headers={'user-agent': self.WEB_USER_AGENT}, timeout=self.REQUEST_TIMEOUT)
        if response.status_code != 200:
            raise ValueError(f"Failed to fetch homepage: {response.status_code}")

        soup = BeautifulSoup(response.text, 'html.parser')
        for span in soup.find_all(name='span', class_='detail-table__text'):
            if not span.contents:
                continue
            first = span.contents[0]
            label = first.strip() if isinstance(first, str) else first.get_text(strip=True)
            language = self.LANGUAGE_MAP.get(label)
            if language:
                return language
        return None

    @staticmethod
    def shorten_filename(name, max_length=150):
        """Return *name* cut to *max_length* characters plus "..." when it is longer."""
        if len(name) > max_length:
            return name[:max_length] + "..."
        return name

    def on_track_downloaded(self, track: AnyTrack) -> None:
        """
        Called when a Track has finished downloading.

        Subtitles are post-processed in place (unless downloads are skipped):
        ".ass" files have plain "Noto Sans" fonts swapped for "Noto Sans Thai"
        and scaled down; ".srt" files, which hold BiliBili's JSON payload, are
        converted to real SubRip once per file name.

        Parameters:
            track: The Track object that was downloaded.
        """
        if not isinstance(track, Subtitle) or self.skip_dl:
            return

        path = track.path
        if path.suffix == ".ass":
            font_name = "Noto Sans Thai"
            ass_path = str(path.absolute())
            ass_file = pysubs2.load(ass_path)
            for style in ass_file.styles.values():
                if "Noto Sans" in style.fontname and "Thai" not in style.fontname:
                    style.fontname = style.fontname.replace("Noto Sans", font_name)
                    # NOTE(review): scaling is applied only to styles whose font was
                    # swapped, which also keeps a repeated call from shrinking twice - confirm.
                    style.fontsize = style.fontsize / 1.25
            ass_file.save(ass_path)

        if path.suffix == ".srt" and path.name not in self.already_json_to_srt:
            with open(path, 'rb') as fd:
                data = fd.read()
            srt = BilibiliJSONConverter().from_bytes(data)
            fixed, status = CommonIssuesFixer().from_srt(srt)
            if status and fixed:
                srt = fixed
            srt.save(path)
            self.already_json_to_srt.append(path.name)