This commit is contained in:
√(noham)²
2025-07-02 22:18:25 +02:00
parent b0197ae646
commit 7ba77cb9d2
2 changed files with 257 additions and 189 deletions

3
.gitignore vendored
View File

@@ -125,4 +125,5 @@ venv.bak/
dmypy.json
# node
node_modules/
node_modules/
test.txt

View File

@@ -1,12 +1,13 @@
#!/usr/bin/env python3
import os
import requests
import mimetypes
from typing import List, Optional, Dict, Any, Union
import logging
import argparse
import logging
import mimetypes
import os
import sys
from base64 import b64encode
from typing import List, Optional, Dict, Any, Union
import requests
from dotenv import load_dotenv
from requests_toolbelt.multipart.encoder import MultipartEncoder, MultipartEncoderMonitor
from tqdm import tqdm
@@ -19,6 +20,10 @@ logger = logging.getLogger(__name__)
load_dotenv()
PIXELDRAIN_API_KEY = os.getenv("PIXELDRAIN_API_KEY")
# Constants
CHUNK_SIZE = 8192
TIMEOUT = 30
def display_file_size(size: int) -> str:
"""Return a human-readable file size."""
@@ -32,29 +37,30 @@ def display_file_size(size: int) -> str:
def upload_to_pixeldrain(file_path: str) -> Optional[str]:
"""
Upload a file to pixeldrain.com and return the shareable URL.
Args:
file_path (str): Path to the file to upload
Returns:
Optional[str]: The pixeldrain URL if successful, None otherwise
Raises:
Exception: If file not found or upload fails
FileNotFoundError: If file not found
requests.RequestException: If upload fails
"""
try:
if not os.path.exists(file_path):
logger.error(f"File not found: {file_path}")
logger.error("File not found: %s", file_path)
return None
logger.info(f"Uploading file to pixeldrain: {file_path}")
logger.info("Uploading file to pixeldrain: %s", file_path)
file_size = os.path.getsize(file_path)
# Create authorization header for API key authentication
auth_header = f"Basic {b64encode(f':{PIXELDRAIN_API_KEY}'.encode()).decode()}"
with open(file_path, 'rb') as f:
with open(file_path, 'rb') as file:
with tqdm(
total=file_size,
unit="B",
@@ -65,10 +71,11 @@ def upload_to_pixeldrain(file_path: str) -> Optional[str]:
progress.update(monitor.bytes_read - progress.n)
# Detect MIME type for proper content handling
content_type = mimetypes.guess_type(file_path)[0] or "application/octet-stream"
content_type = (mimetypes.guess_type(file_path)[0] or
"application/octet-stream")
encoder = MultipartEncoder(
fields={
"file": (os.path.basename(file_path), f, content_type),
"file": (os.path.basename(file_path), file, content_type),
}
)
monitor = MultipartEncoderMonitor(encoder, progress_callback)
@@ -77,32 +84,96 @@ def upload_to_pixeldrain(file_path: str) -> Optional[str]:
"Authorization": auth_header
}
response = requests.post('https://pixeldrain.com/api/file', data=monitor, headers=headers)
if response.status_code == 200 or response.status_code == 201:
response = requests.post(
'https://pixeldrain.com/api/file',
data=monitor,
headers=headers,
timeout=TIMEOUT
)
if response.status_code in (200, 201):
json_response = response.json()
if json_response['success']:
file_id = json_response['id']
logger.info(f"Upload completed: {os.path.basename(file_path)}")
logger.info("Upload completed: %s", os.path.basename(file_path))
return f"https://pixeldrain.com/u/{file_id}"
else:
logger.error(f"Upload failed: {json_response.get('message', 'Unknown error')}")
logger.error("Upload failed: %s",
json_response.get('message', 'Unknown error'))
return None
logger.error("Upload failed: HTTP %s - %s",
response.status_code, response.text)
return None
except (OSError, requests.RequestException) as error:
logger.error("Error uploading to pixeldrain: %s", error)
return None
def _handle_successful_download(response, save_path: str, filename: str,
download_folder: str) -> str:
"""Handle successful download response."""
total_size = int(response.headers.get('content-length', 0))
os.makedirs(download_folder, exist_ok=True)
# Download with progress bar
with open(save_path, 'wb') as file:
with tqdm(
total=total_size,
unit="B",
unit_scale=True,
desc=f"Downloading {filename}...",
) as progress:
for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
if chunk: # Filter out keep-alive chunks
file.write(chunk)
progress.update(len(chunk))
logger.info("Download completed: %s", save_path)
return save_path
def _handle_404_error(response):
"""Handle 404 error response."""
try:
json_response = response.json()
logger.error("File not found: %s",
json_response.get('message', 'The file could not be found'))
except requests.exceptions.JSONDecodeError:
logger.error("File not found")
def _handle_403_error(response, file_id: str):
"""Handle 403 error response."""
try:
json_response = response.json()
error_value = json_response.get('value', '')
message = json_response.get('message', 'Access forbidden')
if 'rate_limited_captcha_required' in error_value:
logger.error("Rate limited: %s", message)
logger.info("Please visit https://pixeldrain.com/u/%s to complete captcha",
file_id)
elif 'virus_detected_captcha_required' in error_value:
logger.error("Virus detected: %s", message)
logger.info("Please visit https://pixeldrain.com/u/%s to complete captcha",
file_id)
else:
logger.error(f"Upload failed: HTTP {response.status_code} - {response.text}")
except Exception as e:
logger.error(f"Error uploading to pixeldrain: {e}")
return None
logger.error("Access forbidden: %s", message)
except requests.exceptions.JSONDecodeError:
logger.error("Access forbidden")
def download_from_pixeldrain(file_id: str, download_folder: str, force_download: bool = False) -> Optional[str]:
def download_from_pixeldrain(file_id: str, download_folder: str,
force_download: bool = False) -> Optional[str]:
"""
Download a file from pixeldrain.com using its file ID.
Args:
file_id (str): The pixeldrain file ID
download_folder (str): Directory to save the downloaded file
force_download (bool): Force download by adding ?download parameter
Returns:
Optional[str]: Path to downloaded file if successful, None otherwise
"""
@@ -113,77 +184,42 @@ def download_from_pixeldrain(file_id: str, download_folder: str, force_download:
else:
auth_header = f"Basic {b64encode(f':{PIXELDRAIN_API_KEY}'.encode()).decode()}"
headers = {"Authorization": auth_header}
# Get file info to determine the filename
logger.info(f"Getting file info for {file_id}")
logger.info("Getting file info for %s", file_id)
file_info = get_file_info_pixeldrain(file_id)
if not file_info:
logger.error("Could not retrieve file information")
return None
filename = file_info.get('name', f"{file_id}_unknown")
save_path = os.path.join(download_folder, filename)
# Construct URL with optional download parameter
url = f'https://pixeldrain.com/api/file/{file_id}'
if force_download:
url += '?download'
logger.info(f"Downloading file {filename} from pixeldrain")
response = requests.get(url, headers=headers, stream=True)
logger.info("Downloading file %s from pixeldrain", filename)
response = requests.get(url, headers=headers, stream=True, timeout=TIMEOUT)
if response.status_code == 200:
total_size = int(response.headers.get('content-length', 0))
os.makedirs(download_folder, exist_ok=True)
# Download with progress bar
with open(save_path, 'wb') as f:
with tqdm(
total=total_size,
unit="B",
unit_scale=True,
desc=f"Downloading {filename}...",
) as progress:
for chunk in response.iter_content(chunk_size=8192):
if chunk: # Filter out keep-alive chunks
f.write(chunk)
progress.update(len(chunk))
logger.info(f"Download completed: {save_path}")
return save_path
elif response.status_code == 404:
try:
json_response = response.json()
logger.error(f"File not found: {json_response.get('message', 'The file could not be found')}")
except:
logger.error("File not found")
return _handle_successful_download(response, save_path, filename, download_folder)
if response.status_code == 404:
_handle_404_error(response)
return None
elif response.status_code == 403:
try:
json_response = response.json()
error_value = json_response.get('value', '')
message = json_response.get('message', 'Access forbidden')
if 'rate_limited_captcha_required' in error_value:
logger.error(f"Rate limited: {message}")
logger.info(f"Please visit https://pixeldrain.com/u/{file_id} to complete captcha")
elif 'virus_detected_captcha_required' in error_value:
logger.error(f"Virus detected: {message}")
logger.info(f"Please visit https://pixeldrain.com/u/{file_id} to complete captcha")
else:
logger.error(f"Access forbidden: {message}")
except:
logger.error("Access forbidden")
if response.status_code == 403:
_handle_403_error(response, file_id)
return None
else:
logger.error(f"Download failed: HTTP {response.status_code} - {response.text}")
return None
except Exception as e:
logger.error(f"Error downloading from pixeldrain: {e}")
logger.error("Download failed: HTTP %s - %s",
response.status_code, response.text)
return None
except (OSError, requests.RequestException) as error:
logger.error("Error downloading from pixeldrain: %s", error)
return None
@@ -193,114 +229,120 @@ def get_stats_pixeldrain() -> Optional[Dict[str, Any]]:
if not PIXELDRAIN_API_KEY:
logger.error("PIXELDRAIN_API_KEY not found in environment variables")
return None
logger.info("Fetching files stats from pixeldrain")
auth_header = f"Basic {b64encode(f':{PIXELDRAIN_API_KEY}'.encode()).decode()}"
headers = {"Authorization": auth_header}
response = requests.get('https://pixeldrain.com/api/user/files', headers=headers)
response = requests.get('https://pixeldrain.com/api/user/files',
headers=headers, timeout=TIMEOUT)
if response.status_code == 200:
data = response.json()
logger.info(f"Successfully retrieved stats for {len(data.get('files', []))} files")
logger.info("Successfully retrieved stats for %s files",
len(data.get('files', [])))
return data
else:
logger.error(f"Failed to get stats: HTTP {response.status_code} - {response.text}")
return None
except Exception as e:
logger.error(f"Error getting stats from pixeldrain: {e}")
logger.error("Failed to get stats: HTTP %s - %s",
response.status_code, response.text)
return None
except (OSError, requests.RequestException) as error:
logger.error("Error getting stats from pixeldrain: %s", error)
return None
def print_stats_pixeldrain() -> None:
"""Print account statistics from pixeldrain.com."""
stats = get_stats_pixeldrain()
if stats and 'files' in stats:
files = stats['files']
logger.info(f"Found {len(files)} files in account")
logger.info("Found %s files in account", len(files))
# Calculate some basic statistics
total_size = sum(file.get('size', 0) for file in files)
total_views = sum(file.get('views', 0) for file in files)
total_downloads = sum(file.get('downloads', 0) for file in files)
total_bandwidth = sum(file.get('bandwidth_used', 0) for file in files)
logger.info(f"Total size: {total_size / (1024**3):.2f} GB")
logger.info(f"Total views: {total_views:,}")
logger.info(f"Total downloads: {total_downloads:,}")
logger.info(f"Total bandwidth used: {total_bandwidth / (1024**3):.2f} GB")
logger.info("Total size: %.2f GB", total_size / (1024**3))
logger.info("Total views: %s", f"{total_views:,}")
logger.info("Total downloads: %s", f"{total_downloads:,}")
logger.info("Total bandwidth used: %.2f GB", total_bandwidth / (1024**3))
# Show top 5 most downloaded files
top_downloads = sorted(files, key=lambda x: x.get('downloads', 0), reverse=True)[:5]
logger.info("\nTop 5 most downloaded files:")
logger.info("Top 5 most downloaded files:")
for i, file in enumerate(top_downloads, 1):
logger.info(f"{i}. {file.get('name', 'Unknown')} - {file.get('downloads', 0)} downloads")
logger.info("%s. %s - %s downloads", i, file.get('name', 'Unknown'),
file.get('downloads', 0))
else:
logger.error("Failed to get stats")
def get_file_info_pixeldrain(file_ids: Union[str, List[str]]) -> Optional[Union[Dict[str, Any], List[Dict[str, Any]]]]:
def get_file_info_pixeldrain(file_ids: Union[str, List[str]]) -> Optional[
Union[Dict[str, Any], List[Dict[str, Any]]]]:
"""Get information about one or more files from pixeldrain.com."""
try:
if not PIXELDRAIN_API_KEY:
logger.error("PIXELDRAIN_API_KEY not found in environment variables")
return None
# Handle both single ID and list of IDs
if isinstance(file_ids, list):
if len(file_ids) > 1000:
logger.error("Maximum 1000 files per request")
return None
ids_str = ",".join(file_ids)
logger.info(f"Fetching info for {len(file_ids)} files from pixeldrain")
logger.info("Fetching info for %s files from pixeldrain", len(file_ids))
else:
ids_str = file_ids
logger.info(f"Fetching info for file {file_ids} from pixeldrain")
logger.info("Fetching info for file %s from pixeldrain", file_ids)
auth_header = f"Basic {b64encode(f':{PIXELDRAIN_API_KEY}'.encode()).decode()}"
headers = {"Authorization": auth_header}
response = requests.get(f'https://pixeldrain.com/api/file/{ids_str}/info', headers=headers)
response = requests.get(f'https://pixeldrain.com/api/file/{ids_str}/info',
headers=headers, timeout=TIMEOUT)
if response.status_code == 200:
data = response.json()
logger.info("Successfully retrieved file info")
return data
elif response.status_code == 404:
if response.status_code == 404:
json_response = response.json()
logger.error(f"File not found: {json_response.get('value', 'Unknown error')}")
logger.error("File not found: %s", json_response.get('value', 'Unknown error'))
return None
else:
logger.error(f"Failed to get file info: HTTP {response.status_code} - {response.text}")
return None
except Exception as e:
logger.error(f"Error getting file info from pixeldrain: {e}")
logger.error("Failed to get file info: HTTP %s - %s",
response.status_code, response.text)
return None
except (OSError, requests.RequestException) as error:
logger.error("Error getting file info from pixeldrain: %s", error)
return None
def print_file_info_pixeldrain(file_ids: Union[str, List[str]]) -> None:
"""Print information about one or more files from pixeldrain.com."""
file_info = get_file_info_pixeldrain(file_ids)
if file_info:
logger.info(f"File name: {file_info.get('name', 'Unknown')}")
logger.info(f"File size: {display_file_size(file_info.get('size', 0))}")
logger.info(f"Views: {file_info.get('views', 0):,}")
logger.info(f"Downloads: {file_info.get('downloads', 0):,}")
logger.info(f"Upload date: {file_info.get('date_upload', 'Unknown')}")
# logger.info(f"Last view: {file_info.get('date_last_view', 'Unknown')}")
# logger.info(f"MIME type: {file_info.get('mime_type', 'Unknown')}")
# logger.info(f"Can edit: {file_info.get('can_edit', False)}")
logger.info("File name: %s", file_info.get('name', 'Unknown'))
logger.info("File size: %s", display_file_size(file_info.get('size', 0)))
logger.info("Views: %s", f"{file_info.get('views', 0):,}")
logger.info("Downloads: %s", f"{file_info.get('downloads', 0):,}")
logger.info("Upload date: %s", file_info.get('date_upload', 'Unknown'))
def reupload_pixeldrain(file_ids: Union[str, List[str]], download_folder: str, force_download: bool = False) -> Optional[str]:
def reupload_pixeldrain(file_ids: Union[str, List[str]], download_folder: str,
force_download: bool = False) -> Optional[str]:
"""Reupload files from pixeldrain.com."""
downloaded_file = download_from_pixeldrain(file_ids, download_folder, force_download)
if not downloaded_file:
logger.error("Download failed, cannot reupload")
return None
return upload_to_pixeldrain(downloaded_file)
@@ -310,88 +352,113 @@ def parse_file_id(input_str: str) -> str:
input_str = input_str.strip()
if "pixeldrain.com/u/" in input_str:
return input_str.split("pixeldrain.com/u/")[-1]
elif "pixeldrain.com/f/" in input_str:
if "pixeldrain.com/f/" in input_str:
return input_str.split("pixeldrain.com/f/")[-1]
elif "href.li/?" in input_str:
if "href.li/?" in input_str:
return parse_file_id(input_str.split("href.li/?")[-1])
return input_str
def main():
"""Main CLI entry point."""
parser = argparse.ArgumentParser(description="CLI for pixeldrain.com - Upload, download, and manage files")
def _setup_argument_parser():
"""Set up the argument parser with all commands and options."""
parser = argparse.ArgumentParser(
description="CLI for pixeldrain.com - Upload, download, and manage files"
)
subparsers = parser.add_subparsers(dest='command', help='Available commands')
# Upload command
upload_parser = subparsers.add_parser('upload', help='Upload a file to pixeldrain')
upload_parser.add_argument('file_path', help='Path to the file to upload')
# Download command
download_parser = subparsers.add_parser('download', help='Download a file from pixeldrain')
download_parser = subparsers.add_parser('download',
help='Download a file from pixeldrain')
download_parser.add_argument('file_id', help='File ID or pixeldrain URL')
download_parser.add_argument('-d', '--dir', default='/tmp', help='Download directory (default: /tmp)')
download_parser.add_argument('-f', '--force', action='store_true', help='Force download (?download parameter)')
download_parser.add_argument('-d', '--dir', default='/tmp',
help='Download directory (default: /tmp)')
download_parser.add_argument('-f', '--force', action='store_true',
help='Force download (?download parameter)')
# Info command
info_parser = subparsers.add_parser('info', help='Get file information')
info_parser.add_argument('file_id', help='File ID or pixeldrain URL')
# Stats command
stats_parser = subparsers.add_parser('stats', help='Display account statistics')
subparsers.add_parser('stats', help='Display account statistics')
# Reupload command
reupload_parser = subparsers.add_parser('reupload', help='Re-download and re-upload a file')
reupload_parser = subparsers.add_parser('reupload',
help='Re-download and re-upload a file')
reupload_parser.add_argument('file_id', help='File ID or pixeldrain URL')
reupload_parser.add_argument('-d', '--dir', default='/tmp', help='Temporary directory (default: /tmp)')
reupload_parser.add_argument('-f', '--force', action='store_true', help='Force download')
reupload_parser.add_argument('-d', '--dir', default='/tmp',
help='Temporary directory (default: /tmp)')
reupload_parser.add_argument('-f', '--force', action='store_true',
help='Force download')
return parser
def _handle_upload_command(args):
"""Handle upload command."""
if not os.path.isfile(args.file_path):
logger.error("File not found: %s", args.file_path)
sys.exit(1)
url = upload_to_pixeldrain(args.file_path)
if url:
print(f"File uploaded successfully: {url}")
else:
logger.error("Upload failed")
sys.exit(1)
def _handle_download_command(args):
"""Handle download command."""
file_id = parse_file_id(args.file_id)
result = download_from_pixeldrain(file_id, args.dir, args.force)
if result:
print(f"File downloaded successfully: {result}")
if os.path.exists(result):
file_size = os.path.getsize(result)
print(f"File size: {display_file_size(file_size)}")
else:
logger.error("Download failed")
sys.exit(1)
def _handle_reupload_command(args):
"""Handle reupload command."""
file_id = parse_file_id(args.file_id)
url = reupload_pixeldrain(file_id, args.dir, args.force)
if url:
print(f"File re-uploaded successfully: {url}")
else:
logger.error("Re-upload failed")
sys.exit(1)
def main():
"""Main CLI entry point."""
parser = _setup_argument_parser()
args = parser.parse_args()
# Check for API key for commands that need it
if args.command in ['upload', 'stats', 'info', 'reupload'] and not PIXELDRAIN_API_KEY:
logger.error("PIXELDRAIN_API_KEY is required for this command. Set it in your environment variables or in a .env file")
logger.error("PIXELDRAIN_API_KEY is required for this command. "
"Set it in your environment variables or in a .env file")
sys.exit(1)
if args.command == 'upload':
if not os.path.isfile(args.file_path):
logger.error(f"File not found: {args.file_path}")
sys.exit(1)
url = upload_to_pixeldrain(args.file_path)
if url:
print(f"File uploaded successfully: {url}")
else:
logger.error("Upload failed")
sys.exit(1)
_handle_upload_command(args)
elif args.command == 'download':
file_id = parse_file_id(args.file_id)
result = download_from_pixeldrain(file_id, args.dir, args.force)
if result:
print(f"File downloaded successfully: {result}")
if os.path.exists(result):
file_size = os.path.getsize(result)
print(f"File size: {display_file_size(file_size)}")
else:
logger.error("Download failed")
sys.exit(1)
_handle_download_command(args)
elif args.command == 'info':
file_id = parse_file_id(args.file_id)
print_file_info_pixeldrain(file_id)
elif args.command == 'stats':
print_stats_pixeldrain()
elif args.command == 'reupload':
file_id = parse_file_id(args.file_id)
url = reupload_pixeldrain(file_id, args.dir, args.force)
if url:
print(f"File re-uploaded successfully: {url}")
else:
logger.error("Re-upload failed")
sys.exit(1)
_handle_reupload_command(args)
else:
parser.print_help()