jsl data filter (archive, query result)

This commit is contained in:
Jan Prochazka
2020-11-19 15:09:31 +01:00
parent 4a7d45e4d0
commit b92e28695e
8 changed files with 228 additions and 39 deletions

View File

@@ -1,6 +1,25 @@
const lineReader = require('line-reader');
const AsyncLock = require('async-lock');
const lock = new AsyncLock();
const stableStringify = require('json-stable-stringify');
const { evaluateCondition } = require('dbgate-sqltree');
async function fetchNextLine(reader) {
return new Promise((resolve, reject) => {
if (!reader.hasNextLine()) {
resolve(null);
return;
}
reader.nextLine((err, line) => {
if (err) {
reject(err);
} else {
resolve(line);
}
});
});
}
class JsonLinesDatastore {
constructor(file) {
@@ -9,6 +28,7 @@ class JsonLinesDatastore {
this.readedDataRowCount = 0;
this.readedSchemaRow = false;
this.notifyChangedCallback = null;
this.currentFilter = null;
}
_closeReader() {
@@ -17,6 +37,7 @@ class JsonLinesDatastore {
this.reader = null;
this.readedDataRowCount = 0;
this.readedSchemaRow = false;
this.currentFilter = null;
reader.close(() => {});
}
@@ -39,46 +60,92 @@ class JsonLinesDatastore {
);
}
_readLine() {
return new Promise((resolve, reject) => {
const reader = this.reader;
if (!reader.hasNextLine()) {
resolve(null);
return;
async _readLine(parse) {
for (;;) {
const line = await fetchNextLine(this.reader);
if (!line) {
// EOF
return null;
}
reader.nextLine((err, line) => {
if (this.readedSchemaRow) this.readedDataRowCount += 1;
else this.readedSchemaRow = true;
if (err) reject(err);
resolve(line);
});
});
if (!this.readedSchemaRow) {
this.readedSchemaRow = true;
return true;
}
if (this.currentFilter) {
const parsedLine = JSON.parse(line);
if (evaluateCondition(this.currentFilter, parsedLine)) {
this.readedDataRowCount += 1;
return parse ? parsedLine : true;
}
} else {
this.readedDataRowCount += 1;
return parse ? JSON.parse(line) : true;
}
}
// return new Promise((resolve, reject) => {
// const reader = this.reader;
// if (!reader.hasNextLine()) {
// resolve(null);
// return;
// }
// reader.nextLine((err, line) => {
// if (err) {
// reject(err);
// return;
// }
// if (!this.readedSchemaRow) {
// this.readedSchemaRow = true;
// resolve(true);
// return;
// }
// if (this.currentFilter) {
// const parsedLine = JSON.parse(line);
// if (evaluateCondition(this.currentFilter, parsedLine)) {
// console.log('TRUE');
// resolve(parse ? parsedLine : true);
// this.readedDataRowCount += 1;
// return;
// } else {
// console.log('FALSE');
// // skip row
// return;
// }
// }
// this.readedDataRowCount += 1;
// resolve(parse ? JSON.parse(line) : true);
// });
// });
}
async _ensureReader(offset) {
if (this.readedDataRowCount > offset) {
async _ensureReader(offset, filter) {
if (this.readedDataRowCount > offset || stableStringify(filter) != stableStringify(this.currentFilter)) {
this._closeReader();
}
if (!this.reader) {
const reader = await this._openReader();
this.reader = reader;
this.currentFilter = filter;
}
if (!this.readedSchemaRow) {
await this._readLine(); // skip structure
await this._readLine(false); // skip structure
}
while (this.readedDataRowCount < offset) {
await this._readLine();
await this._readLine(false);
}
}
async getRows(offset, limit) {
async getRows(offset, limit, filter) {
const res = [];
await lock.acquire('reader', async () => {
await this._ensureReader(offset);
await this._ensureReader(offset, filter);
for (let i = 0; i < limit; i += 1) {
const line = await this._readLine();
const line = await this._readLine(true);
if (line == null) break;
res.push(JSON.parse(line));
res.push(line);
}
});
// console.log('RETURN', res.length);