Merge branch 'master' into grid-refactor

This commit is contained in:
Jan Prochazka
2020-10-22 12:32:05 +02:00
9 changed files with 328 additions and 87 deletions

View File

@@ -1,6 +1,6 @@
{ {
"name": "dbgate", "name": "dbgate",
"version": "3.7.21", "version": "3.7.22",
"private": true, "private": true,
"author": "Jan Prochazka <jenasoft.database@gmail.com>", "author": "Jan Prochazka <jenasoft.database@gmail.com>",
"dependencies": { "dependencies": {

View File

@@ -6,6 +6,7 @@
"dependencies": { "dependencies": {
"@dbgate/engines": "^0.1.0", "@dbgate/engines": "^0.1.0",
"@dbgate/sqltree": "^0.1.0", "@dbgate/sqltree": "^0.1.0",
"async-lock": "^1.2.4",
"axios": "^0.19.0", "axios": "^0.19.0",
"body-parser": "^1.19.0", "body-parser": "^1.19.0",
"bufferutil": "^4.0.1", "bufferutil": "^4.0.1",

View File

@@ -1,6 +1,9 @@
const fs = require('fs'); const fs = require('fs');
const lineReader = require('line-reader'); const lineReader = require('line-reader');
const { off } = require('process');
const DatastoreProxy = require('../utility/DatastoreProxy');
const getJslFileName = require('../utility/getJslFileName'); const getJslFileName = require('../utility/getJslFileName');
const JsonLinesDatastore = require('../utility/JsonLinesDatastore');
const socket = require('../utility/socket'); const socket = require('../utility/socket');
function readFirstLine(file) { function readFirstLine(file) {
@@ -19,76 +22,85 @@ function readFirstLine(file) {
}); });
} }
module.exports = { module.exports = {
openedReaders: {}, datastores: {},
closeReader(jslid) { // closeReader(jslid) {
// console.log('CLOSING READER'); // // console.log('CLOSING READER');
if (!this.openedReaders[jslid]) return Promise.resolve(); // if (!this.openedReaders[jslid]) return Promise.resolve();
return new Promise((resolve, reject) => { // return new Promise((resolve, reject) => {
this.openedReaders[jslid].reader.close((err) => { // this.openedReaders[jslid].reader.close((err) => {
if (err) reject(err); // if (err) reject(err);
delete this.openedReaders[jslid]; // delete this.openedReaders[jslid];
resolve(); // resolve();
}); // });
}); // });
}, // },
readLine(readerInfo) { // readLine(readerInfo) {
return new Promise((resolve, reject) => { // return new Promise((resolve, reject) => {
const { reader } = readerInfo; // const { reader } = readerInfo;
if (!reader.hasNextLine()) { // if (!reader.hasNextLine()) {
resolve(null); // resolve(null);
return; // return;
} // }
reader.nextLine((err, line) => { // reader.nextLine((err, line) => {
if (readerInfo.readedSchemaRow) readerInfo.readedDataRowCount += 1; // if (readerInfo.readedSchemaRow) readerInfo.readedDataRowCount += 1;
else readerInfo.readedSchemaRow = true; // else readerInfo.readedSchemaRow = true;
if (err) reject(err); // if (err) reject(err);
resolve(line); // resolve(line);
}); // });
}); // });
}, // },
openReader(jslid) { // openReader(jslid) {
// console.log('OPENING READER'); // // console.log('OPENING READER');
// console.log( // // console.log(
// 'OPENING READER, LINES=', // // 'OPENING READER, LINES=',
// fs.readFileSync(path.join(jsldir(), `${jslid}.jsonl`), 'utf-8').split('\n').length // // fs.readFileSync(path.join(jsldir(), `${jslid}.jsonl`), 'utf-8').split('\n').length
// ); // // );
const file = getJslFileName(jslid); // const file = getJslFileName(jslid);
return new Promise((resolve, reject) => // return new Promise((resolve, reject) =>
lineReader.open(file, (err, reader) => { // lineReader.open(file, (err, reader) => {
if (err) reject(err); // if (err) reject(err);
const readerInfo = { // const readerInfo = {
reader, // reader,
readedDataRowCount: 0, // readedDataRowCount: 0,
readedSchemaRow: false, // readedSchemaRow: false,
isReading: true, // isReading: true,
}; // };
this.openedReaders[jslid] = readerInfo; // this.openedReaders[jslid] = readerInfo;
resolve(readerInfo); // resolve(readerInfo);
}) // })
); // );
}, // },
async ensureReader(jslid, offset) { // async ensureReader(jslid, offset) {
if (this.openedReaders[jslid] && this.openedReaders[jslid].readedDataRowCount > offset) { // if (this.openedReaders[jslid] && this.openedReaders[jslid].readedDataRowCount > offset) {
await this.closeReader(jslid); // await this.closeReader(jslid);
// }
// let readerInfo = this.openedReaders[jslid];
// if (!this.openedReaders[jslid]) {
// readerInfo = await this.openReader(jslid);
// }
// readerInfo.isReading = true;
// if (!readerInfo.readedSchemaRow) {
// await this.readLine(readerInfo); // skip structure
// }
// while (readerInfo.readedDataRowCount < offset) {
// await this.readLine(readerInfo);
// }
// return readerInfo;
// },
async ensureDatastore(jslid) {
let datastore = this.datastores[jslid];
if (!datastore) {
datastore = new JsonLinesDatastore(getJslFileName(jslid));
// datastore = new DatastoreProxy(getJslFileName(jslid));
this.datastores[jslid] = datastore;
} }
let readerInfo = this.openedReaders[jslid]; return datastore;
if (!this.openedReaders[jslid]) {
readerInfo = await this.openReader(jslid);
}
readerInfo.isReading = true;
if (!readerInfo.readedSchemaRow) {
await this.readLine(readerInfo); // skip structure
}
while (readerInfo.readedDataRowCount < offset) {
await this.readLine(readerInfo);
}
return readerInfo;
}, },
getInfo_meta: 'get', getInfo_meta: 'get',
@@ -101,20 +113,8 @@ module.exports = {
getRows_meta: 'get', getRows_meta: 'get',
async getRows({ jslid, offset, limit }) { async getRows({ jslid, offset, limit }) {
const readerInfo = await this.ensureReader(jslid, offset); const datastore = await this.ensureDatastore(jslid);
const res = []; return datastore.getRows(offset, limit);
for (let i = 0; i < limit; i += 1) {
const line = await this.readLine(readerInfo);
if (line == null) break;
res.push(JSON.parse(line));
}
readerInfo.isReading = false;
if (readerInfo.closeAfterReadAndSendStats) {
await this.closeReader(jslid);
socket.emit(`jsldata-stats-${jslid}`, readerInfo.closeAfterReadAndSendStats);
readerInfo.closeAfterReadAndSendStats = null;
}
return res;
}, },
getStats_meta: 'get', getStats_meta: 'get',
@@ -126,12 +126,16 @@ module.exports = {
async notifyChangedStats(stats) { async notifyChangedStats(stats) {
console.log('SENDING STATS', JSON.stringify(stats)); console.log('SENDING STATS', JSON.stringify(stats));
const readerInfo = this.openedReaders[stats.jslid]; const datastore = this.datastores[stats.jslid];
if (readerInfo && readerInfo.isReading) { if (datastore) await datastore.notifyChanged();
readerInfo.closeAfterReadAndSendStats = stats; socket.emit(`jsldata-stats-${stats.jslid}`, stats);
} else {
await this.closeReader(stats.jslid); // const readerInfo = this.openedReaders[stats.jslid];
socket.emit(`jsldata-stats-${stats.jslid}`, stats); // if (readerInfo && readerInfo.isReading) {
} // readerInfo.closeAfterReadAndSendStats = stats;
// } else {
// await this.closeReader(stats.jslid);
// socket.emit(`jsldata-stats-${stats.jslid}`, stats);
// }
}, },
}; };

View File

@@ -2,10 +2,12 @@ const connectProcess = require('./connectProcess');
const databaseConnectionProcess = require('./databaseConnectionProcess'); const databaseConnectionProcess = require('./databaseConnectionProcess');
const serverConnectionProcess = require('./serverConnectionProcess'); const serverConnectionProcess = require('./serverConnectionProcess');
const sessionProcess = require('./sessionProcess'); const sessionProcess = require('./sessionProcess');
const jslDatastoreProcess = require('./jslDatastoreProcess');
module.exports = { module.exports = {
connectProcess, connectProcess,
databaseConnectionProcess, databaseConnectionProcess,
serverConnectionProcess, serverConnectionProcess,
sessionProcess, sessionProcess,
jslDatastoreProcess,
}; };

View File

@@ -0,0 +1,58 @@
const childProcessChecker = require('../utility/childProcessChecker');
const JsonLinesDatastore = require('../utility/JsonLinesDatastore');
let lastPing = null;
let datastore = new JsonLinesDatastore();
function handlePing() {
lastPing = new Date().getTime();
}
function handleOpen({ file }) {
handlePing();
datastore = new JsonLinesDatastore(file);
}
async function handleRead({ msgid, offset, limit }) {
handlePing();
const rows = await datastore.getRows(offset, limit);
process.send({ msgtype: 'response', msgid, rows });
}
async function handleNotify({ msgid }) {
await datastore.notifyChanged();
process.send({ msgtype: 'notify', msgid });
}
const messageHandlers = {
open: handleOpen,
read: handleRead,
ping: handlePing,
notify: handleNotify,
};
async function handleMessage({ msgtype, ...other }) {
const handler = messageHandlers[msgtype];
await handler(other);
}
function start() {
childProcessChecker();
setInterval(() => {
const time = new Date().getTime();
if (time - lastPing > 60 * 1000) {
process.exit(0);
}
}, 60 * 1000);
process.on('message', async (message) => {
try {
await handleMessage(message);
} catch (e) {
process.send({ msgtype: 'error', error: e.message });
}
});
}
module.exports = { start };

View File

@@ -0,0 +1,75 @@
const { fork } = require('child_process');
const uuidv1 = require('uuid/v1');
class DatastoreProxy {
constructor(file) {
this.subprocess = null;
this.disconnected = false;
this.file = file;
this.requests = {};
this.handle_response = this.handle_response.bind(this);
this.handle_ping = this.handle_ping.bind(this);
this.notifyChangedCallback = null;
}
handle_response({ msgid, rows }) {
const [resolve, reject] = this.requests[msgid];
resolve(rows);
delete this.requests[msgid];
}
handle_ping() {}
handle_notify({ msgid }) {
const [resolve, reject] = this.requests[msgid];
resolve();
delete this.requests[msgid];
}
async ensureSubprocess() {
if (!this.subprocess) {
this.subprocess = fork(process.argv[1], ['jslDatastoreProcess']);
// @ts-ignore
this.subprocess.on('message', ({ msgtype, ...message }) => {
// if (this.disconnected) return;
this[`handle_${msgtype}`](message);
});
this.subprocess.on('exit', () => {
// if (this.disconnected) return;
this.subprocess = null;
});
this.subprocess.send({ msgtype: 'open', file: this.file });
}
return this.subprocess;
}
async getRows(offset, limit) {
await this.ensureSubprocess();
const msgid = uuidv1();
const promise = new Promise((resolve, reject) => {
this.requests[msgid] = [resolve, reject];
this.subprocess.send({ msgtype: 'read', msgid, offset, limit });
});
return promise;
}
async notifyChangedCore() {
const msgid = uuidv1();
const promise = new Promise((resolve, reject) => {
this.requests[msgid] = [resolve, reject];
this.subprocess.send({ msgtype: 'notify', msgid });
});
return promise;
}
async notifyChanged(callback) {
this.notifyChangedCallback = callback;
await this.notifyChangedCore();
const call = this.notifyChangedCallback;
this.notifyChangedCallback = null;
if (call) call();
}
}
module.exports = DatastoreProxy;

View File

@@ -0,0 +1,89 @@
const lineReader = require('line-reader');
const AsyncLock = require('async-lock');
const lock = new AsyncLock();
class JsonLinesDatastore {
constructor(file) {
this.file = file;
this.reader = null;
this.readedDataRowCount = 0;
this.readedSchemaRow = false;
this.notifyChangedCallback = null;
}
_closeReader() {
if (!this.reader) return;
const reader = this.reader;
this.reader = null;
this.readedDataRowCount = 0;
this.readedSchemaRow = false;
reader.close(() => {});
}
async notifyChanged(callback) {
this.notifyChangedCallback = callback;
await lock.acquire('reader', async () => {
this._closeReader();
});
const call = this.notifyChangedCallback;
this.notifyChangedCallback = null;
if (call) call();
}
async _openReader() {
return new Promise((resolve, reject) =>
lineReader.open(this.file, (err, reader) => {
if (err) reject(err);
resolve(reader);
})
);
}
_readLine() {
return new Promise((resolve, reject) => {
const reader = this.reader;
if (!reader.hasNextLine()) {
resolve(null);
return;
}
reader.nextLine((err, line) => {
if (this.readedSchemaRow) this.readedDataRowCount += 1;
else this.readedSchemaRow = true;
if (err) reject(err);
resolve(line);
});
});
}
async _ensureReader(offset) {
if (this.readedDataRowCount > offset) {
this._closeReader();
}
if (!this.reader) {
const reader = await this._openReader();
this.reader = reader;
}
if (!this.readedSchemaRow) {
await this._readLine(); // skip structure
}
while (this.readedDataRowCount < offset) {
await this._readLine();
}
}
async getRows(offset, limit) {
const res = [];
await lock.acquire('reader', async () => {
await this._ensureReader(offset);
for (let i = 0; i < limit; i += 1) {
const line = await this._readLine();
if (line == null) break;
res.push(JSON.parse(line));
}
});
// console.log('RETURN', res.length);
return res;
}
}
module.exports = JsonLinesDatastore;

View File

@@ -176,7 +176,14 @@ export default function TabsPanel() {
const closeTab = closeTabFunc((x, active) => x.tabid == active.tabid); const closeTab = closeTabFunc((x, active) => x.tabid == active.tabid);
const closeAll = () => { const closeAll = () => {
setOpenedTabs([]); const closedTime = new Date().getTime();
setOpenedTabs((tabs) =>
tabs.map((tab) => ({
...tab,
closedTime: tab.closedTime || closedTime,
selected: false,
}))
);
}; };
const closeWithSameDb = closeTabFunc( const closeWithSameDb = closeTabFunc(
(x, active) => (x, active) =>

View File

@@ -2265,6 +2265,11 @@ async-limiter@~1.0.0:
resolved "https://registry.yarnpkg.com/async-limiter/-/async-limiter-1.0.1.tgz#dd379e94f0db8310b08291f9d64c3209766617fd" resolved "https://registry.yarnpkg.com/async-limiter/-/async-limiter-1.0.1.tgz#dd379e94f0db8310b08291f9d64c3209766617fd"
integrity sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ== integrity sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ==
async-lock@^1.2.4:
version "1.2.4"
resolved "https://registry.yarnpkg.com/async-lock/-/async-lock-1.2.4.tgz#80d0d612383045dd0c30eb5aad08510c1397cb91"
integrity sha512-UBQJC2pbeyGutIfYmErGc9RaJYnpZ1FHaxuKwb0ahvGiiCkPUf3p67Io+YLPmmv3RHY+mF6JEtNW8FlHsraAaA==
async@0.2.10: async@0.2.10:
version "0.2.10" version "0.2.10"
resolved "https://registry.yarnpkg.com/async/-/async-0.2.10.tgz#b6bbe0b0674b9d719708ca38de8c237cb526c3d1" resolved "https://registry.yarnpkg.com/async/-/async-0.2.10.tgz#b6bbe0b0674b9d719708ca38de8c237cb526c3d1"