From 2823fb6e2e2195ca8ad5441d311b986c3fe54d83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=88=9A=28noham=29=C2=B2?= <100566912+NohamR@users.noreply.github.com> Date: Sat, 20 Dec 2025 12:20:59 +0100 Subject: [PATCH] Lint and clean up codebase for consistency --- main.py | 144 +++++++++++++------- utils/downloader.py | 51 ++++--- utils/input.py | 181 ++++++++++++------------- utils/oqee.py | 203 ++++++++++++++-------------- utils/stream.py | 317 +++++++++++++++++++++++--------------------- utils/times.py | 74 +++++------ utils/utilities.py | 16 ++- 7 files changed, 518 insertions(+), 468 deletions(-) diff --git a/main.py b/main.py index c2b4e81..55abf81 100644 --- a/main.py +++ b/main.py @@ -1,7 +1,11 @@ """Main module for Oqee channel selection and stream management.""" import os +import sys import argparse +import asyncio +import subprocess +import shutil from datetime import datetime, timedelta from dotenv import load_dotenv from utils.input import ( @@ -23,16 +27,11 @@ from utils.times import ( bruteforce, ) from utils.stream import save_segments, get_kid, get_init -import asyncio -import subprocess -import shutil load_dotenv() TIMESCALE = 90000 DURATION = 288000 -from pprint import pprint - def parse_arguments(): """Parse command line arguments.""" @@ -122,17 +121,17 @@ if __name__ == "__main__": start_date = datetime.strptime(args.start_date, "%Y-%m-%d %H:%M:%S") except ValueError: print("Invalid start-date format. Use YYYY-MM-DD HH:MM:SS") - exit(1) + sys.exit(1) if args.end_date and args.duration: print("Cannot specify both --end-date and --duration") - exit(1) + sys.exit(1) elif args.end_date: try: end_date = datetime.strptime(args.end_date, "%Y-%m-%d %H:%M:%S") except ValueError: print("Invalid end-date format. Use YYYY-MM-DD HH:MM:SS") - exit(1) + sys.exit(1) elif args.duration and start_date: # Parse duration HH:MM:SS try: @@ -141,28 +140,28 @@ if __name__ == "__main__": end_date = start_date + duration_td except ValueError: print("Invalid duration format. Use HH:MM:SS") - exit(1) + sys.exit(1) if not start_date: print("start-date is required in CLI mode") - exit(1) + sys.exit(1) if not end_date: print("Either end-date or duration is required in CLI mode") - exit(1) + sys.exit(1) keys = args.key or [] - # end = ".".join([args.video, args.audio]) if args.video and args.audio else "" - end = "" + # END_SUFFIX = ".".join([args.video, args.audio]) if args.video and args.audio else "" + END_SUFFIX = "" title = ( - args.title + end - or f"{args.channel_id}_{start_date.strftime('%Y%m%d_%H%M%S') + end}" + args.title + END_SUFFIX + or f"{args.channel_id}_{start_date.strftime('%Y%m%d_%H%M%S') + END_SUFFIX}" ) # Get stream selections selections = get_selection(args.channel_id, args.video, args.audio) if not selections: print("Erreur lors de la sélection des flux.") - exit(1) + sys.exit(1) print(f"Start date: {start_date}") print(f"End date: {end_date}") @@ -196,33 +195,54 @@ if __name__ == "__main__": title = title or f"{freebox_id}_{start_date.strftime('%Y%m%d_%H%M%S')}" keys = [] - - output_dir = os.getenv("OUTPUT_DIR") or (args.output_dir if cli_mode else "./download") - start_tick_user = int(convert_sec_to_ticks(convert_date_to_sec(start_date), TIMESCALE)) + output_dir = os.getenv("OUTPUT_DIR") or ( + args.output_dir if cli_mode else "./download" + ) + + start_tick_user = int( + convert_sec_to_ticks(convert_date_to_sec(start_date), TIMESCALE) + ) video_data = None audio_data = None - for content_type, sel in [("video", selections["video"]),("audio", selections["audio"]),]: + for content_type, sel in [ + ("video", selections["video"]), + ("audio", selections["audio"]), + ]: start_tick_manifest = sel["segments"]["timeline"][0]["t"] - manifest_date = convert_sec_to_date(convert_ticks_to_sec(start_tick_manifest, TIMESCALE)) + manifest_date = convert_sec_to_date( + convert_ticks_to_sec(start_tick_manifest, TIMESCALE) + ) init_segment = sel["segments"]["initialization"] track_id = init_segment.split("/")[-1].split("_init")[0] if start_date.date() == manifest_date.date(): - print("Date match between requested start date and manifest data, proceeding with download...") + print( + "Date match between requested start date and manifest data, proceeding with download..." + ) - start_tick, start_rep = find_nearest_tick_by_hour(start_tick_manifest, start_date, TIMESCALE, DURATION) - end_tick, end_rep = find_nearest_tick_by_hour(start_tick_manifest, end_date, TIMESCALE, DURATION) + start_tick, start_rep = find_nearest_tick_by_hour( + start_tick_manifest, start_date, TIMESCALE, DURATION + ) + end_tick, end_rep = find_nearest_tick_by_hour( + start_tick_manifest, end_date, TIMESCALE, DURATION + ) else: - print("Date mismatch between requested start date and manifest data, bruteforce method is needed.") + print( + "Date mismatch between requested start date and manifest data, bruteforce method is needed." + ) valid_ticks = asyncio.run(bruteforce(track_id, start_tick_user)) valid_tick = valid_ticks[0] - start_tick, start_rep = find_nearest_tick_by_hour(valid_tick, start_date, TIMESCALE, DURATION) - end_tick, end_rep = find_nearest_tick_by_hour(valid_tick, end_date, TIMESCALE, DURATION) + start_tick, start_rep = find_nearest_tick_by_hour( + valid_tick, start_date, TIMESCALE, DURATION + ) + end_tick, end_rep = find_nearest_tick_by_hour( + valid_tick, end_date, TIMESCALE, DURATION + ) rep_nb = (end_tick - start_tick) // DURATION + 1 print(f"Total segments to fetch for {content_type}: {rep_nb}") @@ -243,14 +263,20 @@ if __name__ == "__main__": track_id = data["track_id"] start_tick = data["start_tick"] rep_nb = data["rep_nb"] - asyncio.run(save_segments(output_dir, track_id, start_tick, rep_nb, DURATION)) + asyncio.run( + save_segments(output_dir, track_id, start_tick, rep_nb, DURATION) + ) # Merge video and audio video_file = f"{output_dir}/temp_video.mp4" audio_file = f"{output_dir}/temp_audio.mp4" data["file"] = video_file if content_type == "video" else audio_file - merge_segments(output_dir, track_id, video_file if content_type == "video" else audio_file) + merge_segments( + output_dir, + track_id, + video_file if content_type == "video" else audio_file, + ) kid = get_kid(output_dir, track_id) data["kid"] = kid @@ -265,12 +291,10 @@ if __name__ == "__main__": if len(missing_keys) > 0: method = {} - # api_url = os.getenv("API_URL") - # api_key = os.getenv("API_KEY") - api_url = None - api_key = None - if api_url and api_key: - method = {"method": "api", "api_url": api_url, "api_key": api_key} + API_URL = os.getenv("API_URL") or None + API_KEY = os.getenv("API_KEY") or None + if API_URL and API_KEY: + method = {"method": "api", "api_url": API_URL, "api_key": API_KEY} else: username = args.username or os.getenv("OQEE_USERNAME") password = args.password or os.getenv("OQEE_PASSWORD") @@ -295,7 +319,7 @@ if __name__ == "__main__": if k.split(":")[0] == kid: key = k break - + init_path = get_init(output_dir, track_id) dec_file = f"{output_dir}/dec_{content_type}.mp4" decrypt(file, init_path, dec_file, key) @@ -306,33 +330,50 @@ if __name__ == "__main__": start_tick_audio = audio_data["start_tick"] diff_start = start_tick_video - start_tick_audio diff_start_sec = convert_ticks_to_sec(diff_start, TIMESCALE) - + # ffmpeg -i "concat:init.mp4|merged_dec.m4s" -c copy output.mp4 command_ffmpeg = ( f'ffmpeg -i "concat:{output_dir}/segments_{track_id_video}/init.mp4|' f'{output_dir}/dec_video.mp4" -c copy {output_dir}/video.mp4' ) print("FFmpeg command:", command_ffmpeg) - subprocess.run(command_ffmpeg, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + subprocess.run( + command_ffmpeg, + shell=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=True, + ) command_ffmpeg = ( f'ffmpeg -i "concat:{output_dir}/segments_{track_id_audio}/init.mp4|' f'{output_dir}/dec_audio.mp4" -c copy {output_dir}/audio.mp4' ) print("FFmpeg command:", command_ffmpeg) - subprocess.run(command_ffmpeg, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + subprocess.run( + command_ffmpeg, + shell=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=True, + ) - command_merge = ( + COMMAND_MERGE = ( f"ffmpeg -i {output_dir}/video.mp4 -itsoffset {diff_start_sec} " f"-i {output_dir}/audio.mp4 -c copy -map 0:v -map 1:a {output_dir}/output.mp4" ) - print("Merge command:", command_merge) - subprocess.run(command_merge, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + print("Merge command:", COMMAND_MERGE) + subprocess.run( + COMMAND_MERGE, + shell=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=True, + ) - - final_output = f"{output_dir}/{title}.mp4" - shutil.move(f"{output_dir}/output.mp4", final_output) - print(f"Final output saved to {final_output}") + FINAL_OUTPUT = f"{output_dir}/{title}.mp4" + shutil.move(f"{output_dir}/output.mp4", FINAL_OUTPUT) + print(f"Final output saved to {FINAL_OUTPUT}") os.remove(f"{output_dir}/dec_video.mp4") os.remove(f"{output_dir}/dec_audio.mp4") @@ -343,10 +384,15 @@ if __name__ == "__main__": shutil.rmtree(f"{output_dir}/segments_{video_data['track_id']}") shutil.rmtree(f"{output_dir}/segments_{audio_data['track_id']}") - except KeyboardInterrupt: print("\n\nProgramme interrompu par l'utilisateur. Au revoir !") -# uv run python main.py --start-date "2025-01-01 12:00:00" --duration "01:00:00" --channel-id 536 --video "720+best" --audio best --title "Test" --key 5b1288b31b6a3f789a205614bbd7fac7:14980f2578eca20d78bd70601af21458 --key acacd48e12efbdbaa479b6d6dbf110b4:500af89b21d64c4833e107f26c424afb -# uv run python main.py --start-date "2025-12-19 12:00:00" --duration "00:01:00" --channel-id 536 --video "720+best" --audio best --title "Test" --key 5b1288b31b6a3f789a205614bbd7fac7:14980f2578eca20d78bd70601af21458 --key acacd48e12efbdbaa479b6d6dbf110b4:500af89b21d64c4833e107f26c424afb +# uv run python main.py --start-date "2025-01-01 12:00:00" --duration "01:00:00" \ +# --channel-id 536 --video "720+best" --audio best --title "Test" \ +# --key 5b1288b31b6a3f789a205614bbd7fac7:14980f2578eca20d78bd70601af21458 \ +# --key acacd48e12efbdbaa479b6d6dbf110b4:500af89b21d64c4833e107f26c424afb +# uv run python main.py --start-date "2025-12-19 12:00:00" --duration "00:01:00" \ +# --channel-id 536 --video "720+best" --audio best --title "Test" \ +# --key 5b1288b31b6a3f789a205614bbd7fac7:14980f2578eca20d78bd70601af21458 \ +# --key acacd48e12efbdbaa479b6d6dbf110b4:500af89b21d64c4833e107f26c424afb diff --git a/utils/downloader.py b/utils/downloader.py index 50a2319..6bdb9e7 100644 --- a/utils/downloader.py +++ b/utils/downloader.py @@ -1,7 +1,6 @@ -import os -import base64 -import requests +"""Module for fetching DRM keys and generating PSSH boxes.""" from uuid import UUID +import requests from pywidevine.cdm import Cdm from pywidevine.device import Device from pywidevine.pssh import PSSH @@ -14,13 +13,11 @@ def fetch_drm_keys(kid: str, api_url: str, api_key: str) -> str: Returns: The DRM key as a string. """ headers = { - 'Content-Type': 'application/json', - 'Api-Key': api_key, + "Content-Type": "application/json", + "Api-Key": api_key, } data = {"service": "oqee", "kid": kid} - response = requests.post( - api_url, headers=headers, json=data, timeout=10 - ) + response = requests.post(api_url, headers=headers, json=data, timeout=10) return response.json()["key"] @@ -42,31 +39,31 @@ def generate_pssh(kids: list[str]) -> PSSH: def get_keys(kids: list[str], method: dict) -> list[str]: """Retrieve DRM keys using the specified method.""" - if method["method"] == 'api': + if method["method"] == "api": print("Fetching DRM keys via API...") keys = [] for kid in kids: key = fetch_drm_keys(kid, method["api_url"], method["api_key"]) keys.append(f"{kid}:{key}") return keys - else: - print("Fetching DRM keys via Widevine CDM...") - client = method["client_class"] - device = Device.load(method["device_file"]) - cdm = Cdm.from_device(device) - session_id = cdm.open() - certificate = client.certificate() - cdm.set_service_certificate(session_id, certificate) + print("Fetching DRM keys via Widevine CDM...") + client = method["client_class"] - pssh_data = generate_pssh(kids) - challenge = cdm.get_license_challenge(session_id, pssh_data, privacy_mode=True) - license_data = client.license(challenge) + device = Device.load(method["device_file"]) + cdm = Cdm.from_device(device) + session_id = cdm.open() + certificate = client.certificate() + cdm.set_service_certificate(session_id, certificate) - cdm.parse_license(session_id, license_data) - keys = [] - for key in cdm.get_keys(session_id): - if key.type=='CONTENT': - keys.append(f"{key.kid.hex}:{key.key.hex()}") - cdm.close(session_id) - return keys \ No newline at end of file + pssh_data = generate_pssh(kids) + challenge = cdm.get_license_challenge(session_id, pssh_data, privacy_mode=True) + license_data = client.license(challenge) + + cdm.parse_license(session_id, license_data) + keys = [] + for key in cdm.get_keys(session_id): + if key.type == "CONTENT": + keys.append(f"{key.kid.hex}:{key.key.hex()}") + cdm.close(session_id) + return keys diff --git a/utils/input.py b/utils/input.py index 46f4ec7..b4b06dd 100644 --- a/utils/input.py +++ b/utils/input.py @@ -1,4 +1,5 @@ """Input utilities for user prompts and channel/stream selection.""" + import datetime import requests from prompt_toolkit.validation import Validator, ValidationError @@ -6,11 +7,7 @@ from InquirerPy import prompt from InquirerPy.validator import EmptyInputValidator from InquirerPy.base.control import Choice -from utils.stream import ( - get_manifest, - parse_mpd_manifest, - organize_by_content_type -) +from utils.stream import get_manifest, parse_mpd_manifest, organize_by_content_type SERVICE_PLAN_API_URL = "https://api.oqee.net/api/v6/service_plan" EPG_API_URL = "https://api.oqee.net/api/v1/epg/all/{unix}" @@ -20,6 +17,7 @@ class DatetimeValidator(Validator): """ Validateur personnalisé pour les chaînes datetime au format "YYYY-MM-DD HH:MM:SS". """ + def validate(self, document): try: datetime.datetime.strptime(document.text, "%Y-%m-%d %H:%M:%S") @@ -34,8 +32,9 @@ class DurationValidator(Validator): """ Validateur personnalisé pour les chaînes de durée au format "HH:MM:SS". """ + def validate(self, document): - parts = document.text.split(':') + parts = document.text.split(":") if len(parts) != 3: raise ValidationError( message="Veuillez entrer la durée au format HH:MM:SS", @@ -44,7 +43,9 @@ class DurationValidator(Validator): try: _, m, s = [int(part) for part in parts] if not (0 <= m < 60 and 0 <= s < 60): - raise ValueError("Les minutes et les secondes doivent être entre 0 et 59.") + raise ValueError( + "Les minutes et les secondes doivent être entre 0 et 59." + ) except ValueError as exc: raise ValidationError( message="Format invalide. Utilisez HH:MM:SS avec des nombres valides.", @@ -71,7 +72,9 @@ def get_date_input(): start_date_result = prompt(question_start_date) if start_date_result: - start_date = datetime.datetime.strptime(start_date_result["datetime"], "%Y-%m-%d %H:%M:%S") + start_date = datetime.datetime.strptime( + start_date_result["datetime"], "%Y-%m-%d %H:%M:%S" + ) print(f"Date/heure de début : {start_date}") question_end_date = [ @@ -94,7 +97,8 @@ def get_date_input(): "message": "Entrez une date/heure de fin (YYYY-MM-DD HH:MM:SS):", "name": "datetime", "default": ( - start_date_result["datetime"] if start_date_result + start_date_result["datetime"] + if start_date_result else "2025-01-01 12:00:00" ), "validate": DatetimeValidator(), @@ -108,7 +112,7 @@ def get_date_input(): if end_date_result.get("duration"): duration_str = end_date_result["duration"] try: - h, m, s = map(int, duration_str.split(':')) + h, m, s = map(int, duration_str.split(":")) duration_td = datetime.timedelta(hours=h, minutes=m, seconds=s) end_date = start_date + duration_td print(f"\nDate/heure de fin : {end_date}") @@ -144,13 +148,10 @@ def select_oqee_channel(): channels_data = data["result"]["channels"] choices = [ - { - "name": f"{channel_info.get('name', 'Nom inconnu')}", - "value": channel_id - } + {"name": f"{channel_info.get('name', 'Nom inconnu')}", "value": channel_id} for channel_id, channel_info in channels_data.items() ] - choices.sort(key=lambda x: x['name']) + choices.sort(key=lambda x: x["name"]) except requests.exceptions.RequestException as e: print(f"Une erreur réseau est survenue : {e}") @@ -204,7 +205,7 @@ def prompt_for_stream_selection(stream_info, already_selected_types): { "type": "list", "message": "Quel type de flux souhaitez-vous sélectionner ?", - "choices": content_type_choices + "choices": content_type_choices, } ] result = prompt(questions) @@ -218,7 +219,7 @@ def prompt_for_stream_selection(stream_info, already_selected_types): { "type": "list", "message": f"Choisissez une qualité pour '{selected_type}':", - "choices": list(selected_content_data.keys()) + "choices": list(selected_content_data.keys()), } ] result = prompt(questions) @@ -239,7 +240,7 @@ def prompt_for_stream_selection(stream_info, already_selected_types): f"Bitrate: {s.get('bitrate_kbps')} kbps | " f"Codec: {s.get('codec', 'N/A')} | ID: {s.get('track_id')}" ), - "value": s + "value": s, } for s in available_streams ] @@ -247,7 +248,7 @@ def prompt_for_stream_selection(stream_info, already_selected_types): { "type": "list", "message": "Plusieurs flux sont disponibles, choisissez-en un :", - "choices": stream_choices + "choices": stream_choices, } ] result = prompt(questions) @@ -255,7 +256,7 @@ def prompt_for_stream_selection(stream_info, already_selected_types): return None final_selection = result[0] - final_selection['content_type'] = selected_type + final_selection["content_type"] = selected_type return final_selection except (KeyboardInterrupt, TypeError): @@ -277,7 +278,7 @@ def stream_selection(): print(f" - Nom : {selected_channel.get('name')}") print(f" - ID : {selected_channel.get('id')}") - dash_id = selected_channel.get('streams', {}).get('dash') + dash_id = selected_channel.get("streams", {}).get("dash") if not dash_id: print("Aucun flux DASH trouvé pour cette chaîne.") return None @@ -289,18 +290,16 @@ def stream_selection(): final_selections = {} while True: - selection = prompt_for_stream_selection( - organized_info, final_selections.keys() - ) + selection = prompt_for_stream_selection(organized_info, final_selections.keys()) if selection: - content_type = selection.pop('content_type') + content_type = selection.pop("content_type") final_selections[content_type] = selection print("\n--- Récapitulatif de votre sélection ---") for stream_type, details in final_selections.items(): - bitrate = details.get('bitrate_kbps') - track_id = details.get('track_id') + bitrate = details.get("bitrate_kbps") + track_id = details.get("track_id") print( f" - {stream_type.capitalize()}: " f"Bitrate {bitrate} kbps (ID: {track_id})" @@ -311,29 +310,23 @@ def stream_selection(): { "type": "list", "message": "Que souhaitez-vous faire ?", - "choices": [ - "Sélectionner un autre flux", - "Terminer et continuer" - ], + "choices": ["Sélectionner un autre flux", "Terminer et continuer"], } ] action_result = prompt(continue_prompt) - if ( - not action_result or - action_result[0] == "Terminer et continuer" - ): + if not action_result or action_result[0] == "Terminer et continuer": break if final_selections: - final_selections['channel'] = selected_channel + final_selections["channel"] = selected_channel return final_selections print("\nAucun flux n'a été sélectionné.") return None -def get_selection(channel_id, video_quality='best', audio_quality='best'): +def get_selection(channel_id, video_quality="best", audio_quality="best"): """Get stream selection for a given channel ID with specified qualities. Args: @@ -367,9 +360,11 @@ def get_selection(channel_id, video_quality='best', audio_quality='best'): print("Erreur lors de l'analyse de la réponse JSON.") return None - print(f"Chaîne sélectionnée : {selected_channel_details.get('name')} (ID: {channel_id})") + print( + f"Chaîne sélectionnée : {selected_channel_details.get('name')} (ID: {channel_id})" + ) - dash_id = selected_channel_details.get('streams', {}).get('dash') + dash_id = selected_channel_details.get("streams", {}).get("dash") if not dash_id: print("Aucun flux DASH trouvé pour cette chaîne.") return None @@ -379,19 +374,19 @@ def get_selection(channel_id, video_quality='best', audio_quality='best'): organized_info = organize_by_content_type(manifest_info) final_selections = {} - final_selections['channel'] = selected_channel_details + final_selections["channel"] = selected_channel_details # Select video - if 'video' in organized_info: - selected_track = select_track(organized_info['video'], video_quality, 'video') + if "video" in organized_info: + selected_track = select_track(organized_info["video"], video_quality, "video") if selected_track: - final_selections['video'] = selected_track + final_selections["video"] = selected_track # Select audio - if 'audio' in organized_info: - selected_track = select_track(organized_info['audio'], audio_quality, 'audio') + if "audio" in organized_info: + selected_track = select_track(organized_info["audio"], audio_quality, "audio") if selected_track: - final_selections['audio'] = selected_track + final_selections["audio"] = selected_track return final_selections @@ -407,11 +402,11 @@ def select_track(content_dict, quality_spec, content_type): Returns: dict: Selected track or None. """ - if '+' in quality_spec: - filter_part, pref = quality_spec.split('+', 1) + if "+" in quality_spec: + filter_part, pref = quality_spec.split("+", 1) pref = pref.lower() else: - filter_part = '' + filter_part = "" pref = quality_spec.lower() candidates = [] @@ -424,22 +419,24 @@ def select_track(content_dict, quality_spec, content_type): print(f"Aucune piste {content_type} trouvée pour '{quality_spec}'.") return None - if pref == 'best': - selected = max(candidates, key=lambda x: x['bandwidth']) - elif pref == 'worst': - selected = min(candidates, key=lambda x: x['bandwidth']) + if pref == "best": + selected = max(candidates, key=lambda x: x["bandwidth"]) + elif pref == "worst": + selected = min(candidates, key=lambda x: x["bandwidth"]) else: # Default to best if unknown pref - selected = max(candidates, key=lambda x: x['bandwidth']) + selected = max(candidates, key=lambda x: x["bandwidth"]) - print(f"{content_type.capitalize()} sélectionnée : {selected['track_id']}, {selected['bitrate_kbps']} kbps") + print( + f"{content_type.capitalize()} sélectionnée : {selected['track_id']}, {selected['bitrate_kbps']} kbps" + ) return selected def get_epg_data_at(dt: datetime.datetime): """ Fetch EPG data from the Oqee API for the nearest aligned hour of a given datetime. - + Args: dt (datetime.datetime): datetime (with hour, minute, etc.) @@ -449,7 +446,9 @@ def get_epg_data_at(dt: datetime.datetime): # Round to nearest hour if dt.minute >= 30: - dt_aligned = (dt + datetime.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0) + dt_aligned = (dt + datetime.timedelta(hours=1)).replace( + minute=0, second=0, microsecond=0 + ) else: dt_aligned = dt.replace(minute=0, second=0, microsecond=0) @@ -474,12 +473,12 @@ def get_epg_data_at(dt: datetime.datetime): def select_program_from_epg(programs, original_start_date, original_end_date): """ Prompt user to select a program from EPG data or keep original selection. - + Args: programs (list): List of program dictionaries from EPG data original_start_date (datetime.datetime): User's original start date selection original_end_date (datetime.datetime): User's original end date selection - + Returns: dict: Dictionary containing: - 'start_date': datetime object for start @@ -490,10 +489,10 @@ def select_program_from_epg(programs, original_start_date, original_end_date): if not programs: print("Aucun programme disponible dans le guide EPG.") return { - 'start_date': original_start_date, - 'end_date': original_end_date, - 'title': None, - 'program': None + "start_date": original_start_date, + "end_date": original_end_date, + "title": None, + "program": None, } # Create choices list with program information @@ -501,29 +500,31 @@ def select_program_from_epg(programs, original_start_date, original_end_date): for program in programs: # Extract the live data from the program live_data = program.get("live", program) - title = live_data.get('title', 'Sans titre') - start_time = datetime.datetime.fromtimestamp(live_data.get('start', 0)) - end_time = datetime.datetime.fromtimestamp(live_data.get('end', 0)) + title = live_data.get("title", "Sans titre") + start_time = datetime.datetime.fromtimestamp(live_data.get("start", 0)) + end_time = datetime.datetime.fromtimestamp(live_data.get("end", 0)) duration_min = (end_time - start_time).total_seconds() / 60 choice_name = ( f"{start_time.strftime('%H:%M')} - {end_time.strftime('%H:%M')} | " f"{title} ({int(duration_min)} min)" ) - program_choices.append({ - "name": choice_name, - "value": program # Store the full program object - }) + program_choices.append( + {"name": choice_name, "value": program} # Store the full program object + ) # Add option to keep original selection - program_choices.insert(0, { - "name": ( - f"Garder la sélection manuelle originale " - f"({original_start_date.strftime('%Y-%m-%d %H:%M:%S')} - " - f"{original_end_date.strftime('%Y-%m-%d %H:%M:%S')})" - ), - "value": None - }) + program_choices.insert( + 0, + { + "name": ( + f"Garder la sélection manuelle originale " + f"({original_start_date.strftime('%Y-%m-%d %H:%M:%S')} - " + f"{original_end_date.strftime('%Y-%m-%d %H:%M:%S')})" + ), + "value": None, + }, + ) questions = [ { @@ -545,17 +546,17 @@ def select_program_from_epg(programs, original_start_date, original_end_date): if selected_program is None: print("\n✅ Sélection manuelle conservée") return { - 'start_date': original_start_date, - 'end_date': original_end_date, - 'title': None, - 'program': None + "start_date": original_start_date, + "end_date": original_end_date, + "title": None, + "program": None, } # Extract live data and convert program timestamps to datetime objects - live_data = selected_program.get('live', selected_program) - program_start = datetime.datetime.fromtimestamp(live_data.get('start', 0)) - program_end = datetime.datetime.fromtimestamp(live_data.get('end', 0)) - program_title = live_data.get('title', 'Sans titre') + live_data = selected_program.get("live", selected_program) + program_start = datetime.datetime.fromtimestamp(live_data.get("start", 0)) + program_end = datetime.datetime.fromtimestamp(live_data.get("end", 0)) + program_title = live_data.get("title", "Sans titre") print("\n✅ Programme sélectionné :") print(f" - Titre : {program_title}") @@ -563,10 +564,10 @@ def select_program_from_epg(programs, original_start_date, original_end_date): print(f" - Fin : {program_end.strftime('%Y-%m-%d %H:%M:%S')}") return { - 'start_date': program_start, - 'end_date': program_end, - 'title': program_title, - 'program': selected_program + "start_date": program_start, + "end_date": program_end, + "title": program_title, + "program": selected_program, } except KeyboardInterrupt: diff --git a/utils/oqee.py b/utils/oqee.py index e233c96..3f99f99 100644 --- a/utils/oqee.py +++ b/utils/oqee.py @@ -1,4 +1,5 @@ """OQEE streaming service client for authentication and content access.""" + import base64 from urllib.parse import urlparse, parse_qs import requests @@ -7,6 +8,7 @@ from dotenv import load_dotenv load_dotenv() + class OqeeClient: # pylint: disable=too-many-instance-attributes """ Service code for OQEE streaming service (https://oqee.com). @@ -21,20 +23,20 @@ class OqeeClient: # pylint: disable=too-many-instance-attributes # Base headers template for API requests self._headers_template = { - 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', - 'accept-language': 'en-GB,en-US;q=0.9,en;q=0.8', - 'cache-control': 'no-cache', - 'pragma': 'no-cache', - 'priority': 'u=0, i', - 'sec-ch-ua': '"Not(A:Brand";v="8", "Chromium";v="144", "Google Chrome";v="144"', - 'sec-ch-ua-mobile': '?0', - 'sec-ch-ua-platform': '"macOS"', - 'sec-fetch-dest': 'document', - 'sec-fetch-mode': 'navigate', - 'sec-fetch-site': 'none', - 'sec-fetch-user': '?1', - 'upgrade-insecure-requests': '1', - 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36', + "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", + "accept-language": "en-GB,en-US;q=0.9,en;q=0.8", + "cache-control": "no-cache", + "pragma": "no-cache", + "priority": "u=0, i", + "sec-ch-ua": '"Not(A:Brand";v="8", "Chromium";v="144", "Google Chrome";v="144"', + "sec-ch-ua-mobile": "?0", + "sec-ch-ua-platform": '"macOS"', + "sec-fetch-dest": "document", + "sec-fetch-mode": "navigate", + "sec-fetch-site": "none", + "sec-fetch-user": "?1", + "upgrade-insecure-requests": "1", + "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36", } self.headers_base = self._build_headers() @@ -51,18 +53,14 @@ class OqeeClient: # pylint: disable=too-many-instance-attributes self.configure(username, password) - def certificate(self, **_): """ Get the Service Privacy Certificate. """ response = self.session.post( - url=self.lic_url, - headers=self.headers_auth, - json={"licenseRequest": "CAQ="} + url=self.lic_url, headers=self.headers_auth, json={"licenseRequest": "CAQ="} ) - return response.json()['result']['license'] - + return response.json()["result"]["license"] def license(self, challenge, **_): """ @@ -72,19 +70,19 @@ class OqeeClient: # pylint: disable=too-many-instance-attributes response = self.session.post( url=self.lic_url, headers=self.headers_auth, - json={'licenseRequest': license_request} + json={"licenseRequest": license_request}, ) if not response.json()["success"]: - raise ValueError(f"License request failed: {response.json()['error']['msg']}") - return response.json()['result']['license'] - + raise ValueError( + f"License request failed: {response.json()['error']['msg']}" + ) + return response.json()["result"]["license"] def configure(self, username, password): """Configure the client by logging in and processing title information.""" print("Logging in") self.login(username, password) - def _build_headers(self, overrides=None, remove=None): """Clone default headers and apply optional overrides/removals.""" headers = self._headers_template.copy() @@ -95,118 +93,115 @@ class OqeeClient: # pylint: disable=too-many-instance-attributes headers.pop(key, None) return headers - def right(self): """ Get user rights token from Oqee API. """ headers = self._build_headers( - overrides={'authorization': f'Bearer {self.access_token}'} + overrides={"authorization": f"Bearer {self.access_token}"} ) data = self.session.get( - 'https://api.oqee.net/api/v3/user/rights_proxad', - headers=headers + "https://api.oqee.net/api/v3/user/rights_proxad", headers=headers ).json() - return data['result']['token'] - + return data["result"]["token"] def profil(self): """ Gets the first profile ID from the OQEE API. """ headers = self._build_headers( - overrides={'authorization': f'Bearer {self.access_token}'} + overrides={"authorization": f"Bearer {self.access_token}"} ) data = self.session.get( - 'https://api.oqee.net/api/v2/user/profiles', - headers=headers + "https://api.oqee.net/api/v2/user/profiles", headers=headers ).json() print("Selecting first profile by default.") - return data['result'][0]['id'] - + return data["result"][0]["id"] def login_cred(self, username, password): """Authenticate with OQEE service using Free account credentials.""" - headers = self._build_headers(overrides={ - 'accept-language': 'fr-FR,fr;q=0.8', - 'cache-control': 'no-cache', - 'pragma': 'no-cache', - 'priority': 'u=1, i', - 'sec-ch-ua': '"Brave";v="131", "Chromium";v="131", "Not_A Brand";v="24"', - 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', - 'x-oqee-customization': '0', - }) - data = {"provider":"free","platform":"web"} - response = self.session.post('https://api.oqee.net/api/v2/user/oauth/init', headers=headers, json=data).json() - redirect_url = response['result']['redirect_url'] + headers = self._build_headers( + overrides={ + "accept-language": "fr-FR,fr;q=0.8", + "cache-control": "no-cache", + "pragma": "no-cache", + "priority": "u=1, i", + "sec-ch-ua": '"Brave";v="131", "Chromium";v="131", "Not_A Brand";v="24"', + "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36", + "x-oqee-customization": "0", + } + ) + data = {"provider": "free", "platform": "web"} + response = self.session.post( + "https://api.oqee.net/api/v2/user/oauth/init", headers=headers, json=data + ).json() + redirect_url = response["result"]["redirect_url"] r = parse_qs(urlparse(redirect_url).query) - client_id = r['client_id'][0] - redirect_uri = r['redirect_uri'][0] - state = r['state'][0] + client_id = r["client_id"][0] + redirect_uri = r["redirect_uri"][0] + state = r["state"][0] headers = { - 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9, image/avif,image/webp,image/apng,*/*;q=0.8', - 'Accept-Language': 'fr-FR,fr;q=0.7', - 'Cache-Control': 'max-age=0', - 'Connection': 'keep-alive', - 'Content-Type': 'application/x-www-form-urlencoded', - 'Origin': 'https://subscribe.free.fr', - 'Referer': 'https://subscribe.free.fr/auth/auth.pl?', - 'Sec-Fetch-Dest': 'document', - 'Sec-Fetch-Mode': 'navigate', - 'Sec-Fetch-Site': 'same-origin', - 'Sec-Fetch-User': '?1', - 'Sec-GPC': '1', - 'Upgrade-Insecure-Requests': '1', - 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', - 'sec-ch-ua': '"Brave";v="123", "Not:A-Brand";v="8", "Chromium";v="123"', - 'sec-ch-ua-mobile': '?0', - 'sec-ch-ua-platform': '"macOS"', + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9, image/avif,image/webp,image/apng,*/*;q=0.8", + "Accept-Language": "fr-FR,fr;q=0.7", + "Cache-Control": "max-age=0", + "Connection": "keep-alive", + "Content-Type": "application/x-www-form-urlencoded", + "Origin": "https://subscribe.free.fr", + "Referer": "https://subscribe.free.fr/auth/auth.pl?", + "Sec-Fetch-Dest": "document", + "Sec-Fetch-Mode": "navigate", + "Sec-Fetch-Site": "same-origin", + "Sec-Fetch-User": "?1", + "Sec-GPC": "1", + "Upgrade-Insecure-Requests": "1", + "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36", + "sec-ch-ua": '"Brave";v="123", "Not:A-Brand";v="8", "Chromium";v="123"', + "sec-ch-ua-mobile": "?0", + "sec-ch-ua-platform": '"macOS"', } data = { - 'login': username, - 'pass': password, - 'ok': 'Se connecter', - 'client_id': client_id, - 'ressource': '', - 'response_type': 'code', - 'redirect_uri': redirect_uri, - 'state': state + "login": username, + "pass": password, + "ok": "Se connecter", + "client_id": client_id, + "ressource": "", + "response_type": "code", + "redirect_uri": redirect_uri, + "state": state, } - r = self.session.post('https://subscribe.free.fr/auth/auth.pl', headers=headers, data=data) + r = self.session.post( + "https://subscribe.free.fr/auth/auth.pl", headers=headers, data=data + ) parsed_url = parse_qs(urlparse(r.url).query) - if 'result' not in parsed_url: - raise ValueError("Login failed: invalid credentials or error in authentication") - token = parsed_url['result'][0] + if "result" not in parsed_url: + raise ValueError( + "Login failed: invalid credentials or error in authentication" + ) + token = parsed_url["result"][0] headers = self._build_headers( - overrides={'x-oqee-customization': '0'}, - remove=('x-oqee-account-provider',) + overrides={"x-oqee-customization": "0"}, remove=("x-oqee-account-provider",) ) data = self.session.post( - 'https://api.oqee.net/api/v5/user/login', + "https://api.oqee.net/api/v5/user/login", headers=headers, - json={'type': 'freeoa', 'token': token} + json={"type": "freeoa", "token": token}, ).json() - return data['result']['token'] - + return data["result"]["token"] def login_ip(self): """ Performs IP-based authentication with the OQEE service. """ headers = self._build_headers( - overrides={'x-oqee-customization': '0'}, - remove=('x-oqee-account-provider',) + overrides={"x-oqee-customization": "0"}, remove=("x-oqee-account-provider",) ) data = {"type": "ip"} data = self.session.post( - 'https://api.oqee.net/api/v5/user/login', - headers=headers, - json=data + "https://api.oqee.net/api/v5/user/login", headers=headers, json=data ).json() - return data['result']['token'] - + return data["result"]["token"] def login(self, username, password): """ @@ -228,13 +223,17 @@ class OqeeClient: # pylint: disable=too-many-instance-attributes print("Fetching profile ID") self.profil_id = self.profil() - self.headers = self._build_headers(overrides={ - 'x-fbx-rights-token': self.right_token, - 'x-oqee-profile': self.profil_id, - }) + self.headers = self._build_headers( + overrides={ + "x-fbx-rights-token": self.right_token, + "x-oqee-profile": self.profil_id, + } + ) - self.headers_auth = self._build_headers(overrides={ - 'x-fbx-rights-token': self.right_token, - 'x-oqee-profile': self.profil_id, - 'authorization': f'Bearer {self.access_token}', - }) + self.headers_auth = self._build_headers( + overrides={ + "x-fbx-rights-token": self.right_token, + "x-oqee-profile": self.profil_id, + "authorization": f"Bearer {self.access_token}", + } + ) diff --git a/utils/stream.py b/utils/stream.py index d06cf4a..a6bec48 100644 --- a/utils/stream.py +++ b/utils/stream.py @@ -1,4 +1,5 @@ """Utility module for streaming and manifest parsing.""" + import xml.etree.ElementTree as ET import base64 import os @@ -22,33 +23,30 @@ def parse_mpd_manifest(mpd_content: str) -> Dict[str, Any]: A dictionary containing parsed manifest information. """ root = ET.fromstring(mpd_content) - namespaces = { - 'mpd': 'urn:mpeg:dash:schema:mpd:2011', - 'cenc': 'urn:mpeg:cenc:2013' - } + namespaces = {"mpd": "urn:mpeg:dash:schema:mpd:2011", "cenc": "urn:mpeg:cenc:2013"} manifest_info = { - 'type': root.get('type'), - 'profiles': root.get('profiles'), - 'publishTime': root.get('publishTime'), - 'availabilityStartTime': root.get('availabilityStartTime'), - 'minimumUpdatePeriod': root.get('minimumUpdatePeriod'), - 'minBufferTime': root.get('minBufferTime'), - 'timeShiftBufferDepth': root.get('timeShiftBufferDepth'), - 'suggestedPresentationDelay': root.get('suggestedPresentationDelay'), - 'periods': [] + "type": root.get("type"), + "profiles": root.get("profiles"), + "publishTime": root.get("publishTime"), + "availabilityStartTime": root.get("availabilityStartTime"), + "minimumUpdatePeriod": root.get("minimumUpdatePeriod"), + "minBufferTime": root.get("minBufferTime"), + "timeShiftBufferDepth": root.get("timeShiftBufferDepth"), + "suggestedPresentationDelay": root.get("suggestedPresentationDelay"), + "periods": [], } - for period in root.findall('mpd:Period', namespaces): + for period in root.findall("mpd:Period", namespaces): period_info = { - 'id': period.get('id'), - 'start': period.get('start'), - 'adaptation_sets': [] + "id": period.get("id"), + "start": period.get("start"), + "adaptation_sets": [], } - for adaptation_set in period.findall('mpd:AdaptationSet', namespaces): + for adaptation_set in period.findall("mpd:AdaptationSet", namespaces): adaptation_info = parse_adaptation_set(adaptation_set, namespaces) - period_info['adaptation_sets'].append(adaptation_info) - manifest_info['periods'].append(period_info) + period_info["adaptation_sets"].append(adaptation_info) + manifest_info["periods"].append(period_info) return manifest_info @@ -65,32 +63,32 @@ def parse_adaptation_set( A dictionary containing parsed adaptation set information. """ adaptation_info = { - 'id': adaptation_set.get('id'), - 'group': adaptation_set.get('group'), - 'contentType': adaptation_set.get('contentType'), - 'lang': adaptation_set.get('lang'), - 'segmentAlignment': adaptation_set.get('segmentAlignment'), - 'startWithSAP': adaptation_set.get('startWithSAP'), - 'drm_info': [], - 'representations': [] + "id": adaptation_set.get("id"), + "group": adaptation_set.get("group"), + "contentType": adaptation_set.get("contentType"), + "lang": adaptation_set.get("lang"), + "segmentAlignment": adaptation_set.get("segmentAlignment"), + "startWithSAP": adaptation_set.get("startWithSAP"), + "drm_info": [], + "representations": [], } # Parse ContentProtection for content_protection in adaptation_set.findall( - 'mpd:ContentProtection', namespaces + "mpd:ContentProtection", namespaces ): drm_info = parse_content_protection(content_protection, namespaces) - adaptation_info['drm_info'].append(drm_info) + adaptation_info["drm_info"].append(drm_info) # Parse Role - role = adaptation_set.find('mpd:Role', namespaces) + role = adaptation_set.find("mpd:Role", namespaces) if role is not None: - adaptation_info['role'] = role.get('value') + adaptation_info["role"] = role.get("value") # Parse Representations - for representation in adaptation_set.findall('mpd:Representation', namespaces): + for representation in adaptation_set.findall("mpd:Representation", namespaces): rep_info = parse_representation(representation, namespaces) - adaptation_info['representations'].append(rep_info) + adaptation_info["representations"].append(rep_info) return adaptation_info @@ -108,20 +106,20 @@ def parse_content_protection( A dictionary containing DRM information. """ drm_info = { - 'schemeIdUri': content_protection.get('schemeIdUri'), - 'value': content_protection.get('value') + "schemeIdUri": content_protection.get("schemeIdUri"), + "value": content_protection.get("value"), } - default_kid = content_protection.get('{urn:mpeg:cenc:2013}default_KID') + default_kid = content_protection.get("{urn:mpeg:cenc:2013}default_KID") if default_kid: - drm_info['default_KID'] = default_kid + drm_info["default_KID"] = default_kid - pssh_element = content_protection.find('cenc:pssh', namespaces) + pssh_element = content_protection.find("cenc:pssh", namespaces) if pssh_element is not None and pssh_element.text: - drm_info['pssh'] = pssh_element.text.strip() + drm_info["pssh"] = pssh_element.text.strip() try: - pssh_decoded = base64.b64decode(drm_info['pssh']) - drm_info['pssh_hex'] = pssh_decoded.hex() + pssh_decoded = base64.b64decode(drm_info["pssh"]) + drm_info["pssh_hex"] = pssh_decoded.hex() except (ValueError, base64.binascii.Error): pass @@ -141,34 +139,40 @@ def parse_representation( A dictionary containing parsed representation information. """ rep_info = { - 'id': representation.get('id'), - 'bandwidth': representation.get('bandwidth'), - 'codecs': representation.get('codecs'), - 'mimeType': representation.get('mimeType'), - 'width': representation.get('width'), - 'height': representation.get('height'), - 'frameRate': representation.get('frameRate'), - 'segments': {} + "id": representation.get("id"), + "bandwidth": representation.get("bandwidth"), + "codecs": representation.get("codecs"), + "mimeType": representation.get("mimeType"), + "width": representation.get("width"), + "height": representation.get("height"), + "frameRate": representation.get("frameRate"), + "segments": {}, } - segment_template = representation.find('mpd:SegmentTemplate', namespaces) + segment_template = representation.find("mpd:SegmentTemplate", namespaces) if segment_template is not None: - rep_info['segments'] = { - 'timescale': segment_template.get('timescale'), - 'initialization': segment_template.get('initialization'), - 'media': segment_template.get('media'), - 'timeline': [] + rep_info["segments"] = { + "timescale": segment_template.get("timescale"), + "initialization": segment_template.get("initialization"), + "media": segment_template.get("media"), + "timeline": [], } - segment_timeline = segment_template.find('mpd:SegmentTimeline', namespaces) + segment_timeline = segment_template.find("mpd:SegmentTimeline", namespaces) if segment_timeline is not None: - for s_element in segment_timeline.findall('mpd:S', namespaces): + for s_element in segment_timeline.findall("mpd:S", namespaces): timeline_info = { - 't': int(s_element.get('t')) if s_element.get('t') is not None else 0, # start time - 'd': int(s_element.get('d')) if s_element.get('d') is not None else 0, # duration - 'r': int(s_element.get('r')) if s_element.get('r') is not None else 0 # repeat count + "t": ( + int(s_element.get("t")) if s_element.get("t") is not None else 0 + ), # start time + "d": ( + int(s_element.get("d")) if s_element.get("d") is not None else 0 + ), # duration + "r": ( + int(s_element.get("r")) if s_element.get("r") is not None else 0 + ), # repeat count } - rep_info['segments']['timeline'].append(timeline_info) + rep_info["segments"]["timeline"].append(timeline_info) return rep_info @@ -184,8 +188,8 @@ def organize_by_content_type(manifest_info: Dict[str, Any]) -> Dict[str, Any]: A dictionary organized by content type (video, audio, text). """ organized = { - 'video': {}, - 'audio': {}, + "video": {}, + "audio": {}, # 'text': {}, # 'manifest_metadata': { # 'type': manifest_info.get('type'), @@ -194,57 +198,61 @@ def organize_by_content_type(manifest_info: Dict[str, Any]) -> Dict[str, Any]: # } } - for period in manifest_info.get('periods', []): - for adaptation_set in period.get('adaptation_sets', []): - content_type = adaptation_set.get('contentType') + for period in manifest_info.get("periods", []): + for adaptation_set in period.get("adaptation_sets", []): + content_type = adaptation_set.get("contentType") if not content_type: continue - for rep in adaptation_set.get('representations', []): + for rep in adaptation_set.get("representations", []): track_info = { - 'track_id': rep.get('id'), - 'adaptation_set_id': adaptation_set.get('id'), - 'bandwidth': int(rep.get('bandwidth', 0)), - 'bitrate_kbps': int(rep.get('bandwidth', 0)) // 1000, - 'codec': rep.get('codecs'), - 'mime_type': rep.get('mimeType'), - 'drm_info': adaptation_set.get('drm_info', []), - 'segments': rep.get('segments', {}), + "track_id": rep.get("id"), + "adaptation_set_id": adaptation_set.get("id"), + "bandwidth": int(rep.get("bandwidth", 0)), + "bitrate_kbps": int(rep.get("bandwidth", 0)) // 1000, + "codec": rep.get("codecs"), + "mime_type": rep.get("mimeType"), + "drm_info": adaptation_set.get("drm_info", []), + "segments": rep.get("segments", {}), } - if content_type == 'video': - width = rep.get('width') - height = rep.get('height') - frame_rate = rep.get('frameRate') + if content_type == "video": + width = rep.get("width") + height = rep.get("height") + frame_rate = rep.get("frameRate") - track_info.update({ - 'resolution': ( - f"{width}x{height}" if width and height else 'unknown' - ), - 'width': int(width) if width else None, - 'height': int(height) if height else None, - 'frame_rate': frame_rate, - }) + track_info.update( + { + "resolution": ( + f"{width}x{height}" if width and height else "unknown" + ), + "width": int(width) if width else None, + "height": int(height) if height else None, + "frame_rate": frame_rate, + } + ) - resolution_key = track_info['resolution'] - if resolution_key not in organized['video']: - organized['video'][resolution_key] = [] - organized['video'][resolution_key].append(track_info) + resolution_key = track_info["resolution"] + if resolution_key not in organized["video"]: + organized["video"][resolution_key] = [] + organized["video"][resolution_key].append(track_info) - elif content_type == 'audio': - lang = adaptation_set.get('lang', 'unknown') - role = adaptation_set.get('role', 'main') + elif content_type == "audio": + lang = adaptation_set.get("lang", "unknown") + role = adaptation_set.get("role", "main") - track_info.update({ - 'language': lang, - 'role': role, - }) + track_info.update( + { + "language": lang, + "role": role, + } + ) lang_key = f"{lang}_{role}" - if lang_key not in organized['audio']: - organized['audio'][lang_key] = [] - organized['audio'][lang_key].append(track_info) + if lang_key not in organized["audio"]: + organized["audio"][lang_key] = [] + organized["audio"][lang_key].append(track_info) # elif content_type == 'text': # lang = adaptation_set.get('lang', 'unknown') @@ -261,27 +269,26 @@ def organize_by_content_type(manifest_info: Dict[str, Any]) -> Dict[str, Any]: # organized['text'][lang_key].append(track_info) # Sort video tracks by resolution (descending) and then by bitrate (descending) - for resolution in organized['video']: - organized['video'][resolution].sort( - key=lambda x: x['bandwidth'], reverse=True - ) + for resolution in organized["video"]: + organized["video"][resolution].sort(key=lambda x: x["bandwidth"], reverse=True) # Sort audio tracks by bitrate (descending) - for lang in organized['audio']: - organized['audio'][lang].sort(key=lambda x: x['bandwidth'], reverse=True) + for lang in organized["audio"]: + organized["audio"][lang].sort(key=lambda x: x["bandwidth"], reverse=True) # Sort video resolutions by pixel count (descending) sorted_video = {} for resolution in sorted( - organized['video'].keys(), + organized["video"].keys(), key=lambda r: ( - int(r.split('x')[0]) * int(r.split('x')[1]) - if 'x' in r and r.split('x')[0].isdigit() else 0 + int(r.split("x")[0]) * int(r.split("x")[1]) + if "x" in r and r.split("x")[0].isdigit() + else 0 ), - reverse=True + reverse=True, ): - sorted_video[resolution] = organized['video'][resolution] - organized['video'] = sorted_video + sorted_video[resolution] = organized["video"][resolution] + organized["video"] = sorted_video return organized @@ -296,26 +303,26 @@ def get_manifest(manifest_id): The manifest content as text. """ headers = { - 'accept': '*/*', - 'accept-language': 'en-GB,en-US;q=0.9,en;q=0.8', - 'cache-control': 'no-cache', - 'origin': 'https://tv.free.fr', - 'pragma': 'no-cache', - 'priority': 'u=1, i', - 'referer': 'https://tv.free.fr/', - 'sec-ch-ua': '"Google Chrome";v="143", "Chromium";v="143", "Not A(Brand";v="24"', - 'sec-ch-ua-mobile': '?0', - 'sec-ch-ua-platform': '"macOS"', - 'sec-fetch-dest': 'empty', - 'sec-fetch-mode': 'cors', - 'sec-fetch-site': 'cross-site', - 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36', + "accept": "*/*", + "accept-language": "en-GB,en-US;q=0.9,en;q=0.8", + "cache-control": "no-cache", + "origin": "https://tv.free.fr", + "pragma": "no-cache", + "priority": "u=1, i", + "referer": "https://tv.free.fr/", + "sec-ch-ua": '"Google Chrome";v="143", "Chromium";v="143", "Not A(Brand";v="24"', + "sec-ch-ua-mobile": "?0", + "sec-ch-ua-platform": '"macOS"', + "sec-fetch-dest": "empty", + "sec-fetch-mode": "cors", + "sec-fetch-site": "cross-site", + "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", } format_id = 1 url = ( - f'https://api-proxad.dc2.oqee.net/playlist/v1/live/' - f'{manifest_id}/{format_id}/live.mpd' + f"https://api-proxad.dc2.oqee.net/playlist/v1/live/" + f"{manifest_id}/{format_id}/live.mpd" ) response = requests.get(url, headers=headers, timeout=10) return response.text @@ -334,9 +341,9 @@ async def fetch_segment(session, ticks, track_id): """ url = f"https://media.stream.proxad.net/media/{track_id}_{ticks}" headers = { - 'Accept': '*/*', - 'Referer': 'https://tv.free.fr/', - 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36', + "Accept": "*/*", + "Referer": "https://tv.free.fr/", + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", } try: async with session.get(url, headers=headers) as resp: @@ -346,6 +353,7 @@ async def fetch_segment(session, ticks, track_id): except aiohttp.ClientError: return None + def get_init(output_folder, track_id): """Download and save the initialization segment for a track. @@ -355,15 +363,15 @@ def get_init(output_folder, track_id): """ url = f"https://media.stream.proxad.net/media/{track_id}_init" headers = { - 'Accept': '*/*', - 'Referer': 'https://tv.free.fr/', - 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36', + "Accept": "*/*", + "Referer": "https://tv.free.fr/", + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", } response = requests.get(url, headers=headers, timeout=10) if response.status_code == 200: - os.makedirs(f'{output_folder}/segments_{track_id}', exist_ok=True) - init_path = f'{output_folder}/segments_{track_id}/init.mp4' - with open(init_path, 'wb') as f: + os.makedirs(f"{output_folder}/segments_{track_id}", exist_ok=True) + init_path = f"{output_folder}/segments_{track_id}/init.mp4" + with open(init_path, "wb") as f: f.write(response.content) print(f"✅ Saved initialization segment to {init_path}") return init_path @@ -378,22 +386,22 @@ async def save_segments(output_folder, track_id, start_tick, rep_nb, duration): rep_nb: The number of segments to download. duration: The duration per segment. """ - os.makedirs(f'{output_folder}/segments_{track_id}', exist_ok=True) + os.makedirs(f"{output_folder}/segments_{track_id}", exist_ok=True) async def download_segment(session, tick, rep): """Download a single segment.""" url = f"https://media.stream.proxad.net/media/{track_id}_{tick}" headers = { - 'Accept': '*/*', - 'Referer': 'https://tv.free.fr/', - 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36', + "Accept": "*/*", + "Referer": "https://tv.free.fr/", + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", } try: async with session.get(url, headers=headers) as resp: if resp.status == 200: content = await resp.read() filename = f"{output_folder}/segments_{track_id}/{tick}.m4s" - with open(filename, 'wb') as f: + with open(filename, "wb") as f: f.write(content) return True print( @@ -421,7 +429,12 @@ async def save_segments(output_folder, track_id, start_tick, rep_nb, duration): tasks.append(download_segment(session, tick, i)) results = [] - for coro in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Downloading segments", unit="seg"): + for coro in tqdm( + asyncio.as_completed(tasks), + total=len(tasks), + desc="Downloading segments", + unit="seg", + ): result = await coro results.append(result) successful = sum(1 for r in results if r is True) @@ -446,20 +459,20 @@ def get_kid(output_folder, track_id): Returns: The KID as a hex string if found, None otherwise. """ - folder = f'{output_folder}/segments_{track_id}' + folder = f"{output_folder}/segments_{track_id}" for filename in os.listdir(folder): - if filename.endswith('.m4s'): + if filename.endswith(".m4s"): filepath = os.path.join(folder, filename) print(f"Checking file: {filepath}") - with open(filepath, 'rb') as f: + with open(filepath, "rb") as f: data = f.read() # Pattern before KID index = data.find( - b'\x73\x65\x69\x67\x00\x00\x00\x14' - b'\x00\x00\x00\x01\x00\x00\x01\x10' + b"\x73\x65\x69\x67\x00\x00\x00\x14" + b"\x00\x00\x00\x01\x00\x00\x01\x10" ) if index != -1: - kid_bytes = data[index + 16:index + 16 + 16] + kid_bytes = data[index + 16 : index + 16 + 16] kid = kid_bytes.hex() return kid - return None \ No newline at end of file + return None diff --git a/utils/times.py b/utils/times.py index 1a1918e..386cbb3 100644 --- a/utils/times.py +++ b/utils/times.py @@ -1,4 +1,5 @@ """Utility functions for time and tick conversions, and bruteforce operations.""" + import asyncio import datetime import time @@ -21,7 +22,9 @@ def convert_sec_to_ticks(seconds, timescale): def convert_sec_to_date(seconds, offset_hours=1): """Convert seconds to datetime with offset.""" - dt = datetime.datetime.utcfromtimestamp(seconds) + datetime.timedelta(hours=offset_hours) + dt = datetime.datetime.utcfromtimestamp(seconds) + datetime.timedelta( + hours=offset_hours + ) return dt @@ -52,61 +55,56 @@ async def bruteforce(track_id, date): valid_ticks = [] total_requests = 288000 batch_size = 20000 - checked_count = 0 - + print(f"Starting bruteforce for {track_id}") - # print(f"🎯 Total ticks to check: {total_requests}") print(f"{'='*50}") - + start_time = time.time() - - total_batches = (total_requests + batch_size - 1) // batch_size - + try: async with aiohttp.ClientSession() as session: - for batch_num, batch_start in enumerate(range(0, total_requests, batch_size), 1): + for batch_start in range(0, total_requests, batch_size): batch_end = min(batch_start + batch_size, total_requests) - ticks_to_check = list(range(batch_start, batch_end)) - - # print(f"\n📦 Batch {batch_num}/{total_batches} (ticks {batch_start} to {batch_end})") - - tasks = [fetch_segment(session, t + date, track_id) for t in ticks_to_check] - + tasks = [ + fetch_segment(session, t + date, track_id) + for t in range(batch_start, batch_end) + ] + results = [] - for coro in tqdm(asyncio.as_completed(tasks), total=len(tasks), - desc=f"Batch {batch_num}", unit="req"): + for coro in tqdm( + asyncio.as_completed(tasks), + total=len(tasks), + desc="Bruteforce", + unit="req", + ): result = await coro results.append(result) - - new_valid = [r for r in results if r and not isinstance(r, Exception)] - valid_ticks.extend(new_valid) - - checked_count += len(ticks_to_check) - + + valid_ticks.extend( + [r for r in results if r and not isinstance(r, Exception)] + ) + # Stop if we found valid ticks if valid_ticks: print(f"Found valid ticks: {valid_ticks}, stopping bruteforce.") break - + except KeyboardInterrupt: print("\n\n🛑 Interrupted by user (Ctrl+C)") - - end_time = time.time() - elapsed = end_time - start_time - req_per_sec = checked_count / elapsed if elapsed > 0 else 0 - + + elapsed = time.time() - start_time print(f"\n{'='*50}") print(f"✅ Completed in {elapsed:.2f}s") - print(f"⚡ Speed: {req_per_sec:.2f} req/s") - print(f"📊 Total checked: {checked_count}/{total_requests}") + print(f"⚡ Speed: {total_requests / elapsed if elapsed > 0 else 0:.2f} req/s") + print(f"📊 Total checked: {total_requests}") print(f"{'='*50}") - + return valid_ticks -def find_nearest_tick_by_hour(base_tick, datetime, timescale, duration, offset_hours=1): +def find_nearest_tick_by_hour(base_tick, dt, timescale, duration, offset_hours=1): """Find the nearest tick for a given datetime.""" - target_ticks = convert_date_to_ticks(datetime, timescale, offset_hours) + target_ticks = convert_date_to_ticks(dt, timescale, offset_hours) diff_ticks = base_tick - target_ticks rep_estimate = diff_ticks / duration @@ -120,14 +118,8 @@ def find_nearest_tick_by_hour(base_tick, datetime, timescale, duration, offset_h rep = int(round(rep_estimate)) nearest_tick = base_tick - rep * duration - nearest_seconds = convert_ticks_to_sec(nearest_tick, timescale) - target_seconds = convert_ticks_to_sec(target_ticks, timescale) - delta_seconds = abs(nearest_seconds - target_seconds) - - # print(f"Requested datetime: {datetime} (offset +{offset_hours}h)") + # print(f"Requested datetime: {dt} (offset +{offset_hours}h)") # print(f"Nearest rep: {rep}") # print(f"Tick: {nearest_tick}") - # print(f"Date: {convert_sec_to_date(nearest_seconds, offset_hours)}") - # print(f"Difference: {delta_seconds:.2f} seconds") return nearest_tick, rep diff --git a/utils/utilities.py b/utils/utilities.py index b27766b..07d35ea 100644 --- a/utils/utilities.py +++ b/utils/utilities.py @@ -1,3 +1,5 @@ +"""Utility functions for OqeeRewind, including verification, merging, and decryption.""" + import os import sys import logging @@ -18,10 +20,10 @@ def verify_mp4ff(): def verify_cmd(path: str) -> bool: """Verify if the file provided at path is valid and exists, otherwise log error and exit.""" if not os.path.exists(path): - logging.error(f"File does not exist: {path}") + logging.error("File does not exist: %s", path) sys.exit(1) if not os.path.isfile(path): - logging.error(f"Path is not a file: {path}") + logging.error("Path is not a file: %s", path) sys.exit(1) return True @@ -55,13 +57,13 @@ def decrypt(input_file, init_path, output_file, key): """ key = key.split(":")[1] result = subprocess.run( - ['mp4ff-decrypt', '-init', init_path, '-key', key, input_file, output_file], + ["mp4ff-decrypt", "-init", init_path, "-key", key, input_file, output_file], capture_output=True, - text=True + text=True, + check=False, ) if result.returncode == 0: print(f"✅ Decrypted {input_file} to {output_file}") return True - else: - print(f"❌ Decryption failed: {result.stderr}") - return False \ No newline at end of file + print(f"❌ Decryption failed: {result.stderr}") + return False