diff --git a/packages/api/package.json b/packages/api/package.json index 899c04b23..e29a4e2e0 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -35,6 +35,7 @@ "express": "^4.17.1", "express-basic-auth": "^1.2.0", "express-fileupload": "^1.2.0", + "express-sse": "^0.5.3", "fs-extra": "^9.1.0", "fs-reverse": "^0.0.3", "get-port": "^5.1.1", diff --git a/packages/api/src/main.js b/packages/api/src/main.js index 29159fca9..07f7504d0 100644 --- a/packages/api/src/main.js +++ b/packages/api/src/main.js @@ -4,7 +4,7 @@ const bodyParser = require('body-parser'); const fileUpload = require('express-fileupload'); const http = require('http'); const cors = require('cors'); -const io = require('socket.io'); +// const io = require('socket.io'); const fs = require('fs'); const getPort = require('get-port'); const childProcessChecker = require('./utility/childProcessChecker'); @@ -43,7 +43,11 @@ function start() { const app = express(); const server = http.createServer(app); - socket.set(io(server)); + + // const sse = new SSE(); + // app.get('/stream', sse.init); + // socket.set(sse); + // socket.set(io(server)); if (process.env.LOGIN && process.env.PASSWORD) { app.use( @@ -81,6 +85,30 @@ function start() { }); app.use(cors()); + + app.get('/stream', async function (req, res) { + res.set({ + 'Cache-Control': 'no-cache', + 'Content-Type': 'text/event-stream', + Connection: 'keep-alive', + }); + res.flushHeaders(); + + // Tell the client to retry every 10 seconds if connectivity is lost + res.write('retry: 10000\n\n'); + socket.set(res); + + // let count = 0; + + // while (true) { + // await new Promise((resolve) => setTimeout(resolve, 1000)); + + // console.log("Emit", ++count); + // // Emit an SSE that contains the current 'count' as a string + // res.write(`event: ping\ndata: ${JSON.stringify({ count })}\n\n`); + // } + }); + app.use(bodyParser.json({ limit: '50mb' })); app.use( diff --git a/packages/api/src/utility/socket.js b/packages/api/src/utility/socket.js index aefed81fb..b7659d7eb 100644 --- a/packages/api/src/utility/socket.js +++ b/packages/api/src/utility/socket.js @@ -1,19 +1,35 @@ -let socket = null; +let res = null; +let init = ''; module.exports = { set(value) { - socket = value; - }, - get() { - return socket; + res = value; }, + // get() { + // return socket; + // }, emit(message, data) { + if (res) { + if (init) { + res.write(init); + init = ''; + } + res.write(`event: ${message}\ndata: ${JSON.stringify(data == null ? null : data)}\n\n`); + } else { + init += res; + } + // console.log('EMIT:', message, data); - socket.emit(message, data); + // socket.emit(message, data); }, emitChanged(key) { + this.emit('clean-cache', key); + this.emit(key); // console.log('EMIT_CHANGED:', key); - socket.emit('clean-cache', key); - socket.emit(key); + // socket.emit('clean-cache', key); + // socket.emit(key); + + // socket.send(key, 'clean-cache'); + // socket.send(null, key); }, }; diff --git a/packages/web/src/utility/api.ts b/packages/web/src/utility/api.ts index a64c4bec8..b9ddc9d9b 100644 --- a/packages/web/src/utility/api.ts +++ b/packages/web/src/utility/api.ts @@ -1,6 +1,16 @@ import resolveApi, { resolveApiHeaders } from './resolveApi'; import { writable } from 'svelte/store'; -import socket from './socket'; +import { cacheClean } from './cache'; +// import socket from './socket'; + +let eventSource; + +function wantEventSource() { + if (!eventSource) { + eventSource = new EventSource(`${resolveApi()}/stream`); + eventSource.addEventListener('clean-cache', e => cacheClean(JSON.parse(e.data))); + } +} export async function apiCall(route: string, args: {} = undefined) { const resp = await fetch(`${resolveApi()}/${route}`, { @@ -15,16 +25,28 @@ export async function apiCall(route: string, args: {} = undefined) { return resp.json(); } +const apiHandlers = new WeakMap(); + export function apiOn(event: string, handler: Function) { - socket().on(event, handler); + wantEventSource(); + if (!apiHandlers.has(handler)) { + const handlerProxy = e => { + // console.log('RECEIVED', e.type, JSON.parse(e.data)); + handler(JSON.parse(e.data)); + }; + apiHandlers.set(handler, handlerProxy); + } + + eventSource.addEventListener(event, apiHandlers.get(handler)); } export function apiOff(event: string, handler: Function) { - socket().off(event, handler); + wantEventSource(); + if (apiHandlers.has(handler)) { + eventSource.removeEventListener(event, apiHandlers.get(handler)); + } } -import _ from 'lodash'; - export function useApiCall(route, args, defaultValue) { const result = writable(defaultValue); diff --git a/packages/web/src/utility/metadataLoaders.ts b/packages/web/src/utility/metadataLoaders.ts index 332d85a33..3fcf6fa6f 100644 --- a/packages/web/src/utility/metadataLoaders.ts +++ b/packages/web/src/utility/metadataLoaders.ts @@ -2,7 +2,6 @@ import _ from 'lodash'; import { cacheGet, cacheSet, getCachedPromise } from './cache'; import stableStringify from 'json-stable-stringify'; import { cacheClean } from './cache'; -import socket from './socket'; import getAsArray from './getAsArray'; import { DatabaseInfo } from 'dbgate-types'; import { derived } from 'svelte/store'; @@ -194,11 +193,11 @@ function useCore(loader, args) { } } - if (reloadTrigger && !socket) { - console.error('Socket not available, reloadTrigger not planned'); - } + // if (reloadTrigger && !socket) { + // console.error('Socket not available, reloadTrigger not planned'); + // } handleReload(); - if (reloadTrigger && socket) { + if (reloadTrigger) { for (const item of getAsArray(reloadTrigger)) { apiOn(item, handleReload); } diff --git a/packages/web/src/utility/socket.js b/packages/web/src/utility/socket.js deleted file mode 100644 index ea3be8654..000000000 --- a/packages/web/src/utility/socket.js +++ /dev/null @@ -1,23 +0,0 @@ -import io from 'socket.io-client'; -import resolveApi from './resolveApi'; -import { cacheClean } from './cache'; -import { shouldWaitForElectronInitialize } from './getElectron'; - -let socketInstance; - -function recreateSocket() { - if (shouldWaitForElectronInitialize()) return; - - socketInstance = io(resolveApi()); - socketInstance.on('clean-cache', reloadTrigger => cacheClean(reloadTrigger)); -} - -window['dbgate_recreateSocket'] = recreateSocket; - -recreateSocket(); - -function socket() { - return socketInstance; -} - -export default socket; diff --git a/yarn.lock b/yarn.lock index 482681cdc..4d094988b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4032,6 +4032,11 @@ express-fileupload@^1.2.0: dependencies: busboy "^0.3.1" +express-sse@^0.5.3: + version "0.5.3" + resolved "https://registry.yarnpkg.com/express-sse/-/express-sse-0.5.3.tgz#6e6cb1a85ef7b6ec1eb658e37e923907c482bd31" + integrity sha512-DJF0nofFGq0IXJLGq95hfrryP3ZprVAVpyZUnmAk6QhHnm7zCzsHBNFP0i4FKFo2XjOf+JiYUKjT7jQhIeljpg== + express@^4.17.1: version "4.17.1" resolved "https://registry.yarnpkg.com/express/-/express-4.17.1.tgz#4491fc38605cf51f8629d39c2b5d026f98a4c134"