mirror of
https://github.com/DeNNiiInc/dbgate.git
synced 2026-04-18 16:06:01 +00:00
handleQueryStream refactor
This commit is contained in:
@@ -11,6 +11,7 @@ const { decryptConnection } = require('../utility/crypting');
|
|||||||
const { connectUtility } = require('../utility/connectUtility');
|
const { connectUtility } = require('../utility/connectUtility');
|
||||||
const { handleProcessCommunication } = require('../utility/processComm');
|
const { handleProcessCommunication } = require('../utility/processComm');
|
||||||
const { getLogger, extractIntSettingsValue, extractBoolSettingsValue } = require('dbgate-tools');
|
const { getLogger, extractIntSettingsValue, extractBoolSettingsValue } = require('dbgate-tools');
|
||||||
|
const { handleQueryStream, QueryStreamTableWriter } = require('../utility/handleQueryStream');
|
||||||
|
|
||||||
const logger = getLogger('sessionProcess');
|
const logger = getLogger('sessionProcess');
|
||||||
|
|
||||||
@@ -23,164 +24,6 @@ let lastActivity = null;
|
|||||||
let currentProfiler = null;
|
let currentProfiler = null;
|
||||||
let executingScripts = 0;
|
let executingScripts = 0;
|
||||||
|
|
||||||
class TableWriter {
|
|
||||||
constructor() {
|
|
||||||
this.currentRowCount = 0;
|
|
||||||
this.currentChangeIndex = 1;
|
|
||||||
this.initializedFile = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
initializeFromQuery(structure, resultIndex) {
|
|
||||||
this.jslid = crypto.randomUUID();
|
|
||||||
this.currentFile = path.join(jsldir(), `${this.jslid}.jsonl`);
|
|
||||||
fs.writeFileSync(
|
|
||||||
this.currentFile,
|
|
||||||
JSON.stringify({
|
|
||||||
...structure,
|
|
||||||
__isStreamHeader: true,
|
|
||||||
}) + '\n'
|
|
||||||
);
|
|
||||||
this.currentStream = fs.createWriteStream(this.currentFile, { flags: 'a' });
|
|
||||||
this.writeCurrentStats(false, false);
|
|
||||||
this.resultIndex = resultIndex;
|
|
||||||
this.initializedFile = true;
|
|
||||||
process.send({ msgtype: 'recordset', jslid: this.jslid, resultIndex });
|
|
||||||
}
|
|
||||||
|
|
||||||
initializeFromReader(jslid) {
|
|
||||||
this.jslid = jslid;
|
|
||||||
this.currentFile = path.join(jsldir(), `${this.jslid}.jsonl`);
|
|
||||||
this.writeCurrentStats(false, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
row(row) {
|
|
||||||
// console.log('ACCEPT ROW', row);
|
|
||||||
this.currentStream.write(JSON.stringify(row) + '\n');
|
|
||||||
this.currentRowCount += 1;
|
|
||||||
|
|
||||||
if (!this.plannedStats) {
|
|
||||||
this.plannedStats = true;
|
|
||||||
process.nextTick(() => {
|
|
||||||
if (this.currentStream) this.currentStream.uncork();
|
|
||||||
process.nextTick(() => this.writeCurrentStats(false, true));
|
|
||||||
this.plannedStats = false;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
rowFromReader(row) {
|
|
||||||
if (!this.initializedFile) {
|
|
||||||
process.send({ msgtype: 'initializeFile', jslid: this.jslid });
|
|
||||||
this.initializedFile = true;
|
|
||||||
|
|
||||||
fs.writeFileSync(this.currentFile, JSON.stringify(row) + '\n');
|
|
||||||
this.currentStream = fs.createWriteStream(this.currentFile, { flags: 'a' });
|
|
||||||
this.writeCurrentStats(false, false);
|
|
||||||
this.initializedFile = true;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.row(row);
|
|
||||||
}
|
|
||||||
|
|
||||||
writeCurrentStats(isFinished = false, emitEvent = false) {
|
|
||||||
const stats = {
|
|
||||||
rowCount: this.currentRowCount,
|
|
||||||
changeIndex: this.currentChangeIndex,
|
|
||||||
isFinished,
|
|
||||||
jslid: this.jslid,
|
|
||||||
};
|
|
||||||
fs.writeFileSync(`${this.currentFile}.stats`, JSON.stringify(stats));
|
|
||||||
this.currentChangeIndex += 1;
|
|
||||||
if (emitEvent) {
|
|
||||||
process.send({ msgtype: 'stats', ...stats });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
close(afterClose) {
|
|
||||||
if (this.currentStream) {
|
|
||||||
this.currentStream.end(() => {
|
|
||||||
this.writeCurrentStats(true, true);
|
|
||||||
if (afterClose) afterClose();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class StreamHandler {
|
|
||||||
constructor(resultIndexHolder, resolve, startLine) {
|
|
||||||
this.recordset = this.recordset.bind(this);
|
|
||||||
this.startLine = startLine;
|
|
||||||
this.row = this.row.bind(this);
|
|
||||||
// this.error = this.error.bind(this);
|
|
||||||
this.done = this.done.bind(this);
|
|
||||||
this.info = this.info.bind(this);
|
|
||||||
|
|
||||||
// use this for cancelling - not implemented
|
|
||||||
// this.stream = null;
|
|
||||||
|
|
||||||
this.plannedStats = false;
|
|
||||||
this.resultIndexHolder = resultIndexHolder;
|
|
||||||
this.resolve = resolve;
|
|
||||||
// currentHandlers = [...currentHandlers, this];
|
|
||||||
}
|
|
||||||
|
|
||||||
closeCurrentWriter() {
|
|
||||||
if (this.currentWriter) {
|
|
||||||
this.currentWriter.close();
|
|
||||||
this.currentWriter = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
recordset(columns) {
|
|
||||||
this.closeCurrentWriter();
|
|
||||||
this.currentWriter = new TableWriter();
|
|
||||||
this.currentWriter.initializeFromQuery(
|
|
||||||
Array.isArray(columns) ? { columns } : columns,
|
|
||||||
this.resultIndexHolder.value
|
|
||||||
);
|
|
||||||
this.resultIndexHolder.value += 1;
|
|
||||||
|
|
||||||
// this.writeCurrentStats();
|
|
||||||
|
|
||||||
// this.onRow = _.throttle((jslid) => {
|
|
||||||
// if (jslid == this.jslid) {
|
|
||||||
// this.writeCurrentStats(false, true);
|
|
||||||
// }
|
|
||||||
// }, 500);
|
|
||||||
}
|
|
||||||
row(row) {
|
|
||||||
if (this.currentWriter) this.currentWriter.row(row);
|
|
||||||
else if (row.message) process.send({ msgtype: 'info', info: { message: row.message } });
|
|
||||||
// this.onRow(this.jslid);
|
|
||||||
}
|
|
||||||
// error(error) {
|
|
||||||
// process.send({ msgtype: 'error', error });
|
|
||||||
// }
|
|
||||||
done(result) {
|
|
||||||
this.closeCurrentWriter();
|
|
||||||
// currentHandlers = currentHandlers.filter((x) => x != this);
|
|
||||||
this.resolve();
|
|
||||||
}
|
|
||||||
info(info) {
|
|
||||||
if (info && info.line != null) {
|
|
||||||
info = {
|
|
||||||
...info,
|
|
||||||
line: this.startLine + info.line,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
process.send({ msgtype: 'info', info });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function handleStream(driver, resultIndexHolder, sqlItem) {
|
|
||||||
return new Promise((resolve, reject) => {
|
|
||||||
const start = sqlItem.trimStart || sqlItem.start;
|
|
||||||
const handler = new StreamHandler(resultIndexHolder, resolve, start && start.line);
|
|
||||||
driver.stream(dbhan, sqlItem.text, handler);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
function allowExecuteCustomScript(driver) {
|
function allowExecuteCustomScript(driver) {
|
||||||
if (driver.readOnlySessions) {
|
if (driver.readOnlySessions) {
|
||||||
return true;
|
return true;
|
||||||
@@ -227,7 +70,7 @@ async function handleStartProfiler({ jslid }) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const writer = new TableWriter();
|
const writer = new QueryStreamTableWriter();
|
||||||
writer.initializeFromReader(jslid);
|
writer.initializeFromReader(jslid);
|
||||||
|
|
||||||
currentProfiler = await driver.startProfiler(dbhan, {
|
currentProfiler = await driver.startProfiler(dbhan, {
|
||||||
@@ -313,7 +156,7 @@ async function handleExecuteQuery({ sql, autoCommit }) {
|
|||||||
...driver.getQuerySplitterOptions('stream'),
|
...driver.getQuerySplitterOptions('stream'),
|
||||||
returnRichInfo: true,
|
returnRichInfo: true,
|
||||||
})) {
|
})) {
|
||||||
await handleStream(driver, resultIndexHolder, sqlItem);
|
await handleQueryStream(dbhan, driver, resultIndexHolder, sqlItem);
|
||||||
// const handler = new StreamHandler(resultIndex);
|
// const handler = new StreamHandler(resultIndex);
|
||||||
// const stream = await driver.stream(systemConnection, sqlItem, handler);
|
// const stream = await driver.stream(systemConnection, sqlItem, handler);
|
||||||
// handler.stream = stream;
|
// handler.stream = stream;
|
||||||
@@ -341,7 +184,7 @@ async function handleExecuteReader({ jslid, sql, fileName }) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const writer = new TableWriter();
|
const writer = new QueryStreamTableWriter();
|
||||||
writer.initializeFromReader(jslid);
|
writer.initializeFromReader(jslid);
|
||||||
|
|
||||||
const reader = await driver.readQuery(dbhan, sql);
|
const reader = await driver.readQuery(dbhan, sql);
|
||||||
|
|||||||
169
packages/api/src/utility/handleQueryStream.js
Normal file
169
packages/api/src/utility/handleQueryStream.js
Normal file
@@ -0,0 +1,169 @@
|
|||||||
|
const crypto = require('crypto');
|
||||||
|
const path = require('path');
|
||||||
|
const fs = require('fs');
|
||||||
|
const _ = require('lodash');
|
||||||
|
|
||||||
|
const { jsldir } = require('../utility/directories');
|
||||||
|
|
||||||
|
class QueryStreamTableWriter {
|
||||||
|
constructor() {
|
||||||
|
this.currentRowCount = 0;
|
||||||
|
this.currentChangeIndex = 1;
|
||||||
|
this.initializedFile = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
initializeFromQuery(structure, resultIndex) {
|
||||||
|
this.jslid = crypto.randomUUID();
|
||||||
|
this.currentFile = path.join(jsldir(), `${this.jslid}.jsonl`);
|
||||||
|
fs.writeFileSync(
|
||||||
|
this.currentFile,
|
||||||
|
JSON.stringify({
|
||||||
|
...structure,
|
||||||
|
__isStreamHeader: true,
|
||||||
|
}) + '\n'
|
||||||
|
);
|
||||||
|
this.currentStream = fs.createWriteStream(this.currentFile, { flags: 'a' });
|
||||||
|
this.writeCurrentStats(false, false);
|
||||||
|
this.resultIndex = resultIndex;
|
||||||
|
this.initializedFile = true;
|
||||||
|
process.send({ msgtype: 'recordset', jslid: this.jslid, resultIndex });
|
||||||
|
}
|
||||||
|
|
||||||
|
initializeFromReader(jslid) {
|
||||||
|
this.jslid = jslid;
|
||||||
|
this.currentFile = path.join(jsldir(), `${this.jslid}.jsonl`);
|
||||||
|
this.writeCurrentStats(false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
row(row) {
|
||||||
|
// console.log('ACCEPT ROW', row);
|
||||||
|
this.currentStream.write(JSON.stringify(row) + '\n');
|
||||||
|
this.currentRowCount += 1;
|
||||||
|
|
||||||
|
if (!this.plannedStats) {
|
||||||
|
this.plannedStats = true;
|
||||||
|
process.nextTick(() => {
|
||||||
|
if (this.currentStream) this.currentStream.uncork();
|
||||||
|
process.nextTick(() => this.writeCurrentStats(false, true));
|
||||||
|
this.plannedStats = false;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rowFromReader(row) {
|
||||||
|
if (!this.initializedFile) {
|
||||||
|
process.send({ msgtype: 'initializeFile', jslid: this.jslid });
|
||||||
|
this.initializedFile = true;
|
||||||
|
|
||||||
|
fs.writeFileSync(this.currentFile, JSON.stringify(row) + '\n');
|
||||||
|
this.currentStream = fs.createWriteStream(this.currentFile, { flags: 'a' });
|
||||||
|
this.writeCurrentStats(false, false);
|
||||||
|
this.initializedFile = true;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.row(row);
|
||||||
|
}
|
||||||
|
|
||||||
|
writeCurrentStats(isFinished = false, emitEvent = false) {
|
||||||
|
const stats = {
|
||||||
|
rowCount: this.currentRowCount,
|
||||||
|
changeIndex: this.currentChangeIndex,
|
||||||
|
isFinished,
|
||||||
|
jslid: this.jslid,
|
||||||
|
};
|
||||||
|
fs.writeFileSync(`${this.currentFile}.stats`, JSON.stringify(stats));
|
||||||
|
this.currentChangeIndex += 1;
|
||||||
|
if (emitEvent) {
|
||||||
|
process.send({ msgtype: 'stats', ...stats });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
close(afterClose) {
|
||||||
|
if (this.currentStream) {
|
||||||
|
this.currentStream.end(() => {
|
||||||
|
this.writeCurrentStats(true, true);
|
||||||
|
if (afterClose) afterClose();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class StreamHandler {
|
||||||
|
constructor(resultIndexHolder, resolve, startLine) {
|
||||||
|
this.recordset = this.recordset.bind(this);
|
||||||
|
this.startLine = startLine;
|
||||||
|
this.row = this.row.bind(this);
|
||||||
|
// this.error = this.error.bind(this);
|
||||||
|
this.done = this.done.bind(this);
|
||||||
|
this.info = this.info.bind(this);
|
||||||
|
|
||||||
|
// use this for cancelling - not implemented
|
||||||
|
// this.stream = null;
|
||||||
|
|
||||||
|
this.plannedStats = false;
|
||||||
|
this.resultIndexHolder = resultIndexHolder;
|
||||||
|
this.resolve = resolve;
|
||||||
|
// currentHandlers = [...currentHandlers, this];
|
||||||
|
}
|
||||||
|
|
||||||
|
closeCurrentWriter() {
|
||||||
|
if (this.currentWriter) {
|
||||||
|
this.currentWriter.close();
|
||||||
|
this.currentWriter = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
recordset(columns) {
|
||||||
|
this.closeCurrentWriter();
|
||||||
|
this.currentWriter = new QueryStreamTableWriter();
|
||||||
|
this.currentWriter.initializeFromQuery(
|
||||||
|
Array.isArray(columns) ? { columns } : columns,
|
||||||
|
this.resultIndexHolder.value
|
||||||
|
);
|
||||||
|
this.resultIndexHolder.value += 1;
|
||||||
|
|
||||||
|
// this.writeCurrentStats();
|
||||||
|
|
||||||
|
// this.onRow = _.throttle((jslid) => {
|
||||||
|
// if (jslid == this.jslid) {
|
||||||
|
// this.writeCurrentStats(false, true);
|
||||||
|
// }
|
||||||
|
// }, 500);
|
||||||
|
}
|
||||||
|
row(row) {
|
||||||
|
if (this.currentWriter) this.currentWriter.row(row);
|
||||||
|
else if (row.message) process.send({ msgtype: 'info', info: { message: row.message } });
|
||||||
|
// this.onRow(this.jslid);
|
||||||
|
}
|
||||||
|
// error(error) {
|
||||||
|
// process.send({ msgtype: 'error', error });
|
||||||
|
// }
|
||||||
|
done(result) {
|
||||||
|
this.closeCurrentWriter();
|
||||||
|
// currentHandlers = currentHandlers.filter((x) => x != this);
|
||||||
|
this.resolve();
|
||||||
|
}
|
||||||
|
info(info) {
|
||||||
|
if (info && info.line != null) {
|
||||||
|
info = {
|
||||||
|
...info,
|
||||||
|
line: this.startLine + info.line,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
process.send({ msgtype: 'info', info });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function handleQueryStream(dbhan, driver, resultIndexHolder, sqlItem) {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
const start = sqlItem.trimStart || sqlItem.start;
|
||||||
|
const handler = new StreamHandler(resultIndexHolder, resolve, start && start.line);
|
||||||
|
driver.stream(dbhan, sqlItem.text, handler);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
handleQueryStream,
|
||||||
|
QueryStreamTableWriter,
|
||||||
|
};
|
||||||
Reference in New Issue
Block a user