Refactor CLI and client for improved structure and error handling

Refactored the CLI to modularize argument parsing, logging, and output, and improved error handling and progress reporting. The client now uses dataclasses for file metadata, introduces custom exception classes, and provides a more robust upload interface supporting both file paths and file-like objects. Utilities were updated for clarity and type safety.
This commit is contained in:
√(noham)²
2025-12-12 18:30:08 +01:00
parent cf56e3ca3e
commit 66731e2fc1
4 changed files with 399 additions and 180 deletions

View File

@@ -1,7 +1,20 @@
#!/usr/bin/env python3 """Top-level package exports for GofilePy."""
from .client import GofileClient from .client import (
GofileAPIError,
__version__ = "1.0.0" GofileClient,
__all__ = ["GofileClient"] GofileError,
GofileFile,
GofileNetworkError,
GofileUploadError,
)
__version__ = "1.1.2"
__all__ = [
"GofileClient",
"GofileFile",
"GofileError",
"GofileAPIError",
"GofileNetworkError",
"GofileUploadError",
]

View File

@@ -1,130 +1,214 @@
#!/usr/bin/env python3 """Command-line interface for uploading files to Gofile."""
from __future__ import annotations
import argparse import argparse
import os
import json import json
import logging import logging
from tqdm import tqdm import os
from dotenv import load_dotenv from typing import Callable, Dict, List, Optional
from .client import GofileClient
# Configure Logging import httpx
logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s') from dotenv import load_dotenv
from tqdm import tqdm
from .client import GofileClient, GofileError
LOG_FORMAT = "[%(levelname)s] %(message)s"
logger = logging.getLogger("gofilepy") logger = logging.getLogger("gofilepy")
def main():
load_dotenv()
parser = argparse.ArgumentParser(description="Gofile.io CLI Uploader (HTTPX Edition)") def parse_arguments() -> argparse.Namespace:
"""Return parsed CLI arguments."""
parser.add_argument("files", nargs='+', help="Files to upload")
parser.add_argument("-s", "--to-single-folder", action="store_true",
help="Upload multiple files to the same folder.")
parser.add_argument("-f", "--folder-id", type=str, default=None,
help="ID of an existing Gofile folder.")
parser.add_argument("-vv", "--verbose", action="store_true",
help="Show detailed debug info.")
parser.add_argument("--json", action="store_true",
help="Output result as JSON for scripts.")
args = parser.parse_args() parser = argparse.ArgumentParser(
description="Gofile.io CLI Uploader (HTTPX Edition)",
)
parser.add_argument("files", nargs="+", help="Files to upload")
parser.add_argument(
"-s",
"--to-single-folder",
action="store_true",
help="Upload multiple files to the same folder.",
)
parser.add_argument(
"-f",
"--folder-id",
type=str,
default=None,
help="ID of an existing Gofile folder.",
)
parser.add_argument(
"-vv",
"--verbose",
action="store_true",
help="Show detailed debug info.",
)
parser.add_argument(
"--json",
action="store_true",
help="Output result as JSON for scripts.",
)
return parser.parse_args()
# Log Level Handling
if args.verbose:
logger.setLevel(logging.DEBUG)
# HTTPX can be verbose, enable if needed
# logging.getLogger("httpx").setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
logging.getLogger("httpx").setLevel(logging.WARNING)
# Token Logic def configure_logging(verbose: bool) -> None:
token = os.environ.get("GOFILE_TOKEN") """Configure logging for the CLI session."""
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(level=level, format=LOG_FORMAT)
logger.setLevel(level)
httpx_logger = logging.getLogger("httpx")
httpx_logger.setLevel(logging.DEBUG if verbose else logging.WARNING)
def _log_token_state(token: Optional[str], json_mode: bool) -> None:
"""Log whether a token was discovered for informational output."""
if json_mode:
return
if token: if token:
masked_token = f"{token[:4]}..." masked_token = f"{token[:4]}..."
if not args.json: logger.info("🔑 Token loaded: %s", masked_token)
logger.info(f"🔑 Token loaded: {masked_token}")
else: else:
if not args.json: logger.warning("⚠️ No GOFILE_TOKEN found in .env or environment. Running as Guest.")
logger.warning("⚠️ No GOFILE_TOKEN found in .env or environment. Running as Guest.")
client = GofileClient(token=token)
def _progress_callback_factory(progress_bar: Optional[tqdm]) -> Callable[[int], None]:
"""Return a callback that updates the provided progress bar."""
def update(chunk_size: int, active_bar: Optional[tqdm] = progress_bar) -> None:
if active_bar:
active_bar.update(chunk_size)
return update
def _create_progress_bar(filename: str, total: int, quiet: bool) -> Optional[tqdm]:
"""Create a tqdm progress bar unless JSON mode is requested."""
if quiet:
return None
return tqdm(total=total, unit="B", unit_scale=True, desc=f"Uploading {filename}")
def _handle_upload_success(
data: Dict[str, object],
filename: str,
) -> Dict[str, object]:
"""Normalize the success payload for presentation."""
return {
"file": filename,
"status": "success",
"downloadPage": data.get("downloadPage"),
"directLink": data.get("directLink", "N/A"),
"parentFolder": data.get("parentFolder"),
}
def _handle_upload_error(filename: str, error: Exception) -> Dict[str, object]:
"""Normalize the error payload for presentation."""
return {
"file": filename,
"status": "error",
"message": str(error),
"errorType": error.__class__.__name__,
}
def _apply_guest_token(client: GofileClient, data: Dict[str, object]) -> None:
"""Capture a guest token from the response so future uploads reuse the folder."""
guest_token = data.get("guestToken")
if guest_token and not client.token:
client.token = str(guest_token)
client.client.headers.update({"Authorization": f"Bearer {client.token}"})
logger.debug("Guest token applied: %s", client.token)
def upload_files(args: argparse.Namespace, client: GofileClient) -> List[Dict[str, object]]:
"""Upload each file sequentially and return the collected results."""
results: List[Dict[str, object]] = []
target_folder_id = args.folder_id target_folder_id = args.folder_id
results = []
for file_path in args.files: for file_path in args.files:
if not os.path.exists(file_path): if not os.path.exists(file_path):
res_err = {"file": file_path, "status": "error", "message": "File not found"} logger.error("File not found: %s", file_path)
results.append(res_err) results.append({
if not args.json: "file": file_path,
logger.error(f"File not found: {file_path}") "status": "error",
"message": "File not found",
})
continue continue
file_size = os.path.getsize(file_path) file_size = os.path.getsize(file_path)
filename = os.path.basename(file_path) filename = os.path.basename(file_path)
progress_bar = _create_progress_bar(filename, file_size, args.json)
# Init Progress Bar (Only if not JSON mode) progress_callback = _progress_callback_factory(progress_bar)
pbar = None
if not args.json:
pbar = tqdm(total=file_size, unit='B', unit_scale=True, desc=f"Uploading {filename}")
def progress_update(chunk_size):
if pbar:
pbar.update(chunk_size)
try: try:
data = client.upload_file( data = client.upload_file(
file_path=file_path, file_path=file_path,
folder_id=target_folder_id, folder_id=target_folder_id,
callback=progress_update callback=progress_callback,
) )
# --- Auto-Folder Management for Guests ---
# If we are in single folder mode and it's the first upload
if args.to_single_folder and target_folder_id is None: if args.to_single_folder and target_folder_id is None:
if 'parentFolder' in data: parent_folder = data.get("parentFolder")
target_folder_id = data['parentFolder'] if parent_folder:
logger.debug(f"Parent folder set to: {target_folder_id}") target_folder_id = str(parent_folder)
logger.debug("Parent folder set to: %s", target_folder_id)
# If guest, capture the guestToken to write to the same folder next time _apply_guest_token(client, data)
if 'guestToken' in data and not client.token:
client.token = data['guestToken']
# Re-auth client with new token
client.client.headers.update({"Authorization": f"Bearer {client.token}"})
logger.debug(f"Guest token applied: {client.token}")
results.append({ results.append(_handle_upload_success(data, filename))
"file": filename, except (GofileError, httpx.HTTPError, OSError) as error:
"status": "success", if logger.isEnabledFor(logging.DEBUG):
"downloadPage": data.get("downloadPage"), logger.exception("Upload failed for %s", filename)
"directLink": data.get("directLink", "N/A"), # Sometimes available
"parentFolder": data.get("parentFolder")
})
except Exception as e:
err_msg = str(e)
results.append({"file": filename, "status": "error", "message": err_msg})
if not args.json:
logger.error(f"Upload failed: {err_msg}")
finally:
if pbar:
pbar.close()
# Output
if args.json:
print(json.dumps(results, indent=2))
else:
print("\n--- Summary ---")
for res in results:
if res['status'] == 'success':
print(f"{res['file']} -> {res['downloadPage']}")
else: else:
print(f"{res['file']} -> {res['message']}") logger.error("Upload failed for %s: %s", filename, error)
results.append(_handle_upload_error(filename, error))
finally:
if progress_bar:
progress_bar.close()
return results
def output_results(results: List[Dict[str, object]], json_mode: bool) -> None:
"""Display results in either JSON or human readable form."""
if json_mode:
print(json.dumps(results, indent=2))
return
print("\n--- Summary ---")
for result in results:
if result["status"] == "success":
print(f"{result['file']} -> {result['downloadPage']}")
else:
print(f"{result['file']} -> {result.get('message')}")
successes = sum(1 for res in results if res["status"] == "success")
failures = len(results) - successes
logger.info("Summary: %s succeeded, %s failed", successes, failures)
def main() -> None:
"""Entrypoint for the CLI."""
load_dotenv()
args = parse_arguments()
configure_logging(args.verbose)
token = os.environ.get("GOFILE_TOKEN")
_log_token_state(token, args.json)
client = GofileClient(token=token)
results = upload_files(args, client)
output_results(results, args.json)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -1,101 +1,231 @@
#!/usr/bin/env python3 """HTTP client for interacting with the Gofile API."""
from __future__ import annotations
import httpx
import logging import logging
import os import os
from typing import Optional, List, Dict, Callable from dataclasses import dataclass
from typing import Any, BinaryIO, Callable, Dict, List, Optional, Union
import httpx
from .utils import ProgressFileReader from .utils import ProgressFileReader
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class GofileError(RuntimeError):
"""Base exception for all Gofile client errors."""
def __init__(self, message: str, *, context: Optional[Dict[str, Any]] = None):
super().__init__(message)
self.context = context or {}
class GofileAPIError(GofileError):
"""Raised when the Gofile API reports an error."""
class GofileNetworkError(GofileError):
"""Raised when the HTTP request fails before reaching the API."""
class GofileUploadError(GofileError):
"""Raised when the upload flow cannot complete."""
@dataclass(slots=True)
class GofileFile:
"""Represents a file returned by the Gofile API."""
name: str
page_link: str
file_id: str
parent_folder: str
raw: Dict[str, object]
@classmethod
def from_data(cls, data: Dict[str, object]) -> "GofileFile":
"""Create an instance from the API response payload."""
return cls(
name=str(data.get("fileName", "")),
page_link=str(data.get("downloadPage", "")),
file_id=str(data.get("fileId", "")),
parent_folder=str(data.get("parentFolder", "")),
raw=data,
)
def to_dict(self) -> Dict[str, object]:
"""Return the original API payload as a new dict."""
return dict(self.raw)
class GofileClient: class GofileClient:
"""Thin wrapper around Gofile's REST endpoints."""
API_ROOT = "https://api.gofile.io" API_ROOT = "https://api.gofile.io"
UPLOAD_SERVER_URL = "https://upload.gofile.io" UPLOAD_SERVER_URL = "https://upload.gofile.io"
def __init__(self, token: Optional[str] = None): def __init__(self, token: Optional[str] = None):
"""Instantiate the client with an optional authentication token."""
self.token = token self.token = token
# Increase timeout for large API operations, though uploads handle their own timeout self.client = httpx.Client(timeout=30.0)
self.client = httpx.Client(timeout=30.0)
if self.token: if self.token:
logger.debug(f"Initialized with token: {self.token[:4]}***") logger.debug("Initialized with token: %s***", self.token[:4])
self.client.headers.update({"Authorization": f"Bearer {self.token}"}) self.client.headers.update({"Authorization": f"Bearer {self.token}"})
def _handle_response(self, response: httpx.Response) -> Dict: def _handle_response(self, response: httpx.Response) -> Dict[str, object]:
logger.debug(f"Response Status: {response.status_code}") """Validate HTTP responses and normalize API errors."""
logger.debug("Response status: %s", response.status_code)
try: try:
data = response.json() data = response.json()
logger.debug(f"Response Body: {data}") logger.debug("Response body: %s", data)
except Exception: except ValueError as exc: # httpx raises ValueError for invalid JSON
error_text = response.text.strip() error_text = response.text.strip()
logger.debug(f"Failed to parse JSON: {error_text}") logger.debug("Failed to parse JSON: %s", error_text)
response.raise_for_status() response.raise_for_status()
return {} raise GofileAPIError("Invalid JSON returned by Gofile API") from exc
if data.get("status") != "ok": if data.get("status") != "ok":
logger.error(f"API Error: {data}") logger.error("API error payload: %s", data)
raise Exception(f"Gofile API Error: {data.get('status')} - {data.get('data')}") raise GofileAPIError(
f"Gofile API Error: {data.get('status')} - {data.get('data')}"
return data.get("data", {}) )
payload = data.get("data")
if not isinstance(payload, dict):
raise GofileAPIError("Gofile API returned unexpected payload structure")
return payload
def _request(
self, method: str, url: str, *, context: Optional[Dict[str, Any]] = None, **kwargs: Any
) -> Dict[str, object]:
"""Execute an HTTP request and normalize errors."""
safe_context = context or {}
try:
logger.debug("HTTP %s %s | payload=%s", method, url, safe_context)
response = self.client.request(method, url, **kwargs)
except httpx.HTTPError as exc:
logger.error("HTTP %s %s failed: %s", method, url, exc)
raise GofileNetworkError(
f"Failed HTTP request to {url}", context={"method": method, **safe_context}
) from exc
return self._handle_response(response)
@staticmethod
def _sanitize_metadata(metadata: Dict[str, str]) -> Dict[str, str]:
"""Return a copy of request metadata with sensitive values redacted."""
redacted = dict(metadata)
if "token" in redacted:
redacted["token"] = "***REDACTED***"
return redacted
def get_server(self) -> str: def get_server(self) -> str:
""" """Return the upload server, which leverages geo-aware routing."""
Gofile suggests using specific servers (availables in their doc),
but 'upload.gofile.io' uses DNS geo-routing automatically.
We stick to the best practice default.
"""
return self.UPLOAD_SERVER_URL return self.UPLOAD_SERVER_URL
def create_folder(self, parent_folder_id: str, folder_name: str) -> Dict: def create_folder(self, parent_folder_id: str, folder_name: str) -> Dict[str, object]:
logger.debug(f"Creating folder '{folder_name}' in '{parent_folder_id}'") """Create a folder under the provided parent folder."""
logger.debug("Creating folder '%s' in '%s'", folder_name, parent_folder_id)
url = f"{self.API_ROOT}/contents/createFolder" url = f"{self.API_ROOT}/contents/createFolder"
payload = { payload = {
"parentFolderId": parent_folder_id, "parentFolderId": parent_folder_id,
"folderName": folder_name "folderName": folder_name,
} }
res = self.client.post(url, json=payload) return self._request("POST", url, json=payload, context=payload)
return self._handle_response(res)
def delete_content(self, content_ids: List[str]) -> Dict: def delete_content(self, content_ids: List[str]) -> Dict[str, object]:
logger.debug(f"Deleting content IDs: {content_ids}") """Delete one or more items by their content IDs."""
logger.debug("Deleting content IDs: %s", content_ids)
url = f"{self.API_ROOT}/contents" url = f"{self.API_ROOT}/contents"
# HTTPX needs 'content' or 'json' for DELETE requests explicitly if body is required payload = {"contentsId": ",".join(content_ids)}
res = self.client.request("DELETE", url, json={"contentsId": ",".join(content_ids)}) return self._request("DELETE", url, json=payload, context=payload)
return self._handle_response(res)
def upload(
self,
file: Union[str, BinaryIO],
folder_id: Optional[str] = None,
callback: Optional[Callable[[int], None]] = None,
) -> GofileFile:
"""Upload a file object or file path to Gofile."""
def upload_file(self,
file_path: str,
folder_id: Optional[str] = None,
callback: Optional[Callable[[int], None]] = None) -> Dict:
server_url = f"{self.get_server()}/uploadfile" server_url = f"{self.get_server()}/uploadfile"
file_name = os.path.basename(file_path) data: Dict[str, str] = {}
# Prepare parameters
data = {}
if self.token: if self.token:
data["token"] = self.token data["token"] = self.token
if folder_id: if folder_id:
data["folderId"] = folder_id data["folderId"] = folder_id
# Use our custom ProgressFileReader logger.debug("Upload metadata: %s", self._sanitize_metadata(data))
# If no callback is provided, we use a dummy lambda to avoid errors
progress_callback = callback if callback else lambda x: None progress_callback = callback or (lambda _chunk: None)
logger.info(f"Starting upload: {file_name} -> {server_url}") if isinstance(file, str):
file_name = os.path.basename(file)
# Open file using our wrapper logger.info("Starting upload: %s -> %s", file_name, server_url)
with ProgressFileReader(file_path, progress_callback) as f: with open(file, "rb") as file_handle:
files = {'file': (file_name, f)} wrapped_file = ProgressFileReader(file_handle, progress_callback)
response = self._post_upload(
# Use a longer timeout for the upload specifically (None = infinite) server_url,
# This is crucial for 2000GB files data=data,
res = self.client.post( files={"file": (file_name, wrapped_file)},
server_url, )
data=data, else:
files=files, file_name = getattr(file, "name", "uploaded_file")
timeout=None if hasattr(file_name, "__fspath__"):
file_name = os.path.basename(file_name) # type: ignore[arg-type]
elif "/" in str(file_name) or "\\" in str(file_name):
file_name = os.path.basename(str(file_name))
logger.info("Starting upload: %s -> %s", file_name, server_url)
files = {"file": (file_name, file)}
response = self._post_upload(
server_url,
data=data,
files=files,
) )
return self._handle_response(res) response_data = self._handle_response(response)
logger.info("Upload finished: %s", file_name)
return GofileFile.from_data(response_data)
def _post_upload(
self,
url: str,
*,
data: Dict[str, str],
files: Dict[str, Any],
) -> httpx.Response:
"""Issue the actual upload request with improved error context."""
try:
return self.client.post(url, data=data, files=files, timeout=None)
except httpx.TimeoutException as exc:
logger.error("Upload timed out at %s", url)
raise GofileUploadError("Upload timed out", context={"url": url}) from exc
except httpx.HTTPError as exc:
logger.error("HTTP error while uploading to %s: %s", url, exc)
raise GofileUploadError("Upload failed", context={"url": url}) from exc
def upload_file(
self,
file_path: str,
folder_id: Optional[str] = None,
callback: Optional[Callable[[int], None]] = None,
) -> Dict[str, object]:
"""Compatibility helper that mirrors the legacy API."""
result = self.upload(file_path, folder_id, callback)
return result.to_dict()

View File

@@ -1,28 +1,20 @@
#!/usr/bin/env python3 """Utility helpers for GofilePy."""
from __future__ import annotations
import typing
import io import io
from typing import BinaryIO, Callable
class ProgressFileReader(io.BufferedReader): class ProgressFileReader(io.BufferedReader):
""" """Buffered reader that reports read progress through a callback."""
Wraps a file object to trigger a callback when data is read.
This allows monitoring upload progress in httpx without loading the file into RAM.
"""
def __init__(self, filename: str, callback: typing.Callable[[int], None]):
self._f = open(filename, 'rb')
self._callback = callback
# Get file size for verification if needed, or just standard init
super().__init__(self._f)
def read(self, size: int = -1) -> bytes: def __init__(self, file_obj: BinaryIO, callback: Callable[[int], None]):
# Read the chunk from disk self._callback = callback
chunk = self._f.read(size) super().__init__(file_obj)
# Update the progress bar with the length of the chunk read
def read(self, size: int = -1) -> bytes: # type: ignore[override]
chunk = super().read(size)
if chunk: if chunk:
self._callback(len(chunk)) self._callback(len(chunk))
return chunk return chunk
def close(self) -> None:
if hasattr(self, '_f'):
self._f.close()