Add batch downloading and retry logic to save_segments

Enhanced the save_segments function to download media segments in configurable batches and added retry logic for failed downloads. Improved logging and progress reporting, and updated the function signature and docstring to reflect these changes.
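
For readers skimming the diff below, a minimal sketch of the batch-and-retry pattern this commit introduces (simplified; fetch() is a hypothetical stand-in for the real download_segment coroutine):

    import asyncio

    async def fetch(item):
        # Placeholder for the real download coroutine (hypothetical).
        await asyncio.sleep(0)
        return (True, item)

    async def run_in_batches(items, batch_size=64):
        """Run fetch() over items in fixed-size concurrent batches, then retry failures once."""
        failed = []
        for start in range(0, len(items), batch_size):
            batch = items[start:start + batch_size]
            # gather() runs the whole batch concurrently and returns results in input order.
            results = await asyncio.gather(*(fetch(i) for i in batch))
            failed.extend(item for ok, item in results if not ok)
        # Single retry pass over first-round failures; whatever fails again is returned.
        if failed:
            retried = await asyncio.gather(*(fetch(i) for i in failed))
            return [item for ok, item in retried if not ok]
        return []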
Author: √(noham)²
Date:   2025-12-29 11:48:51 +01:00
Parent: 247c9f6fe7
Commit: a01b199475


@@ -389,19 +389,25 @@ def get_init(output_folder, track_id):
     return init_path
 
-async def save_segments(output_folder, track_id, start_tick, rep_nb, duration):
-    """Download and save multiple media segments.
+async def save_segments(output_folder, track_id, start_tick, rep_nb, duration, batch_size=64):
+    """Download and save multiple media segments in batches.
 
     Args:
         output_folder: The output folder path.
         track_id: The track identifier.
         start_tick: The starting tick value.
         rep_nb: The number of segments to download.
         duration: The duration per segment.
+        batch_size: Number of concurrent downloads per batch (default: 64).
     """
     os.makedirs(f"{output_folder}/segments_{track_id}", exist_ok=True)
 
     async def download_segment(session, tick, rep):
-        """Download a single segment."""
+        """Download a single segment.
+
+        Returns:
+            Tuple of (success: bool, tick: int, rep: int)
+        """
         url = f"https://media.stream.proxad.net/media/{track_id}_{tick}"
         headers = {
             "Accept": "*/*",
@@ -419,43 +425,78 @@ async def save_segments(output_folder, track_id, start_tick, rep_nb, duration):
                     filename = f"{output_folder}/segments_{track_id}/{tick}.m4s"
                     with open(filename, "wb") as f:
                         f.write(content)
-                    return True
+                    return (True, tick, rep)
                 logger.error(
                     "Failed to download segment %d (tick %d): HTTP %d",
                     rep, tick, resp.status
                 )
-                return False
+                return (False, tick, rep)
         except aiohttp.ClientError as e:
             logger.warning("Error downloading segment %d (tick %d): %s", rep, tick, e)
-            return False
+            return (False, tick, rep)
 
-    logger.info("Starting download of %d segments...", rep_nb)
+    logger.info("Starting download of %d segments in batches of %d...", rep_nb, batch_size)
     logger.debug("Track ID: %s", track_id)
     logger.debug("Base tick: %d", start_tick)
 
     start_time = time.time()
 
+    # Build list of all segments to download
+    segments_to_download = [(start_tick + i * duration, i) for i in range(rep_nb)]
+    retry_list = []
+    successful = 0
+
     async with aiohttp.ClientSession() as session:
-        tasks = []
-        for i in range(rep_nb):
-            tick = start_tick + i * duration
-            tasks.append(download_segment(session, tick, i))
-
-        results = []
-        for coro in tqdm(
-            asyncio.as_completed(tasks),
-            total=len(tasks),
-            desc="Downloading segments",
-            unit="seg",
-        ):
-            result = await coro
-            results.append(result)
-
-    successful = sum(1 for r in results if r is True)
+        # Process segments in batches
+        with tqdm(total=len(segments_to_download), desc="Downloading segments", unit="seg") as pbar:
+            for batch_start in range(0, len(segments_to_download), batch_size):
+                batch = segments_to_download[batch_start:batch_start + batch_size]
+                tasks = [download_segment(session, tick, rep) for tick, rep in batch]
+
+                results = await asyncio.gather(*tasks)
+
+                for success, tick, rep in results:
+                    if success:
+                        successful += 1
+                    else:
+                        retry_list.append((tick, rep))
+                    pbar.update(1)
+
+        # Retry failed segments
+        if retry_list:
+            logger.info("Retrying %d failed segments...", len(retry_list))
+            retry_successful = 0
+            final_failures = []
+            with tqdm(total=len(retry_list), desc="Retrying segments", unit="seg") as pbar:
+                for batch_start in range(0, len(retry_list), batch_size):
+                    batch = retry_list[batch_start:batch_start + batch_size]
+                    tasks = [download_segment(session, tick, rep) for tick, rep in batch]
+                    results = await asyncio.gather(*tasks)
+
+                    for success, tick, rep in results:
+                        if success:
+                            retry_successful += 1
+                            successful += 1
+                        else:
+                            final_failures.append((tick, rep))
+                        pbar.update(1)
+
+            if final_failures:
+                logger.warning(
+                    "Failed to download %d segments after retry: %s",
+                    len(final_failures),
+                    [tick for tick, _ in final_failures]
+                )
+            else:
+                logger.info("All %d retried segments downloaded successfully", retry_successful)
 
     end_time = time.time()
     elapsed = end_time - start_time
 
     logger.debug("Download completed in %.2fs", elapsed)
     logger.info("Successfully downloaded %d/%d segments", successful, rep_nb)
     logger.info("Files saved to %s/segments_%s/", output_folder, track_id)