Files
OqeeRewind/utils/stream.py
√(noham)² b75027c750 Lint
2026-01-09 19:49:49 +01:00

763 lines
27 KiB
Python

"""Utility module for streaming and manifest parsing."""
import xml.etree.ElementTree as ET
import base64
import os
import asyncio
import time
from typing import Dict, Any
from xml.dom import minidom
import requests
import aiohttp
from tqdm.asyncio import tqdm
from utils.logging_config import logger
# Register namespace prefixes with ElementTree (a process-wide registry) so
# that elements/attributes in these namespaces are serialized with the
# default, "xsi" and "cenc" prefixes instead of auto-generated ns0/ns1 ones.
ET.register_namespace("", "urn:mpeg:dash:schema:mpd:2011")
ET.register_namespace("xsi", "http://www.w3.org/2001/XMLSchema-instance")
ET.register_namespace("cenc", "urn:mpeg:cenc:2013")
def parse_mpd_manifest(mpd_content: str) -> Dict[str, Any]:
    """Turn the text of an MPD manifest into a plain dictionary.

    Args:
        mpd_content: The MPD manifest content as a string.

    Returns:
        A dictionary with the root-level MPD attributes (None when an
        attribute is absent) and a ``periods`` list. Each period entry holds
        its ``id``, its ``start`` and the list of parsed adaptation sets.
    """
    namespaces = {"mpd": "urn:mpeg:dash:schema:mpd:2011", "cenc": "urn:mpeg:cenc:2013"}
    root = ET.fromstring(mpd_content)

    # Root attributes are copied verbatim, in this order, under the same name.
    root_attributes = (
        "type",
        "profiles",
        "publishTime",
        "availabilityStartTime",
        "minimumUpdatePeriod",
        "minBufferTime",
        "timeShiftBufferDepth",
        "suggestedPresentationDelay",
    )
    manifest_info: Dict[str, Any] = {
        name: root.get(name) for name in root_attributes
    }
    manifest_info["periods"] = [
        {
            "id": period.get("id"),
            "start": period.get("start"),
            "adaptation_sets": [
                parse_adaptation_set(adaptation_set, namespaces)
                for adaptation_set in period.findall("mpd:AdaptationSet", namespaces)
            ],
        }
        for period in root.findall("mpd:Period", namespaces)
    ]
    return manifest_info
def parse_adaptation_set(
    adaptation_set: ET.Element, namespaces: Dict[str, str]
) -> Dict[str, Any]:
    """Convert one AdaptationSet element into a dictionary.

    Args:
        adaptation_set: The AdaptationSet XML element.
        namespaces: XML namespaces dictionary (prefix -> URI) used for the
            ``mpd:`` lookups.

    Returns:
        A dictionary with the copied attributes (None when absent), a
        ``drm_info`` list (one entry per ContentProtection child), a
        ``representations`` list and, only when a Role child exists, a
        ``role`` entry holding that element's ``value`` attribute.
    """
    copied_attributes = (
        "id",
        "group",
        "contentType",
        "lang",
        "segmentAlignment",
        "startWithSAP",
    )
    adaptation_info: Dict[str, Any] = {
        name: adaptation_set.get(name) for name in copied_attributes
    }

    # DRM descriptors attached to the adaptation set.
    adaptation_info["drm_info"] = [
        parse_content_protection(protection, namespaces)
        for protection in adaptation_set.findall("mpd:ContentProtection", namespaces)
    ]

    # Individual tracks.
    adaptation_info["representations"] = [
        parse_representation(representation, namespaces)
        for representation in adaptation_set.findall("mpd:Representation", namespaces)
    ]

    # The role key is only present when the manifest declares one.
    role_element = adaptation_set.find("mpd:Role", namespaces)
    if role_element is not None:
        adaptation_info["role"] = role_element.get("value")

    return adaptation_info
def parse_content_protection(
    content_protection: ET.Element, namespaces: Dict[str, str]
) -> Dict[str, Any]:
    """Parse ContentProtection element for DRM information.

    Args:
        content_protection: The ContentProtection XML element.
        namespaces: XML namespaces dictionary; it must map the ``cenc``
            prefix, which is used to locate the ``cenc:pssh`` child.

    Returns:
        A dictionary that always contains ``schemeIdUri`` and ``value``
        (either may be None). ``default_KID`` is added when the
        ``cenc:default_KID`` attribute is present and non-empty. ``pssh`` is
        added when a ``cenc:pssh`` child has non-blank text, and
        ``pssh_hex`` is added alongside it when that text is valid base64.
    """
    drm_info: Dict[str, Any] = {
        "schemeIdUri": content_protection.get("schemeIdUri"),
        "value": content_protection.get("value"),
    }

    default_kid = content_protection.get("{urn:mpeg:cenc:2013}default_KID")
    if default_kid:
        drm_info["default_KID"] = default_kid

    pssh_element = content_protection.find("cenc:pssh", namespaces)
    # Strip first so a whitespace-only element is treated as "no PSSH"
    # instead of recording an empty string.
    pssh = (pssh_element.text or "").strip() if pssh_element is not None else ""
    if pssh:
        drm_info["pssh"] = pssh
        try:
            drm_info["pssh_hex"] = base64.b64decode(pssh).hex()
        except ValueError:
            # binascii.Error (raised for malformed base64) is a ValueError
            # subclass. Decoding is best-effort: keep the raw PSSH text and
            # simply omit the hex form.
            pass

    return drm_info
def parse_representation(
    representation: ET.Element, namespaces: Dict[str, str]
) -> Dict[str, Any]:
    """Convert one Representation element into a dictionary.

    Args:
        representation: The Representation XML element.
        namespaces: XML namespaces dictionary (prefix -> URI) used for the
            ``mpd:`` lookups.

    Returns:
        A dictionary with the copied attributes (None when absent) and a
        ``segments`` entry. ``segments`` is empty when the representation
        has no SegmentTemplate child; otherwise it carries the template's
        ``timescale``, ``initialization`` and ``media`` attributes plus a
        ``timeline`` list of ``{"t", "d", "r"}`` integer dictionaries.
    """
    attribute_names = (
        "id",
        "bandwidth",
        "codecs",
        "mimeType",
        "width",
        "height",
        "frameRate",
    )
    rep_info: Dict[str, Any] = {
        name: representation.get(name) for name in attribute_names
    }
    rep_info["segments"] = {}

    template = representation.find("mpd:SegmentTemplate", namespaces)
    if template is None:
        return rep_info

    timeline = []
    rep_info["segments"] = {
        "timescale": template.get("timescale"),
        "initialization": template.get("initialization"),
        "media": template.get("media"),
        "timeline": timeline,
    }

    timeline_element = template.find("mpd:SegmentTimeline", namespaces)
    if timeline_element is not None:
        for entry in timeline_element.findall("mpd:S", namespaces):
            # t = start time, d = duration, r = repeat count; a missing
            # attribute is stored as 0.
            timeline.append({key: int(entry.get(key, 0)) for key in ("t", "d", "r")})

    return rep_info
# pylint: disable=too-many-locals,too-many-branches
def organize_by_content_type(manifest_info: Dict[str, Any]) -> Dict[str, Any]:
    """Organize manifest information by content type.

    Only ``video`` and ``audio`` adaptation sets are collected; any other
    content type (for example text) and adaptation sets without a
    ``contentType`` are skipped.

    Args:
        manifest_info: Parsed manifest information dictionary.

    Returns:
        A dictionary with two keys:

        * ``video``: maps a ``"<width>x<height>"`` key (or ``"unknown"``)
          to its tracks, ordered by pixel count, largest first.
        * ``audio``: maps a ``"<lang>_<role>"`` key to its tracks. Missing
          language / role fall back to ``"unknown"`` / ``"main"``.

        Inside every group the tracks are sorted by bandwidth, highest
        first. A missing bandwidth counts as 0.
    """
    video: Dict[str, Any] = {}
    audio: Dict[str, Any] = {}

    for period in manifest_info.get("periods", []):
        for adaptation_set in period.get("adaptation_sets", []):
            content_type = adaptation_set.get("contentType")
            if content_type not in ("video", "audio"):
                continue

            for rep in adaptation_set.get("representations", []):
                # The parsers store absent attributes as None, so the key
                # exists and a dict.get() default would never apply: use
                # "or" to fall back instead.
                bandwidth = int(rep.get("bandwidth") or 0)
                track_info = {
                    "track_id": rep.get("id"),
                    "adaptation_set_id": adaptation_set.get("id"),
                    "bandwidth": bandwidth,
                    "bitrate_kbps": bandwidth // 1000,
                    "codec": rep.get("codecs"),
                    "mime_type": rep.get("mimeType"),
                    "drm_info": adaptation_set.get("drm_info", []),
                    "segments": rep.get("segments", {}),
                }

                if content_type == "video":
                    width = rep.get("width")
                    height = rep.get("height")
                    resolution = f"{width}x{height}" if width and height else "unknown"
                    track_info.update(
                        {
                            "resolution": resolution,
                            "width": int(width) if width else None,
                            "height": int(height) if height else None,
                            "frame_rate": rep.get("frameRate"),
                        }
                    )
                    video.setdefault(resolution, []).append(track_info)
                else:
                    lang = adaptation_set.get("lang") or "unknown"
                    role = adaptation_set.get("role") or "main"
                    track_info.update({"language": lang, "role": role})
                    audio.setdefault(f"{lang}_{role}", []).append(track_info)

    # Within each group, best bitrate first.
    for tracks in video.values():
        tracks.sort(key=lambda track: track["bandwidth"], reverse=True)
    for tracks in audio.values():
        tracks.sort(key=lambda track: track["bandwidth"], reverse=True)

    # Video groups themselves are ordered by pixel count, largest first.
    sorted_video = {
        resolution: video[resolution]
        for resolution in sorted(video, key=_resolution_pixel_count, reverse=True)
    }
    return {"video": sorted_video, "audio": audio}


def _resolution_pixel_count(resolution: str) -> int:
    """Return width * height for a ``"WxH"`` key, or 0 when it is not numeric."""
    width, separator, height = resolution.partition("x")
    if not separator:
        return 0
    try:
        return int(width) * int(height)
    except ValueError:
        return 0
def get_manifest(manifest_id):
    """Download the live MPD manifest of a channel.

    Args:
        manifest_id: The channel/manifest identifier inserted in the URL.

    Returns:
        The response body as text. The HTTP status code is not inspected.
    """
    live_format = 1
    manifest_url = (
        f"https://api-proxad.dc2.oqee.net/playlist/v1/live/{manifest_id}/{live_format}/live.mpd"
    )
    user_agent = (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/143.0.0.0 Safari/537.36"
    )
    # Browser-like request headers sent with the manifest request.
    request_headers = {
        "accept": "*/*",
        "accept-language": "en-GB,en-US;q=0.9,en;q=0.8",
        "cache-control": "no-cache",
        "origin": "https://tv.free.fr",
        "pragma": "no-cache",
        "priority": "u=1, i",
        "referer": "https://tv.free.fr/",
        "sec-ch-ua": '"Google Chrome";v="143", "Chromium";v="143", "Not A(Brand";v="24"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"macOS"',
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "cross-site",
        "user-agent": user_agent,
    }
    return requests.get(manifest_url, headers=request_headers, timeout=10).text
async def fetch_segment(session, ticks, track_id):
    """Check asynchronously whether a media segment is available.

    The segment URL is requested and only the HTTP status is examined; the
    response body is not read.

    Args:
        session: The aiohttp ClientSession.
        ticks: The tick value for the segment.
        track_id: The track identifier.

    Returns:
        The tick value if the server answered HTTP 200, None otherwise
        (other status, client/connection error or timeout).
    """
    url = f"https://media.stream.proxad.net/media/{track_id}_{ticks}"
    headers = {
        "Accept": "*/*",
        "Referer": "https://tv.free.fr/",
        "User-Agent": (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/143.0.0.0 Safari/537.36"
        ),
    }
    try:
        async with session.get(url, headers=headers) as resp:
            return ticks if resp.status == 200 else None
    except (aiohttp.ClientError, asyncio.TimeoutError):
        # Timeouts surface as asyncio.TimeoutError, which is not an
        # aiohttp.ClientError; treat both as "segment not available".
        return None
def get_init(output_folder, track_id):
    """Download and save the initialization segment for a track.

    Args:
        output_folder: The output folder path.
        track_id: The track identifier.

    Returns:
        The path of the written ``init.mp4`` file, or None when the server
        did not answer with HTTP 200 (nothing is written in that case).
    """
    url = f"https://media.stream.proxad.net/media/{track_id}_init"
    headers = {
        "Accept": "*/*",
        "Referer": "https://tv.free.fr/",
        "User-Agent": (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/143.0.0.0 Safari/537.36"
        ),
    }
    response = requests.get(url, headers=headers, timeout=10)
    if response.status_code != 200:
        # Make the failure path explicit so no path can ever reach the
        # return statement without a bound path value.
        logger.error(
            "Failed to download initialization segment for track %s: HTTP %d",
            track_id,
            response.status_code,
        )
        return None

    segment_dir = f"{output_folder}/segments_{track_id}"
    os.makedirs(segment_dir, exist_ok=True)
    init_path = f"{segment_dir}/init.mp4"
    with open(init_path, "wb") as f:
        f.write(response.content)
    logger.debug("Saved initialization segment to %s", init_path)
    return init_path
async def save_segments(
    output_folder, track_id, start_tick, rep_nb, duration, batch_size=64
):
    """Download and save multiple media segments in batches.

    Segment ``i`` (0-based) is requested at tick ``start_tick + i * duration``
    and written to ``<output_folder>/segments_<track_id>/<tick>.m4s``.
    Segments that fail on the first pass are retried once.

    Args:
        output_folder: The output folder path.
        track_id: The track identifier.
        start_tick: The starting tick value.
        rep_nb: The number of segments to download.
        duration: The duration per segment, in ticks.
        batch_size: Number of concurrent downloads per batch (default: 64).

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    segment_dir = f"{output_folder}/segments_{track_id}"
    os.makedirs(segment_dir, exist_ok=True)

    # The request headers are identical for every segment: build them once.
    headers = {
        "Accept": "*/*",
        "Referer": "https://tv.free.fr/",
        "User-Agent": (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/143.0.0.0 Safari/537.36"
        ),
    }

    async def download_segment(session, tick, rep):
        """Download a single segment.

        Returns:
            Tuple of (success: bool, tick: int, rep: int)
        """
        url = f"https://media.stream.proxad.net/media/{track_id}_{tick}"
        try:
            async with session.get(url, headers=headers) as resp:
                if resp.status == 200:
                    content = await resp.read()
                    filename = f"{segment_dir}/{tick}.m4s"
                    with open(filename, "wb") as f:
                        f.write(content)
                    return (True, tick, rep)
                logger.error(
                    "Failed to download segment %d (tick %d): HTTP %d",
                    rep,
                    tick,
                    resp.status,
                )
                return (False, tick, rep)
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # A timeout is not an aiohttp.ClientError; without catching it a
            # single slow segment would abort the whole gather() batch.
            logger.warning("Error downloading segment %d (tick %d): %s", rep, tick, e)
            return (False, tick, rep)

    async def run_batches(session, pending, description):
        """Download ``pending`` batch by batch; return the (tick, rep) failures."""
        failed = []
        with tqdm(total=len(pending), desc=description, unit="seg") as pbar:
            for batch_start in range(0, len(pending), batch_size):
                batch = pending[batch_start : batch_start + batch_size]
                results = await asyncio.gather(
                    *(download_segment(session, tick, rep) for tick, rep in batch)
                )
                for success, tick, rep in results:
                    if not success:
                        failed.append((tick, rep))
                    pbar.update(1)
        return failed

    logger.info(
        "Starting download of %d segments in batches of %d...", rep_nb, batch_size
    )
    logger.debug("Track ID: %s", track_id)
    logger.debug("Base tick: %d", start_tick)
    start_time = time.time()

    # Build list of all segments to download as (tick, index) pairs.
    segments_to_download = [(start_tick + i * duration, i) for i in range(rep_nb)]
    final_failures = []

    async with aiohttp.ClientSession() as session:
        retry_list = await run_batches(
            session, segments_to_download, "Downloading segments"
        )

        # Retry failed segments once.
        if retry_list:
            logger.info("Retrying %d failed segments...", len(retry_list))
            final_failures = await run_batches(
                session, retry_list, "Retrying segments"
            )
            if final_failures:
                logger.warning(
                    "Failed to download %d segments after retry: %s",
                    len(final_failures),
                    [tick for tick, _ in final_failures],
                )
            else:
                logger.info(
                    "All %d retried segments downloaded successfully",
                    len(retry_list),
                )

    successful = len(segments_to_download) - len(final_failures)
    elapsed = time.time() - start_time
    logger.debug("Download completed in %.2fs", elapsed)
    logger.info("Successfully downloaded %d/%d segments", successful, rep_nb)
    logger.info("Files saved to %s/segments_%s/", output_folder, track_id)
def get_kid(output_folder, track_id):
    """Look for a Key ID (KID) inside the downloaded ``.m4s`` segments.

    Args:
        output_folder: The output folder path.
        track_id: The track identifier.

    Returns:
        The hex string of the 16 bytes following the first marker match in
        the first segment file that contains the marker, or None when no
        segment file contains it.
    """
    # 16-byte marker that precedes the KID; it begins with ASCII "seig".
    marker = (
        b"\x73\x65\x69\x67\x00\x00\x00\x14"
        b"\x00\x00\x00\x01\x00\x00\x01\x10"
    )
    folder = f"{output_folder}/segments_{track_id}"

    for filename in os.listdir(folder):
        if not filename.endswith(".m4s"):
            continue

        filepath = os.path.join(folder, filename)
        logger.debug("Checking file: %s", filepath)
        with open(filepath, "rb") as f:
            data = f.read()

        position = data.find(marker)
        if position == -1:
            continue

        # The KID starts 16 bytes after the start of the marker.
        kid_start = position + 16
        return data[kid_start : kid_start + 16].hex()

    return None
def generate_mpd_manifest(manifest_info: Dict[str, Any]) -> str:
    """Build MPD manifest text from a parsed-manifest dictionary.

    Args:
        manifest_info: A dictionary containing manifest information.

    Returns:
        The MPD manifest content as a formatted string.
    """
    # NOTE(review): elements are created with un-namespaced tags ("MPD",
    # "Period", ...), so the output presumably carries no default DASH
    # xmlns declaration - confirm whether consumers need one.
    mpd = ET.Element("MPD")
    mpd.set(
        "{http://www.w3.org/2001/XMLSchema-instance}schemaLocation",
        "urn:mpeg:dash:schema:mpd:2011 DASH-MPD.xsd",
    )

    # Root attributes are emitted in this order, skipping missing/empty ones.
    root_attributes = (
        "profiles",
        "type",
        "publishTime",
        "availabilityStartTime",
        "minimumUpdatePeriod",
        "minBufferTime",
        "timeShiftBufferDepth",
        "suggestedPresentationDelay",
    )
    for name in root_attributes:
        value = manifest_info.get(name)
        if value:
            mpd.set(name, value)

    # A fixed UTCTiming element is always added.
    ET.SubElement(
        mpd,
        "UTCTiming",
        {
            "schemeIdUri": "urn:mpeg:dash:utc:http-iso:2014",
            "value": "https://time.akamai.com/?iso",
        },
    )

    for period_info in manifest_info.get("periods", []):
        period = ET.SubElement(mpd, "Period")
        for name in ("id", "start"):
            value = period_info.get(name)
            if value:
                period.set(name, value)
        for adaptation_info in period_info.get("adaptation_sets", []):
            generate_adaptation_set(period, adaptation_info)

    return format_xml_custom(mpd)
def generate_adaptation_set(
    parent: ET.Element, adaptation_info: Dict[str, Any]
) -> ET.Element:
    """Append an AdaptationSet element built from ``adaptation_info``.

    Args:
        parent: The parent XML element.
        adaptation_info: Dictionary containing adaptation set information.

    Returns:
        The created AdaptationSet element.
    """
    adaptation_set = ET.SubElement(parent, "AdaptationSet")

    # Attributes are emitted in this order, skipping missing/empty values.
    for name in (
        "id",
        "group",
        "segmentAlignment",
        "startWithSAP",
        "contentType",
        "lang",
    ):
        value = adaptation_info.get(name)
        if value:
            adaptation_set.set(name, value)

    for drm_info in adaptation_info.get("drm_info", []):
        generate_content_protection(adaptation_set, drm_info)

    # Video adaptation sets in group "1" get an adaptation-set-switching
    # SupplementalProperty; its value is only set when one is provided.
    is_group_one_video = (
        adaptation_info.get("contentType") == "video"
        and adaptation_info.get("group") == "1"
    )
    if is_group_one_video:
        supplemental = ET.SubElement(adaptation_set, "SupplementalProperty")
        supplemental.set("schemeIdUri", "urn:mpeg:dash:adaptation-set-switching:2016")
        switching_value = adaptation_info.get("supplementalProperty")
        if switching_value:
            supplemental.set("value", switching_value)

    role_value = adaptation_info.get("role")
    if role_value:
        ET.SubElement(
            adaptation_set,
            "Role",
            {"schemeIdUri": "urn:mpeg:dash:role:2011", "value": role_value},
        )

    for rep_info in adaptation_info.get("representations", []):
        generate_representation(adaptation_set, rep_info)

    return adaptation_set
def generate_content_protection(
    parent: ET.Element, drm_info: Dict[str, Any]
) -> ET.Element:
    """Append a ContentProtection element built from ``drm_info``.

    Args:
        parent: The parent XML element.
        drm_info: Dictionary containing DRM information.

    Returns:
        The created ContentProtection element.
    """
    cenc_ns = "{urn:mpeg:cenc:2013}"
    content_protection = ET.SubElement(parent, "ContentProtection")

    # Attributes first (schemeIdUri, value, cenc:default_KID), each only
    # when a non-empty value is available.
    attribute_sources = (
        ("schemeIdUri", "schemeIdUri"),
        ("value", "value"),
        (f"{cenc_ns}default_KID", "default_KID"),
    )
    for attribute_name, info_key in attribute_sources:
        attribute_value = drm_info.get(info_key)
        if attribute_value:
            content_protection.set(attribute_name, attribute_value)

    # Then the optional cenc:pssh child carrying the PSSH text.
    pssh_text = drm_info.get("pssh")
    if pssh_text:
        pssh_element = ET.SubElement(content_protection, f"{cenc_ns}pssh")
        pssh_element.text = pssh_text

    return content_protection
def generate_representation(parent: ET.Element, rep_info: Dict[str, Any]) -> ET.Element:
    """Append a Representation element built from ``rep_info``.

    Args:
        parent: The parent XML element.
        rep_info: Dictionary containing representation information.

    Returns:
        The created Representation element.
    """
    representation = ET.SubElement(parent, "Representation")
    for name in (
        "id",
        "bandwidth",
        "codecs",
        "mimeType",
        "width",
        "height",
        "frameRate",
    ):
        value = rep_info.get(name)
        if value:
            representation.set(name, value)

    segments = rep_info.get("segments", {})
    if not segments:
        return representation

    segment_template = ET.SubElement(representation, "SegmentTemplate")
    for name in ("timescale", "initialization", "media"):
        value = segments.get(name)
        if value:
            segment_template.set(name, value)

    timeline = segments.get("timeline", [])
    if timeline:
        segment_timeline = ET.SubElement(segment_template, "SegmentTimeline")
        for entry in timeline:
            s_element = ET.SubElement(segment_timeline, "S")
            # t / d / r are written only when they differ from 0.
            for key in ("t", "d", "r"):
                number = entry.get(key, 0)
                if number != 0:
                    s_element.set(key, str(number))

    return representation
def format_xml_custom(element: ET.Element) -> str:
    """Format XML element to match the original MPD style.

    The element is serialized, re-parsed with minidom and pretty-printed
    with tab indentation; blank lines are dropped.

    Args:
        element: The XML element to format.

    Returns:
        Formatted XML string, starting with the XML declaration and without
        a trailing newline.
    """
    rough_string = ET.tostring(element, encoding="unicode")

    # Fallback for when the cenc namespace was serialized with the
    # auto-generated "ns0" prefix: rename the prefix AND its xmlns
    # declaration together. Renaming only the element prefix would leave
    # "cenc" unbound and make the re-parse below fail. The rename is skipped
    # when ns0 is bound to some other namespace.
    if 'xmlns:ns0="urn:mpeg:cenc:2013"' in rough_string:
        rough_string = rough_string.replace("xmlns:ns0=", "xmlns:cenc=")
        rough_string = rough_string.replace("ns0:", "cenc:")

    dom = minidom.parseString(rough_string)
    pretty_xml = dom.toprettyxml(indent="\t", encoding=None)

    # Remove empty lines
    return "\n".join(line for line in pretty_xml.split("\n") if line.strip())