"""Utility module for streaming and manifest parsing."""
|
|
|
|
import xml.etree.ElementTree as ET
|
|
import base64
|
|
import os
|
|
import asyncio
|
|
import time
|
|
from typing import Dict, Any
|
|
from xml.dom import minidom
|
|
|
|
import requests
|
|
import aiohttp
|
|
from tqdm.asyncio import tqdm
|
|
from utils.logging_config import logger
|
|
|
|
# Register namespaces to preserve prefixes in output
|
|
ET.register_namespace("", "urn:mpeg:dash:schema:mpd:2011")
|
|
ET.register_namespace("xsi", "http://www.w3.org/2001/XMLSchema-instance")
|
|
ET.register_namespace("cenc", "urn:mpeg:cenc:2013")
|
|
|
|
|
|
def parse_mpd_manifest(mpd_content: str) -> Dict[str, Any]:
    """Parse an MPD manifest and extract metadata.

    Args:
        mpd_content: The MPD manifest content as a string.

    Returns:
        A dictionary containing parsed manifest information.
    """
    root = ET.fromstring(mpd_content)
    namespaces = {"mpd": "urn:mpeg:dash:schema:mpd:2011", "cenc": "urn:mpeg:cenc:2013"}

    manifest_info = {
        "type": root.get("type"),
        "profiles": root.get("profiles"),
        "publishTime": root.get("publishTime"),
        "availabilityStartTime": root.get("availabilityStartTime"),
        "minimumUpdatePeriod": root.get("minimumUpdatePeriod"),
        "minBufferTime": root.get("minBufferTime"),
        "timeShiftBufferDepth": root.get("timeShiftBufferDepth"),
        "suggestedPresentationDelay": root.get("suggestedPresentationDelay"),
        "periods": [],
    }

    for period in root.findall("mpd:Period", namespaces):
        period_info = {
            "id": period.get("id"),
            "start": period.get("start"),
            "adaptation_sets": [],
        }
        for adaptation_set in period.findall("mpd:AdaptationSet", namespaces):
            adaptation_info = parse_adaptation_set(adaptation_set, namespaces)
            period_info["adaptation_sets"].append(adaptation_info)
        manifest_info["periods"].append(period_info)
    return manifest_info


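# Example (a hypothetical sketch, not part of the module's API: `mpd_text`
# stands for a manifest fetched with get_manifest() further below):
#
#     info = parse_mpd_manifest(mpd_text)
#     print(info["type"], info["minimumUpdatePeriod"])
#     for period in info["periods"]:
#         for aset in period["adaptation_sets"]:
#             print(aset["contentType"], len(aset["representations"]))

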
def parse_adaptation_set(
    adaptation_set: ET.Element, namespaces: Dict[str, str]
) -> Dict[str, Any]:
    """Parse an AdaptationSet element from an MPD manifest.

    Args:
        adaptation_set: The AdaptationSet XML element.
        namespaces: XML namespaces dictionary.

    Returns:
        A dictionary containing parsed adaptation set information.
    """
    adaptation_info = {
        "id": adaptation_set.get("id"),
        "group": adaptation_set.get("group"),
        "contentType": adaptation_set.get("contentType"),
        "lang": adaptation_set.get("lang"),
        "segmentAlignment": adaptation_set.get("segmentAlignment"),
        "startWithSAP": adaptation_set.get("startWithSAP"),
        "drm_info": [],
        "representations": [],
    }

    # Parse ContentProtection
    for content_protection in adaptation_set.findall(
        "mpd:ContentProtection", namespaces
    ):
        drm_info = parse_content_protection(content_protection, namespaces)
        adaptation_info["drm_info"].append(drm_info)

    # Parse Role
    role = adaptation_set.find("mpd:Role", namespaces)
    if role is not None:
        adaptation_info["role"] = role.get("value")

    # Parse Representations
    for representation in adaptation_set.findall("mpd:Representation", namespaces):
        rep_info = parse_representation(representation, namespaces)
        adaptation_info["representations"].append(rep_info)

    return adaptation_info


def parse_content_protection(
    content_protection: ET.Element, namespaces: Dict[str, str]
) -> Dict[str, Any]:
    """Parse a ContentProtection element for DRM information.

    Args:
        content_protection: The ContentProtection XML element.
        namespaces: XML namespaces dictionary.

    Returns:
        A dictionary containing DRM information.
    """
    drm_info = {
        "schemeIdUri": content_protection.get("schemeIdUri"),
        "value": content_protection.get("value"),
    }

    default_kid = content_protection.get("{urn:mpeg:cenc:2013}default_KID")
    if default_kid:
        drm_info["default_KID"] = default_kid

    pssh_element = content_protection.find("cenc:pssh", namespaces)
    if pssh_element is not None and pssh_element.text:
        drm_info["pssh"] = pssh_element.text.strip()
        try:
            pssh_decoded = base64.b64decode(drm_info["pssh"])
            drm_info["pssh_hex"] = pssh_decoded.hex()
        except (ValueError, base64.binascii.Error):
            pass

    return drm_info


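# For illustration only: a Widevine ContentProtection element (schemeIdUri
# "urn:uuid:edef8ba9-79d6-4ace-a3c8-27dcd51d21ed") typically yields a dict
# shaped like the hypothetical value below; the KID and pssh are placeholders:
#
#     {
#         "schemeIdUri": "urn:uuid:edef8ba9-79d6-4ace-a3c8-27dcd51d21ed",
#         "value": None,
#         "default_KID": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
#         "pssh": "<base64-encoded pssh box>",
#         "pssh_hex": "<same pssh box as hex>",
#     }

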
def parse_representation(
    representation: ET.Element, namespaces: Dict[str, str]
) -> Dict[str, Any]:
    """Parse a Representation element from an MPD manifest.

    Args:
        representation: The Representation XML element.
        namespaces: XML namespaces dictionary.

    Returns:
        A dictionary containing parsed representation information.
    """
    rep_info = {
        "id": representation.get("id"),
        "bandwidth": representation.get("bandwidth"),
        "codecs": representation.get("codecs"),
        "mimeType": representation.get("mimeType"),
        "width": representation.get("width"),
        "height": representation.get("height"),
        "frameRate": representation.get("frameRate"),
        "segments": {},
    }

    segment_template = representation.find("mpd:SegmentTemplate", namespaces)
    if segment_template is not None:
        rep_info["segments"] = {
            "timescale": segment_template.get("timescale"),
            "initialization": segment_template.get("initialization"),
            "media": segment_template.get("media"),
            "timeline": [],
        }

        segment_timeline = segment_template.find("mpd:SegmentTimeline", namespaces)
        if segment_timeline is not None:
            for s_element in segment_timeline.findall("mpd:S", namespaces):
                timeline_info = {
                    "t": (
                        int(s_element.get("t")) if s_element.get("t") is not None else 0
                    ),  # start time
                    "d": (
                        int(s_element.get("d")) if s_element.get("d") is not None else 0
                    ),  # duration
                    "r": (
                        int(s_element.get("r")) if s_element.get("r") is not None else 0
                    ),  # repeat count
                }
                rep_info["segments"]["timeline"].append(timeline_info)

    return rep_info


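# Note: per DASH, an S element with repeat count r describes r + 1 consecutive
# segments of duration d. A sketch (names taken from the dict built above) for
# deriving segment count and total duration from a parsed representation:
#
#     timeline = rep_info["segments"]["timeline"]
#     segment_count = sum(s["r"] + 1 for s in timeline)
#     total_ticks = sum(s["d"] * (s["r"] + 1) for s in timeline)
#     seconds = total_ticks / int(rep_info["segments"]["timescale"])

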
# pylint: disable=too-many-locals,too-many-branches
def organize_by_content_type(manifest_info: Dict[str, Any]) -> Dict[str, Any]:
    """Organize manifest information by content type.

    Args:
        manifest_info: Parsed manifest information dictionary.

    Returns:
        A dictionary organized by content type (video, audio).
    """
    organized = {
        "video": {},
        "audio": {},
        # 'text': {},
        # 'manifest_metadata': {
        #     'type': manifest_info.get('type'),
        #     'publishTime': manifest_info.get('publishTime'),
        #     'minBufferTime': manifest_info.get('minBufferTime'),
        # }
    }

    for period in manifest_info.get("periods", []):
        for adaptation_set in period.get("adaptation_sets", []):
            content_type = adaptation_set.get("contentType")

            if not content_type:
                continue

            for rep in adaptation_set.get("representations", []):
                track_info = {
                    "track_id": rep.get("id"),
                    "adaptation_set_id": adaptation_set.get("id"),
                    "bandwidth": int(rep.get("bandwidth", 0)),
                    "bitrate_kbps": int(rep.get("bandwidth", 0)) // 1000,
                    "codec": rep.get("codecs"),
                    "mime_type": rep.get("mimeType"),
                    "drm_info": adaptation_set.get("drm_info", []),
                    "segments": rep.get("segments", {}),
                }

                if content_type == "video":
                    width = rep.get("width")
                    height = rep.get("height")
                    frame_rate = rep.get("frameRate")

                    track_info.update(
                        {
                            "resolution": (
                                f"{width}x{height}" if width and height else "unknown"
                            ),
                            "width": int(width) if width else None,
                            "height": int(height) if height else None,
                            "frame_rate": frame_rate,
                        }
                    )

                    resolution_key = track_info["resolution"]
                    if resolution_key not in organized["video"]:
                        organized["video"][resolution_key] = []
                    organized["video"][resolution_key].append(track_info)

                elif content_type == "audio":
                    lang = adaptation_set.get("lang", "unknown")
                    role = adaptation_set.get("role", "main")

                    track_info.update(
                        {
                            "language": lang,
                            "role": role,
                        }
                    )

                    lang_key = f"{lang}_{role}"
                    if lang_key not in organized["audio"]:
                        organized["audio"][lang_key] = []
                    organized["audio"][lang_key].append(track_info)

                # elif content_type == 'text':
                #     lang = adaptation_set.get('lang', 'unknown')
                #     role = adaptation_set.get('role', 'caption')

                #     track_info.update({
                #         'language': lang,
                #         'role': role,
                #     })

                #     lang_key = f"{lang}_{role}"
                #     if lang_key not in organized['text']:
                #         organized['text'][lang_key] = []
                #     organized['text'][lang_key].append(track_info)

    # Sort video tracks within each resolution by bitrate (descending)
    for resolution in organized["video"]:
        organized["video"][resolution].sort(key=lambda x: x["bandwidth"], reverse=True)

    # Sort audio tracks by bitrate (descending)
    for lang in organized["audio"]:
        organized["audio"][lang].sort(key=lambda x: x["bandwidth"], reverse=True)

    # Sort video resolutions by pixel count (descending)
    sorted_video = {}
    for resolution in sorted(
        organized["video"].keys(),
        key=lambda r: (
            int(r.split("x")[0]) * int(r.split("x")[1])
            if "x" in r and r.split("x")[0].isdigit()
            else 0
        ),
        reverse=True,
    ):
        sorted_video[resolution] = organized["video"][resolution]
    organized["video"] = sorted_video

    return organized


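# Example (hypothetical usage): resolutions are sorted best-first, so the
# highest-quality video track and a given audio track can be picked like this
# (the "fra_main" key is a placeholder; actual keys depend on the manifest):
#
#     organized = organize_by_content_type(manifest_info)
#     best_resolution = next(iter(organized["video"]))
#     best_video = organized["video"][best_resolution][0]
#     french_audio = organized["audio"].get("fra_main", [])

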
def get_manifest(manifest_id):
    """Fetch the MPD manifest for a given channel ID.

    Args:
        manifest_id: The channel/manifest identifier.

    Returns:
        The manifest content as text.
    """
    headers = {
        "accept": "*/*",
        "accept-language": "en-GB,en-US;q=0.9,en;q=0.8",
        "cache-control": "no-cache",
        "origin": "https://tv.free.fr",
        "pragma": "no-cache",
        "priority": "u=1, i",
        "referer": "https://tv.free.fr/",
        "sec-ch-ua": '"Google Chrome";v="143", "Chromium";v="143", "Not A(Brand";v="24"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"macOS"',
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "cross-site",
        "user-agent": (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/143.0.0.0 Safari/537.36"
        ),
    }

    format_id = 1
    url = (
        "https://api-proxad.dc2.oqee.net/playlist/v1/live/"
        f"{manifest_id}/{format_id}/live.mpd"
    )
    response = requests.get(url, headers=headers, timeout=10)
    return response.text


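# Example (hypothetical usage; the channel ID below is a placeholder):
#
#     mpd_text = get_manifest(1234)
#     manifest_info = parse_mpd_manifest(mpd_text)
#     tracks = organize_by_content_type(manifest_info)

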
async def fetch_segment(session, ticks, track_id):
    """Fetch a media segment asynchronously.

    Args:
        session: The aiohttp ClientSession.
        ticks: The tick value for the segment.
        track_id: The track identifier.

    Returns:
        The tick value if successful, None otherwise.
    """
    url = f"https://media.stream.proxad.net/media/{track_id}_{ticks}"
    headers = {
        "Accept": "*/*",
        "Referer": "https://tv.free.fr/",
        "User-Agent": (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/143.0.0.0 Safari/537.36"
        ),
    }
    try:
        async with session.get(url, headers=headers) as resp:
            if resp.status == 200:
                return ticks
            return None
    except aiohttp.ClientError:
        return None


def get_init(output_folder, track_id):
    """Download and save the initialization segment for a track.

    Args:
        output_folder: The output folder path.
        track_id: The track identifier.

    Returns:
        The path to the saved initialization segment, or None if the
        download failed.
    """
    url = f"https://media.stream.proxad.net/media/{track_id}_init"
    headers = {
        "Accept": "*/*",
        "Referer": "https://tv.free.fr/",
        "User-Agent": (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/143.0.0.0 Safari/537.36"
        ),
    }
    response = requests.get(url, headers=headers, timeout=10)
    if response.status_code == 200:
        os.makedirs(f"{output_folder}/segments_{track_id}", exist_ok=True)
        init_path = f"{output_folder}/segments_{track_id}/init.mp4"
        with open(init_path, "wb") as f:
            f.write(response.content)
        logger.debug("Saved initialization segment to %s", init_path)
        return init_path
    return None


async def save_segments(
    output_folder, track_id, start_tick, rep_nb, duration, batch_size=64
):
    """Download and save multiple media segments in batches.

    Args:
        output_folder: The output folder path.
        track_id: The track identifier.
        start_tick: The starting tick value.
        rep_nb: The number of segments to download.
        duration: The duration per segment.
        batch_size: Number of concurrent downloads per batch (default: 64).
    """
    os.makedirs(f"{output_folder}/segments_{track_id}", exist_ok=True)

    async def download_segment(session, tick, rep):
        """Download a single segment.

        Returns:
            Tuple of (success: bool, tick: int, rep: int)
        """
        url = f"https://media.stream.proxad.net/media/{track_id}_{tick}"
        headers = {
            "Accept": "*/*",
            "Referer": "https://tv.free.fr/",
            "User-Agent": (
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/143.0.0.0 Safari/537.36"
            ),
        }
        try:
            async with session.get(url, headers=headers) as resp:
                if resp.status == 200:
                    content = await resp.read()
                    filename = f"{output_folder}/segments_{track_id}/{tick}.m4s"
                    with open(filename, "wb") as f:
                        f.write(content)
                    return (True, tick, rep)
                logger.error(
                    "Failed to download segment %d (tick %d): HTTP %d",
                    rep,
                    tick,
                    resp.status,
                )
                return (False, tick, rep)
        except aiohttp.ClientError as e:
            logger.warning("Error downloading segment %d (tick %d): %s", rep, tick, e)
            return (False, tick, rep)

    logger.info(
        "Starting download of %d segments in batches of %d...", rep_nb, batch_size
    )
    logger.debug("Track ID: %s", track_id)
    logger.debug("Base tick: %d", start_tick)

    start_time = time.time()

    # Build list of all segments to download
    segments_to_download = [(start_tick + i * duration, i) for i in range(rep_nb)]
    retry_list = []
    successful = 0

    async with aiohttp.ClientSession() as session:
        # Process segments in batches
        with tqdm(
            total=len(segments_to_download), desc="Downloading segments", unit="seg"
        ) as pbar:
            for batch_start in range(0, len(segments_to_download), batch_size):
                batch = segments_to_download[batch_start : batch_start + batch_size]
                tasks = [download_segment(session, tick, rep) for tick, rep in batch]

                results = await asyncio.gather(*tasks)

                for success, tick, rep in results:
                    if success:
                        successful += 1
                    else:
                        retry_list.append((tick, rep))
                    pbar.update(1)

        # Retry failed segments
        if retry_list:
            logger.info("Retrying %d failed segments...", len(retry_list))
            retry_successful = 0
            final_failures = []

            with tqdm(
                total=len(retry_list), desc="Retrying segments", unit="seg"
            ) as pbar:
                for batch_start in range(0, len(retry_list), batch_size):
                    batch = retry_list[batch_start : batch_start + batch_size]
                    tasks = [
                        download_segment(session, tick, rep) for tick, rep in batch
                    ]

                    results = await asyncio.gather(*tasks)

                    for success, tick, rep in results:
                        if success:
                            retry_successful += 1
                            successful += 1
                        else:
                            final_failures.append((tick, rep))
                        pbar.update(1)

            if final_failures:
                logger.warning(
                    "Failed to download %d segments after retry: %s",
                    len(final_failures),
                    [tick for tick, _ in final_failures],
                )
            else:
                logger.info(
                    "All %d retried segments downloaded successfully", retry_successful
                )

    end_time = time.time()
    elapsed = end_time - start_time

    logger.debug("Download completed in %.2fs", elapsed)
    logger.info("Successfully downloaded %d/%d segments", successful, rep_nb)
    logger.info("Files saved to %s/segments_%s/", output_folder, track_id)


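# Example (hypothetical usage; start_tick, rep_nb and duration would come from
# the parsed SegmentTimeline, so the values below are placeholders):
#
#     get_init("out", "some_track_id")
#     asyncio.run(
#         save_segments("out", "some_track_id", start_tick=1234567890,
#                       rep_nb=120, duration=100000)
#     )

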
def get_kid(output_folder, track_id):
    """Extract the Key ID (KID) from downloaded segments.

    Args:
        output_folder: The output folder path.
        track_id: The track identifier.

    Returns:
        The KID as a hex string if found, None otherwise.
    """
    folder = f"{output_folder}/segments_{track_id}"
    for filename in os.listdir(folder):
        if filename.endswith(".m4s"):
            filepath = os.path.join(folder, filename)
            logger.debug("Checking file: %s", filepath)
            with open(filepath, "rb") as f:
                data = f.read()
            # 16-byte pattern ('seig' sample group header) that precedes the KID
            index = data.find(
                b"\x73\x65\x69\x67\x00\x00\x00\x14"
                b"\x00\x00\x00\x01\x00\x00\x01\x10"
            )
            if index != -1:
                kid_bytes = data[index + 16 : index + 16 + 16]
                kid = kid_bytes.hex()
                return kid
    return None


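# Example (hypothetical usage; the KID pairs with a content key for
# decryption, which this module does not perform):
#
#     kid = get_kid("out", "some_track_id")
#     if kid:
#         logger.info("Found KID: %s", kid)

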
def generate_mpd_manifest(manifest_info: Dict[str, Any]) -> str:
    """Generate an MPD manifest from parsed manifest information.

    Args:
        manifest_info: A dictionary containing manifest information.

    Returns:
        The MPD manifest content as a string.
    """
    mpd = ET.Element("MPD")
    xsi_ns = "{http://www.w3.org/2001/XMLSchema-instance}"
    mpd.set(f"{xsi_ns}schemaLocation", "urn:mpeg:dash:schema:mpd:2011 DASH-MPD.xsd")

    # Set MPD attributes
    if manifest_info.get("profiles"):
        mpd.set("profiles", manifest_info["profiles"])
    if manifest_info.get("type"):
        mpd.set("type", manifest_info["type"])
    if manifest_info.get("publishTime"):
        mpd.set("publishTime", manifest_info["publishTime"])
    if manifest_info.get("availabilityStartTime"):
        mpd.set("availabilityStartTime", manifest_info["availabilityStartTime"])
    if manifest_info.get("minimumUpdatePeriod"):
        mpd.set("minimumUpdatePeriod", manifest_info["minimumUpdatePeriod"])
    if manifest_info.get("minBufferTime"):
        mpd.set("minBufferTime", manifest_info["minBufferTime"])
    if manifest_info.get("timeShiftBufferDepth"):
        mpd.set("timeShiftBufferDepth", manifest_info["timeShiftBufferDepth"])
    if manifest_info.get("suggestedPresentationDelay"):
        mpd.set(
            "suggestedPresentationDelay", manifest_info["suggestedPresentationDelay"]
        )

    # Add UTCTiming element
    utc_timing = ET.SubElement(mpd, "UTCTiming")
    utc_timing.set("schemeIdUri", "urn:mpeg:dash:utc:http-iso:2014")
    utc_timing.set("value", "https://time.akamai.com/?iso")

    # Create periods
    for period_info in manifest_info.get("periods", []):
        period = ET.SubElement(mpd, "Period")
        if period_info.get("id"):
            period.set("id", period_info["id"])
        if period_info.get("start"):
            period.set("start", period_info["start"])

        # Create adaptation sets
        for adaptation_info in period_info.get("adaptation_sets", []):
            generate_adaptation_set(period, adaptation_info)

    return format_xml_custom(mpd)


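# Example (a hypothetical round trip: parse a live manifest, tweak the parsed
# dict, then serialize it back with this generator):
#
#     info = parse_mpd_manifest(mpd_text)
#     info["type"] = "static"
#     rewritten = generate_mpd_manifest(info)

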
def generate_adaptation_set(
    parent: ET.Element, adaptation_info: Dict[str, Any]
) -> ET.Element:
    """Generate an AdaptationSet element from adaptation set information.

    Args:
        parent: The parent XML element.
        adaptation_info: Dictionary containing adaptation set information.

    Returns:
        The created AdaptationSet element.
    """
    adaptation_set = ET.SubElement(parent, "AdaptationSet")

    if adaptation_info.get("id"):
        adaptation_set.set("id", adaptation_info["id"])
    if adaptation_info.get("group"):
        adaptation_set.set("group", adaptation_info["group"])
    if adaptation_info.get("segmentAlignment"):
        adaptation_set.set("segmentAlignment", adaptation_info["segmentAlignment"])
    if adaptation_info.get("startWithSAP"):
        adaptation_set.set("startWithSAP", adaptation_info["startWithSAP"])
    if adaptation_info.get("contentType"):
        adaptation_set.set("contentType", adaptation_info["contentType"])
    if adaptation_info.get("lang"):
        adaptation_set.set("lang", adaptation_info["lang"])

    for drm_info in adaptation_info.get("drm_info", []):
        generate_content_protection(adaptation_set, drm_info)

    # Add SupplementalProperty if it's a video adaptation set with group="1"
    if (
        adaptation_info.get("contentType") == "video"
        and adaptation_info.get("group") == "1"
    ):
        supplemental = ET.SubElement(adaptation_set, "SupplementalProperty")
        supplemental.set("schemeIdUri", "urn:mpeg:dash:adaptation-set-switching:2016")
        if adaptation_info.get("supplementalProperty"):
            supplemental.set("value", adaptation_info["supplementalProperty"])

    if adaptation_info.get("role"):
        role = ET.SubElement(adaptation_set, "Role")
        role.set("schemeIdUri", "urn:mpeg:dash:role:2011")
        role.set("value", adaptation_info["role"])

    for rep_info in adaptation_info.get("representations", []):
        generate_representation(adaptation_set, rep_info)

    return adaptation_set


def generate_content_protection(
    parent: ET.Element, drm_info: Dict[str, Any]
) -> ET.Element:
    """Generate a ContentProtection element from DRM information.

    Args:
        parent: The parent XML element.
        drm_info: Dictionary containing DRM information.

    Returns:
        The created ContentProtection element.
    """
    content_protection = ET.SubElement(parent, "ContentProtection")

    if drm_info.get("schemeIdUri"):
        content_protection.set("schemeIdUri", drm_info["schemeIdUri"])

    if drm_info.get("pssh"):
        pssh = ET.SubElement(content_protection, "{urn:mpeg:cenc:2013}pssh")
        pssh.text = drm_info["pssh"]

    if drm_info.get("value"):
        content_protection.set("value", drm_info["value"])
    if drm_info.get("default_KID"):
        content_protection.set(
            "{urn:mpeg:cenc:2013}default_KID", drm_info["default_KID"]
        )

    return content_protection


def generate_representation(parent: ET.Element, rep_info: Dict[str, Any]) -> ET.Element:
    """Generate a Representation element from representation information.

    Args:
        parent: The parent XML element.
        rep_info: Dictionary containing representation information.

    Returns:
        The created Representation element.
    """
    representation = ET.SubElement(parent, "Representation")

    if rep_info.get("id"):
        representation.set("id", rep_info["id"])
    if rep_info.get("bandwidth"):
        representation.set("bandwidth", rep_info["bandwidth"])
    if rep_info.get("codecs"):
        representation.set("codecs", rep_info["codecs"])
    if rep_info.get("mimeType"):
        representation.set("mimeType", rep_info["mimeType"])
    if rep_info.get("width"):
        representation.set("width", rep_info["width"])
    if rep_info.get("height"):
        representation.set("height", rep_info["height"])
    if rep_info.get("frameRate"):
        representation.set("frameRate", rep_info["frameRate"])

    segments = rep_info.get("segments", {})
    if segments:
        segment_template = ET.SubElement(representation, "SegmentTemplate")

        if segments.get("timescale"):
            segment_template.set("timescale", segments["timescale"])
        if segments.get("initialization"):
            segment_template.set("initialization", segments["initialization"])
        if segments.get("media"):
            segment_template.set("media", segments["media"])

        timeline = segments.get("timeline", [])
        if timeline:
            segment_timeline = ET.SubElement(segment_template, "SegmentTimeline")

            for timeline_info in timeline:
                s_element = ET.SubElement(segment_timeline, "S")
                t = timeline_info.get("t", 0)
                if t != 0:
                    s_element.set("t", str(t))
                d = timeline_info.get("d", 0)
                if d != 0:
                    s_element.set("d", str(d))
                r = timeline_info.get("r", 0)
                if r != 0:
                    s_element.set("r", str(r))

    return representation


def format_xml_custom(element: ET.Element) -> str:
    """Format an XML element to match the original MPD style.

    Args:
        element: The XML element to format.

    Returns:
        Formatted XML string.
    """
    rough_string = ET.tostring(element, encoding="unicode")
    # Fix namespace prefix (ns0 -> cenc)
    rough_string = rough_string.replace("ns0:", "cenc:")

    dom = minidom.parseString(rough_string)
    pretty_xml = dom.toprettyxml(indent="\t", encoding=None)
    # Remove empty lines
    lines = [line for line in pretty_xml.split("\n") if line.strip()]

    return "\n".join(lines)