query - basic print workflow - messages on client

This commit is contained in:
Jan Prochazka
2020-04-05 20:48:04 +02:00
parent 3df4e9b7dc
commit 72375ec635
13 changed files with 315 additions and 24 deletions

View File

@@ -49,7 +49,7 @@ module.exports = {
},
/** @param {import('@dbgate/types').OpenedDatabaseConnection} conn */
async sendRequest(conn, message) {
sendRequest(conn, message) {
const msgid = uuidv1();
const promise = new Promise((resolve, reject) => {
this.requests[msgid] = [resolve, reject];

View File

@@ -0,0 +1,68 @@
const _ = require('lodash');
const uuidv1 = require('uuid/v1');
const connections = require('./connections');
const socket = require('../utility/socket');
const { fork } = require('child_process');
const DatabaseAnalyser = require('@dbgate/engines/default/DatabaseAnalyser');
module.exports = {
/** @type {import('@dbgate/types').OpenedSession[]} */
opened: [],
handle_error(sesid, props) {
const { error } = props;
console.log(`Error in database session ${sesid}: ${error}`);
},
// handle_row(sesid, props) {
// const { row } = props;
// socket.emit('sessionRow', row);
// },
handle_info(sesid, props) {
const { info } = props;
socket.emit(`session-info-${sesid}`, info);
},
create_meta: 'post',
async create({ conid, database }) {
const sesid = uuidv1();
const connection = await connections.get({ conid });
const subprocess = fork(process.argv[1], ['sessionProcess']);
const newOpened = {
conid,
database,
subprocess,
connection,
sesid,
};
this.opened.push(newOpened);
// @ts-ignore
subprocess.on('message', ({ msgtype, ...message }) => {
this[`handle_${msgtype}`](sesid, message);
});
subprocess.send({ msgtype: 'connect', ...connection, database });
return newOpened;
},
executeQuery_meta: 'post',
async executeQuery({ sesid, sql }) {
const session = this.opened.find((x) => x.sesid == sesid);
if (!session) {
throw new Error('Invalid session');
}
console.log(`Processing query, sesid=${sesid}, sql=${sql}`);
session.subprocess.send({ msgtype: 'executeQuery', sql });
return { state: 'ok' };
},
// runCommand_meta: 'post',
// async runCommand({ conid, database, sql }) {
// console.log(`Running SQL command , conid=${conid}, database=${database}, sql=${sql}`);
// const opened = await this.ensureOpened(conid, database);
// const res = await this.sendRequest(opened, { msgtype: 'queryData', sql });
// return res;
// },
};

View File

@@ -10,6 +10,7 @@ const connections = require('./controllers/connections');
const serverConnections = require('./controllers/serverConnections');
const databaseConnections = require('./controllers/databaseConnections');
const tables = require('./controllers/tables');
const sessions = require('./controllers/sessions');
const socket = require('./utility/socket');
function start() {
@@ -27,6 +28,7 @@ function start() {
useController(app, '/server-connections', serverConnections);
useController(app, '/database-connections', databaseConnections);
useController(app, '/tables', tables);
useController(app, '/sessions', sessions);
if (fs.existsSync('/home/dbgate-docker/build')) {
// server static files inside docker container

View File

@@ -1,9 +1,11 @@
const connectProcess = require('./connectProcess');
const databaseConnectionProcess = require('./databaseConnectionProcess');
const serverConnectionProcess = require('./serverConnectionProcess');
const sessionProcess = require('./sessionProcess');
module.exports = {
connectProcess,
databaseConnectionProcess,
serverConnectionProcess,
sessionProcess,
};

View File

@@ -0,0 +1,69 @@
const engines = require('@dbgate/engines');
const driverConnect = require('../utility/driverConnect');
let systemConnection;
let storedConnection;
let afterConnectCallbacks = [];
async function handleConnect(connection) {
storedConnection = connection;
const driver = engines(storedConnection);
systemConnection = await driverConnect(driver, storedConnection);
for (const [resolve, reject] of afterConnectCallbacks) {
resolve();
}
afterConnectCallbacks = [];
}
function waitConnected() {
if (systemConnection) return Promise.resolve();
return new Promise((resolve, reject) => {
afterConnectCallbacks.push([resolve, reject]);
});
}
async function handleExecuteQuery({ sql }) {
await waitConnected();
const driver = engines(storedConnection);
await driver.stream(systemConnection, sql, {
recordset: (columns) => {
process.send({ msgtype: 'recordset', columns });
},
row: (row) => {
process.send({ msgtype: 'row', row });
},
error: (error) => {
process.send({ msgtype: 'error', error });
},
done: (result) => {
process.send({ msgtype: 'done', result });
},
info: (info) => {
process.send({ msgtype: 'info', info });
},
});
}
const messageHandlers = {
connect: handleConnect,
executeQuery: handleExecuteQuery,
};
async function handleMessage({ msgtype, ...other }) {
const handler = messageHandlers[msgtype];
await handler(other);
}
function start() {
process.on('message', async (message) => {
try {
await handleMessage(message);
} catch (e) {
process.send({ msgtype: 'error', error: e.message });
}
});
}
module.exports = { start };