jsl data refactor

This commit is contained in:
Jan Prochazka
2020-10-22 09:39:18 +02:00
parent da1617729b
commit 61b4bf91b0

View File

@@ -3,38 +3,62 @@ const lineReader = require('line-reader');
class JsonLinesDatastore { class JsonLinesDatastore {
constructor(file) { constructor(file) {
this.file = file; this.file = file;
this.readerInfo = { this.reader = null;
reader: null, this.readedDataRowCount = 0;
readedDataRowCount: 0,
readedSchemaRow: false, this.readedSchemaRow = false;
isReading: false, this.isReading = false;
closeAfterRead: null, this.closeAfterRead = null;
closeAfterReadPromise: null, this.closeAfterReadCallback = null;
}; this.closeAfterReadPromise = null;
this.waitForReadyPromise = null;
this.waitForReadyResolve = null;
// this.readerInfo = {
// reader: null,
// readedDataRowCount: 0,
// readedSchemaRow: false,
// isReading: false,
// closeAfterRead: null,
// closeAfterReadPromise: null,
// };
} }
async closeReader() { closeReader() {
if (!this.readerInfo) return Promise.resolve(); if (!this.reader) return;
return new Promise((resolve, reject) => { const reader = this.reader;
this.readerInfo.reader.close((err) => { this.reader = null;
if (err) reject(err); reader.close();
resolve();
});
});
} }
async notifyChanged() { waitForReady() {
if (this.readerInfo && this.readerInfo.isReading) { if (this.isReading) {
if (this.readerInfo.closeAfterRead) { if (this.waitForReadyResolve) {
return this.readerInfo.closeAfterReadPromise; return this.waitForReadyPromise;
} }
const promise = new Promise((resolve, reject) => { const promise = new Promise((resolve, reject) => {
this.readerInfo.closeAfterRead = resolve; this.waitForReadyResolve = resolve;
}); });
this.readerInfo.closeAfterReadPromise = promise; this.waitForReadyPromise = promise;
return promise;
}
return Promise.resolve();
}
async notifyChanged(callback) {
if (this.isReading) {
this.closeAfterReadCallback = callback;
if (this.closeAfterRead) {
return this.closeAfterReadPromise;
}
const promise = new Promise((resolve, reject) => {
this.closeAfterRead = resolve;
});
this.closeAfterReadPromise = promise;
return promise; return promise;
} else { } else {
await this.closeReader(); this.closeReader();
} }
} }
@@ -43,30 +67,21 @@ class JsonLinesDatastore {
lineReader.open(this.file, (err, reader) => { lineReader.open(this.file, (err, reader) => {
if (err) reject(err); if (err) reject(err);
const readerInfo = { resolve(reader);
reader,
readedDataRowCount: 0,
readedSchemaRow: false,
isReading: true,
closeAfterRead: null,
closeAfterReadPromise: null,
};
this.readerInfo = readerInfo;
resolve(readerInfo);
}) })
); );
} }
readLine(readerInfo) { readLine() {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const { reader } = readerInfo; const reader = this.reader;
if (!reader.hasNextLine()) { if (!reader.hasNextLine()) {
resolve(null); resolve(null);
return; return;
} }
reader.nextLine((err, line) => { reader.nextLine((err, line) => {
if (readerInfo.readedSchemaRow) readerInfo.readedDataRowCount += 1; if (this.readedSchemaRow) this.readedDataRowCount += 1;
else readerInfo.readedSchemaRow = true; else this.readedSchemaRow = true;
if (err) reject(err); if (err) reject(err);
resolve(line); resolve(line);
}); });
@@ -74,40 +89,58 @@ class JsonLinesDatastore {
} }
async ensureReader(offset) { async ensureReader(offset) {
if (this.readerInfo && this.readerInfo.readedDataRowCount > offset) { console.log('ENSURE', offset);
await this.closeReader(); for (;;) {
await this.waitForReady();
if (this.readedDataRowCount > offset) {
this.closeReader();
} }
let readerInfo = this.readerInfo; if (!this.reader) {
if (!readerInfo || !readerInfo.reader) { const reader = await this.openReader();
readerInfo = await this.openReader(); if (this.isReading) {
reader.close(); // throw away this reader
continue; // reader is already used by other getRows, wait for free reader
} }
readerInfo.isReading = true; this.reader = reader;
if (!readerInfo.readedSchemaRow) { this.isReading = true;
await this.readLine(readerInfo); // skip structure break;
} else {
break;
} }
while (readerInfo.readedDataRowCount < offset) {
await this.readLine(readerInfo);
} }
return readerInfo; if (!this.readedSchemaRow) {
await this.readLine(); // skip structure
}
while (this.readedDataRowCount < offset) {
await this.readLine();
}
} }
async getRows(offset, limit) { async getRows(offset, limit) {
const readerInfo = await this.ensureReader(offset); await this.ensureReader(offset);
const res = []; const res = [];
for (let i = 0; i < limit; i += 1) { for (let i = 0; i < limit; i += 1) {
const line = await this.readLine(readerInfo); const line = await this.readLine();
if (line == null) break; if (line == null) break;
res.push(JSON.parse(line)); res.push(JSON.parse(line));
} }
readerInfo.isReading = false; this.isReading = false;
if (readerInfo.closeAfterRead) { if (this.closeAfterRead) {
await this.closeReader(); if (this.closeAfterReadCallback) this.closeAfterReadCallback();
// socket.emit(`jsldata-stats-${jslid}`, readerInfo.closeAfterReadAndSendStats); this.closeReader();
const resolve = readerInfo.closeAfterRead; const resolve = this.closeAfterRead;
readerInfo.closeAfterRead = null; this.closeAfterRead = null;
readerInfo.closeAfterReadPromise = null; this.closeAfterReadPromise = null;
this.closeAfterReadCallback = null;
resolve(); resolve();
} }
if (this.waitForReadyResolve) {
const resolve = this.waitForReadyResolve;
this.waitForReadyResolve = null;
this.waitForReadyPromise = null;
resolve();
}
console.log('RETURN', res.length);
return res; return res;
} }
} }