diff --git a/utils/stream.py b/utils/stream.py index 447d0bd..f488127 100644 --- a/utils/stream.py +++ b/utils/stream.py @@ -12,6 +12,11 @@ import aiohttp from tqdm.asyncio import tqdm from utils.logging_config import logger +# Register namespaces to preserve prefixes in output +ET.register_namespace('', 'urn:mpeg:dash:schema:mpd:2011') +ET.register_namespace('xsi', 'http://www.w3.org/2001/XMLSchema-instance') +ET.register_namespace('cenc', 'urn:mpeg:cenc:2013') + def parse_mpd_manifest(mpd_content: str) -> Dict[str, Any]: """Parse an MPD manifest and extract metadata. @@ -527,3 +532,210 @@ def get_kid(output_folder, track_id): kid = kid_bytes.hex() return kid return None + + +def generate_mpd_manifest(manifest_info: Dict[str, Any]) -> str: + """Generate an MPD manifest from parsed manifest information. + + Args: + manifest_info: A dictionary containing manifest information. + + Returns: + The MPD manifest content as a string. + """ + mpd = ET.Element("MPD") + mpd.set("{http://www.w3.org/2001/XMLSchema-instance}schemaLocation", "urn:mpeg:dash:schema:mpd:2011 DASH-MPD.xsd") + + # Set MPD attributes + if manifest_info.get("profiles"): + mpd.set("profiles", manifest_info["profiles"]) + if manifest_info.get("type"): + mpd.set("type", manifest_info["type"]) + if manifest_info.get("publishTime"): + mpd.set("publishTime", manifest_info["publishTime"]) + if manifest_info.get("availabilityStartTime"): + mpd.set("availabilityStartTime", manifest_info["availabilityStartTime"]) + if manifest_info.get("minimumUpdatePeriod"): + mpd.set("minimumUpdatePeriod", manifest_info["minimumUpdatePeriod"]) + if manifest_info.get("minBufferTime"): + mpd.set("minBufferTime", manifest_info["minBufferTime"]) + if manifest_info.get("timeShiftBufferDepth"): + mpd.set("timeShiftBufferDepth", manifest_info["timeShiftBufferDepth"]) + if manifest_info.get("suggestedPresentationDelay"): + mpd.set("suggestedPresentationDelay", manifest_info["suggestedPresentationDelay"]) + + # Add UTCTiming element + utc_timing = ET.SubElement(mpd, "UTCTiming") + utc_timing.set("schemeIdUri", "urn:mpeg:dash:utc:http-iso:2014") + utc_timing.set("value", "https://time.akamai.com/?iso") + + # Create periods + for period_info in manifest_info.get("periods", []): + period = ET.SubElement(mpd, "Period") + if period_info.get("id"): + period.set("id", period_info["id"]) + if period_info.get("start"): + period.set("start", period_info["start"]) + + # Create adaptation sets + for adaptation_info in period_info.get("adaptation_sets", []): + adaptation_set = generate_adaptation_set(period, adaptation_info) + + return format_xml_custom(mpd) + + +def generate_adaptation_set( + parent: ET.Element, adaptation_info: Dict[str, Any] +) -> ET.Element: + """Generate an AdaptationSet element from adaptation set information. + + Args: + parent: The parent XML element. + adaptation_info: Dictionary containing adaptation set information. + + Returns: + The created AdaptationSet element. + """ + adaptation_set = ET.SubElement(parent, "AdaptationSet") + + if adaptation_info.get("id"): + adaptation_set.set("id", adaptation_info["id"]) + if adaptation_info.get("group"): + adaptation_set.set("group", adaptation_info["group"]) + if adaptation_info.get("segmentAlignment"): + adaptation_set.set("segmentAlignment", adaptation_info["segmentAlignment"]) + if adaptation_info.get("startWithSAP"): + adaptation_set.set("startWithSAP", adaptation_info["startWithSAP"]) + if adaptation_info.get("contentType"): + adaptation_set.set("contentType", adaptation_info["contentType"]) + if adaptation_info.get("lang"): + adaptation_set.set("lang", adaptation_info["lang"]) + + for drm_info in adaptation_info.get("drm_info", []): + generate_content_protection(adaptation_set, drm_info) + + # Add SupplementalProperty if it's a video adaptation set with group="1" + if (adaptation_info.get("contentType") == "video" and + adaptation_info.get("group") == "1"): + supplemental = ET.SubElement(adaptation_set, "SupplementalProperty") + supplemental.set("schemeIdUri", "urn:mpeg:dash:adaptation-set-switching:2016") + if adaptation_info.get("supplementalProperty"): + supplemental.set("value", adaptation_info["supplementalProperty"]) + + if adaptation_info.get("role"): + role = ET.SubElement(adaptation_set, "Role") + role.set("schemeIdUri", "urn:mpeg:dash:role:2011") + role.set("value", adaptation_info["role"]) + + for rep_info in adaptation_info.get("representations", []): + generate_representation(adaptation_set, rep_info) + + return adaptation_set + + +def generate_content_protection( + parent: ET.Element, drm_info: Dict[str, Any] +) -> ET.Element: + """Generate ContentProtection element from DRM information. + + Args: + parent: The parent XML element. + drm_info: Dictionary containing DRM information. + + Returns: + The created ContentProtection element. + """ + content_protection = ET.SubElement(parent, "ContentProtection") + + if drm_info.get("schemeIdUri"): + content_protection.set("schemeIdUri", drm_info["schemeIdUri"]) + + if drm_info.get("pssh"): + pssh = ET.SubElement(content_protection, "{urn:mpeg:cenc:2013}pssh") + pssh.text = drm_info["pssh"] + + if drm_info.get("value"): + content_protection.set("value", drm_info["value"]) + if drm_info.get("default_KID"): + content_protection.set("{urn:mpeg:cenc:2013}default_KID", drm_info["default_KID"]) + + return content_protection + + +def generate_representation( + parent: ET.Element, rep_info: Dict[str, Any] +) -> ET.Element: + """Generate Representation element from representation information. + + Args: + parent: The parent XML element. + rep_info: Dictionary containing representation information. + + Returns: + The created Representation element. + """ + representation = ET.SubElement(parent, "Representation") + + if rep_info.get("id"): + representation.set("id", rep_info["id"]) + if rep_info.get("bandwidth"): + representation.set("bandwidth", rep_info["bandwidth"]) + if rep_info.get("codecs"): + representation.set("codecs", rep_info["codecs"]) + if rep_info.get("mimeType"): + representation.set("mimeType", rep_info["mimeType"]) + if rep_info.get("width"): + representation.set("width", rep_info["width"]) + if rep_info.get("height"): + representation.set("height", rep_info["height"]) + if rep_info.get("frameRate"): + representation.set("frameRate", rep_info["frameRate"]) + + segments = rep_info.get("segments", {}) + if segments: + segment_template = ET.SubElement(representation, "SegmentTemplate") + + if segments.get("timescale"): + segment_template.set("timescale", segments["timescale"]) + if segments.get("initialization"): + segment_template.set("initialization", segments["initialization"]) + if segments.get("media"): + segment_template.set("media", segments["media"]) + + timeline = segments.get("timeline", []) + if timeline: + segment_timeline = ET.SubElement(segment_template, "SegmentTimeline") + + for timeline_info in timeline: + s_element = ET.SubElement(segment_timeline, "S") + t = timeline_info.get("t", 0) + if t != 0: + s_element.set("t", str(t)) + d = timeline_info.get("d", 0) + if d != 0: + s_element.set("d", str(d)) + r = timeline_info.get("r", 0) + if r != 0: + s_element.set("r", str(r)) + + return representation + + +def format_xml_custom(element: ET.Element) -> str: + """Format XML element to match the original MPD style. + + Args: + element: The XML element to format. + + Returns: + Formatted XML string. + """ + rough_string = ET.tostring(element, encoding="unicode") + rough_string = rough_string.replace('ns0:', 'cenc:') # Fix namespace prefix + + from xml.dom import minidom + dom = minidom.parseString(rough_string) + pretty_xml = dom.toprettyxml(indent="\t", encoding=None) + lines = [line for line in pretty_xml.split('\n') if line.strip()] # Remove empty lines + + return '\n'.join(lines) \ No newline at end of file