GoFileCLI/gofilecli.py
2024-09-24 21:10:30 +02:00

591 lines
23 KiB
Python

#!/usr/bin/env python3
import requests
import random
import time
import argparse
import os
import logging
import sys
import platform
import subprocess
import sys
import simpleaudio as sa
from tqdm import tqdm
import json
from dotenv import load_dotenv
# Suppress ALSA warnings
os.environ['SDL_AUDIODRIVER'] = 'dummy'
os.environ['AUDIODEV'] = 'null'
def reqst(url, method ,headers=None, data=None, files=None, params=None, json=None, logger=None):
try:
if method == "get":
response = requests.get(url, headers=headers, data=data, params=params, json=json)
elif method == "post":
response = requests.post(url, headers=headers, data=data, files=files, params=params, json=json)
elif method == "put":
response = requests.put(url, headers=headers, data=data, files=files, params=params, json=json)
elif method == "delete":
response = requests.delete(url, headers=headers, data=data, files=files, params=params, json=json)
logger.debug(f"Request to {url} with method {method} returned status code {response.status_code}")
json_response = response.json() # If response content is not JSON, this will raise a ValueError
return json_response
except requests.exceptions.HTTPError as http_err:
logger.debug(f"Response: {response.text}")
logger.error(f"HTTP error occurred: {http_err}") # Handles HTTP errors (e.g., 404, 500)
sys.exit()
except requests.exceptions.ConnectionError as conn_err:
logger.debug(f"Response: {response.text}")
logger.error(f"Connection error occurred: {conn_err}") # Handles network-related errors
sys.exit()
except requests.exceptions.Timeout as timeout_err:
logger.debug(f"Response: {response.text}")
logger.error(f"Timeout error occurred: {timeout_err}") # Handles request timeouts
sys.exit()
except requests.exceptions.RequestException as req_err:
logger.debug(f"Response: {response.text}")
logger.error(f"An error occurred: {req_err}") # Catches any other requests-related errors
sys.exit()
except ValueError as json_err:
logger.debug(f"Response: {response.text}")
logger.error(f"JSON decode error: {json_err}") # Handles issues with JSON decoding
sys.exit()
def load_file(file_name: str) -> str:
"""
Get the correct path to a file, regardless of development or compiled mode.
"""
return os.path.join(os.path.dirname(__file__), file_name)
def play_sound(logger):
try:
sound_path = load_file("assets/sounds/Blow_edited.wav")
wave_obj = sa.WaveObject.from_wave_file(sound_path)
play_obj = wave_obj.play()
play_obj.wait_done()
except Exception as e:
logger.debug(f"An error occurred while playing sound: {e}")
def set_env_var_unix(name, value, shell="bash"):
home = os.path.expanduser("~")
rc_file = f".{shell}rc"
rc_path = os.path.join(home, rc_file)
with open(rc_path, "a") as f:
f.write(f'\nexport {name}="{value}"\n')
os.system(f"source {rc_path}")
def set_env_var(name, value):
system = platform.system()
if system == "Windows":
subprocess.run(["setx", name, value], shell=True)
elif system in ["Linux", "Darwin"]:
shell = os.getenv('SHELL', '/bin/bash').split('/')[-1]
set_env_var_unix(name, value, shell=shell)
else:
raise NotImplementedError(f"Unsupported OS: {system}")
def format_file_size(file=None, num_bytes=None):
if num_bytes:
file_size_bytes = num_bytes
else:
file_size_bytes = os.path.getsize(file)
size_in_kb = file_size_bytes / 1024
size_in_mb = file_size_bytes / (1024 * 1024)
size_in_gb = file_size_bytes / (1024 * 1024 * 1024)
if size_in_gb >= 1:
return size_in_gb, "GB"
elif size_in_mb >= 1:
return size_in_mb, "MB"
else:
return size_in_kb, "KB"
def file_size(file=None, num_bytes=None):
size, unit = format_file_size(file=file, num_bytes=num_bytes)
return f"{size:.2f} {unit}"
def calculate_upload_speed(file, start_time):
elapsed_time_seconds = time.time() - start_time
file_size, size_unit = format_file_size(file)
if size_unit == "GB":
average_speed = (file_size * 1024) / elapsed_time_seconds # Convert GB to MB and calculate MB/s
speed_unit = "MB/s"
else:
average_speed = file_size / elapsed_time_seconds # MB/s
speed_unit = "MB/s"
if elapsed_time_seconds >= 60:
minutes = int(elapsed_time_seconds // 60)
seconds = int(elapsed_time_seconds % 60)
elapsed_time = f"{minutes}m {seconds}s"
else:
elapsed_time = f"{elapsed_time_seconds:.2f}s"
return f"{average_speed:.2f} {speed_unit}", elapsed_time
def get_file_paths(folderPath):
filePaths = []
for root, directories, files in os.walk(folderPath):
for file in files:
if ".DS_Store" not in file:
file_path = os.path.join(root, file)
filePaths.append(file_path)
return filePaths
def check_folderPath(path):
if not os.path.exists(path):
os.makedirs(path)
return path
def getservers(logger):
servers = []
# response = requests.get("https://api.gofile.io/servers").json()
response = reqst("https://api.gofile.io/servers", logger=logger, method="get")
if response["status"] == "ok":
for server in response["data"]["servers"]:
servers.append(server["name"])
logger.debug(f"{response}")
return servers
else:
logger.error(f"{response}")
return None
def ping_server(url, logger, num_requests=4, delay=0.1):
response_times = []
for _ in range(num_requests):
try:
start_time = time.time()
response = requests.head(url)
response_time = time.time() - start_time
if response.status_code == 200:
response_times.append(response_time)
except requests.exceptions.RequestException as e:
logger.error(f"Error: {e}")
time.sleep(delay)
if response_times:
avg_response = sum(response_times) / len(response_times)
return avg_response
else:
return float('inf')
def test_servers(servers, logger):
best_server = None
best_time = float('inf')
for server in servers:
logger.debug(f"Pinging {server}...")
url = f"https://{server}.gofile.io"
avg_time = ping_server(url, logger)
logger.debug(f"Average response time for {server}: {avg_time:.2f} ms")
if avg_time < best_time:
best_time = avg_time
best_server = server
if best_server:
logger.debug(f"\nThe best server is {best_server} with an average response time of {best_time:.2f} ms.")
else:
logger.error("All pings failed.")
return best_server
def get_stats(logger):
headers = {'authorization': f'Bearer {TOKEN}',}
# data = requests.get(f'https://api.gofile.io/accounts/{ACCOUNT_ID}', headers=headers).json()
data = reqst(f'https://api.gofile.io/accounts/{ACCOUNT_ID}', headers=headers, logger=logger, method="get")
if data["status"] == "ok":
stats = data["data"]["statsCurrent"]
logger.info("Account stats:")
logger.info(f"Total files: {stats['fileCount']}")
logger.info(f"Total folders: {stats['folderCount']}")
size = file_size(num_bytes=stats['storage'])
logger.info(f"Total size: {size}")
traffic = file_size(num_bytes=stats['trafficWebDownloaded'])
logger.info(f"Total traffic: {traffic}")
else:
logger.error(f"{data}")
return None
def get_rootfolder(logger):
headers = {'authorization': f'Bearer {TOKEN}',}
# data = requests.get(f'https://api.gofile.io/accounts/{ACCOUNT_ID}', headers=headers).json()
data = reqst(f'https://api.gofile.io/accounts/{ACCOUNT_ID}', headers=headers, logger=logger, method="get")
root_folder = data.get("data", {}).get("rootFolder", {})
if root_folder:
return root_folder
else:
logger.error(f"{data}")
return None
def get_code(folderId, logger):
headers = {'authorization': f'Bearer {TOKEN}',}
params = (('wt', '4fd6sg89d7s6'),('cache', 'false'),)
# data = requests.get(f'https://api.gofile.io/contents/{folderId}', headers=headers, params=params).json()
data = reqst(f'https://api.gofile.io/contents/{folderId}', headers=headers, params=params, logger=logger, method="get")
code = data.get("data", {}).get("code", {})
if code:
return code
else:
logger.error(f"{data}")
return None
def get_children(id, logger):
headers = {'authorization': f'Bearer {TOKEN}',}
params = (('wt', '4fd6sg89d7s6'),('cache', 'false'),)
# data = requests.get(f'https://api.gofile.io/contents/{id}', headers=headers, params=params).json()
data = reqst(f'https://api.gofile.io/contents/{id}', headers=headers, params=params, logger=logger, method="get")
children = data.get("data", {}).get("children", {})
if children:
return children
else:
if data["status"] == "error-rateLimit":
logger.warning("Rate limit reached, waiting 30 seconds")
time.sleep(30)
return get_children(id, logger)
else:
logger.error(f"{data}")
return None
def createfolder(parentFolderId, folderName, logger):
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {TOKEN}"}
if folderName:
data = {"parentFolderId": parentFolderId, "folderName": folderName}
else:
data = {"parentFolderId": parentFolderId}
# response = requests.post("https://api.gofile.io/contents/createFolder", headers=headers, json=data).json()
response = reqst("https://api.gofile.io/contents/createFolder", headers=headers, json=data, logger=logger, method="post")
if response["status"] == "ok":
name = response["data"]["name"]
code = response["data"]["code"]
folderId = response["data"]["id"]
logger.debug(f"""Folder {name} created with code {code} and folderId {folderId}""")
return folderId
else:
logger.error(f"{response}")
return None
def read_in_chunks(file_object, CHUNK_SIZE):
while True:
data = file_object.read(CHUNK_SIZE)
if not data:
break
yield data
def uploadfile(serverName, folderId, filePath, logger):
# reference : https://api.video/blog/tutorials/upload-a-big-video-file-using-python/
start_time = time.time()
# CHUNK_SIZE = 6000000
# content_size = os.stat(filePath).st_size
# f = open(filePath, "rb")
# index = 0
# offset = 0
# headers = {"Authorization": f"Bearer {TOKEN}", 'content-type': 'multipart/form-data',}
# with tqdm(total=content_size, unit='B', unit_scale=True, desc='Uploading', leave=False) as progress_bar:
# for chunk in read_in_chunks(f, CHUNK_SIZE):
# offset = index + len(chunk)
# headers['Content-Range'] = 'bytes %s-%s/%s' % (index, offset - 1, content_size)
# index = offset
# try:
# # file = {"file": chunk, 'folderId': (None, folderId)}
# # response = requests.post(f"https://{serverName}.gofile.io/contents/uploadfile", files=file, headers=headers)
# # files = {"file": chunk}
# # data = {"folderId": folderId}
# files = {"file": chunk,}
# response = requests.post(f"https://{serverName}.gofile.io/contents/uploadfile",files=files,headers=headers)
# logger.debug("r: %s, Content-Range: %s" % (response, headers['Content-Range']))
# progress_bar.update(len(chunk))
# except Exception as e:
# logger.error(f"Error: {e}")
# logger.debug(f"{response.text}")
# response = response.json()
command = [
"curl",
"-X", "POST",
f"https://{serverName}.gofile.io/contents/uploadfile",
"-H", f"Authorization: Bearer {TOKEN}",
"-F", f"file=@{filePath}",
"-F", f"folderId={folderId}",
]
response = subprocess.run(command, capture_output=True, text=True)
try:
response_json = json.loads(response.stdout)
except json.JSONDecodeError:
logger.error("Failed to parse response as JSON.")
return None
speed, elapsed_time = calculate_upload_speed(filePath, start_time)
response = response_json
if response["status"] == "ok":
logger.debug(response)
name = response["data"]["name"]
downloadPage = response["data"]["downloadPage"]
parentFolderId = response["data"]["parentFolder"]
logger.debug(f"""File {name} uploaded to {downloadPage}""")
return downloadPage, parentFolderId, speed, elapsed_time
else:
logger.error(f"{response}")
return None
def actionFolder(folderId, attributeValue, logger):
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {TOKEN}"}
data = {"attribute": "public", "attributeValue": attributeValue}
# response = requests.put(f"https://api.gofile.io/contents/{folderId}/update", headers=headers, json=data).json()
response = reqst(f"https://api.gofile.io/contents/{folderId}/update", headers=headers, json=data, logger=logger, method="put")
if response["status"] == "ok":
return True
else:
return response
def upload(filePath, folderPath, folderName, parentFolderId, private, logger):
files = []
logger.info("Starting upload")
logger.debug("File: %s", filePath)
if folderPath:
files = get_file_paths(folderPath)
if not files:
logger.error("No files found in folder")
sys.exit()
else:
if os.path.exists(filePath):
files = [filePath]
else:
logger.error("File not found")
sys.exit()
# Getting servers
servers = getservers(logger)
if servers:
if len(servers) > 1: # If there are multiple servers, check the size of the files
if max([os.path.getsize(file) for file in files]) > 100 * 1024 * 1024: # 100 MB in bytes
logger.debug("One of the file have a size > 100 MB. Fetching best server...")
serverName = test_servers(servers, logger)
else:
serverName = random.choice(servers)
else:
serverName = servers[0]
logger.debug(f"Selected server: {serverName}")
if folderName and parentFolderId:
logger.info(f"Creating folder: {folderName} for: {parentFolderId}")
folderId = createfolder(parentFolderId, folderName, logger)
logger.debug(f"FolderId: {folderId}")
parentFolderId = folderId
elif folderName:
logger.info(f"Creating folder: {folderName} for PRIVATE_PARENT_ID: {PRIVATE_PARENT_ID}")
folderId = createfolder(PRIVATE_PARENT_ID, folderName, logger)
logger.debug(f"FolderId: {folderId}")
parentFolderId = folderId
elif parentFolderId and not folderName:
parentFolderId = parentFolderId
logger.debug(f"FolderId: {parentFolderId}")
else:
# parentFolderId = PRIVATE_PARENT_ID
logger.info(f"Creating folder: {folderName} for PRIVATE_PARENT_ID: {PRIVATE_PARENT_ID}")
folderId = createfolder(PRIVATE_PARENT_ID, None, logger)
parentFolderId = folderId
logger.debug(f"FolderId: {parentFolderId}")
for file in files:
if parentFolderId:
logger.info(f"Uploading file: '{file}' ({file_size(file)}) to: '{parentFolderId}' on: '{serverName}'")
else:
logger.info(f"Uploading file: '{file}' ({file_size(file)}) on: '{serverName}'")
downloadPage, parentFolderId, speed, elapsed_time = uploadfile(serverName, parentFolderId, file, logger)
logger.info(f"File uploaded to: {downloadPage} in {elapsed_time} at {speed}")
if not private:
action = actionFolder(parentFolderId, "true", logger)
if action:
logger.info("Folder made public")
else:
logger.error(f"{action}")
else:
action = actionFolder(parentFolderId, "false", logger)
if action:
logger.info("Folder made private")
else:
logger.error(f"{action}")
play_sound(logger)
else:
time.sleep(10)
sys.exit()
def downloadFile(downloadUrl, path, logger):
start_time = time.time()
headers = {"Authorization": f"Bearer {TOKEN}"}
response = requests.get(downloadUrl, headers=headers, stream=True)
# response = reqst(downloadUrl, headers=headers, logger=logger, method="get")
total_size = int(response.headers.get('content-length', 0))
with open(path, "wb") as f, tqdm(total=total_size, unit='B', unit_scale=True, desc='Downloading', leave=False) as progress_bar:
for chunk in response.iter_content(1024):
if chunk:
f.write(chunk)
progress_bar.update(len(chunk))
logger.debug(f"File downloaded: {path}")
speed, elapsed_time = calculate_upload_speed(path, start_time)
return speed, elapsed_time
def download(folderId, folderPath, force, logger):
if 'https' in folderId:
folderId = folderId.split('/')[-1]
if len(folderId) == 36:
folderId = get_code(folderId)
files = []
logger.info("Fetching files")
logger.debug("FolderId: %s", folderId)
folderIdList = [folderId]
while folderIdList:
for folderId in folderIdList:
children = get_children(folderId, logger)
folderIdList.remove(folderId)
for child in children:
if children[child]['type'] == "file":
files.append(children[child])
else:
folderIdList.append(children[child]['id'])
nbdone = 0
logger.info(f"Starting download of {len(files)} files")
for file in files:
logger.info(f"Downloading file {nbdone}/{len(files)}: {file['name']} ({file_size(num_bytes=file['size'])})")
downloadUrl = file['link']
name = file['name']
if folderPath:
folderPath = check_folderPath(folderPath)
else:
folderPath = check_folderPath(os.path.join(os.getcwd(), folderId))
path = os.path.join(folderPath, name)
if os.path.exists(path) and not force:
logger.warning(f"File {name} already exists skipping (set --force to overwrite)")
elif os.path.exists(path) and force:
logger.warning(f"File {name} already exists overwriting")
speed, elapsed_time = downloadFile(downloadUrl, path, logger)
logger.info(f"File download to: {path} in {elapsed_time} at {speed}")
else:
speed, elapsed_time = downloadFile(downloadUrl, path, logger)
logger.info(f"File download to: {path} in {elapsed_time} at {speed}")
nbdone += 1
play_sound(logger)
def opt():
parser = argparse.ArgumentParser(description="Upload or download a file to GoFile.")
exclusive_group = parser.add_mutually_exclusive_group()
exclusive_group.add_argument("--file", "-i", type=str, help="Path to the file to be uploaded")
exclusive_group.add_argument("--folder", "-f", type=str, help="Path to the folder to be uploaded")
parser.add_argument("--name", "-n", type=str, help="Name of the folder on the server")
parser.add_argument("--parent", "-p", type=str, help="Folder id to upload the file to")
parser.add_argument("--private","-pr",action="store_true",help="Upload to private folder default=False",)
parser.add_argument("--log-level",type=str,choices=["DEBUG", "ERROR", "INFO", "OFF", "WARN"],default="INFO",help="Set log level [default: INFO]",)
parser.add_argument("--token", "-tk", type=str, help="GoFile API token")
parser.add_argument("--private-parent-id", "-pp", type=str, help="GoFile private parent id")
exclusive_group.add_argument('--stats', "-s", action='store_true', help='Display account stats.')
exclusive_group.add_argument("--download", "-d", type=str, help="Id or code to the folder to be downloaded")
parser.add_argument("--output", "-o", type=str, help="¨Path to the folder to be downloaded")
parser.add_argument("--force", "-fo", action="store_true", help="Overwrite existing files")
return parser.parse_args()
def init():
args = opt()
log_format = "%(asctime)s %(levelname)s: %(message)s"
logging.basicConfig(level=getattr(logging, args.log_level.upper()),format=log_format,datefmt="%H:%M:%S",)
logger = logging.getLogger(__name__)
load_dotenv()
global TOKEN
global PRIVATE_PARENT_ID
global ACCOUNT_ID
TOKEN = os.getenv("GOPLOAD_TOKEN")
PRIVATE_PARENT_ID = os.getenv("GOPLOAD_PRIVATE_PARENT_ID")
ACCOUNT_ID = os.getenv("GOPLOAD_ACCOUNT_ID")
if not TOKEN:
if args.token:
TOKEN = args.token
# set_key(key_to_set='GOPLOAD_TOKEN', value_to_set=args.token)
set_env_var("GOPLOAD_TOKEN", args.token)
else:
logger.error("Error: GOPLOAD_TOKEN not found, add GOPLOAD_TOKEN to your environment variables")
sys.exit()
if not PRIVATE_PARENT_ID:
if args.private_parent_id:
PRIVATE_PARENT_ID = args.private_parent_id
# set_key(key_to_set='GOPLOAD_PRIVATE_PARENT_ID', value_to_set=args.private_parent_id)
set_env_var("GOPLOAD_PRIVATE_PARENT_ID", args.private_parent_id)
elif TOKEN:
PRIVATE_PARENT_ID = get_rootfolder(logger)
# set_key(key_to_set='GOPLOAD_PRIVATE_PARENT_ID', value_to_set=args.private_parent_id)
set_env_var("GOPLOAD_PRIVATE_PARENT_ID", args.private_parent_id)
else:
logger.error("Error: GOPLOAD_PRIVATE_PARENT_ID not found, add GOPLOAD_PRIVATE_PARENT_ID to your environment variables")
sys.exit()
# No params:
if len(sys.argv) == 1:
logger.error("No arguments specified. Use -h for help")
sys.exit("")
# Stats section
if args.stats:
if len(sys.argv) == 2:
get_stats(logger)
sys.exit()
else:
logger.error("Use --stats without any other argument")
# Upload section
elif args.file:
if args.folder:
logger.error("Both file and folder specified")
sys.exit()
else:
upload(args.file, args.folder, args.name, args.parent, args.private, logger)
elif args.folder:
if args.file:
logger.error("Both file and folder specified")
sys.exit()
else:
upload(args.file, args.folder, args.name, args.parent, args.private, logger)
elif args.name:
if not args.parent:
logger.warning("Parent folder id not specified, GOPLOAD_PRIVATE_PARENT_ID will be used")
# Download section
elif args.download:
download(args.download, args.output, args.force, logger)
if __name__ == "__main__":
try:
init()
except KeyboardInterrupt:
print("\nExiting...")
sys.exit()