diff --git a/packages/api/src/controllers/jsldata.js b/packages/api/src/controllers/jsldata.js index 2aea7ea5a..ff262154b 100644 --- a/packages/api/src/controllers/jsldata.js +++ b/packages/api/src/controllers/jsldata.js @@ -37,6 +37,7 @@ module.exports = { openReader(jslid) { // console.log('OPENING READER'); + console.log('OPENING READER, LINES=', fs.readFileSync(path.join(jsldir(), `${jslid}.jsonl`), 'utf-8').split('\n').length); const file = path.join(jsldir(), `${jslid}.jsonl`); return new Promise((resolve, reject) => lineReader.open(file, (err, reader) => { @@ -87,6 +88,7 @@ module.exports = { }, async notifyChangedStats(stats) { + console.log('SENDING STATS', JSON.stringify(stats)); await this.closeReader(stats.jslid); socket.emit(`jsldata-stats-${stats.jslid}`, stats); }, diff --git a/packages/api/src/proc/sessionProcess.js b/packages/api/src/proc/sessionProcess.js index 98198e601..e6699e180 100644 --- a/packages/api/src/proc/sessionProcess.js +++ b/packages/api/src/proc/sessionProcess.js @@ -13,28 +13,29 @@ let storedConnection; let afterConnectCallbacks = []; let currentHandlers = []; -class StreamHandler { - constructor() { - this.recordset = this.recordset.bind(this); - this.row = this.row.bind(this); - // this.error = this.error.bind(this); - this.done = this.done.bind(this); - this.info = this.info.bind(this); - // use this for cancelling - this.stream = null; - currentHandlers = [...currentHandlers, this]; +class TableWriter { + constructor(columns) { + this.jslid = uuidv1(); + this.currentFile = path.join(jsldir(), `${this.jslid}.jsonl`); + this.currentStream = fs.createWriteStream(this.currentFile); + this.currentRowCount = 0; + this.currentChangeIndex = 0; + fs.writeFileSync(`${this.currentFile}.info`, JSON.stringify(columns)); + this.writeCurrentStats(false, false); + process.send({ msgtype: 'recordset', jslid: this.jslid }); } - closeCurrentStream() { - if (this.currentStream) { - this.currentStream.end(); - this.writeCurrentStats(true, true); - - this.currentStream = null; - this.jslid = null; - this.currentFile = null; - this.currentRowCount = null; - this.currentChangeIndex = null; + row(row) { + // console.log('ACCEPT ROW', row); + this.currentStream.write(JSON.stringify(row) + '\n'); + this.currentRowCount += 1; + if (!this.plannedStats) { + this.plannedStats = true; + process.nextTick(() => { + if (this.currentStream) this.currentStream.uncork(); + process.nextTick(() => this.writeCurrentStats(false, true)); + this.plannedStats = false; + }); } } @@ -52,34 +53,57 @@ class StreamHandler { } } - recordset(columns) { - this.closeCurrentStream(); - this.jslid = uuidv1(); - this.currentFile = path.join(jsldir(), `${this.jslid}.jsonl`); - this.currentStream = fs.createWriteStream(this.currentFile); - this.currentRowCount = 0; - this.currentChangeIndex = 0; - fs.writeFileSync(`${this.currentFile}.info`, JSON.stringify(columns)); - process.send({ msgtype: 'recordset', jslid: this.jslid }); - this.writeCurrentStats(); + close() { + if (this.currentStream) { + this.currentStream.end(() => { + this.writeCurrentStats(true, true); + }); + } + } +} - this.onRow = _.throttle((jslid) => { - if (jslid == this.jslid) { - this.writeCurrentStats(false, true); - } - }, 500); +class StreamHandler { + constructor() { + this.recordset = this.recordset.bind(this); + this.row = this.row.bind(this); + // this.error = this.error.bind(this); + this.done = this.done.bind(this); + this.info = this.info.bind(this); + // use this for cancelling + this.stream = null; + this.plannedStats = false; + currentHandlers = [...currentHandlers, this]; + } + + closeCurrentWriter() { + if (this.currentWriter) { + this.currentWriter.close(); + this.currentWriter = null; + } + } + + recordset(columns) { + this.closeCurrentWriter(); + this.currentWriter = new TableWriter(columns); + + // this.writeCurrentStats(); + + // this.onRow = _.throttle((jslid) => { + // if (jslid == this.jslid) { + // this.writeCurrentStats(false, true); + // } + // }, 500); } row(row) { // console.log('ACCEPT ROW', row); - this.currentStream.write(JSON.stringify(row) + '\n'); - this.currentRowCount += 1; - this.onRow(this.jslid); + this.currentWriter.row(row); + // this.onRow(this.jslid); } // error(error) { // process.send({ msgtype: 'error', error }); // } done(result) { - this.closeCurrentStream(); + this.closeCurrentWriter(); process.send({ msgtype: 'done', result }); currentHandlers = currentHandlers.filter((x) => x != this); } diff --git a/packages/web/src/datagrid/DataGridCore.js b/packages/web/src/datagrid/DataGridCore.js index 0de53fe4b..1a990de6e 100644 --- a/packages/web/src/datagrid/DataGridCore.js +++ b/packages/web/src/datagrid/DataGridCore.js @@ -208,6 +208,7 @@ export default function DataGridCore(props) { loadedTime: new Date().getTime(), allRowCount: null, errorMessage: null, + jslStatsCounter: 0, }); const { isLoading, loadedRows, isLoadedAll, loadedTime, allRowCount, errorMessage } = loadProps; @@ -285,11 +286,11 @@ export default function DataGridCore(props) { const loadedInfo = { loadedRows: [...loadedRows, ...nextRows], loadedTime, - isLoadedAll: nextRows.length === 0, }; setLoadProps((oldLoadProps) => ({ ...oldLoadProps, isLoading: false, + isLoadedAll: oldLoadProps.jslStatsCounter == loadProps.jslStatsCounter && nextRows.length === 0, ...loadedInfo, })); } @@ -375,6 +376,7 @@ export default function DataGridCore(props) { isLoadedAll: false, loadedTime: new Date().getTime(), errorMessage: null, + jslStatsCounter: 0, }); }; @@ -419,6 +421,7 @@ export default function DataGridCore(props) { ...oldProps, allRowCount: stats.rowCount, isLoadedAll: false, + jslStatsCounter: oldProps.jslStatsCounter + 1, })); }, []);