feat(drm): add MonaLisa DRM support to core infrastructure
- Add MonaLisaCDM class wrapping wasmtime for key extraction - Add MonaLisa DRM class with decrypt_segment() for per-segment decryption - Display Content ID and keys in download output (matching Widevine/PlayReady) - Add wasmtime dependency for WASM module execution
This commit is contained in:
280
unshackle/core/drm/monalisa.py
Normal file
280
unshackle/core/drm/monalisa.py
Normal file
@@ -0,0 +1,280 @@
|
||||
"""
|
||||
MonaLisa DRM System.
|
||||
|
||||
A WASM-based DRM system that uses local key extraction and two-stage
|
||||
segment decryption (ML-Worker binary + AES-ECB).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional, Union
|
||||
from uuid import UUID
|
||||
|
||||
from Cryptodome.Cipher import AES
|
||||
from Cryptodome.Util.Padding import unpad
|
||||
|
||||
|
||||
class MonaLisa:
|
||||
"""
|
||||
MonaLisa DRM System.
|
||||
|
||||
Unlike Widevine/PlayReady, MonaLisa does not use a challenge/response flow
|
||||
with a license server. Instead, the PSSH value (ticket) is provided directly
|
||||
by the service API, and keys are extracted locally via a WASM module.
|
||||
|
||||
Decryption is performed in two stages:
|
||||
1. ML-Worker binary: Removes MonaLisa encryption layer (bbts -> ents)
|
||||
2. AES-ECB decryption: Final decryption with service-provided key
|
||||
"""
|
||||
|
||||
class Exceptions:
|
||||
class TicketNotFound(Exception):
|
||||
"""Raised when no PSSH/ticket data is provided."""
|
||||
|
||||
class KeyExtractionFailed(Exception):
|
||||
"""Raised when key extraction from the ticket fails."""
|
||||
|
||||
class WorkerNotFound(Exception):
|
||||
"""Raised when the ML-Worker binary is not found."""
|
||||
|
||||
class DecryptionFailed(Exception):
|
||||
"""Raised when segment decryption fails."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
ticket: Union[str, bytes],
|
||||
aes_key: Union[str, bytes],
|
||||
device_path: Path,
|
||||
**kwargs: Any,
|
||||
):
|
||||
"""
|
||||
Initialize MonaLisa DRM.
|
||||
|
||||
Args:
|
||||
ticket: PSSH value from service API (base64 string or raw bytes).
|
||||
aes_key: AES-ECB key for second-stage decryption (hex string or bytes).
|
||||
device_path: Path to the CDM device file (.mld).
|
||||
**kwargs: Additional metadata stored in self.data.
|
||||
|
||||
Raises:
|
||||
TicketNotFound: If ticket/PSSH is empty.
|
||||
KeyExtractionFailed: If key extraction fails.
|
||||
"""
|
||||
if not ticket:
|
||||
raise MonaLisa.Exceptions.TicketNotFound("No PSSH/ticket data provided.")
|
||||
|
||||
self._ticket = ticket
|
||||
|
||||
# Store AES key for second-stage decryption
|
||||
if isinstance(aes_key, str):
|
||||
self._aes_key = bytes.fromhex(aes_key)
|
||||
else:
|
||||
self._aes_key = aes_key
|
||||
|
||||
self._device_path = device_path
|
||||
self._kid: Optional[UUID] = None
|
||||
self._key: Optional[str] = None
|
||||
self.data: dict = kwargs or {}
|
||||
|
||||
# Extract keys immediately
|
||||
self._extract_keys()
|
||||
|
||||
def _extract_keys(self) -> None:
|
||||
"""Extract keys from the ticket using the MonaLisa CDM."""
|
||||
# Import here to avoid circular import
|
||||
from unshackle.core.cdm.monalisa import MonaLisaCDM
|
||||
|
||||
try:
|
||||
cdm = MonaLisaCDM(device_path=self._device_path)
|
||||
session_id = cdm.open()
|
||||
try:
|
||||
keys = cdm.extract_keys(self._ticket)
|
||||
if keys:
|
||||
kid_hex = keys.get("kid")
|
||||
if kid_hex:
|
||||
self._kid = UUID(hex=kid_hex)
|
||||
self._key = keys.get("key")
|
||||
finally:
|
||||
cdm.close(session_id)
|
||||
except Exception as e:
|
||||
raise MonaLisa.Exceptions.KeyExtractionFailed(f"Failed to extract keys: {e}")
|
||||
|
||||
@classmethod
|
||||
def from_ticket(
|
||||
cls,
|
||||
ticket: Union[str, bytes],
|
||||
aes_key: Union[str, bytes],
|
||||
device_path: Path,
|
||||
) -> MonaLisa:
|
||||
"""
|
||||
Create a MonaLisa DRM instance from a PSSH/ticket.
|
||||
|
||||
Args:
|
||||
ticket: PSSH value from service API.
|
||||
aes_key: AES-ECB key for second-stage decryption.
|
||||
device_path: Path to the CDM device file (.mld).
|
||||
|
||||
Returns:
|
||||
MonaLisa DRM instance with extracted keys.
|
||||
"""
|
||||
return cls(ticket=ticket, aes_key=aes_key, device_path=device_path)
|
||||
|
||||
@property
|
||||
def kid(self) -> Optional[UUID]:
|
||||
"""Get the Key ID."""
|
||||
return self._kid
|
||||
|
||||
@property
|
||||
def key(self) -> Optional[str]:
|
||||
"""Get the content key as hex string."""
|
||||
return self._key
|
||||
|
||||
@property
|
||||
def pssh(self) -> str:
|
||||
"""
|
||||
Get the raw PSSH/ticket value as a string.
|
||||
|
||||
Returns:
|
||||
The raw PSSH value as a base64 string.
|
||||
"""
|
||||
if isinstance(self._ticket, bytes):
|
||||
return self._ticket.decode("utf-8")
|
||||
return self._ticket
|
||||
|
||||
@property
|
||||
def content_id(self) -> Optional[str]:
|
||||
"""
|
||||
Extract the Content ID from the PSSH for display.
|
||||
|
||||
The PSSH contains an embedded Content ID at bytes 21-75 with format:
|
||||
H5DCID-V3-P1-YYYYMMDD-HHMMSS-MEDIAID-TIMESTAMP-SUFFIX
|
||||
|
||||
Returns:
|
||||
The Content ID string if extractable, None otherwise.
|
||||
"""
|
||||
import base64
|
||||
|
||||
try:
|
||||
# Decode base64 PSSH to get raw bytes
|
||||
if isinstance(self._ticket, bytes):
|
||||
data = self._ticket
|
||||
else:
|
||||
data = base64.b64decode(self._ticket)
|
||||
|
||||
# Content ID is at bytes 21-75 (55 bytes)
|
||||
if len(data) >= 76:
|
||||
content_id = data[21:76].decode("ascii")
|
||||
# Validate it looks like a content ID
|
||||
if content_id.startswith("H5DCID-"):
|
||||
return content_id
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
@property
|
||||
def content_keys(self) -> dict[UUID, str]:
|
||||
"""
|
||||
Get content keys in the same format as Widevine/PlayReady.
|
||||
|
||||
Returns:
|
||||
Dictionary mapping KID to key hex string.
|
||||
"""
|
||||
if self._kid and self._key:
|
||||
return {self._kid: self._key}
|
||||
return {}
|
||||
|
||||
def decrypt_segment(self, segment_path: Path) -> None:
|
||||
"""
|
||||
Decrypt a single segment using two-stage decryption.
|
||||
|
||||
Stage 1: ML-Worker binary (bbts -> ents)
|
||||
Stage 2: AES-ECB decryption (ents -> ts)
|
||||
|
||||
Args:
|
||||
segment_path: Path to the encrypted segment file.
|
||||
|
||||
Raises:
|
||||
WorkerNotFound: If ML-Worker binary is not available.
|
||||
DecryptionFailed: If decryption fails at any stage.
|
||||
"""
|
||||
if not self._key:
|
||||
return
|
||||
|
||||
# Import here to avoid circular import
|
||||
from unshackle.core.cdm.monalisa import MonaLisaCDM
|
||||
|
||||
worker_path = MonaLisaCDM.get_worker_path()
|
||||
if not worker_path or not worker_path.exists():
|
||||
raise MonaLisa.Exceptions.WorkerNotFound("ML-Worker not found.")
|
||||
|
||||
bbts_path = segment_path.with_suffix(".bbts")
|
||||
ents_path = segment_path.with_suffix(".ents")
|
||||
|
||||
try:
|
||||
if segment_path.exists():
|
||||
segment_path.replace(bbts_path)
|
||||
else:
|
||||
raise MonaLisa.Exceptions.DecryptionFailed(f"Segment file does not exist: {segment_path}")
|
||||
|
||||
# Stage 1: ML-Worker decryption
|
||||
cmd = [str(worker_path), self._key, str(bbts_path), str(ents_path)]
|
||||
|
||||
startupinfo = None
|
||||
if sys.platform == "win32":
|
||||
startupinfo = subprocess.STARTUPINFO()
|
||||
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
|
||||
|
||||
process = subprocess.run(
|
||||
cmd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
startupinfo=startupinfo,
|
||||
)
|
||||
|
||||
if process.returncode != 0:
|
||||
raise MonaLisa.Exceptions.DecryptionFailed(
|
||||
f"ML-Worker failed for {segment_path.name}: {process.stderr}"
|
||||
)
|
||||
|
||||
if not ents_path.exists():
|
||||
raise MonaLisa.Exceptions.DecryptionFailed(
|
||||
f"Decrypted .ents file was not created for {segment_path.name}"
|
||||
)
|
||||
|
||||
# Stage 2: AES-ECB decryption
|
||||
with open(ents_path, "rb") as f:
|
||||
ents_data = f.read()
|
||||
|
||||
crypto = AES.new(self._aes_key, AES.MODE_ECB)
|
||||
decrypted_data = unpad(crypto.decrypt(ents_data), AES.block_size)
|
||||
|
||||
# Write decrypted segment back to original path
|
||||
with open(segment_path, "wb") as f:
|
||||
f.write(decrypted_data)
|
||||
|
||||
except MonaLisa.Exceptions.DecryptionFailed:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise MonaLisa.Exceptions.DecryptionFailed(f"Failed to decrypt segment {segment_path.name}: {e}")
|
||||
finally:
|
||||
if ents_path.exists():
|
||||
os.remove(ents_path)
|
||||
if bbts_path != segment_path and bbts_path.exists():
|
||||
os.remove(bbts_path)
|
||||
|
||||
def decrypt(self, _path: Path) -> None:
|
||||
"""
|
||||
MonaLisa uses per-segment decryption during download via the
|
||||
on_segment_downloaded callback. By the time this method is called,
|
||||
the content has already been decrypted and muxed into a container.
|
||||
|
||||
Args:
|
||||
path: Path to the file (ignored).
|
||||
"""
|
||||
pass
|
||||
Reference in New Issue
Block a user