diff --git a/packages/api/src/utility/JsonLinesDatastore.js b/packages/api/src/utility/JsonLinesDatastore.js index eed749143..7ed08076b 100644 --- a/packages/api/src/utility/JsonLinesDatastore.js +++ b/packages/api/src/utility/JsonLinesDatastore.js @@ -3,38 +3,62 @@ const lineReader = require('line-reader'); class JsonLinesDatastore { constructor(file) { this.file = file; - this.readerInfo = { - reader: null, - readedDataRowCount: 0, - readedSchemaRow: false, - isReading: false, - closeAfterRead: null, - closeAfterReadPromise: null, - }; + this.reader = null; + this.readedDataRowCount = 0; + + this.readedSchemaRow = false; + this.isReading = false; + this.closeAfterRead = null; + this.closeAfterReadCallback = null; + this.closeAfterReadPromise = null; + + this.waitForReadyPromise = null; + this.waitForReadyResolve = null; + + // this.readerInfo = { + // reader: null, + // readedDataRowCount: 0, + // readedSchemaRow: false, + // isReading: false, + // closeAfterRead: null, + // closeAfterReadPromise: null, + // }; } - async closeReader() { - if (!this.readerInfo) return Promise.resolve(); - return new Promise((resolve, reject) => { - this.readerInfo.reader.close((err) => { - if (err) reject(err); - resolve(); - }); - }); + closeReader() { + if (!this.reader) return; + const reader = this.reader; + this.reader = null; + reader.close(); } - async notifyChanged() { - if (this.readerInfo && this.readerInfo.isReading) { - if (this.readerInfo.closeAfterRead) { - return this.readerInfo.closeAfterReadPromise; + waitForReady() { + if (this.isReading) { + if (this.waitForReadyResolve) { + return this.waitForReadyPromise; } const promise = new Promise((resolve, reject) => { - this.readerInfo.closeAfterRead = resolve; + this.waitForReadyResolve = resolve; }); - this.readerInfo.closeAfterReadPromise = promise; + this.waitForReadyPromise = promise; + return promise; + } + return Promise.resolve(); + } + + async notifyChanged(callback) { + if (this.isReading) { + this.closeAfterReadCallback = callback; + if (this.closeAfterRead) { + return this.closeAfterReadPromise; + } + const promise = new Promise((resolve, reject) => { + this.closeAfterRead = resolve; + }); + this.closeAfterReadPromise = promise; return promise; } else { - await this.closeReader(); + this.closeReader(); } } @@ -43,30 +67,21 @@ class JsonLinesDatastore { lineReader.open(this.file, (err, reader) => { if (err) reject(err); - const readerInfo = { - reader, - readedDataRowCount: 0, - readedSchemaRow: false, - isReading: true, - closeAfterRead: null, - closeAfterReadPromise: null, - }; - this.readerInfo = readerInfo; - resolve(readerInfo); + resolve(reader); }) ); } - readLine(readerInfo) { + readLine() { return new Promise((resolve, reject) => { - const { reader } = readerInfo; + const reader = this.reader; if (!reader.hasNextLine()) { resolve(null); return; } reader.nextLine((err, line) => { - if (readerInfo.readedSchemaRow) readerInfo.readedDataRowCount += 1; - else readerInfo.readedSchemaRow = true; + if (this.readedSchemaRow) this.readedDataRowCount += 1; + else this.readedSchemaRow = true; if (err) reject(err); resolve(line); }); @@ -74,40 +89,58 @@ class JsonLinesDatastore { } async ensureReader(offset) { - if (this.readerInfo && this.readerInfo.readedDataRowCount > offset) { - await this.closeReader(); + console.log('ENSURE', offset); + for (;;) { + await this.waitForReady(); + if (this.readedDataRowCount > offset) { + this.closeReader(); + } + if (!this.reader) { + const reader = await this.openReader(); + if (this.isReading) { + reader.close(); // throw away this reader + continue; // reader is already used by other getRows, wait for free reader + } + this.reader = reader; + this.isReading = true; + break; + } else { + break; + } } - let readerInfo = this.readerInfo; - if (!readerInfo || !readerInfo.reader) { - readerInfo = await this.openReader(); + if (!this.readedSchemaRow) { + await this.readLine(); // skip structure } - readerInfo.isReading = true; - if (!readerInfo.readedSchemaRow) { - await this.readLine(readerInfo); // skip structure + while (this.readedDataRowCount < offset) { + await this.readLine(); } - while (readerInfo.readedDataRowCount < offset) { - await this.readLine(readerInfo); - } - return readerInfo; } async getRows(offset, limit) { - const readerInfo = await this.ensureReader(offset); + await this.ensureReader(offset); const res = []; for (let i = 0; i < limit; i += 1) { - const line = await this.readLine(readerInfo); + const line = await this.readLine(); if (line == null) break; res.push(JSON.parse(line)); } - readerInfo.isReading = false; - if (readerInfo.closeAfterRead) { - await this.closeReader(); - // socket.emit(`jsldata-stats-${jslid}`, readerInfo.closeAfterReadAndSendStats); - const resolve = readerInfo.closeAfterRead; - readerInfo.closeAfterRead = null; - readerInfo.closeAfterReadPromise = null; + this.isReading = false; + if (this.closeAfterRead) { + if (this.closeAfterReadCallback) this.closeAfterReadCallback(); + this.closeReader(); + const resolve = this.closeAfterRead; + this.closeAfterRead = null; + this.closeAfterReadPromise = null; + this.closeAfterReadCallback = null; resolve(); } + if (this.waitForReadyResolve) { + const resolve = this.waitForReadyResolve; + this.waitForReadyResolve = null; + this.waitForReadyPromise = null; + resolve(); + } + console.log('RETURN', res.length); return res; } }