This commit is contained in:
Jan Prochazka
2020-06-02 16:55:46 +02:00
parent c0188e866f
commit 7450b7fe85
3 changed files with 69 additions and 40 deletions

View File

@@ -13,28 +13,29 @@ let storedConnection;
let afterConnectCallbacks = [];
let currentHandlers = [];
class StreamHandler {
constructor() {
this.recordset = this.recordset.bind(this);
this.row = this.row.bind(this);
// this.error = this.error.bind(this);
this.done = this.done.bind(this);
this.info = this.info.bind(this);
// use this for cancelling
this.stream = null;
currentHandlers = [...currentHandlers, this];
class TableWriter {
constructor(columns) {
this.jslid = uuidv1();
this.currentFile = path.join(jsldir(), `${this.jslid}.jsonl`);
this.currentStream = fs.createWriteStream(this.currentFile);
this.currentRowCount = 0;
this.currentChangeIndex = 0;
fs.writeFileSync(`${this.currentFile}.info`, JSON.stringify(columns));
this.writeCurrentStats(false, false);
process.send({ msgtype: 'recordset', jslid: this.jslid });
}
closeCurrentStream() {
if (this.currentStream) {
this.currentStream.end();
this.writeCurrentStats(true, true);
this.currentStream = null;
this.jslid = null;
this.currentFile = null;
this.currentRowCount = null;
this.currentChangeIndex = null;
row(row) {
// console.log('ACCEPT ROW', row);
this.currentStream.write(JSON.stringify(row) + '\n');
this.currentRowCount += 1;
if (!this.plannedStats) {
this.plannedStats = true;
process.nextTick(() => {
if (this.currentStream) this.currentStream.uncork();
process.nextTick(() => this.writeCurrentStats(false, true));
this.plannedStats = false;
});
}
}
@@ -52,34 +53,57 @@ class StreamHandler {
}
}
recordset(columns) {
this.closeCurrentStream();
this.jslid = uuidv1();
this.currentFile = path.join(jsldir(), `${this.jslid}.jsonl`);
this.currentStream = fs.createWriteStream(this.currentFile);
this.currentRowCount = 0;
this.currentChangeIndex = 0;
fs.writeFileSync(`${this.currentFile}.info`, JSON.stringify(columns));
process.send({ msgtype: 'recordset', jslid: this.jslid });
this.writeCurrentStats();
close() {
if (this.currentStream) {
this.currentStream.end(() => {
this.writeCurrentStats(true, true);
});
}
}
}
this.onRow = _.throttle((jslid) => {
if (jslid == this.jslid) {
this.writeCurrentStats(false, true);
}
}, 500);
class StreamHandler {
constructor() {
this.recordset = this.recordset.bind(this);
this.row = this.row.bind(this);
// this.error = this.error.bind(this);
this.done = this.done.bind(this);
this.info = this.info.bind(this);
// use this for cancelling
this.stream = null;
this.plannedStats = false;
currentHandlers = [...currentHandlers, this];
}
closeCurrentWriter() {
if (this.currentWriter) {
this.currentWriter.close();
this.currentWriter = null;
}
}
recordset(columns) {
this.closeCurrentWriter();
this.currentWriter = new TableWriter(columns);
// this.writeCurrentStats();
// this.onRow = _.throttle((jslid) => {
// if (jslid == this.jslid) {
// this.writeCurrentStats(false, true);
// }
// }, 500);
}
row(row) {
// console.log('ACCEPT ROW', row);
this.currentStream.write(JSON.stringify(row) + '\n');
this.currentRowCount += 1;
this.onRow(this.jslid);
this.currentWriter.row(row);
// this.onRow(this.jslid);
}
// error(error) {
// process.send({ msgtype: 'error', error });
// }
done(result) {
this.closeCurrentStream();
this.closeCurrentWriter();
process.send({ msgtype: 'done', result });
currentHandlers = currentHandlers.filter((x) => x != this);
}