fix: improve server stats / rbac

LukeGus committed 2025-12-21 03:09:55 -06:00
parent bfda81f9c2
commit 28729e3de2
14 changed files with 494 additions and 131 deletions


@@ -201,11 +201,12 @@ class SSHConnectionPool {
   private cleanupInterval: NodeJS.Timeout;

   constructor() {
+    // Reduce cleanup interval from 5 minutes to 2 minutes for faster dead connection removal
     this.cleanupInterval = setInterval(
       () => {
         this.cleanup();
       },
-      5 * 60 * 1000,
+      2 * 60 * 1000,
     );
   }
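
The shorter sweep only changes how quickly dead connections are noticed; idle connections are still aged out by `maxAge` inside `cleanup()`. If the cadence ever needs per-deployment tuning, a minimal sketch (not part of this commit; the `POOL_CLEANUP_INTERVAL_MS` name is hypothetical) could read it from the environment:

    // Hypothetical sketch: make the cleanup cadence configurable via an
    // environment variable, falling back to the 2-minute default above.
    const DEFAULT_CLEANUP_INTERVAL_MS = 2 * 60 * 1000;

    function cleanupIntervalMs(): number {
      const raw = process.env.POOL_CLEANUP_INTERVAL_MS; // assumed variable name
      const parsed = raw ? Number.parseInt(raw, 10) : NaN;
      // Guard against NaN, zero, or negative values from a bad env entry
      return Number.isFinite(parsed) && parsed > 0
        ? parsed
        : DEFAULT_CLEANUP_INTERVAL_MS;
    }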
@@ -218,9 +219,22 @@ class SSHConnectionPool {
     return `${host.ip}:${host.port}:${host.username}${socks5Key}`;
   }

+  private isConnectionHealthy(client: Client): boolean {
+    try {
+      // Check if the connection has been destroyed or closed
+      // @ts-ignore - accessing internal property to check connection state
+      if (client._sock && (client._sock.destroyed || !client._sock.writable)) {
+        return false;
+      }
+      return true;
+    } catch (error) {
+      return false;
+    }
+  }
+
   async getConnection(host: SSHHostWithCredentials): Promise<Client> {
     const hostKey = this.getHostKey(host);
-    const connections = this.connections.get(hostKey) || [];
+    let connections = this.connections.get(hostKey) || [];
     statsLogger.info("Getting connection from pool", {
       operation: "get_connection_from_pool",
@@ -228,19 +242,39 @@ class SSHConnectionPool {
       availableConnections: connections.length,
       useSocks5: host.useSocks5,
       socks5Host: host.socks5Host,
-      hasSocks5ProxyChain: !!(host.socks5ProxyChain && host.socks5ProxyChain.length > 0),
+      hasSocks5ProxyChain: !!(
+        host.socks5ProxyChain && host.socks5ProxyChain.length > 0
+      ),
       hostId: host.id,
     });

+    // Find available connection and validate health
     const available = connections.find((conn) => !conn.inUse);
     if (available) {
-      statsLogger.info("Reusing existing connection from pool", {
-        operation: "reuse_connection",
-        hostKey,
-      });
-      available.inUse = true;
-      available.lastUsed = Date.now();
-      return available.client;
+      // Health check before reuse
+      if (!this.isConnectionHealthy(available.client)) {
+        statsLogger.warn("Removing unhealthy connection from pool", {
+          operation: "remove_dead_connection",
+          hostKey,
+        });
+        // Remove dead connection
+        try {
+          available.client.end();
+        } catch (error) {
+          // Ignore cleanup errors
+        }
+        connections = connections.filter((c) => c !== available);
+        this.connections.set(hostKey, connections);
+        // Fall through to create new connection
+      } else {
+        statsLogger.info("Reusing existing connection from pool", {
+          operation: "reuse_connection",
+          hostKey,
+        });
+        available.inUse = true;
+        available.lastUsed = Date.now();
+        return available.client;
+      }
     }

     if (connections.length < this.maxConnectionsPerHost) {
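
A caveat on the new health check: `isConnectionHealthy` reads ssh2's private `_sock` property (hence the `@ts-ignore`), which an ssh2 upgrade could rename or remove without warning. One public-API alternative, sketched below under the assumption that marking-on-event is acceptable (the `TrackedClient` wrapper is illustrative, not part of this commit), is to flip a flag from the documented `error` and `close` events:

    import { Client } from "ssh2";

    // Hypothetical wrapper: mark a pooled client dead via public events
    // instead of inspecting the private _sock property.
    class TrackedClient {
      dead = false;

      constructor(readonly client: Client) {
        // ssh2's Client emits "error" and "close" when the transport fails
        client.on("error", () => {
          this.dead = true;
        });
        client.on("close", () => {
          this.dead = true;
        });
      }

      isHealthy(): boolean {
        return !this.dead;
      }
    }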
@@ -338,7 +372,8 @@ class SSHConnectionPool {
     // Check if SOCKS5 proxy is enabled (either single proxy or chain)
     if (
       host.useSocks5 &&
-      (host.socks5Host || (host.socks5ProxyChain && host.socks5ProxyChain.length > 0))
+      (host.socks5Host ||
+        (host.socks5ProxyChain && host.socks5ProxyChain.length > 0))
     ) {
       statsLogger.info("Using SOCKS5 proxy for connection", {
         operation: "socks5_enabled",
@@ -346,7 +381,9 @@ class SSHConnectionPool {
         hostPort: host.port,
         socks5Host: host.socks5Host,
         socks5Port: host.socks5Port,
-        hasChain: !!(host.socks5ProxyChain && host.socks5ProxyChain.length > 0),
+        hasChain: !!(
+          host.socks5ProxyChain && host.socks5ProxyChain.length > 0
+        ),
         chainLength: host.socks5ProxyChain?.length || 0,
       });
@@ -383,7 +420,8 @@ class SSHConnectionPool {
statsLogger.error("SOCKS5 connection error", socks5Error, {
operation: "socks5_connection_error",
hostIp: host.ip,
errorMessage: socks5Error instanceof Error ? socks5Error.message : "Unknown",
errorMessage:
socks5Error instanceof Error ? socks5Error.message : "Unknown",
});
reject(
new Error(
@@ -476,13 +514,30 @@ class SSHConnectionPool {
   private cleanup(): void {
     const now = Date.now();
     const maxAge = 10 * 60 * 1000;
+    let totalCleaned = 0;
+    let totalUnhealthy = 0;

     for (const [hostKey, connections] of this.connections.entries()) {
       const activeConnections = connections.filter((conn) => {
+        // Remove if idle for too long
         if (!conn.inUse && now - conn.lastUsed > maxAge) {
           try {
             conn.client.end();
           } catch (error) {}
+          totalCleaned++;
           return false;
         }
+
+        // Also remove if connection is unhealthy (even if recently used)
+        if (!this.isConnectionHealthy(conn.client)) {
+          statsLogger.warn("Removing unhealthy connection during cleanup", {
+            operation: "cleanup_unhealthy",
+            hostKey,
+            inUse: conn.inUse,
+          });
+          try {
+            conn.client.end();
+          } catch (error) {}
+          totalUnhealthy++;
+          return false;
+        }
+
         return true;
@@ -494,6 +549,15 @@ class SSHConnectionPool {
         this.connections.set(hostKey, activeConnections);
       }
     }

+    if (totalCleaned > 0 || totalUnhealthy > 0) {
+      statsLogger.info("Connection pool cleanup completed", {
+        operation: "cleanup_complete",
+        idleCleaned: totalCleaned,
+        unhealthyCleaned: totalUnhealthy,
+        remainingHosts: this.connections.size,
+      });
+    }
   }

   clearAllConnections(): void {
@@ -507,10 +571,14 @@ class SSHConnectionPool {
         try {
           conn.client.end();
         } catch (error) {
-          statsLogger.error("Error closing connection during full cleanup", error, {
-            operation: "clear_all_error",
-            hostKey,
-          });
+          statsLogger.error(
+            "Error closing connection during full cleanup",
+            error,
+            {
+              operation: "clear_all_error",
+              hostKey,
+            },
+          );
         }
       }
     }
@@ -533,18 +601,35 @@ class SSHConnectionPool {
 class RequestQueue {
   private queues = new Map<number, Array<() => Promise<unknown>>>();
   private processing = new Set<number>();
+  private requestTimeout = 60000; // 60 second timeout for requests

   async queueRequest<T>(hostId: number, request: () => Promise<T>): Promise<T> {
-    return new Promise((resolve, reject) => {
-      const queue = this.queues.get(hostId) || [];
-      queue.push(async () => {
+    return new Promise<T>((resolve, reject) => {
+      const wrappedRequest = async () => {
         try {
-          const result = await request();
+          // Add timeout wrapper to prevent indefinite hanging
+          const result = await Promise.race<T>([
+            request(),
+            new Promise<never>((_, rej) =>
+              setTimeout(
+                () =>
+                  rej(
+                    new Error(
+                      `Request timeout after ${this.requestTimeout}ms for host ${hostId}`,
+                    ),
+                  ),
+                this.requestTimeout,
+              ),
+            ),
+          ]);
           resolve(result);
         } catch (error) {
           reject(error);
         }
-      });
+      };
+
+      const queue = this.queues.get(hostId) || [];
+      queue.push(wrappedRequest);
       this.queues.set(hostId, queue);
       this.processQueue(hostId);
     });
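
A subtlety of the `Promise.race` timeout added above: the losing side is never cancelled, so the timer keeps the event loop alive after a fast request, and a timed-out request keeps running in the background. A minimal reusable sketch (standard Node APIs only, names illustrative) that at least clears the timer:

    // Minimal sketch of a reusable timeout wrapper. The losing promise is
    // not cancelled - the race only decides which result is observed.
    async function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
      let timer: NodeJS.Timeout | undefined;
      const timeout = new Promise<never>((_, reject) => {
        timer = setTimeout(
          () => reject(new Error(`Timed out after ${ms}ms`)),
          ms,
        );
      });
      try {
        return await Promise.race([work, timeout]);
      } finally {
        clearTimeout(timer); // avoid keeping the process alive after the race
      }
    }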
@@ -561,12 +646,21 @@ class RequestQueue {
       if (request) {
         try {
           await request();
-        } catch (error) {}
+        } catch (error) {
+          // Log errors but continue processing queue
+          statsLogger.debug("Request queue error", {
+            operation: "queue_request_error",
+            hostId,
+            error: error instanceof Error ? error.message : String(error),
+          });
+        }
       }
     }

     this.processing.delete(hostId);

-    if (queue.length > 0) {
+    // Check if new items were added during processing
+    const currentQueue = this.queues.get(hostId);
+    if (currentQueue && currentQueue.length > 0) {
       this.processQueue(hostId);
     }
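
Re-reading `this.queues.get(hostId)` instead of testing the stale `queue` reference closes the window where work enqueued mid-drain could be stranded. A condensed sketch of the overall drain pattern, with illustrative names and the same re-check-after-release step:

    // Illustrative sketch of the serialized drain pattern used above.
    const queues = new Map<number, Array<() => Promise<void>>>();
    const processing = new Set<number>();

    async function drain(id: number): Promise<void> {
      if (processing.has(id)) return; // one drain loop per id
      processing.add(id);
      let next = queues.get(id)?.shift();
      while (next) {
        await next().catch(() => {}); // an error must not stop the loop
        next = queues.get(id)?.shift(); // re-read: items can arrive mid-drain
      }
      processing.delete(id);
      // Re-check after releasing the flag: an item may have landed in between
      if ((queues.get(id)?.length ?? 0) > 0) void drain(id);
    }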
@@ -701,10 +795,87 @@ class AuthFailureTracker {
   }
 }

+class PollingBackoff {
+  private failures = new Map<number, { count: number; nextRetry: number }>();
+  private baseDelay = 30000; // 30s base delay
+  private maxDelay = 600000; // 10 min max delay
+  private maxRetries = 5; // Max retry attempts before giving up
+
+  recordFailure(hostId: number): void {
+    const existing = this.failures.get(hostId) || { count: 0, nextRetry: 0 };
+    const delay = Math.min(
+      this.baseDelay * Math.pow(2, existing.count),
+      this.maxDelay,
+    );
+    this.failures.set(hostId, {
+      count: existing.count + 1,
+      nextRetry: Date.now() + delay,
+    });
+    statsLogger.debug("Recorded polling backoff", {
+      operation: "polling_backoff_recorded",
+      hostId,
+      failureCount: existing.count + 1,
+      nextRetryDelay: delay,
+    });
+  }
+
+  shouldSkip(hostId: number): boolean {
+    const backoff = this.failures.get(hostId);
+    if (!backoff) return false;
+    // If exceeded max retries, always skip
+    if (backoff.count >= this.maxRetries) {
+      return true;
+    }
+    // Otherwise check if we're still in backoff period
+    return Date.now() < backoff.nextRetry;
+  }
+
+  getBackoffInfo(hostId: number): string | null {
+    const backoff = this.failures.get(hostId);
+    if (!backoff) return null;
+    if (backoff.count >= this.maxRetries) {
+      return `Max retries exceeded (${backoff.count} failures) - polling suspended`;
+    }
+    const remainingMs = backoff.nextRetry - Date.now();
+    if (remainingMs > 0) {
+      const remainingSec = Math.ceil(remainingMs / 1000);
+      return `Retry in ${remainingSec}s (attempt ${backoff.count}/${this.maxRetries})`;
+    }
+    return null;
+  }
+
+  reset(hostId: number): void {
+    this.failures.delete(hostId);
+    statsLogger.debug("Reset polling backoff", {
+      operation: "polling_backoff_reset",
+      hostId,
+    });
+  }
+
+  cleanup(): void {
+    const maxAge = 60 * 60 * 1000; // 1 hour
+    const now = Date.now();
+    for (const [hostId, backoff] of this.failures.entries()) {
+      // Only cleanup if not at max retries and old enough
+      if (backoff.count < this.maxRetries && now - backoff.nextRetry > maxAge) {
+        this.failures.delete(hostId);
+      }
+    }
+  }
+}
+
 const connectionPool = new SSHConnectionPool();
 const requestQueue = new RequestQueue();
 const metricsCache = new MetricsCache();
 const authFailureTracker = new AuthFailureTracker();
+const pollingBackoff = new PollingBackoff();
 const authManager = AuthManager.getInstance();

 type HostStatus = "online" | "offline";
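
With a 30s base, doubling per failure, and a 10-minute cap, the schedule `recordFailure` produces works out as below; after the fifth failure `shouldSkip` returns true unconditionally until a successful poll calls `reset`. A worked example:

    // Worked example: failure #n schedules the next attempt after
    // min(30000 * 2^(n-1), 600000) milliseconds.
    const baseDelay = 30_000;
    const maxDelay = 600_000;
    for (let count = 0; count < 5; count++) {
      const delay = Math.min(baseDelay * Math.pow(2, count), maxDelay);
      console.log(`after failure ${count + 1}: retry in ${delay / 1000}s`);
    }
    // Prints 30s, 60s, 120s, 240s, 480s. A sixth failure would cap at 600s,
    // but maxRetries = 5 suspends polling first, until reset() clears it.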
@@ -891,7 +1062,11 @@ class PollingManager {
     }

     try {
-      const isOnline = await tcpPing(refreshedHost.ip, refreshedHost.port, 5000);
+      const isOnline = await tcpPing(
+        refreshedHost.ip,
+        refreshedHost.port,
+        5000,
+      );
       const statusEntry: StatusEntry = {
         status: isOnline ? "online" : "offline",
         lastChecked: new Date().toISOString(),
@@ -907,6 +1082,17 @@ class PollingManager {
   }

   private async pollHostMetrics(host: SSHHostWithCredentials): Promise<void> {
+    // Check if we should skip due to backoff
+    if (pollingBackoff.shouldSkip(host.id)) {
+      const backoffInfo = pollingBackoff.getBackoffInfo(host.id);
+      statsLogger.debug("Skipping metrics polling due to backoff", {
+        operation: "poll_metrics_skipped",
+        hostId: host.id,
+        backoffInfo,
+      });
+      return;
+    }
+
     // Refresh host data from database to get latest SOCKS5 and other settings
     const refreshedHost = await fetchHostById(host.id, host.userId);
     if (!refreshedHost) {
@@ -928,17 +1114,24 @@ class PollingManager {
         data: metrics,
         timestamp: Date.now(),
       });
+
+      // Reset backoff on successful collection
+      pollingBackoff.reset(refreshedHost.id);
     } catch (error) {
       const errorMessage =
         error instanceof Error ? error.message : String(error);

+      // Record failure for backoff
+      pollingBackoff.recordFailure(refreshedHost.id);
+
       const latestConfig = this.pollingConfigs.get(refreshedHost.id);
       if (latestConfig && latestConfig.statsConfig.metricsEnabled) {
+        const backoffInfo = pollingBackoff.getBackoffInfo(refreshedHost.id);
         statsLogger.warn("Failed to collect metrics for host", {
           operation: "metrics_poll_failed",
           hostId: refreshedHost.id,
           hostName: refreshedHost.name,
           error: errorMessage,
+          backoff: backoffInfo,
         });
       }
     }
@@ -1228,8 +1421,13 @@ async function resolveHostCredentials(
     useSocks5: baseHost.useSocks5,
     socks5Host: baseHost.socks5Host,
     socks5Port: baseHost.socks5Port,
-    hasSocks5ProxyChain: !!(baseHost.socks5ProxyChain && (baseHost.socks5ProxyChain as any[]).length > 0),
-    proxyChainLength: baseHost.socks5ProxyChain ? (baseHost.socks5ProxyChain as any[]).length : 0,
+    hasSocks5ProxyChain: !!(
+      baseHost.socks5ProxyChain &&
+      (baseHost.socks5ProxyChain as any[]).length > 0
+    ),
+    proxyChainLength: baseHost.socks5ProxyChain
+      ? (baseHost.socks5ProxyChain as any[]).length
+      : 0,
   });

   return baseHost as unknown as SSHHostWithCredentials;
@@ -1735,6 +1933,7 @@ app.listen(PORT, async () => {
   setInterval(
     () => {
       authFailureTracker.cleanup();
+      pollingBackoff.cleanup();
     },
     10 * 60 * 1000,
   );