chore: cleanup files (possible RC)
This commit is contained in:
@@ -201,7 +201,6 @@ class SSHConnectionPool {
|
||||
private cleanupInterval: NodeJS.Timeout;
|
||||
|
||||
constructor() {
|
||||
// Reduce cleanup interval from 5 minutes to 2 minutes for faster dead connection removal
|
||||
this.cleanupInterval = setInterval(
|
||||
() => {
|
||||
this.cleanup();
|
||||
@@ -211,8 +210,6 @@ class SSHConnectionPool {
|
||||
}
|
||||
|
||||
private getHostKey(host: SSHHostWithCredentials): string {
|
||||
// Include SOCKS5 settings in the key to ensure separate connection pools
|
||||
// for direct connections vs SOCKS5 connections
|
||||
const socks5Key = host.useSocks5
|
||||
? `:socks5:${host.socks5Host}:${host.socks5Port}:${JSON.stringify(host.socks5ProxyChain || [])}`
|
||||
: "";
|
||||
@@ -221,9 +218,8 @@ class SSHConnectionPool {
|
||||
|
||||
private isConnectionHealthy(client: Client): boolean {
|
||||
try {
|
||||
// Check if the connection has been destroyed or closed
|
||||
// @ts-ignore - accessing internal property to check connection state
|
||||
if (client._sock && (client._sock.destroyed || !client._sock.writable)) {
|
||||
const sock = (client as any)._sock;
|
||||
if (sock && (sock.destroyed || !sock.writable)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
@@ -236,28 +232,13 @@ class SSHConnectionPool {
|
||||
const hostKey = this.getHostKey(host);
|
||||
let connections = this.connections.get(hostKey) || [];
|
||||
|
||||
statsLogger.info("Getting connection from pool", {
|
||||
operation: "get_connection_from_pool",
|
||||
hostKey: hostKey,
|
||||
availableConnections: connections.length,
|
||||
useSocks5: host.useSocks5,
|
||||
socks5Host: host.socks5Host,
|
||||
hasSocks5ProxyChain: !!(
|
||||
host.socks5ProxyChain && host.socks5ProxyChain.length > 0
|
||||
),
|
||||
hostId: host.id,
|
||||
});
|
||||
|
||||
// Find available connection and validate health
|
||||
const available = connections.find((conn) => !conn.inUse);
|
||||
if (available) {
|
||||
// Health check before reuse
|
||||
if (!this.isConnectionHealthy(available.client)) {
|
||||
statsLogger.warn("Removing unhealthy connection from pool", {
|
||||
operation: "remove_dead_connection",
|
||||
hostKey,
|
||||
});
|
||||
// Remove dead connection
|
||||
try {
|
||||
available.client.end();
|
||||
} catch (error) {
|
||||
@@ -265,12 +246,7 @@ class SSHConnectionPool {
|
||||
}
|
||||
connections = connections.filter((c) => c !== available);
|
||||
this.connections.set(hostKey, connections);
|
||||
// Fall through to create new connection
|
||||
} else {
|
||||
statsLogger.info("Reusing existing connection from pool", {
|
||||
operation: "reuse_connection",
|
||||
hostKey,
|
||||
});
|
||||
available.inUse = true;
|
||||
available.lastUsed = Date.now();
|
||||
return available.client;
|
||||
@@ -278,10 +254,6 @@ class SSHConnectionPool {
|
||||
}
|
||||
|
||||
if (connections.length < this.maxConnectionsPerHost) {
|
||||
statsLogger.info("Creating new connection for pool", {
|
||||
operation: "create_new_connection",
|
||||
hostKey,
|
||||
});
|
||||
const client = await this.createConnection(host);
|
||||
const pooled: PooledConnection = {
|
||||
client,
|
||||
@@ -369,24 +341,11 @@ class SSHConnectionPool {
|
||||
try {
|
||||
const config = buildSshConfig(host);
|
||||
|
||||
// Check if SOCKS5 proxy is enabled (either single proxy or chain)
|
||||
if (
|
||||
host.useSocks5 &&
|
||||
(host.socks5Host ||
|
||||
(host.socks5ProxyChain && host.socks5ProxyChain.length > 0))
|
||||
) {
|
||||
statsLogger.info("Using SOCKS5 proxy for connection", {
|
||||
operation: "socks5_enabled",
|
||||
hostIp: host.ip,
|
||||
hostPort: host.port,
|
||||
socks5Host: host.socks5Host,
|
||||
socks5Port: host.socks5Port,
|
||||
hasChain: !!(
|
||||
host.socks5ProxyChain && host.socks5ProxyChain.length > 0
|
||||
),
|
||||
chainLength: host.socks5ProxyChain?.length || 0,
|
||||
});
|
||||
|
||||
try {
|
||||
const socks5Socket = await createSocks5Connection(
|
||||
host.ip,
|
||||
@@ -402,10 +361,6 @@ class SSHConnectionPool {
|
||||
);
|
||||
|
||||
if (socks5Socket) {
|
||||
statsLogger.info("SOCKS5 socket created successfully", {
|
||||
operation: "socks5_socket_ready",
|
||||
hostIp: host.ip,
|
||||
});
|
||||
config.sock = socks5Socket;
|
||||
client.connect(config);
|
||||
return;
|
||||
@@ -492,12 +447,6 @@ class SSHConnectionPool {
|
||||
const hostKey = this.getHostKey(host);
|
||||
const connections = this.connections.get(hostKey) || [];
|
||||
|
||||
statsLogger.info("Clearing all connections for host", {
|
||||
operation: "clear_host_connections",
|
||||
hostKey,
|
||||
connectionCount: connections.length,
|
||||
});
|
||||
|
||||
for (const conn of connections) {
|
||||
try {
|
||||
conn.client.end();
|
||||
@@ -519,7 +468,6 @@ class SSHConnectionPool {
|
||||
|
||||
for (const [hostKey, connections] of this.connections.entries()) {
|
||||
const activeConnections = connections.filter((conn) => {
|
||||
// Remove if idle for too long
|
||||
if (!conn.inUse && now - conn.lastUsed > maxAge) {
|
||||
try {
|
||||
conn.client.end();
|
||||
@@ -527,7 +475,6 @@ class SSHConnectionPool {
|
||||
totalCleaned++;
|
||||
return false;
|
||||
}
|
||||
// Also remove if connection is unhealthy (even if recently used)
|
||||
if (!this.isConnectionHealthy(conn.client)) {
|
||||
statsLogger.warn("Removing unhealthy connection during cleanup", {
|
||||
operation: "cleanup_unhealthy",
|
||||
@@ -549,23 +496,9 @@ class SSHConnectionPool {
|
||||
this.connections.set(hostKey, activeConnections);
|
||||
}
|
||||
}
|
||||
|
||||
if (totalCleaned > 0 || totalUnhealthy > 0) {
|
||||
statsLogger.info("Connection pool cleanup completed", {
|
||||
operation: "cleanup_complete",
|
||||
idleCleaned: totalCleaned,
|
||||
unhealthyCleaned: totalUnhealthy,
|
||||
remainingHosts: this.connections.size,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
clearAllConnections(): void {
|
||||
statsLogger.info("Clearing ALL connections from pool", {
|
||||
operation: "clear_all_connections",
|
||||
totalHosts: this.connections.size,
|
||||
});
|
||||
|
||||
for (const [hostKey, connections] of this.connections.entries()) {
|
||||
for (const conn of connections) {
|
||||
try {
|
||||
@@ -601,13 +534,12 @@ class SSHConnectionPool {
|
||||
class RequestQueue {
|
||||
private queues = new Map<number, Array<() => Promise<unknown>>>();
|
||||
private processing = new Set<number>();
|
||||
private requestTimeout = 60000; // 60 second timeout for requests
|
||||
private requestTimeout = 60000;
|
||||
|
||||
async queueRequest<T>(hostId: number, request: () => Promise<T>): Promise<T> {
|
||||
return new Promise<T>((resolve, reject) => {
|
||||
const wrappedRequest = async () => {
|
||||
try {
|
||||
// Add timeout wrapper to prevent indefinite hanging
|
||||
const result = await Promise.race<T>([
|
||||
request(),
|
||||
new Promise<never>((_, rej) =>
|
||||
@@ -646,19 +578,11 @@ class RequestQueue {
|
||||
if (request) {
|
||||
try {
|
||||
await request();
|
||||
} catch (error) {
|
||||
// Log errors but continue processing queue
|
||||
statsLogger.debug("Request queue error", {
|
||||
operation: "queue_request_error",
|
||||
hostId,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
}
|
||||
} catch (error) {}
|
||||
}
|
||||
}
|
||||
|
||||
this.processing.delete(hostId);
|
||||
// Check if new items were added during processing
|
||||
const currentQueue = this.queues.get(hostId);
|
||||
if (currentQueue && currentQueue.length > 0) {
|
||||
this.processQueue(hostId);
|
||||
@@ -797,9 +721,9 @@ class AuthFailureTracker {
|
||||
|
||||
class PollingBackoff {
|
||||
private failures = new Map<number, { count: number; nextRetry: number }>();
|
||||
private baseDelay = 30000; // 30s base delay
|
||||
private maxDelay = 600000; // 10 min max delay
|
||||
private maxRetries = 5; // Max retry attempts before giving up
|
||||
private baseDelay = 30000;
|
||||
private maxDelay = 600000;
|
||||
private maxRetries = 5;
|
||||
|
||||
recordFailure(hostId: number): void {
|
||||
const existing = this.failures.get(hostId) || { count: 0, nextRetry: 0 };
|
||||
@@ -811,25 +735,16 @@ class PollingBackoff {
|
||||
count: existing.count + 1,
|
||||
nextRetry: Date.now() + delay,
|
||||
});
|
||||
|
||||
statsLogger.debug("Recorded polling backoff", {
|
||||
operation: "polling_backoff_recorded",
|
||||
hostId,
|
||||
failureCount: existing.count + 1,
|
||||
nextRetryDelay: delay,
|
||||
});
|
||||
}
|
||||
|
||||
shouldSkip(hostId: number): boolean {
|
||||
const backoff = this.failures.get(hostId);
|
||||
if (!backoff) return false;
|
||||
|
||||
// If exceeded max retries, always skip
|
||||
if (backoff.count >= this.maxRetries) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Otherwise check if we're still in backoff period
|
||||
return Date.now() < backoff.nextRetry;
|
||||
}
|
||||
|
||||
@@ -852,18 +767,13 @@ class PollingBackoff {
|
||||
|
||||
reset(hostId: number): void {
|
||||
this.failures.delete(hostId);
|
||||
statsLogger.debug("Reset polling backoff", {
|
||||
operation: "polling_backoff_reset",
|
||||
hostId,
|
||||
});
|
||||
}
|
||||
|
||||
cleanup(): void {
|
||||
const maxAge = 60 * 60 * 1000; // 1 hour
|
||||
const maxAge = 60 * 60 * 1000;
|
||||
const now = Date.now();
|
||||
|
||||
for (const [hostId, backoff] of this.failures.entries()) {
|
||||
// Only cleanup if not at max retries and old enough
|
||||
if (backoff.count < this.maxRetries && now - backoff.nextRetry > maxAge) {
|
||||
this.failures.delete(hostId);
|
||||
}
|
||||
@@ -906,7 +816,6 @@ interface SSHHostWithCredentials {
|
||||
updatedAt: string;
|
||||
userId: string;
|
||||
|
||||
// SOCKS5 Proxy configuration
|
||||
useSocks5?: boolean;
|
||||
socks5Host?: string;
|
||||
socks5Port?: number;
|
||||
@@ -1051,7 +960,6 @@ class PollingManager {
|
||||
}
|
||||
|
||||
private async pollHostStatus(host: SSHHostWithCredentials): Promise<void> {
|
||||
// Refresh host data from database to get latest settings
|
||||
const refreshedHost = await fetchHostById(host.id, host.userId);
|
||||
if (!refreshedHost) {
|
||||
statsLogger.warn("Host not found during status polling", {
|
||||
@@ -1082,18 +990,11 @@ class PollingManager {
|
||||
}
|
||||
|
||||
private async pollHostMetrics(host: SSHHostWithCredentials): Promise<void> {
|
||||
// Check if we should skip due to backoff
|
||||
if (pollingBackoff.shouldSkip(host.id)) {
|
||||
const backoffInfo = pollingBackoff.getBackoffInfo(host.id);
|
||||
statsLogger.debug("Skipping metrics polling due to backoff", {
|
||||
operation: "poll_metrics_skipped",
|
||||
hostId: host.id,
|
||||
backoffInfo,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// Refresh host data from database to get latest SOCKS5 and other settings
|
||||
const refreshedHost = await fetchHostById(host.id, host.userId);
|
||||
if (!refreshedHost) {
|
||||
statsLogger.warn("Host not found during metrics polling", {
|
||||
@@ -1114,13 +1015,11 @@ class PollingManager {
|
||||
data: metrics,
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
// Reset backoff on successful collection
|
||||
pollingBackoff.reset(refreshedHost.id);
|
||||
} catch (error) {
|
||||
const errorMessage =
|
||||
error instanceof Error ? error.message : String(error);
|
||||
|
||||
// Record failure for backoff
|
||||
pollingBackoff.recordFailure(refreshedHost.id);
|
||||
|
||||
const latestConfig = this.pollingConfigs.get(refreshedHost.id);
|
||||
@@ -1356,7 +1255,6 @@ async function resolveHostCredentials(
|
||||
createdAt: host.createdAt,
|
||||
updatedAt: host.updatedAt,
|
||||
userId: host.userId,
|
||||
// SOCKS5 proxy settings
|
||||
useSocks5: !!host.useSocks5,
|
||||
socks5Host: host.socks5Host || undefined,
|
||||
socks5Port: host.socks5Port || undefined,
|
||||
@@ -1415,21 +1313,6 @@ async function resolveHostCredentials(
|
||||
addLegacyCredentials(baseHost, host);
|
||||
}
|
||||
|
||||
statsLogger.info("Resolved host credentials with SOCKS5 settings", {
|
||||
operation: "resolve_host",
|
||||
hostId: host.id as number,
|
||||
useSocks5: baseHost.useSocks5,
|
||||
socks5Host: baseHost.socks5Host,
|
||||
socks5Port: baseHost.socks5Port,
|
||||
hasSocks5ProxyChain: !!(
|
||||
baseHost.socks5ProxyChain &&
|
||||
(baseHost.socks5ProxyChain as any[]).length > 0
|
||||
),
|
||||
proxyChainLength: baseHost.socks5ProxyChain
|
||||
? (baseHost.socks5ProxyChain as any[]).length
|
||||
: 0,
|
||||
});
|
||||
|
||||
return baseHost as unknown as SSHHostWithCredentials;
|
||||
} catch (error) {
|
||||
statsLogger.error(
|
||||
@@ -1654,12 +1537,7 @@ async function collectMetrics(host: SSHHostWithCredentials): Promise<{
|
||||
};
|
||||
try {
|
||||
login_stats = await collectLoginStats(client);
|
||||
} catch (e) {
|
||||
statsLogger.debug("Failed to collect login stats", {
|
||||
operation: "login_stats_failed",
|
||||
error: e instanceof Error ? e.message : String(e),
|
||||
});
|
||||
}
|
||||
} catch (e) {}
|
||||
|
||||
const result = {
|
||||
cpu,
|
||||
@@ -1800,7 +1678,6 @@ app.post("/refresh", async (req, res) => {
|
||||
});
|
||||
}
|
||||
|
||||
// Clear all connections to ensure fresh connections with updated settings
|
||||
connectionPool.clearAllConnections();
|
||||
|
||||
await pollingManager.refreshHostPolling(userId);
|
||||
@@ -1825,7 +1702,6 @@ app.post("/host-updated", async (req, res) => {
|
||||
try {
|
||||
const host = await fetchHostById(hostId, userId);
|
||||
if (host) {
|
||||
// Clear existing connections for this host to ensure new settings (like SOCKS5) are used
|
||||
connectionPool.clearHostConnections(host);
|
||||
|
||||
await pollingManager.startPollingForHost(host);
|
||||
|
||||
Reference in New Issue
Block a user