ssh tunnel - reuse SSH connection + local port for multiple DB connections

This commit is contained in:
Jan Prochazka
2021-02-13 07:47:55 +01:00
parent 728ca72cc1
commit 114dc0b543
15 changed files with 172 additions and 32 deletions

View File

@@ -6,6 +6,7 @@ const nedb = require('nedb-promises');
const { datadir } = require('../utility/directories');
const socket = require('../utility/socket');
const { encryptConnection } = require('../utility/crypting');
const { handleProcessCommunication } = require('../utility/processComm');
function getPortalCollections() {
if (process.env.CONNECTIONS) {
@@ -47,6 +48,7 @@ module.exports = {
test(req, res) {
const subprocess = fork(process.argv[1], ['connectProcess', ...process.argv.slice(3)]);
subprocess.on('message', resp => {
if (handleProcessCommunication(resp, subprocess)) return;
// @ts-ignore
const { msgtype } = resp;
if (msgtype == 'connected' || msgtype == 'error') {

View File

@@ -3,6 +3,7 @@ const connections = require('./connections');
const socket = require('../utility/socket');
const { fork } = require('child_process');
const { DatabaseAnalyser } = require('dbgate-tools');
const { handleProcessCommunication } = require('../utility/processComm');
module.exports = {
/** @type {import('dbgate-types').OpenedDatabaseConnection[]} */
@@ -50,8 +51,10 @@ module.exports = {
status: { name: 'pending' },
};
this.opened.push(newOpened);
// @ts-ignore
subprocess.on('message', ({ msgtype, ...message }) => {
subprocess.on('message', message => {
// @ts-ignore
const { msgtype } = message;
if (handleProcessCommunication(message, subprocess)) return;
if (newOpened.disconnected) return;
this[`handle_${msgtype}`](conid, database, message);
});

View File

@@ -7,6 +7,7 @@ const socket = require('../utility/socket');
const { fork } = require('child_process');
const { rundir, uploadsdir, pluginsdir } = require('../utility/directories');
const { extractShellApiPlugins, extractShellApiFunctionName } = require('dbgate-tools');
const { handleProcessCommunication } = require('../utility/processComm');
function extractPlugins(script) {
const requireRegex = /\s*\/\/\s*@require\s+([^\s]+)\s*\n/g;
@@ -123,8 +124,10 @@ module.exports = {
subprocess,
};
this.opened.push(newOpened);
// @ts-ignore
subprocess.on('message', ({ msgtype, ...message }) => {
subprocess.on('message', message => {
// @ts-ignore
const { msgtype } = message;
if (handleProcessCommunication(message, subprocess)) return;
this[`handle_${msgtype}`](runid, message);
});
return newOpened;

View File

@@ -3,6 +3,7 @@ const socket = require('../utility/socket');
const { fork } = require('child_process');
const _ = require('lodash');
const AsyncLock = require('async-lock');
const { handleProcessCommunication } = require('../utility/processComm');
const lock = new AsyncLock();
module.exports = {
@@ -43,8 +44,10 @@ module.exports = {
this.opened.push(newOpened);
delete this.closed[conid];
socket.emitChanged(`server-status-changed`);
// @ts-ignore
subprocess.on('message', ({ msgtype, ...message }) => {
subprocess.on('message', message => {
// @ts-ignore
const { msgtype } = message;
if (handleProcessCommunication(message, subprocess)) return;
if (newOpened.disconnected) return;
this[`handle_${msgtype}`](conid, message);
});

View File

@@ -4,6 +4,7 @@ const connections = require('./connections');
const socket = require('../utility/socket');
const { fork } = require('child_process');
const jsldata = require('./jsldata');
const { handleProcessCommunication } = require('../utility/processComm');
module.exports = {
/** @type {import('dbgate-types').OpenedSession[]} */
@@ -73,8 +74,10 @@ module.exports = {
sesid,
};
this.opened.push(newOpened);
// @ts-ignore
subprocess.on('message', ({ msgtype, ...message }) => {
subprocess.on('message', message => {
// @ts-ignore
const { msgtype } = message;
if (handleProcessCommunication(message, subprocess)) return;
this[`handle_${msgtype}`](sesid, message);
});
subprocess.send({ msgtype: 'connect', ...connection, database });

View File

@@ -1,10 +1,12 @@
const childProcessChecker = require('../utility/childProcessChecker');
const requireEngineDriver = require('../utility/requireEngineDriver');
const connectUtility = require('../utility/connectUtility');
const { handleProcessCommunication } = require('../utility/processComm');
function start() {
childProcessChecker();
process.on('message', async connection => {
if (handleProcessCommunication(connection)) return;
try {
const driver = requireEngineDriver(connection);
const conn = await connectUtility(driver, connection);

View File

@@ -2,6 +2,7 @@ const stableStringify = require('json-stable-stringify');
const childProcessChecker = require('../utility/childProcessChecker');
const requireEngineDriver = require('../utility/requireEngineDriver');
const connectUtility = require('../utility/connectUtility');
const { handleProcessCommunication } = require('../utility/processComm');
let systemConnection;
let storedConnection;
@@ -127,6 +128,7 @@ function start() {
}, 60 * 1000);
process.on('message', async message => {
if (handleProcessCommunication(message)) return;
try {
await handleMessage(message);
} catch (e) {

View File

@@ -1,5 +1,6 @@
const childProcessChecker = require('../utility/childProcessChecker');
const JsonLinesDatastore = require('../utility/JsonLinesDatastore');
const { handleProcessCommunication } = require('../utility/processComm');
let lastPing = null;
let datastore = new JsonLinesDatastore();
@@ -47,6 +48,7 @@ function start() {
}, 60 * 1000);
process.on('message', async message => {
if (handleProcessCommunication(message)) return;
try {
await handleMessage(message);
} catch (e) {

View File

@@ -3,6 +3,7 @@ const childProcessChecker = require('../utility/childProcessChecker');
const requireEngineDriver = require('../utility/requireEngineDriver');
const { decryptConnection } = require('../utility/crypting');
const connectUtility = require('../utility/connectUtility');
const { handleProcessCommunication } = require('../utility/processComm');
let systemConnection;
let storedConnection;
@@ -97,6 +98,7 @@ function start() {
}, 60 * 1000);
process.on('message', async message => {
if (handleProcessCommunication(message)) return;
try {
await handleMessage(message);
} catch (err) {

View File

@@ -9,6 +9,7 @@ const { jsldir } = require('../utility/directories');
const requireEngineDriver = require('../utility/requireEngineDriver');
const { decryptConnection } = require('../utility/crypting');
const connectUtility = require('../utility/connectUtility');
const { handleProcessCommunication } = require('../utility/processComm');
let systemConnection;
let storedConnection;
@@ -183,6 +184,7 @@ async function handleMessage({ msgtype, ...other }) {
function start() {
childProcessChecker();
process.on('message', async message => {
if (handleProcessCommunication(message)) return;
try {
await handleMessage(message);
} catch (e) {

View File

@@ -1,5 +1,6 @@
const { fork } = require('child_process');
const uuidv1 = require('uuid/v1');
const { handleProcessCommunication } = require('./processComm');
class DatastoreProxy {
constructor(file) {
@@ -30,8 +31,11 @@ class DatastoreProxy {
if (!this.subprocess) {
this.subprocess = fork(process.argv[1], ['jslDatastoreProcess', ...process.argv.slice(3)]);
// @ts-ignore
this.subprocess.on('message', ({ msgtype, ...message }) => {
this.subprocess.on('message', message => {
// @ts-ignore
const { msgtype } = message;
if (handleProcessCommunication(message, this.subprocess)) return;
// if (this.disconnected) return;
this[`handle_${msgtype}`](message);
});

View File

@@ -1,33 +1,36 @@
const { SSHConnection } = require('node-ssh-forward');
const portfinder = require('portfinder');
const { decryptConnection } = require('./crypting');
const { getSshTunnel } = require('./sshTunnel');
const { getSshTunnelProxy } = require('./sshTunnelProxy');
async function connectUtility(driver, storedConnection) {
let connection = decryptConnection(storedConnection);
if (connection.useSshTunnel) {
const sshConfig = {
endHost: connection.sshHost || '',
endPort: connection.sshPort || 22,
bastionHost: '',
agentForward: false,
passphrase: undefined,
username: connection.sshLogin,
password: connection.sshPassword,
skipAutoPrivateKey: true,
noReadline: true,
};
const localPort = await getSshTunnelProxy(connection);
// const sshConfig = {
// endHost: connection.sshHost || '',
// endPort: connection.sshPort || 22,
// bastionHost: '',
// agentForward: false,
// passphrase: undefined,
// username: connection.sshLogin,
// password: connection.sshPassword,
// skipAutoPrivateKey: true,
// noReadline: true,
// };
const sshConn = new SSHConnection(sshConfig);
const localPort = await portfinder.getPortPromise({ port: 10000, stopPort: 60000 });
// workaround for `getPortPromise` not releasing the port quickly enough
await new Promise(resolve => setTimeout(resolve, 500));
const tunnelConfig = {
fromPort: localPort,
toPort: connection.port,
toHost: connection.server,
};
const tunnel = await sshConn.forward(tunnelConfig);
console.log(`Created SSH tunnel to ${connection.sshHost}-${connection.server}:${connection.port}, using local port ${localPort}`)
// const sshConn = new SSHConnection(sshConfig);
// const localPort = await portfinder.getPortPromise({ port: 10000, stopPort: 60000 });
// // workaround for `getPortPromise` not releasing the port quickly enough
// await new Promise(resolve => setTimeout(resolve, 500));
// const tunnelConfig = {
// fromPort: localPort,
// toPort: connection.port,
// toHost: connection.server,
// };
// const tunnel = await sshConn.forward(tunnelConfig);
// console.log(`Created SSH tunnel to ${connection.sshHost}-${connection.server}:${connection.port}, using local port ${localPort}`)
connection = {
...connection,

View File

@@ -0,0 +1,18 @@
const { handleGetSshTunnelRequest, handleGetSshTunnelResponse } = require('./sshTunnelProxy');
function handleProcessCommunication(message, subprocess) {
const { msgtype } = message;
if (msgtype == 'getsshtunnel-request') {
handleGetSshTunnelRequest(message, subprocess);
return true;
}
if (msgtype == 'getsshtunnel-response') {
handleGetSshTunnelResponse(message, subprocess);
return true;
}
return false;
}
module.exports = {
handleProcessCommunication,
};

View File

@@ -0,0 +1,61 @@
const { SSHConnection } = require('node-ssh-forward');
const portfinder = require('portfinder');
const stableStringify = require('json-stable-stringify');
const _ = require('lodash');
const sshConnectionCache = {};
const sshTunnelCache = {};
const CONNECTION_FIELDS = ['sshHost', 'sshPort', 'sshLogin', 'sshPassword'];
const TUNNEL_FIELDS = [...CONNECTION_FIELDS, 'server', 'port'];
async function getSshConnection(connection) {
const connectionCacheKey = stableStringify(_.pick(connection, CONNECTION_FIELDS));
if (sshConnectionCache[connectionCacheKey]) return sshConnectionCache[connectionCacheKey];
const sshConfig = {
endHost: connection.sshHost || '',
endPort: connection.sshPort || 22,
bastionHost: '',
agentForward: false,
passphrase: undefined,
username: connection.sshLogin,
password: connection.sshPassword,
skipAutoPrivateKey: true,
noReadline: true,
};
const sshConn = new SSHConnection(sshConfig);
sshConnectionCache[connectionCacheKey] = sshConn;
return sshConn;
}
async function getSshTunnel(connection) {
const sshConn = await getSshConnection(connection);
const tunnelCacheKey = stableStringify(_.pick(connection, TUNNEL_FIELDS));
if (sshTunnelCache[tunnelCacheKey]) return sshTunnelCache[tunnelCacheKey].localPort;
const localPort = await portfinder.getPortPromise({ port: 10000, stopPort: 60000 });
// workaround for `getPortPromise` not releasing the port quickly enough
await new Promise(resolve => setTimeout(resolve, 500));
const tunnelConfig = {
fromPort: localPort,
toPort: connection.port,
toHost: connection.server,
};
const tunnel = await sshConn.forward(tunnelConfig);
console.log(
`Created SSH tunnel to ${connection.sshHost}-${connection.server}:${connection.port}, using local port ${localPort}`
);
sshTunnelCache[tunnelCacheKey] = {
tunnel,
localPort,
};
return localPort;
}
module.exports = {
getSshTunnel,
};

View File

@@ -0,0 +1,30 @@
const uuidv1 = require('uuid/v1');
const { getSshTunnel } = require('./sshTunnel');
const dispatchedMessages = {};
async function handleGetSshTunnelRequest({ msgid, connection }, subprocess) {
const response = await getSshTunnel(connection);
subprocess.send({ msgtype: 'getsshtunnel-response', msgid, response });
}
function handleGetSshTunnelResponse({ msgid, response }, subprocess) {
const { resolve } = dispatchedMessages[msgid];
delete dispatchedMessages[msgid];
resolve(response);
}
async function getSshTunnelProxy(connection) {
if (!process.send) return getSshTunnel(connection);
const msgid = uuidv1();
process.send({ msgtype: 'getsshtunnel-request', msgid, connection });
return new Promise((resolve, reject) => {
dispatchedMessages[msgid] = { resolve, reject };
});
}
module.exports = {
handleGetSshTunnelRequest,
handleGetSshTunnelResponse,
getSshTunnelProxy,
};