sort JSONL data & query results

This commit is contained in:
Jan Prochazka
2023-02-04 15:27:55 +01:00
parent 3babe95944
commit 57fa9335d4
6 changed files with 81 additions and 9 deletions

View File

@@ -1,9 +1,16 @@
const fs = require('fs');
const os = require('os');
const rimraf = require('rimraf');
const path = require('path');
const lineReader = require('line-reader');
const AsyncLock = require('async-lock');
const lock = new AsyncLock();
const stableStringify = require('json-stable-stringify');
const { evaluateCondition } = require('dbgate-sqltree');
const requirePluginFunction = require('./requirePluginFunction');
const esort = require('external-sorting');
const uuidv1 = require('uuid/v1');
const { jsldir } = require('./directories');
function fetchNextLineFromReader(reader) {
return new Promise((resolve, reject) => {
@@ -32,7 +39,39 @@ class JsonLinesDatastore {
// this.firstRowToBeReturned = null;
this.notifyChangedCallback = null;
this.currentFilter = null;
this.currentSort = null;
this.rowFormatter = requirePluginFunction(formatterFunction);
this.sortedFiles = {};
}
static async sortFile(infile, outfile, sort) {
const tempDir = path.join(os.tmpdir(), uuidv1());
fs.mkdirSync(tempDir);
await esort
.default({
input: fs.createReadStream(infile),
output: fs.createWriteStream(outfile),
deserializer: JSON.parse,
serializer: JSON.stringify,
tempDir,
maxHeap: 100,
comparer: (a, b) => {
for (const item of sort) {
const { uniqueName, order } = item;
if (a[uniqueName] < b[uniqueName]) {
return order == 'ASC' ? -1 : 1;
}
if (a[uniqueName] > b[uniqueName]) {
return order == 'ASC' ? 1 : -1;
}
}
return 0;
},
})
.asc();
await rimraf(tempDir);
}
_closeReader() {
@@ -43,6 +82,7 @@ class JsonLinesDatastore {
this.readedSchemaRow = false;
// this.firstRowToBeReturned = null;
this.currentFilter = null;
this.currentSort = null;
reader.close(() => {});
}
@@ -56,9 +96,9 @@ class JsonLinesDatastore {
if (call) call();
}
async _openReader() {
async _openReader(fileName) {
return new Promise((resolve, reject) =>
lineReader.open(this.file, (err, reader) => {
lineReader.open(fileName, (err, reader) => {
if (err) reject(err);
resolve(reader);
})
@@ -140,14 +180,19 @@ class JsonLinesDatastore {
// });
}
async _ensureReader(offset, filter) {
if (this.readedDataRowCount > offset || stableStringify(filter) != stableStringify(this.currentFilter)) {
async _ensureReader(offset, filter, sort) {
if (
this.readedDataRowCount > offset ||
stableStringify(filter) != stableStringify(this.currentFilter) ||
stableStringify(sort) != stableStringify(this.currentSort)
) {
this._closeReader();
}
if (!this.reader) {
const reader = await this._openReader();
const reader = await this._openReader(sort ? this.sortedFiles[stableStringify(sort)] : this.file);
this.reader = reader;
this.currentFilter = filter;
this.currentSort = sort;
}
// if (!this.readedSchemaRow) {
// const line = await this._readLine(true); // skip structure
@@ -179,10 +224,16 @@ class JsonLinesDatastore {
});
}
async getRows(offset, limit, filter) {
async getRows(offset, limit, filter, sort) {
const res = [];
if (sort && !this.sortedFiles[stableStringify(sort)]) {
const jslid = uuidv1();
const sortedFile = path.join(jsldir(), `${jslid}.jsonl`);
await JsonLinesDatastore.sortFile(this.file, sortedFile, sort);
this.sortedFiles[stableStringify(sort)] = sortedFile;
}
await lock.acquire('reader', async () => {
await this._ensureReader(offset, filter);
await this._ensureReader(offset, filter, sort);
// console.log(JSON.stringify(this.currentFilter, undefined, 2));
for (let i = 0; i < limit; i += 1) {
const line = await this._readLine(true);