SYNC: Limit query result rows #1098

This commit is contained in:
SPRINX0\prochazka
2025-05-12 15:55:19 +02:00
committed by Diflow
parent c3e09ddab0
commit fb036935e6
7 changed files with 136 additions and 15 deletions

View File

@@ -82,20 +82,27 @@ class QueryStreamTableWriter {
}
close(afterClose) {
if (this.currentStream) {
this.currentStream.end(() => {
this.writeCurrentStats(true, true);
if (afterClose) afterClose();
});
}
return new Promise(resolve => {
if (this.currentStream) {
this.currentStream.end(() => {
this.writeCurrentStats(true, true);
if (afterClose) afterClose();
resolve();
});
} else {
resolve();
}
});
}
}
class StreamHandler {
constructor(queryStreamInfoHolder, resolve, startLine, sesid = undefined) {
constructor(queryStreamInfoHolder, resolve, startLine, sesid = undefined, limitRows = undefined) {
this.recordset = this.recordset.bind(this);
this.startLine = startLine;
this.sesid = sesid;
this.limitRows = limitRows;
this.rowsLimitOverflow = false;
this.row = this.row.bind(this);
// this.error = this.error.bind(this);
this.done = this.done.bind(this);
@@ -107,6 +114,7 @@ class StreamHandler {
this.plannedStats = false;
this.queryStreamInfoHolder = queryStreamInfoHolder;
this.resolve = resolve;
this.rowCounter = 0;
// currentHandlers = [...currentHandlers, this];
}
@@ -118,6 +126,9 @@ class StreamHandler {
}
recordset(columns) {
if (this.rowsLimitOverflow) {
return;
}
this.closeCurrentWriter();
this.currentWriter = new QueryStreamTableWriter(this.sesid);
this.currentWriter.initializeFromQuery(
@@ -125,6 +136,7 @@ class StreamHandler {
this.queryStreamInfoHolder.resultIndex
);
this.queryStreamInfoHolder.resultIndex += 1;
this.rowCounter = 0;
// this.writeCurrentStats();
@@ -135,8 +147,36 @@ class StreamHandler {
// }, 500);
}
row(row) {
if (this.currentWriter) this.currentWriter.row(row);
else if (row.message) process.send({ msgtype: 'info', info: { message: row.message }, sesid: this.sesid });
if (this.rowsLimitOverflow) {
return;
}
if (this.limitRows && this.rowCounter >= this.limitRows) {
process.send({
msgtype: 'info',
info: { message: `Rows limit overflow, loaded ${this.rowCounter} rows, canceling query`, severity: 'error' },
sesid: this.sesid,
});
this.rowsLimitOverflow = true;
this.queryStreamInfoHolder.canceled = true;
if (this.currentWriter) {
this.currentWriter.close().then(() => {
process.exit(0);
});
} else {
process.exit(0);
}
return;
}
if (this.currentWriter) {
this.currentWriter.row(row);
this.rowCounter += 1;
} else if (row.message) {
process.send({ msgtype: 'info', info: { message: row.message }, sesid: this.sesid });
}
// this.onRow(this.jslid);
}
// error(error) {
@@ -161,10 +201,10 @@ class StreamHandler {
}
}
function handleQueryStream(dbhan, driver, queryStreamInfoHolder, sqlItem, sesid = undefined) {
function handleQueryStream(dbhan, driver, queryStreamInfoHolder, sqlItem, sesid = undefined, limitRows = undefined) {
return new Promise((resolve, reject) => {
const start = sqlItem.trimStart || sqlItem.start;
const handler = new StreamHandler(queryStreamInfoHolder, resolve, start && start.line, sesid);
const handler = new StreamHandler(queryStreamInfoHolder, resolve, start && start.line, sesid, limitRows);
driver.stream(dbhan, sqlItem.text, handler);
});
}