import faulthandler
import logging
import os
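# Load variables from a local .env file (if present) into the environment before
# any configuration is read.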
from dotenv import load_dotenv
load_dotenv()
import signal
import sys
import time
from dataclasses import dataclass, field
from enum import Enum, auto
from collections.abc import Iterable
from typing import Any
from prometheus_client import start_http_server
from prometheus_client.core import REGISTRY, CounterMetricFamily, GaugeMetricFamily
from pythonjsonlogger import jsonlogger
from qbittorrentapi import Client, TorrentStates
# Enable dumps on stderr in case of segfault
faulthandler.enable()
logger = logging.getLogger()
class MetricType(Enum):
"""
Represents possible metric types (used in this project).
"""
GAUGE = auto()
COUNTER = auto()
@dataclass
class Metric:
"""
Contains data and metadata about a single counter or gauge.
"""
name: str
value: Any
    labels: dict[str, str] = field(default_factory=dict)  # defaults to an empty dict
help_text: str = ""
metric_type: MetricType = MetricType.GAUGE
class QbittorrentMetricsCollector:
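    """
    Custom Prometheus collector: queries the qBittorrent WebUI API on every scrape
    and yields the resulting gauge and counter families.
    """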
def __init__(self, config: dict) -> None:
self.config = config
self.client = Client(
host=config["host"],
port=config["port"],
username=config["username"],
password=config["password"],
VERIFY_WEBUI_CERTIFICATE=config["verify_webui_certificate"],
)
def collect(self) -> Iterable[GaugeMetricFamily | CounterMetricFamily]:
"""
        Yields Prometheus gauge and counter families built from the metrics collected from qBittorrent.
"""
metrics: list[Metric] = self.get_qbittorrent_metrics()
for metric in metrics:
if metric.metric_type == MetricType.COUNTER:
prom_metric = CounterMetricFamily(
metric.name, metric.help_text, labels=list(metric.labels.keys())
)
else:
prom_metric = GaugeMetricFamily(
metric.name, metric.help_text, labels=list(metric.labels.keys())
)
prom_metric.add_metric(
value=metric.value, labels=list(metric.labels.values())
)
yield prom_metric
def get_qbittorrent_metrics(self) -> list[Metric]:
"""
        Gathers and combines qBittorrent server state metrics and per-torrent metrics.
"""
metrics: list[Metric] = []
metrics.extend(self._get_qbittorrent_status_metrics())
metrics.extend(self._get_qbittorrent_torrent_tags_metrics())
return metrics
def _get_qbittorrent_status_metrics(self) -> list[Metric]:
"""
Returns metrics about the state of the qbittorrent server.
"""
maindata: dict[str, Any] = {}
version: str = ""
# Fetch data from API
try:
maindata = self.client.sync_maindata()
version = self.client.app.version
except Exception as e:
logger.error(f"Couldn't get server info: {e}")
        server_state = maindata.get("server_state", {})
return [
Metric(
name=f"{self.config['metrics_prefix']}_up",
value=bool(server_state),
labels={"version": version},
help_text=(
"Whether the qBittorrent server is answering requests from this"
" exporter. A `version` label with the server version is added."
),
),
Metric(
name=f"{self.config['metrics_prefix']}_connected",
value=server_state.get("connection_status", "") == "connected",
help_text=(
"Whether the qBittorrent server is connected to the Bittorrent"
" network."
),
),
Metric(
name=f"{self.config['metrics_prefix']}_firewalled",
value=server_state.get("connection_status", "") == "firewalled",
help_text=(
"Whether the qBittorrent server is connected to the Bittorrent"
" network but is behind a firewall."
),
),
Metric(
name=f"{self.config['metrics_prefix']}_dht_nodes",
value=server_state.get("dht_nodes", 0),
help_text="Number of DHT nodes connected to.",
),
Metric(
name=f"{self.config['metrics_prefix']}_dl_info_data",
value=server_state.get("dl_info_data", 0),
help_text="Data downloaded since the server started, in bytes.",
metric_type=MetricType.COUNTER,
),
Metric(
name=f"{self.config['metrics_prefix']}_up_info_data",
value=server_state.get("up_info_data", 0),
help_text="Data uploaded since the server started, in bytes.",
metric_type=MetricType.COUNTER,
),
Metric(
name=f"{self.config['metrics_prefix']}_alltime_dl",
value=server_state.get("alltime_dl", 0),
help_text="Total data downloaded, in bytes.",
metric_type=MetricType.COUNTER,
),
Metric(
name=f"{self.config['metrics_prefix']}_alltime_ul",
value=server_state.get("alltime_ul", 0),
help_text="Total data uploaded, in bytes.",
metric_type=MetricType.COUNTER,
),
            Metric(
                name=f"{self.config['metrics_prefix']}_total_peer_connections",
                value=server_state.get("total_peer_connections", 0),
                help_text="Current total number of peer connections.",
            ),
            # Disk and cache metrics
            Metric(
                name=f"{self.config['metrics_prefix']}_write_cache_overload",
                value=server_state.get("write_cache_overload", 0),
                help_text="Percentage of time the write cache was overloaded.",
            ),
            Metric(
                name=f"{self.config['metrics_prefix']}_read_cache_overload",
                value=server_state.get("read_cache_overload", 0),
                help_text="Percentage of time the read cache was overloaded.",
            ),
            Metric(
                name=f"{self.config['metrics_prefix']}_read_cache_hits",
                value=server_state.get("read_cache_hits", 0),
                help_text="Read cache hit ratio, as a percentage.",
            ),
            Metric(
                name=f"{self.config['metrics_prefix']}_average_time_queue",
                value=server_state.get("average_time_queue", 0),
                help_text="Average time an I/O request spends in the queue, in milliseconds.",
            ),
            Metric(
                name=f"{self.config['metrics_prefix']}_free_space_on_disk",
                value=server_state.get("free_space_on_disk", 0),
                help_text="Free space on the disk holding the default save path, in bytes.",
            ),
            Metric(
                name=f"{self.config['metrics_prefix']}_queued_io_jobs",
                value=server_state.get("queued_io_jobs", 0),
                help_text="Number of queued disk I/O jobs.",
            ),
]
def _fetch_categories(self) -> dict:
"""Fetches all categories in use from qbittorrent."""
try:
categories = dict(self.client.torrent_categories.categories)
for key, value in categories.items():
categories[key] = dict(value) # type: ignore
return categories
except Exception as e:
logger.error(f"Couldn't fetch categories: {e}")
return {}
def _fetch_torrents(self) -> list[dict]:
"""Fetches torrents from qbittorrent"""
try:
return [dict(_attr_dict) for _attr_dict in self.client.torrents.info()]
except Exception as e:
logger.error(f"Couldn't fetch torrents: {e}")
return []
def _filter_torrents_by_category(
self, category: str, torrents: list[dict]
) -> list[dict]:
"""Filters torrents by the given category."""
return [
torrent
for torrent in torrents
if torrent["category"] == category
or (category == "Uncategorized" and torrent["category"] == "")
]
def _filter_torrents_by_state(
self, state: TorrentStates, torrents: list[dict]
) -> list[dict]:
"""Filters torrents by the given state."""
return [torrent for torrent in torrents if torrent["state"] == state.value]
def _construct_metric(self, state: str, category: str, count: int) -> Metric:
"""Constructs and returns a metric object with a torrent count and appropriate
labels."""
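        # Illustrative output, assuming the default METRICS_PREFIX and e.g.
        # state="uploading", category="movies", count=3:
        #   qbittorrent_torrents_count{status="uploading",category="movies"} 3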
return Metric(
name=f"{self.config['metrics_prefix']}_torrents_count",
value=count,
labels={
"status": state,
"category": category,
},
help_text=f"Number of torrents in status {state} under category {category}",
)
def _get_qbittorrent_torrent_tags_metrics(self) -> list[Metric]:
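        """
        Returns one torrents_count metric per (category, state) pair, adding a
        synthetic "Uncategorized" category for torrents without a category.
        """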
categories = self._fetch_categories()
torrents = self._fetch_torrents()
metrics: list[Metric] = []
categories["Uncategorized"] = {"name": "Uncategorized", "savePath": ""}
for category in categories:
category_torrents = self._filter_torrents_by_category(category, torrents)
for state in TorrentStates:
state_torrents = self._filter_torrents_by_state(
state, category_torrents
)
metric = self._construct_metric(
state.value, category, len(state_torrents)
)
metrics.append(metric)
return metrics
class ShutdownSignalHandler:
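    """
    Registers SIGINT/SIGTERM handlers so the main loop can exit gracefully;
    repeated signals force an immediate exit.
    """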
def __init__(self):
self.shutdown_count: int = 0
# Register signal handler
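        # The handler only records that a shutdown was requested; main() polls
        # is_shutting_down() once per second and exits its loop accordingly.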
signal.signal(signal.SIGINT, self._on_signal_received)
signal.signal(signal.SIGTERM, self._on_signal_received)
def is_shutting_down(self):
return self.shutdown_count > 0
    def _on_signal_received(self, signum, frame):
        if self.shutdown_count > 1:
            logger.warning("Forcibly killing exporter")
sys.exit(1)
logger.info("Exporter is shutting down")
self.shutdown_count += 1
def _get_config_value(key: str, default: str = "") -> str:
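    """
    Reads a config value from the environment. If FILE__<key> is set, the value is
    read from the referenced file instead (useful for Docker or Kubernetes secrets).
    """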
input_path = os.environ.get("FILE__" + key, None)
if input_path is not None:
try:
with open(input_path, "r") as input_file:
return input_file.read().strip()
except IOError as e:
logger.error(f"Unable to read value for {key} from {input_path}: {str(e)}")
return os.environ.get(key, default)
def get_config() -> dict:
"""Loads all config values."""
return {
"host": _get_config_value("QBITTORRENT_HOST", ""),
"port": _get_config_value("QBITTORRENT_PORT", ""),
"username": _get_config_value("QBITTORRENT_USER", ""),
"password": _get_config_value("QBITTORRENT_PASS", ""),
"exporter_port": int(_get_config_value("EXPORTER_PORT", "8000")),
"log_level": _get_config_value("EXPORTER_LOG_LEVEL", "INFO"),
"metrics_prefix": _get_config_value("METRICS_PREFIX", "qbittorrent"),
"verify_webui_certificate": (
_get_config_value("VERIFY_WEBUI_CERTIFICATE", "True") == "True"
),
}
def main():
# Init logger so it can be used
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter(
"%(asctime) %(levelname) %(message)", datefmt="%Y-%m-%d %H:%M:%S"
)
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)
logger.setLevel("INFO") # default until config is loaded
config = get_config()
# set level once config has been loaded
logger.setLevel(config["log_level"])
# Register signal handler
signal_handler = ShutdownSignalHandler()
if not config["host"]:
logger.error(
"No host specified, please set QBITTORRENT_HOST environment variable"
)
sys.exit(1)
if not config["port"]:
logger.error(
"No port specified, please set QBITTORRENT_PORT environment variable"
)
sys.exit(1)
# Register our custom collector
logger.info("Exporter is starting up")
REGISTRY.register(QbittorrentMetricsCollector(config)) # type: ignore
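    # From here on, prometheus_client calls QbittorrentMetricsCollector.collect()
    # on every scrape of the /metrics endpoint.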
# Start server
start_http_server(config["exporter_port"])
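    # start_http_server serves metrics from a daemon thread, so the main thread
    # just sleeps until a shutdown signal is received.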
logger.info(f"Exporter listening on port {config['exporter_port']}")
while not signal_handler.is_shutting_down():
time.sleep(1)
logger.info("Exporter has shutdown")
if __name__ == "__main__":
main()