"""Input utilities for user prompts and channel/stream selection.""" import datetime import requests from prompt_toolkit.validation import Validator, ValidationError from InquirerPy import prompt from InquirerPy.validator import EmptyInputValidator from InquirerPy.base.control import Choice from utils.stream import ( get_manifest, parse_mpd_manifest, organize_by_content_type ) SERVICE_PLAN_API_URL = "https://api.oqee.net/api/v6/service_plan" EPG_API_URL = "https://api.oqee.net/api/v1/epg/all/{unix}" class DatetimeValidator(Validator): """ Validateur personnalisé pour les chaînes datetime au format "YYYY-MM-DD HH:MM:SS". """ def validate(self, document): try: datetime.datetime.strptime(document.text, "%Y-%m-%d %H:%M:%S") except ValueError as exc: raise ValidationError( message="Veuillez entrer une date/heure valide au format YYYY-MM-DD HH:MM:SS", cursor_position=len(document.text), ) from exc class DurationValidator(Validator): """ Validateur personnalisé pour les chaînes de durée au format "HH:MM:SS". """ def validate(self, document): parts = document.text.split(':') if len(parts) != 3: raise ValidationError( message="Veuillez entrer la durée au format HH:MM:SS", cursor_position=len(document.text), ) try: _, m, s = [int(part) for part in parts] if not (0 <= m < 60 and 0 <= s < 60): raise ValueError("Les minutes et les secondes doivent être entre 0 et 59.") except ValueError as exc: raise ValidationError( message="Format invalide. Utilisez HH:MM:SS avec des nombres valides.", cursor_position=len(document.text), ) from exc def get_date_input(): """Prompt user for start and end date/time or duration. Returns: tuple: A tuple containing (start_date, end_date) as datetime objects. """ question_start_date = [ { "type": "input", "message": "Entrez une date/heure de début (YYYY-MM-DD HH:MM:SS):", "name": "datetime", "default": "2025-01-01 12:00:00", "validate": DatetimeValidator(), "invalid_message": "Format de date/heure invalide. Utilisez YYYY-MM-DD HH:MM:SS", } ] start_date_result = prompt(question_start_date) if start_date_result: start_date = datetime.datetime.strptime(start_date_result["datetime"], "%Y-%m-%d %H:%M:%S") print(f"Date/heure de début : {start_date}") question_end_date = [ { "type": "list", "message": "Que voulez-vous entrer ?", "choices": ["Durée", "Date/heure de fin"], "name": "input_type", }, { "type": "input", "message": "Entrez la durée (HH:MM:SS):", "name": "duration", "default": "01:00:00", "validate": DurationValidator(), "when": lambda answers: answers["input_type"] == "Durée", }, { "type": "input", "message": "Entrez une date/heure de fin (YYYY-MM-DD HH:MM:SS):", "name": "datetime", "default": ( start_date_result["datetime"] if start_date_result else "2025-01-01 12:00:00" ), "validate": DatetimeValidator(), "when": lambda answers: answers["input_type"] == "Date/heure de fin", }, ] end_date_result = prompt(question_end_date) if end_date_result: if end_date_result.get("duration"): duration_str = end_date_result["duration"] try: h, m, s = map(int, duration_str.split(':')) duration_td = datetime.timedelta(hours=h, minutes=m, seconds=s) end_date = start_date + duration_td print(f"\nDate/heure de fin : {end_date}") except (ValueError, TypeError): print("Impossible d'analyser la chaîne de durée fournie.") elif end_date_result.get("datetime"): try: end_date = datetime.datetime.strptime( end_date_result["datetime"], "%Y-%m-%d %H:%M:%S" ) print(f"\nDate/heure de fin : {end_date}") except (ValueError, TypeError): print("Impossible d'analyser la chaîne de date/heure fournie.") return start_date, end_date def select_oqee_channel(): """Select an Oqee channel from the API. Returns: dict: Selected channel details or None if cancelled/error. """ api_url = SERVICE_PLAN_API_URL try: print("Chargement de la liste des chaînes depuis l'API Oqee...") response = requests.get(api_url, timeout=10) response.raise_for_status() data = response.json() if not data.get("success") or "channels" not in data.get("result", {}): print("Erreur: Le format de la réponse de l'API est inattendu.") return None channels_data = data["result"]["channels"] choices = [ { "name": f"{channel_info.get('name', 'Nom inconnu')}", "value": channel_id } for channel_id, channel_info in channels_data.items() ] choices.sort(key=lambda x: x['name']) except requests.exceptions.RequestException as e: print(f"Une erreur réseau est survenue : {e}") return None except ValueError: print("Erreur lors de l'analyse de la réponse JSON.") return None questions = [ { "type": "fuzzy", "message": "Veuillez choisir une chaîne (tapez pour filtrer) :", "choices": choices, "multiselect": False, "validate": EmptyInputValidator(), "invalid_message": "Vous devez sélectionner une chaîne.", "long_instruction": "Utilisez les flèches pour naviguer, Entrée pour sélectionner.", } ] try: result = prompt(questions) selected_channel_id = result[0] selected_channel_details = channels_data.get(selected_channel_id) if selected_channel_details: print("\n✅ Vous avez sélectionné :") print(f" - Nom : {selected_channel_details.get('name')}") print(f" - ID : {selected_channel_details.get('id')}") print(f" - ID Freebox : {selected_channel_details.get('freebox_id')}") else: print("Impossible de retrouver les détails de la chaîne sélectionnée.") return selected_channel_details except KeyboardInterrupt: print("\nOpération annulée par l'utilisateur.") return None except (ValueError, KeyError, IndexError) as e: print(f"Une erreur inattenante est survenue : {e}") return None def prompt_for_stream_selection(stream_info, already_selected_types): """Guide l'utilisateur pour sélectionner un flux, en désactivant les types déjà choisis.""" try: content_type_choices = [ Choice(value, name=value, enabled=value not in already_selected_types) for value in stream_info.keys() ] questions = [ { "type": "list", "message": "Quel type de flux souhaitez-vous sélectionner ?", "choices": content_type_choices } ] result = prompt(questions) if not result: return None selected_type = result[0] selected_content_data = stream_info[selected_type] questions = [ { "type": "list", "message": f"Choisissez une qualité pour '{selected_type}':", "choices": list(selected_content_data.keys()) } ] result = prompt(questions) if not result: return None quality_group_key = result[0] available_streams = selected_content_data[quality_group_key] final_selection = None if len(available_streams) == 1: final_selection = available_streams[0] print("Un seul flux disponible pour cette qualité, sélection automatique.") else: stream_choices = [ { "name": ( f"Bitrate: {s.get('bitrate_kbps')} kbps | " f"Codec: {s.get('codec', 'N/A')} | ID: {s.get('track_id')}" ), "value": s } for s in available_streams ] questions = [ { "type": "list", "message": "Plusieurs flux sont disponibles, choisissez-en un :", "choices": stream_choices } ] result = prompt(questions) if not result: return None final_selection = result[0] final_selection['content_type'] = selected_type return final_selection except (KeyboardInterrupt, TypeError): return None def stream_selection(): """Guide user through channel and stream selection process. Returns: dict: Dictionary of selected streams by content type, or None if cancelled. """ selected_channel = select_oqee_channel() if not selected_channel: return None print("\n✅ Chaîne sélectionnée :") print(f" - Nom : {selected_channel.get('name')}") print(f" - ID : {selected_channel.get('id')}") dash_id = selected_channel.get('streams', {}).get('dash') if not dash_id: print("Aucun flux DASH trouvé pour cette chaîne.") return None mpd_content = get_manifest(dash_id) manifest_info = parse_mpd_manifest(mpd_content) organized_info = organize_by_content_type(manifest_info) final_selections = {} while True: selection = prompt_for_stream_selection( organized_info, final_selections.keys() ) if selection: content_type = selection.pop('content_type') final_selections[content_type] = selection print("\n--- Récapitulatif de votre sélection ---") for stream_type, details in final_selections.items(): bitrate = details.get('bitrate_kbps') track_id = details.get('track_id') print( f" - {stream_type.capitalize()}: " f"Bitrate {bitrate} kbps (ID: {track_id})" ) print("----------------------------------------") continue_prompt = [ { "type": "list", "message": "Que souhaitez-vous faire ?", "choices": [ "Sélectionner un autre flux", "Terminer et continuer" ], } ] action_result = prompt(continue_prompt) if ( not action_result or action_result[0] == "Terminer et continuer" ): break if final_selections: final_selections['channel'] = selected_channel return final_selections print("\nAucun flux n'a été sélectionné.") return None def get_selection(channel_id, video_quality='best', audio_quality='best'): """Get stream selection for a given channel ID with specified qualities. Args: channel_id (str): The channel ID to select streams for. video_quality (str): Video quality selection ('best', '1080+best', '720+worst', etc.). audio_quality (str): Audio quality selection ('best', 'fra+best', etc.). Returns: dict: Dictionary of selected streams by content type, or None if error. """ # Fetch channel details api_url = SERVICE_PLAN_API_URL try: response = requests.get(api_url, timeout=10) response.raise_for_status() data = response.json() if not data.get("success") or "channels" not in data.get("result", {}): print("Erreur: Impossible de récupérer les détails de la chaîne.") return None channels_data = data["result"]["channels"] selected_channel_details = channels_data.get(str(channel_id)) if not selected_channel_details: print(f"Chaîne avec ID {channel_id} non trouvée.") return None except requests.exceptions.RequestException as e: print(f"Erreur réseau : {e}") return None except ValueError: print("Erreur lors de l'analyse de la réponse JSON.") return None print(f"Chaîne sélectionnée : {selected_channel_details.get('name')} (ID: {channel_id})") dash_id = selected_channel_details.get('streams', {}).get('dash') if not dash_id: print("Aucun flux DASH trouvé pour cette chaîne.") return None mpd_content = get_manifest(dash_id) manifest_info = parse_mpd_manifest(mpd_content) organized_info = organize_by_content_type(manifest_info) final_selections = {} final_selections['channel'] = selected_channel_details # Select video if 'video' in organized_info: selected_track = select_track(organized_info['video'], video_quality, 'video') if selected_track: final_selections['video'] = selected_track # Select audio if 'audio' in organized_info: selected_track = select_track(organized_info['audio'], audio_quality, 'audio') if selected_track: final_selections['audio'] = selected_track return final_selections def select_track(content_dict, quality_spec, content_type): """Select a track based on quality specification. Args: content_dict (dict): Organized content dict (video or audio). quality_spec (str): Quality spec like 'best', '1080+best', 'fra+worst'. content_type (str): 'video' or 'audio'. Returns: dict: Selected track or None. """ if '+' in quality_spec: filter_part, pref = quality_spec.split('+', 1) pref = pref.lower() else: filter_part = '' pref = quality_spec.lower() candidates = [] for key, tracks in content_dict.items(): if filter_part and filter_part.lower() not in key.lower(): continue candidates.extend(tracks) if not candidates: print(f"Aucune piste {content_type} trouvée pour '{quality_spec}'.") return None if pref == 'best': selected = max(candidates, key=lambda x: x['bandwidth']) elif pref == 'worst': selected = min(candidates, key=lambda x: x['bandwidth']) else: # Default to best if unknown pref selected = max(candidates, key=lambda x: x['bandwidth']) print(f"{content_type.capitalize()} sélectionnée : {selected['track_id']}, {selected['bitrate_kbps']} kbps") return selected def get_epg_data_at(dt: datetime.datetime): """ Fetch EPG data from the Oqee API for the nearest aligned hour of a given datetime. Args: dt (datetime.datetime): datetime (with hour, minute, etc.) Returns: dict | None: EPG data or None on error """ # Round to nearest hour if dt.minute >= 30: dt_aligned = (dt + datetime.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0) else: dt_aligned = dt.replace(minute=0, second=0, microsecond=0) unix_time = int(dt_aligned.timestamp()) print(f"Fetching EPG for aligned time: {dt_aligned} (unix={unix_time})") try: response = requests.get(EPG_API_URL.format(unix=unix_time), timeout=10) response.raise_for_status() data = response.json() return data.get("result") except requests.exceptions.RequestException as e: print(f"Une erreur réseau est survenue : {e}") return None except ValueError: print("Erreur lors de l'analyse de la réponse JSON.") return None def select_program_from_epg(programs, original_start_date, original_end_date): """ Prompt user to select a program from EPG data or keep original selection. Args: programs (list): List of program dictionaries from EPG data original_start_date (datetime.datetime): User's original start date selection original_end_date (datetime.datetime): User's original end date selection Returns: dict: Dictionary containing: - 'start_date': datetime object for start - 'end_date': datetime object for end - 'title': str or None (program title if selected) - 'program': dict or None (full program data if selected) """ if not programs: print("Aucun programme disponible dans le guide EPG.") return { 'start_date': original_start_date, 'end_date': original_end_date, 'title': None, 'program': None } # Create choices list with program information program_choices = [] for program in programs: # Extract the live data from the program live_data = program.get("live", program) title = live_data.get('title', 'Sans titre') start_time = datetime.datetime.fromtimestamp(live_data.get('start', 0)) end_time = datetime.datetime.fromtimestamp(live_data.get('end', 0)) duration_min = (end_time - start_time).total_seconds() / 60 choice_name = ( f"{start_time.strftime('%H:%M')} - {end_time.strftime('%H:%M')} | " f"{title} ({int(duration_min)} min)" ) program_choices.append({ "name": choice_name, "value": program # Store the full program object }) # Add option to keep original selection program_choices.insert(0, { "name": ( f"Garder la sélection manuelle originale " f"({original_start_date.strftime('%Y-%m-%d %H:%M:%S')} - " f"{original_end_date.strftime('%Y-%m-%d %H:%M:%S')})" ), "value": None }) questions = [ { "type": "list", "message": "Sélectionnez un programme ou gardez votre sélection manuelle :", "choices": program_choices, "long_instruction": "Utilisez les flèches pour naviguer, Entrée pour sélectionner.", } ] try: result = prompt(questions) if not result: return None selected_program = result[0] # If user chose to keep original selection if selected_program is None: print("\n✅ Sélection manuelle conservée") return { 'start_date': original_start_date, 'end_date': original_end_date, 'title': None, 'program': None } # Extract live data and convert program timestamps to datetime objects live_data = selected_program.get('live', selected_program) program_start = datetime.datetime.fromtimestamp(live_data.get('start', 0)) program_end = datetime.datetime.fromtimestamp(live_data.get('end', 0)) program_title = live_data.get('title', 'Sans titre') print("\n✅ Programme sélectionné :") print(f" - Titre : {program_title}") print(f" - Début : {program_start.strftime('%Y-%m-%d %H:%M:%S')}") print(f" - Fin : {program_end.strftime('%Y-%m-%d %H:%M:%S')}") return { 'start_date': program_start, 'end_date': program_end, 'title': program_title, 'program': selected_program } except KeyboardInterrupt: print("\nOpération annulée par l'utilisateur.") return None