From ace62ec3f82ed4339ce45c9877da75924eb0935d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=88=9A=28noham=29=C2=B2?= <100566912+NohamR@users.noreply.github.com> Date: Sat, 13 Dec 2025 15:49:42 +0100 Subject: [PATCH] Improve code structure Refactored the download_files function in cli.py to separate single file and folder download logic into helper functions, improving readability and maintainability. --- src/gofilepy/cli.py | 188 +++++++++++++++++++++++------------------ src/gofilepy/client.py | 23 ++--- test_download.py | 2 +- 3 files changed, 122 insertions(+), 91 deletions(-) diff --git a/src/gofilepy/cli.py b/src/gofilepy/cli.py index 6950837..33bd2a4 100644 --- a/src/gofilepy/cli.py +++ b/src/gofilepy/cli.py @@ -6,7 +6,7 @@ import argparse import json import logging import os -from typing import Callable, Dict, List, Optional +from typing import Any, Callable, Dict, List, Optional import httpx from dotenv import load_dotenv @@ -98,7 +98,9 @@ def _progress_callback_factory(progress_bar: Optional[tqdm]) -> Callable[[int], return update -def _create_progress_bar(filename: str, total: int, quiet: bool, mode: str = "Uploading") -> Optional[tqdm]: +def _create_progress_bar( + filename: str, total: int, quiet: bool, mode: str = "Uploading" +) -> Optional[tqdm]: """Create a tqdm progress bar unless JSON mode is requested.""" if quiet: @@ -197,123 +199,148 @@ def extract_content_id(url_or_id: str) -> str: # Handle URLs like https://gofile.io/d/nC5ulQ or direct IDs if "gofile.io/d/" in url_or_id: return url_or_id.split("gofile.io/d/")[-1].split("?")[0].split("/")[0] - elif "gofile.io" in url_or_id: + if "gofile.io" in url_or_id: # Handle other URL patterns parts = url_or_id.rstrip("/").split("/") return parts[-1].split("?")[0] return url_or_id +def _download_single_file( + client: GofileClient, + file_name: str, + download_link: str, + output_path: str, + file_size: int, + *, + quiet: bool +) -> Dict[str, object]: + """Download a single file and return the result.""" + progress_bar = _create_progress_bar(file_name, file_size, quiet, mode="Downloading") + progress_callback = _progress_callback_factory(progress_bar) + + try: + client.download_file(download_link, output_path, progress_callback) + return { + "file": file_name, + "status": "success", + "path": output_path, + "size": file_size, + } + except (GofileError, httpx.HTTPError, OSError) as error: + logger.error("Download failed for %s: %s", file_name, error) + return _handle_upload_error(file_name, error) + finally: + if progress_bar: + progress_bar.close() + + +def _process_file_data( + client: GofileClient, + file_name: str, + file_data: Dict[str, Any], + output_dir: str, + quiet: bool +) -> Dict[str, object]: + """Process and download a single file from file data.""" + download_link = str(file_data.get("link", "")) + + if not download_link: + logger.warning("No download link for %s, skipping", file_name) + return _handle_upload_error(file_name, GofileError("No download link")) + + output_path = os.path.join(output_dir, file_name) + file_size = int(file_data.get("size", 0)) + + return _download_single_file( + client, file_name, download_link, output_path, file_size, quiet=quiet + ) + + +def _download_folder_contents( + client: GofileClient, + children: Dict[str, Any], + output_dir: str, + quiet: bool +) -> List[Dict[str, object]]: + """Download all files from a folder.""" + results: List[Dict[str, object]] = [] + logger.info("Found %s file(s) in folder", len(children)) + + for child_id, child_data in children.items(): + if not isinstance(child_data, dict): + continue + + child_type = child_data.get("type") + if child_type != "file": + logger.debug("Skipping non-file item: %s", child_id) + continue + + file_name = str(child_data.get("name", f"file_{child_id}")) + result = _process_file_data(client, file_name, child_data, output_dir, quiet) + results.append(result) + + return results + + def download_files(args: argparse.Namespace, client: GofileClient) -> List[Dict[str, object]]: """Download files from a Gofile URL or content ID.""" - - results: List[Dict[str, object]] = [] content_id = extract_content_id(args.download) - + try: # Fetch content information logger.info("Fetching content information for: %s", content_id) response = client.get_contents(content_id) - + # The response is already the "data" object from get_contents data = response if isinstance(response, dict) else {} if not isinstance(data, dict): raise GofileError("Invalid response structure from API") - + content_type = data.get("type") - + if content_type == "file": # Single file download file_name = str(data.get("name", "downloaded_file")) download_link = str(data.get("link", "")) - + if not download_link: raise GofileError("No download link found in response") - + output_path = os.path.join(args.output, file_name) file_size = int(data.get("size", 0)) - - progress_bar = _create_progress_bar(file_name, file_size, args.json, mode="Downloading") - progress_callback = _progress_callback_factory(progress_bar) - - try: - client.download_file(download_link, output_path, progress_callback) - results.append({ - "file": file_name, - "status": "success", - "path": output_path, - "size": file_size, - }) - except (GofileError, httpx.HTTPError, OSError) as error: - logger.error("Download failed for %s: %s", file_name, error) - results.append(_handle_upload_error(file_name, error)) - finally: - if progress_bar: - progress_bar.close() - - elif content_type == "folder": + + result = _download_single_file( + client, file_name, download_link, output_path, file_size, quiet=args.json + ) + return [result] + + if content_type == "folder": # Multiple files in folder children = data.get("children", {}) if not isinstance(children, dict): raise GofileError("Invalid children structure in folder response") - - logger.info("Found %s file(s) in folder", len(children)) - - for child_id, child_data in children.items(): - if not isinstance(child_data, dict): - continue - - child_type = child_data.get("type") - if child_type != "file": - logger.debug("Skipping non-file item: %s", child_id) - continue - - file_name = str(child_data.get("name", f"file_{child_id}")) - download_link = str(child_data.get("link", "")) - - if not download_link: - logger.warning("No download link for %s, skipping", file_name) - continue - - output_path = os.path.join(args.output, file_name) - file_size = int(child_data.get("size", 0)) - - progress_bar = _create_progress_bar(file_name, file_size, args.json, mode="Downloading") - progress_callback = _progress_callback_factory(progress_bar) - - try: - client.download_file(download_link, output_path, progress_callback) - results.append({ - "file": file_name, - "status": "success", - "path": output_path, - "size": file_size, - }) - except (GofileError, httpx.HTTPError, OSError) as error: - logger.error("Download failed for %s: %s", file_name, error) - results.append(_handle_upload_error(file_name, error)) - finally: - if progress_bar: - progress_bar.close() - else: - raise GofileError(f"Unknown content type: {content_type}") - + + return _download_folder_contents(client, children, args.output, args.json) + + raise GofileError(f"Unknown content type: {content_type}") + except (GofileError, httpx.HTTPError) as error: if logger.isEnabledFor(logging.DEBUG): logger.exception("Failed to download from %s", content_id) else: logger.error("Failed to download from %s: %s", content_id, error) - results.append({ + return [{ "content_id": content_id, "status": "error", "message": str(error), "errorType": error.__class__.__name__, - }) - - return results + }] -def output_results(results: List[Dict[str, object]], json_mode: bool, is_download: bool = False) -> None: +def output_results( + results: List[Dict[str, object]], json_mode: bool, is_download: bool = False +) -> None: """Display results in either JSON or human readable form.""" if json_mode: @@ -328,7 +355,8 @@ def output_results(results: List[Dict[str, object]], json_mode: bool, is_downloa else: print(f"✅ {result['file']} -> {result.get('downloadPage')}") else: - print(f"❌ {result.get('file', result.get('content_id', 'unknown'))} -> {result.get('message')}") + error_name = result.get('file', result.get('content_id', 'unknown')) + print(f"❌ {error_name} -> {result.get('message')}") successes = sum(1 for res in results if res["status"] == "success") failures = len(results) - successes logger.info("Summary: %s succeeded, %s failed", successes, failures) @@ -342,7 +370,7 @@ def main() -> None: configure_logging(args.verbose) token = os.environ.get("GOFILE_TOKEN") - + # Check if we're in download mode or upload mode if args.download: _log_token_state(token, args.json) diff --git a/src/gofilepy/client.py b/src/gofilepy/client.py index d8697ed..6fe5712 100644 --- a/src/gofilepy/client.py +++ b/src/gofilepy/client.py @@ -236,12 +236,12 @@ class GofileClient: logger.debug("Creating guest account") url = f"{self.API_ROOT}/accounts" response = self._request("POST", url, context={"action": "create_guest"}) - + if "token" in response: self.token = str(response["token"]) self.client.headers.update({"Authorization": f"Bearer {self.token}"}) logger.debug("Guest account created with token: %s***", self.token[:4]) - + return response def get_contents(self, content_id: str) -> Dict[str, Any]: @@ -263,9 +263,12 @@ class GofileClient: "sortDirection": "1" } headers = { - "x-website-token": "4fd6sg89d7s6" # to avoid error-notPremium + # to avoid error-notPremium + "x-website-token": "4fd6sg89d7s6" } - return self._request("GET", url, params=params, headers=headers, context={"content_id": content_id}) + return self._request( + "GET", url, params=params, headers=headers, context={"content_id": content_id} + ) def download_file( self, @@ -276,28 +279,28 @@ class GofileClient: """Download a file from the provided direct link.""" logger.info("Starting download: %s -> %s", download_url, output_path) - + cookies = {} if self.token: cookies["accountToken"] = self.token logger.debug("Using accountToken cookie for download") - + try: with self.client.stream("GET", download_url, cookies=cookies, timeout=None) as response: response.raise_for_status() - + total_size = int(response.headers.get("content-length", 0)) logger.debug("File size: %s bytes", total_size) - + os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True) - + with open(output_path, "wb") as f: for chunk in response.iter_bytes(chunk_size=8192): if chunk: f.write(chunk) if callback: callback(len(chunk)) - + logger.info("Download complete: %s", output_path) except httpx.HTTPError as exc: logger.error("Download failed for %s: %s", download_url, exc) diff --git a/test_download.py b/test_download.py index 0431057..02b3b15 100644 --- a/test_download.py +++ b/test_download.py @@ -7,7 +7,7 @@ from gofilepy import GofileClient client = GofileClient() # Get folder contents -contents = client.get_contents("GxHNKL") +contents = client.get_contents("QUo3a5") print("Folder contents:") print(contents)