mirror of
https://github.com/DeNNiiInc/dbgate.git
synced 2026-04-18 02:06:01 +00:00
stream filtering by connection id
This commit is contained in:
@@ -152,7 +152,7 @@ module.exports = {
|
||||
},
|
||||
|
||||
ping_meta: true,
|
||||
async ping({ conidArray }) {
|
||||
async ping({ conidArray, strmid }) {
|
||||
await Promise.all(
|
||||
_.uniq(conidArray).map(async conid => {
|
||||
const last = this.lastPinged[conid];
|
||||
@@ -169,6 +169,7 @@ module.exports = {
|
||||
}
|
||||
})
|
||||
);
|
||||
socket.setStreamIdFilter(strmid, { conid: conidArray });
|
||||
return { status: 'ok' };
|
||||
},
|
||||
|
||||
|
||||
@@ -77,6 +77,7 @@ function start() {
|
||||
}
|
||||
|
||||
app.get(getExpressPath('/stream'), async function (req, res) {
|
||||
const strmid = req.query.strmid;
|
||||
res.set({
|
||||
'Cache-Control': 'no-cache',
|
||||
'Content-Type': 'text/event-stream',
|
||||
@@ -87,9 +88,9 @@ function start() {
|
||||
|
||||
// Tell the client to retry every 10 seconds if connectivity is lost
|
||||
res.write('retry: 10000\n\n');
|
||||
socket.addSseResponse(res);
|
||||
socket.addSseResponse(res, strmid);
|
||||
onFinished(req, () => {
|
||||
socket.removeSseResponse(res);
|
||||
socket.removeSseResponse(strmid);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
const _ = require('lodash');
|
||||
const stableStringify = require('json-stable-stringify');
|
||||
|
||||
const sseResponses = [];
|
||||
const sseResponses = {};
|
||||
let electronSender = null;
|
||||
let pingConfigured = false;
|
||||
|
||||
@@ -12,12 +12,15 @@ module.exports = {
|
||||
pingConfigured = true;
|
||||
}
|
||||
},
|
||||
addSseResponse(value) {
|
||||
sseResponses.push(value);
|
||||
addSseResponse(value, strmid) {
|
||||
sseResponses[strmid] = {
|
||||
response: value,
|
||||
filter: {},
|
||||
};
|
||||
this.ensurePing();
|
||||
},
|
||||
removeSseResponse(value) {
|
||||
_.remove(sseResponses, x => x == value);
|
||||
removeSseResponse(strmid) {
|
||||
delete sseResponses[strmid];
|
||||
},
|
||||
setElectronSender(value) {
|
||||
electronSender = value;
|
||||
@@ -27,8 +30,23 @@ module.exports = {
|
||||
if (electronSender) {
|
||||
electronSender.send(message, data == null ? null : data);
|
||||
}
|
||||
for (const res of sseResponses) {
|
||||
res.write(`event: ${message}\ndata: ${stableStringify(data == null ? null : data)}\n\n`);
|
||||
for (const strmid in sseResponses) {
|
||||
let skipThisStream = false;
|
||||
for (const key in sseResponses[strmid].filter) {
|
||||
if (data && data[key]) {
|
||||
if (!sseResponses[strmid].filter[key].includes(data[key])) {
|
||||
skipThisStream = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (skipThisStream) {
|
||||
continue;
|
||||
}
|
||||
|
||||
sseResponses[strmid].response.write(
|
||||
`event: ${message}\ndata: ${stableStringify(data == null ? null : data)}\n\n`
|
||||
);
|
||||
}
|
||||
},
|
||||
emitChanged(key, params = undefined) {
|
||||
@@ -36,4 +54,7 @@ module.exports = {
|
||||
this.emit('changed-cache', { key, ...params });
|
||||
// this.emit(key);
|
||||
},
|
||||
setStreamIdFilter(strmid, filter) {
|
||||
sseResponses[strmid].filter = filter;
|
||||
},
|
||||
};
|
||||
|
||||
@@ -8,6 +8,9 @@ import { isOauthCallback, redirectToLogin } from '../clientAuth';
|
||||
import { showModal } from '../modals/modalTools';
|
||||
import DatabaseLoginModal, { isDatabaseLoginVisible } from '../modals/DatabaseLoginModal.svelte';
|
||||
import _ from 'lodash';
|
||||
import uuidv1 from 'uuid/v1';
|
||||
|
||||
export const strmid = uuidv1();
|
||||
|
||||
let eventSource;
|
||||
let apiLogging = false;
|
||||
@@ -49,7 +52,7 @@ export function removeVolatileMapping(conid) {
|
||||
|
||||
function wantEventSource() {
|
||||
if (!eventSource) {
|
||||
eventSource = new EventSource(`${resolveApi()}/stream`);
|
||||
eventSource = new EventSource(`${resolveApi()}/stream?strmid=${strmid}`);
|
||||
// eventSource.addEventListener('clean-cache', e => cacheClean(JSON.parse(e.data)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import _ from 'lodash';
|
||||
import { openedConnections, currentDatabase } from '../stores';
|
||||
import { apiCall } from './api';
|
||||
import { apiCall, strmid } from './api';
|
||||
import { getConnectionList } from './metadataLoaders';
|
||||
|
||||
// const doServerPing = async value => {
|
||||
@@ -10,7 +10,7 @@ import { getConnectionList } from './metadataLoaders';
|
||||
// };
|
||||
|
||||
const doServerPing = value => {
|
||||
apiCall('server-connections/ping', { conidArray: value });
|
||||
apiCall('server-connections/ping', { conidArray: value, strmid });
|
||||
};
|
||||
|
||||
const doDatabasePing = value => {
|
||||
|
||||
Reference in New Issue
Block a user