From 31f87a6a9914da0441cdd38e6a6c724a8c3fad46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=88=9A=28noham=29=C2=B2?= <100566912+NohamR@users.noreply.github.com> Date: Sat, 20 Dec 2025 12:52:35 +0100 Subject: [PATCH] Refactor logging to use parameterized messages --- main.py | 32 +++++++++++++------------- utils/input.py | 46 +++++++++++++++++++------------------ utils/logging_config.py | 14 +++++++----- utils/oqee.py | 33 ++++++++++++++++++++++----- utils/stream.py | 50 +++++++++++++++++++++++++---------------- utils/times.py | 10 ++++----- utils/utilities.py | 10 ++++----- 7 files changed, 116 insertions(+), 79 deletions(-) diff --git a/main.py b/main.py index 91d0b45..6059295 100644 --- a/main.py +++ b/main.py @@ -173,15 +173,15 @@ if __name__ == "__main__": logger.error("Error during stream selection.") sys.exit(1) - logger.info(f"Start date: {start_date}") - logger.info(f"End date: {end_date}") - logger.info(f"Channel ID: {args.channel_id}") - logger.info(f"Video quality: {args.video}") - logger.info(f"Audio track: {args.audio}") - logger.info(f"Title: {title}") - logger.info(f"DRM keys: {keys}") - logger.info(f"Output dir: {args.output_dir}") - logger.info(f"Widevine device: {args.widevine_device}") + logger.debug("Start date: %s", start_date) + logger.debug("End date: %s", end_date) + logger.debug("Channel ID: %s", args.channel_id) + logger.debug("Video quality: %s", args.video) + logger.debug("Audio track: %s", args.audio) + logger.debug("Title: %s", title) + logger.debug("DRM keys: %s", keys) + logger.debug("Output dir: %s", args.output_dir) + logger.debug("Widevine device: %s", args.widevine_device) else: # Interactive mode @@ -255,7 +255,7 @@ if __name__ == "__main__": ) rep_nb = (end_tick - start_tick) // DURATION + 1 - logger.info(f"Total segments to fetch for {content_type}: {rep_nb}") + logger.info("Total segments to fetch for %s: %d", content_type, rep_nb) data = { "start_tick": start_tick, "rep_nb": rep_nb, @@ -296,7 +296,7 @@ if __name__ == "__main__": key = k break if not key: - logger.info(f"No key found for KID {kid}, need to fetch it.") + logger.info("No key found for KID %s, need to fetch it.", kid) missing_keys.append(kid) if len(missing_keys) > 0: @@ -317,7 +317,7 @@ if __name__ == "__main__": } fetched_keys = get_keys(kids=missing_keys, method=method) - logger.info(f"Fetched keys: {fetched_keys}") + logger.info("Fetched keys: %s", fetched_keys) keys = keys + fetched_keys for content_type, data in [("video", video_data), ("audio", audio_data)]: @@ -347,7 +347,7 @@ if __name__ == "__main__": f'ffmpeg -i "concat:{output_dir}/segments_{track_id_video}/init.mp4|' f'{output_dir}/dec_video.mp4" -c copy {output_dir}/video.mp4' ) - logger.debug(f"FFmpeg command: {command_ffmpeg}") + logger.debug("FFmpeg command: %s", command_ffmpeg) subprocess.run( command_ffmpeg, shell=True, @@ -360,7 +360,7 @@ if __name__ == "__main__": f'{output_dir}/dec_audio.mp4" -c copy {output_dir}/audio.mp4' ) - logger.debug(f"FFmpeg command: {command_ffmpeg}") + logger.debug("FFmpeg command: %s", command_ffmpeg) subprocess.run( command_ffmpeg, shell=True, @@ -373,7 +373,7 @@ if __name__ == "__main__": f"ffmpeg -i {output_dir}/video.mp4 -itsoffset {diff_start_sec} " f"-i {output_dir}/audio.mp4 -c copy -map 0:v -map 1:a {output_dir}/output.mp4" ) - logger.debug(f"Merge command: {COMMAND_MERGE}") + logger.debug("Merge command: %s", COMMAND_MERGE) subprocess.run( COMMAND_MERGE, shell=True, @@ -384,7 +384,7 @@ if __name__ == "__main__": FINAL_OUTPUT = f"{output_dir}/{title}.mp4" shutil.move(f"{output_dir}/output.mp4", FINAL_OUTPUT) - logger.info(f"Final output saved to {FINAL_OUTPUT}") + logger.info("Final output saved to %s", FINAL_OUTPUT) os.remove(f"{output_dir}/dec_video.mp4") os.remove(f"{output_dir}/dec_audio.mp4") diff --git a/utils/input.py b/utils/input.py index 6f8c1aa..879cd58 100644 --- a/utils/input.py +++ b/utils/input.py @@ -76,7 +76,7 @@ def get_date_input(): start_date = datetime.datetime.strptime( start_date_result["datetime"], "%Y-%m-%d %H:%M:%S" ) - logger.debug(f"Start date/time: {start_date}") + logger.debug("Start date/time: %s", start_date) question_end_date = [ { @@ -116,7 +116,7 @@ def get_date_input(): h, m, s = map(int, duration_str.split(":")) duration_td = datetime.timedelta(hours=h, minutes=m, seconds=s) end_date = start_date + duration_td - logger.debug(f"End date/time: {end_date}") + logger.debug("End date/time: %s", end_date) except (ValueError, TypeError): logger.error("Unable to parse the provided duration string.") @@ -125,7 +125,7 @@ def get_date_input(): end_date = datetime.datetime.strptime( end_date_result["datetime"], "%Y-%m-%d %H:%M:%S" ) - logger.debug(f"End date/time: {end_date}") + logger.debug("End date/time: %s", end_date) except (ValueError, TypeError): logger.error("Unable to parse the provided date/time string.") return start_date, end_date @@ -155,7 +155,7 @@ def select_oqee_channel(): choices.sort(key=lambda x: x["name"]) except requests.exceptions.RequestException as e: - logger.error(f"A network error occurred: {e}") + logger.error("A network error occurred: %s", e) return None except ValueError: logger.error("Error parsing JSON response.") @@ -179,9 +179,9 @@ def select_oqee_channel(): selected_channel_details = channels_data.get(selected_channel_id) if selected_channel_details: logger.info("You have selected:") - logger.info(f" - Name: {selected_channel_details.get('name')}") - logger.info(f" - ID: {selected_channel_details.get('id')}") - logger.info(f" - Freebox ID: {selected_channel_details.get('freebox_id')}") + logger.info(" - Name: %s", selected_channel_details.get("name")) + logger.info(" - ID: %s", selected_channel_details.get("id")) + logger.info(" - Freebox ID: %s", selected_channel_details.get("freebox_id")) else: logger.warning("Unable to find details for the selected channel.") return selected_channel_details @@ -190,7 +190,7 @@ def select_oqee_channel(): logger.info("Operation cancelled by user.") return None except (ValueError, KeyError, IndexError) as e: - logger.error(f"An unexpected error occurred: {e}") + logger.error("An unexpected error occurred: %s", e) return None @@ -276,8 +276,8 @@ def stream_selection(): return None logger.debug("Selected channel:") - logger.debug(f" - Name: {selected_channel.get('name')}") - logger.debug(f" - ID: {selected_channel.get('id')}") + logger.debug(" - Name: %s", selected_channel.get("name")) + logger.debug(" - ID: %s", selected_channel.get("id")) dash_id = selected_channel.get("streams", {}).get("dash") if not dash_id: @@ -302,8 +302,8 @@ def stream_selection(): bitrate = details.get("bitrate_kbps") track_id = details.get("track_id") logger.info( - f" - {stream_type.capitalize()}: " - f"Bitrate {bitrate} kbps (ID: {track_id})" + " - %s: Bitrate %s kbps (ID: %s)", + stream_type.capitalize(), bitrate, track_id ) logger.info("----------------------------------------") @@ -351,18 +351,19 @@ def get_selection(channel_id, video_quality="best", audio_quality="best"): channels_data = data["result"]["channels"] selected_channel_details = channels_data.get(str(channel_id)) if not selected_channel_details: - logger.error(f"Channel with ID {channel_id} not found.") + logger.error("Channel with ID %s not found.", channel_id) return None except requests.exceptions.RequestException as e: - logger.error(f"Network error: {e}") + logger.error("Network error: %s", e) return None except ValueError: logger.error("Error parsing JSON response.") return None logger.info( - f"Selected channel: {selected_channel_details.get('name')} (ID: {channel_id})" + "Selected channel: %s (ID: %s)", + selected_channel_details.get("name"), channel_id ) dash_id = selected_channel_details.get("streams", {}).get("dash") @@ -417,7 +418,7 @@ def select_track(content_dict, quality_spec, content_type): candidates.extend(tracks) if not candidates: - logger.warning(f"No {content_type} track found for '{quality_spec}'.") + logger.warning("No %s track found for '%s'.", content_type, quality_spec) return None if pref == "best": @@ -429,7 +430,8 @@ def select_track(content_dict, quality_spec, content_type): selected = max(candidates, key=lambda x: x["bandwidth"]) logger.info( - f"{content_type.capitalize()} selected: {selected['track_id']}, {selected['bitrate_kbps']} kbps" + "%s selected: %s, %d kbps", + content_type.capitalize(), selected["track_id"], selected["bitrate_kbps"] ) return selected @@ -454,7 +456,7 @@ def get_epg_data_at(dt: datetime.datetime): dt_aligned = dt.replace(minute=0, second=0, microsecond=0) unix_time = int(dt_aligned.timestamp()) - logger.info(f"Fetching EPG for aligned time: {dt_aligned} (unix={unix_time})") + logger.info("Fetching EPG for aligned time: %s (unix=%d)", dt_aligned, unix_time) try: response = requests.get(EPG_API_URL.format(unix=unix_time), timeout=10) @@ -464,7 +466,7 @@ def get_epg_data_at(dt: datetime.datetime): return data.get("result") except requests.exceptions.RequestException as e: - logger.error(f"A network error occurred: {e}") + logger.error("A network error occurred: %s", e) return None except ValueError: logger.error("Error parsing JSON response.") @@ -560,9 +562,9 @@ def select_program_from_epg(programs, original_start_date, original_end_date): program_title = live_data.get("title", "Untitled") logger.info("Selected program:") - logger.info(f" - Title: {program_title}") - logger.info(f" - Start: {program_start.strftime('%Y-%m-%d %H:%M:%S')}") - logger.info(f" - End: {program_end.strftime('%Y-%m-%d %H:%M:%S')}") + logger.info(" - Title: %s", program_title) + logger.info(" - Start: %s", program_start.strftime("%Y-%m-%d %H:%M:%S")) + logger.info(" - End: %s", program_end.strftime("%Y-%m-%d %H:%M:%S")) return { "start_date": program_start, diff --git a/utils/logging_config.py b/utils/logging_config.py index 88ad291..82a295e 100644 --- a/utils/logging_config.py +++ b/utils/logging_config.py @@ -1,9 +1,11 @@ +"""Logging configuration for OqeeRewind.""" + import logging import sys class ColoredFormatter(logging.Formatter): """Custom logging formatter to add colors to log levels.""" - + grey = "\x1b[38;20m" yellow = "\x1b[33;20m" red = "\x1b[31;20m" @@ -27,8 +29,8 @@ class ColoredFormatter(logging.Formatter): def setup_logging(level=logging.INFO): """Set up logging configuration.""" - logger = logging.getLogger("OqeeRewind") - logger.setLevel(level) + log = logging.getLogger("OqeeRewind") + log.setLevel(level) # Create console handler console_handler = logging.StreamHandler(sys.stdout) @@ -39,10 +41,10 @@ def setup_logging(level=logging.INFO): console_handler.setFormatter(formatter) # Add handler to logger - if not logger.handlers: - logger.addHandler(console_handler) + if not log.handlers: + log.addHandler(console_handler) - return logger + return log # Create a default logger instance logger = logging.getLogger("OqeeRewind") diff --git a/utils/oqee.py b/utils/oqee.py index 4d21574..83be45e 100644 --- a/utils/oqee.py +++ b/utils/oqee.py @@ -24,7 +24,11 @@ class OqeeClient: # pylint: disable=too-many-instance-attributes # Base headers template for API requests self._headers_template = { - "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", + "accept": ( + "text/html,application/xhtml+xml,application/xml;q=0.9," + "image/avif,image/webp,image/apng,*/*;q=0.8," + "application/signed-exchange;v=b3;q=0.7" + ), "accept-language": "en-GB,en-US;q=0.9,en;q=0.8", "cache-control": "no-cache", "pragma": "no-cache", @@ -37,7 +41,11 @@ class OqeeClient: # pylint: disable=too-many-instance-attributes "sec-fetch-site": "none", "sec-fetch-user": "?1", "upgrade-insecure-requests": "1", - "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36", + "user-agent": ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/144.0.0.0 Safari/537.36" + ), } self.headers_base = self._build_headers() @@ -128,7 +136,11 @@ class OqeeClient: # pylint: disable=too-many-instance-attributes "pragma": "no-cache", "priority": "u=1, i", "sec-ch-ua": '"Brave";v="131", "Chromium";v="131", "Not_A Brand";v="24"', - "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36", + "user-agent": ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/131.0.0.0 Safari/537.36" + ), "x-oqee-customization": "0", } ) @@ -143,7 +155,10 @@ class OqeeClient: # pylint: disable=too-many-instance-attributes state = r["state"][0] headers = { - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9, image/avif,image/webp,image/apng,*/*;q=0.8", + "Accept": ( + "text/html,application/xhtml+xml,application/xml;q=0.9, " + "image/avif,image/webp,image/apng,*/*;q=0.8" + ), "Accept-Language": "fr-FR,fr;q=0.7", "Cache-Control": "max-age=0", "Connection": "keep-alive", @@ -156,7 +171,11 @@ class OqeeClient: # pylint: disable=too-many-instance-attributes "Sec-Fetch-User": "?1", "Sec-GPC": "1", "Upgrade-Insecure-Requests": "1", - "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36", + "user-agent": ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/131.0.0.0 Safari/537.36" + ), "sec-ch-ua": '"Brave";v="123", "Not:A-Brand";v="8", "Chromium";v="123"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"macOS"', @@ -216,7 +235,9 @@ class OqeeClient: # pylint: disable=too-many-instance-attributes try: self.access_token = self.login_cred(username, password) except ValueError as e: - logger.warning(f"Credential login failed: {e}. Falling back to IP login.") + logger.warning( + "Credential login failed: %s. Falling back to IP login.", e + ) self.access_token = self.login_ip() logger.info("Fetching rights token") diff --git a/utils/stream.py b/utils/stream.py index 429d2cc..3994ccf 100644 --- a/utils/stream.py +++ b/utils/stream.py @@ -5,7 +5,6 @@ import base64 import os import asyncio import time -import subprocess from typing import Dict, Any import requests @@ -317,12 +316,16 @@ def get_manifest(manifest_id): "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "cross-site", - "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", + "user-agent": ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/143.0.0.0 Safari/537.36" + ), } format_id = 1 url = ( - f"https://api-proxad.dc2.oqee.net/playlist/v1/live/" + "https://api-proxad.dc2.oqee.net/playlist/v1/live/" f"{manifest_id}/{format_id}/live.mpd" ) response = requests.get(url, headers=headers, timeout=10) @@ -344,7 +347,11 @@ async def fetch_segment(session, ticks, track_id): headers = { "Accept": "*/*", "Referer": "https://tv.free.fr/", - "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", + "User-Agent": ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/143.0.0.0 Safari/537.36" + ), } try: async with session.get(url, headers=headers) as resp: @@ -366,7 +373,11 @@ def get_init(output_folder, track_id): headers = { "Accept": "*/*", "Referer": "https://tv.free.fr/", - "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", + "User-Agent": ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/143.0.0.0 Safari/537.36" + ), } response = requests.get(url, headers=headers, timeout=10) if response.status_code == 200: @@ -374,7 +385,7 @@ def get_init(output_folder, track_id): init_path = f"{output_folder}/segments_{track_id}/init.mp4" with open(init_path, "wb") as f: f.write(response.content) - logger.debug(f"Saved initialization segment to {init_path}") + logger.debug("Saved initialization segment to %s", init_path) return init_path @@ -395,7 +406,11 @@ async def save_segments(output_folder, track_id, start_tick, rep_nb, duration): headers = { "Accept": "*/*", "Referer": "https://tv.free.fr/", - "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", + "User-Agent": ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/143.0.0.0 Safari/537.36" + ), } try: async with session.get(url, headers=headers) as resp: @@ -406,21 +421,19 @@ async def save_segments(output_folder, track_id, start_tick, rep_nb, duration): f.write(content) return True logger.error( - f"Failed to download segment {rep} (tick {tick}): " - f"HTTP {resp.status}" + "Failed to download segment %d (tick %d): HTTP %d", + rep, tick, resp.status ) return False except aiohttp.ClientError as e: - logger.warning(f"Error downloading segment {rep} (tick {tick}): {e}") + logger.warning("Error downloading segment %d (tick %d): %s", rep, tick, e) return False - logger.info(f"Starting download of {rep_nb} segments...") - logger.debug(f"Track ID: {track_id}") - logger.debug(f"Base tick: {start_tick}") + logger.info("Starting download of %d segments...", rep_nb) + logger.debug("Track ID: %s", track_id) + logger.debug("Base tick: %d", start_tick) start_time = time.time() - successful = 0 - failed = 0 async with aiohttp.ClientSession() as session: tasks = [] @@ -438,13 +451,12 @@ async def save_segments(output_folder, track_id, start_tick, rep_nb, duration): result = await coro results.append(result) successful = sum(1 for r in results if r is True) - failed = rep_nb - successful end_time = time.time() elapsed = end_time - start_time - logger.debug(f"Download completed in {elapsed:.2f}s") - logger.info(f"Files saved to {output_folder}/segments_{track_id}/") + logger.debug("Download completed in %.2fs", elapsed) + logger.info("Files saved to %s/segments_%s/", output_folder, track_id) def get_kid(output_folder, track_id): @@ -461,7 +473,7 @@ def get_kid(output_folder, track_id): for filename in os.listdir(folder): if filename.endswith(".m4s"): filepath = os.path.join(folder, filename) - logger.debug(f"Checking file: {filepath}") + logger.debug("Checking file: %s", filepath) with open(filepath, "rb") as f: data = f.read() # Pattern before KID diff --git a/utils/times.py b/utils/times.py index f5c8146..746698a 100644 --- a/utils/times.py +++ b/utils/times.py @@ -57,7 +57,7 @@ async def bruteforce(track_id, date): total_requests = 288000 batch_size = 20000 - logger.debug(f"Starting bruteforce for {track_id}") + logger.debug("Starting bruteforce for %s", track_id) start_time = time.time() @@ -86,16 +86,16 @@ async def bruteforce(track_id, date): # Stop if we found valid ticks if valid_ticks: - logger.debug(f"Found valid ticks: {valid_ticks}, stopping bruteforce.") + logger.debug("Found valid ticks: %s, stopping bruteforce.", valid_ticks) break except KeyboardInterrupt: logger.error("Interrupted by user (Ctrl+C)") elapsed = time.time() - start_time - logger.debug(f"Completed in {elapsed:.2f}s") - logger.debug(f"Speed: {total_requests / elapsed if elapsed > 0 else 0:.2f} req/s") - logger.debug(f"Total checked: {total_requests}") + logger.debug("Completed in %.2fs", elapsed) + logger.debug("Speed: %.2f req/s", total_requests / elapsed if elapsed > 0 else 0) + logger.debug("Total checked: %d", total_requests) return valid_ticks diff --git a/utils/utilities.py b/utils/utilities.py index 277ac68..1645575 100644 --- a/utils/utilities.py +++ b/utils/utilities.py @@ -20,10 +20,10 @@ def verify_mp4ff(): def verify_cmd(path: str) -> bool: """Verify if the file provided at path is valid and exists, otherwise log error and exit.""" if not os.path.exists(path): - logger.error(f"File does not exist: {path}") + logger.error("File does not exist: %s", path) sys.exit(1) if not os.path.isfile(path): - logger.error(f"Path is not a file: {path}") + logger.error("Path is not a file: %s", path) sys.exit(1) return True @@ -40,7 +40,7 @@ def merge_segments(input_folder: str, track_id: str, output_file: str): for fname in segment_files: with open(f"{segment_folder}/{fname}", "rb") as infile: outfile.write(infile.read()) - logger.info(f"Merged segments into {output_file}") + logger.info("Merged segments into %s", output_file) def decrypt(input_file, init_path, output_file, key): @@ -63,7 +63,7 @@ def decrypt(input_file, init_path, output_file, key): check=False, ) if result.returncode == 0: - logger.info(f"Decrypted {input_file} to {output_file}") + logger.info("Decrypted %s to %s", input_file, output_file) return True - logger.error(f"Decryption failed: {result.stderr}") + logger.error("Decryption failed: %s", result.stderr) return False