diff --git a/packages/api/src/controllers/serverConnections.js b/packages/api/src/controllers/serverConnections.js index fb168e022..e2d167852 100644 --- a/packages/api/src/controllers/serverConnections.js +++ b/packages/api/src/controllers/serverConnections.js @@ -152,7 +152,7 @@ module.exports = { }, ping_meta: true, - async ping({ conidArray }) { + async ping({ conidArray, strmid }) { await Promise.all( _.uniq(conidArray).map(async conid => { const last = this.lastPinged[conid]; @@ -169,6 +169,7 @@ module.exports = { } }) ); + socket.setStreamIdFilter(strmid, { conid: conidArray }); return { status: 'ok' }; }, diff --git a/packages/api/src/main.js b/packages/api/src/main.js index 6b97431ce..1164ada6d 100644 --- a/packages/api/src/main.js +++ b/packages/api/src/main.js @@ -77,6 +77,7 @@ function start() { } app.get(getExpressPath('/stream'), async function (req, res) { + const strmid = req.query.strmid; res.set({ 'Cache-Control': 'no-cache', 'Content-Type': 'text/event-stream', @@ -87,9 +88,9 @@ function start() { // Tell the client to retry every 10 seconds if connectivity is lost res.write('retry: 10000\n\n'); - socket.addSseResponse(res); + socket.addSseResponse(res, strmid); onFinished(req, () => { - socket.removeSseResponse(res); + socket.removeSseResponse(strmid); }); }); diff --git a/packages/api/src/utility/socket.js b/packages/api/src/utility/socket.js index b0f9fc3cb..ae0c9f070 100644 --- a/packages/api/src/utility/socket.js +++ b/packages/api/src/utility/socket.js @@ -1,7 +1,7 @@ const _ = require('lodash'); const stableStringify = require('json-stable-stringify'); -const sseResponses = []; +const sseResponses = {}; let electronSender = null; let pingConfigured = false; @@ -12,12 +12,15 @@ module.exports = { pingConfigured = true; } }, - addSseResponse(value) { - sseResponses.push(value); + addSseResponse(value, strmid) { + sseResponses[strmid] = { + response: value, + filter: {}, + }; this.ensurePing(); }, - removeSseResponse(value) { - _.remove(sseResponses, x => x == value); + removeSseResponse(strmid) { + delete sseResponses[strmid]; }, setElectronSender(value) { electronSender = value; @@ -27,8 +30,23 @@ module.exports = { if (electronSender) { electronSender.send(message, data == null ? null : data); } - for (const res of sseResponses) { - res.write(`event: ${message}\ndata: ${stableStringify(data == null ? null : data)}\n\n`); + for (const strmid in sseResponses) { + let skipThisStream = false; + for (const key in sseResponses[strmid].filter) { + if (data && data[key]) { + if (!sseResponses[strmid].filter[key].includes(data[key])) { + skipThisStream = true; + break; + } + } + } + if (skipThisStream) { + continue; + } + + sseResponses[strmid].response.write( + `event: ${message}\ndata: ${stableStringify(data == null ? null : data)}\n\n` + ); } }, emitChanged(key, params = undefined) { @@ -36,4 +54,7 @@ module.exports = { this.emit('changed-cache', { key, ...params }); // this.emit(key); }, + setStreamIdFilter(strmid, filter) { + sseResponses[strmid].filter = filter; + }, }; diff --git a/packages/web/src/utility/api.ts b/packages/web/src/utility/api.ts index eb47fd55a..9e46e0296 100644 --- a/packages/web/src/utility/api.ts +++ b/packages/web/src/utility/api.ts @@ -8,6 +8,9 @@ import { isOauthCallback, redirectToLogin } from '../clientAuth'; import { showModal } from '../modals/modalTools'; import DatabaseLoginModal, { isDatabaseLoginVisible } from '../modals/DatabaseLoginModal.svelte'; import _ from 'lodash'; +import uuidv1 from 'uuid/v1'; + +export const strmid = uuidv1(); let eventSource; let apiLogging = false; @@ -49,7 +52,7 @@ export function removeVolatileMapping(conid) { function wantEventSource() { if (!eventSource) { - eventSource = new EventSource(`${resolveApi()}/stream`); + eventSource = new EventSource(`${resolveApi()}/stream?strmid=${strmid}`); // eventSource.addEventListener('clean-cache', e => cacheClean(JSON.parse(e.data))); } } diff --git a/packages/web/src/utility/connectionsPinger.js b/packages/web/src/utility/connectionsPinger.js index f22ee3d25..78796efd4 100644 --- a/packages/web/src/utility/connectionsPinger.js +++ b/packages/web/src/utility/connectionsPinger.js @@ -1,6 +1,6 @@ import _ from 'lodash'; import { openedConnections, currentDatabase } from '../stores'; -import { apiCall } from './api'; +import { apiCall, strmid } from './api'; import { getConnectionList } from './metadataLoaders'; // const doServerPing = async value => { @@ -10,7 +10,7 @@ import { getConnectionList } from './metadataLoaders'; // }; const doServerPing = value => { - apiCall('server-connections/ping', { conidArray: value }); + apiCall('server-connections/ping', { conidArray: value, strmid }); }; const doDatabasePing = value => {