Improve code structure

Refactored the download_files function in cli.py to separate single file and folder download logic into helper functions, improving readability and maintainability.
This commit is contained in:
√(noham)²
2025-12-13 15:49:42 +01:00
parent 9a4c0b1776
commit ace62ec3f8
3 changed files with 122 additions and 91 deletions

View File

@@ -6,7 +6,7 @@ import argparse
import json import json
import logging import logging
import os import os
from typing import Callable, Dict, List, Optional from typing import Any, Callable, Dict, List, Optional
import httpx import httpx
from dotenv import load_dotenv from dotenv import load_dotenv
@@ -98,7 +98,9 @@ def _progress_callback_factory(progress_bar: Optional[tqdm]) -> Callable[[int],
return update return update
def _create_progress_bar(filename: str, total: int, quiet: bool, mode: str = "Uploading") -> Optional[tqdm]: def _create_progress_bar(
filename: str, total: int, quiet: bool, mode: str = "Uploading"
) -> Optional[tqdm]:
"""Create a tqdm progress bar unless JSON mode is requested.""" """Create a tqdm progress bar unless JSON mode is requested."""
if quiet: if quiet:
@@ -197,17 +199,92 @@ def extract_content_id(url_or_id: str) -> str:
# Handle URLs like https://gofile.io/d/nC5ulQ or direct IDs # Handle URLs like https://gofile.io/d/nC5ulQ or direct IDs
if "gofile.io/d/" in url_or_id: if "gofile.io/d/" in url_or_id:
return url_or_id.split("gofile.io/d/")[-1].split("?")[0].split("/")[0] return url_or_id.split("gofile.io/d/")[-1].split("?")[0].split("/")[0]
elif "gofile.io" in url_or_id: if "gofile.io" in url_or_id:
# Handle other URL patterns # Handle other URL patterns
parts = url_or_id.rstrip("/").split("/") parts = url_or_id.rstrip("/").split("/")
return parts[-1].split("?")[0] return parts[-1].split("?")[0]
return url_or_id return url_or_id
def _download_single_file(
client: GofileClient,
file_name: str,
download_link: str,
output_path: str,
file_size: int,
*,
quiet: bool
) -> Dict[str, object]:
"""Download a single file and return the result."""
progress_bar = _create_progress_bar(file_name, file_size, quiet, mode="Downloading")
progress_callback = _progress_callback_factory(progress_bar)
try:
client.download_file(download_link, output_path, progress_callback)
return {
"file": file_name,
"status": "success",
"path": output_path,
"size": file_size,
}
except (GofileError, httpx.HTTPError, OSError) as error:
logger.error("Download failed for %s: %s", file_name, error)
return _handle_upload_error(file_name, error)
finally:
if progress_bar:
progress_bar.close()
def _process_file_data(
client: GofileClient,
file_name: str,
file_data: Dict[str, Any],
output_dir: str,
quiet: bool
) -> Dict[str, object]:
"""Process and download a single file from file data."""
download_link = str(file_data.get("link", ""))
if not download_link:
logger.warning("No download link for %s, skipping", file_name)
return _handle_upload_error(file_name, GofileError("No download link"))
output_path = os.path.join(output_dir, file_name)
file_size = int(file_data.get("size", 0))
return _download_single_file(
client, file_name, download_link, output_path, file_size, quiet=quiet
)
def _download_folder_contents(
client: GofileClient,
children: Dict[str, Any],
output_dir: str,
quiet: bool
) -> List[Dict[str, object]]:
"""Download all files from a folder."""
results: List[Dict[str, object]] = []
logger.info("Found %s file(s) in folder", len(children))
for child_id, child_data in children.items():
if not isinstance(child_data, dict):
continue
child_type = child_data.get("type")
if child_type != "file":
logger.debug("Skipping non-file item: %s", child_id)
continue
file_name = str(child_data.get("name", f"file_{child_id}"))
result = _process_file_data(client, file_name, child_data, output_dir, quiet)
results.append(result)
return results
def download_files(args: argparse.Namespace, client: GofileClient) -> List[Dict[str, object]]: def download_files(args: argparse.Namespace, client: GofileClient) -> List[Dict[str, object]]:
"""Download files from a Gofile URL or content ID.""" """Download files from a Gofile URL or content ID."""
results: List[Dict[str, object]] = []
content_id = extract_content_id(args.download) content_id = extract_content_id(args.download)
try: try:
@@ -233,87 +310,37 @@ def download_files(args: argparse.Namespace, client: GofileClient) -> List[Dict[
output_path = os.path.join(args.output, file_name) output_path = os.path.join(args.output, file_name)
file_size = int(data.get("size", 0)) file_size = int(data.get("size", 0))
progress_bar = _create_progress_bar(file_name, file_size, args.json, mode="Downloading") result = _download_single_file(
progress_callback = _progress_callback_factory(progress_bar) client, file_name, download_link, output_path, file_size, quiet=args.json
)
return [result]
try: if content_type == "folder":
client.download_file(download_link, output_path, progress_callback)
results.append({
"file": file_name,
"status": "success",
"path": output_path,
"size": file_size,
})
except (GofileError, httpx.HTTPError, OSError) as error:
logger.error("Download failed for %s: %s", file_name, error)
results.append(_handle_upload_error(file_name, error))
finally:
if progress_bar:
progress_bar.close()
elif content_type == "folder":
# Multiple files in folder # Multiple files in folder
children = data.get("children", {}) children = data.get("children", {})
if not isinstance(children, dict): if not isinstance(children, dict):
raise GofileError("Invalid children structure in folder response") raise GofileError("Invalid children structure in folder response")
logger.info("Found %s file(s) in folder", len(children)) return _download_folder_contents(client, children, args.output, args.json)
for child_id, child_data in children.items(): raise GofileError(f"Unknown content type: {content_type}")
if not isinstance(child_data, dict):
continue
child_type = child_data.get("type")
if child_type != "file":
logger.debug("Skipping non-file item: %s", child_id)
continue
file_name = str(child_data.get("name", f"file_{child_id}"))
download_link = str(child_data.get("link", ""))
if not download_link:
logger.warning("No download link for %s, skipping", file_name)
continue
output_path = os.path.join(args.output, file_name)
file_size = int(child_data.get("size", 0))
progress_bar = _create_progress_bar(file_name, file_size, args.json, mode="Downloading")
progress_callback = _progress_callback_factory(progress_bar)
try:
client.download_file(download_link, output_path, progress_callback)
results.append({
"file": file_name,
"status": "success",
"path": output_path,
"size": file_size,
})
except (GofileError, httpx.HTTPError, OSError) as error:
logger.error("Download failed for %s: %s", file_name, error)
results.append(_handle_upload_error(file_name, error))
finally:
if progress_bar:
progress_bar.close()
else:
raise GofileError(f"Unknown content type: {content_type}")
except (GofileError, httpx.HTTPError) as error: except (GofileError, httpx.HTTPError) as error:
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
logger.exception("Failed to download from %s", content_id) logger.exception("Failed to download from %s", content_id)
else: else:
logger.error("Failed to download from %s: %s", content_id, error) logger.error("Failed to download from %s: %s", content_id, error)
results.append({ return [{
"content_id": content_id, "content_id": content_id,
"status": "error", "status": "error",
"message": str(error), "message": str(error),
"errorType": error.__class__.__name__, "errorType": error.__class__.__name__,
}) }]
return results
def output_results(results: List[Dict[str, object]], json_mode: bool, is_download: bool = False) -> None: def output_results(
results: List[Dict[str, object]], json_mode: bool, is_download: bool = False
) -> None:
"""Display results in either JSON or human readable form.""" """Display results in either JSON or human readable form."""
if json_mode: if json_mode:
@@ -328,7 +355,8 @@ def output_results(results: List[Dict[str, object]], json_mode: bool, is_downloa
else: else:
print(f"{result['file']} -> {result.get('downloadPage')}") print(f"{result['file']} -> {result.get('downloadPage')}")
else: else:
print(f"{result.get('file', result.get('content_id', 'unknown'))} -> {result.get('message')}") error_name = result.get('file', result.get('content_id', 'unknown'))
print(f"{error_name} -> {result.get('message')}")
successes = sum(1 for res in results if res["status"] == "success") successes = sum(1 for res in results if res["status"] == "success")
failures = len(results) - successes failures = len(results) - successes
logger.info("Summary: %s succeeded, %s failed", successes, failures) logger.info("Summary: %s succeeded, %s failed", successes, failures)

View File

@@ -263,9 +263,12 @@ class GofileClient:
"sortDirection": "1" "sortDirection": "1"
} }
headers = { headers = {
"x-website-token": "4fd6sg89d7s6" # to avoid error-notPremium # to avoid error-notPremium
"x-website-token": "4fd6sg89d7s6"
} }
return self._request("GET", url, params=params, headers=headers, context={"content_id": content_id}) return self._request(
"GET", url, params=params, headers=headers, context={"content_id": content_id}
)
def download_file( def download_file(
self, self,

View File

@@ -7,7 +7,7 @@ from gofilepy import GofileClient
client = GofileClient() client = GofileClient()
# Get folder contents # Get folder contents
contents = client.get_contents("GxHNKL") contents = client.get_contents("QUo3a5")
print("Folder contents:") print("Folder contents:")
print(contents) print(contents)