diff --git a/src/gofilepy/__init__.py b/src/gofilepy/__init__.py
index 05cd026..ef30960 100644
--- a/src/gofilepy/__init__.py
+++ b/src/gofilepy/__init__.py
@@ -1,7 +1,20 @@
-#!/usr/bin/env python3
+"""Top-level package exports for GofilePy."""
 
-from .client import GofileClient
-
-__version__ = "1.0.0"
-__all__ = ["GofileClient"]
+from .client import (
+    GofileAPIError,
+    GofileClient,
+    GofileError,
+    GofileFile,
+    GofileNetworkError,
+    GofileUploadError,
+)
 
+__version__ = "1.1.2"
+__all__ = [
+    "GofileClient",
+    "GofileFile",
+    "GofileError",
+    "GofileAPIError",
+    "GofileNetworkError",
+    "GofileUploadError",
+]
diff --git a/src/gofilepy/cli.py b/src/gofilepy/cli.py
index b37b103..3f557eb 100644
--- a/src/gofilepy/cli.py
+++ b/src/gofilepy/cli.py
@@ -1,130 +1,214 @@
-#!/usr/bin/env python3
+"""Command-line interface for uploading files to Gofile."""
+
+from __future__ import annotations
 
 import argparse
-import os
 import json
 import logging
-from tqdm import tqdm
-from dotenv import load_dotenv
-from .client import GofileClient
+import os
+from typing import Callable, Dict, List, Optional
 
-# Configure Logging
-logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s')
+import httpx
+from dotenv import load_dotenv
+from tqdm import tqdm
+
+from .client import GofileClient, GofileError
+
+LOG_FORMAT = "[%(levelname)s] %(message)s"
 logger = logging.getLogger("gofilepy")
 
-def main():
-    load_dotenv()
-    parser = argparse.ArgumentParser(description="Gofile.io CLI Uploader (HTTPX Edition)")
-
-    parser.add_argument("files", nargs='+', help="Files to upload")
-
-    parser.add_argument("-s", "--to-single-folder", action="store_true",
-                        help="Upload multiple files to the same folder.")
-
-    parser.add_argument("-f", "--folder-id", type=str, default=None,
-                        help="ID of an existing Gofile folder.")
-
-    parser.add_argument("-vv", "--verbose", action="store_true",
-                        help="Show detailed debug info.")
-
-    parser.add_argument("--json", action="store_true",
-                        help="Output result as JSON for scripts.")
 
+def parse_arguments() -> argparse.Namespace:
+    """Return parsed CLI arguments."""
 
-    args = parser.parse_args()
+    parser = argparse.ArgumentParser(
+        description="Gofile.io CLI Uploader (HTTPX Edition)",
+    )
+    parser.add_argument("files", nargs="+", help="Files to upload")
+    parser.add_argument(
+        "-s",
+        "--to-single-folder",
+        action="store_true",
+        help="Upload multiple files to the same folder.",
+    )
+    parser.add_argument(
+        "-f",
+        "--folder-id",
+        type=str,
+        default=None,
+        help="ID of an existing Gofile folder.",
+    )
+    parser.add_argument(
+        "-vv",
+        "--verbose",
+        action="store_true",
+        help="Show detailed debug info.",
+    )
+    parser.add_argument(
+        "--json",
+        action="store_true",
+        help="Output result as JSON for scripts.",
+    )
+    return parser.parse_args()
 
-    # Log Level Handling
-    if args.verbose:
-        logger.setLevel(logging.DEBUG)
-        # HTTPX can be verbose, enable if needed
-        # logging.getLogger("httpx").setLevel(logging.DEBUG)
-    else:
-        logger.setLevel(logging.INFO)
-        logging.getLogger("httpx").setLevel(logging.WARNING)
 
-    # Token Logic
-    token = os.environ.get("GOFILE_TOKEN")
+def configure_logging(verbose: bool) -> None:
+    """Configure logging for the CLI session."""
+
+    level = logging.DEBUG if verbose else logging.INFO
+    logging.basicConfig(level=level, format=LOG_FORMAT)
+    logger.setLevel(level)
+    httpx_logger = logging.getLogger("httpx")
+    httpx_logger.setLevel(logging.DEBUG if verbose else logging.WARNING)
+
+
+def _log_token_state(token: Optional[str], json_mode: bool) -> None:
+    """Log whether a token was discovered for informational output."""
+
+    if json_mode:
+        return
     if token:
         masked_token = f"{token[:4]}..."
-        if not args.json:
-            logger.info(f"🔑 Token loaded: {masked_token}")
+        logger.info("🔑 Token loaded: %s", masked_token)
     else:
-        if not args.json:
-            logger.warning("⚠️ No GOFILE_TOKEN found in .env or environment. Running as Guest.")
+        logger.warning("⚠️ No GOFILE_TOKEN found in .env or environment. Running as Guest.")
 
-    client = GofileClient(token=token)
-
+
+def _progress_callback_factory(progress_bar: Optional[tqdm]) -> Callable[[int], None]:
+    """Return a callback that updates the provided progress bar."""
+
+    def update(chunk_size: int, active_bar: Optional[tqdm] = progress_bar) -> None:
+        if active_bar:
+            active_bar.update(chunk_size)
+
+    return update
+
+
+def _create_progress_bar(filename: str, total: int, quiet: bool) -> Optional[tqdm]:
+    """Create a tqdm progress bar unless JSON mode is requested."""
+
+    if quiet:
+        return None
+    return tqdm(total=total, unit="B", unit_scale=True, desc=f"Uploading {filename}")
+
+
+def _handle_upload_success(
+    data: Dict[str, object],
+    filename: str,
+) -> Dict[str, object]:
+    """Normalize the success payload for presentation."""
+
+    return {
+        "file": filename,
+        "status": "success",
+        "downloadPage": data.get("downloadPage"),
+        "directLink": data.get("directLink", "N/A"),
+        "parentFolder": data.get("parentFolder"),
+    }
+
+
+def _handle_upload_error(filename: str, error: Exception) -> Dict[str, object]:
+    """Normalize the error payload for presentation."""
+
+    return {
+        "file": filename,
+        "status": "error",
+        "message": str(error),
+        "errorType": error.__class__.__name__,
+    }
+
+
+def _apply_guest_token(client: GofileClient, data: Dict[str, object]) -> None:
+    """Capture a guest token from the response so future uploads reuse the folder."""
+
+    guest_token = data.get("guestToken")
+    if guest_token and not client.token:
+        client.token = str(guest_token)
+        client.client.headers.update({"Authorization": f"Bearer {client.token}"})
+        logger.debug("Guest token applied: %s", client.token)
+
+
+def upload_files(args: argparse.Namespace, client: GofileClient) -> List[Dict[str, object]]:
+    """Upload each file sequentially and return the collected results."""
+
+    results: List[Dict[str, object]] = []
     target_folder_id = args.folder_id
-    results = []
 
     for file_path in args.files:
        if not os.path.exists(file_path):
-            res_err = {"file": file_path, "status": "error", "message": "File not found"}
-            results.append(res_err)
-            if not args.json:
-                logger.error(f"File not found: {file_path}")
+            logger.error("File not found: %s", file_path)
+            results.append({
+                "file": file_path,
+                "status": "error",
+                "message": "File not found",
+            })
            continue
 
         file_size = os.path.getsize(file_path)
         filename = os.path.basename(file_path)
-
-        # Init Progress Bar (Only if not JSON mode)
-        pbar = None
-        if not args.json:
-            pbar = tqdm(total=file_size, unit='B', unit_scale=True, desc=f"Uploading {filename}")
-
-        def progress_update(chunk_size):
-            if pbar:
-                pbar.update(chunk_size)
+        progress_bar = _create_progress_bar(filename, file_size, args.json)
+        progress_callback = _progress_callback_factory(progress_bar)
 
         try:
             data = client.upload_file(
-                file_path=file_path, 
+                file_path=file_path,
                 folder_id=target_folder_id,
-                callback=progress_update
+                callback=progress_callback,
             )
-
-            # --- Auto-Folder Management for Guests ---
-            # If we are in single folder mode and it's the first upload
+
             if args.to_single_folder and target_folder_id is None:
-                if 'parentFolder' in data:
-                    target_folder_id = data['parentFolder']
-                    logger.debug(f"Parent folder set to: {target_folder_id}")
-
-                # If guest, capture the guestToken to write to the same folder next time
-                if 'guestToken' in data and not client.token:
-                    client.token = data['guestToken']
-                    # Re-auth client with new token
-                    client.client.headers.update({"Authorization": f"Bearer {client.token}"})
-                    logger.debug(f"Guest token applied: {client.token}")
+                parent_folder = data.get("parentFolder")
+                if parent_folder:
+                    target_folder_id = str(parent_folder)
+                    logger.debug("Parent folder set to: %s", target_folder_id)
+                _apply_guest_token(client, data)
 
-            results.append({
-                "file": filename,
-                "status": "success",
-                "downloadPage": data.get("downloadPage"),
-                "directLink": data.get("directLink", "N/A"), # Sometimes available
-                "parentFolder": data.get("parentFolder")
-            })
-
-        except Exception as e:
-            err_msg = str(e)
-            results.append({"file": filename, "status": "error", "message": err_msg})
-            if not args.json:
-                logger.error(f"Upload failed: {err_msg}")
-        finally:
-            if pbar:
-                pbar.close()
-
-    # Output
-    if args.json:
-        print(json.dumps(results, indent=2))
-    else:
-        print("\n--- Summary ---")
-        for res in results:
-            if res['status'] == 'success':
-                print(f"✅ {res['file']} -> {res['downloadPage']}")
+            results.append(_handle_upload_success(data, filename))
+        except (GofileError, httpx.HTTPError, OSError) as error:
+            if logger.isEnabledFor(logging.DEBUG):
+                logger.exception("Upload failed for %s", filename)
             else:
-                print(f"❌ {res['file']} -> {res['message']}")
+                logger.error("Upload failed for %s: %s", filename, error)
+            results.append(_handle_upload_error(filename, error))
+        finally:
+            if progress_bar:
+                progress_bar.close()
+
+    return results
+
+
+def output_results(results: List[Dict[str, object]], json_mode: bool) -> None:
+    """Display results in either JSON or human readable form."""
+
+    if json_mode:
+        print(json.dumps(results, indent=2))
+        return
+
+    print("\n--- Summary ---")
+    for result in results:
+        if result["status"] == "success":
+            print(f"✅ {result['file']} -> {result['downloadPage']}")
+        else:
+            print(f"❌ {result['file']} -> {result.get('message')}")
+    successes = sum(1 for res in results if res["status"] == "success")
+    failures = len(results) - successes
+    logger.info("Summary: %s succeeded, %s failed", successes, failures)
+
+
+def main() -> None:
+    """Entrypoint for the CLI."""
+
+    load_dotenv()
+    args = parse_arguments()
+    configure_logging(args.verbose)
+
+    token = os.environ.get("GOFILE_TOKEN")
+    _log_token_state(token, args.json)
+
+    client = GofileClient(token=token)
+    results = upload_files(args, client)
+    output_results(results, args.json)
+
 
 if __name__ == "__main__":
     main()
diff --git a/src/gofilepy/client.py b/src/gofilepy/client.py
index 65ff52a..2f392a2 100644
--- a/src/gofilepy/client.py
+++ b/src/gofilepy/client.py
@@ -1,101 +1,231 @@
-#!/usr/bin/env python3
+"""HTTP client for interacting with the Gofile API."""
+
+from __future__ import annotations
 
-import httpx
 import logging
 import os
-from typing import Optional, List, Dict, Callable
+from dataclasses import dataclass
+from typing import Any, BinaryIO, Callable, Dict, List, Optional, Union
+
+import httpx
+
 from .utils import ProgressFileReader
 
 logger = logging.getLogger(__name__)
 
+
+class GofileError(RuntimeError):
+    """Base exception for all Gofile client errors."""
+
+    def __init__(self, message: str, *, context: Optional[Dict[str, Any]] = None):
+        super().__init__(message)
+        self.context = context or {}
+
+
+class GofileAPIError(GofileError):
+    """Raised when the Gofile API reports an error."""
+
+
+class GofileNetworkError(GofileError):
+    """Raised when the HTTP request fails before reaching the API."""
+
+
+class GofileUploadError(GofileError):
+    """Raised when the upload flow cannot complete."""
+
+
+@dataclass(slots=True)
+class GofileFile:
+    """Represents a file returned by the Gofile API."""
+
+    name: str
+    page_link: str
+    file_id: str
+    parent_folder: str
+    raw: Dict[str, object]
+
+    @classmethod
+    def from_data(cls, data: Dict[str, object]) -> "GofileFile":
+        """Create an instance from the API response payload."""
+
+        return cls(
+            name=str(data.get("fileName", "")),
+            page_link=str(data.get("downloadPage", "")),
+            file_id=str(data.get("fileId", "")),
+            parent_folder=str(data.get("parentFolder", "")),
+            raw=data,
+        )
+
+    def to_dict(self) -> Dict[str, object]:
+        """Return the original API payload as a new dict."""
+
+        return dict(self.raw)
+
+
 class GofileClient:
+    """Thin wrapper around Gofile's REST endpoints."""
+
     API_ROOT = "https://api.gofile.io"
     UPLOAD_SERVER_URL = "https://upload.gofile.io"
 
     def __init__(self, token: Optional[str] = None):
+        """Instantiate the client with an optional authentication token."""
+
         self.token = token
-        # Increase timeout for large API operations, though uploads handle their own timeout
-        self.client = httpx.Client(timeout=30.0)
-
+        self.client = httpx.Client(timeout=30.0)
+
         if self.token:
-            logger.debug(f"Initialized with token: {self.token[:4]}***")
+            logger.debug("Initialized with token: %s***", self.token[:4])
             self.client.headers.update({"Authorization": f"Bearer {self.token}"})
 
-    def _handle_response(self, response: httpx.Response) -> Dict:
-        logger.debug(f"Response Status: {response.status_code}")
+    def _handle_response(self, response: httpx.Response) -> Dict[str, object]:
+        """Validate HTTP responses and normalize API errors."""
+
+        logger.debug("Response status: %s", response.status_code)
         try:
             data = response.json()
-            logger.debug(f"Response Body: {data}")
-        except Exception:
+            logger.debug("Response body: %s", data)
+        except ValueError as exc:  # httpx raises ValueError for invalid JSON
             error_text = response.text.strip()
-            logger.debug(f"Failed to parse JSON: {error_text}")
+            logger.debug("Failed to parse JSON: %s", error_text)
             response.raise_for_status()
-            return {}
+            raise GofileAPIError("Invalid JSON returned by Gofile API") from exc
 
         if data.get("status") != "ok":
-            logger.error(f"API Error: {data}")
-            raise Exception(f"Gofile API Error: {data.get('status')} - {data.get('data')}")
-
-        return data.get("data", {})
+            logger.error("API error payload: %s", data)
+            raise GofileAPIError(
+                f"Gofile API Error: {data.get('status')} - {data.get('data')}"
+            )
+
+        payload = data.get("data")
+        if not isinstance(payload, dict):
+            raise GofileAPIError("Gofile API returned unexpected payload structure")
+        return payload
+
+    def _request(
+        self, method: str, url: str, *, context: Optional[Dict[str, Any]] = None, **kwargs: Any
+    ) -> Dict[str, object]:
+        """Execute an HTTP request and normalize errors."""
+
+        safe_context = context or {}
+        try:
+            logger.debug("HTTP %s %s | payload=%s", method, url, safe_context)
+            response = self.client.request(method, url, **kwargs)
+        except httpx.HTTPError as exc:
+            logger.error("HTTP %s %s failed: %s", method, url, exc)
+            raise GofileNetworkError(
+                f"Failed HTTP request to {url}", context={"method": method, **safe_context}
+            ) from exc
+
+        return self._handle_response(response)
+
+    @staticmethod
+    def _sanitize_metadata(metadata: Dict[str, str]) -> Dict[str, str]:
+        """Return a copy of request metadata with sensitive values redacted."""
+
+        redacted = dict(metadata)
+        if "token" in redacted:
+            redacted["token"] = "***REDACTED***"
+        return redacted
 
     def get_server(self) -> str:
-        """
-        Gofile suggests using specific servers (availables in their doc),
-        but 'upload.gofile.io' uses DNS geo-routing automatically.
-        We stick to the best practice default.
-        """
+        """Return the upload server, which leverages geo-aware routing."""
+
         return self.UPLOAD_SERVER_URL
 
-    def create_folder(self, parent_folder_id: str, folder_name: str) -> Dict:
-        logger.debug(f"Creating folder '{folder_name}' in '{parent_folder_id}'")
+    def create_folder(self, parent_folder_id: str, folder_name: str) -> Dict[str, object]:
+        """Create a folder under the provided parent folder."""
+
+        logger.debug("Creating folder '%s' in '%s'", folder_name, parent_folder_id)
         url = f"{self.API_ROOT}/contents/createFolder"
         payload = {
             "parentFolderId": parent_folder_id,
-            "folderName": folder_name
+            "folderName": folder_name,
         }
-        res = self.client.post(url, json=payload)
-        return self._handle_response(res)
+        return self._request("POST", url, json=payload, context=payload)
 
-    def delete_content(self, content_ids: List[str]) -> Dict:
-        logger.debug(f"Deleting content IDs: {content_ids}")
+    def delete_content(self, content_ids: List[str]) -> Dict[str, object]:
+        """Delete one or more items by their content IDs."""
+
+        logger.debug("Deleting content IDs: %s", content_ids)
         url = f"{self.API_ROOT}/contents"
-        # HTTPX needs 'content' or 'json' for DELETE requests explicitly if body is required
-        res = self.client.request("DELETE", url, json={"contentsId": ",".join(content_ids)})
-        return self._handle_response(res)
+        payload = {"contentsId": ",".join(content_ids)}
+        return self._request("DELETE", url, json=payload, context=payload)
+
+    def upload(
+        self,
+        file: Union[str, BinaryIO],
+        folder_id: Optional[str] = None,
+        callback: Optional[Callable[[int], None]] = None,
+    ) -> GofileFile:
+        """Upload a file object or file path to Gofile."""
 
-    def upload_file(self,
-                    file_path: str,
-                    folder_id: Optional[str] = None,
-                    callback: Optional[Callable[[int], None]] = None) -> Dict:
-        server_url = f"{self.get_server()}/uploadfile"
-        file_name = os.path.basename(file_path)
-
-        # Prepare parameters
-        data = {}
+        server_url = f"{self.get_server()}/uploadfile"
+        data: Dict[str, str] = {}
         if self.token:
             data["token"] = self.token
         if folder_id:
             data["folderId"] = folder_id
 
-        # Use our custom ProgressFileReader
-        # If no callback is provided, we use a dummy lambda to avoid errors
-        progress_callback = callback if callback else lambda x: None
-
-        logger.info(f"Starting upload: {file_name} -> {server_url}")
-
-        # Open file using our wrapper
-        with ProgressFileReader(file_path, progress_callback) as f:
-            files = {'file': (file_name, f)}
-
-            # Use a longer timeout for the upload specifically (None = infinite)
-            # This is crucial for 2000GB files
-            res = self.client.post(
-                server_url,
-                data=data,
-                files=files,
-                timeout=None
+        logger.debug("Upload metadata: %s", self._sanitize_metadata(data))
+
+        progress_callback = callback or (lambda _chunk: None)
+
+        if isinstance(file, str):
+            file_name = os.path.basename(file)
+            logger.info("Starting upload: %s -> %s", file_name, server_url)
+            with open(file, "rb") as file_handle:
+                wrapped_file = ProgressFileReader(file_handle, progress_callback)
+                response = self._post_upload(
+                    server_url,
+                    data=data,
+                    files={"file": (file_name, wrapped_file)},
+                )
+        else:
+            file_name = getattr(file, "name", "uploaded_file")
+            if hasattr(file_name, "__fspath__"):
+                file_name = os.path.basename(file_name)  # type: ignore[arg-type]
+            elif "/" in str(file_name) or "\\" in str(file_name):
+                file_name = os.path.basename(str(file_name))
+
+            logger.info("Starting upload: %s -> %s", file_name, server_url)
+            files = {"file": (file_name, file)}
+            response = self._post_upload(
+                server_url,
+                data=data,
+                files=files,
             )
-            return self._handle_response(res)
+
+        response_data = self._handle_response(response)
+        logger.info("Upload finished: %s", file_name)
+        return GofileFile.from_data(response_data)
+
+    def _post_upload(
+        self,
+        url: str,
+        *,
+        data: Dict[str, str],
+        files: Dict[str, Any],
+    ) -> httpx.Response:
+        """Issue the actual upload request with improved error context."""
+
+        try:
+            return self.client.post(url, data=data, files=files, timeout=None)
+        except httpx.TimeoutException as exc:
+            logger.error("Upload timed out at %s", url)
+            raise GofileUploadError("Upload timed out", context={"url": url}) from exc
+        except httpx.HTTPError as exc:
+            logger.error("HTTP error while uploading to %s: %s", url, exc)
+            raise GofileUploadError("Upload failed", context={"url": url}) from exc
+
+    def upload_file(
+        self,
+        file_path: str,
+        folder_id: Optional[str] = None,
+        callback: Optional[Callable[[int], None]] = None,
+    ) -> Dict[str, object]:
+        """Compatibility helper that mirrors the legacy API."""
+
+        result = self.upload(file_path, folder_id, callback)
+        return result.to_dict()
diff --git a/src/gofilepy/utils.py b/src/gofilepy/utils.py
index 03157fa..9a7eeeb 100644
--- a/src/gofilepy/utils.py
+++ b/src/gofilepy/utils.py
@@ -1,28 +1,20 @@
-#!/usr/bin/env python3
+"""Utility helpers for GofilePy."""
+
+from __future__ import annotations
 
-import typing
 import io
+from typing import BinaryIO, Callable
+
 
 class ProgressFileReader(io.BufferedReader):
-    """
-    Wraps a file object to trigger a callback when data is read.
-    This allows monitoring upload progress in httpx without loading the file into RAM.
-    """
-    def __init__(self, filename: str, callback: typing.Callable[[int], None]):
-        self._f = open(filename, 'rb')
-        self._callback = callback
-        # Get file size for verification if needed, or just standard init
-        super().__init__(self._f)
+    """Buffered reader that reports read progress through a callback."""
 
-    def read(self, size: int = -1) -> bytes:
-        # Read the chunk from disk
-        chunk = self._f.read(size)
-        # Update the progress bar with the length of the chunk read
+    def __init__(self, file_obj: BinaryIO, callback: Callable[[int], None]):
+        self._callback = callback
+        super().__init__(file_obj)
+
+    def read(self, size: int = -1) -> bytes:  # type: ignore[override]
+        chunk = super().read(size)
         if chunk:
             self._callback(len(chunk))
         return chunk
-
-    def close(self) -> None:
-        if hasattr(self, '_f'):
-            self._f.close()
-