diff --git a/app/package.json b/app/package.json index e65acba9f..e98f8ad36 100644 --- a/app/package.json +++ b/app/package.json @@ -1,6 +1,6 @@ { "name": "dbgate", - "version": "3.7.21", + "version": "3.7.22", "private": true, "author": "Jan Prochazka ", "dependencies": { diff --git a/packages/api/package.json b/packages/api/package.json index c07a2c21d..fd66f5cca 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -6,6 +6,7 @@ "dependencies": { "@dbgate/engines": "^0.1.0", "@dbgate/sqltree": "^0.1.0", + "async-lock": "^1.2.4", "axios": "^0.19.0", "body-parser": "^1.19.0", "bufferutil": "^4.0.1", diff --git a/packages/api/src/controllers/jsldata.js b/packages/api/src/controllers/jsldata.js index 9c71bb634..b451c130a 100644 --- a/packages/api/src/controllers/jsldata.js +++ b/packages/api/src/controllers/jsldata.js @@ -1,6 +1,9 @@ const fs = require('fs'); const lineReader = require('line-reader'); +const { off } = require('process'); +const DatastoreProxy = require('../utility/DatastoreProxy'); const getJslFileName = require('../utility/getJslFileName'); +const JsonLinesDatastore = require('../utility/JsonLinesDatastore'); const socket = require('../utility/socket'); function readFirstLine(file) { @@ -19,76 +22,85 @@ function readFirstLine(file) { }); } - module.exports = { - openedReaders: {}, + datastores: {}, - closeReader(jslid) { - // console.log('CLOSING READER'); - if (!this.openedReaders[jslid]) return Promise.resolve(); - return new Promise((resolve, reject) => { - this.openedReaders[jslid].reader.close((err) => { - if (err) reject(err); - delete this.openedReaders[jslid]; - resolve(); - }); - }); - }, + // closeReader(jslid) { + // // console.log('CLOSING READER'); + // if (!this.openedReaders[jslid]) return Promise.resolve(); + // return new Promise((resolve, reject) => { + // this.openedReaders[jslid].reader.close((err) => { + // if (err) reject(err); + // delete this.openedReaders[jslid]; + // resolve(); + // }); + // }); + // }, - readLine(readerInfo) { - return new Promise((resolve, reject) => { - const { reader } = readerInfo; - if (!reader.hasNextLine()) { - resolve(null); - return; - } - reader.nextLine((err, line) => { - if (readerInfo.readedSchemaRow) readerInfo.readedDataRowCount += 1; - else readerInfo.readedSchemaRow = true; - if (err) reject(err); - resolve(line); - }); - }); - }, + // readLine(readerInfo) { + // return new Promise((resolve, reject) => { + // const { reader } = readerInfo; + // if (!reader.hasNextLine()) { + // resolve(null); + // return; + // } + // reader.nextLine((err, line) => { + // if (readerInfo.readedSchemaRow) readerInfo.readedDataRowCount += 1; + // else readerInfo.readedSchemaRow = true; + // if (err) reject(err); + // resolve(line); + // }); + // }); + // }, - openReader(jslid) { - // console.log('OPENING READER'); - // console.log( - // 'OPENING READER, LINES=', - // fs.readFileSync(path.join(jsldir(), `${jslid}.jsonl`), 'utf-8').split('\n').length - // ); - const file = getJslFileName(jslid); - return new Promise((resolve, reject) => - lineReader.open(file, (err, reader) => { - if (err) reject(err); - const readerInfo = { - reader, - readedDataRowCount: 0, - readedSchemaRow: false, - isReading: true, - }; - this.openedReaders[jslid] = readerInfo; - resolve(readerInfo); - }) - ); - }, + // openReader(jslid) { + // // console.log('OPENING READER'); + // // console.log( + // // 'OPENING READER, LINES=', + // // fs.readFileSync(path.join(jsldir(), `${jslid}.jsonl`), 'utf-8').split('\n').length + // // ); + // const file = getJslFileName(jslid); + // return new Promise((resolve, reject) => + // lineReader.open(file, (err, reader) => { + // if (err) reject(err); + // const readerInfo = { + // reader, + // readedDataRowCount: 0, + // readedSchemaRow: false, + // isReading: true, + // }; + // this.openedReaders[jslid] = readerInfo; + // resolve(readerInfo); + // }) + // ); + // }, - async ensureReader(jslid, offset) { - if (this.openedReaders[jslid] && this.openedReaders[jslid].readedDataRowCount > offset) { - await this.closeReader(jslid); + // async ensureReader(jslid, offset) { + // if (this.openedReaders[jslid] && this.openedReaders[jslid].readedDataRowCount > offset) { + // await this.closeReader(jslid); + // } + // let readerInfo = this.openedReaders[jslid]; + // if (!this.openedReaders[jslid]) { + // readerInfo = await this.openReader(jslid); + // } + // readerInfo.isReading = true; + // if (!readerInfo.readedSchemaRow) { + // await this.readLine(readerInfo); // skip structure + // } + // while (readerInfo.readedDataRowCount < offset) { + // await this.readLine(readerInfo); + // } + // return readerInfo; + // }, + + async ensureDatastore(jslid) { + let datastore = this.datastores[jslid]; + if (!datastore) { + datastore = new JsonLinesDatastore(getJslFileName(jslid)); + // datastore = new DatastoreProxy(getJslFileName(jslid)); + this.datastores[jslid] = datastore; } - let readerInfo = this.openedReaders[jslid]; - if (!this.openedReaders[jslid]) { - readerInfo = await this.openReader(jslid); - } - readerInfo.isReading = true; - if (!readerInfo.readedSchemaRow) { - await this.readLine(readerInfo); // skip structure - } - while (readerInfo.readedDataRowCount < offset) { - await this.readLine(readerInfo); - } - return readerInfo; + return datastore; }, getInfo_meta: 'get', @@ -101,20 +113,8 @@ module.exports = { getRows_meta: 'get', async getRows({ jslid, offset, limit }) { - const readerInfo = await this.ensureReader(jslid, offset); - const res = []; - for (let i = 0; i < limit; i += 1) { - const line = await this.readLine(readerInfo); - if (line == null) break; - res.push(JSON.parse(line)); - } - readerInfo.isReading = false; - if (readerInfo.closeAfterReadAndSendStats) { - await this.closeReader(jslid); - socket.emit(`jsldata-stats-${jslid}`, readerInfo.closeAfterReadAndSendStats); - readerInfo.closeAfterReadAndSendStats = null; - } - return res; + const datastore = await this.ensureDatastore(jslid); + return datastore.getRows(offset, limit); }, getStats_meta: 'get', @@ -126,12 +126,16 @@ module.exports = { async notifyChangedStats(stats) { console.log('SENDING STATS', JSON.stringify(stats)); - const readerInfo = this.openedReaders[stats.jslid]; - if (readerInfo && readerInfo.isReading) { - readerInfo.closeAfterReadAndSendStats = stats; - } else { - await this.closeReader(stats.jslid); - socket.emit(`jsldata-stats-${stats.jslid}`, stats); - } + const datastore = this.datastores[stats.jslid]; + if (datastore) await datastore.notifyChanged(); + socket.emit(`jsldata-stats-${stats.jslid}`, stats); + + // const readerInfo = this.openedReaders[stats.jslid]; + // if (readerInfo && readerInfo.isReading) { + // readerInfo.closeAfterReadAndSendStats = stats; + // } else { + // await this.closeReader(stats.jslid); + // socket.emit(`jsldata-stats-${stats.jslid}`, stats); + // } }, }; diff --git a/packages/api/src/proc/index.js b/packages/api/src/proc/index.js index 8292b416f..b6f0be5a1 100644 --- a/packages/api/src/proc/index.js +++ b/packages/api/src/proc/index.js @@ -2,10 +2,12 @@ const connectProcess = require('./connectProcess'); const databaseConnectionProcess = require('./databaseConnectionProcess'); const serverConnectionProcess = require('./serverConnectionProcess'); const sessionProcess = require('./sessionProcess'); +const jslDatastoreProcess = require('./jslDatastoreProcess'); module.exports = { connectProcess, databaseConnectionProcess, serverConnectionProcess, sessionProcess, + jslDatastoreProcess, }; diff --git a/packages/api/src/proc/jslDatastoreProcess.js b/packages/api/src/proc/jslDatastoreProcess.js new file mode 100644 index 000000000..38d492384 --- /dev/null +++ b/packages/api/src/proc/jslDatastoreProcess.js @@ -0,0 +1,58 @@ +const childProcessChecker = require('../utility/childProcessChecker'); +const JsonLinesDatastore = require('../utility/JsonLinesDatastore'); + +let lastPing = null; +let datastore = new JsonLinesDatastore(); + +function handlePing() { + lastPing = new Date().getTime(); +} + +function handleOpen({ file }) { + handlePing(); + datastore = new JsonLinesDatastore(file); +} + +async function handleRead({ msgid, offset, limit }) { + handlePing(); + const rows = await datastore.getRows(offset, limit); + process.send({ msgtype: 'response', msgid, rows }); +} + +async function handleNotify({ msgid }) { + await datastore.notifyChanged(); + process.send({ msgtype: 'notify', msgid }); +} + +const messageHandlers = { + open: handleOpen, + read: handleRead, + ping: handlePing, + notify: handleNotify, +}; + +async function handleMessage({ msgtype, ...other }) { + const handler = messageHandlers[msgtype]; + await handler(other); +} + +function start() { + childProcessChecker(); + + setInterval(() => { + const time = new Date().getTime(); + if (time - lastPing > 60 * 1000) { + process.exit(0); + } + }, 60 * 1000); + + process.on('message', async (message) => { + try { + await handleMessage(message); + } catch (e) { + process.send({ msgtype: 'error', error: e.message }); + } + }); +} + +module.exports = { start }; diff --git a/packages/api/src/utility/DatastoreProxy.js b/packages/api/src/utility/DatastoreProxy.js new file mode 100644 index 000000000..1b4bcbec8 --- /dev/null +++ b/packages/api/src/utility/DatastoreProxy.js @@ -0,0 +1,75 @@ +const { fork } = require('child_process'); +const uuidv1 = require('uuid/v1'); + +class DatastoreProxy { + constructor(file) { + this.subprocess = null; + this.disconnected = false; + this.file = file; + this.requests = {}; + this.handle_response = this.handle_response.bind(this); + this.handle_ping = this.handle_ping.bind(this); + this.notifyChangedCallback = null; + } + + handle_response({ msgid, rows }) { + const [resolve, reject] = this.requests[msgid]; + resolve(rows); + delete this.requests[msgid]; + } + + handle_ping() {} + + handle_notify({ msgid }) { + const [resolve, reject] = this.requests[msgid]; + resolve(); + delete this.requests[msgid]; + } + + async ensureSubprocess() { + if (!this.subprocess) { + this.subprocess = fork(process.argv[1], ['jslDatastoreProcess']); + + // @ts-ignore + this.subprocess.on('message', ({ msgtype, ...message }) => { + // if (this.disconnected) return; + this[`handle_${msgtype}`](message); + }); + this.subprocess.on('exit', () => { + // if (this.disconnected) return; + this.subprocess = null; + }); + this.subprocess.send({ msgtype: 'open', file: this.file }); + } + return this.subprocess; + } + + async getRows(offset, limit) { + await this.ensureSubprocess(); + const msgid = uuidv1(); + const promise = new Promise((resolve, reject) => { + this.requests[msgid] = [resolve, reject]; + this.subprocess.send({ msgtype: 'read', msgid, offset, limit }); + }); + return promise; + } + + async notifyChangedCore() { + const msgid = uuidv1(); + const promise = new Promise((resolve, reject) => { + this.requests[msgid] = [resolve, reject]; + this.subprocess.send({ msgtype: 'notify', msgid }); + }); + return promise; + } + + async notifyChanged(callback) { + this.notifyChangedCallback = callback; + await this.notifyChangedCore(); + const call = this.notifyChangedCallback; + this.notifyChangedCallback = null; + if (call) call(); + } +} + +module.exports = DatastoreProxy; diff --git a/packages/api/src/utility/JsonLinesDatastore.js b/packages/api/src/utility/JsonLinesDatastore.js new file mode 100644 index 000000000..f3fa048be --- /dev/null +++ b/packages/api/src/utility/JsonLinesDatastore.js @@ -0,0 +1,89 @@ +const lineReader = require('line-reader'); +const AsyncLock = require('async-lock'); +const lock = new AsyncLock(); + +class JsonLinesDatastore { + constructor(file) { + this.file = file; + this.reader = null; + this.readedDataRowCount = 0; + this.readedSchemaRow = false; + this.notifyChangedCallback = null; + } + + _closeReader() { + if (!this.reader) return; + const reader = this.reader; + this.reader = null; + this.readedDataRowCount = 0; + this.readedSchemaRow = false; + reader.close(() => {}); + } + + async notifyChanged(callback) { + this.notifyChangedCallback = callback; + await lock.acquire('reader', async () => { + this._closeReader(); + }); + const call = this.notifyChangedCallback; + this.notifyChangedCallback = null; + if (call) call(); + } + + async _openReader() { + return new Promise((resolve, reject) => + lineReader.open(this.file, (err, reader) => { + if (err) reject(err); + resolve(reader); + }) + ); + } + + _readLine() { + return new Promise((resolve, reject) => { + const reader = this.reader; + if (!reader.hasNextLine()) { + resolve(null); + return; + } + reader.nextLine((err, line) => { + if (this.readedSchemaRow) this.readedDataRowCount += 1; + else this.readedSchemaRow = true; + if (err) reject(err); + resolve(line); + }); + }); + } + + async _ensureReader(offset) { + if (this.readedDataRowCount > offset) { + this._closeReader(); + } + if (!this.reader) { + const reader = await this._openReader(); + this.reader = reader; + } + if (!this.readedSchemaRow) { + await this._readLine(); // skip structure + } + while (this.readedDataRowCount < offset) { + await this._readLine(); + } + } + + async getRows(offset, limit) { + const res = []; + await lock.acquire('reader', async () => { + await this._ensureReader(offset); + for (let i = 0; i < limit; i += 1) { + const line = await this._readLine(); + if (line == null) break; + res.push(JSON.parse(line)); + } + }); + // console.log('RETURN', res.length); + return res; + } +} + +module.exports = JsonLinesDatastore; diff --git a/packages/web/src/TabsPanel.js b/packages/web/src/TabsPanel.js index 9a5a1514b..abbc02e1b 100644 --- a/packages/web/src/TabsPanel.js +++ b/packages/web/src/TabsPanel.js @@ -176,7 +176,14 @@ export default function TabsPanel() { const closeTab = closeTabFunc((x, active) => x.tabid == active.tabid); const closeAll = () => { - setOpenedTabs([]); + const closedTime = new Date().getTime(); + setOpenedTabs((tabs) => + tabs.map((tab) => ({ + ...tab, + closedTime: tab.closedTime || closedTime, + selected: false, + })) + ); }; const closeWithSameDb = closeTabFunc( (x, active) => diff --git a/yarn.lock b/yarn.lock index aa5eee79b..2f58d4324 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2265,6 +2265,11 @@ async-limiter@~1.0.0: resolved "https://registry.yarnpkg.com/async-limiter/-/async-limiter-1.0.1.tgz#dd379e94f0db8310b08291f9d64c3209766617fd" integrity sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ== +async-lock@^1.2.4: + version "1.2.4" + resolved "https://registry.yarnpkg.com/async-lock/-/async-lock-1.2.4.tgz#80d0d612383045dd0c30eb5aad08510c1397cb91" + integrity sha512-UBQJC2pbeyGutIfYmErGc9RaJYnpZ1FHaxuKwb0ahvGiiCkPUf3p67Io+YLPmmv3RHY+mF6JEtNW8FlHsraAaA== + async@0.2.10: version "0.2.10" resolved "https://registry.yarnpkg.com/async/-/async-0.2.10.tgz#b6bbe0b0674b9d719708ca38de8c237cb526c3d1"