MCP bridge: support multiple concurrent clients

Replace single-client model (m_client/m_readBuffer/m_initialized)
with a ClientState vector. Each client gets its own read buffer and
initialized flag. Responses route to m_currentSender (set during
request processing); notifications broadcast to all initialized
clients.

Re-entrancy guard in onReadyRead: re-resolve ClientState after each
processLine() call since sendJson flush can re-enter the event loop
and trigger onDisconnected, removing the client mid-iteration.

Tests: 378-line test_mcp exercising connect, initialize, tools/list,
disconnect one client, notification broadcast, and serial requests
against a MockMcpServer with the same multi-client architecture.
This commit is contained in:
noita-player
2026-03-08 20:49:59 -07:00
parent 51de48a6ed
commit 4d0782db68
5 changed files with 549 additions and 62 deletions

View File

@@ -69,14 +69,15 @@ void McpBridge::start() {
}
void McpBridge::stop() {
if (m_client) {
m_client->disconnect(this);
m_client->disconnectFromServer();
m_client->deleteLater();
m_client = nullptr;
for (auto& c : m_clients) {
c.socket->disconnect(this);
c.socket->disconnectFromServer();
c.socket->deleteLater();
}
m_readBuffer.clear();
m_initialized = false;
m_clients.clear();
m_currentSender = nullptr;
m_processing = false;
m_pendingRequests.clear();
if (m_server) {
m_server->close();
delete m_server;
@@ -88,56 +89,89 @@ void McpBridge::stop() {
// Connection handling
// ════════════════════════════════════════════════════════════════════
McpBridge::ClientState* McpBridge::findClient(QLocalSocket* sock) {
for (auto& c : m_clients)
if (c.socket == sock) return &c;
return nullptr;
}
void McpBridge::removeClient(QLocalSocket* sock) {
for (int i = 0; i < m_clients.size(); ++i) {
if (m_clients[i].socket == sock) {
sock->disconnect(this);
sock->deleteLater();
m_clients.removeAt(i);
return;
}
}
}
void McpBridge::onNewConnection() {
auto* pending = m_server->nextPendingConnection();
if (!pending) return;
// Single client — disconnect previous
if (m_client) {
m_client->disconnect(this);
m_client->disconnectFromServer();
m_client->deleteLater();
m_client = nullptr;
}
m_clients.append({pending, {}, false});
m_client = pending;
m_readBuffer.clear();
m_initialized = false;
connect(m_client, &QLocalSocket::readyRead,
connect(pending, &QLocalSocket::readyRead,
this, &McpBridge::onReadyRead);
connect(m_client, &QLocalSocket::disconnected,
connect(pending, &QLocalSocket::disconnected,
this, &McpBridge::onDisconnected);
qDebug() << "[MCP] Client connected";
qDebug() << "[MCP] Client connected (" << m_clients.size() << "total)";
}
void McpBridge::onReadyRead() {
if (!m_client) return;
m_readBuffer.append(m_client->readAll());
auto* sock = qobject_cast<QLocalSocket*>(sender());
auto* cs = findClient(sock);
if (!cs) return;
cs->readBuffer.append(sock->readAll());
// Newline-delimited JSON framing
// Guard: processLine→sendJson→flush can re-enter the event loop
// and trigger onDisconnected, nulling m_client mid-loop.
while (m_client) {
int idx = m_readBuffer.indexOf('\n');
// Extract complete lines from this client's buffer.
// If a request is already in flight (m_processing), queue the line
// instead of processing it -- nested event loops in scanner/tree.apply
// would otherwise let interleaved requests clobber m_currentSender.
while (findClient(sock)) {
cs = findClient(sock);
int idx = cs->readBuffer.indexOf('\n');
if (idx < 0) break;
QByteArray line = m_readBuffer.left(idx).trimmed();
m_readBuffer.remove(0, idx + 1);
if (!line.isEmpty())
processLine(line);
QByteArray line = cs->readBuffer.left(idx).trimmed();
cs->readBuffer.remove(0, idx + 1);
if (line.isEmpty()) continue;
if (m_processing) {
m_pendingRequests.append({sock, line});
continue;
}
m_processing = true;
m_currentSender = sock;
processLine(line);
m_currentSender = nullptr;
m_processing = false;
drainPendingRequests();
}
}
void McpBridge::drainPendingRequests() {
while (!m_pendingRequests.isEmpty()) {
auto req = m_pendingRequests.takeFirst();
if (!findClient(req.socket)) continue; // client disconnected while queued
m_processing = true;
m_currentSender = req.socket;
processLine(req.line);
m_currentSender = nullptr;
m_processing = false;
}
}
void McpBridge::onDisconnected() {
qDebug() << "[MCP] Client disconnected";
if (m_client) {
m_client->disconnect(this);
m_client->deleteLater();
m_client = nullptr;
}
m_readBuffer.clear();
m_initialized = false;
auto* sock = qobject_cast<QLocalSocket*>(sender());
qDebug() << "[MCP] Client disconnected (" << m_clients.size() - 1 << "remaining)";
// Purge any queued requests from this client
m_pendingRequests.erase(
std::remove_if(m_pendingRequests.begin(), m_pendingRequests.end(),
[sock](const PendingRequest& r) { return r.socket == sock; }),
m_pendingRequests.end());
removeClient(sock);
}
// ════════════════════════════════════════════════════════════════════
@@ -161,18 +195,26 @@ QJsonObject McpBridge::errReply(const QJsonValue& id, int code, const QString& m
}
void McpBridge::sendJson(const QJsonObject& obj) {
if (!m_client) return;
QLocalSocket* target = m_currentSender;
if (!target || !findClient(target)) return;
QByteArray data = QJsonDocument(obj).toJson(QJsonDocument::Compact);
qDebug() << "[MCP] >>" << data.left(200);
data.append('\n');
m_client->write(data);
if (m_client) m_client->flush();
target->write(data);
target->flush();
}
void McpBridge::sendNotification(const QString& method, const QJsonObject& params) {
QJsonObject n{{"jsonrpc", "2.0"}, {"method", method}};
if (!params.isEmpty()) n["params"] = params;
sendJson(n);
QByteArray data = QJsonDocument(n).toJson(QJsonDocument::Compact);
data.append('\n');
for (auto& c : m_clients) {
if (c.initialized) {
c.socket->write(data);
c.socket->flush();
}
}
}
QJsonObject McpBridge::makeTextResult(const QString& text, bool isError) {
@@ -229,7 +271,7 @@ void McpBridge::processLine(const QByteArray& line) {
// ════════════════════════════════════════════════════════════════════
QJsonObject McpBridge::handleInitialize(const QJsonValue& id, const QJsonObject&) {
m_initialized = true;
if (auto* cs = findClient(m_currentSender)) cs->initialized = true;
QJsonObject caps;
caps["tools"] = QJsonObject{{"listChanged", false}};
@@ -566,6 +608,22 @@ QJsonObject McpBridge::handleToolsList(const QJsonValue& id) {
}}
});
// process.info
tools.append(QJsonObject{
{"name", "process.info"},
{"description", "Returns PEB address and enumerates all Thread Environment Blocks (TEBs) for the attached process. "
"TEBs are discovered via NtQuerySystemInformation and NtQueryInformationThread. "
"Each TEB entry includes: address, threadId. "
"Requires a live process provider with PEB support."},
{"inputSchema", QJsonObject{
{"type", "object"},
{"properties", QJsonObject{
{"tabIndex", QJsonObject{{"type", "integer"},
{"description", "MDI tab index (0-based). Omit for active tab."}}}
}}
}}
});
return okReply(id, QJsonObject{{"tools", tools}});
}
@@ -595,6 +653,7 @@ QJsonObject McpBridge::handleToolsCall(const QJsonValue& id, const QJsonObject&
else if (toolName == "scanner.scan") result = toolScannerScan(args);
else if (toolName == "scanner.scan_pattern") result = toolScannerScanPattern(args);
else if (toolName == "mcp.reconnect") result = toolReconnect(args);
else if (toolName == "process.info") result = toolProcessInfo(args);
else return errReply(id, -32601, "Unknown tool: " + toolName);
m_mainWindow->clearMcpStatus();
@@ -1156,12 +1215,6 @@ QJsonObject McpBridge::toolHexRead(const QJsonObject& args) {
if (baseRel)
offset += (int64_t)tab->doc->tree.baseAddress;
qDebug() << "[hex_read] arg offset" << (args.value("offset").isString() ? "str" : "num")
<< (args.value("offset").isString() ? args.value("offset").toString() : QString())
<< Qt::showbase << Qt::hex << (quint64)offset
<< "baseRelative" << baseRel << "tree.base" << (quint64)tab->doc->tree.baseAddress
<< "final addr" << (quint64)offset << Qt::dec;
if (offset < 0 || !prov->isReadable((uint64_t)offset, length))
return makeTextResult("Cannot read at offset " + QString::number(offset), true);
@@ -1667,29 +1720,61 @@ QJsonObject McpBridge::toolScannerScanPattern(const QJsonObject& args) {
// ════════════════════════════════════════════════════════════════════
QJsonObject McpBridge::toolReconnect(const QJsonObject&) {
if (!m_client)
QLocalSocket* sock = m_currentSender;
if (!sock)
return makeTextResult("No client connected.", true);
// Disconnect after this response is sent so the client receives the result
QTimer::singleShot(0, this, [this]() {
if (m_client) {
m_client->disconnectFromServer();
}
QTimer::singleShot(0, this, [this, sock]() {
if (findClient(sock))
sock->disconnectFromServer();
});
return makeTextResult("Disconnected. The MCP client will exit; your IDE may restart it and reconnect to Reclass.");
}
// ════════════════════════════════════════════════════════════════════
// TOOL: process.info — PEB address + TEB enumeration
// ════════════════════════════════════════════════════════════════════
QJsonObject McpBridge::toolProcessInfo(const QJsonObject& args) {
auto* tab = resolveTab(args);
if (!tab) return makeTextResult("No active tab", true);
auto* prov = tab->doc->provider.get();
if (!prov) return makeTextResult("No data source attached", true);
if (!prov->isLive()) return makeTextResult("Not a live provider", true);
uint64_t pebAddr = prov->peb();
if (!pebAddr) return makeTextResult("PEB not available for this provider", true);
QJsonObject out;
out["peb"] = "0x" + QString::number(pebAddr, 16).toUpper();
auto tebList = prov->tebs();
QJsonArray tebArr;
for (const auto& t : tebList) {
tebArr.append(QJsonObject{
{"address", "0x" + QString::number(t.tebAddress, 16).toUpper()},
{"threadId", (qint64)t.threadId}
});
}
out["tebs"] = tebArr;
out["tebCount"] = tebArr.size();
return makeTextResult(QString::fromUtf8(QJsonDocument(out).toJson(QJsonDocument::Indented)));
}
// ════════════════════════════════════════════════════════════════════
// Notifications (call from MainWindow/Controller hooks)
// ════════════════════════════════════════════════════════════════════
void McpBridge::notifyTreeChanged() {
if (!m_client || !m_initialized) return;
if (m_clients.isEmpty()) return;
sendNotification("notifications/resources/updated",
QJsonObject{{"uri", "project://tree"}});
}
void McpBridge::notifyDataChanged() {
if (!m_client || !m_initialized) return;
if (m_clients.isEmpty()) return;
sendNotification("notifications/resources/updated",
QJsonObject{{"uri", "project://data"}});
}