Files
Unshackle-Service-SeFree/AMZN/__init__.py
2026-03-30 10:46:39 +07:00

2098 lines
93 KiB
Python

import base64
import hashlib
import json
import sys
import re
import os
import uuid
import secrets
import string
import jsonpickle
import time
from pathlib import Path
from collections.abc import Generator
from datetime import datetime, timezone, timedelta
from collections import defaultdict
from http.cookiejar import CookieJar
from typing import Optional, Union
from urllib.parse import urlencode, quote
from unshackle.core.downloaders import n_m3u8dl_re
from unshackle.core.downloaders import requests
import click
import random
from langcodes import Language
# import tldextract
from click.core import ParameterSource
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH, ISM
from unshackle.core.search_result import SearchResult
from unshackle.core.service import Service
from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from unshackle.core.tracks import Chapter, Subtitle, Tracks, Video, Audio
from pywidevine.device import DeviceTypes
from pyplayready.cdm import Cdm as PlayReadyCdm
from unshackle.core.cacher import Cacher
class AMZN(Service):
"""
Service code for Amazon VOD (https://amazon.com) and Amazon Prime Video (https://primevideo.com).
\b
Authorization: Cookies
Security: UHD@L1/SL3000 FHD@L3(ChromeCDM) FHD@L3, Maintains their own license server like Netflix, be cautious.
\b
Region is chosen automatically based on domain extension found in cookies.
Prime Video specific code will be run if the ASIN is detected to be a prime video variant.
Use 'Amazon Video ASIN Display' for Tampermonkey addon for ASIN
https://greasyfork.org/en/scripts/381997-amazon-video-asin-display
unshackle dl --list -z uk -q 1080 Amazon B09SLGYLK8
"""
# Alternative names for this service (presumably used by the framework to resolve the
# service from user input — confirm against the service loader).
ALIASES = ["AMZN", "amazon", "amzn", "Amazon"]
# Matches a full amazon.<tld> / primevideo.com URL and captures the named groups
# `domain`, `region` and `title_id`. The leading URL group is mandatory, so a bare
# ASIN does not match this pattern; get_titles() has a separate check for that case.
# Only the TLDs com, co.uk, de and co.jp are recognised for amazon.* URLs.
TITLE_RE = re.compile(
r"^(?:https?://(?:www\.)?(?P<domain>amazon\.(?P<region>com|co\.uk|de|co\.jp)|primevideo\.com)"
r"(?:(?:/.+)?/|(?:/[^?]*)?(?:\?gti=)?)?)"
r"(?P<title_id>[A-Z0-9]{10,}|amzn1\.dv\.gti\.[a-f0-9-]+)"
)
# CDN names; presumably listed in order of preference (usage is not visible in this chunk).
CDN_PREFERENCE = ["Akamai", "Cloudfront", "Level3", "Limelight", "Fastly"]
# Region code -> domain suffix for regions whose suffix differs from the region code.
REGION_TLD_MAP = {
"au": "com.au",
"br": "com.br",
"jp": "co.jp",
"mx": "com.mx",
"tr": "com.tr",
"gb": "co.uk",
"us": "com",
}
# Requested range name -> dynamic-range string. "Hdr10" and "DolbyVision" are the same
# values get_tracks() compares against the manifest's `dynamicRange` field.
# HDR10+ maps to the same value as HDR10.
VIDEO_RANGE_MAP = {
"SDR": "None",
"HDR10": "Hdr10",
"HDR10+": "Hdr10",
"DV": "DolbyVision",
"HYBRID": "Hdr10", # Start with HDR10, DV will be fetched separately
}
# Audio codec name -> codec identifier string (presumably the MP4 sample-entry prefix — TODO confirm).
AUDIO_CODEC_MAP = {
"AAC": "mp4a",
"EC3": "ec-3",
}
# Human-readable labels for the "boosted dialog" audio subtypes; the keys are the same
# audioTrackSubtype values that get_tracks() filters out.
AUDIO_EDITION_MAP = {
"boosteddialoglow": "Dialogue Boost: Low",
"boosteddialogmedium": "Dialogue Boost: Medium",
"boosteddialoghigh": "Dialogue Boost: High",
}
# Click entry point for the service: declares the service-specific arguments/options
# and returns an AMZN instance built from them (see __init__ for how each is used).
@staticmethod
@click.command(name="AMZN", short_help="https://amazon.com, https://primevideo.com")
# `title` is optional on the command line, so it may reach __init__ as None.
@click.argument("title", type=str, required=False)
@click.option("-b", "--bitrate", default="CBR",
type=click.Choice(["CVBR", "CBR", "CVBR+CBR"], case_sensitive=False),
help="Video Bitrate Mode to download in. CVBR=Constrained Variable Bitrate, CBR=Constant Bitrate.")
@click.option("-p", "--player", default="html5",
type=click.Choice(["html5", "xp"], case_sensitive=False),
help="Video playerType to download in. html5, xp.")
# NOTE(review): the help text says the default is the highest-weighted CDN, but the
# declared default is the fixed string "Akamai" — confirm which is intended.
@click.option("-c", "--cdn", default="Akamai", type=str,
help="CDN to download from, defaults to the CDN with the highest weight set by Amazon.") # Akamai, Cloudfront
# UHD, HD, SD. UHD only returns HEVC, ever, even for <=HD only content
@click.option("-vq", "--vquality", default="HD",
type=click.Choice(["SD", "HD", "UHD"], case_sensitive=False),
help="Manifest quality to request.")
@click.option("-s", "--single", is_flag=True, default=False,
help="Force single episode/season instead of getting series ASIN.")
# NOTE(review): the help text mentions a fallback to H265, but the declared default is "CVBR".
@click.option("-am", "--amanifest", default="CVBR",
type=click.Choice(["CVBR", "CBR", "H265"], case_sensitive=False),
help="Manifest to use for audio. Defaults to H265 if the video manifest is missing 640k audio.")
# NOTE(review): the help text says this defaults to the same as --quality, but the
# declared default is "SD".
@click.option("-aq", "--aquality", default="SD",
type=click.Choice(["SD", "HD", "UHD"], case_sensitive=False),
help="Manifest quality to request for audio. Defaults to the same as --quality.")
@click.option("-nr", "--no_true_region",is_flag=True, default=False,
help="Skip checking true current region.")
@click.option("-atmos", "--atmos",is_flag=True, default=False,
help="Get atmos Audio")
@click.pass_context
def cli(ctx, **kwargs):
# All parsed options are forwarded unchanged as keyword arguments.
return AMZN(ctx, **kwargs)
def __init__(self, ctx, title, bitrate: str, player: str, cdn: str, vquality: str, single: bool,
             amanifest: str, aquality: str, no_true_region: bool, atmos: bool):
    """
    Store the service options and derive the effective manifest settings.

    The keyword parameters mirror the click options declared on ``cli``. Options that
    belong to the parent command (``quality``, ``vcodec``, ``acodec``, ``range_`` and
    ``profile``) are read from ``ctx.parent.params``.

    Manifest quality and bitrate mode are only adjusted automatically when the
    corresponding option was not given explicitly on the command line.

    Raises:
        Exception: if the service configuration is missing.
    """
    self.title = title
    self.bitrate = bitrate
    self.player = player
    self.bitrate_source = ctx.get_parameter_source("bitrate")
    self.cdn = cdn
    self.vquality = vquality
    self.vquality_source = ctx.get_parameter_source("vquality")
    self.single = single
    self.amanifest = amanifest
    self.aquality = aquality
    self.atmos = atmos
    self.cdm = ctx.obj.cdm
    self.playready = isinstance(self.cdm, PlayReadyCdm)
    self.no_true_region = no_true_region

    super().__init__(ctx)

    assert ctx.parent is not None
    parent_params = ctx.parent.params

    # The quality option may arrive as a sequence; only its first entry is used.
    quality_value = parent_params.get("quality") or 1080
    if isinstance(quality_value, (list, tuple)):
        quality_value = quality_value[0] if quality_value else 1080
    self.quality = int(quality_value)

    # The codec option may be missing, a single value, or a (possibly empty) sequence.
    # Unwrap sequences first so a plain string is never indexed character-wise and an
    # empty or absent value falls back to AVC instead of raising.
    vcodec = parent_params.get("vcodec")
    if isinstance(vcodec, (list, tuple)):
        vcodec = vcodec[0] if vcodec else None
    if not vcodec:
        vcodec = Video.Codec.AVC
    self.vcodec = vcodec.upper() or Video.Codec.AVC
    self.acodec = (parent_params.get("acodec") or "").upper() or Audio.Codec.EC3

    # Get range parameter for HDR support
    range_param = parent_params.get("range_")
    self.range = range_param[0].name if range_param else "SDR"
    # HYBRID mode is active when any requested range is named "HYBRID".
    self.hybrid_mode = any(r.name == "HYBRID" for r in range_param) if range_param else False

    if self.config is None:
        raise Exception("Config is missing!")
    profile_name = parent_params.get("profile")
    if profile_name is None:
        profile_name = "default"
    self.profile = profile_name

    self.region: dict[str, str] = {}
    self.endpoints: dict[str, str] = {}
    self.device: dict[str, str] = {}
    self.pv = False
    self.rpv = False
    self.event = False
    self.device_token = None
    # Must be a real assignment: a bare annotation would leave the attribute undefined
    # for later readers such as get_tracks().
    self.device_id = None
    self.customer_id = None
    self.client_id = "f22dbddb-ef2c-48c5-8876-bed0d47594fd"  # browser client id

    if self.vquality_source != ParameterSource.COMMANDLINE:
        if 0 < self.quality <= 576 and self.range == "SDR":
            self.log.info(" + Setting manifest quality to SD")
            self.vquality = "SD"
        if self.quality > 1080:
            self.log.info(" + Setting manifest quality to UHD to be able to get 2160p video track")
            self.vquality = "UHD"
    self.vquality = self.vquality or "HD"

    # A UHD manifest request always forces the HEVC codec.
    if self.vquality == "UHD":
        self.vcodec = Video.Codec.HEVC

    if self.bitrate_source != ParameterSource.COMMANDLINE:
        if self.vcodec == Video.Codec.HEVC and self.range == "SDR" and self.bitrate != "CVBR+CBR":
            self.bitrate = "CVBR+CBR"
            self.log.info(" + Changed bitrate mode to CVBR+CBR to be able to get H.265 SDR video track")
        elif self.vcodec in ["AV1"] and self.range == "SDR" and self.bitrate != "CVBR":
            self.bitrate = "CVBR"
            self.log.info(f" + Changed bitrate mode to CVBR to be able to get {self.vcodec} SDR video track")
        if self.vquality == "UHD" and self.range != "SDR" and self.bitrate != "CBR":
            self.bitrate = "CBR"
            self.log.info(f" + Changed bitrate mode to CBR to be able to get highest quality UHD {self.range} video track")

    # Remember the final mode; get_tracks() splits it on "+" for combined modes.
    self.orig_bitrate = self.bitrate
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
    """
    Authenticate with cookies, resolve the account region and select the device.

    Steps, in order: decide whether the title is a Prime Video title (from the title
    URL, falling back to the cookie domain), fetch the account region, prepare the
    endpoints and request headers, then either register a configured device or fall
    back to the browser device and read the marketplace ID from the configuration
    endpoint.

    Args:
        cookies: cookie jar for the account; required.
        credential: accepted for interface compatibility; not used here.

    Raises:
        EnvironmentError: if no cookies were supplied.
        SystemExit: if the region cannot be determined, the configured device type
            is not listed in ``dtid_dict``, or the configuration request fails.
    """
    super().authenticate(cookies)
    if not cookies:
        raise EnvironmentError("Service requires Cookies for Authentication.")

    # Get domain from cookies first - this sets self.domain (return value is not needed).
    self.get_domain_region()

    # Determine if this is Prime Video based on the actual URL domain, not title length.
    # The title argument is optional, so guard against None before matching.
    match = self.TITLE_RE.match(self.title or "")
    matched_domain = match.group("domain") if match else None
    if matched_domain and "primevideo" in matched_domain:
        self.pv = True
    elif matched_domain and "amazon" in matched_domain:
        self.pv = False
    else:
        # No URL (or no recognisable domain): fall back to the cookie domain.
        self.pv = self.domain == "primevideo"

    self.log.info("Getting Account Region")
    self.region = self.get_region()
    if not self.region:
        sys.exit(" - Failed to get Amazon Account region")
    self.log.info(f" + Region: {self.region['code'].upper()}")
    if self.no_true_region:
        self.log.info(f" + Region: {self.region['code']}")

    # endpoints must be prepared AFTER region data is retrieved
    self.endpoints = self.prepare_endpoints(self.config["endpoints"], self.region)
    self.session.headers.update({
        "Origin": f"https://{self.region['base']}",
        "Referer": f"https://{self.region['base']}/"
    })

    self.device = (self.config.get("device") or {}).get(self.profile, {})
    if self.device and self.device["device_type"] not in set(self.config["dtid_dict"]):
        sys.exit(f"{self.device['device_type']} Banned from Amazon Prime, Use another one to avoid Amazon Account Ban !!!")

    if (
        (self.quality > 1080 or self.range != "SDR")
        and self.vcodec == Video.Codec.HEVC
        and hasattr(self.cdm, "device_type")
        and self.cdm.device_type.name in ["CHROME"]
    ):
        self.log.info("Using device to get UHD manifests")
        self.register_device()
    elif (
        not self.device
        or (hasattr(self.cdm, "device_type") and self.cdm.device_type.name in ["CHROME"])
        or self.vquality != "UHD"
    ):
        # falling back to browser-based device ID
        if not self.device:
            self.log.warning(
                "No Device information was provided for %s, using browser device...",
                self.profile
            )
        self.device_id = "c3714f0d-59c9-4eb7-8b96-903f0f8c3619"
        self.device = {"device_type": self.config["device_types"]["browser"]}
        self._fetch_marketplace_id("Web")
    else:
        self._fetch_marketplace_id("Tv")
        self.register_device()

def _fetch_marketplace_id(self, device_id: str) -> None:
    """Query the configuration endpoint as *device_id* and store the marketplace ID in ``self.region``."""
    res = self.session.get(
        url=self.endpoints["configuration"],
        params={
            "deviceTypeID": self.device["device_type"],
            "deviceID": device_id,
        }
    )
    if res.status_code != 200:
        sys.exit(res.text)
    data = res.json()
    if not self.no_true_region:
        self.log.info(f" + Current Region: {data['requestContext']['currentTerritory']}")
    self.region["marketplace_id"] = data["requestContext"]["marketplaceID"]
def search(self) -> Generator[SearchResult, None, None]:
    """
    Query the configured search endpoint with ``self.title`` and yield one
    ``SearchResult`` per entry of the JSON response.
    """
    # NOTE(review): self.token is not assigned anywhere in the visible part of this
    # class — confirm it is provided elsewhere before relying on this method.
    response = self.session.get(
        url=self.config["endpoints"]["search"],
        params={"q": self.title, "token": self.token},
    )
    payload = response.json()
    yield from (
        SearchResult(
            id_=entry["id"],
            title=entry["title"],
            label="SERIES" if entry["programType"] == "series" else "MOVIE",
            url=entry["url"],
        )
        for entry in payload["entries"]
    )
def get_titles(self) -> Titles_T:
    """
    Resolve ``self.title`` to a title ID and return its Movies or Series.

    ``self.title`` may be a supported URL (the ID is extracted with ``TITLE_RE``) or
    a plain 10-character ASIN. Pages of sub-type "Movie" or "Event" produce a
    ``Movies`` collection; everything else is treated as a series whose episodes are
    collected per season (or directly from the given ID when ``--single`` is set or
    the season selector cannot be read).

    The language of every title is taken from the first title's manifest, defaulting
    to English, and duplicate titles (same ``id``) are dropped, keeping the first.

    Raises:
        SystemExit: on an invalid title, a failed details request, or when no
            title is available to the profile.
    """
    # The title argument is optional on the CLI; treat a missing value as invalid
    # input instead of letting re.match() raise TypeError on None.
    title_input = self.title or ""
    match = self.TITLE_RE.match(title_input)
    if match:
        self.title = match.group("title_id")
    elif not re.match(r'^[A-Z0-9]{10}$', title_input):
        # Neither a supported URL nor a plain ASIN.
        sys.exit("Invalid title URL or ID format")

    res = self.session.get(
        url=self.endpoints["details"],
        params={
            "titleID": self.title,
            "isElcano": "1",
            "sections": ["Atf", "Btf"]
        },
        headers={"Accept": "application/json"}
    )
    if not res.ok:
        sys.exit(f"Unable to get title: {res.text} [{res.status_code}]")

    payload = res.json()
    data = payload["widgets"]
    product_details = data.get("productDetails", {}).get("detail")
    if not product_details:
        # Report the first degradation if there is one; never fail while reporting.
        degradations = payload.get("degradations") or [{}]
        error = degradations[0]
        sys.exit(f"Unable to get title: {error.get('message')} [{error.get('code')}]")

    sub_page_type = data["pageContext"]["subPageType"]
    is_movie = sub_page_type in ("Movie", "Event")
    if sub_page_type == "Event":
        self.event = True

    titles_ = []
    if is_movie:
        movie = Movie(
            id_=product_details["catalogId"],
            service=self.__class__,
            name=product_details["title"],
            year=product_details.get("releaseYear", ""),
            language=None,
            data=product_details
        )
        # The movie is only kept when playback information exists for it.
        for playback_info in self.playbackEnvelope_data([product_details["catalogId"]]):
            if movie.id == playback_info["titleID"]:
                movie.data.update({"playbackInfo": playback_info})
                titles_.append(movie)
    elif self.single:
        titles_.extend(self.get_episodes(self.title))
    else:
        try:
            seasons = [x.get("titleID") for x in data["seasonSelector"]]
            for season in seasons:
                titles_.extend(self.get_episodes(season))
        except Exception as e:
            # Best-effort fallback; duplicates from a partial run are removed below.
            self.log.warning(f"Failed to parse seasons via seasonSelector ({e}), querying title directly...")
            titles_.extend(self.get_episodes(self.title))

    if not titles_:
        sys.exit(" - The profile used does not have the rights to this title.")

    # Set language
    original_lang = self.get_original_language(self.get_manifest(
        titles_[0],
        video_codec=self.vcodec,
        bitrate_mode=self.bitrate,
        quality=self.vquality,
        ignore_errors=True
    ))
    language = Language.get(original_lang or "en")
    for title in titles_:
        title.language = language

    # Deduplicate by id_ (catalogId), keeping the first occurrence and its order.
    unique_titles = {}
    for title in titles_:
        unique_titles.setdefault(title.id, title)
    titles_ = list(unique_titles.values())

    return Movies(titles_) if is_movie else Series(titles_)
@staticmethod
def _restore_audio_language(tracks: Tracks) -> None:
    """
    Replace an audio track's language with a more specific tag when one is available.

    For each audio track the raw language is read from the part of the adaptation
    set's ``audioTrackId`` before the first underscore (e.g. "es-419_dd-joc_5.1"),
    falling back to the adaptation set's, then the representation's, ``lang`` value.
    The track language is only replaced when the new tag differs from the current
    tag and starts with it. Any error raised while parsing or comparing the language
    is ignored and the track is left unchanged.
    """
    def _as_tag(lang) -> str:
        # Language objects expose to_tag(); anything else is stringified.
        return lang.to_tag() if hasattr(lang, "to_tag") else str(lang)

    for track in tracks.audio:
        dash_info = track.data.get("dash", {})
        adaptation = dash_info.get("adaptation_set", {})

        audio_track_id = adaptation.get("audioTrackId", "")
        candidate = ""
        if "_" in audio_track_id:
            candidate = audio_track_id.split("_")[0]
        if not candidate:
            candidate = (
                adaptation.get("lang")
                or track.data.get("dash", {}).get("representation", {}).get("lang")
                or ""
            )
        if not candidate:
            continue

        try:
            full_language = Language.get(candidate)
            existing_tag = _as_tag(track.language)
            wanted_tag = _as_tag(full_language)
            if wanted_tag != existing_tag and wanted_tag.startswith(existing_tag):
                track.language = full_language
        except Exception:
            pass
def get_tracks(self, title: Title_T) -> Tracks:
"""Get tracks with HYBRID mode support for HDR10+DV"""
# Overview (derived from the statements below):
#   * HYBRID mode: request an HDR10 manifest per bitrate mode, reduce the result to one
#     video track and one audio track per language, then request a DolbyVision manifest
#     at "SD" quality and add its lowest-height video track (the log line calls this
#     "RPU extraction").
#   * Otherwise: start from get_best_quality(title), then optionally merge audio from a
#     separately requested audio manifest and/or from a UHD manifest.
# Both paths add subtitle tracks from the manifest's timedTextUrls and call
# manage_session() unless the manifest quality is UHD or --no_true_region is set.
# NOTE(review): the original indentation of this file is not visible here, so the notes
# below describe statement order only; confirm nesting against the real file.
if self.hybrid_mode:
self.log.info("HYBRID mode: Processing HDR10 and DV tracks")
tracks = Tracks()
# One manifest request per bitrate mode; a combined mode such as "CVBR+CBR" is only
# split into its parts when the codec is not HEVC.
bitrates = [self.orig_bitrate]
if self.vcodec != Video.Codec.HEVC:
bitrates = self.orig_bitrate.split('+')
for bitrate in bitrates:
hdr10_manifest = self.get_manifest(
title,
video_codec=self.vcodec,
bitrate_mode=bitrate,
quality=self.vquality,
hdr="HDR10",
ignore_errors=False
)
# A rightsException in the response is logged and the tracks collected so far are returned.
if "rightsException" in hdr10_manifest:
self.log.error(" - The profile used does not have the rights to this title.")
return tracks
bitrate_actual = hdr10_manifest["vodPlaybackUrls"]["result"]["playbackUrls"]["urlMetadata"]["bitrateAdaptation"]
hdr10_chosen = self.choose_manifest(hdr10_manifest, self.cdn)
if not hdr10_chosen:
self.log.warning(f"No {bitrate} HDR10 manifests available")
continue
hdr10_url = self.clean_mpd_url(hdr10_chosen["url"], False)
# For events the un-cleaned URL is used instead, with amznDtid/encoding query parameters appended.
if self.event:
devicetype = self.device["device_type"]
hdr10_url = hdr10_chosen["url"]
hdr10_url = f"{hdr10_url}?amznDtid={devicetype}&encoding=segmentBase"
self.log.info(f" + Downloading {bitrate} HDR10 Manifest")
hdr10_streamingProtocol = hdr10_manifest["vodPlaybackUrls"]["result"]["playbackUrls"]["urlMetadata"]["streamingProtocol"]
hdr10_sessionHandoffToken = hdr10_manifest["sessionization"]["sessionHandoffToken"]
if hdr10_streamingProtocol == "DASH":
hdr10_tracks = DASH.from_url(url=hdr10_url, session=self.session).to_tracks(language=title.language)
elif hdr10_streamingProtocol == "SmoothStreaming":
hdr10_tracks = ISM.from_url(url=hdr10_url).to_tracks(language=title.language)
else:
self.log.error(f"Unsupported manifest type: {hdr10_streamingProtocol}")
continue
self._restore_audio_language(hdr10_tracks)
# The handoff token is stored on every track; get_playready_license() reads it from track.extra.
for track in hdr10_tracks:
track.extra['sessionHandoffToken'] = hdr10_sessionHandoffToken
# Flag audio-description tracks, identified by subtype or by label text.
for audio in hdr10_tracks.audio:
adaptation_set = audio.data.get("dash", {}).get("adaptation_set", {})
if adaptation_set.get("audioTrackSubtype") == "descriptive":
audio.descriptive = True
track_label = adaptation_set.get("label")
if track_label and "Audio Description" in track_label:
audio.descriptive = True
# Drop audio tracks whose subtype contains one of the "boosted dialog" names.
hdr10_tracks.audio = [
audio for audio in hdr10_tracks.audio
if not any(boost_type in audio.data.get("dash", {}).get("adaptation_set", {}).get("audioTrackSubtype", "").lower()
for boost_type in ["boosteddialoglow", "boosteddialogmedium", "boosteddialoghigh"])
]
for video in hdr10_tracks.videos:
video.range = Video.Range.HDR10
video.note = bitrate_actual
tracks.add(hdr10_tracks)
# NOTE(review): for a combined mode self.bitrate becomes "CVBR,CBR" (comma-separated) and is
# later passed as bitrate_mode to the DolbyVision get_manifest() call below — confirm that
# get_manifest() accepts this form.
if len(self.orig_bitrate.split('+')) > 1:
self.bitrate = "CVBR,CBR"
self.log.info(f"Selected video manifest bitrate: {self.bitrate}")
# Keep only the highest-bitrate video track whose height equals the requested quality,
# or whose width corresponds to that height at 16:9.
if tracks.videos and self.quality:
filtered_videos = []
for video in tracks.videos:
if video.height == self.quality or int(video.width * 9 / 16) == self.quality:
filtered_videos.append(video)
if filtered_videos:
best_video = max(filtered_videos, key=lambda v: v.bitrate or 0)
tracks.videos = [best_video]
self.log.info(f"Filtered to single HDR10 track: {best_video.height}p @ {best_video.bitrate // 1000 if best_video.bitrate else 0} kb/s")
# Keep only the highest-bitrate audio track per full language tag.
if tracks.audio:
audio_by_lang = defaultdict(list)
for audio in tracks.audio:
lang_key = audio.language.to_tag() if hasattr(audio.language, "to_tag") else str(audio.language)
audio_by_lang[lang_key].append(audio)
filtered_audio = []
for lang_key, audios in audio_by_lang.items():
best_audio = max(audios, key=lambda a: a.bitrate or 0)
filtered_audio.append(best_audio)
tracks.audio = filtered_audio
self.log.info(f"Filtered to {len(tracks.audio)} audio track(s)")
self.log.info(f"Final selection: {len(tracks.videos)} video track(s) and {len(tracks.audio)} audio track(s)")
# NOTE(review): if this statement runs after the bitrate loop, it captures the manifest of the
# LAST iteration despite its name — confirm which manifest the subtitles should come from.
first_hdr10_manifest = hdr10_manifest
self.log.info("HYBRID mode: Fetching DV manifest for RPU extraction")
# The DolbyVision manifest is always requested at "SD" quality and errors are ignored.
dv_manifest = self.get_manifest(
title,
video_codec=self.vcodec,
bitrate_mode=self.bitrate,
quality="SD",
hdr="DolbyVision",
ignore_errors=True
)
if not dv_manifest:
self.log.warning("No DV manifest available, HYBRID mode requires DV tracks")
elif "rightsException" not in dv_manifest:
dv_chosen = self.choose_manifest(dv_manifest, self.cdn)
if dv_chosen:
dv_url = self.clean_mpd_url(dv_chosen["url"], False)
if self.event:
devicetype = self.device["device_type"]
dv_url = dv_chosen["url"]
dv_url = f"{dv_url}?amznDtid={devicetype}&encoding=segmentBase"
self.log.info(" + Downloading DV Manifest")
dv_streamingProtocol = dv_manifest["vodPlaybackUrls"]["result"]["playbackUrls"]["urlMetadata"]["streamingProtocol"]
dv_sessionHandoffToken = dv_manifest["sessionization"]["sessionHandoffToken"]
if dv_streamingProtocol == "DASH":
dv_tracks = DASH.from_url(url=dv_url, session=self.session).to_tracks(language=title.language)
elif dv_streamingProtocol == "SmoothStreaming":
dv_tracks = ISM.from_url(url=dv_url).to_tracks(language=title.language)
else:
self.log.warning(f"Unsupported DV manifest type: {dv_streamingProtocol}")
dv_tracks = None
if dv_tracks:
for track in dv_tracks:
track.extra['sessionHandoffToken'] = dv_sessionHandoffToken
for video in dv_tracks.videos:
video.range = Video.Range.DV
# Sorted ascending by height, so index 0 is the lowest-resolution DV video track.
dv_video_tracks = sorted(dv_tracks.videos, key=lambda v: v.height if v.height else 0)
if dv_video_tracks:
selected_dv = dv_video_tracks[0]
tracks.add([selected_dv])
self.log.info(f"Added DV video track for RPU extraction: {selected_dv.height}p @ {selected_dv.bitrate // 1000 if selected_dv.bitrate else 0} kb/s")
# Subtitles: regular and forced-narrative URLs, with the file extension swapped to ".srt".
for sub in first_hdr10_manifest.get("timedTextUrls", {}).get("result", {}).get("subtitleUrls", []) + \
first_hdr10_manifest.get("timedTextUrls", {}).get("result", {}).get("forcedNarrativeUrls", []):
tracks.add(Subtitle(
id_=f"{sub['trackGroupId']}_{sub['languageCode']}_{sub['type']}_{sub['subtype']}",
url=os.path.splitext(sub["url"])[0] + ".srt",
codec=Subtitle.Codec.from_mime("srt"),
language=sub["languageCode"],
forced="ForcedNarrative" in sub["type"],
sdh=sub["type"].lower() == "sdh"
), warn_only=True)
# NOTE(review): tracks.videos[0] would raise IndexError if no video track was collected — confirm.
if self.vquality != "UHD" and not self.no_true_region:
self.manage_session(tracks.videos[0])
return tracks
# ── Non-hybrid path ────────────────────────────────────────────────────────
# The initial track set comes from get_best_quality() (defined outside this chunk); the
# manifest requested here supplies the protocol, handoff token, dynamic range and subtitles.
tracks = self.get_best_quality(title)
manifest = self.get_manifest(
title,
video_codec=self.vcodec,
bitrate_mode=self.bitrate,
quality=self.vquality,
hdr=self.range,
ignore_errors=False
)
if "rightsException" in manifest:
self.log.error(" - The profile used does not have the rights to this title.")
return tracks
chosen_manifest = self.choose_manifest(manifest, self.cdn)
if not chosen_manifest:
sys.exit(f"No manifests available")
manifest_url = self.clean_mpd_url(chosen_manifest["url"], False)
self.log.debug(manifest_url)
if self.event:
devicetype = self.device["device_type"]
manifest_url = chosen_manifest["url"]
manifest_url = f"{manifest_url}?amznDtid={devicetype}&encoding=segmentBase"
self.log.info(" + Downloading Manifest")
# NOTE(review): manifest_url is computed and logged but no download using it is visible in
# this method; streamingProtocol/sessionHandoffToken are rebound by the audio sections below.
streamingProtocol = manifest["vodPlaybackUrls"]["result"]["playbackUrls"]["urlMetadata"]["streamingProtocol"]
sessionHandoffToken = manifest["sessionization"]["sessionHandoffToken"]
self._restore_audio_language(tracks)
# A separate audio manifest is needed when the audio quality differs from the video quality
# or when the requested audio manifest type does not match the video codec/bitrate in use.
# Each `and` term binds tighter than the surrounding `or`s.
need_separate_audio = ((self.aquality or self.vquality) != self.vquality
or self.amanifest == "CVBR" and (self.vcodec, self.bitrate) != (Video.Codec.AVC, "CVBR")
or self.amanifest == "CBR" and (self.vcodec, self.bitrate) != (Video.Codec.AVC, "CBR")
or self.amanifest == "H265" and self.vcodec != Video.Codec.HEVC
or self.amanifest != "H265" and self.vcodec == Video.Codec.HEVC)
# Otherwise it is still needed when any language lacks an audio track of at least 640000 bitrate.
if not need_separate_audio:
audios = defaultdict(list)
for audio in tracks.audio:
lang_tag = audio.language.to_tag() if hasattr(audio.language, "to_tag") else str(audio.language)
audios[lang_tag].append(audio)
for lang in audios:
if not any((x.bitrate or 0) >= 640000 for x in audios[lang]):
need_separate_audio = True
break
# The separate audio manifest is skipped entirely when --atmos is set.
if need_separate_audio and not self.atmos:
manifest_type = self.amanifest or "CVBR"
self.log.info(f"Getting audio from {manifest_type} manifest for potential higher bitrate or better codec")
# bitrate_mode is always "CVBR" here, whatever manifest_type says.
audio_manifest = self.get_manifest(
title=title,
video_codec="AV1" if self.vcodec == "AV1" else ("H265" if (self.amanifest == "H265" or self.vcodec == Video.Codec.HEVC) else "H264"),
bitrate_mode="CVBR",
quality=self.aquality or self.vquality,
hdr=None,
ignore_errors=True
)
if not audio_manifest:
self.log.warning(f" - Unable to get {manifest_type} audio manifests, skipping")
elif not (chosen_audio_manifest := self.choose_manifest(audio_manifest, self.cdn)):
self.log.warning(f" - No {manifest_type} audio manifests available, skipping")
else:
audio_mpd_url = self.clean_mpd_url(chosen_audio_manifest["url"], optimise=False)
self.log.debug(audio_mpd_url)
if self.event:
devicetype = self.device["device_type"]
audio_mpd_url = chosen_audio_manifest["url"]
audio_mpd_url = f"{audio_mpd_url}?amznDtid={devicetype}&encoding=segmentBase"
self.log.info(" + Downloading CVBR manifest")
streamingProtocol = audio_manifest["vodPlaybackUrls"]["result"]["playbackUrls"]["urlMetadata"]["streamingProtocol"]
sessionHandoffToken = audio_manifest["sessionization"]["sessionHandoffToken"]
# NOTE(review): if streamingProtocol is neither "DASH" nor "SmoothStreaming", audio_mpd is
# never assigned and the `else` branch below would raise NameError — confirm.
try:
if streamingProtocol == "DASH":
audio_mpd = DASH.from_url(
url=audio_mpd_url,
session=self.session
).to_tracks(language=title.language)
for track in audio_mpd:
track.extra['sessionHandoffToken'] = sessionHandoffToken
elif streamingProtocol == "SmoothStreaming":
audio_mpd = Tracks([
x for x in iter(ISM.from_url(url=audio_mpd_url))
])
for track in audio_mpd:
track.extra['sessionHandoffToken'] = sessionHandoffToken
except KeyError:
self.log.warning(f" - Title has no {self.amanifest} stream, cannot get higher quality audio")
else:
self._restore_audio_language(audio_mpd)
for audio in audio_mpd.audio:
adaptation_set = audio.data.get("dash", {}).get("adaptation_set", {})
if adaptation_set.get("audioTrackSubtype") == "descriptive":
audio.descriptive = True
track_label = adaptation_set.get("label")
if track_label and "Audio Description" in track_label:
audio.descriptive = True
audio_mpd.audio = [
audio for audio in audio_mpd.audio
if not any(boost_type in audio.data.get("dash", {}).get("adaptation_set", {}).get("audioTrackSubtype", "").lower()
for boost_type in ["boosteddialoglow", "boosteddialogmedium", "boosteddialoghigh"])
]
tracks.add(audio_mpd.audio, warn_only=True)
# UHD audio is requested when --atmos is set.
need_uhd_audio = self.atmos
# NOTE(review): this branch only runs when self.amanifest is falsy, and it reads
# `tracks.audios` whereas the rest of this method uses `tracks.audio` — looks like a typo
# that would raise AttributeError if the branch were ever reached; confirm.
if not self.amanifest and ((self.aquality == "UHD" and self.vquality != "UHD") or not self.aquality):
audios = defaultdict(list)
for audio in tracks.audios:
lang_tag = audio.language.to_tag() if hasattr(audio.language, "to_tag") else str(audio.language)
audios[lang_tag].append(audio)
for lang in audios:
if not any((x.bitrate or 0) >= 640000 for x in audios[lang]):
need_uhd_audio = True
break
# The UHD manifest is only attempted when a device is configured for the active profile.
if need_uhd_audio and (self.config.get("device") or {}).get(self.profile, None):
self.log.info("Getting audio from UHD manifest for potential higher bitrate or better codec")
# Device state is saved here and restored after the request, because register_device()
# may be called in between.
temp_device = self.device
temp_device_token = self.device_token
temp_device_id = self.device_id
uhd_audio_manifest = None
try:
# Precedence: playready OR (has device_type AND is CHROME AND quality < 2160).
if self.playready or hasattr(self.cdm, "device_type") and self.cdm.device_type.name in ["CHROME"] and self.quality < 2160:
self.log.info(f" + Switching to device to get UHD manifest")
self.register_device()
uhd_audio_manifest = self.get_manifest(
title=title,
video_codec="H265",
bitrate_mode="CVBR+CBR",
quality="UHD",
hdr="DolbyVision",
ignore_errors=True
)
# Any failure here is swallowed; uhd_audio_manifest then stays None and is reported below.
except Exception:
pass
self.device = temp_device
self.device_token = temp_device_token
self.device_id = temp_device_id
if not uhd_audio_manifest:
self.log.warning(f" - Unable to get UHD manifests, skipping")
elif not (chosen_uhd_audio_manifest := self.choose_manifest(uhd_audio_manifest, self.cdn)):
self.log.warning(f" - No UHD manifests available, skipping")
else:
uhd_audio_mpd_url = self.clean_mpd_url(chosen_uhd_audio_manifest["url"], optimise=False)
self.log.debug(uhd_audio_mpd_url)
if self.event:
devicetype = self.device["device_type"]
uhd_audio_mpd_url = chosen_uhd_audio_manifest["url"]
uhd_audio_mpd_url = f"{uhd_audio_mpd_url}?amznDtid={devicetype}&encoding=segmentBase"
self.log.info(" + Downloading UHD manifest")
streamingProtocol = uhd_audio_manifest["vodPlaybackUrls"]["result"]["playbackUrls"]["urlMetadata"]["streamingProtocol"]
sessionHandoffToken = uhd_audio_manifest["sessionization"]["sessionHandoffToken"]
# NOTE(review): as above, an unexpected streamingProtocol leaves uhd_audio_mpd unassigned.
try:
if streamingProtocol == "DASH":
uhd_audio_mpd = DASH.from_url(
url=uhd_audio_mpd_url,
session=self.session
).to_tracks(language=title.language)
for track in uhd_audio_mpd:
track.extra['sessionHandoffToken'] = sessionHandoffToken
elif streamingProtocol == "SmoothStreaming":
uhd_audio_mpd = ISM.from_url(
url=uhd_audio_mpd_url
).to_tracks(language=title.language)
for track in uhd_audio_mpd:
track.extra['sessionHandoffToken'] = sessionHandoffToken
except KeyError:
self.log.warning(f" - Title has no UHD stream, cannot get higher quality audio")
else:
self._restore_audio_language(uhd_audio_mpd)
for audio in uhd_audio_mpd.audio:
adaptation_set = audio.data.get("dash", {}).get("adaptation_set", {})
if adaptation_set.get("audioTrackSubtype") == "descriptive":
audio.descriptive = True
track_label = adaptation_set.get("label")
if track_label and "Audio Description" in track_label:
audio.descriptive = True
uhd_audio_mpd.audio = [
audio for audio in uhd_audio_mpd.audio
if not any(boost_type in audio.data.get("dash", {}).get("adaptation_set", {}).get("audioTrackSubtype", "").lower()
for boost_type in ["boosteddialoglow", "boosteddialogmedium", "boosteddialoghigh"])
]
# The UHD audio REPLACES all previously collected audio, but only if it contains an Atmos track.
if any(x for x in uhd_audio_mpd.audio if x.atmos):
tracks.audio = uhd_audio_mpd.audio
# Video range is taken from the main manifest's dynamicRange; other values leave it untouched.
for video in tracks.videos:
if manifest["vodPlaybackUrls"]["result"]["playbackUrls"]["urlMetadata"]["dynamicRange"] == "Hdr10":
video.range = Video.Range.HDR10
elif manifest["vodPlaybackUrls"]["result"]["playbackUrls"]["urlMetadata"]["dynamicRange"] == "DolbyVision":
video.range = Video.Range.DV
# Dedup by full language tag so es-ES and es-419 are both preserved.
# The key also includes bitrate and the descriptive flag; the first track seen for a key wins.
unique_audio_tracks = {}
for audio in tracks.audio:
lang_tag = audio.language.to_tag() if hasattr(audio.language, "to_tag") else str(audio.language)
key = (lang_tag, audio.bitrate, audio.descriptive)
if key not in unique_audio_tracks:
unique_audio_tracks[key] = audio
tracks.audio = list(unique_audio_tracks.values())
# Subtitles: regular and forced-narrative URLs, with the file extension swapped to ".srt".
for sub in manifest.get("timedTextUrls", {}).get("result", {}).get("subtitleUrls", []) + \
manifest.get("timedTextUrls", {}).get("result", {}).get("forcedNarrativeUrls", []):
tracks.add(Subtitle(
id_=f"{sub['trackGroupId']}_{sub['languageCode']}_{sub['type']}_{sub['subtype']}",
url=os.path.splitext(sub["url"])[0] + ".srt",
codec=Subtitle.Codec.from_mime("srt"),
language=sub["languageCode"],
forced="ForcedNarrative" in sub["type"],
sdh=sub["type"].lower() == "sdh"
), warn_only=True)
# NOTE(review): tracks.videos[0] would raise IndexError if there is no video track — confirm.
if self.vquality != "UHD" and not self.no_true_region:
self.manage_session(tracks.videos[0])
return tracks
def _get_tracks_for_range(self, title: Title_T, range_override: str = None) -> Tracks:
# Build Tracks for one dynamic range: POST to config["endpoints"]["streams"], store the
# returned DRM details in self.license, and parse the returned DASH manifest.
# `range_override` replaces self.range when given; "HDR10" and "DV" select the matching
# video_format, anything else is requested as "sdr".
# NOTE(review): no caller is visible in this chunk, and this method reads self.token and
# self.movie, neither of which is assigned in the visible part of the class — confirm
# whether this path is still used.
current_range = range_override if range_override else self.range
params = {
"token": self.token,
"guid": title.id,
}
# NOTE(review): self.device is a dict elsewhere in this class; using a dict as a mapping key
# raises TypeError (unhashable) — confirm what this lookup is meant to index by.
data = {
"type": self.config["client"][self.device]["type"],
}
if current_range == "HDR10":
data["video_format"] = "hdr10"
elif current_range == "DV":
data["video_format"] = "dolby_vision"
else:
data["video_format"] = "sdr"
# HDR10/DV are not requested with a security level 3 CDM: an empty Tracks is returned instead.
if current_range in ("HDR10", "DV") and self.cdm.security_level == 3:
return Tracks()
streams = self.session.post(
url=self.config["endpoints"]["streams"],
params=params,
data=data,
).json()["media"]
self.license = {
"url": streams["drm"]["url"],
"data": streams["drm"]["data"],
"session": streams["drm"]["session"],
}
# Everything from the first "?" onwards is dropped from the manifest URL.
manifest_url = streams["url"].split("?")[0]
self.log.debug(f"Manifest URL: {manifest_url}")
tracks = DASH.from_url(url=manifest_url, session=self.session).to_tracks(language=title.language)
for video in tracks.videos:
if current_range == "HDR10":
video.range = Video.Range.HDR10
elif current_range == "DV":
video.range = Video.Range.DV
else:
video.range = Video.Range.SDR
# Audio tracks whose representation id contains "clear" are dropped.
tracks.audio = [
track for track in tracks.audio if "clear" not in track.data["dash"]["representation"].get("id")
]
for track in tracks.audio:
# A channel count of 6.0 is rewritten as 5.1.
if track.channels == 6.0:
track.channels = 5.1
track_label = track.data["dash"]["adaptation_set"].get("label")
if track_label and "Audio Description" in track_label:
track.descriptive = True
# Subtitles parsed from the manifest are discarded; only the captions listed in the streams
# response are used, all flagged sdh=True, with the first 6 hex digits of the URL's MD5 as id.
tracks.subtitles.clear()
if streams.get("captions"):
for subtitle in streams["captions"]:
tracks.add(
Subtitle(
id_=hashlib.md5(subtitle["url"].encode()).hexdigest()[0:6],
url=subtitle["url"],
codec=Subtitle.Codec.from_mime("vtt"),
language=Language.get(subtitle["language"]),
sdh=True,
)
)
# For non-movies, chapter data is fetched from the metadata endpoint and stored on the title.
if not self.movie:
title.data["chapters"] = self.session.get(
url=self.config["endpoints"]["metadata"].format(title_id=title.id), params={"token": self.token}
).json()["chapters"]
return tracks
def get_chapters(self, title: Title_T) -> list[Chapter]:
    """Return the chapters for *title*; always a new, empty list."""
    return []
def get_widevine_service_certificate(self, **_) -> str:
    """Return the ``certificate`` entry of the service configuration; keyword arguments are ignored."""
    certificate = self.config["certificate"]
    return certificate
def get_playready_license(
    self, *, challenge: bytes, title, track
) -> Optional[bytes]:
    """Retrieve a PlayReady license for a given track (PlayReady only).

    Args:
        challenge: PlayReady license challenge. A ``str`` is UTF-8 encoded
            before it is base64-wrapped into the request body.
        title: Title being licensed; only ``title.id`` is read.
        track: Track being licensed. ``track.descriptor`` selects the
            packaging format when a device token is in use, and
            ``track.extra["sessionHandoffToken"]`` is required otherwise.

    Returns:
        The base64-decoded license from ``playReadyLicense.license``.

    Raises:
        ValueError: If no ``licence_pr`` endpoint is configured.

    Request failures and errors reported in the response are handed to
    ``self.log.exit``.
    """
    # Refresh the playback envelope first so the request never carries an expired one.
    self.playbackInfo = self.playbackEnvelope_update(self.playbackInfo)
    if isinstance(challenge, str):
        challenge = challenge.encode("utf-8")

    request_json = {
        "licenseChallenge": base64.b64encode(challenge).decode("utf-8"),
        "playbackEnvelope": self.playbackInfo["playbackExperienceMetadata"][
            "playbackEnvelope"
        ],
    }

    if self.device_token:
        # Registered-device path: describe the device instead of handing off a web session.
        request_json.update({
            "capabilityDiscriminators": {
                "discriminators": {
                    "hardware": {
                        "chipset": self.device["device_chipset"],
                        "manufacturer": self.device["manufacturer"],
                        "modelName": self.device["device_model"],
                    },
                    "software": {
                        "application": {
                            "name": self.device["app_name"],
                            "version": self.device["firmware"],
                        },
                        "operatingSystem": {
                            "name": "Android",
                            "version": self.device["os_version"],
                        },
                        "player": {
                            "name": "Android UIPlayer SDK",
                            "version": "4.1.18",
                        },
                        "renderer": {
                            "drmScheme": "PLAYREADY",
                            "name": "MCMD",
                        },
                    },
                },
                "version": 1,
            },
            "deviceCapabilityFamily": "AndroidPlayer",
            "packagingFormat": (
                "SMOOTH_STREAMING"
                if track.descriptor == track.Descriptor.ISM
                else "MPEG_DASH"
            ),
        })
    else:
        session_handoff = track.extra.get("sessionHandoffToken")
        if not session_handoff:
            self.log.exit(
                "No sessionHandoffToken found in track.extra. "
                "Web PlayReady licensing requires sessionHandoffToken."
            )
        request_json.update({
            "sessionHandoff": session_handoff,
            "deviceCapabilityFamily": "WebPlayer",
        })

    # .get() so a missing key reaches the explicit check instead of raising KeyError.
    license_url = self.endpoints.get("licence_pr")
    if not license_url:
        raise ValueError("PlayReady license endpoint not configured")

    try:
        res = self.session.post(
            url=license_url,
            headers={
                "Accept": "application/json",
                "Content-Type": "application/json; charset=utf-8",
                "Authorization": f"Bearer {self.device_token}"
                if self.device_token
                else None,
                "connection": "Keep-Alive",
                "x-gasc-enabled": "true",
                "x-request-priority": "CRITICAL",
                "x-retry-count": "0",
            },
            params={
                "deviceID": self.device_id,
                "deviceTypeID": self.device["device_type"],
                "gascEnabled": str(self.pv).lower(),
                "marketplaceID": self.region["marketplace_id"],
                "uxLocale": "en_EN",
                "firmware": "1",
                "titleId": title.id,
                "nerid": self.generate_nerid(),
            },
            json=request_json,
        )
        res.raise_for_status()
        lic = res.json()
    except Exception as e:
        self.log.exit(f"Failed to request PlayReady license: {e}")

    err = lic.get("errorsByResource") or lic.get("error")
    if err:
        msg = "Unknown PlayReady licensing error"
        details = {}
        if isinstance(err, dict):
            # "errorsByResource" maps a resource name to its error object, so the
            # code/message may sit one level down rather than on ``err`` itself.
            nested = [value for value in err.values() if isinstance(value, dict)]
            details = next(
                (
                    candidate
                    for candidate in [err, *nested]
                    if candidate.get("errorCode") or candidate.get("type")
                ),
                err,
            )
        elif isinstance(err, str):
            msg = err
        code = details.get("errorCode") or details.get("type")
        msg = details.get("message", msg)
        if code == "PRS.NoRights.AnonymizerIP":
            self.log.exit(
                "Amazon detected a Proxy/VPN and refused to return a PlayReady license."
            )
        self.log.exit(f"PlayReady license error: {msg} [{code}]")

    return base64.b64decode(lic["playReadyLicense"]["license"])
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
    """Request a Widevine license for ``track``.

    Args:
        challenge: Widevine license challenge; sent base64-encoded.
        title: Title being licensed; only ``title.id`` is read.
        track: Track being licensed; ``track.extra["sessionHandoffToken"]``
            is sent with the request.

    Returns:
        The ``widevineLicense.license`` value of the response, unmodified.

    Exits the process (status 1) after logging when the response carries a
    non-empty ``errorsByResource`` object.
    """
    # Refresh the playback envelope first so the request never carries an expired one.
    self.playbackInfo = self.playbackEnvelope_update(self.playbackInfo)

    response = self.session.post(
        url=self.endpoints["licence"],
        params={
            "deviceID": self.device_id,
            "deviceTypeID": self.device["device_type"],
            "gascEnabled": str(self.pv).lower(),
            "marketplaceID": self.region["marketplace_id"],
            "uxLocale": "en_US",
            "firmware": 1,
            "titleId": title.id,
            "nerid": self.generate_nerid(),
        },
        headers={
            "Accept": "*/*",
            "Content-Type": "text/plain",
            "Authorization": f"Bearer {self.device_token}" if self.device_token else None,
        },
        # Cookies authenticate the request only when no device bearer token is used.
        cookies=None if self.device_token else self.session.cookies,
        json={
            "includeHdcpTestKey": True,
            "licenseChallenge": base64.b64encode(challenge).decode(),
            "playbackEnvelope": self.playbackInfo["playbackExperienceMetadata"]["playbackEnvelope"],
            "sessionHandoffToken": track.extra['sessionHandoffToken'],
        },
    ).json()

    errors = response.get("errorsByResource")
    if errors:
        # Prefer the Widevine entry, but fall back to whichever resource the
        # server reported so the error is surfaced instead of raising KeyError.
        error_code = errors.get("Widevine2License")
        if error_code is None:
            error_code = next(iter(errors.values()))
        if isinstance(error_code, dict):
            if "errorCode" in error_code:
                error_code = error_code["errorCode"]
            elif "type" in error_code:
                error_code = error_code["type"]

        if error_code in ["PRS.NoRights.AnonymizerIP", "PRS.NoRights.NotOwned"]:
            self.log.error("Proxy detected, Unable to License")
        elif error_code == "PRS.Dependency.DRM.Widevine.UnsupportedCdmVersion":
            self.log.error("Cdm version not supported")
        else:
            self.log.error(f" x Error from Amazon's License Server: [{error_code}]")
        sys.exit(1)

    return response["widevineLicense"]["license"]
def register_device(self) -> None:
    """Load the profile's device config and obtain a device bearer token.

    Sets ``self.device``, ``self.device_id`` and ``self.device_token``.
    The device serial is validated before any registration is attempted, so
    a config without ``device_serial`` exits without registering (or
    caching) a device.
    """
    self.device = (self.config.get("device") or {}).get(self.profile, {})

    self.device_id = self.device.get("device_serial")
    if not self.device_id:
        sys.exit(f" - A device serial is required in the config, perhaps use: {os.urandom(8).hex()}")

    # The cache key includes a digest of the device config, so changing the
    # device settings does not reuse tokens issued for the previous settings.
    digest = hashlib.md5(json.dumps(self.device).encode()).hexdigest()[:6]
    device_cache_path = self.cache.get(f"device_tokens_{self.profile}_{digest}")

    self.device_token = self.DeviceRegistration(
        device=self.device,
        endpoints=self.endpoints,
        log=self.log,
        cache_path=device_cache_path,
        session=self.session
    ).bearer
def get_region(self) -> dict:
    """Return the region config entry matching the cookie domain.

    Returns an empty dict when no region can be derived from the cookies.
    The entry from ``self.config["regions"]`` is updated in place with a
    ``code`` key and, for Prime Video, with ``base_manifest`` and ``base``
    hosts derived from the primevideo.com home page.

    Exits when the region has no config entry, or when the Prime Video
    region marker cannot be found in the page.
    """
    region_code = self.get_domain_region()
    if not region_code:
        return {}

    region = self.config["regions"].get(region_code)
    if not region:
        sys.exit(f" - There's no region configuration data for the region: {region_code}")
    region["code"] = region_code

    if not self.pv:
        return region

    # The home page embeds an "fls-<na|eu|fe>" host that names the Prime Video region.
    homepage = self.session.get("https://www.primevideo.com").text
    found = re.search(r'ue_furl *= *([\'"])fls-(na|eu|fe)\.amazon\.[a-z.]+\1', homepage)
    if not found:
        self.log.error(" - Failed to get PrimeVideo region")
        raise SystemExit(1)

    pv_region = found.group(2).lower()
    if pv_region == "na":
        manifest_prefix = "atv-ps"
    else:
        manifest_prefix = f"atv-ps-{pv_region}"
    region["base_manifest"] = f"{manifest_prefix}.primevideo.com"
    region["base"] = "www.primevideo.com"
    return region
def get_domain_region(self):
    """Get the region of the cookies from the domain.

    Scans the session cookies that carry an explicit domain for an
    ``amazon.*`` or ``primevideo.*`` domain, stores the matched root name
    (or ``None``) in ``self.domain`` and returns a region code derived from
    the last label of the domain suffix (``com`` -> ``us``, ``uk`` -> ``gb``,
    anything else unchanged). Returns ``None`` when no such cookie exists.
    """
    def parse_domain(domain: str):
        """Split a cookie domain into (registrable root, public suffix)."""
        labels = domain.lstrip('.').lower().split('.')
        if len(labels) < 2:
            return None, None
        # Two-label public suffixes: "co.uk", "co.jp", "com.au", "com.br", ...
        # "com" is included so amazon.com.au style domains resolve to "amazon"
        # rather than to the "com" label (extend if needed).
        if len(labels) >= 3 and labels[-2] in ('co', 'com', 'ac', 'go', 'or'):
            return labels[-3], '.'.join(labels[-2:])
        return labels[-2], labels[-1]

    parsed = [
        parse_domain(cookie.domain)
        for cookie in self.session.cookies
        if cookie.domain_specified
    ]

    # find amazon / primevideo
    root, suffix = next(
        ((root, suffix) for root, suffix in parsed
         if root in ("amazon", "primevideo")),
        (None, None)
    )

    self.domain = root

    if suffix:
        # Only the country label matters: "co.uk" -> "uk", "com.au" -> "au".
        suffix = suffix.split('.')[-1]

    return {"com": "us", "uk": "gb"}.get(suffix, suffix)
def prepare_endpoint(self, name: str, uri: str, region: dict) -> str:
    """Build the absolute URL for the endpoint ``name``.

    Args:
        name: Endpoint key; selects which host the URI is attached to.
        uri: Path (and anything after it) appended to the host.
        region: Region config providing the ``base`` and ``base_manifest`` hosts.

    Returns:
        ``https://<host><uri>``.

    Raises:
        ValueError: If ``name`` is not a known endpoint.
    """
    if name in ("browse", "configuration", "refreshplayback", "playback", "licence", "licence_pr", "xray", "opensession", "updatesession", "closesession"):
        return f"https://{(region['base_manifest'])}{uri}"
    if name in ("ontv", "devicelink", "details", "getDetailWidgets", "metadata"):
        host = region["base"]
        # Outside Prime Video the metadata endpoint is served under /gp/video.
        # Compared with ==: ``name in ("metadata")`` would be a substring test.
        if not self.pv and name == "metadata":
            host = f"{region['base']}/gp/video"
        return f"https://{host}{uri}"
    if name in ("codepair", "register", "token"):
        # These endpoints always use the "us" region's API host.
        return f"https://{self.config['regions']['us']['base_api']}{uri}"
    raise ValueError(f"Unknown endpoint: {name}")
def prepare_endpoints(self, endpoints: dict, region: dict) -> dict:
    """Resolve every endpoint URI in ``endpoints`` to an absolute URL.

    Returns a new dict with the same keys, each value produced by
    ``prepare_endpoint`` for that key.
    """
    prepared = {}
    for name, uri in endpoints.items():
        prepared[name] = self.prepare_endpoint(name, uri, region)
    return prepared
def choose_manifest(self, manifest: dict, cdn=None):
    """Pick one URL set from a playback manifest response.

    Args:
        manifest: Playback response; URL sets are read from
            ``vodPlaybackUrls.result.playbackUrls.urlSets``.
        cdn: Optional CDN name (case-insensitive). When given, the first URL
            set served by that CDN is returned and the process exits if
            there is none.

    Returns:
        The chosen URL set. Without ``cdn`` one is picked at random, or an
        empty dict is returned when the response lists no URL sets.
    """
    url_sets = manifest["vodPlaybackUrls"]["result"]["playbackUrls"].get("urlSets") or []

    if cdn:
        cdn = cdn.lower()
        chosen = next((x for x in url_sets if x["cdn"].lower() == cdn), {})
        if not chosen:
            sys.exit(f" - There isn't any DASH manifests available on the CDN \"{cdn}\" for this title")
        return chosen

    return random.choice(url_sets) if url_sets else {}
def manage_session(self, track: Tracks):
    """Open, update and close a playback session for ``track``.

    Posts a START event to the ``opensession`` endpoint, waits three
    seconds, posts a PAUSE event to ``updatesession`` and finally a STOP
    event to ``closesession``. Each step reuses the session token returned
    by the previous one.

    Any failure is logged through ``self.log.error`` and swallowed; the
    method always returns ``None``.
    """
    def post_event(endpoint: str, action: str, payload: dict):
        """POST ``payload`` to a session endpoint, raising on a non-200 reply."""
        res = self.session.post(
            url=self.endpoints[endpoint],
            params={
                "deviceID": self.device_id,
                "deviceTypeID": self.device["device_type"],
                "gascEnabled": str(self.pv).lower(),
                "marketplaceID": self.region["marketplace_id"],
                "uxLocale": "en_EN",
                "firmware": "1",
                "version": "1",
                "nerid": self.generate_nerid(),
            },
            headers={
                "Content-Type": "application/json",
                "accept": "application/json",
                "x-request-priority": "CRITICAL",
                "x-retry-count": "0"
            },
            json=payload,
        )
        if res.status_code != 200:
            raise Exception(f"Unable to {action} session: {res.text}")
        return res

    def read_session_token(res, action: str) -> str:
        """Extract ``sessionToken`` from a session response."""
        try:
            return res.json()["sessionToken"]
        except Exception as e:
            raise Exception(f"Unable to {action} session: {e}") from e

    def stream_info(event_type: str, update_time: str, progress: float) -> dict:
        """Build the ``streamInfo`` object shared by all three events."""
        return {
            "eventType": event_type,
            "streamUpdateTime": update_time,
            "vodProgressInfo": {
                "currentProgressTime": f"PT{progress:.6f}S",
                "timeFormat": "ISO8601DURATION",
            },
        }

    def iso_utc(moment: datetime) -> str:
        """Format an aware UTC datetime as ISO-8601 with milliseconds and a Z suffix."""
        return moment.isoformat(timespec="milliseconds").replace("+00:00", "Z")

    try:
        current_progress_time = round(random.uniform(0, 10), 6)
        time_ = 3  # Seconds
        opened_at = datetime.now(timezone.utc)

        # Open session. streamUpdateTime carries the wall-clock timestamp, as
        # in the PAUSE/STOP events below, not the playback progress value.
        res = post_event("opensession", "open", {
            "sessionHandoff": track.extra['sessionHandoffToken'],
            "playbackEnvelope": self.playbackEnvelope_update(self.playbackInfo)["playbackExperienceMetadata"]["playbackEnvelope"],
            "streamInfo": stream_info("START", iso_utc(opened_at), current_progress_time),
            "userWatchSessionId": str(uuid.uuid4())
        })
        sessionToken = read_session_token(res, "open")

        # Update Session
        time.sleep(time_)
        stream_update_time = iso_utc(opened_at + timedelta(seconds=time_))
        res = post_event("updatesession", "update", {
            "sessionToken": sessionToken,
            "streamInfo": stream_info("PAUSE", stream_update_time, current_progress_time + time_),
        })
        sessionToken = read_session_token(res, "update")

        # Close session
        post_event("closesession", "close", {
            "sessionToken": sessionToken,
            "streamInfo": stream_info("STOP", stream_update_time, current_progress_time + time_),
        })
        self.log.info("Session completed successfully!")
        return None
    except Exception as e:
        self.log.error(f"Unable to get session: {e}")
def playbackEnvelope_data(self, titles):
    """Fetch playback envelope metadata for a list of title IDs.

    Args:
        titles: Title IDs to enrich; sent JSON-encoded as ``titleIDsToEnrich``.

    Returns:
        A list of ``{"titleID": ..., "playbackExperienceMetadata": ...}``
        dicts, one per playback action that identifies an offer and carries
        metadata. An empty list is returned on any request or parsing
        failure, on a non-200 response, or when the response indicates the
        cookies are no longer signed in.
    """
    try:
        res = self.session.get(
            url=self.endpoints["metadata"],
            params={
                "metadataToEnrich": json.dumps({"placement": "HOVER", "playback": "true", "preroll": "true", "trailer": "true", "watchlist": "true"}),
                "titleIDsToEnrich": json.dumps(titles),
                "currentUrl": f"https://{self.region['base']}/"
            },
            headers={
                "device-memory": "8",
                "downlink": "10",
                "dpr": "2",
                "ect": "4g",
                "rtt": "50",
                "viewport-width": "671",
                "x-amzn-client-ttl-seconds": "15",
                "x-amzn-requestid": "".join(random.choices(string.ascii_uppercase + string.digits, k=20)),
                "x-requested-with": "XMLHttpRequest"
            }
        )
    except Exception as e:
        self.log.error(f"Unable to request playbackEnvelope: {e}")
        return []

    if res.status_code != 200:
        return []

    try:
        enrichments = res.json()["enrichments"]
        playbackEnvelope_info = []
        for title_id, enrichment in enrichments.items():
            cues = enrichment.get("entitlementCues") or {}
            focus_message = (cues.get("focusMessage") or {}).get("message")
            # NOTE(review): this sign-in check matches one exact, locale- and
            # price-specific message; other marketplaces will not trigger it.
            if focus_message == "Watch with a 30 day free Prime trial, auto renews at €4.99/month":
                self.log.error("Cookies Expired")
                return []

            for playbackAction in enrichment.get("playbackActions") or []:
                # Every action is judged on its own fields, so nothing leaks
                # over from a previous action or a previous title.
                if not (playbackAction.get("titleID") or playbackAction.get("legacyOfferASIN")):
                    continue
                playbackExperienceMetadata = playbackAction.get("playbackExperienceMetadata")
                if not playbackExperienceMetadata:
                    continue
                playbackEnvelope_info.append({"titleID": title_id, "playbackExperienceMetadata": playbackExperienceMetadata})
        return playbackEnvelope_info
    except Exception as e:
        self.log.error(f"Unable to get playbackEnvelope: {e}")
        return []
def playbackEnvelope_update(self, playbackInfo):
    """Return ``playbackInfo``, refreshed if its playback envelope has expired.

    Args:
        playbackInfo: Dict with ``titleID`` and ``playbackExperienceMetadata``
            (the latter carrying ``expiryTime`` in milliseconds and
            ``correlationId``). A falsy value is logged and returned as is.

    Returns:
        The same object when it has not expired yet, otherwise a new dict
        built from the ``refreshplayback`` response with ``expiryTime``
        converted to integer milliseconds.

    Exits the process when the refresh request fails or its response cannot
    be interpreted.
    """
    try:
        if not playbackInfo:
            self.log.error("Unable to update playbackEnvelope")
            return playbackInfo

        expires_at = int(playbackInfo["playbackExperienceMetadata"]["expiryTime"]) / 1000
        still_valid = not (expires_at < int(datetime.now().timestamp()))
        if still_valid:
            return playbackInfo

        self.log.warning("Updating playbackEnvelope")
        correlation_id = playbackInfo["playbackExperienceMetadata"]["correlationId"]
        title_id = playbackInfo["titleID"]
        res = self.session.post(
            url=self.endpoints["refreshplayback"],
            params={
                "deviceID": self.device_id,
                "deviceTypeID": self.device["device_type"],
                "gascEnabled": str(self.pv).lower(),
                "marketplaceID": self.region["marketplace_id"],
                "uxLocale": "en_EN",
                "firmware": "1",
                "version": "1",
                "nerid": self.generate_nerid()
            },
            data=json.dumps({
                "deviceId": self.device_id,
                "deviceTypeId": self.device["device_type"],
                "identifiers": {title_id: correlation_id},
                "geoToken": "null",
                "identityContext": "null"
            })
        )
        if res.status_code != 200:
            sys.exit(f"Unable to update playbackEnvelope: {res.text}")

        refreshed = res.json()["response"][title_id]["playbackExperience"]
        # The refresh response's expiryTime is scaled by 1000 so it matches the
        # millisecond value this method compares against on the next call.
        refreshed["expiryTime"] = int(refreshed["expiryTime"] * 1000)
        return {"titleID": title_id, "playbackExperienceMetadata": refreshed}
    except Exception as e:
        sys.exit(f"Unable to update playbackEnvelope: {e}")
def get_manifest(
    self, title, video_codec: str, bitrate_mode: str, quality: str, hdr=None,
    ignore_errors: bool = False
) -> dict:
    """Request the playback manifest (playback resources) for ``title``.

    Args:
        title: Title to request; ``title.id`` and ``title.data["playbackInfo"]``
            are read, and the (possibly refreshed) playback info is written
            back to ``title.data``.
        video_codec: Codec name sent as the only entry of ``codecs``.
        bitrate_mode: Bitrate adaptation; ``"CVBR+CBR"`` / ``"CVBR,CBR"``
            request both, anything else is sent as a single entry.
        quality: ``"SD"``, ``"HD"`` or anything else (treated as UHD) for the
            web request's maximum resolution and HDCP level.
        hdr: Key into ``self.VIDEO_RANGE_MAP``; unknown keys map to ``"None"``.
        ignore_errors: Return ``{}`` instead of exiting when the request
            cannot be made or the server reports an error.

    Returns:
        The decoded response, or ``{}`` on error when ``ignore_errors`` is set.

    The request body differs depending on whether a device bearer token is
    in use (device request) or not (web request).
    """
    self.playbackInfo = self.playbackEnvelope_update(title.data.get("playbackInfo"))
    title.data["playbackInfo"] = self.playbackInfo
    if not self.playbackInfo:
        # Without playback info there is no playbackEnvelope to send.
        if ignore_errors:
            return {}
        sys.exit(" - Unable to obtain the Playback Manifest: no playbackEnvelope available for this title")

    data_dict = {
        "globalParameters": {
            "deviceCapabilityFamily": "WebPlayer" if not self.device_token else "AndroidPlayer",
            "playbackEnvelope": self.playbackInfo["playbackExperienceMetadata"]["playbackEnvelope"],
            "capabilityDiscriminators": {
                "operatingSystem": {"name": "Windows", "version": "10.0"},
                "middleware": {"name": "EdgeNext", "version": "136.0.0.0"},
                "nativeApplication": {"name": "EdgeNext", "version": "136.0.0.0"},
                "hfrControlMode": "Legacy",
                "displayResolution": {"height": 2304, "width": 4096}
            } if not self.device_token else {
                "discriminators": {
                    "software": {},
                    "version": 1
                }
            }
        },
        "auditPingsRequest": {
            **({
                "device": {
                    "category": "Tv",
                    "platform": "Android"
                }
            } if self.device_token else {})
        },
        "playbackDataRequest": {},
        "timedTextUrlsRequest": {
            "supportedTimedTextFormats": ["TTMLv2", "DFXP"]
        },
        "trickplayUrlsRequest": {},
        "transitionTimecodesRequest": {},
        "vodPlaybackUrlsRequest":
            {
                "device": {
                    "hdcpLevel": "2.2" if quality == "UHD" else "1.4",
                    "maxVideoResolution": (
                        "1080p" if quality == "HD" else
                        "480p" if quality == "SD" else
                        "2160p"
                    ),
                    "supportedStreamingTechnologies": ["DASH"],
                    "streamingTechnologies": {
                        "DASH": {
                            "bitrateAdaptations": ["CVBR", "CBR"] if bitrate_mode in ("CVBR+CBR", "CVBR,CBR") else [bitrate_mode],
                            "codecs": [video_codec],
                            "drmKeyScheme": "SingleKey" if self.playready else "DualKey",
                            "drmType": "PlayReady" if self.playready else "Widevine",
                            # NOTE(review): sent as a bare string here but as a
                            # one-element list in the device request below;
                            # confirm which form the service expects.
                            "dynamicRangeFormats": self.VIDEO_RANGE_MAP.get(hdr, "None"),
                            "fragmentRepresentations": ["ByteOffsetRange", "SeparateFile"],
                            "frameRates": ["Standard"],
                            "segmentInfoType": "Base",
                            "timedTextRepresentations": [
                                "NotInManifestNorStream",
                                "SeparateStreamInManifest"
                            ],
                            "trickplayRepresentations": ["NotInManifestNorStream"],
                            "variableAspectRatio": "supported"
                        }
                    },
                    "displayWidth": 4096,
                    "displayHeight": 2304
                },
                "ads": {
                    "sitePageUrl": "",
                    "gdpr": {
                        "enabled": "false",
                        "consentMap": {}
                    }
                },
                "playbackCustomizations": {},
                "playbackSettingsRequest": {
                    "firmware": "UNKNOWN",
                    "playerType": self.player,
                    "responseFormatVersion": "1.0.0",
                    "titleId": title.id
                }
            } if not self.device_token else {
                "ads": {},
                "device": {
                    "displayBasedVending": "supported",
                    "displayHeight": 2304,
                    "displayWidth": 4096,
                    "streamingTechnologies": {
                        "DASH": {
                            "fragmentRepresentations": [
                                "ByteOffsetRange",
                                "SeparateFile"
                            ],
                            "manifestThinningToSupportedResolution": "Forbidden",
                            "segmentInfoType": "List",
                            "timedTextRepresentations": [
                                "BurnedIn",
                                "NotInManifestNorStream",
                                "SeparateStreamInManifest"
                            ],
                            "trickplayRepresentations": [
                                "NotInManifestNorStream"
                            ],
                            "variableAspectRatio": "supported",
                            "vastTimelineType": "Absolute",
                            "bitrateAdaptations": ["CVBR", "CBR"] if bitrate_mode in ("CVBR+CBR", "CVBR,CBR") else [bitrate_mode],
                            "codecs": [video_codec],
                            "drmKeyScheme": "SingleKey",
                            "drmStrength": "L40",
                            "drmType": "PlayReady" if self.playready else "Widevine",
                            "dynamicRangeFormats": [self.VIDEO_RANGE_MAP.get(hdr, "None")],
                            "frameRates": ["Standard"]
                        },
                        "SmoothStreaming": {
                            "fragmentRepresentations": [
                                "ByteOffsetRange",
                                "SeparateFile"
                            ],
                            "manifestThinningToSupportedResolution": "Forbidden",
                            "segmentInfoType": "List",
                            "timedTextRepresentations": [
                                "BurnedIn",
                                "NotInManifestNorStream",
                                "SeparateStreamInManifest"
                            ],
                            "trickplayRepresentations": [
                                "NotInManifestNorStream"
                            ],
                            "variableAspectRatio": "supported",
                            "vastTimelineType": "Absolute",
                            "bitrateAdaptations": ["CVBR", "CBR"] if bitrate_mode in ("CVBR+CBR", "CVBR,CBR") else [bitrate_mode],
                            "codecs": [video_codec],
                            "drmKeyScheme": "SingleKey",
                            "drmStrength": "L40",
                            "drmType": "PlayReady",
                            "dynamicRangeFormats": [self.VIDEO_RANGE_MAP.get(hdr, "None")],
                            "frameRates": ["Standard"]
                        }
                    },
                    "acceptedCreativeApis": [],
                    "category": "Tv",
                    "hdcpLevel": "2.2",
                    "maxVideoResolution": "2160p",
                    "platform": "Android",
                    "supportedStreamingTechnologies": [
                        "DASH", "SmoothStreaming"
                    ]
                },
                "playbackCustomizations": {},
                "playbackSettingsRequest": {
                    "firmware": "UNKNOWN",
                    "playerType": self.player,
                    "responseFormatVersion": "1.0.0",
                    "titleId": title.id
                }
            },
        "vodXrayMetadataRequest": {
            "xrayDeviceClass": "normal",
            "xrayPlaybackMode": "playback",
            "xrayToken": "XRAY_WEB_2023_V2"
        }
    }

    json_data = json.dumps(data_dict)

    res = self.session.post(
        url=self.endpoints["playback"],
        params={
            "deviceID": self.device_id,
            "deviceTypeID": self.device["device_type"],
            "gascEnabled": str(self.pv).lower(),
            "marketplaceID": self.region["marketplace_id"],
            "uxLocale": "en_EN",
            "firmware": "1",
            "titleId": title.id,
            "nerid": self.generate_nerid(),
        },
        data=json_data,
        headers={
            "Authorization": f"Bearer {self.device_token}" if self.device_token else None,
        },
    )

    try:
        manifest = res.json()
    except ValueError:
        # ValueError covers json.JSONDecodeError as well as the decode errors
        # raised by HTTP clients that do not subclass it.
        if ignore_errors:
            return {}
        sys.exit(f" - Amazon reported an error when obtaining the Playback Manifest\n{res.text}")

    # The response may omit "vodPlaybackUrls" entirely and report the failure
    # only under "errorsByResource", so do not index it unconditionally.
    playback_urls = manifest.get("vodPlaybackUrls") or {}
    if "error" in playback_urls:
        if ignore_errors:
            return {}
        message = playback_urls["error"]["message"]
        sys.exit(f" - Amazon reported an error when obtaining the Playback Manifest: {message}")

    if (
        manifest.get("errorsByResource", {}).get("PlaybackUrls") and
        manifest["errorsByResource"]["PlaybackUrls"].get("errorCode") != "PRS.NoRights.NotOwned"
    ):
        if ignore_errors:
            return {}
        error = manifest["errorsByResource"]["PlaybackUrls"]
        sys.exit(f" - Amazon had an error with the Playback Urls: {error['message']} [{error['errorCode']}]")

    if (
        manifest.get("errorsByResource", {}).get("AudioVideoUrls") and
        manifest["errorsByResource"]["AudioVideoUrls"].get("errorCode") != "PRS.NoRights.NotOwned"
    ):
        if ignore_errors:
            return {}
        error = manifest["errorsByResource"]["AudioVideoUrls"]
        sys.exit(f" - Amazon had an error with the A/V Urls: {error['message']} [{error['errorCode']}]")

    return manifest
def get_episodes(self, titleID):
    """Collect every episode of every season of the show ``titleID``.

    The season IDs come from the ``seasonSelector`` widget of the details
    response. For each season the episode list is read page by page
    (following ``NextPage`` pagination tokens), and each page's episodes are
    enriched with playback info from ``playbackEnvelope_data``.

    Returns:
        A ``Series`` of the unique episodes, ordered by (season, number).

    Exits the process when the initial details request fails.
    """
    collected = {}

    root_response = self.session.get(
        url=self.endpoints["details"],
        params={"titleID": titleID, "isElcano": "1", "sections": ["Atf", "Btf"]},
        headers={"Accept": "application/json"},
    )
    if not root_response.ok:
        sys.exit(f"Unable to get title: {root_response.text} [{root_response.status_code}]")
    root_widgets = root_response.json()["widgets"]

    season_ids = [entry.get("titleID") for entry in root_widgets.get("seasonSelector", [])]
    for season_id in season_ids:
        widgets = self.session.get(
            url=self.endpoints["details"],
            params={"titleID": season_id, "isElcano": "1", "sections": "Btf"},
            headers={"Accept": "application/json"},
        ).json()["widgets"]
        # Season-level details come from the first page only and are reused
        # for the episodes found on later pages.
        season_detail = widgets["productDetails"]["detail"]
        page = widgets.get("episodeList", {}).get("episodes", [])

        while True:
            page_ids = []
            for entry in page:
                detail = entry["detail"]
                episode_id = detail["catalogId"]
                sequence = entry["self"]["sequenceNumber"]
                if episode_id in collected:
                    # Seen before: keep the Episode, merge in the newer data.
                    collected[episode_id].data.update(entry)
                else:
                    collected[episode_id] = Episode(
                        id_=episode_id,
                        service=self.__class__,
                        title=season_detail.get("parentTitle"),
                        season=season_detail.get("seasonNumber"),
                        number=sequence,
                        name=detail.get("title"),
                        language=None,
                        data=entry,
                    )
                page_ids.append(episode_id)

            if page_ids:
                envelopes = {
                    info["titleID"]: info
                    for info in self.playbackEnvelope_data(page_ids)
                }
                for episode_id in page_ids:
                    if episode_id in envelopes:
                        collected[episode_id].data.update({"playbackInfo": envelopes[episode_id]})

            pagination = widgets.get("episodeList", {}).get("actions", {}).get("pagination", [])
            next_token = None
            for entry in pagination:
                if entry.get("tokenType") == "NextPage":
                    next_token = quote(entry.get("token"))
                    break
            if not next_token:
                break

            widgets = self.session.get(
                url=self.endpoints["getDetailWidgets"],
                params={
                    "titleID": season_id,
                    "isTvodOnRow": "1",
                    "widgets": f'[{{"widgetType":"EpisodeList","widgetToken":"{next_token}"}}]',
                },
                headers={"Accept": "application/json"},
            ).json()["widgets"]
            page = widgets.get("episodeList", {}).get("episodes", [])

    ordered = sorted(
        collected.values(),
        key=lambda episode: (episode.season, episode.number)
    )
    return Series(ordered)
@staticmethod
def get_original_language(manifest):
    """Get a title's original language from manifest data.

    Three sources are tried in order: the audio track flagged
    ``isOriginalLanguage`` in the catalog metadata (with ``_`` replaced by
    ``-``), the prefix of ``defaultAudioTrackId`` before its first ``_``,
    and the ``languageCode`` of the audio track with the lowest ``index``.
    Returns ``None`` when none of them is available.
    """
    try:
        for audio_track in manifest["catalogMetadata"]["playback"]["audioTracks"]:
            if audio_track["isOriginalLanguage"]:
                return audio_track["language"].replace("_", "-")
    except KeyError:
        pass

    playback_urls = manifest.get("vodPlaybackUrls", {}).get("result", {}).get("playbackUrls", {})
    if "defaultAudioTrackId" in playback_urls:
        try:
            return playback_urls["defaultAudioTrackId"].split("_")[0]
        except IndexError:
            pass

    try:
        by_index = sorted(
            manifest["audioVideoUrls"]["audioTrackMetadata"],
            key=lambda meta: meta["index"]
        )
        return by_index[0]["languageCode"]
    except (KeyError, IndexError):
        pass

    return None
@staticmethod
def generate_nerid(e=0):
    """Generates Network Edge Request ID.

    The ID is 24 characters long: the current time (whole seconds, expressed
    in milliseconds) as 7 base-64 digits, 15 random characters from the same
    alphabet, and ``e`` modulo 100 as two decimal digits.
    """
    alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"

    remaining = int(datetime.now().timestamp()) * 1000
    encoded_time = ""
    for _ in range(7):
        # Peel off the least significant digit and prepend it, so the most
        # significant digit ends up first.
        remaining, digit = divmod(remaining, 64)
        encoded_time = alphabet[digit] + encoded_time

    random_tail = ''.join(secrets.choice(alphabet) for _ in range(15))
    return f"{encoded_time}{random_tail}{e % 100:02d}"
@staticmethod
def clean_mpd_url(mpd_url, optimise=False):
    """Clean up an Amazon MPD manifest url.

    Args:
        mpd_url: Manifest URL as returned by the playback response.
        optimise: When true, only strip ``~`` characters and append
            ``?encoding=segmentBase`` instead of rewriting the path.

    Returns:
        The cleaned URL. A URL that matches neither known layout is rebuilt
        from its path segments and may come back truncated rather than
        raising.
    """
    if '@' in mpd_url:
        # Drop the first "/<digits>@<token>" path segment.
        mpd_url = re.sub(r'/\d+@[^/]+', '', mpd_url, count=1)
    if optimise:
        return mpd_url.replace("~", "") + "?encoding=segmentBase"
    if match := re.match(r"(https?://.*/)d.?/.*~/(.*)", mpd_url):
        # Keep everything before the "d?/" segment and everything after "~/".
        return "".join(match.groups())
    # Fallback: keep scheme and host, drop the first two path segments.
    # The separators are captured, so they are part of the split result.
    pieces = re.split(r"(/)", mpd_url)
    return "".join(pieces[:5] + pieces[9:])
def get_best_quality(self, title):
    """
    Collect the tracks of every requested bitrate mode (CBR / CVBR).

    One manifest is requested per bitrate mode in ``self.orig_bitrate``
    (split on ``+`` unless the codec is HEVC). The tracks parsed from each
    manifest are tagged with the session handoff token and the bitrate mode
    reported by the server, then merged into a single ``Tracks`` object,
    which is returned.
    """
    tracks = Tracks()
    bitrates = [self.orig_bitrate]
    # HEVC is requested with the bitrate string as given; other codecs get
    # one request per "+"-separated mode.
    if self.vcodec != Video.Codec.HEVC:
        bitrates = self.orig_bitrate.split('+')
    for bitrate in bitrates:
        manifest = self.get_manifest(
            title,
            video_codec=self.vcodec,
            bitrate_mode=bitrate,
            quality=self.vquality,
            hdr=self.range,
            ignore_errors=False
        )
        # NOTE(review): with ignore_errors=False get_manifest exits on the
        # errors it detects, so this skip looks effectively unused — confirm.
        if not manifest:
            self.log.warning(f"Skipping {bitrate} manifest due to error")
            continue
        # From here on "bitrate" is the mode the server reports, not the one requested.
        bitrate = manifest["vodPlaybackUrls"]["result"]["playbackUrls"]["urlMetadata"]["bitrateAdaptation"]
        chosen_manifest = self.choose_manifest(manifest, self.cdn)
        if not chosen_manifest:
            self.log.warning(f"No {bitrate} DASH manifests available")
            continue
        mpd_url = self.clean_mpd_url(chosen_manifest["url"], optimise=False)
        self.log.debug(mpd_url)
        if self.event:
            # For events the cleaned URL is discarded and the original URL is
            # used with the device type and segmentBase encoding appended.
            devicetype = self.device["device_type"]
            mpd_url = chosen_manifest["url"]
            mpd_url = f"{mpd_url}?amznDtid={devicetype}&encoding=segmentBase"
        self.log.info(f" + Downloading {bitrate} MPD")
        streamingProtocol = manifest["vodPlaybackUrls"]["result"]["playbackUrls"]["urlMetadata"]["streamingProtocol"]
        sessionHandoffToken = manifest["sessionization"]["sessionHandoffToken"]
        # Parse the manifest with the parser matching the streaming protocol and
        # attach the handoff token to every track.
        if streamingProtocol == "DASH":
            new_tracks = DASH.from_url(url=mpd_url, session=self.session).to_tracks(language=title.language)
            for track in new_tracks:
                track.extra['sessionHandoffToken'] = sessionHandoffToken
        elif streamingProtocol == "SmoothStreaming":
            new_tracks = ISM.from_url(url=mpd_url, session=self.session).to_tracks(language=title.language)
            for track in new_tracks:
                track.extra['sessionHandoffToken'] = sessionHandoffToken
        else:
            sys.exit(f"Unsupported manifest type: {streamingProtocol}")
        self._restore_audio_language(new_tracks)
        # Flag audio description tracks, identified by subtype or by label.
        for audio in new_tracks.audio:
            adaptation_set = audio.data.get("dash", {}).get("adaptation_set", {})
            if adaptation_set.get("audioTrackSubtype") == "descriptive":
                audio.descriptive = True
            track_label = adaptation_set.get("label")
            if track_label and "Audio Description" in track_label:
                audio.descriptive = True
        # Drop the "boosted dialog" audio variants.
        new_tracks.audio = [
            audio for audio in new_tracks.audio
            if not any(boost_type in audio.data.get("dash", {}).get("adaptation_set", {}).get("audioTrackSubtype", "").lower()
                       for boost_type in ["boosteddialoglow", "boosteddialogmedium", "boosteddialoghigh"])
        ]
        # Record which bitrate mode each video track came from.
        for video in new_tracks.videos:
            video.note = bitrate
        tracks.add(new_tracks)
    # NOTE(review): original indentation was lost; this assumes the check runs
    # once after the loop and that the log line belongs inside it — confirm.
    if len(self.bitrate.split('+')) > 1:
        self.bitrate = "CVBR,CBR"
        self.log.info("Selected video manifest bitrate: %s", self.bitrate)
    return tracks
# Service specific classes
class DeviceRegistration:
    """Obtain a device bearer token, reusing cached tokens when possible.

    On construction the cached token set for ``cache_path`` is looked up.
    A cached entry that has not expired is used as is, an expired one is
    refreshed with its refresh token, and in every other case the device is
    registered to the account. The resulting access token is available as
    ``bearer``.
    """

    def __init__(self, device, endpoints, cache_path, session, log):
        """Resolve the bearer token for ``device``.

        Args:
            device: Device description; all values are normalised to ``str``.
            endpoints: Endpoint URLs (``ontv``, ``devicelink``, ``codepair``,
                ``register`` and ``token`` are used by this class).
            cache_path: A path/str or a ``Cacher`` instance identifying the
                token cache entry.
            session: HTTP session used for all requests.
            log: Logger; ``info``/``error`` and, if present, ``exit`` are used.
        """
        self.session = session
        self.device = device
        self.endpoints = endpoints
        self.cache_path = cache_path  # can be a path/str or a Cacher instance
        self.log = log

        # normalize device values
        self.device = {k: str(v) if not isinstance(v, str) else v for k, v in self.device.items()}
        self.bearer = None

        # Resolve cache into a Cacher-like object and a key
        if isinstance(self.cache_path, Cacher):
            cacher_instance = self.cache_path
            cache_key = cacher_instance.key or (cacher_instance.path.stem if cacher_instance.path else None)
            if not cacher_instance.data and cache_key:
                # Best effort: re-read the entry through a fresh Cacher.
                try:
                    cacher_instance = Cacher(cacher_instance.service_tag).get(cache_key)
                except Exception:
                    pass
        else:
            p = Path(self.cache_path)
            cache_key = p.stem
            cacher_instance = Cacher("AMZN").get(cache_key)

        if cacher_instance and cacher_instance.data:
            cache = cacher_instance  # type: ignore
            if cache.expiration and cache.expiration > datetime.now():
                self.log.info(" + Using cached device bearer")
                self.bearer = cache.data.get("access_token")
            else:
                self.log.info("Cached device bearer expired, refreshing...")
                refresh_token = cache.data.get("refresh_token")
                if not refresh_token:
                    self.log.info("No refresh token in cache; registering a new device")
                    self.bearer = self.register(self.device)
                else:
                    refreshed_tokens = self.refresh(self.device, refresh_token)
                    # Keep the old refresh token when the response does not return one.
                    refreshed_tokens["refresh_token"] = refreshed_tokens.get("refresh_token", refresh_token)
                    expires_in = refreshed_tokens.get("expires_in")
                    if isinstance(expires_in, (int, float, str)):
                        try:
                            expires_seconds = int(expires_in)
                            expiration_dt = datetime.fromtimestamp(time.time() + expires_seconds)
                        except Exception:
                            try:
                                expiration_dt = datetime.fromtimestamp(int(expires_in))
                            except Exception:
                                expiration_dt = None
                    elif isinstance(expires_in, datetime):
                        expiration_dt = expires_in
                    else:
                        expiration_dt = None
                    # Caching is best effort: try the Cacher first, then fall
                    # back to writing the file directly, and never fail the
                    # refresh just because the tokens could not be stored.
                    try:
                        setter = Cacher("AMZN", cache_key)
                        setter.set(refreshed_tokens, expiration=expiration_dt)
                    except Exception:
                        try:
                            with open(p if not isinstance(self.cache_path, Cacher) else cacher_instance.path, "w", encoding="utf-8") as fd:
                                fd.write(jsonpickle.encode(refreshed_tokens))
                        except Exception:
                            pass
                    self.bearer = refreshed_tokens.get("access_token")
        else:
            self.log.info(" + Registering new device bearer")
            self.bearer = self.register(self.device)

    def _exit(self, message: str):
        """Call logger exit if available, then raise SystemExit.

        This never returns: callers rely on it to stop execution, so
        ``SystemExit`` is raised even when the logger has no ``exit`` method
        or its ``exit`` returns normally.
        """
        if hasattr(self.log, "exit"):
            try:
                self.log.exit(message)
            except Exception:
                raise SystemExit(message)
        raise SystemExit(message)

    def register(self, device: dict) -> str:
        """
        Register device to the account and return access token string.

        Links a freshly requested code pair to the signed-in account, then
        exchanges it for bearer tokens, which are cached (best effort).
        """
        csrf_token = self.get_csrf_token()
        code_pair = self.get_code_pair(device)

        form_data = {
            "ref_": "atv_set_rd_reg",
            "publicCode": code_pair["public_code"],
            "token": csrf_token,
        }
        response = self.session.post(
            url=self.endpoints["devicelink"],
            headers={
                "Accept": "*/*",
                "Accept-Language": "en-US,en;q=0.9,es-US;q=0.8,es;q=0.7",
                "Content-Type": "application/x-www-form-urlencoded",
                "Referer": self.endpoints.get("ontv", "")
            },
            data=form_data,
        )
        if response.status_code != 200:
            self._exit(f"Unexpected response with the codeBasedLinking request: {response.text} [{response.status_code}]")

        response = self.session.post(
            url=self.endpoints["register"],
            headers={
                "Content-Type": "application/json",
                "Accept-Language": "en-US"
            },
            json={
                "auth_data": {"code_pair": code_pair},
                "registration_data": device,
                "requested_token_type": ["bearer"],
                "requested_extensions": ["device_info", "customer_info"]
            },
            cookies=None,
        )
        if response.status_code != 200:
            self._exit(f"Unable to register: {response.text} [{response.status_code}]")

        try:
            resp_json = response.json()
            bearer = resp_json["response"]["success"]["tokens"]["bearer"]
        except Exception:
            self._exit(f"Unexpected register response: {response.text}")

        try:
            expires_seconds = int(bearer.get("expires_in", 3600))
            expiration_dt = datetime.fromtimestamp(time.time() + expires_seconds)
        except Exception:
            expiration_dt = None

        # Caching is best effort, see __init__.
        try:
            key = Path(self.cache_path).stem if not isinstance(self.cache_path, Cacher) else (self.cache_path.key or Path(self.cache_path.path).stem)
            setter = Cacher("AMZN", key)
            setter.set(bearer, expiration=expiration_dt)
        except Exception:
            try:
                cache_file = self.cache_path if not isinstance(self.cache_path, Cacher) else self.cache_path.path
                with open(cache_file, "w", encoding="utf-8") as fd:
                    fd.write(jsonpickle.encode(bearer))
            except Exception:
                pass

        return bearer.get("access_token")

    def refresh(self, device: dict, refresh_token: str) -> dict:
        """Exchange ``refresh_token`` for a new access token; return the decoded response."""
        response = self.session.post(
            url=self.endpoints["token"],
            json={
                "app_name": device.get("app_name", "unshackle"),
                "app_version": device.get("app_version", "1.0"),
                "source_token_type": "refresh_token",
                "source_token": refresh_token,
                "requested_token_type": "access_token"
            }
        )
        if response.status_code != 200:
            self._exit(f"Unable to refresh token: {response.text} [{response.status_code}]")
        return response.json()

    def get_csrf_token(self) -> str:
        """Read the code-entry CSRF token from the ``ontv`` page.

        Exits when the page shows the cookies are signed out, or when no
        token can be found in any of the page's template scripts.
        """
        res = self.session.get(self.endpoints["ontv"])
        html = res.text
        if 'input type="hidden" name="appAction" value="SIGNIN"' in html:
            self._exit(
                "Cookies are signed out, cannot get ontv CSRF token. "
                f"Expecting profile to have cookies for: {self.endpoints['ontv']}"
            )
        for match in re.finditer(r"<script type=\"text/template\">(.+?)</script>", html, re.DOTALL):
            try:
                prop = json.loads(match.group(1))
            except Exception:
                # Not every template script holds JSON; skip those.
                continue
            token = prop.get("props", {}).get("codeEntry", {}).get("token")
            if token:
                return token
        self._exit("Unable to get ontv CSRF token")

    def get_code_pair(self, device: dict) -> dict:
        """Request a code pair for ``device``; exits on an unparsable or error response."""
        res = self.session.post(
            url=self.endpoints["codepair"],
            headers={
                "Content-Type": "application/json",
                "Accept-Language": "en-US"
            },
            json={"code_data": device}
        )
        try:
            data = res.json()
        except Exception:
            self._exit(f"Unable to parse code pair response: {res.text}")
        if "error" in data:
            self._exit(f"Unable to get code pair: {data.get('error_description', '')} [{data.get('error')}]")
        return data