Files
OqeeRewind/utils/input.py
√(noham)² 5dc55dbf62 Add CLI mode and refactor Oqee downloader workflow
Introduces a command-line interface to main.py for Oqee TV downloads, supporting argument parsing for channel, date, quality, and keys. Refactors stream selection, segment download, decryption, and merging logic for both CLI and interactive modes. Adds new utility modules for DRM key retrieval, segment merging, and decryption. Cleans up and simplifies Oqee client, input, stream, and time utilities for improved maintainability and usability.
2025-12-20 11:43:01 +01:00

575 lines
20 KiB
Python

"""Input utilities for user prompts and channel/stream selection."""
import datetime
import requests
from prompt_toolkit.validation import Validator, ValidationError
from InquirerPy import prompt
from InquirerPy.validator import EmptyInputValidator
from InquirerPy.base.control import Choice
from utils.stream import (
get_manifest,
parse_mpd_manifest,
organize_by_content_type
)
SERVICE_PLAN_API_URL = "https://api.oqee.net/api/v6/service_plan"
EPG_API_URL = "https://api.oqee.net/api/v1/epg/all/{unix}"
class DatetimeValidator(Validator):
"""
Validateur personnalisé pour les chaînes datetime au format "YYYY-MM-DD HH:MM:SS".
"""
def validate(self, document):
try:
datetime.datetime.strptime(document.text, "%Y-%m-%d %H:%M:%S")
except ValueError as exc:
raise ValidationError(
message="Veuillez entrer une date/heure valide au format YYYY-MM-DD HH:MM:SS",
cursor_position=len(document.text),
) from exc
class DurationValidator(Validator):
"""
Validateur personnalisé pour les chaînes de durée au format "HH:MM:SS".
"""
def validate(self, document):
parts = document.text.split(':')
if len(parts) != 3:
raise ValidationError(
message="Veuillez entrer la durée au format HH:MM:SS",
cursor_position=len(document.text),
)
try:
_, m, s = [int(part) for part in parts]
if not (0 <= m < 60 and 0 <= s < 60):
raise ValueError("Les minutes et les secondes doivent être entre 0 et 59.")
except ValueError as exc:
raise ValidationError(
message="Format invalide. Utilisez HH:MM:SS avec des nombres valides.",
cursor_position=len(document.text),
) from exc
def get_date_input():
"""Prompt user for start and end date/time or duration.
Returns:
tuple: A tuple containing (start_date, end_date) as datetime objects.
"""
question_start_date = [
{
"type": "input",
"message": "Entrez une date/heure de début (YYYY-MM-DD HH:MM:SS):",
"name": "datetime",
"default": "2025-01-01 12:00:00",
"validate": DatetimeValidator(),
"invalid_message": "Format de date/heure invalide. Utilisez YYYY-MM-DD HH:MM:SS",
}
]
start_date_result = prompt(question_start_date)
if start_date_result:
start_date = datetime.datetime.strptime(start_date_result["datetime"], "%Y-%m-%d %H:%M:%S")
print(f"Date/heure de début : {start_date}")
question_end_date = [
{
"type": "list",
"message": "Que voulez-vous entrer ?",
"choices": ["Durée", "Date/heure de fin"],
"name": "input_type",
},
{
"type": "input",
"message": "Entrez la durée (HH:MM:SS):",
"name": "duration",
"default": "01:00:00",
"validate": DurationValidator(),
"when": lambda answers: answers["input_type"] == "Durée",
},
{
"type": "input",
"message": "Entrez une date/heure de fin (YYYY-MM-DD HH:MM:SS):",
"name": "datetime",
"default": (
start_date_result["datetime"] if start_date_result
else "2025-01-01 12:00:00"
),
"validate": DatetimeValidator(),
"when": lambda answers: answers["input_type"] == "Date/heure de fin",
},
]
end_date_result = prompt(question_end_date)
if end_date_result:
if end_date_result.get("duration"):
duration_str = end_date_result["duration"]
try:
h, m, s = map(int, duration_str.split(':'))
duration_td = datetime.timedelta(hours=h, minutes=m, seconds=s)
end_date = start_date + duration_td
print(f"\nDate/heure de fin : {end_date}")
except (ValueError, TypeError):
print("Impossible d'analyser la chaîne de durée fournie.")
elif end_date_result.get("datetime"):
try:
end_date = datetime.datetime.strptime(
end_date_result["datetime"], "%Y-%m-%d %H:%M:%S"
)
print(f"\nDate/heure de fin : {end_date}")
except (ValueError, TypeError):
print("Impossible d'analyser la chaîne de date/heure fournie.")
return start_date, end_date
def select_oqee_channel():
"""Select an Oqee channel from the API.
Returns:
dict: Selected channel details or None if cancelled/error.
"""
api_url = SERVICE_PLAN_API_URL
try:
print("Chargement de la liste des chaînes depuis l'API Oqee...")
response = requests.get(api_url, timeout=10)
response.raise_for_status()
data = response.json()
if not data.get("success") or "channels" not in data.get("result", {}):
print("Erreur: Le format de la réponse de l'API est inattendu.")
return None
channels_data = data["result"]["channels"]
choices = [
{
"name": f"{channel_info.get('name', 'Nom inconnu')}",
"value": channel_id
}
for channel_id, channel_info in channels_data.items()
]
choices.sort(key=lambda x: x['name'])
except requests.exceptions.RequestException as e:
print(f"Une erreur réseau est survenue : {e}")
return None
except ValueError:
print("Erreur lors de l'analyse de la réponse JSON.")
return None
questions = [
{
"type": "fuzzy",
"message": "Veuillez choisir une chaîne (tapez pour filtrer) :",
"choices": choices,
"multiselect": False,
"validate": EmptyInputValidator(),
"invalid_message": "Vous devez sélectionner une chaîne.",
"long_instruction": "Utilisez les flèches pour naviguer, Entrée pour sélectionner.",
}
]
try:
result = prompt(questions)
selected_channel_id = result[0]
selected_channel_details = channels_data.get(selected_channel_id)
if selected_channel_details:
print("\n✅ Vous avez sélectionné :")
print(f" - Nom : {selected_channel_details.get('name')}")
print(f" - ID : {selected_channel_details.get('id')}")
print(f" - ID Freebox : {selected_channel_details.get('freebox_id')}")
else:
print("Impossible de retrouver les détails de la chaîne sélectionnée.")
return selected_channel_details
except KeyboardInterrupt:
print("\nOpération annulée par l'utilisateur.")
return None
except (ValueError, KeyError, IndexError) as e:
print(f"Une erreur inattenante est survenue : {e}")
return None
def prompt_for_stream_selection(stream_info, already_selected_types):
"""Guide l'utilisateur pour sélectionner un flux, en désactivant les types déjà choisis."""
try:
content_type_choices = [
Choice(value, name=value, enabled=value not in already_selected_types)
for value in stream_info.keys()
]
questions = [
{
"type": "list",
"message": "Quel type de flux souhaitez-vous sélectionner ?",
"choices": content_type_choices
}
]
result = prompt(questions)
if not result:
return None
selected_type = result[0]
selected_content_data = stream_info[selected_type]
questions = [
{
"type": "list",
"message": f"Choisissez une qualité pour '{selected_type}':",
"choices": list(selected_content_data.keys())
}
]
result = prompt(questions)
if not result:
return None
quality_group_key = result[0]
available_streams = selected_content_data[quality_group_key]
final_selection = None
if len(available_streams) == 1:
final_selection = available_streams[0]
print("Un seul flux disponible pour cette qualité, sélection automatique.")
else:
stream_choices = [
{
"name": (
f"Bitrate: {s.get('bitrate_kbps')} kbps | "
f"Codec: {s.get('codec', 'N/A')} | ID: {s.get('track_id')}"
),
"value": s
}
for s in available_streams
]
questions = [
{
"type": "list",
"message": "Plusieurs flux sont disponibles, choisissez-en un :",
"choices": stream_choices
}
]
result = prompt(questions)
if not result:
return None
final_selection = result[0]
final_selection['content_type'] = selected_type
return final_selection
except (KeyboardInterrupt, TypeError):
return None
def stream_selection():
"""Guide user through channel and stream selection process.
Returns:
dict: Dictionary of selected streams by content type, or None if cancelled.
"""
selected_channel = select_oqee_channel()
if not selected_channel:
return None
print("\n✅ Chaîne sélectionnée :")
print(f" - Nom : {selected_channel.get('name')}")
print(f" - ID : {selected_channel.get('id')}")
dash_id = selected_channel.get('streams', {}).get('dash')
if not dash_id:
print("Aucun flux DASH trouvé pour cette chaîne.")
return None
mpd_content = get_manifest(dash_id)
manifest_info = parse_mpd_manifest(mpd_content)
organized_info = organize_by_content_type(manifest_info)
final_selections = {}
while True:
selection = prompt_for_stream_selection(
organized_info, final_selections.keys()
)
if selection:
content_type = selection.pop('content_type')
final_selections[content_type] = selection
print("\n--- Récapitulatif de votre sélection ---")
for stream_type, details in final_selections.items():
bitrate = details.get('bitrate_kbps')
track_id = details.get('track_id')
print(
f" - {stream_type.capitalize()}: "
f"Bitrate {bitrate} kbps (ID: {track_id})"
)
print("----------------------------------------")
continue_prompt = [
{
"type": "list",
"message": "Que souhaitez-vous faire ?",
"choices": [
"Sélectionner un autre flux",
"Terminer et continuer"
],
}
]
action_result = prompt(continue_prompt)
if (
not action_result or
action_result[0] == "Terminer et continuer"
):
break
if final_selections:
final_selections['channel'] = selected_channel
return final_selections
print("\nAucun flux n'a été sélectionné.")
return None
def get_selection(channel_id, video_quality='best', audio_quality='best'):
"""Get stream selection for a given channel ID with specified qualities.
Args:
channel_id (str): The channel ID to select streams for.
video_quality (str): Video quality selection ('best', '1080+best', '720+worst', etc.).
audio_quality (str): Audio quality selection ('best', 'fra+best', etc.).
Returns:
dict: Dictionary of selected streams by content type, or None if error.
"""
# Fetch channel details
api_url = SERVICE_PLAN_API_URL
try:
response = requests.get(api_url, timeout=10)
response.raise_for_status()
data = response.json()
if not data.get("success") or "channels" not in data.get("result", {}):
print("Erreur: Impossible de récupérer les détails de la chaîne.")
return None
channels_data = data["result"]["channels"]
selected_channel_details = channels_data.get(str(channel_id))
if not selected_channel_details:
print(f"Chaîne avec ID {channel_id} non trouvée.")
return None
except requests.exceptions.RequestException as e:
print(f"Erreur réseau : {e}")
return None
except ValueError:
print("Erreur lors de l'analyse de la réponse JSON.")
return None
print(f"Chaîne sélectionnée : {selected_channel_details.get('name')} (ID: {channel_id})")
dash_id = selected_channel_details.get('streams', {}).get('dash')
if not dash_id:
print("Aucun flux DASH trouvé pour cette chaîne.")
return None
mpd_content = get_manifest(dash_id)
manifest_info = parse_mpd_manifest(mpd_content)
organized_info = organize_by_content_type(manifest_info)
final_selections = {}
final_selections['channel'] = selected_channel_details
# Select video
if 'video' in organized_info:
selected_track = select_track(organized_info['video'], video_quality, 'video')
if selected_track:
final_selections['video'] = selected_track
# Select audio
if 'audio' in organized_info:
selected_track = select_track(organized_info['audio'], audio_quality, 'audio')
if selected_track:
final_selections['audio'] = selected_track
return final_selections
def select_track(content_dict, quality_spec, content_type):
"""Select a track based on quality specification.
Args:
content_dict (dict): Organized content dict (video or audio).
quality_spec (str): Quality spec like 'best', '1080+best', 'fra+worst'.
content_type (str): 'video' or 'audio'.
Returns:
dict: Selected track or None.
"""
if '+' in quality_spec:
filter_part, pref = quality_spec.split('+', 1)
pref = pref.lower()
else:
filter_part = ''
pref = quality_spec.lower()
candidates = []
for key, tracks in content_dict.items():
if filter_part and filter_part.lower() not in key.lower():
continue
candidates.extend(tracks)
if not candidates:
print(f"Aucune piste {content_type} trouvée pour '{quality_spec}'.")
return None
if pref == 'best':
selected = max(candidates, key=lambda x: x['bandwidth'])
elif pref == 'worst':
selected = min(candidates, key=lambda x: x['bandwidth'])
else:
# Default to best if unknown pref
selected = max(candidates, key=lambda x: x['bandwidth'])
print(f"{content_type.capitalize()} sélectionnée : {selected['track_id']}, {selected['bitrate_kbps']} kbps")
return selected
def get_epg_data_at(dt: datetime.datetime):
"""
Fetch EPG data from the Oqee API for the nearest aligned hour of a given datetime.
Args:
dt (datetime.datetime): datetime (with hour, minute, etc.)
Returns:
dict | None: EPG data or None on error
"""
# Round to nearest hour
if dt.minute >= 30:
dt_aligned = (dt + datetime.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
else:
dt_aligned = dt.replace(minute=0, second=0, microsecond=0)
unix_time = int(dt_aligned.timestamp())
print(f"Fetching EPG for aligned time: {dt_aligned} (unix={unix_time})")
try:
response = requests.get(EPG_API_URL.format(unix=unix_time), timeout=10)
response.raise_for_status()
data = response.json()
return data.get("result")
except requests.exceptions.RequestException as e:
print(f"Une erreur réseau est survenue : {e}")
return None
except ValueError:
print("Erreur lors de l'analyse de la réponse JSON.")
return None
def select_program_from_epg(programs, original_start_date, original_end_date):
"""
Prompt user to select a program from EPG data or keep original selection.
Args:
programs (list): List of program dictionaries from EPG data
original_start_date (datetime.datetime): User's original start date selection
original_end_date (datetime.datetime): User's original end date selection
Returns:
dict: Dictionary containing:
- 'start_date': datetime object for start
- 'end_date': datetime object for end
- 'title': str or None (program title if selected)
- 'program': dict or None (full program data if selected)
"""
if not programs:
print("Aucun programme disponible dans le guide EPG.")
return {
'start_date': original_start_date,
'end_date': original_end_date,
'title': None,
'program': None
}
# Create choices list with program information
program_choices = []
for program in programs:
# Extract the live data from the program
live_data = program.get("live", program)
title = live_data.get('title', 'Sans titre')
start_time = datetime.datetime.fromtimestamp(live_data.get('start', 0))
end_time = datetime.datetime.fromtimestamp(live_data.get('end', 0))
duration_min = (end_time - start_time).total_seconds() / 60
choice_name = (
f"{start_time.strftime('%H:%M')} - {end_time.strftime('%H:%M')} | "
f"{title} ({int(duration_min)} min)"
)
program_choices.append({
"name": choice_name,
"value": program # Store the full program object
})
# Add option to keep original selection
program_choices.insert(0, {
"name": (
f"Garder la sélection manuelle originale "
f"({original_start_date.strftime('%Y-%m-%d %H:%M:%S')} - "
f"{original_end_date.strftime('%Y-%m-%d %H:%M:%S')})"
),
"value": None
})
questions = [
{
"type": "list",
"message": "Sélectionnez un programme ou gardez votre sélection manuelle :",
"choices": program_choices,
"long_instruction": "Utilisez les flèches pour naviguer, Entrée pour sélectionner.",
}
]
try:
result = prompt(questions)
if not result:
return None
selected_program = result[0]
# If user chose to keep original selection
if selected_program is None:
print("\n✅ Sélection manuelle conservée")
return {
'start_date': original_start_date,
'end_date': original_end_date,
'title': None,
'program': None
}
# Extract live data and convert program timestamps to datetime objects
live_data = selected_program.get('live', selected_program)
program_start = datetime.datetime.fromtimestamp(live_data.get('start', 0))
program_end = datetime.datetime.fromtimestamp(live_data.get('end', 0))
program_title = live_data.get('title', 'Sans titre')
print("\n✅ Programme sélectionné :")
print(f" - Titre : {program_title}")
print(f" - Début : {program_start.strftime('%Y-%m-%d %H:%M:%S')}")
print(f" - Fin : {program_end.strftime('%Y-%m-%d %H:%M:%S')}")
return {
'start_date': program_start,
'end_date': program_end,
'title': program_title,
'program': selected_program
}
except KeyboardInterrupt:
print("\nOpération annulée par l'utilisateur.")
return None