Integrate structured logging across project

This commit is contained in:
√(noham)²
2025-12-20 12:46:28 +01:00
parent 0c408e0c5e
commit debd884e2d
9 changed files with 173 additions and 112 deletions

View File

@@ -64,8 +64,9 @@ uv run main.py
You can automate the download by providing arguments.
```bash
usage: main.py [-h] [--start-date START_DATE] [--end-date END_DATE] [--duration DURATION] [--channel-id CHANNEL_ID] [--video VIDEO] [--audio AUDIO]
[--title TITLE] [--username USERNAME] [--password PASSWORD] [--key KEY] [--output-dir OUTPUT_DIR] [--widevine-device WIDEVINE_DEVICE]
usage: main.py [-h] [--start-date START_DATE] [--end-date END_DATE] [--duration DURATION] [--channel-id CHANNEL_ID] [--video VIDEO] [--audio AUDIO] [--title TITLE]
[--username USERNAME] [--password PASSWORD] [--key KEY] [--output-dir OUTPUT_DIR] [--widevine-device WIDEVINE_DEVICE]
[--log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL}]
options:
-h, --help show this help message and exit
@@ -85,6 +86,8 @@ options:
Output directory for downloaded files (default: ./download)
--widevine-device WIDEVINE_DEVICE
Path to Widevine device file (default: ./widevine/device.wvd)
--log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL}
Set the logging level (default: INFO)
```
#### Examples
@@ -118,7 +121,7 @@ In order to decrypt DRM content, you will need to have a dumped CDM, after that
- [x] Full implementation
- [x] Verify mp4ff installation
- [x] CLI arguments implementation + documentation
- [ ] French/English full translation
- [x] French/English full translation
- [ ] Better output system
- [ ] Add more comments in the code
- [ ] Logging system

63
main.py
View File

@@ -6,6 +6,7 @@ import argparse
import asyncio
import subprocess
import shutil
import logging
from datetime import datetime, timedelta
from dotenv import load_dotenv
from utils.input import (
@@ -27,6 +28,7 @@ from utils.times import (
bruteforce,
)
from utils.stream import save_segments, get_kid, get_init
from utils.logging_config import setup_logging, logger
load_dotenv()
TIMESCALE = 90000
@@ -83,12 +85,20 @@ def parse_arguments():
default="./widevine/device.wvd",
help="Path to Widevine device file (default: ./widevine/device.wvd)",
)
parser.add_argument(
"--log-level",
type=str,
default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
help="Set the logging level (default: INFO)",
)
return parser.parse_args()
if __name__ == "__main__":
args = parse_arguments()
setup_logging(level=getattr(logging, args.log_level.upper()))
verify_mp4ff()
# Check if CLI mode
@@ -110,7 +120,7 @@ if __name__ == "__main__":
try:
if cli_mode:
# CLI mode
print("Running in CLI mode...")
logger.info("Running in CLI mode...")
# Parse dates
start_date = None
@@ -120,17 +130,17 @@ if __name__ == "__main__":
try:
start_date = datetime.strptime(args.start_date, "%Y-%m-%d %H:%M:%S")
except ValueError:
print("Invalid start-date format. Use YYYY-MM-DD HH:MM:SS")
logger.error("Invalid start-date format. Use YYYY-MM-DD HH:MM:SS")
sys.exit(1)
if args.end_date and args.duration:
print("Cannot specify both --end-date and --duration")
logger.error("Cannot specify both --end-date and --duration")
sys.exit(1)
elif args.end_date:
try:
end_date = datetime.strptime(args.end_date, "%Y-%m-%d %H:%M:%S")
except ValueError:
print("Invalid end-date format. Use YYYY-MM-DD HH:MM:SS")
logger.error("Invalid end-date format. Use YYYY-MM-DD HH:MM:SS")
sys.exit(1)
elif args.duration and start_date:
# Parse duration HH:MM:SS
@@ -139,14 +149,14 @@ if __name__ == "__main__":
duration_td = timedelta(hours=h, minutes=m, seconds=s)
end_date = start_date + duration_td
except ValueError:
print("Invalid duration format. Use HH:MM:SS")
logger.error("Invalid duration format. Use HH:MM:SS")
sys.exit(1)
if not start_date:
print("start-date is required in CLI mode")
logger.error("start-date is required in CLI mode")
sys.exit(1)
if not end_date:
print("Either end-date or duration is required in CLI mode")
logger.error("Either end-date or duration is required in CLI mode")
sys.exit(1)
keys = args.key or []
@@ -160,18 +170,18 @@ if __name__ == "__main__":
# Get stream selections
selections = get_selection(args.channel_id, args.video, args.audio)
if not selections:
print("Error during stream selection.")
logger.error("Error during stream selection.")
sys.exit(1)
print(f"Start date: {start_date}")
print(f"End date: {end_date}")
print(f"Channel ID: {args.channel_id}")
print(f"Video quality: {args.video}")
print(f"Audio track: {args.audio}")
print(f"Title: {title}")
print(f"DRM keys: {keys}")
print(f"Output dir: {args.output_dir}")
print(f"Widevine device: {args.widevine_device}")
logger.info(f"Start date: {start_date}")
logger.info(f"End date: {end_date}")
logger.info(f"Channel ID: {args.channel_id}")
logger.info(f"Video quality: {args.video}")
logger.info(f"Audio track: {args.audio}")
logger.info(f"Title: {title}")
logger.info(f"DRM keys: {keys}")
logger.info(f"Output dir: {args.output_dir}")
logger.info(f"Widevine device: {args.widevine_device}")
else:
# Interactive mode
@@ -219,7 +229,7 @@ if __name__ == "__main__":
track_id = init_segment.split("/")[-1].split("_init")[0]
if start_date.date() == manifest_date.date():
print(
logger.info(
"Date match between requested start date and manifest data, proceeding with download..."
)
@@ -230,7 +240,7 @@ if __name__ == "__main__":
start_tick_manifest, end_date, TIMESCALE, DURATION
)
else:
print(
logger.info(
"Date mismatch between requested start date and manifest data, bruteforce method is needed."
)
@@ -245,7 +255,7 @@ if __name__ == "__main__":
)
rep_nb = (end_tick - start_tick) // DURATION + 1
print(f"Total segments to fetch for {content_type}: {rep_nb}")
logger.info(f"Total segments to fetch for {content_type}: {rep_nb}")
data = {
"start_tick": start_tick,
"rep_nb": rep_nb,
@@ -286,7 +296,7 @@ if __name__ == "__main__":
key = k
break
if not key:
print(f"No key found for KID {kid}, need to fetch it.")
logger.info(f"No key found for KID {kid}, need to fetch it.")
missing_keys.append(kid)
if len(missing_keys) > 0:
@@ -307,6 +317,7 @@ if __name__ == "__main__":
}
fetched_keys = get_keys(kids=missing_keys, method=method)
logger.info(f"Fetched keys: {fetched_keys}")
keys = keys + fetched_keys
for content_type, data in [("video", video_data), ("audio", audio_data)]:
@@ -336,7 +347,7 @@ if __name__ == "__main__":
f'ffmpeg -i "concat:{output_dir}/segments_{track_id_video}/init.mp4|'
f'{output_dir}/dec_video.mp4" -c copy {output_dir}/video.mp4'
)
print("FFmpeg command:", command_ffmpeg)
logger.debug(f"FFmpeg command: {command_ffmpeg}")
subprocess.run(
command_ffmpeg,
shell=True,
@@ -349,7 +360,7 @@ if __name__ == "__main__":
f'{output_dir}/dec_audio.mp4" -c copy {output_dir}/audio.mp4'
)
print("FFmpeg command:", command_ffmpeg)
logger.debug(f"FFmpeg command: {command_ffmpeg}")
subprocess.run(
command_ffmpeg,
shell=True,
@@ -362,7 +373,7 @@ if __name__ == "__main__":
f"ffmpeg -i {output_dir}/video.mp4 -itsoffset {diff_start_sec} "
f"-i {output_dir}/audio.mp4 -c copy -map 0:v -map 1:a {output_dir}/output.mp4"
)
print("Merge command:", COMMAND_MERGE)
logger.debug(f"Merge command: {COMMAND_MERGE}")
subprocess.run(
COMMAND_MERGE,
shell=True,
@@ -373,7 +384,7 @@ if __name__ == "__main__":
FINAL_OUTPUT = f"{output_dir}/{title}.mp4"
shutil.move(f"{output_dir}/output.mp4", FINAL_OUTPUT)
print(f"Final output saved to {FINAL_OUTPUT}")
logger.info(f"Final output saved to {FINAL_OUTPUT}")
os.remove(f"{output_dir}/dec_video.mp4")
os.remove(f"{output_dir}/dec_audio.mp4")
@@ -385,7 +396,7 @@ if __name__ == "__main__":
shutil.rmtree(f"{output_dir}/segments_{audio_data['track_id']}")
except KeyboardInterrupt:
print("\n\nProgram interrupted by user. Goodbye!")
logger.info("\n\nProgram interrupted by user. Goodbye!")
# uv run python main.py --start-date "2025-01-01 12:00:00" --duration "01:00:00" \

View File

@@ -4,6 +4,7 @@ import requests
from pywidevine.cdm import Cdm
from pywidevine.device import Device
from pywidevine.pssh import PSSH
from utils.logging_config import logger
def fetch_drm_keys(kid: str, api_url: str, api_key: str) -> str:
@@ -40,14 +41,14 @@ def generate_pssh(kids: list[str]) -> PSSH:
def get_keys(kids: list[str], method: dict) -> list[str]:
"""Retrieve DRM keys using the specified method."""
if method["method"] == "api":
print("Fetching DRM keys via API...")
logger.info("Fetching DRM keys via API...")
keys = []
for kid in kids:
key = fetch_drm_keys(kid, method["api_url"], method["api_key"])
keys.append(f"{kid}:{key}")
return keys
print("Fetching DRM keys via Widevine CDM...")
logger.info("Fetching DRM keys via Widevine CDM...")
client = method["client_class"]
device = Device.load(method["device_file"])

View File

@@ -8,6 +8,7 @@ from InquirerPy.validator import EmptyInputValidator
from InquirerPy.base.control import Choice
from utils.stream import get_manifest, parse_mpd_manifest, organize_by_content_type
from utils.logging_config import logger
SERVICE_PLAN_API_URL = "https://api.oqee.net/api/v6/service_plan"
EPG_API_URL = "https://api.oqee.net/api/v1/epg/all/{unix}"
@@ -75,7 +76,7 @@ def get_date_input():
start_date = datetime.datetime.strptime(
start_date_result["datetime"], "%Y-%m-%d %H:%M:%S"
)
print(f"Start date/time: {start_date}")
logger.debug(f"Start date/time: {start_date}")
question_end_date = [
{
@@ -115,18 +116,18 @@ def get_date_input():
h, m, s = map(int, duration_str.split(":"))
duration_td = datetime.timedelta(hours=h, minutes=m, seconds=s)
end_date = start_date + duration_td
print(f"\nEnd date/time: {end_date}")
logger.debug(f"End date/time: {end_date}")
except (ValueError, TypeError):
print("Unable to parse the provided duration string.")
logger.error("Unable to parse the provided duration string.")
elif end_date_result.get("datetime"):
try:
end_date = datetime.datetime.strptime(
end_date_result["datetime"], "%Y-%m-%d %H:%M:%S"
)
print(f"\nEnd date/time: {end_date}")
logger.debug(f"End date/time: {end_date}")
except (ValueError, TypeError):
print("Unable to parse the provided date/time string.")
logger.error("Unable to parse the provided date/time string.")
return start_date, end_date
@@ -138,12 +139,12 @@ def select_oqee_channel():
"""
api_url = SERVICE_PLAN_API_URL
try:
print("Loading channel list from Oqee API...")
logger.info("Loading channel list from Oqee API...")
response = requests.get(api_url, timeout=10)
response.raise_for_status()
data = response.json()
if not data.get("success") or "channels" not in data.get("result", {}):
print("Error: Unexpected API response format.")
logger.error("Error: Unexpected API response format.")
return None
channels_data = data["result"]["channels"]
@@ -154,10 +155,10 @@ def select_oqee_channel():
choices.sort(key=lambda x: x["name"])
except requests.exceptions.RequestException as e:
print(f"A network error occurred: {e}")
logger.error(f"A network error occurred: {e}")
return None
except ValueError:
print("Error parsing JSON response.")
logger.error("Error parsing JSON response.")
return None
questions = [
@@ -177,19 +178,19 @@ def select_oqee_channel():
selected_channel_id = result[0]
selected_channel_details = channels_data.get(selected_channel_id)
if selected_channel_details:
print("\nYou have selected:")
print(f" - Name: {selected_channel_details.get('name')}")
print(f" - ID: {selected_channel_details.get('id')}")
print(f" - Freebox ID: {selected_channel_details.get('freebox_id')}")
logger.info("You have selected:")
logger.info(f" - Name: {selected_channel_details.get('name')}")
logger.info(f" - ID: {selected_channel_details.get('id')}")
logger.info(f" - Freebox ID: {selected_channel_details.get('freebox_id')}")
else:
print("Unable to find details for the selected channel.")
logger.warning("Unable to find details for the selected channel.")
return selected_channel_details
except KeyboardInterrupt:
print("\nOperation cancelled by user.")
logger.info("Operation cancelled by user.")
return None
except (ValueError, KeyError, IndexError) as e:
print(f"An unexpected error occurred: {e}")
logger.error(f"An unexpected error occurred: {e}")
return None
@@ -232,7 +233,7 @@ def prompt_for_stream_selection(stream_info, already_selected_types):
final_selection = None
if len(available_streams) == 1:
final_selection = available_streams[0]
print("Only one stream available for this quality, automatic selection.")
logger.debug("Only one stream available for this quality, automatic selection.")
else:
stream_choices = [
{
@@ -274,13 +275,13 @@ def stream_selection():
if not selected_channel:
return None
print("\nSelected channel:")
print(f" - Name: {selected_channel.get('name')}")
print(f" - ID: {selected_channel.get('id')}")
logger.debug("Selected channel:")
logger.debug(f" - Name: {selected_channel.get('name')}")
logger.debug(f" - ID: {selected_channel.get('id')}")
dash_id = selected_channel.get("streams", {}).get("dash")
if not dash_id:
print("No DASH stream found for this channel.")
logger.error("No DASH stream found for this channel.")
return None
mpd_content = get_manifest(dash_id)
@@ -296,15 +297,15 @@ def stream_selection():
content_type = selection.pop("content_type")
final_selections[content_type] = selection
print("\n--- Selection Summary ---")
logger.info("--- Selection Summary ---")
for stream_type, details in final_selections.items():
bitrate = details.get("bitrate_kbps")
track_id = details.get("track_id")
print(
logger.info(
f" - {stream_type.capitalize()}: "
f"Bitrate {bitrate} kbps (ID: {track_id})"
)
print("----------------------------------------")
logger.info("----------------------------------------")
continue_prompt = [
{
@@ -322,7 +323,7 @@ def stream_selection():
final_selections["channel"] = selected_channel
return final_selections
print("\nNo stream has been selected.")
logger.info("No stream has been selected.")
return None
@@ -344,29 +345,29 @@ def get_selection(channel_id, video_quality="best", audio_quality="best"):
response.raise_for_status()
data = response.json()
if not data.get("success") or "channels" not in data.get("result", {}):
print("Error: Unable to retrieve channel details.")
logger.error("Error: Unable to retrieve channel details.")
return None
channels_data = data["result"]["channels"]
selected_channel_details = channels_data.get(str(channel_id))
if not selected_channel_details:
print(f"Channel with ID {channel_id} not found.")
logger.error(f"Channel with ID {channel_id} not found.")
return None
except requests.exceptions.RequestException as e:
print(f"Network error: {e}")
logger.error(f"Network error: {e}")
return None
except ValueError:
print("Error parsing JSON response.")
logger.error("Error parsing JSON response.")
return None
print(
logger.info(
f"Selected channel: {selected_channel_details.get('name')} (ID: {channel_id})"
)
dash_id = selected_channel_details.get("streams", {}).get("dash")
if not dash_id:
print("No DASH stream found for this channel.")
logger.error("No DASH stream found for this channel.")
return None
mpd_content = get_manifest(dash_id)
@@ -416,7 +417,7 @@ def select_track(content_dict, quality_spec, content_type):
candidates.extend(tracks)
if not candidates:
print(f"No {content_type} track found for '{quality_spec}'.")
logger.warning(f"No {content_type} track found for '{quality_spec}'.")
return None
if pref == "best":
@@ -427,7 +428,7 @@ def select_track(content_dict, quality_spec, content_type):
# Default to best if unknown pref
selected = max(candidates, key=lambda x: x["bandwidth"])
print(
logger.info(
f"{content_type.capitalize()} selected: {selected['track_id']}, {selected['bitrate_kbps']} kbps"
)
return selected
@@ -453,7 +454,7 @@ def get_epg_data_at(dt: datetime.datetime):
dt_aligned = dt.replace(minute=0, second=0, microsecond=0)
unix_time = int(dt_aligned.timestamp())
print(f"Fetching EPG for aligned time: {dt_aligned} (unix={unix_time})")
logger.info(f"Fetching EPG for aligned time: {dt_aligned} (unix={unix_time})")
try:
response = requests.get(EPG_API_URL.format(unix=unix_time), timeout=10)
@@ -463,10 +464,10 @@ def get_epg_data_at(dt: datetime.datetime):
return data.get("result")
except requests.exceptions.RequestException as e:
print(f"A network error occurred: {e}")
logger.error(f"A network error occurred: {e}")
return None
except ValueError:
print("Error parsing JSON response.")
logger.error("Error parsing JSON response.")
return None
@@ -487,7 +488,7 @@ def select_program_from_epg(programs, original_start_date, original_end_date):
- 'program': dict or None (full program data if selected)
"""
if not programs:
print("No programs available in the EPG guide.")
logger.warning("No programs available in the EPG guide.")
return {
"start_date": original_start_date,
"end_date": original_end_date,
@@ -544,7 +545,7 @@ def select_program_from_epg(programs, original_start_date, original_end_date):
# If user chose to keep original selection
if selected_program is None:
print("\nManual selection kept")
logger.info("Manual selection kept")
return {
"start_date": original_start_date,
"end_date": original_end_date,
@@ -558,10 +559,10 @@ def select_program_from_epg(programs, original_start_date, original_end_date):
program_end = datetime.datetime.fromtimestamp(live_data.get("end", 0))
program_title = live_data.get("title", "Untitled")
print("\nSelected program:")
print(f" - Title: {program_title}")
print(f" - Start: {program_start.strftime('%Y-%m-%d %H:%M:%S')}")
print(f" - End: {program_end.strftime('%Y-%m-%d %H:%M:%S')}")
logger.info("Selected program:")
logger.info(f" - Title: {program_title}")
logger.info(f" - Start: {program_start.strftime('%Y-%m-%d %H:%M:%S')}")
logger.info(f" - End: {program_end.strftime('%Y-%m-%d %H:%M:%S')}")
return {
"start_date": program_start,
@@ -571,5 +572,5 @@ def select_program_from_epg(programs, original_start_date, original_end_date):
}
except KeyboardInterrupt:
print("\nOperation cancelled by user.")
logger.error("Operation cancelled by user.")
return None

48
utils/logging_config.py Normal file
View File

@@ -0,0 +1,48 @@
import logging
import sys
class ColoredFormatter(logging.Formatter):
"""Custom logging formatter to add colors to log levels."""
grey = "\x1b[38;20m"
yellow = "\x1b[33;20m"
red = "\x1b[31;20m"
bold_red = "\x1b[31;1m"
reset = "\x1b[0m"
green = "\x1b[32;20m"
cyan = "\x1b[36;20m"
FORMATS = {
logging.DEBUG: cyan + "%(levelname)s" + reset + " - %(message)s",
logging.INFO: green + "%(levelname)s" + reset + " - %(message)s",
logging.WARNING: yellow + "%(levelname)s" + reset + " - %(message)s",
logging.ERROR: red + "%(levelname)s" + reset + " - %(message)s",
logging.CRITICAL: bold_red + "%(levelname)s" + reset + " - %(message)s"
}
def format(self, record):
log_fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_fmt)
return formatter.format(record)
def setup_logging(level=logging.INFO):
"""Set up logging configuration."""
logger = logging.getLogger("OqeeRewind")
logger.setLevel(level)
# Create console handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(level)
# Create formatter
formatter = ColoredFormatter()
console_handler.setFormatter(formatter)
# Add handler to logger
if not logger.handlers:
logger.addHandler(console_handler)
return logger
# Create a default logger instance
logger = logging.getLogger("OqeeRewind")

View File

@@ -5,6 +5,7 @@ from urllib.parse import urlparse, parse_qs
import requests
from dotenv import load_dotenv
from utils.logging_config import logger
load_dotenv()
@@ -80,7 +81,7 @@ class OqeeClient: # pylint: disable=too-many-instance-attributes
def configure(self, username, password):
"""Configure the client by logging in and processing title information."""
print("Logging in")
logger.info("Logging in")
self.login(username, password)
def _build_headers(self, overrides=None, remove=None):
@@ -115,7 +116,7 @@ class OqeeClient: # pylint: disable=too-many-instance-attributes
data = self.session.get(
"https://api.oqee.net/api/v2/user/profiles", headers=headers
).json()
print("Selecting first profile by default.")
logger.info("Selecting first profile by default.")
return data["result"][0]["id"]
def login_cred(self, username, password):
@@ -208,19 +209,19 @@ class OqeeClient: # pylint: disable=too-many-instance-attributes
Log in to the Oqee service and set up necessary tokens and headers.
"""
if not username or not password:
print("No credentials provided, using IP login.")
logger.info("No credentials provided, using IP login.")
self.access_token = self.login_ip()
else:
print("Logging in with provided credentials")
logger.info("Logging in with provided credentials")
try:
self.access_token = self.login_cred(username, password)
except ValueError as e:
print(f"Credential login failed: {e}. Falling back to IP login.")
logger.warning(f"Credential login failed: {e}. Falling back to IP login.")
self.access_token = self.login_ip()
print("Fetching rights token")
logger.info("Fetching rights token")
self.right_token = self.right()
print("Fetching profile ID")
logger.info("Fetching profile ID")
self.profil_id = self.profil()
self.headers = self._build_headers(

View File

@@ -11,6 +11,7 @@ from typing import Dict, Any
import requests
import aiohttp
from tqdm.asyncio import tqdm
from utils.logging_config import logger
def parse_mpd_manifest(mpd_content: str) -> Dict[str, Any]:
@@ -373,7 +374,7 @@ def get_init(output_folder, track_id):
init_path = f"{output_folder}/segments_{track_id}/init.mp4"
with open(init_path, "wb") as f:
f.write(response.content)
print(f"Saved initialization segment to {init_path}")
logger.debug(f"Saved initialization segment to {init_path}")
return init_path
@@ -404,19 +405,18 @@ async def save_segments(output_folder, track_id, start_tick, rep_nb, duration):
with open(filename, "wb") as f:
f.write(content)
return True
print(
f"Failed to download segment {rep} (tick {tick}): "
logger.error(
f"Failed to download segment {rep} (tick {tick}): "
f"HTTP {resp.status}"
)
return False
except aiohttp.ClientError as e:
print(f"⚠️ Error downloading segment {rep} (tick {tick}): {e}")
logger.warning(f"Error downloading segment {rep} (tick {tick}): {e}")
return False
print(f"Starting download of {rep_nb} segments...")
print(f"📦 Track ID: {track_id}")
print(f"🎯 Base tick: {start_tick}")
print(f"{'='*50}")
logger.info(f"Starting download of {rep_nb} segments...")
logger.debug(f"Track ID: {track_id}")
logger.debug(f"Base tick: {start_tick}")
start_time = time.time()
successful = 0
@@ -443,10 +443,8 @@ async def save_segments(output_folder, track_id, start_tick, rep_nb, duration):
end_time = time.time()
elapsed = end_time - start_time
print(f"{'='*50}")
print(f"✅ Download completed in {elapsed:.2f}s")
print(f"💾 Files saved to {output_folder}/segments_{track_id}/")
print(f"{'='*50}")
logger.debug(f"Download completed in {elapsed:.2f}s")
logger.info(f"Files saved to {output_folder}/segments_{track_id}/")
def get_kid(output_folder, track_id):
@@ -463,7 +461,7 @@ def get_kid(output_folder, track_id):
for filename in os.listdir(folder):
if filename.endswith(".m4s"):
filepath = os.path.join(folder, filename)
print(f"Checking file: {filepath}")
logger.debug(f"Checking file: {filepath}")
with open(filepath, "rb") as f:
data = f.read()
# Pattern before KID

View File

@@ -8,6 +8,7 @@ import aiohttp
from tqdm import tqdm
from utils.stream import fetch_segment
from utils.logging_config import logger
def convert_ticks_to_sec(ticks, timescale):
@@ -56,8 +57,7 @@ async def bruteforce(track_id, date):
total_requests = 288000
batch_size = 20000
print(f"Starting bruteforce for {track_id}")
print(f"{'='*50}")
logger.debug(f"Starting bruteforce for {track_id}")
start_time = time.time()
@@ -86,18 +86,16 @@ async def bruteforce(track_id, date):
# Stop if we found valid ticks
if valid_ticks:
print(f"Found valid ticks: {valid_ticks}, stopping bruteforce.")
logger.debug(f"Found valid ticks: {valid_ticks}, stopping bruteforce.")
break
except KeyboardInterrupt:
print("\n\n🛑 Interrupted by user (Ctrl+C)")
logger.error("Interrupted by user (Ctrl+C)")
elapsed = time.time() - start_time
print(f"\n{'='*50}")
print(f"✅ Completed in {elapsed:.2f}s")
print(f"⚡ Speed: {total_requests / elapsed if elapsed > 0 else 0:.2f} req/s")
print(f"📊 Total checked: {total_requests}")
print(f"{'='*50}")
logger.debug(f"Completed in {elapsed:.2f}s")
logger.debug(f"Speed: {total_requests / elapsed if elapsed > 0 else 0:.2f} req/s")
logger.debug(f"Total checked: {total_requests}")
return valid_ticks

View File

@@ -2,17 +2,17 @@
import os
import sys
import logging
import subprocess
import shutil
from utils.logging_config import logger
def verify_mp4ff():
"""Verify if mp4ff-decrypt is installed and available in PATH."""
if shutil.which("mp4ff-decrypt") is None:
print("❌ Error: mp4ff-decrypt is not installed or not in PATH.")
print("Please install it using:")
print("go install github.com/Eyevinn/mp4ff/cmd/mp4ff-decrypt@latest")
logger.error("mp4ff-decrypt is not installed or not in PATH.")
logger.info("Please install it using:")
logger.info("go install github.com/Eyevinn/mp4ff/cmd/mp4ff-decrypt@latest")
sys.exit(1)
return True
@@ -20,10 +20,10 @@ def verify_mp4ff():
def verify_cmd(path: str) -> bool:
"""Verify if the file provided at path is valid and exists, otherwise log error and exit."""
if not os.path.exists(path):
logging.error("File does not exist: %s", path)
logger.error(f"File does not exist: {path}")
sys.exit(1)
if not os.path.isfile(path):
logging.error("Path is not a file: %s", path)
logger.error(f"Path is not a file: {path}")
sys.exit(1)
return True
@@ -40,7 +40,7 @@ def merge_segments(input_folder: str, track_id: str, output_file: str):
for fname in segment_files:
with open(f"{segment_folder}/{fname}", "rb") as infile:
outfile.write(infile.read())
print(f"Merged segments into {output_file}")
logger.info(f"Merged segments into {output_file}")
def decrypt(input_file, init_path, output_file, key):
@@ -63,7 +63,7 @@ def decrypt(input_file, init_path, output_file, key):
check=False,
)
if result.returncode == 0:
print(f"Decrypted {input_file} to {output_file}")
logger.info(f"Decrypted {input_file} to {output_file}")
return True
print(f"Decryption failed: {result.stderr}")
logger.error(f"Decryption failed: {result.stderr}")
return False