mirror of
https://github.com/DeNNiiInc/dbgate.git
synced 2026-04-19 23:35:59 +00:00
Merge branch 'master' of https://github.com/dbgate/dbgate
This commit is contained in:
@@ -141,7 +141,7 @@ module.exports = {
|
||||
},
|
||||
|
||||
executeQuery_meta: true,
|
||||
async executeQuery({ sesid, sql, autoCommit }) {
|
||||
async executeQuery({ sesid, sql, autoCommit, limitRows }) {
|
||||
const session = this.opened.find(x => x.sesid == sesid);
|
||||
if (!session) {
|
||||
throw new Error('Invalid session');
|
||||
@@ -149,7 +149,7 @@ module.exports = {
|
||||
|
||||
logger.info({ sesid, sql }, 'Processing query');
|
||||
this.dispatchMessage(sesid, 'Query execution started');
|
||||
session.subprocess.send({ msgtype: 'executeQuery', sql, autoCommit });
|
||||
session.subprocess.send({ msgtype: 'executeQuery', sql, autoCommit, limitRows });
|
||||
|
||||
return { state: 'ok' };
|
||||
},
|
||||
|
||||
@@ -117,7 +117,7 @@ async function handleExecuteControlCommand({ command }) {
|
||||
}
|
||||
}
|
||||
|
||||
async function handleExecuteQuery({ sql, autoCommit }) {
|
||||
async function handleExecuteQuery({ sql, autoCommit, limitRows }) {
|
||||
lastActivity = new Date().getTime();
|
||||
|
||||
await waitConnected();
|
||||
@@ -146,7 +146,7 @@ async function handleExecuteQuery({ sql, autoCommit }) {
|
||||
...driver.getQuerySplitterOptions('stream'),
|
||||
returnRichInfo: true,
|
||||
})) {
|
||||
await handleQueryStream(dbhan, driver, queryStreamInfoHolder, sqlItem);
|
||||
await handleQueryStream(dbhan, driver, queryStreamInfoHolder, sqlItem, undefined, limitRows);
|
||||
// const handler = new StreamHandler(resultIndex);
|
||||
// const stream = await driver.stream(systemConnection, sqlItem, handler);
|
||||
// handler.stream = stream;
|
||||
|
||||
@@ -82,20 +82,27 @@ class QueryStreamTableWriter {
|
||||
}
|
||||
|
||||
close(afterClose) {
|
||||
if (this.currentStream) {
|
||||
this.currentStream.end(() => {
|
||||
this.writeCurrentStats(true, true);
|
||||
if (afterClose) afterClose();
|
||||
});
|
||||
}
|
||||
return new Promise(resolve => {
|
||||
if (this.currentStream) {
|
||||
this.currentStream.end(() => {
|
||||
this.writeCurrentStats(true, true);
|
||||
if (afterClose) afterClose();
|
||||
resolve();
|
||||
});
|
||||
} else {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
class StreamHandler {
|
||||
constructor(queryStreamInfoHolder, resolve, startLine, sesid = undefined) {
|
||||
constructor(queryStreamInfoHolder, resolve, startLine, sesid = undefined, limitRows = undefined) {
|
||||
this.recordset = this.recordset.bind(this);
|
||||
this.startLine = startLine;
|
||||
this.sesid = sesid;
|
||||
this.limitRows = limitRows;
|
||||
this.rowsLimitOverflow = false;
|
||||
this.row = this.row.bind(this);
|
||||
// this.error = this.error.bind(this);
|
||||
this.done = this.done.bind(this);
|
||||
@@ -107,6 +114,7 @@ class StreamHandler {
|
||||
this.plannedStats = false;
|
||||
this.queryStreamInfoHolder = queryStreamInfoHolder;
|
||||
this.resolve = resolve;
|
||||
this.rowCounter = 0;
|
||||
// currentHandlers = [...currentHandlers, this];
|
||||
}
|
||||
|
||||
@@ -118,6 +126,9 @@ class StreamHandler {
|
||||
}
|
||||
|
||||
recordset(columns) {
|
||||
if (this.rowsLimitOverflow) {
|
||||
return;
|
||||
}
|
||||
this.closeCurrentWriter();
|
||||
this.currentWriter = new QueryStreamTableWriter(this.sesid);
|
||||
this.currentWriter.initializeFromQuery(
|
||||
@@ -125,6 +136,7 @@ class StreamHandler {
|
||||
this.queryStreamInfoHolder.resultIndex
|
||||
);
|
||||
this.queryStreamInfoHolder.resultIndex += 1;
|
||||
this.rowCounter = 0;
|
||||
|
||||
// this.writeCurrentStats();
|
||||
|
||||
@@ -135,8 +147,36 @@ class StreamHandler {
|
||||
// }, 500);
|
||||
}
|
||||
row(row) {
|
||||
if (this.currentWriter) this.currentWriter.row(row);
|
||||
else if (row.message) process.send({ msgtype: 'info', info: { message: row.message }, sesid: this.sesid });
|
||||
if (this.rowsLimitOverflow) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.limitRows && this.rowCounter >= this.limitRows) {
|
||||
process.send({
|
||||
msgtype: 'info',
|
||||
info: { message: `Rows limit overflow, loaded ${this.rowCounter} rows, canceling query`, severity: 'error' },
|
||||
sesid: this.sesid,
|
||||
});
|
||||
this.rowsLimitOverflow = true;
|
||||
|
||||
this.queryStreamInfoHolder.canceled = true;
|
||||
if (this.currentWriter) {
|
||||
this.currentWriter.close().then(() => {
|
||||
process.exit(0);
|
||||
});
|
||||
} else {
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.currentWriter) {
|
||||
this.currentWriter.row(row);
|
||||
this.rowCounter += 1;
|
||||
} else if (row.message) {
|
||||
process.send({ msgtype: 'info', info: { message: row.message }, sesid: this.sesid });
|
||||
}
|
||||
// this.onRow(this.jslid);
|
||||
}
|
||||
// error(error) {
|
||||
@@ -161,10 +201,10 @@ class StreamHandler {
|
||||
}
|
||||
}
|
||||
|
||||
function handleQueryStream(dbhan, driver, queryStreamInfoHolder, sqlItem, sesid = undefined) {
|
||||
function handleQueryStream(dbhan, driver, queryStreamInfoHolder, sqlItem, sesid = undefined, limitRows = undefined) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const start = sqlItem.trimStart || sqlItem.start;
|
||||
const handler = new StreamHandler(queryStreamInfoHolder, resolve, start && start.line, sesid);
|
||||
const handler = new StreamHandler(queryStreamInfoHolder, resolve, start && start.line, sesid, limitRows);
|
||||
driver.stream(dbhan, sqlItem.text, handler);
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user