query excecute fixes

This commit is contained in:
Jan Prochazka
2021-01-01 11:05:02 +01:00
parent fbd254bafc
commit fc79f5f07c
4 changed files with 59 additions and 41 deletions

View File

@@ -11,7 +11,7 @@ const requireEngineDriver = require('../utility/requireEngineDriver');
let systemConnection;
let storedConnection;
let afterConnectCallbacks = [];
let currentHandlers = [];
// let currentHandlers = [];
class TableWriter {
constructor(columns, resultIndex) {
@@ -64,17 +64,20 @@ class TableWriter {
}
class StreamHandler {
constructor(resultIndex) {
constructor(resultIndexHolder, resolve) {
this.recordset = this.recordset.bind(this);
this.row = this.row.bind(this);
// this.error = this.error.bind(this);
this.done = this.done.bind(this);
this.info = this.info.bind(this);
// use this for cancelling
this.stream = null;
// use this for cancelling - not implemented
// this.stream = null;
this.plannedStats = false;
this.resultIndex = resultIndex;
currentHandlers = [...currentHandlers, this];
this.resultIndexHolder = resultIndexHolder;
this.resolve = resolve;
// currentHandlers = [...currentHandlers, this];
}
closeCurrentWriter() {
@@ -86,7 +89,8 @@ class StreamHandler {
recordset(columns) {
this.closeCurrentWriter();
this.currentWriter = new TableWriter(columns, this.resultIndex);
this.currentWriter = new TableWriter(columns, this.resultIndexHolder.value);
this.resultIndexHolder.value += 1;
// this.writeCurrentStats();
@@ -107,14 +111,21 @@ class StreamHandler {
// }
done(result) {
this.closeCurrentWriter();
process.send({ msgtype: 'done', result });
currentHandlers = currentHandlers.filter((x) => x != this);
// currentHandlers = currentHandlers.filter((x) => x != this);
this.resolve();
}
info(info) {
process.send({ msgtype: 'info', info });
}
}
function handleStream(driver, resultIndexHolder, sql) {
return new Promise((resolve, reject) => {
const handler = new StreamHandler(resultIndexHolder, resolve);
driver.stream(systemConnection, sql, handler);
});
}
async function handleConnect(connection) {
storedConnection = connection;
@@ -126,11 +137,11 @@ async function handleConnect(connection) {
afterConnectCallbacks = [];
}
function handleCancel() {
for (const handler of currentHandlers) {
if (handler.stream) handler.stream.cancel();
}
}
// function handleCancel() {
// for (const handler of currentHandlers) {
// if (handler.stream) handler.stream.cancel();
// }
// }
function waitConnected() {
if (systemConnection) return Promise.resolve();
@@ -143,19 +154,23 @@ async function handleExecuteQuery({ sql }) {
await waitConnected();
const driver = requireEngineDriver(storedConnection);
let resultIndex = 0;
const resultIndexHolder = {
value: 0,
};
for (const sqlItem of goSplit(sql)) {
const handler = new StreamHandler(resultIndex);
const stream = await driver.stream(systemConnection, sqlItem, handler);
handler.stream = stream;
resultIndex += 1;
await handleStream(driver, resultIndexHolder, sqlItem);
// const handler = new StreamHandler(resultIndex);
// const stream = await driver.stream(systemConnection, sqlItem, handler);
// handler.stream = stream;
// resultIndex = handler.resultIndex;
}
process.send({ msgtype: 'done' });
}
const messageHandlers = {
connect: handleConnect,
executeQuery: handleExecuteQuery,
cancel: handleCancel,
// cancel: handleCancel,
};
async function handleMessage({ msgtype, ...other }) {