refactor: remove remote-service code until feature is more complete
Temporarily removes client-side remote service discovery and authentication until the implementation is more fleshed out and working.
This commit is contained in:
@@ -191,12 +191,73 @@ def serialize_title(title: Title_T) -> Dict[str, Any]:
|
||||
return result
|
||||
|
||||
|
||||
def serialize_video_track(track: Video) -> Dict[str, Any]:
|
||||
def serialize_drm(drm_list) -> Optional[List[Dict[str, Any]]]:
|
||||
"""Serialize DRM objects to JSON-serializable list."""
|
||||
if not drm_list:
|
||||
return None
|
||||
|
||||
if not isinstance(drm_list, list):
|
||||
drm_list = [drm_list]
|
||||
|
||||
result = []
|
||||
for drm in drm_list:
|
||||
drm_info = {}
|
||||
drm_class = drm.__class__.__name__
|
||||
drm_info["type"] = drm_class.lower()
|
||||
|
||||
# Get PSSH - handle both Widevine and PlayReady
|
||||
if hasattr(drm, "_pssh") and drm._pssh:
|
||||
try:
|
||||
pssh_obj = drm._pssh
|
||||
# Try to get base64 representation
|
||||
if hasattr(pssh_obj, "dumps"):
|
||||
# pywidevine PSSH has dumps() method
|
||||
drm_info["pssh"] = pssh_obj.dumps()
|
||||
elif hasattr(pssh_obj, "__bytes__"):
|
||||
# Convert to base64
|
||||
import base64
|
||||
drm_info["pssh"] = base64.b64encode(bytes(pssh_obj)).decode()
|
||||
elif hasattr(pssh_obj, "to_base64"):
|
||||
drm_info["pssh"] = pssh_obj.to_base64()
|
||||
else:
|
||||
# Fallback - str() works for pywidevine PSSH
|
||||
pssh_str = str(pssh_obj)
|
||||
# Check if it's already base64-like or an object repr
|
||||
if not pssh_str.startswith("<"):
|
||||
drm_info["pssh"] = pssh_str
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Get KIDs
|
||||
if hasattr(drm, "kids") and drm.kids:
|
||||
drm_info["kids"] = [str(kid) for kid in drm.kids]
|
||||
|
||||
# Get content keys if available
|
||||
if hasattr(drm, "content_keys") and drm.content_keys:
|
||||
drm_info["content_keys"] = {str(k): v for k, v in drm.content_keys.items()}
|
||||
|
||||
# Get license URL - essential for remote licensing
|
||||
if hasattr(drm, "license_url") and drm.license_url:
|
||||
drm_info["license_url"] = str(drm.license_url)
|
||||
elif hasattr(drm, "_license_url") and drm._license_url:
|
||||
drm_info["license_url"] = str(drm._license_url)
|
||||
|
||||
result.append(drm_info)
|
||||
|
||||
return result if result else None
|
||||
|
||||
|
||||
def serialize_video_track(track: Video, include_url: bool = False) -> Dict[str, Any]:
|
||||
"""Convert video track to JSON-serializable dict."""
|
||||
codec_name = track.codec.name if hasattr(track.codec, "name") else str(track.codec)
|
||||
range_name = track.range.name if hasattr(track.range, "name") else str(track.range)
|
||||
|
||||
return {
|
||||
# Get descriptor for N_m3u8DL-RE compatibility (HLS, DASH, URL, etc.)
|
||||
descriptor_name = None
|
||||
if hasattr(track, "descriptor") and track.descriptor:
|
||||
descriptor_name = track.descriptor.name if hasattr(track.descriptor, "name") else str(track.descriptor)
|
||||
|
||||
result = {
|
||||
"id": str(track.id),
|
||||
"codec": codec_name,
|
||||
"codec_display": VIDEO_CODEC_MAP.get(codec_name, codec_name),
|
||||
@@ -208,15 +269,24 @@ def serialize_video_track(track: Video) -> Dict[str, Any]:
|
||||
"range": range_name,
|
||||
"range_display": DYNAMIC_RANGE_MAP.get(range_name, range_name),
|
||||
"language": str(track.language) if track.language else None,
|
||||
"drm": str(track.drm) if hasattr(track, "drm") and track.drm else None,
|
||||
"drm": serialize_drm(track.drm) if hasattr(track, "drm") and track.drm else None,
|
||||
"descriptor": descriptor_name,
|
||||
}
|
||||
if include_url and hasattr(track, "url") and track.url:
|
||||
result["url"] = str(track.url)
|
||||
return result
|
||||
|
||||
|
||||
def serialize_audio_track(track: Audio) -> Dict[str, Any]:
|
||||
def serialize_audio_track(track: Audio, include_url: bool = False) -> Dict[str, Any]:
|
||||
"""Convert audio track to JSON-serializable dict."""
|
||||
codec_name = track.codec.name if hasattr(track.codec, "name") else str(track.codec)
|
||||
|
||||
return {
|
||||
# Get descriptor for N_m3u8DL-RE compatibility
|
||||
descriptor_name = None
|
||||
if hasattr(track, "descriptor") and track.descriptor:
|
||||
descriptor_name = track.descriptor.name if hasattr(track.descriptor, "name") else str(track.descriptor)
|
||||
|
||||
result = {
|
||||
"id": str(track.id),
|
||||
"codec": codec_name,
|
||||
"codec_display": AUDIO_CODEC_MAP.get(codec_name, codec_name),
|
||||
@@ -225,20 +295,33 @@ def serialize_audio_track(track: Audio) -> Dict[str, Any]:
|
||||
"language": str(track.language) if track.language else None,
|
||||
"atmos": track.atmos if hasattr(track, "atmos") else False,
|
||||
"descriptive": track.descriptive if hasattr(track, "descriptive") else False,
|
||||
"drm": str(track.drm) if hasattr(track, "drm") and track.drm else None,
|
||||
"drm": serialize_drm(track.drm) if hasattr(track, "drm") and track.drm else None,
|
||||
"descriptor": descriptor_name,
|
||||
}
|
||||
if include_url and hasattr(track, "url") and track.url:
|
||||
result["url"] = str(track.url)
|
||||
return result
|
||||
|
||||
|
||||
def serialize_subtitle_track(track: Subtitle) -> Dict[str, Any]:
|
||||
def serialize_subtitle_track(track: Subtitle, include_url: bool = False) -> Dict[str, Any]:
|
||||
"""Convert subtitle track to JSON-serializable dict."""
|
||||
return {
|
||||
# Get descriptor for compatibility
|
||||
descriptor_name = None
|
||||
if hasattr(track, "descriptor") and track.descriptor:
|
||||
descriptor_name = track.descriptor.name if hasattr(track.descriptor, "name") else str(track.descriptor)
|
||||
|
||||
result = {
|
||||
"id": str(track.id),
|
||||
"codec": track.codec.name if hasattr(track.codec, "name") else str(track.codec),
|
||||
"language": str(track.language) if track.language else None,
|
||||
"forced": track.forced if hasattr(track, "forced") else False,
|
||||
"sdh": track.sdh if hasattr(track, "sdh") else False,
|
||||
"cc": track.cc if hasattr(track, "cc") else False,
|
||||
"descriptor": descriptor_name,
|
||||
}
|
||||
if include_url and hasattr(track, "url") and track.url:
|
||||
result["url"] = str(track.url)
|
||||
return result
|
||||
|
||||
|
||||
async def list_titles_handler(data: Dict[str, Any], request: Optional[web.Request] = None) -> web.Response:
|
||||
|
||||
Reference in New Issue
Block a user