jsl data store fix - uses lock, tested OK on all problematic queries

This commit is contained in:
Jan Prochazka
2020-10-22 10:02:11 +02:00
parent 61b4bf91b0
commit 541f064ddb
3 changed files with 33 additions and 86 deletions

View File

@@ -1,72 +1,39 @@
const lineReader = require('line-reader');
const AsyncLock = require('async-lock');
const lock = new AsyncLock();
class JsonLinesDatastore {
constructor(file) {
this.file = file;
this.reader = null;
this.readedDataRowCount = 0;
this.readedSchemaRow = false;
this.isReading = false;
this.closeAfterRead = null;
this.closeAfterReadCallback = null;
this.closeAfterReadPromise = null;
this.waitForReadyPromise = null;
this.waitForReadyResolve = null;
// this.readerInfo = {
// reader: null,
// readedDataRowCount: 0,
// readedSchemaRow: false,
// isReading: false,
// closeAfterRead: null,
// closeAfterReadPromise: null,
// };
this.notifyChangedCallback = null;
}
closeReader() {
if (!this.reader) return;
const reader = this.reader;
this.reader = null;
reader.close();
}
waitForReady() {
if (this.isReading) {
if (this.waitForReadyResolve) {
return this.waitForReadyPromise;
}
const promise = new Promise((resolve, reject) => {
this.waitForReadyResolve = resolve;
});
this.waitForReadyPromise = promise;
return promise;
}
return Promise.resolve();
this.readedDataRowCount = 0;
this.readedSchemaRow = false;
reader.close(() => {});
}
async notifyChanged(callback) {
if (this.isReading) {
this.closeAfterReadCallback = callback;
if (this.closeAfterRead) {
return this.closeAfterReadPromise;
}
const promise = new Promise((resolve, reject) => {
this.closeAfterRead = resolve;
});
this.closeAfterReadPromise = promise;
return promise;
} else {
this.notifyChangedCallback = callback;
await lock.acquire('reader', async () => {
this.closeReader();
}
});
const call = this.notifyChangedCallback;
this.notifyChangedCallback = null;
if (call) call();
}
async openReader() {
return new Promise((resolve, reject) =>
lineReader.open(this.file, (err, reader) => {
if (err) reject(err);
resolve(reader);
})
);
@@ -89,24 +56,12 @@ class JsonLinesDatastore {
}
async ensureReader(offset) {
console.log('ENSURE', offset);
for (;;) {
await this.waitForReady();
if (this.readedDataRowCount > offset) {
this.closeReader();
}
if (!this.reader) {
const reader = await this.openReader();
if (this.isReading) {
reader.close(); // throw away this reader
continue; // reader is already used by other getRows, wait for free reader
}
this.reader = reader;
this.isReading = true;
break;
} else {
break;
}
if (this.readedDataRowCount > offset) {
this.closeReader();
}
if (!this.reader) {
const reader = await this.openReader();
this.reader = reader;
}
if (!this.readedSchemaRow) {
await this.readLine(); // skip structure
@@ -117,30 +72,16 @@ class JsonLinesDatastore {
}
async getRows(offset, limit) {
await this.ensureReader(offset);
const res = [];
for (let i = 0; i < limit; i += 1) {
const line = await this.readLine();
if (line == null) break;
res.push(JSON.parse(line));
}
this.isReading = false;
if (this.closeAfterRead) {
if (this.closeAfterReadCallback) this.closeAfterReadCallback();
this.closeReader();
const resolve = this.closeAfterRead;
this.closeAfterRead = null;
this.closeAfterReadPromise = null;
this.closeAfterReadCallback = null;
resolve();
}
if (this.waitForReadyResolve) {
const resolve = this.waitForReadyResolve;
this.waitForReadyResolve = null;
this.waitForReadyPromise = null;
resolve();
}
console.log('RETURN', res.length);
await lock.acquire('reader', async () => {
await this.ensureReader(offset);
for (let i = 0; i < limit; i += 1) {
const line = await this.readLine();
if (line == null) break;
res.push(JSON.parse(line));
}
});
// console.log('RETURN', res.length);
return res;
}
}