From 1a54d6bab0df6fe42e4ccf6292e59cbd26277a6a Mon Sep 17 00:00:00 2001 From: Jan Prochazka Date: Mon, 19 Oct 2020 17:11:27 +0200 Subject: [PATCH 1/7] close all fix --- packages/web/src/TabsPanel.js | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/packages/web/src/TabsPanel.js b/packages/web/src/TabsPanel.js index 9a5a1514b..abbc02e1b 100644 --- a/packages/web/src/TabsPanel.js +++ b/packages/web/src/TabsPanel.js @@ -176,7 +176,14 @@ export default function TabsPanel() { const closeTab = closeTabFunc((x, active) => x.tabid == active.tabid); const closeAll = () => { - setOpenedTabs([]); + const closedTime = new Date().getTime(); + setOpenedTabs((tabs) => + tabs.map((tab) => ({ + ...tab, + closedTime: tab.closedTime || closedTime, + selected: false, + })) + ); }; const closeWithSameDb = closeTabFunc( (x, active) => From c4914429ce51a4df99c6478907abb288385aa98b Mon Sep 17 00:00:00 2001 From: Jan Prochazka Date: Mon, 19 Oct 2020 17:21:59 +0200 Subject: [PATCH 2/7] v3.7.22 --- app/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/package.json b/app/package.json index e65acba9f..e98f8ad36 100644 --- a/app/package.json +++ b/app/package.json @@ -1,6 +1,6 @@ { "name": "dbgate", - "version": "3.7.21", + "version": "3.7.22", "private": true, "author": "Jan Prochazka ", "dependencies": { From da1617729bc7e09f6a5878fd677cadd8542c2d99 Mon Sep 17 00:00:00 2001 From: Jan Prochazka Date: Thu, 22 Oct 2020 08:23:53 +0200 Subject: [PATCH 3/7] jsl data refactor --- packages/api/src/controllers/jsldata.js | 174 +++++++++--------- packages/api/src/proc/index.js | 2 + packages/api/src/proc/jslDatastoreProcess.js | 57 ++++++ packages/api/src/utility/DatastoreProxy.js | 55 ++++++ .../api/src/utility/JsonLinesDatastore.js | 115 ++++++++++++ 5 files changed, 318 insertions(+), 85 deletions(-) create mode 100644 packages/api/src/proc/jslDatastoreProcess.js create mode 100644 packages/api/src/utility/DatastoreProxy.js create mode 100644 packages/api/src/utility/JsonLinesDatastore.js diff --git a/packages/api/src/controllers/jsldata.js b/packages/api/src/controllers/jsldata.js index 9c71bb634..b451c130a 100644 --- a/packages/api/src/controllers/jsldata.js +++ b/packages/api/src/controllers/jsldata.js @@ -1,6 +1,9 @@ const fs = require('fs'); const lineReader = require('line-reader'); +const { off } = require('process'); +const DatastoreProxy = require('../utility/DatastoreProxy'); const getJslFileName = require('../utility/getJslFileName'); +const JsonLinesDatastore = require('../utility/JsonLinesDatastore'); const socket = require('../utility/socket'); function readFirstLine(file) { @@ -19,76 +22,85 @@ function readFirstLine(file) { }); } - module.exports = { - openedReaders: {}, + datastores: {}, - closeReader(jslid) { - // console.log('CLOSING READER'); - if (!this.openedReaders[jslid]) return Promise.resolve(); - return new Promise((resolve, reject) => { - this.openedReaders[jslid].reader.close((err) => { - if (err) reject(err); - delete this.openedReaders[jslid]; - resolve(); - }); - }); - }, + // closeReader(jslid) { + // // console.log('CLOSING READER'); + // if (!this.openedReaders[jslid]) return Promise.resolve(); + // return new Promise((resolve, reject) => { + // this.openedReaders[jslid].reader.close((err) => { + // if (err) reject(err); + // delete this.openedReaders[jslid]; + // resolve(); + // }); + // }); + // }, - readLine(readerInfo) { - return new Promise((resolve, reject) => { - const { reader } = readerInfo; - if (!reader.hasNextLine()) { - resolve(null); - return; - } - reader.nextLine((err, line) => { - if (readerInfo.readedSchemaRow) readerInfo.readedDataRowCount += 1; - else readerInfo.readedSchemaRow = true; - if (err) reject(err); - resolve(line); - }); - }); - }, + // readLine(readerInfo) { + // return new Promise((resolve, reject) => { + // const { reader } = readerInfo; + // if (!reader.hasNextLine()) { + // resolve(null); + // return; + // } + // reader.nextLine((err, line) => { + // if (readerInfo.readedSchemaRow) readerInfo.readedDataRowCount += 1; + // else readerInfo.readedSchemaRow = true; + // if (err) reject(err); + // resolve(line); + // }); + // }); + // }, - openReader(jslid) { - // console.log('OPENING READER'); - // console.log( - // 'OPENING READER, LINES=', - // fs.readFileSync(path.join(jsldir(), `${jslid}.jsonl`), 'utf-8').split('\n').length - // ); - const file = getJslFileName(jslid); - return new Promise((resolve, reject) => - lineReader.open(file, (err, reader) => { - if (err) reject(err); - const readerInfo = { - reader, - readedDataRowCount: 0, - readedSchemaRow: false, - isReading: true, - }; - this.openedReaders[jslid] = readerInfo; - resolve(readerInfo); - }) - ); - }, + // openReader(jslid) { + // // console.log('OPENING READER'); + // // console.log( + // // 'OPENING READER, LINES=', + // // fs.readFileSync(path.join(jsldir(), `${jslid}.jsonl`), 'utf-8').split('\n').length + // // ); + // const file = getJslFileName(jslid); + // return new Promise((resolve, reject) => + // lineReader.open(file, (err, reader) => { + // if (err) reject(err); + // const readerInfo = { + // reader, + // readedDataRowCount: 0, + // readedSchemaRow: false, + // isReading: true, + // }; + // this.openedReaders[jslid] = readerInfo; + // resolve(readerInfo); + // }) + // ); + // }, - async ensureReader(jslid, offset) { - if (this.openedReaders[jslid] && this.openedReaders[jslid].readedDataRowCount > offset) { - await this.closeReader(jslid); + // async ensureReader(jslid, offset) { + // if (this.openedReaders[jslid] && this.openedReaders[jslid].readedDataRowCount > offset) { + // await this.closeReader(jslid); + // } + // let readerInfo = this.openedReaders[jslid]; + // if (!this.openedReaders[jslid]) { + // readerInfo = await this.openReader(jslid); + // } + // readerInfo.isReading = true; + // if (!readerInfo.readedSchemaRow) { + // await this.readLine(readerInfo); // skip structure + // } + // while (readerInfo.readedDataRowCount < offset) { + // await this.readLine(readerInfo); + // } + // return readerInfo; + // }, + + async ensureDatastore(jslid) { + let datastore = this.datastores[jslid]; + if (!datastore) { + datastore = new JsonLinesDatastore(getJslFileName(jslid)); + // datastore = new DatastoreProxy(getJslFileName(jslid)); + this.datastores[jslid] = datastore; } - let readerInfo = this.openedReaders[jslid]; - if (!this.openedReaders[jslid]) { - readerInfo = await this.openReader(jslid); - } - readerInfo.isReading = true; - if (!readerInfo.readedSchemaRow) { - await this.readLine(readerInfo); // skip structure - } - while (readerInfo.readedDataRowCount < offset) { - await this.readLine(readerInfo); - } - return readerInfo; + return datastore; }, getInfo_meta: 'get', @@ -101,20 +113,8 @@ module.exports = { getRows_meta: 'get', async getRows({ jslid, offset, limit }) { - const readerInfo = await this.ensureReader(jslid, offset); - const res = []; - for (let i = 0; i < limit; i += 1) { - const line = await this.readLine(readerInfo); - if (line == null) break; - res.push(JSON.parse(line)); - } - readerInfo.isReading = false; - if (readerInfo.closeAfterReadAndSendStats) { - await this.closeReader(jslid); - socket.emit(`jsldata-stats-${jslid}`, readerInfo.closeAfterReadAndSendStats); - readerInfo.closeAfterReadAndSendStats = null; - } - return res; + const datastore = await this.ensureDatastore(jslid); + return datastore.getRows(offset, limit); }, getStats_meta: 'get', @@ -126,12 +126,16 @@ module.exports = { async notifyChangedStats(stats) { console.log('SENDING STATS', JSON.stringify(stats)); - const readerInfo = this.openedReaders[stats.jslid]; - if (readerInfo && readerInfo.isReading) { - readerInfo.closeAfterReadAndSendStats = stats; - } else { - await this.closeReader(stats.jslid); - socket.emit(`jsldata-stats-${stats.jslid}`, stats); - } + const datastore = this.datastores[stats.jslid]; + if (datastore) await datastore.notifyChanged(); + socket.emit(`jsldata-stats-${stats.jslid}`, stats); + + // const readerInfo = this.openedReaders[stats.jslid]; + // if (readerInfo && readerInfo.isReading) { + // readerInfo.closeAfterReadAndSendStats = stats; + // } else { + // await this.closeReader(stats.jslid); + // socket.emit(`jsldata-stats-${stats.jslid}`, stats); + // } }, }; diff --git a/packages/api/src/proc/index.js b/packages/api/src/proc/index.js index 8292b416f..b6f0be5a1 100644 --- a/packages/api/src/proc/index.js +++ b/packages/api/src/proc/index.js @@ -2,10 +2,12 @@ const connectProcess = require('./connectProcess'); const databaseConnectionProcess = require('./databaseConnectionProcess'); const serverConnectionProcess = require('./serverConnectionProcess'); const sessionProcess = require('./sessionProcess'); +const jslDatastoreProcess = require('./jslDatastoreProcess'); module.exports = { connectProcess, databaseConnectionProcess, serverConnectionProcess, sessionProcess, + jslDatastoreProcess, }; diff --git a/packages/api/src/proc/jslDatastoreProcess.js b/packages/api/src/proc/jslDatastoreProcess.js new file mode 100644 index 000000000..3b3b112ed --- /dev/null +++ b/packages/api/src/proc/jslDatastoreProcess.js @@ -0,0 +1,57 @@ +const childProcessChecker = require('../utility/childProcessChecker'); +const JsonLinesDatastore = require('../utility/JsonLinesDatastore'); + +let lastPing = null; +let datastore = new JsonLinesDatastore(); + +function handlePing() { + lastPing = new Date().getTime(); +} + +function handleOpen({file }) { + handlePing(); + datastore = new JsonLinesDatastore(file); +} + +async function handleRead({ msgid, offset, limit }) { + handlePing(); + const rows = await datastore.getRows(offset, limit); + process.send({ msgtype: 'response', msgid, rows }); +} + +function handleNotifyChanged() { + datastore.notifyChanged(); +} + +const messageHandlers = { + open: handleOpen, + read: handleRead, + ping: handlePing, + notifyChanged: handleNotifyChanged, +}; + +async function handleMessage({ msgtype, ...other }) { + const handler = messageHandlers[msgtype]; + await handler(other); +} + +function start() { + childProcessChecker(); + + setInterval(() => { + const time = new Date().getTime(); + if (time - lastPing > 60 * 1000) { + process.exit(0); + } + }, 60 * 1000); + + process.on('message', async (message) => { + try { + await handleMessage(message); + } catch (e) { + process.send({ msgtype: 'error', error: e.message }); + } + }); +} + +module.exports = { start }; diff --git a/packages/api/src/utility/DatastoreProxy.js b/packages/api/src/utility/DatastoreProxy.js new file mode 100644 index 000000000..437b3153d --- /dev/null +++ b/packages/api/src/utility/DatastoreProxy.js @@ -0,0 +1,55 @@ +const { fork } = require('child_process'); +const uuidv1 = require('uuid/v1'); + +class DatastoreProxy { + constructor(file) { + this.subprocess = null; + this.disconnected = false; + this.file = file; + this.requests = {}; + this.handle_response = this.handle_response.bind(this); + this.handle_ping = this.handle_ping.bind(this); + } + + // handle_response({ msgid, rows }) { + handle_response({ msgid, rows }) { + const [resolve, reject] = this.requests[msgid]; + resolve(rows); + delete this.requests[msgid]; + } + + handle_ping() {} + + + async ensureSubprocess() { + if (!this.subprocess) { + this.subprocess = fork(process.argv[1], ['jslDatastoreProcess']); + + // @ts-ignore + this.subprocess.on('message', ({ msgtype, ...message }) => { + // if (this.disconnected) return; + this[`handle_${msgtype}`](message); + }); + this.subprocess.on('exit', () => { + // if (this.disconnected) return; + this.subprocess = null; + }); + this.subprocess.send({ msgtype: 'open', file: this.file }); + } + return this.subprocess; + } + + async getRows(offset, limit) { + await this.ensureSubprocess(); + const msgid = uuidv1(); + const promise = new Promise((resolve, reject) => { + this.requests[msgid] = [resolve, reject]; + this.subprocess.send({ msgtype: 'read', msgid, offset, limit }); + }); + return promise; + } + + async notifyChanged() {} +} + +module.exports = DatastoreProxy; diff --git a/packages/api/src/utility/JsonLinesDatastore.js b/packages/api/src/utility/JsonLinesDatastore.js new file mode 100644 index 000000000..eed749143 --- /dev/null +++ b/packages/api/src/utility/JsonLinesDatastore.js @@ -0,0 +1,115 @@ +const lineReader = require('line-reader'); + +class JsonLinesDatastore { + constructor(file) { + this.file = file; + this.readerInfo = { + reader: null, + readedDataRowCount: 0, + readedSchemaRow: false, + isReading: false, + closeAfterRead: null, + closeAfterReadPromise: null, + }; + } + + async closeReader() { + if (!this.readerInfo) return Promise.resolve(); + return new Promise((resolve, reject) => { + this.readerInfo.reader.close((err) => { + if (err) reject(err); + resolve(); + }); + }); + } + + async notifyChanged() { + if (this.readerInfo && this.readerInfo.isReading) { + if (this.readerInfo.closeAfterRead) { + return this.readerInfo.closeAfterReadPromise; + } + const promise = new Promise((resolve, reject) => { + this.readerInfo.closeAfterRead = resolve; + }); + this.readerInfo.closeAfterReadPromise = promise; + return promise; + } else { + await this.closeReader(); + } + } + + async openReader() { + return new Promise((resolve, reject) => + lineReader.open(this.file, (err, reader) => { + if (err) reject(err); + + const readerInfo = { + reader, + readedDataRowCount: 0, + readedSchemaRow: false, + isReading: true, + closeAfterRead: null, + closeAfterReadPromise: null, + }; + this.readerInfo = readerInfo; + resolve(readerInfo); + }) + ); + } + + readLine(readerInfo) { + return new Promise((resolve, reject) => { + const { reader } = readerInfo; + if (!reader.hasNextLine()) { + resolve(null); + return; + } + reader.nextLine((err, line) => { + if (readerInfo.readedSchemaRow) readerInfo.readedDataRowCount += 1; + else readerInfo.readedSchemaRow = true; + if (err) reject(err); + resolve(line); + }); + }); + } + + async ensureReader(offset) { + if (this.readerInfo && this.readerInfo.readedDataRowCount > offset) { + await this.closeReader(); + } + let readerInfo = this.readerInfo; + if (!readerInfo || !readerInfo.reader) { + readerInfo = await this.openReader(); + } + readerInfo.isReading = true; + if (!readerInfo.readedSchemaRow) { + await this.readLine(readerInfo); // skip structure + } + while (readerInfo.readedDataRowCount < offset) { + await this.readLine(readerInfo); + } + return readerInfo; + } + + async getRows(offset, limit) { + const readerInfo = await this.ensureReader(offset); + const res = []; + for (let i = 0; i < limit; i += 1) { + const line = await this.readLine(readerInfo); + if (line == null) break; + res.push(JSON.parse(line)); + } + readerInfo.isReading = false; + if (readerInfo.closeAfterRead) { + await this.closeReader(); + // socket.emit(`jsldata-stats-${jslid}`, readerInfo.closeAfterReadAndSendStats); + const resolve = readerInfo.closeAfterRead; + readerInfo.closeAfterRead = null; + readerInfo.closeAfterReadPromise = null; + resolve(); + } + return res; + } +} + +module.exports = JsonLinesDatastore; From 61b4bf91b0d1de5e14b3b5078b3f44dc2dd63a4a Mon Sep 17 00:00:00 2001 From: Jan Prochazka Date: Thu, 22 Oct 2020 09:39:18 +0200 Subject: [PATCH 4/7] jsl data refactor --- .../api/src/utility/JsonLinesDatastore.js | 149 +++++++++++------- 1 file changed, 91 insertions(+), 58 deletions(-) diff --git a/packages/api/src/utility/JsonLinesDatastore.js b/packages/api/src/utility/JsonLinesDatastore.js index eed749143..7ed08076b 100644 --- a/packages/api/src/utility/JsonLinesDatastore.js +++ b/packages/api/src/utility/JsonLinesDatastore.js @@ -3,38 +3,62 @@ const lineReader = require('line-reader'); class JsonLinesDatastore { constructor(file) { this.file = file; - this.readerInfo = { - reader: null, - readedDataRowCount: 0, - readedSchemaRow: false, - isReading: false, - closeAfterRead: null, - closeAfterReadPromise: null, - }; + this.reader = null; + this.readedDataRowCount = 0; + + this.readedSchemaRow = false; + this.isReading = false; + this.closeAfterRead = null; + this.closeAfterReadCallback = null; + this.closeAfterReadPromise = null; + + this.waitForReadyPromise = null; + this.waitForReadyResolve = null; + + // this.readerInfo = { + // reader: null, + // readedDataRowCount: 0, + // readedSchemaRow: false, + // isReading: false, + // closeAfterRead: null, + // closeAfterReadPromise: null, + // }; } - async closeReader() { - if (!this.readerInfo) return Promise.resolve(); - return new Promise((resolve, reject) => { - this.readerInfo.reader.close((err) => { - if (err) reject(err); - resolve(); - }); - }); + closeReader() { + if (!this.reader) return; + const reader = this.reader; + this.reader = null; + reader.close(); } - async notifyChanged() { - if (this.readerInfo && this.readerInfo.isReading) { - if (this.readerInfo.closeAfterRead) { - return this.readerInfo.closeAfterReadPromise; + waitForReady() { + if (this.isReading) { + if (this.waitForReadyResolve) { + return this.waitForReadyPromise; } const promise = new Promise((resolve, reject) => { - this.readerInfo.closeAfterRead = resolve; + this.waitForReadyResolve = resolve; }); - this.readerInfo.closeAfterReadPromise = promise; + this.waitForReadyPromise = promise; + return promise; + } + return Promise.resolve(); + } + + async notifyChanged(callback) { + if (this.isReading) { + this.closeAfterReadCallback = callback; + if (this.closeAfterRead) { + return this.closeAfterReadPromise; + } + const promise = new Promise((resolve, reject) => { + this.closeAfterRead = resolve; + }); + this.closeAfterReadPromise = promise; return promise; } else { - await this.closeReader(); + this.closeReader(); } } @@ -43,30 +67,21 @@ class JsonLinesDatastore { lineReader.open(this.file, (err, reader) => { if (err) reject(err); - const readerInfo = { - reader, - readedDataRowCount: 0, - readedSchemaRow: false, - isReading: true, - closeAfterRead: null, - closeAfterReadPromise: null, - }; - this.readerInfo = readerInfo; - resolve(readerInfo); + resolve(reader); }) ); } - readLine(readerInfo) { + readLine() { return new Promise((resolve, reject) => { - const { reader } = readerInfo; + const reader = this.reader; if (!reader.hasNextLine()) { resolve(null); return; } reader.nextLine((err, line) => { - if (readerInfo.readedSchemaRow) readerInfo.readedDataRowCount += 1; - else readerInfo.readedSchemaRow = true; + if (this.readedSchemaRow) this.readedDataRowCount += 1; + else this.readedSchemaRow = true; if (err) reject(err); resolve(line); }); @@ -74,40 +89,58 @@ class JsonLinesDatastore { } async ensureReader(offset) { - if (this.readerInfo && this.readerInfo.readedDataRowCount > offset) { - await this.closeReader(); + console.log('ENSURE', offset); + for (;;) { + await this.waitForReady(); + if (this.readedDataRowCount > offset) { + this.closeReader(); + } + if (!this.reader) { + const reader = await this.openReader(); + if (this.isReading) { + reader.close(); // throw away this reader + continue; // reader is already used by other getRows, wait for free reader + } + this.reader = reader; + this.isReading = true; + break; + } else { + break; + } } - let readerInfo = this.readerInfo; - if (!readerInfo || !readerInfo.reader) { - readerInfo = await this.openReader(); + if (!this.readedSchemaRow) { + await this.readLine(); // skip structure } - readerInfo.isReading = true; - if (!readerInfo.readedSchemaRow) { - await this.readLine(readerInfo); // skip structure + while (this.readedDataRowCount < offset) { + await this.readLine(); } - while (readerInfo.readedDataRowCount < offset) { - await this.readLine(readerInfo); - } - return readerInfo; } async getRows(offset, limit) { - const readerInfo = await this.ensureReader(offset); + await this.ensureReader(offset); const res = []; for (let i = 0; i < limit; i += 1) { - const line = await this.readLine(readerInfo); + const line = await this.readLine(); if (line == null) break; res.push(JSON.parse(line)); } - readerInfo.isReading = false; - if (readerInfo.closeAfterRead) { - await this.closeReader(); - // socket.emit(`jsldata-stats-${jslid}`, readerInfo.closeAfterReadAndSendStats); - const resolve = readerInfo.closeAfterRead; - readerInfo.closeAfterRead = null; - readerInfo.closeAfterReadPromise = null; + this.isReading = false; + if (this.closeAfterRead) { + if (this.closeAfterReadCallback) this.closeAfterReadCallback(); + this.closeReader(); + const resolve = this.closeAfterRead; + this.closeAfterRead = null; + this.closeAfterReadPromise = null; + this.closeAfterReadCallback = null; resolve(); } + if (this.waitForReadyResolve) { + const resolve = this.waitForReadyResolve; + this.waitForReadyResolve = null; + this.waitForReadyPromise = null; + resolve(); + } + console.log('RETURN', res.length); return res; } } From 541f064ddb31cebd8211c931d3bc1a5844762c4a Mon Sep 17 00:00:00 2001 From: Jan Prochazka Date: Thu, 22 Oct 2020 10:02:11 +0200 Subject: [PATCH 5/7] jsl data store fix - uses lock, tested OK on all problematic queries --- packages/api/package.json | 1 + .../api/src/utility/JsonLinesDatastore.js | 113 +++++------------- yarn.lock | 5 + 3 files changed, 33 insertions(+), 86 deletions(-) diff --git a/packages/api/package.json b/packages/api/package.json index c07a2c21d..fd66f5cca 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -6,6 +6,7 @@ "dependencies": { "@dbgate/engines": "^0.1.0", "@dbgate/sqltree": "^0.1.0", + "async-lock": "^1.2.4", "axios": "^0.19.0", "body-parser": "^1.19.0", "bufferutil": "^4.0.1", diff --git a/packages/api/src/utility/JsonLinesDatastore.js b/packages/api/src/utility/JsonLinesDatastore.js index 7ed08076b..95e6845d3 100644 --- a/packages/api/src/utility/JsonLinesDatastore.js +++ b/packages/api/src/utility/JsonLinesDatastore.js @@ -1,72 +1,39 @@ const lineReader = require('line-reader'); +const AsyncLock = require('async-lock'); +const lock = new AsyncLock(); class JsonLinesDatastore { constructor(file) { this.file = file; this.reader = null; this.readedDataRowCount = 0; - this.readedSchemaRow = false; - this.isReading = false; - this.closeAfterRead = null; - this.closeAfterReadCallback = null; - this.closeAfterReadPromise = null; - - this.waitForReadyPromise = null; - this.waitForReadyResolve = null; - - // this.readerInfo = { - // reader: null, - // readedDataRowCount: 0, - // readedSchemaRow: false, - // isReading: false, - // closeAfterRead: null, - // closeAfterReadPromise: null, - // }; + this.notifyChangedCallback = null; } closeReader() { if (!this.reader) return; const reader = this.reader; this.reader = null; - reader.close(); - } - - waitForReady() { - if (this.isReading) { - if (this.waitForReadyResolve) { - return this.waitForReadyPromise; - } - const promise = new Promise((resolve, reject) => { - this.waitForReadyResolve = resolve; - }); - this.waitForReadyPromise = promise; - return promise; - } - return Promise.resolve(); + this.readedDataRowCount = 0; + this.readedSchemaRow = false; + reader.close(() => {}); } async notifyChanged(callback) { - if (this.isReading) { - this.closeAfterReadCallback = callback; - if (this.closeAfterRead) { - return this.closeAfterReadPromise; - } - const promise = new Promise((resolve, reject) => { - this.closeAfterRead = resolve; - }); - this.closeAfterReadPromise = promise; - return promise; - } else { + this.notifyChangedCallback = callback; + await lock.acquire('reader', async () => { this.closeReader(); - } + }); + const call = this.notifyChangedCallback; + this.notifyChangedCallback = null; + if (call) call(); } async openReader() { return new Promise((resolve, reject) => lineReader.open(this.file, (err, reader) => { if (err) reject(err); - resolve(reader); }) ); @@ -89,24 +56,12 @@ class JsonLinesDatastore { } async ensureReader(offset) { - console.log('ENSURE', offset); - for (;;) { - await this.waitForReady(); - if (this.readedDataRowCount > offset) { - this.closeReader(); - } - if (!this.reader) { - const reader = await this.openReader(); - if (this.isReading) { - reader.close(); // throw away this reader - continue; // reader is already used by other getRows, wait for free reader - } - this.reader = reader; - this.isReading = true; - break; - } else { - break; - } + if (this.readedDataRowCount > offset) { + this.closeReader(); + } + if (!this.reader) { + const reader = await this.openReader(); + this.reader = reader; } if (!this.readedSchemaRow) { await this.readLine(); // skip structure @@ -117,30 +72,16 @@ class JsonLinesDatastore { } async getRows(offset, limit) { - await this.ensureReader(offset); const res = []; - for (let i = 0; i < limit; i += 1) { - const line = await this.readLine(); - if (line == null) break; - res.push(JSON.parse(line)); - } - this.isReading = false; - if (this.closeAfterRead) { - if (this.closeAfterReadCallback) this.closeAfterReadCallback(); - this.closeReader(); - const resolve = this.closeAfterRead; - this.closeAfterRead = null; - this.closeAfterReadPromise = null; - this.closeAfterReadCallback = null; - resolve(); - } - if (this.waitForReadyResolve) { - const resolve = this.waitForReadyResolve; - this.waitForReadyResolve = null; - this.waitForReadyPromise = null; - resolve(); - } - console.log('RETURN', res.length); + await lock.acquire('reader', async () => { + await this.ensureReader(offset); + for (let i = 0; i < limit; i += 1) { + const line = await this.readLine(); + if (line == null) break; + res.push(JSON.parse(line)); + } + }); + // console.log('RETURN', res.length); return res; } } diff --git a/yarn.lock b/yarn.lock index aa5eee79b..2f58d4324 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2265,6 +2265,11 @@ async-limiter@~1.0.0: resolved "https://registry.yarnpkg.com/async-limiter/-/async-limiter-1.0.1.tgz#dd379e94f0db8310b08291f9d64c3209766617fd" integrity sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ== +async-lock@^1.2.4: + version "1.2.4" + resolved "https://registry.yarnpkg.com/async-lock/-/async-lock-1.2.4.tgz#80d0d612383045dd0c30eb5aad08510c1397cb91" + integrity sha512-UBQJC2pbeyGutIfYmErGc9RaJYnpZ1FHaxuKwb0ahvGiiCkPUf3p67Io+YLPmmv3RHY+mF6JEtNW8FlHsraAaA== + async@0.2.10: version "0.2.10" resolved "https://registry.yarnpkg.com/async/-/async-0.2.10.tgz#b6bbe0b0674b9d719708ca38de8c237cb526c3d1" From 7681f9e1eca3cfff8f54e088c36df6f06366aa48 Mon Sep 17 00:00:00 2001 From: Jan Prochazka Date: Thu, 22 Oct 2020 10:04:52 +0200 Subject: [PATCH 6/7] jsl data source - renamed private methods --- .../api/src/utility/JsonLinesDatastore.js | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/packages/api/src/utility/JsonLinesDatastore.js b/packages/api/src/utility/JsonLinesDatastore.js index 95e6845d3..f3fa048be 100644 --- a/packages/api/src/utility/JsonLinesDatastore.js +++ b/packages/api/src/utility/JsonLinesDatastore.js @@ -11,7 +11,7 @@ class JsonLinesDatastore { this.notifyChangedCallback = null; } - closeReader() { + _closeReader() { if (!this.reader) return; const reader = this.reader; this.reader = null; @@ -23,14 +23,14 @@ class JsonLinesDatastore { async notifyChanged(callback) { this.notifyChangedCallback = callback; await lock.acquire('reader', async () => { - this.closeReader(); + this._closeReader(); }); const call = this.notifyChangedCallback; this.notifyChangedCallback = null; if (call) call(); } - async openReader() { + async _openReader() { return new Promise((resolve, reject) => lineReader.open(this.file, (err, reader) => { if (err) reject(err); @@ -39,7 +39,7 @@ class JsonLinesDatastore { ); } - readLine() { + _readLine() { return new Promise((resolve, reject) => { const reader = this.reader; if (!reader.hasNextLine()) { @@ -55,28 +55,28 @@ class JsonLinesDatastore { }); } - async ensureReader(offset) { + async _ensureReader(offset) { if (this.readedDataRowCount > offset) { - this.closeReader(); + this._closeReader(); } if (!this.reader) { - const reader = await this.openReader(); + const reader = await this._openReader(); this.reader = reader; } if (!this.readedSchemaRow) { - await this.readLine(); // skip structure + await this._readLine(); // skip structure } while (this.readedDataRowCount < offset) { - await this.readLine(); + await this._readLine(); } } async getRows(offset, limit) { const res = []; await lock.acquire('reader', async () => { - await this.ensureReader(offset); + await this._ensureReader(offset); for (let i = 0; i < limit; i += 1) { - const line = await this.readLine(); + const line = await this._readLine(); if (line == null) break; res.push(JSON.parse(line)); } From 9adf7a6ae250c73ab529bc6b9856147c420d0733 Mon Sep 17 00:00:00 2001 From: Jan Prochazka Date: Thu, 22 Oct 2020 11:27:23 +0200 Subject: [PATCH 7/7] datastore --- packages/api/src/proc/jslDatastoreProcess.js | 9 ++++---- packages/api/src/utility/DatastoreProxy.js | 24 ++++++++++++++++++-- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/packages/api/src/proc/jslDatastoreProcess.js b/packages/api/src/proc/jslDatastoreProcess.js index 3b3b112ed..38d492384 100644 --- a/packages/api/src/proc/jslDatastoreProcess.js +++ b/packages/api/src/proc/jslDatastoreProcess.js @@ -8,7 +8,7 @@ function handlePing() { lastPing = new Date().getTime(); } -function handleOpen({file }) { +function handleOpen({ file }) { handlePing(); datastore = new JsonLinesDatastore(file); } @@ -19,15 +19,16 @@ async function handleRead({ msgid, offset, limit }) { process.send({ msgtype: 'response', msgid, rows }); } -function handleNotifyChanged() { - datastore.notifyChanged(); +async function handleNotify({ msgid }) { + await datastore.notifyChanged(); + process.send({ msgtype: 'notify', msgid }); } const messageHandlers = { open: handleOpen, read: handleRead, ping: handlePing, - notifyChanged: handleNotifyChanged, + notify: handleNotify, }; async function handleMessage({ msgtype, ...other }) { diff --git a/packages/api/src/utility/DatastoreProxy.js b/packages/api/src/utility/DatastoreProxy.js index 437b3153d..1b4bcbec8 100644 --- a/packages/api/src/utility/DatastoreProxy.js +++ b/packages/api/src/utility/DatastoreProxy.js @@ -9,9 +9,9 @@ class DatastoreProxy { this.requests = {}; this.handle_response = this.handle_response.bind(this); this.handle_ping = this.handle_ping.bind(this); + this.notifyChangedCallback = null; } - // handle_response({ msgid, rows }) { handle_response({ msgid, rows }) { const [resolve, reject] = this.requests[msgid]; resolve(rows); @@ -20,6 +20,11 @@ class DatastoreProxy { handle_ping() {} + handle_notify({ msgid }) { + const [resolve, reject] = this.requests[msgid]; + resolve(); + delete this.requests[msgid]; + } async ensureSubprocess() { if (!this.subprocess) { @@ -49,7 +54,22 @@ class DatastoreProxy { return promise; } - async notifyChanged() {} + async notifyChangedCore() { + const msgid = uuidv1(); + const promise = new Promise((resolve, reject) => { + this.requests[msgid] = [resolve, reject]; + this.subprocess.send({ msgtype: 'notify', msgid }); + }); + return promise; + } + + async notifyChanged(callback) { + this.notifyChangedCallback = callback; + await this.notifyChangedCore(); + const call = this.notifyChangedCallback; + this.notifyChangedCallback = null; + if (call) call(); + } } module.exports = DatastoreProxy;