diff --git a/packages/api/src/utility/JsonLinesDatastore.js b/packages/api/src/utility/JsonLinesDatastore.js index b7f9c109d..c34cb3300 100644 --- a/packages/api/src/utility/JsonLinesDatastore.js +++ b/packages/api/src/utility/JsonLinesDatastore.js @@ -2,7 +2,6 @@ const fs = require('fs'); const os = require('os'); const rimraf = require('rimraf'); const path = require('path'); -const lineReader = require('line-reader'); const AsyncLock = require('async-lock'); const lock = new AsyncLock(); const stableStringify = require('json-stable-stringify'); @@ -11,23 +10,7 @@ const requirePluginFunction = require('./requirePluginFunction'); const esort = require('external-sorting'); const uuidv1 = require('uuid/v1'); const { jsldir } = require('./directories'); - -function fetchNextLineFromReader(reader) { - return new Promise((resolve, reject) => { - if (!reader.hasNextLine()) { - resolve(null); - return; - } - - reader.nextLine((err, line) => { - if (err) { - reject(err); - } else { - resolve(line); - } - }); - }); -} +const LineReader = require('./LineReader'); class JsonLinesDatastore { constructor(file, formatterFunction) { @@ -74,7 +57,7 @@ class JsonLinesDatastore { await new Promise(resolve => rimraf(tempDir, resolve)); } - _closeReader() { + async _closeReader() { // console.log('CLOSING READER', this.reader); if (!this.reader) return; const reader = this.reader; @@ -84,7 +67,7 @@ class JsonLinesDatastore { // this.firstRowToBeReturned = null; this.currentFilter = null; this.currentSort = null; - return new Promise(resolve => reader.close(resolve)); + await reader.close(); } async notifyChanged(callback) { @@ -100,12 +83,9 @@ class JsonLinesDatastore { async _openReader(fileName) { // console.log('OPENING READER', fileName); // console.log(fs.readFileSync(fileName, 'utf-8')); - return new Promise((resolve, reject) => - lineReader.open(fileName, (err, reader) => { - if (err) reject(err); - resolve(reader); - }) - ); + + const fileStream = fs.createReadStream(fileName); + return new LineReader(fileStream); } parseLine(line) { @@ -120,7 +100,7 @@ class JsonLinesDatastore { // return res; // } for (;;) { - const line = await fetchNextLineFromReader(this.reader); + const line = await this.reader.readLine(); if (!line) { // EOF return null; @@ -240,6 +220,7 @@ class JsonLinesDatastore { // console.log(JSON.stringify(this.currentFilter, undefined, 2)); for (let i = 0; i < limit; i += 1) { const line = await this._readLine(true); + // console.log('READED LINE', i); if (line == null) break; res.push(line); } diff --git a/packages/api/src/utility/LineReader.js b/packages/api/src/utility/LineReader.js new file mode 100644 index 000000000..7fbed5b48 --- /dev/null +++ b/packages/api/src/utility/LineReader.js @@ -0,0 +1,88 @@ +const readline = require('readline'); + +class Queue { + constructor() { + this.elements = {}; + this.head = 0; + this.tail = 0; + } + enqueue(element) { + this.elements[this.tail] = element; + this.tail++; + } + dequeue() { + const item = this.elements[this.head]; + delete this.elements[this.head]; + this.head++; + return item; + } + peek() { + return this.elements[this.head]; + } + getLength() { + return this.tail - this.head; + } + isEmpty() { + return this.getLength() === 0; + } +} + +class LineReader { + constructor(input) { + this.input = input; + this.queue = new Queue(); + this.resolve = null; + this.isEnded = false; + this.rl = readline.createInterface({ + input, + }); + this.input.pause(); + + this.rl.on('line', line => { + this.input.pause(); + if (this.resolve) { + const resolve = this.resolve; + this.resolve = null; + resolve(line); + return; + } + this.queue.enqueue(line); + }); + + this.rl.on('close', () => { + if (this.resolve) { + const resolve = this.resolve; + this.resolve = null; + this.isEnded = true; + resolve(null); + return; + } + this.queue.enqueue(null); + }); + } + + readLine() { + if (this.isEnded) { + return Promise.resolve(null); + } + + if (!this.queue.isEmpty()) { + const res = this.queue.dequeue(); + if (res == null) this.isEnded = true; + return Promise.resolve(res); + } + + this.input.resume(); + + return new Promise(resolve => { + this.resolve = resolve; + }); + } + + close() { + this.isEnded = true; + return new Promise(resolve => this.input.close(resolve)); + } +} + +module.exports = LineReader;