diff --git a/packages/api/src/controllers/connections.js b/packages/api/src/controllers/connections.js index 902522c56..cedf0b795 100644 --- a/packages/api/src/controllers/connections.js +++ b/packages/api/src/controllers/connections.js @@ -6,6 +6,7 @@ const nedb = require('nedb-promises'); const { datadir } = require('../utility/directories'); const socket = require('../utility/socket'); const { encryptConnection } = require('../utility/crypting'); +const { handleProcessCommunication } = require('../utility/processComm'); function getPortalCollections() { if (process.env.CONNECTIONS) { @@ -47,6 +48,7 @@ module.exports = { test(req, res) { const subprocess = fork(process.argv[1], ['connectProcess', ...process.argv.slice(3)]); subprocess.on('message', resp => { + if (handleProcessCommunication(resp, subprocess)) return; // @ts-ignore const { msgtype } = resp; if (msgtype == 'connected' || msgtype == 'error') { diff --git a/packages/api/src/controllers/databaseConnections.js b/packages/api/src/controllers/databaseConnections.js index dac7756a9..e6a5ddad1 100644 --- a/packages/api/src/controllers/databaseConnections.js +++ b/packages/api/src/controllers/databaseConnections.js @@ -3,6 +3,7 @@ const connections = require('./connections'); const socket = require('../utility/socket'); const { fork } = require('child_process'); const { DatabaseAnalyser } = require('dbgate-tools'); +const { handleProcessCommunication } = require('../utility/processComm'); module.exports = { /** @type {import('dbgate-types').OpenedDatabaseConnection[]} */ @@ -50,8 +51,10 @@ module.exports = { status: { name: 'pending' }, }; this.opened.push(newOpened); - // @ts-ignore - subprocess.on('message', ({ msgtype, ...message }) => { + subprocess.on('message', message => { + // @ts-ignore + const { msgtype } = message; + if (handleProcessCommunication(message, subprocess)) return; if (newOpened.disconnected) return; this[`handle_${msgtype}`](conid, database, message); }); diff --git a/packages/api/src/controllers/runners.js b/packages/api/src/controllers/runners.js index 4b52723ba..bec81e626 100644 --- a/packages/api/src/controllers/runners.js +++ b/packages/api/src/controllers/runners.js @@ -7,6 +7,7 @@ const socket = require('../utility/socket'); const { fork } = require('child_process'); const { rundir, uploadsdir, pluginsdir } = require('../utility/directories'); const { extractShellApiPlugins, extractShellApiFunctionName } = require('dbgate-tools'); +const { handleProcessCommunication } = require('../utility/processComm'); function extractPlugins(script) { const requireRegex = /\s*\/\/\s*@require\s+([^\s]+)\s*\n/g; @@ -123,8 +124,10 @@ module.exports = { subprocess, }; this.opened.push(newOpened); - // @ts-ignore - subprocess.on('message', ({ msgtype, ...message }) => { + subprocess.on('message', message => { + // @ts-ignore + const { msgtype } = message; + if (handleProcessCommunication(message, subprocess)) return; this[`handle_${msgtype}`](runid, message); }); return newOpened; diff --git a/packages/api/src/controllers/serverConnections.js b/packages/api/src/controllers/serverConnections.js index 17c7154ef..41683a37f 100644 --- a/packages/api/src/controllers/serverConnections.js +++ b/packages/api/src/controllers/serverConnections.js @@ -3,6 +3,7 @@ const socket = require('../utility/socket'); const { fork } = require('child_process'); const _ = require('lodash'); const AsyncLock = require('async-lock'); +const { handleProcessCommunication } = require('../utility/processComm'); const lock = new AsyncLock(); module.exports = { @@ -43,8 +44,10 @@ module.exports = { this.opened.push(newOpened); delete this.closed[conid]; socket.emitChanged(`server-status-changed`); - // @ts-ignore - subprocess.on('message', ({ msgtype, ...message }) => { + subprocess.on('message', message => { + // @ts-ignore + const { msgtype } = message; + if (handleProcessCommunication(message, subprocess)) return; if (newOpened.disconnected) return; this[`handle_${msgtype}`](conid, message); }); diff --git a/packages/api/src/controllers/sessions.js b/packages/api/src/controllers/sessions.js index f7ec88b55..d33d4ba83 100644 --- a/packages/api/src/controllers/sessions.js +++ b/packages/api/src/controllers/sessions.js @@ -4,6 +4,7 @@ const connections = require('./connections'); const socket = require('../utility/socket'); const { fork } = require('child_process'); const jsldata = require('./jsldata'); +const { handleProcessCommunication } = require('../utility/processComm'); module.exports = { /** @type {import('dbgate-types').OpenedSession[]} */ @@ -73,8 +74,10 @@ module.exports = { sesid, }; this.opened.push(newOpened); - // @ts-ignore - subprocess.on('message', ({ msgtype, ...message }) => { + subprocess.on('message', message => { + // @ts-ignore + const { msgtype } = message; + if (handleProcessCommunication(message, subprocess)) return; this[`handle_${msgtype}`](sesid, message); }); subprocess.send({ msgtype: 'connect', ...connection, database }); diff --git a/packages/api/src/proc/connectProcess.js b/packages/api/src/proc/connectProcess.js index 570899443..d6724e148 100644 --- a/packages/api/src/proc/connectProcess.js +++ b/packages/api/src/proc/connectProcess.js @@ -1,10 +1,12 @@ const childProcessChecker = require('../utility/childProcessChecker'); const requireEngineDriver = require('../utility/requireEngineDriver'); const connectUtility = require('../utility/connectUtility'); +const { handleProcessCommunication } = require('../utility/processComm'); function start() { childProcessChecker(); process.on('message', async connection => { + if (handleProcessCommunication(connection)) return; try { const driver = requireEngineDriver(connection); const conn = await connectUtility(driver, connection); diff --git a/packages/api/src/proc/databaseConnectionProcess.js b/packages/api/src/proc/databaseConnectionProcess.js index 1719a266d..579fa3f89 100644 --- a/packages/api/src/proc/databaseConnectionProcess.js +++ b/packages/api/src/proc/databaseConnectionProcess.js @@ -2,6 +2,7 @@ const stableStringify = require('json-stable-stringify'); const childProcessChecker = require('../utility/childProcessChecker'); const requireEngineDriver = require('../utility/requireEngineDriver'); const connectUtility = require('../utility/connectUtility'); +const { handleProcessCommunication } = require('../utility/processComm'); let systemConnection; let storedConnection; @@ -127,6 +128,7 @@ function start() { }, 60 * 1000); process.on('message', async message => { + if (handleProcessCommunication(message)) return; try { await handleMessage(message); } catch (e) { diff --git a/packages/api/src/proc/jslDatastoreProcess.js b/packages/api/src/proc/jslDatastoreProcess.js index f3299faf1..36dfbee6f 100644 --- a/packages/api/src/proc/jslDatastoreProcess.js +++ b/packages/api/src/proc/jslDatastoreProcess.js @@ -1,5 +1,6 @@ const childProcessChecker = require('../utility/childProcessChecker'); const JsonLinesDatastore = require('../utility/JsonLinesDatastore'); +const { handleProcessCommunication } = require('../utility/processComm'); let lastPing = null; let datastore = new JsonLinesDatastore(); @@ -47,6 +48,7 @@ function start() { }, 60 * 1000); process.on('message', async message => { + if (handleProcessCommunication(message)) return; try { await handleMessage(message); } catch (e) { diff --git a/packages/api/src/proc/serverConnectionProcess.js b/packages/api/src/proc/serverConnectionProcess.js index 2c7fed0e7..728f6f81b 100644 --- a/packages/api/src/proc/serverConnectionProcess.js +++ b/packages/api/src/proc/serverConnectionProcess.js @@ -3,6 +3,7 @@ const childProcessChecker = require('../utility/childProcessChecker'); const requireEngineDriver = require('../utility/requireEngineDriver'); const { decryptConnection } = require('../utility/crypting'); const connectUtility = require('../utility/connectUtility'); +const { handleProcessCommunication } = require('../utility/processComm'); let systemConnection; let storedConnection; @@ -97,6 +98,7 @@ function start() { }, 60 * 1000); process.on('message', async message => { + if (handleProcessCommunication(message)) return; try { await handleMessage(message); } catch (err) { diff --git a/packages/api/src/proc/sessionProcess.js b/packages/api/src/proc/sessionProcess.js index b3dcce4b6..6e71ba4f4 100644 --- a/packages/api/src/proc/sessionProcess.js +++ b/packages/api/src/proc/sessionProcess.js @@ -9,6 +9,7 @@ const { jsldir } = require('../utility/directories'); const requireEngineDriver = require('../utility/requireEngineDriver'); const { decryptConnection } = require('../utility/crypting'); const connectUtility = require('../utility/connectUtility'); +const { handleProcessCommunication } = require('../utility/processComm'); let systemConnection; let storedConnection; @@ -183,6 +184,7 @@ async function handleMessage({ msgtype, ...other }) { function start() { childProcessChecker(); process.on('message', async message => { + if (handleProcessCommunication(message)) return; try { await handleMessage(message); } catch (e) { diff --git a/packages/api/src/utility/DatastoreProxy.js b/packages/api/src/utility/DatastoreProxy.js index 2f9338b4e..6f2a3113f 100644 --- a/packages/api/src/utility/DatastoreProxy.js +++ b/packages/api/src/utility/DatastoreProxy.js @@ -1,5 +1,6 @@ const { fork } = require('child_process'); const uuidv1 = require('uuid/v1'); +const { handleProcessCommunication } = require('./processComm'); class DatastoreProxy { constructor(file) { @@ -30,8 +31,11 @@ class DatastoreProxy { if (!this.subprocess) { this.subprocess = fork(process.argv[1], ['jslDatastoreProcess', ...process.argv.slice(3)]); - // @ts-ignore - this.subprocess.on('message', ({ msgtype, ...message }) => { + this.subprocess.on('message', message => { + // @ts-ignore + const { msgtype } = message; + if (handleProcessCommunication(message, this.subprocess)) return; + // if (this.disconnected) return; this[`handle_${msgtype}`](message); }); diff --git a/packages/api/src/utility/connectUtility.js b/packages/api/src/utility/connectUtility.js index 33c4e7766..3c07c53b9 100644 --- a/packages/api/src/utility/connectUtility.js +++ b/packages/api/src/utility/connectUtility.js @@ -1,33 +1,36 @@ const { SSHConnection } = require('node-ssh-forward'); const portfinder = require('portfinder'); const { decryptConnection } = require('./crypting'); +const { getSshTunnel } = require('./sshTunnel'); +const { getSshTunnelProxy } = require('./sshTunnelProxy'); async function connectUtility(driver, storedConnection) { let connection = decryptConnection(storedConnection); if (connection.useSshTunnel) { - const sshConfig = { - endHost: connection.sshHost || '', - endPort: connection.sshPort || 22, - bastionHost: '', - agentForward: false, - passphrase: undefined, - username: connection.sshLogin, - password: connection.sshPassword, - skipAutoPrivateKey: true, - noReadline: true, - }; + const localPort = await getSshTunnelProxy(connection); + // const sshConfig = { + // endHost: connection.sshHost || '', + // endPort: connection.sshPort || 22, + // bastionHost: '', + // agentForward: false, + // passphrase: undefined, + // username: connection.sshLogin, + // password: connection.sshPassword, + // skipAutoPrivateKey: true, + // noReadline: true, + // }; - const sshConn = new SSHConnection(sshConfig); - const localPort = await portfinder.getPortPromise({ port: 10000, stopPort: 60000 }); - // workaround for `getPortPromise` not releasing the port quickly enough - await new Promise(resolve => setTimeout(resolve, 500)); - const tunnelConfig = { - fromPort: localPort, - toPort: connection.port, - toHost: connection.server, - }; - const tunnel = await sshConn.forward(tunnelConfig); - console.log(`Created SSH tunnel to ${connection.sshHost}-${connection.server}:${connection.port}, using local port ${localPort}`) + // const sshConn = new SSHConnection(sshConfig); + // const localPort = await portfinder.getPortPromise({ port: 10000, stopPort: 60000 }); + // // workaround for `getPortPromise` not releasing the port quickly enough + // await new Promise(resolve => setTimeout(resolve, 500)); + // const tunnelConfig = { + // fromPort: localPort, + // toPort: connection.port, + // toHost: connection.server, + // }; + // const tunnel = await sshConn.forward(tunnelConfig); + // console.log(`Created SSH tunnel to ${connection.sshHost}-${connection.server}:${connection.port}, using local port ${localPort}`) connection = { ...connection, diff --git a/packages/api/src/utility/processComm.js b/packages/api/src/utility/processComm.js new file mode 100644 index 000000000..f199069af --- /dev/null +++ b/packages/api/src/utility/processComm.js @@ -0,0 +1,18 @@ +const { handleGetSshTunnelRequest, handleGetSshTunnelResponse } = require('./sshTunnelProxy'); + +function handleProcessCommunication(message, subprocess) { + const { msgtype } = message; + if (msgtype == 'getsshtunnel-request') { + handleGetSshTunnelRequest(message, subprocess); + return true; + } + if (msgtype == 'getsshtunnel-response') { + handleGetSshTunnelResponse(message, subprocess); + return true; + } + return false; +} + +module.exports = { + handleProcessCommunication, +}; diff --git a/packages/api/src/utility/sshTunnel.js b/packages/api/src/utility/sshTunnel.js new file mode 100644 index 000000000..c37b9e726 --- /dev/null +++ b/packages/api/src/utility/sshTunnel.js @@ -0,0 +1,61 @@ +const { SSHConnection } = require('node-ssh-forward'); +const portfinder = require('portfinder'); +const stableStringify = require('json-stable-stringify'); +const _ = require('lodash'); + +const sshConnectionCache = {}; +const sshTunnelCache = {}; + +const CONNECTION_FIELDS = ['sshHost', 'sshPort', 'sshLogin', 'sshPassword']; +const TUNNEL_FIELDS = [...CONNECTION_FIELDS, 'server', 'port']; + +async function getSshConnection(connection) { + const connectionCacheKey = stableStringify(_.pick(connection, CONNECTION_FIELDS)); + if (sshConnectionCache[connectionCacheKey]) return sshConnectionCache[connectionCacheKey]; + + const sshConfig = { + endHost: connection.sshHost || '', + endPort: connection.sshPort || 22, + bastionHost: '', + agentForward: false, + passphrase: undefined, + username: connection.sshLogin, + password: connection.sshPassword, + skipAutoPrivateKey: true, + noReadline: true, + }; + + const sshConn = new SSHConnection(sshConfig); + sshConnectionCache[connectionCacheKey] = sshConn; + return sshConn; +} + +async function getSshTunnel(connection) { + const sshConn = await getSshConnection(connection); + const tunnelCacheKey = stableStringify(_.pick(connection, TUNNEL_FIELDS)); + if (sshTunnelCache[tunnelCacheKey]) return sshTunnelCache[tunnelCacheKey].localPort; + + const localPort = await portfinder.getPortPromise({ port: 10000, stopPort: 60000 }); + // workaround for `getPortPromise` not releasing the port quickly enough + await new Promise(resolve => setTimeout(resolve, 500)); + const tunnelConfig = { + fromPort: localPort, + toPort: connection.port, + toHost: connection.server, + }; + const tunnel = await sshConn.forward(tunnelConfig); + console.log( + `Created SSH tunnel to ${connection.sshHost}-${connection.server}:${connection.port}, using local port ${localPort}` + ); + + sshTunnelCache[tunnelCacheKey] = { + tunnel, + localPort, + }; + + return localPort; +} + +module.exports = { + getSshTunnel, +}; diff --git a/packages/api/src/utility/sshTunnelProxy.js b/packages/api/src/utility/sshTunnelProxy.js new file mode 100644 index 000000000..afca7cc6e --- /dev/null +++ b/packages/api/src/utility/sshTunnelProxy.js @@ -0,0 +1,30 @@ +const uuidv1 = require('uuid/v1'); +const { getSshTunnel } = require('./sshTunnel'); + +const dispatchedMessages = {}; + +async function handleGetSshTunnelRequest({ msgid, connection }, subprocess) { + const response = await getSshTunnel(connection); + subprocess.send({ msgtype: 'getsshtunnel-response', msgid, response }); +} + +function handleGetSshTunnelResponse({ msgid, response }, subprocess) { + const { resolve } = dispatchedMessages[msgid]; + delete dispatchedMessages[msgid]; + resolve(response); +} + +async function getSshTunnelProxy(connection) { + if (!process.send) return getSshTunnel(connection); + const msgid = uuidv1(); + process.send({ msgtype: 'getsshtunnel-request', msgid, connection }); + return new Promise((resolve, reject) => { + dispatchedMessages[msgid] = { resolve, reject }; + }); +} + +module.exports = { + handleGetSshTunnelRequest, + handleGetSshTunnelResponse, + getSshTunnelProxy, +};