diff --git a/info.md b/info.md new file mode 100644 index 0000000..9883565 --- /dev/null +++ b/info.md @@ -0,0 +1 @@ +https://www.digitalbitrate.com/dtv.php?lang=en&liste=2&live=414 \ No newline at end of file diff --git a/main.py b/main.py index 4c9176c..247e71c 100644 --- a/main.py +++ b/main.py @@ -1,34 +1,383 @@ """Main module for Oqee channel selection and stream management.""" +import os +import argparse from datetime import datetime, timedelta +from dotenv import load_dotenv from utils.input import ( stream_selection, get_date_input, get_epg_data_at, - select_program_from_epg + select_program_from_epg, + get_selection, ) +from utils.oqee import OqeeClient +from utils.downloader import get_keys +from utils.utilities import verify_cmd, get_missing_keys, merge_segments, decrypt +from utils.times import ( + convert_date_to_sec, + convert_sec_to_ticks, + convert_ticks_to_sec, + convert_sec_to_date, + find_nearest_tick_by_hour, + bruteforce, +) +from utils.stream import save_segments, get_kid, get_init +import asyncio +import subprocess +import shutil + +load_dotenv() +TIMESCALE = 90000 +DURATION = 288000 + +from pprint import pprint + + +def parse_arguments(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser(description="Oqee TV Live Downloader") + + parser.add_argument( + "--start-date", + type=str, + help="Start date and time in YYYY-MM-DD HH:MM:SS format", + ) + parser.add_argument( + "--end-date", type=str, help="End date and time in YYYY-MM-DD HH:MM:SS format" + ) + parser.add_argument( + "--duration", + type=str, + help="Duration in HH:MM:SS format (alternative to --end-date)", + ) + parser.add_argument("--channel-id", type=str, help="Channel ID to download from") + parser.add_argument( + "--video", + type=str, + help="Video quality selection (e.g., 'best', '1080p', '720p', '1080p+best', '720p+worst')", + ) + parser.add_argument( + "--audio", type=str, help="Audio track selection (e.g., 'best', 'fra_main')" + ) + parser.add_argument( + "--title", + type=str, + help="Title for the download (default: channel_id_start_date)", + ) + parser.add_argument("--username", type=str, help="Oqee username for authentication") + parser.add_argument("--password", type=str, help="Oqee password for authentication") + parser.add_argument( + "--key", + action="append", + help="DRM key for decryption (can be specified multiple times)", + ) + parser.add_argument( + "--output-dir", + type=str, + default="./download", + help="Output directory for downloaded files (default: ./download)", + ) + parser.add_argument( + "--widevine-device", + type=str, + default="./widevine/device.wvd", + help="Path to Widevine device file (default: ./widevine/device.wvd)", + ) + + return parser.parse_args() + if __name__ == "__main__": + args = parse_arguments() + + # Check if CLI mode + cli_mode = any( + [ + args.start_date, + args.end_date, + args.duration, + args.channel_id, + args.video, + args.audio, + args.title, + args.username, + args.password, + args.key, + ] + ) + try: - selections = stream_selection() - freebox_id = selections.get("channel", {}).get("freebox_id") - channel_id = selections.get("channel", {}).get("id") + if cli_mode: + # CLI mode + print("Running in CLI mode...") - start_date, end_date = get_date_input() + # Parse dates + start_date = None + end_date = None - if start_date > datetime.now() - timedelta(days=7): - epg_data = get_epg_data_at(start_date) + if args.start_date: + try: + start_date = datetime.strptime(args.start_date, "%Y-%m-%d %H:%M:%S") + except ValueError: + print("Invalid start-date format. Use YYYY-MM-DD HH:MM:SS") + exit(1) - programs = epg_data["entries"][str(channel_id)] - program_selection = select_program_from_epg( - programs, - start_date, - end_date + if args.end_date and args.duration: + print("Cannot specify both --end-date and --duration") + exit(1) + elif args.end_date: + try: + end_date = datetime.strptime(args.end_date, "%Y-%m-%d %H:%M:%S") + except ValueError: + print("Invalid end-date format. Use YYYY-MM-DD HH:MM:SS") + exit(1) + elif args.duration and start_date: + # Parse duration HH:MM:SS + try: + h, m, s = map(int, args.duration.split(":")) + duration_td = timedelta(hours=h, minutes=m, seconds=s) + end_date = start_date + duration_td + except ValueError: + print("Invalid duration format. Use HH:MM:SS") + exit(1) + + if not start_date: + print("start-date is required in CLI mode") + exit(1) + if not end_date: + print("Either end-date or duration is required in CLI mode") + exit(1) + + keys = args.key or [] + # end = ".".join([args.video, args.audio]) if args.video and args.audio else "" + end = "" + title = ( + args.title + end + or f"{args.channel_id}_{start_date.strftime('%Y%m%d_%H%M%S') + end}" ) - if program_selection: - start_date = program_selection['start_date'] - end_date = program_selection['end_date'] - title = program_selection['title'] + + # Get stream selections + selections = get_selection(args.channel_id, args.video, args.audio) + if not selections: + print("Erreur lors de la sélection des flux.") + exit(1) + + print(f"Start date: {start_date}") + print(f"End date: {end_date}") + print(f"Channel ID: {args.channel_id}") + print(f"Video quality: {args.video}") + print(f"Audio track: {args.audio}") + print(f"Title: {title}") + print(f"DRM keys: {keys}") + print(f"Output dir: {args.output_dir}") + print(f"Widevine device: {args.widevine_device}") + + else: + # Interactive mode + selections = stream_selection() + freebox_id = selections.get("channel", {}).get("freebox_id") + channel_id = selections.get("channel", {}).get("id") + title = None + start_date, end_date = get_date_input() + + if start_date > datetime.now() - timedelta(days=7): + epg_data = get_epg_data_at(start_date) + + programs = epg_data["entries"][str(channel_id)] + program_selection = select_program_from_epg( + programs, start_date, end_date + ) + if program_selection: + start_date = program_selection["start_date"] + end_date = program_selection["end_date"] + title = program_selection["title"] + + title = title or f"{freebox_id}_{start_date.strftime('%Y%m%d_%H%M%S')}" + keys = [] + + output_dir = args.output_dir if cli_mode else "./download" + # pprint(selections) + # missing_keys = get_missing_keys(keys, selections) + + # if missing_keys: + # print(f"Getting missing DRM keys {missing_keys}...") + + # method = {} + # # api_url = os.getenv("API_URL") + # # api_key = os.getenv("API_KEY") + # api_url = None + # api_key = None + # if api_url and api_key: + # method = {"method": "api", "api_url": api_url, "api_key": api_key} + # else: + # username = args.username or os.getenv("OQEE_USERNAME") + # password = args.password or os.getenv("OQEE_PASSWORD") + # client = OqeeClient(username, password) + # verify_cmd(args.widevine_device) + # method = { + # "method": "device", + # "device_file": args.widevine_device, + # "client_class": client, + # } + + # fetched_keys = get_keys(kids=missing_keys, method=method) + # keys = keys + fetched_keys + # print(f"DRM Keys: {keys}") + + # missing_keys = get_missing_keys(keys, selections) + # if len(missing_keys) > 0: + # print(f"Some DRM keys are still missing: {missing_keys}, exiting.") + # exit(1) + + start_tick_user = int(convert_sec_to_ticks(convert_date_to_sec(start_date), TIMESCALE)) + + video_data = None + audio_data = None + + for content_type, sel in [("video", selections["video"]),("audio", selections["audio"]),]: + start_tick_manifest = sel["segments"]["timeline"][0]["t"] + manifest_date = convert_sec_to_date(convert_ticks_to_sec(start_tick_manifest, TIMESCALE)) + init_segment = sel["segments"]["initialization"] + track_id = init_segment.split("/")[-1].split("_init")[0] + + if start_date.date() == manifest_date.date(): + print("Date match between requested start date and manifest data, proceeding with download...") + + start_tick, start_rep = find_nearest_tick_by_hour(start_tick_manifest, start_date, TIMESCALE, DURATION) + end_tick, end_rep = find_nearest_tick_by_hour(start_tick_manifest, end_date, TIMESCALE, DURATION) + else: + print("Date mismatch between requested start date and manifest data, bruteforce method is needed.") + + valid_ticks = asyncio.run(bruteforce(track_id, start_tick_user)) + valid_tick = valid_ticks[0] + + start_tick, start_rep = find_nearest_tick_by_hour(valid_tick, start_date, TIMESCALE, DURATION) + end_tick, end_rep = find_nearest_tick_by_hour(valid_tick, end_date, TIMESCALE, DURATION) + + rep_nb = (end_tick - start_tick) // DURATION + 1 + print(f"Total segments to fetch for {content_type}: {rep_nb}") + data = { + "start_tick": start_tick, + "rep_nb": rep_nb, + "track_id": track_id, + "selection": sel, + } + if content_type == "video": + video_data = data + else: + audio_data = data + + missing_keys = [] + for content_type, data in [("video", video_data), ("audio", audio_data)]: + os.makedirs(output_dir, exist_ok=True) + track_id = data["track_id"] + start_tick = data["start_tick"] + rep_nb = data["rep_nb"] + asyncio.run(save_segments(output_dir, track_id, start_tick, rep_nb, DURATION)) + + # Merge video and audio + video_file = f"{output_dir}/temp_video.mp4" + audio_file = f"{output_dir}/temp_audio.mp4" + + data["file"] = video_file if content_type == "video" else audio_file + merge_segments(output_dir, track_id, video_file if content_type == "video" else audio_file) + + kid = get_kid(output_dir, track_id) + data["kid"] = kid + key = None + for k in keys: + if k.split(":")[0] == kid: + key = k + break + if not key: + print(f"No key found for KID {kid}, need to fetch it.") + missing_keys.append(kid) + + if len(missing_keys) > 0: + method = {} + # api_url = os.getenv("API_URL") + # api_key = os.getenv("API_KEY") + api_url = None + api_key = None + if api_url and api_key: + method = {"method": "api", "api_url": api_url, "api_key": api_key} + else: + username = args.username or os.getenv("OQEE_USERNAME") + password = args.password or os.getenv("OQEE_PASSWORD") + client = OqeeClient(username, password) + verify_cmd(args.widevine_device) + method = { + "method": "device", + "device_file": args.widevine_device, + "client_class": client, + } + + fetched_keys = get_keys(kids=missing_keys, method=method) + keys = keys + fetched_keys + + for content_type, data in [("video", video_data), ("audio", audio_data)]: + track_id = data["track_id"] + file = data["file"] + kid = data["kid"] + + key = None + for k in keys: + if k.split(":")[0] == kid: + key = k + break + + init_path = get_init(output_dir, track_id) + dec_file = f"{output_dir}/dec_{content_type}.mp4" + decrypt(file, init_path, dec_file, key) + + track_id_video = video_data["track_id"] + track_id_audio = audio_data["track_id"] + start_tick_video = video_data["start_tick"] + start_tick_audio = audio_data["start_tick"] + diff_start = start_tick_video - start_tick_audio + diff_start_sec = convert_ticks_to_sec(diff_start, TIMESCALE) + + # ffmpeg -i "concat:init.mp4|merged_dec.m4s" -c copy output.mp4 + command_ffmpeg = ( + f'ffmpeg -i "concat:{output_dir}/segments_{track_id_video}/init.mp4|' + f'{output_dir}/dec_video.mp4" -c copy {output_dir}/video.mp4' + ) + print("FFmpeg command:", command_ffmpeg) + subprocess.run(command_ffmpeg, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + command_ffmpeg = ( + f'ffmpeg -i "concat:{output_dir}/segments_{track_id_audio}/init.mp4|' + f'{output_dir}/dec_audio.mp4" -c copy {output_dir}/audio.mp4' + ) + + print("FFmpeg command:", command_ffmpeg) + subprocess.run(command_ffmpeg, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + + command_merge = ( + f"ffmpeg -i {output_dir}/video.mp4 -itsoffset {diff_start_sec} " + f"-i {output_dir}/audio.mp4 -c copy -map 0:v -map 1:a {output_dir}/output.mp4" + ) + print("Merge command:", command_merge) + subprocess.run(command_merge, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + + + final_output = f"{output_dir}/{title}.mp4" + shutil.move(f"{output_dir}/output.mp4", final_output) + print(f"Final output saved to {final_output}") + + os.remove(f"{output_dir}/dec_video.mp4") + os.remove(f"{output_dir}/dec_audio.mp4") + os.remove(f"{output_dir}/video.mp4") + os.remove(f"{output_dir}/audio.mp4") + os.remove(f"{output_dir}/temp_video.mp4") + os.remove(f"{output_dir}/temp_audio.mp4") + shutil.rmtree(f"{output_dir}/segments_{video_data['track_id']}") + shutil.rmtree(f"{output_dir}/segments_{audio_data['track_id']}") + except KeyboardInterrupt: print("\n\nProgramme interrompu par l'utilisateur. Au revoir !") + + +# uv run python main.py --start-date "2025-01-01 12:00:00" --duration "01:00:00" --channel-id 536 --video "720+best" --audio best --title "Test" --key 5b1288b31b6a3f789a205614bbd7fac7:14980f2578eca20d78bd70601af21458 --key acacd48e12efbdbaa479b6d6dbf110b4:500af89b21d64c4833e107f26c424afb +# uv run python main.py --start-date "2025-12-19 12:00:00" --duration "00:01:00" --channel-id 536 --video "720+best" --audio best --title "Test" --key 5b1288b31b6a3f789a205614bbd7fac7:14980f2578eca20d78bd70601af21458 --key acacd48e12efbdbaa479b6d6dbf110b4:500af89b21d64c4833e107f26c424afb diff --git a/utils/downloader.py b/utils/downloader.py new file mode 100644 index 0000000..50a2319 --- /dev/null +++ b/utils/downloader.py @@ -0,0 +1,72 @@ +import os +import base64 +import requests +from uuid import UUID +from pywidevine.cdm import Cdm +from pywidevine.device import Device +from pywidevine.pssh import PSSH + + +def fetch_drm_keys(kid: str, api_url: str, api_key: str) -> str: + """Fetch DRM keys for a given KID. + + Args: kid: The key identifier string. + Returns: The DRM key as a string. + """ + headers = { + 'Content-Type': 'application/json', + 'Api-Key': api_key, + } + data = {"service": "oqee", "kid": kid} + response = requests.post( + api_url, headers=headers, json=data, timeout=10 + ) + return response.json()["key"] + + +def generate_pssh(kids: list[str]) -> PSSH: + """Generate a PSSH box for given KIDs. + + Args: kids: List of key identifier strings. + Returns: The PSSH object. + """ + default_pssh = ( + "AAAAiHBzc2gAAAAA7e+LqXnWSs6jyCfc1R0h7QAAAGgIARIQrKzUjhLvvbqkebbW2/EQtBIQ" + "WxKIsxtqP3iaIFYUu9f6xxIQXn4atxoopds39jbUXbiFVBIQUUJpv9uuzWKv4ccKTtooMRIQ" + "ocf9FUFCoGm775zPIBr3HRoAKgAyADgASABQAA==" + ) + pssh = PSSH(default_pssh) + pssh.set_key_ids([UUID(kid.replace("-", "").lower()) for kid in kids]) + return pssh + + +def get_keys(kids: list[str], method: dict) -> list[str]: + """Retrieve DRM keys using the specified method.""" + if method["method"] == 'api': + print("Fetching DRM keys via API...") + keys = [] + for kid in kids: + key = fetch_drm_keys(kid, method["api_url"], method["api_key"]) + keys.append(f"{kid}:{key}") + return keys + else: + print("Fetching DRM keys via Widevine CDM...") + client = method["client_class"] + + device = Device.load(method["device_file"]) + cdm = Cdm.from_device(device) + session_id = cdm.open() + certificate = client.certificate() + cdm.set_service_certificate(session_id, certificate) + + pssh_data = generate_pssh(kids) + challenge = cdm.get_license_challenge(session_id, pssh_data, privacy_mode=True) + license_data = client.license(challenge) + + cdm.parse_license(session_id, license_data) + keys = [] + for key in cdm.get_keys(session_id): + if key.type=='CONTENT': + keys.append(f"{key.kid.hex}:{key.key.hex()}") + cdm.close(session_id) + return keys \ No newline at end of file diff --git a/utils/input.py b/utils/input.py index beedbd0..46f4ec7 100644 --- a/utils/input.py +++ b/utils/input.py @@ -333,6 +333,109 @@ def stream_selection(): return None +def get_selection(channel_id, video_quality='best', audio_quality='best'): + """Get stream selection for a given channel ID with specified qualities. + + Args: + channel_id (str): The channel ID to select streams for. + video_quality (str): Video quality selection ('best', '1080+best', '720+worst', etc.). + audio_quality (str): Audio quality selection ('best', 'fra+best', etc.). + + Returns: + dict: Dictionary of selected streams by content type, or None if error. + """ + # Fetch channel details + api_url = SERVICE_PLAN_API_URL + try: + response = requests.get(api_url, timeout=10) + response.raise_for_status() + data = response.json() + if not data.get("success") or "channels" not in data.get("result", {}): + print("Erreur: Impossible de récupérer les détails de la chaîne.") + return None + + channels_data = data["result"]["channels"] + selected_channel_details = channels_data.get(str(channel_id)) + if not selected_channel_details: + print(f"Chaîne avec ID {channel_id} non trouvée.") + return None + + except requests.exceptions.RequestException as e: + print(f"Erreur réseau : {e}") + return None + except ValueError: + print("Erreur lors de l'analyse de la réponse JSON.") + return None + + print(f"Chaîne sélectionnée : {selected_channel_details.get('name')} (ID: {channel_id})") + + dash_id = selected_channel_details.get('streams', {}).get('dash') + if not dash_id: + print("Aucun flux DASH trouvé pour cette chaîne.") + return None + + mpd_content = get_manifest(dash_id) + manifest_info = parse_mpd_manifest(mpd_content) + organized_info = organize_by_content_type(manifest_info) + + final_selections = {} + final_selections['channel'] = selected_channel_details + + # Select video + if 'video' in organized_info: + selected_track = select_track(organized_info['video'], video_quality, 'video') + if selected_track: + final_selections['video'] = selected_track + + # Select audio + if 'audio' in organized_info: + selected_track = select_track(organized_info['audio'], audio_quality, 'audio') + if selected_track: + final_selections['audio'] = selected_track + + return final_selections + + +def select_track(content_dict, quality_spec, content_type): + """Select a track based on quality specification. + + Args: + content_dict (dict): Organized content dict (video or audio). + quality_spec (str): Quality spec like 'best', '1080+best', 'fra+worst'. + content_type (str): 'video' or 'audio'. + + Returns: + dict: Selected track or None. + """ + if '+' in quality_spec: + filter_part, pref = quality_spec.split('+', 1) + pref = pref.lower() + else: + filter_part = '' + pref = quality_spec.lower() + + candidates = [] + for key, tracks in content_dict.items(): + if filter_part and filter_part.lower() not in key.lower(): + continue + candidates.extend(tracks) + + if not candidates: + print(f"Aucune piste {content_type} trouvée pour '{quality_spec}'.") + return None + + if pref == 'best': + selected = max(candidates, key=lambda x: x['bandwidth']) + elif pref == 'worst': + selected = min(candidates, key=lambda x: x['bandwidth']) + else: + # Default to best if unknown pref + selected = max(candidates, key=lambda x: x['bandwidth']) + + print(f"{content_type.capitalize()} sélectionnée : {selected['track_id']}, {selected['bitrate_kbps']} kbps") + return selected + + def get_epg_data_at(dt: datetime.datetime): """ Fetch EPG data from the Oqee API for the nearest aligned hour of a given datetime. diff --git a/utils/oqee.py b/utils/oqee.py index 865420b..e233c96 100644 --- a/utils/oqee.py +++ b/utils/oqee.py @@ -1,29 +1,12 @@ """OQEE streaming service client for authentication and content access.""" import base64 -import logging -import os from urllib.parse import urlparse, parse_qs +import requests from dotenv import load_dotenv load_dotenv() - -class _LoggerProxy: - """Lightweight logger helper that returns exceptions for raise statements.""" - - def __init__(self, name: str): - self._logger = logging.getLogger(name) - - def info(self, message: str): - """Log an info message.""" - self._logger.info(message) - - def error(self, message: str) -> RuntimeError: - """Log an error message and return a RuntimeError.""" - self._logger.error(message) - return RuntimeError(message) - class OqeeClient: # pylint: disable=too-many-instance-attributes """ Service code for OQEE streaming service (https://oqee.com). @@ -32,12 +15,9 @@ class OqeeClient: # pylint: disable=too-many-instance-attributes Security: 1080p@L1 720@L3 """ - def __init__(self, ctx, movie, title): - super().__init__(ctx) - self.session = None # Will be set by parent class - self.log = _LoggerProxy(self.__class__.__name__) - self.movie = movie - self.title, self.typecontent = self.parse_title(title) + def __init__(self, username: str, password: str): + super().__init__() + self.session = requests.Session() # Base headers template for API requests self._headers_template = { @@ -67,148 +47,9 @@ class OqeeClient: # pylint: disable=too-many-instance-attributes self.access_token = None self.right_token = None self.profil_id = None - self.lic_url = None + self.lic_url = "https://license.oqee.net/api/v1/live/license/widevine" - self.configure() - - - def parse_title(self, title): - """ - Parse and categorize different types of OQEE TV URLs. - Args: - title (str): The URL or title string to parse. Can be a full OQEE TV URL or a partial path. - Returns: - tuple or str: If the URL matches a known pattern, returns a tuple of (content_id, content_type). - If no pattern matches, returns the original title string. - """ - if title is None: - raise self.log.error("No title provided.") - - title = title.replace("https://oqee.tv", "").replace("/play", "") - if title.startswith("/replay_collection/"): - return ( - title.replace("/replay_collection/", "").replace("/all", ""), - "replay_collection" - ) - if title.startswith("/vod/contents/"): - return title.replace("/vod/contents/", ""), "vod" - if title.startswith("/svod/portal/"): - return title.replace("/svod/portal/", "").split("/")[1], "vod" - if title.startswith("/replay/"): - return title.replace("/replay/", ""), "replay" - return title - - - def _extract_title_id(self, title): - """Return a usable identifier regardless of input structure.""" - if title is None: - raise self.log.error("Title identifier is required") - if isinstance(title, dict): - return title.get('id') or title.get('program_id') or title.get('content_id') - return getattr(title, 'id', title) - - - def get_vod(self, title): - """Fetch VOD playback information and return the raw API response.""" - title_id = self._extract_title_id(title) - data = { - "supported_stream_types": ["dash"], - "supported_drms": ["widevine"], - "supported_ciphers": ["cbcs", "cenc"], - "supported_ads": ["vast", "vmap"], - } - response = self.session.post( - f'https://api.oqee.net/api/v1/svod/offers/{title_id}/playback_infos', - headers=self.headers_auth, - json=data, - ).json() - self.lic_url = response['result']['license_server'] - return response - - - def get_vod_info(self): - """Return the raw VOD metadata payload for the current title.""" - response = self.session.get( - f'https://api.oqee.net/api/v3/vod/contents/{self.title}', - headers=self.headers_base, - ).json() - if response['success'] is False: - raise self.log.error(f"Failed to get the replay: {response['message']}") - return response - - - def get_replay(self, title): - """Fetch replay playback information and return the raw API response.""" - title_id = self._extract_title_id(title) - payload = { - 'program_id': title_id, - 'supported_stream_types': ['dash'], - 'supported_drms': ['widevine'], - 'supported_ciphers': ['cenc'], - 'supported_subs': ['ttml', 'vtt'], - 'supported_ads': ['vast', 'vmap'], - } - response = self.session.post( - f'https://api.oqee.net/api/v1/replay/programs/{title_id}/playback_infos', - headers=self.headers_auth, - json=payload, - ).json() - if response['success'] is False: - raise self.log.error(f"Failed to get the replay: {response['message']}") - self.lic_url = response['result']['license_server'] - return response - - - def get_replay_info(self): - """ - Retrieve replay information for a given title from the OQEE API. - """ - response = self.session.get( - f'https://api.oqee.net/api/v2/replay/programs/{self.title}', - headers=self.headers_base, - ).json() - if response['success'] is False: - raise self.log.error(f"Failed to get the replay: {response['message']}") - if response['result']['type'] != 'replay': - raise self.log.error(f"Provided ID is not a replay: {response['type']}") - return response - - - def get_replay_collection(self): - """Retrieve replay collection information from Oqee API and return the raw response.""" - response = self.session.get( - f'https://api.oqee.net/api/v2/pages/replay_collection/{self.title}', - headers=self.headers_base, - ).json() - if response['success'] is False: - raise self.log.error(f"Failed to get the replay: {response['message']}") - if response['result']['type'] != 'collection': - raise self.log.error(f"Provided ID is not a collection: {response['type']}") - return response - - - def get_titles(self): - """ - Get title information based on content type. - """ - if self.typecontent == "replay": - return self.get_replay_info() - if self.typecontent == "vod": - return self.get_vod_info() - if self.typecontent == "replay_collection": - return self.get_replay_collection() - return None - - - def get_tracks(self, title): - """ - Get track information based on content type. - """ - if self.typecontent in ("replay", "replay_collection"): - return self.get_replay(title) - if self.typecontent == "vod": - return self.get_vod(title) - return None + self.configure(username, password) def certificate(self, **_): @@ -233,15 +74,16 @@ class OqeeClient: # pylint: disable=too-many-instance-attributes headers=self.headers_auth, json={'licenseRequest': license_request} ) + if not response.json()["success"]: + raise ValueError(f"License request failed: {response.json()['error']['msg']}") return response.json()['result']['license'] - def configure(self): + def configure(self, username, password): """Configure the client by logging in and processing title information.""" - self.log.info("Logging in") - self.login() - self.log.info(f"Processing title ID based on provided path: {self.title}") - self.log.info(f"Obtained the {self.typecontent}: {self.title}") + print("Logging in") + self.login(username, password) + def _build_headers(self, overrides=None, remove=None): """Clone default headers and apply optional overrides/removals.""" @@ -279,7 +121,7 @@ class OqeeClient: # pylint: disable=too-many-instance-attributes 'https://api.oqee.net/api/v2/user/profiles', headers=headers ).json() - self.log.info("Selecting first profile by default.") + print("Selecting first profile by default.") return data['result'][0]['id'] @@ -333,6 +175,8 @@ class OqeeClient: # pylint: disable=too-many-instance-attributes } r = self.session.post('https://subscribe.free.fr/auth/auth.pl', headers=headers, data=data) parsed_url = parse_qs(urlparse(r.url).query) + if 'result' not in parsed_url: + raise ValueError("Login failed: invalid credentials or error in authentication") token = parsed_url['result'][0] headers = self._build_headers( @@ -364,23 +208,24 @@ class OqeeClient: # pylint: disable=too-many-instance-attributes return data['result']['token'] - def login(self): + def login(self, username, password): """ Log in to the Oqee service and set up necessary tokens and headers. """ - username = os.getenv("OQEE_USERNAME") - password = os.getenv("OQEE_PASSWORD") - if not username or not password: - self.log.info("No environment credentials found, using IP login by default.") + print("No credentials provided, using IP login.") self.access_token = self.login_ip() else: - self.log.info("Logging in with credentials sourced from environment variables") - self.access_token = self.login_cred(username, password) + print("Logging in with provided credentials") + try: + self.access_token = self.login_cred(username, password) + except ValueError as e: + print(f"Credential login failed: {e}. Falling back to IP login.") + self.access_token = self.login_ip() - self.log.info("Fetching rights token") + print("Fetching rights token") self.right_token = self.right() - self.log.info("Fetching profile ID") + print("Fetching profile ID") self.profil_id = self.profil() self.headers = self._build_headers(overrides={ diff --git a/utils/stream.py b/utils/stream.py index 9f428db..d06cf4a 100644 --- a/utils/stream.py +++ b/utils/stream.py @@ -4,54 +4,12 @@ import base64 import os import asyncio import time +import subprocess from typing import Dict, Any -from uuid import UUID import requests -from dotenv import load_dotenv import aiohttp -from pywidevine.pssh import PSSH - -load_dotenv() - - -def fetch_drm_keys(kid: str) -> str: - """Fetch DRM keys for a given KID. - - Args: - kid: The key identifier string. - - Returns: - The DRM key as a string. - """ - headers = { - 'Content-Type': 'application/json', - 'Api-Key': os.getenv("API_KEY"), - } - data = {"service": "oqee", "kid": kid} - response = requests.post( - os.getenv("API_URL"), headers=headers, json=data, timeout=10 - ) - return response.json()["key"] - - -def generate_pssh(kid: str) -> str: - """Generate a PSSH box for a given KID. - - Args: - kid: The key identifier string. - - Returns: - The PSSH box as a base64-encoded string. - """ - default_pssh = ( - "AAAAiHBzc2gAAAAA7e+LqXnWSs6jyCfc1R0h7QAAAGgIARIQrKzUjhLvvbqkebbW2/EQtBIQ" - "WxKIsxtqP3iaIFYUu9f6xxIQXn4atxoopds39jbUXbiFVBIQUUJpv9uuzWKv4ccKTtooMRIQ" - "ocf9FUFCoGm775zPIBr3HRoAKgAyADgASABQAA==" - ) - pssh = PSSH(default_pssh) - pssh.set_key_ids([UUID(kid.replace("-", "").lower())]) - return pssh.dumps() +from tqdm.asyncio import tqdm def parse_mpd_manifest(mpd_content: str) -> Dict[str, Any]: @@ -206,9 +164,9 @@ def parse_representation( if segment_timeline is not None: for s_element in segment_timeline.findall('mpd:S', namespaces): timeline_info = { - 't': s_element.get('t'), # start time - 'd': s_element.get('d'), # duration - 'r': s_element.get('r') # repeat count + 't': int(s_element.get('t')) if s_element.get('t') is not None else 0, # start time + 'd': int(s_element.get('d')) if s_element.get('d') is not None else 0, # duration + 'r': int(s_element.get('r')) if s_element.get('r') is not None else 0 # repeat count } rep_info['segments']['timeline'].append(timeline_info) @@ -228,12 +186,12 @@ def organize_by_content_type(manifest_info: Dict[str, Any]) -> Dict[str, Any]: organized = { 'video': {}, 'audio': {}, - 'text': {}, - 'manifest_metadata': { - 'type': manifest_info.get('type'), - 'publishTime': manifest_info.get('publishTime'), - 'minBufferTime': manifest_info.get('minBufferTime'), - } + # 'text': {}, + # 'manifest_metadata': { + # 'type': manifest_info.get('type'), + # 'publishTime': manifest_info.get('publishTime'), + # 'minBufferTime': manifest_info.get('minBufferTime'), + # } } for period in manifest_info.get('periods', []): @@ -288,19 +246,19 @@ def organize_by_content_type(manifest_info: Dict[str, Any]) -> Dict[str, Any]: organized['audio'][lang_key] = [] organized['audio'][lang_key].append(track_info) - elif content_type == 'text': - lang = adaptation_set.get('lang', 'unknown') - role = adaptation_set.get('role', 'caption') + # elif content_type == 'text': + # lang = adaptation_set.get('lang', 'unknown') + # role = adaptation_set.get('role', 'caption') - track_info.update({ - 'language': lang, - 'role': role, - }) + # track_info.update({ + # 'language': lang, + # 'role': role, + # }) - lang_key = f"{lang}_{role}" - if lang_key not in organized['text']: - organized['text'][lang_key] = [] - organized['text'][lang_key].append(track_info) + # lang_key = f"{lang}_{role}" + # if lang_key not in organized['text']: + # organized['text'][lang_key] = [] + # organized['text'][lang_key].append(track_info) # Sort video tracks by resolution (descending) and then by bitrate (descending) for resolution in organized['video']: @@ -388,10 +346,11 @@ async def fetch_segment(session, ticks, track_id): except aiohttp.ClientError: return None -def get_init(track_id): +def get_init(output_folder, track_id): """Download and save the initialization segment for a track. Args: + output_folder: The output folder path. track_id: The track identifier. """ url = f"https://media.stream.proxad.net/media/{track_id}_init" @@ -402,12 +361,15 @@ def get_init(track_id): } response = requests.get(url, headers=headers, timeout=10) if response.status_code == 200: - with open(f'segments/segments_{track_id}/init.mp4', 'wb') as f: + os.makedirs(f'{output_folder}/segments_{track_id}', exist_ok=True) + init_path = f'{output_folder}/segments_{track_id}/init.mp4' + with open(init_path, 'wb') as f: f.write(response.content) - print(f"✅ Saved initialization segment to init_{track_id}.mp4") + print(f"✅ Saved initialization segment to {init_path}") + return init_path -async def save_segments(track_id, start_tick, rep_nb, duration): +async def save_segments(output_folder, track_id, start_tick, rep_nb, duration): """Download and save multiple media segments. Args: @@ -416,7 +378,7 @@ async def save_segments(track_id, start_tick, rep_nb, duration): rep_nb: The number of segments to download. duration: The duration per segment. """ - os.makedirs(f'segments/segments_{track_id}', exist_ok=True) + os.makedirs(f'{output_folder}/segments_{track_id}', exist_ok=True) async def download_segment(session, tick, rep): """Download a single segment.""" @@ -430,12 +392,9 @@ async def save_segments(track_id, start_tick, rep_nb, duration): async with session.get(url, headers=headers) as resp: if resp.status == 200: content = await resp.read() - filename = f"segments/segments_{track_id}/{tick}.m4s" + filename = f"{output_folder}/segments_{track_id}/{tick}.m4s" with open(filename, 'wb') as f: f.write(content) - print( - f"✅ Saved segment {rep} (tick {tick}) to {filename}" - ) return True print( f"❌ Failed to download segment {rep} (tick {tick}): " @@ -446,10 +405,10 @@ async def save_segments(track_id, start_tick, rep_nb, duration): print(f"⚠️ Error downloading segment {rep} (tick {tick}): {e}") return False - print(f"\n🎬 Starting download of {rep_nb} segments...") + print(f"Starting download of {rep_nb} segments...") print(f"📦 Track ID: {track_id}") print(f"🎯 Base tick: {start_tick}") - print(f"{'='*50}\n") + print(f"{'='*50}") start_time = time.time() successful = 0 @@ -461,31 +420,33 @@ async def save_segments(track_id, start_tick, rep_nb, duration): tick = start_tick + i * duration tasks.append(download_segment(session, tick, i)) - results = await asyncio.gather(*tasks, return_exceptions=True) + results = [] + for coro in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Downloading segments", unit="seg"): + result = await coro + results.append(result) successful = sum(1 for r in results if r is True) failed = rep_nb - successful end_time = time.time() elapsed = end_time - start_time - print(f"\n{'='*50}") + print(f"{'='*50}") print(f"✅ Download completed in {elapsed:.2f}s") - print(f"📊 Successful: {successful}/{rep_nb}") - print(f"❌ Failed: {failed}/{rep_nb}") - print(f"💾 Files saved to segments_{track_id}/") + print(f"💾 Files saved to {output_folder}/segments_{track_id}/") print(f"{'='*50}") -def get_kid(track_id): +def get_kid(output_folder, track_id): """Extract the Key ID (KID) from downloaded segments. Args: + output_folder: The output folder path. track_id: The track identifier. Returns: The KID as a hex string if found, None otherwise. """ - folder = f'segments/segments_{track_id}' + folder = f'{output_folder}/segments_{track_id}' for filename in os.listdir(folder): if filename.endswith('.m4s'): filepath = os.path.join(folder, filename) @@ -501,4 +462,4 @@ def get_kid(track_id): kid_bytes = data[index + 16:index + 16 + 16] kid = kid_bytes.hex() return kid - return None + return None \ No newline at end of file diff --git a/utils/times.py b/utils/times.py index 06903bb..1a1918e 100644 --- a/utils/times.py +++ b/utils/times.py @@ -54,10 +54,9 @@ async def bruteforce(track_id, date): batch_size = 20000 checked_count = 0 - print(f"\n🚀 Starting bruteforce...") - print(f"📦 Track ID: {track_id}") - print(f"🎯 Total ticks to check: {total_requests}") - print(f"{'='*50}\n") + print(f"Starting bruteforce for {track_id}") + # print(f"🎯 Total ticks to check: {total_requests}") + print(f"{'='*50}") start_time = time.time() @@ -69,7 +68,7 @@ async def bruteforce(track_id, date): batch_end = min(batch_start + batch_size, total_requests) ticks_to_check = list(range(batch_start, batch_end)) - print(f"\n📦 Batch {batch_num}/{total_batches} (ticks {batch_start} to {batch_end})") + # print(f"\n📦 Batch {batch_num}/{total_batches} (ticks {batch_start} to {batch_end})") tasks = [fetch_segment(session, t + date, track_id) for t in ticks_to_check] @@ -86,7 +85,7 @@ async def bruteforce(track_id, date): # Stop if we found valid ticks if valid_ticks: - print(f"\n✅ Found {len(valid_ticks)} valid tick(s)!") + print(f"Found valid ticks: {valid_ticks}, stopping bruteforce.") break except KeyboardInterrupt: @@ -100,17 +99,14 @@ async def bruteforce(track_id, date): print(f"✅ Completed in {elapsed:.2f}s") print(f"⚡ Speed: {req_per_sec:.2f} req/s") print(f"📊 Total checked: {checked_count}/{total_requests}") - if valid_ticks: - print(f"📍 Valid ticks: {valid_ticks}") print(f"{'='*50}") return valid_ticks -def find_nearest_tick_by_hour(base_tick, datetime_str, timescale, duration, offset_hours=1): +def find_nearest_tick_by_hour(base_tick, datetime, timescale, duration, offset_hours=1): """Find the nearest tick for a given datetime.""" - dt = datetime.datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S") - target_ticks = convert_date_to_ticks(dt, timescale, offset_hours) + target_ticks = convert_date_to_ticks(datetime, timescale, offset_hours) diff_ticks = base_tick - target_ticks rep_estimate = diff_ticks / duration @@ -128,10 +124,10 @@ def find_nearest_tick_by_hour(base_tick, datetime_str, timescale, duration, offs target_seconds = convert_ticks_to_sec(target_ticks, timescale) delta_seconds = abs(nearest_seconds - target_seconds) - print(f"Requested datetime: {dt} (offset +{offset_hours}h)") - print(f"Nearest rep: {rep}") - print(f"Tick: {nearest_tick}") - print(f"Date: {convert_sec_to_date(nearest_seconds, offset_hours)}") - print(f"Difference: {delta_seconds:.2f} seconds") + # print(f"Requested datetime: {datetime} (offset +{offset_hours}h)") + # print(f"Nearest rep: {rep}") + # print(f"Tick: {nearest_tick}") + # print(f"Date: {convert_sec_to_date(nearest_seconds, offset_hours)}") + # print(f"Difference: {delta_seconds:.2f} seconds") return nearest_tick, rep diff --git a/utils/utilities.py b/utils/utilities.py new file mode 100644 index 0000000..c3630c6 --- /dev/null +++ b/utils/utilities.py @@ -0,0 +1,71 @@ +import os +import sys +import logging +import subprocess + + +def verify_cmd(path: str) -> bool: + """Verify if the file provided at path is valid and exists, otherwise log error and exit.""" + if not os.path.exists(path): + logging.error(f"File does not exist: {path}") + sys.exit(1) + if not os.path.isfile(path): + logging.error(f"Path is not a file: {path}") + sys.exit(1) + return True + + +def get_missing_keys(keys, selection): + """Determine which DRM keys are missing based on user selection.""" + missing_keys = [] + for drm_info in [ + selection["video"]["drm_info"], + selection["audio"]["drm_info"], + ]: + for drm in drm_info: + if "default_KID" in drm: + kid = drm["default_KID"].replace("-", "").lower() + if kid not in [k.split(":")[0] for k in keys]: + missing_keys.append(kid) + return missing_keys + + +def merge_segments(input_folder: str, track_id: str, output_file: str): + """Merge downloaded segments into a single file using ffmpeg.""" + segment_folder = os.path.join(input_folder, f"segments_{track_id}") + + segment_files = sorted( + [f for f in os.listdir(segment_folder) if f.endswith(".m4s")], + key=lambda x: int(x.split(".")[0]), + ) + with open(output_file, "wb") as outfile: + for fname in segment_files: + with open(f"{segment_folder}/{fname}", "rb") as infile: + outfile.write(infile.read()) + print(f"✅ Merged segments into {output_file}") + + +def decrypt(input_file, init_path, output_file, key): + """Decrypt a media file using mp4ff-decrypt. + + Args: + input_file: Path to the input encrypted file. + init_path: Path to the initialization file. + output_file: Path to the output decrypted file. + key: The decryption key in KID:KEY format. + + Returns: + True if decryption succeeded, False otherwise. + """ + key = key.split(":")[1] + result = subprocess.run( + ['mp4ff-decrypt', '-init', init_path, '-key', key, input_file, output_file], + capture_output=True, + text=True + ) + if result.returncode == 0: + print(f"✅ Decrypted {input_file} to {output_file}") + return True + else: + print(f"❌ Decryption failed: {result.stderr}") + return False \ No newline at end of file