scripting engine

This commit is contained in:
Jan Prochazka
2020-06-11 10:09:04 +02:00
parent ca079d5dce
commit f68bdafd9f
11 changed files with 127 additions and 28 deletions

View File

@@ -43,7 +43,7 @@ function start(argument = null) {
}
app.use(cors());
app.use(bodyParser.json());
app.use(bodyParser.json({ limit: '50mb' }));
useController(app, '/connections', connections);
useController(app, '/server-connections', serverConnections);

View File

@@ -4,6 +4,7 @@ const path = require('path');
const fs = require('fs');
const _ = require('lodash');
const childProcessChecker = require('../utility/childProcessChecker');
const goSplit = require('../utility/goSplit');
const driverConnect = require('../utility/driverConnect');
const { jsldir } = require('../utility/directories');
@@ -19,7 +20,7 @@ class TableWriter {
this.currentFile = path.join(jsldir(), `${this.jslid}.jsonl`);
this.currentRowCount = 0;
this.currentChangeIndex = 0;
fs.writeFileSync(this.currentFile, JSON.stringify(columns) + '\n');
fs.writeFileSync(this.currentFile, JSON.stringify({ columns }) + '\n');
this.currentStream = fs.createWriteStream(this.currentFile, { flags: 'a' });
this.writeCurrentStats(false, false);
process.send({ msgtype: 'recordset', jslid: this.jslid });
@@ -140,9 +141,11 @@ async function handleExecuteQuery({ sql }) {
await waitConnected();
const driver = engines(storedConnection);
const handler = new StreamHandler();
const stream = await driver.stream(systemConnection, sql, handler);
handler.stream = stream;
for (const sqlItem of goSplit(sql)) {
const handler = new StreamHandler();
const stream = await driver.stream(systemConnection, sqlItem, handler);
handler.stream = stream;
}
}
const messageHandlers = {

View File

@@ -1,13 +1,36 @@
const csv = require('csv');
const fs = require('fs');
const stream = require('stream');
async function csvWriter({ fileName, encoding = 'utf-8', ...options }) {
class CsvPrepareStream extends stream.Transform {
constructor({ header }) {
super({ objectMode: true });
this.structure = null;
this.header = header;
}
_transform(chunk, encoding, done) {
if (this.structure) {
this.push(this.structure.columns.map((col) => chunk[col.columnName]));
done();
} else {
this.structure = chunk;
if (this.header) {
this.push(chunk.columns.map((x) => x.columnName));
}
done();
}
}
}
async function csvWriter({ fileName, encoding = 'utf-8', header = true, delimiter, quoted }) {
console.log(`Writing file ${fileName}`);
const csvStream = csv.stringify(options);
const csvPrepare = new CsvPrepareStream({ header });
const csvStream = csv.stringify({ delimiter, quoted });
const fileStream = fs.createWriteStream(fileName, encoding);
csvPrepare.pipe(csvStream);
csvStream.pipe(fileStream);
csvStream['finisher'] = fileStream;
return csvStream;
csvPrepare['finisher'] = fileStream;
return csvPrepare;
}
module.exports = csvWriter;

View File

@@ -5,6 +5,7 @@ async function fakeObjectReader({ delay = 0 } = {}) {
objectMode: true,
});
function doWrite() {
pass.write({ columns: [{ columnName: 'id' }, { columnName: 'country' }] });
pass.write({ id: 1, country: 'Czechia' });
pass.write({ id: 2, country: 'Austria' });
pass.write({ country: 'Germany', id: 3 });

View File

@@ -0,0 +1,18 @@
function goSplit(sql) {
if (!sql) return [];
const lines = sql.split('\n');
const res = [];
let buffer = '';
for (const line of lines) {
if (/^\s*go\s*$/i.test(line)) {
if (buffer.trim()) res.push(buffer);
buffer = '';
} else {
buffer += line + '\n';
}
}
if (buffer.trim()) res.push(buffer);
return res;
}
module.exports = goSplit;