support app queries

This commit is contained in:
Jan Prochazka
2022-03-17 19:32:56 +01:00
parent e181318e24
commit d888feeaf8
13 changed files with 234 additions and 34 deletions

View File

@@ -17,11 +17,15 @@ let afterConnectCallbacks = [];
// let currentHandlers = [];
class TableWriter {
constructor(structure, resultIndex) {
this.jslid = uuidv1();
this.currentFile = path.join(jsldir(), `${this.jslid}.jsonl`);
constructor() {
this.currentRowCount = 0;
this.currentChangeIndex = 1;
this.initializedFile = false;
}
initializeFromQuery(structure, resultIndex) {
this.jslid = uuidv1();
this.currentFile = path.join(jsldir(), `${this.jslid}.jsonl`);
fs.writeFileSync(
this.currentFile,
JSON.stringify({
@@ -32,13 +36,21 @@ class TableWriter {
this.currentStream = fs.createWriteStream(this.currentFile, { flags: 'a' });
this.writeCurrentStats(false, false);
this.resultIndex = resultIndex;
this.initializedFile = true;
process.send({ msgtype: 'recordset', jslid: this.jslid, resultIndex });
}
initializeFromReader(jslid) {
this.jslid = jslid;
this.currentFile = path.join(jsldir(), `${this.jslid}.jsonl`);
this.writeCurrentStats(false, false);
}
row(row) {
// console.log('ACCEPT ROW', row);
this.currentStream.write(JSON.stringify(row) + '\n');
this.currentRowCount += 1;
if (!this.plannedStats) {
this.plannedStats = true;
process.nextTick(() => {
@@ -49,6 +61,21 @@ class TableWriter {
}
}
rowFromReader(row) {
if (!this.initializedFile) {
process.send({ msgtype: 'initializeFile', jslid: this.jslid });
this.initializedFile = true;
fs.writeFileSync(this.currentFile, JSON.stringify(row) + '\n');
this.currentStream = fs.createWriteStream(this.currentFile, { flags: 'a' });
this.writeCurrentStats(false, false);
this.initializedFile = true;
return;
}
this.row(row);
}
writeCurrentStats(isFinished = false, emitEvent = false) {
const stats = {
rowCount: this.currentRowCount,
@@ -63,10 +90,11 @@ class TableWriter {
}
}
close() {
close(afterClose) {
if (this.currentStream) {
this.currentStream.end(() => {
this.writeCurrentStats(true, true);
if (afterClose) afterClose();
});
}
}
@@ -98,7 +126,11 @@ class StreamHandler {
recordset(columns) {
this.closeCurrentWriter();
this.currentWriter = new TableWriter(Array.isArray(columns) ? { columns } : columns, this.resultIndexHolder.value);
this.currentWriter = new TableWriter();
this.currentWriter.initializeFromQuery(
Array.isArray(columns) ? { columns } : columns,
this.resultIndexHolder.value
);
this.resultIndexHolder.value += 1;
// this.writeCurrentStats();
@@ -110,7 +142,6 @@ class StreamHandler {
// }, 500);
}
row(row) {
// console.log('ACCEPT ROW', row);
if (this.currentWriter) this.currentWriter.row(row);
else if (row.message) process.send({ msgtype: 'info', info: { message: row.message } });
// this.onRow(this.jslid);
@@ -135,20 +166,21 @@ function handleStream(driver, resultIndexHolder, sql) {
});
}
function ensureExecuteCustomScript(driver) {
function allowExecuteCustomScript(driver) {
if (driver.readOnlySessions) {
return;
return true;
}
if (storedConnection.isReadOnly) {
throw new Error('Connection is read only');
return false;
// throw new Error('Connection is read only');
}
return true;
}
async function handleConnect(connection) {
storedConnection = connection;
const driver = requireEngineDriver(storedConnection);
ensureExecuteCustomScript(driver);
systemConnection = await connectUtility(driver, storedConnection);
for (const [resolve] of afterConnectCallbacks) {
resolve();
@@ -173,6 +205,19 @@ async function handleExecuteQuery({ sql }) {
await waitConnected();
const driver = requireEngineDriver(storedConnection);
if (!allowExecuteCustomScript(driver)) {
process.send({
msgtype: 'info',
info: {
message: 'Connection without read-only sessions is read only',
severity: 'error',
},
});
process.send({ msgtype: 'done', skipFinishedMessage: true });
return;
//process.send({ msgtype: 'error', error: e.message });
}
const resultIndexHolder = {
value: 0,
};
@@ -186,9 +231,39 @@ async function handleExecuteQuery({ sql }) {
process.send({ msgtype: 'done' });
}
async function handleExecuteReader({ jslid, sql, fileName }) {
await waitConnected();
const driver = requireEngineDriver(storedConnection);
if (fileName) {
sql = fs.readFileSync(fileName, 'utf-8');
} else {
if (!allowExecuteCustomScript(driver)) {
process.send({ msgtype: 'done' });
return;
}
}
const writer = new TableWriter();
writer.initializeFromReader(jslid);
const reader = await driver.readQuery(systemConnection, sql);
reader.on('data', data => {
writer.rowFromReader(data);
});
reader.on('end', () => {
writer.close(() => {
process.send({ msgtype: 'done' });
});
});
}
const messageHandlers = {
connect: handleConnect,
executeQuery: handleExecuteQuery,
executeReader: handleExecuteReader,
// cancel: handleCancel,
};