Simplify EventStream handling
This commit is contained in:
+20
-61
@@ -1,83 +1,42 @@
|
||||
import { baseUrl } from './utils'
|
||||
import throttle from 'lodash.throttle'
|
||||
import { processEvent, serverDown } from './actions'
|
||||
import { httpClient } from './dataProvider'
|
||||
import { REST_URL } from './consts'
|
||||
|
||||
const defaultIntervalCheck = 20000
|
||||
const reconnectIntervalCheck = 2000
|
||||
let currentIntervalCheck = reconnectIntervalCheck
|
||||
let es = null
|
||||
let dispatch = null
|
||||
let timeout = null
|
||||
|
||||
const getEventStream = async () => {
|
||||
if (!es) {
|
||||
// Call `keepalive` to refresh the jwt token
|
||||
await httpClient(`${REST_URL}/keepalive/keepalive`)
|
||||
let url = baseUrl(`${REST_URL}/events`)
|
||||
if (localStorage.getItem('token')) {
|
||||
url = url + `?jwt=${localStorage.getItem('token')}`
|
||||
}
|
||||
es = new EventSource(url)
|
||||
const newEventStream = async () => {
|
||||
let url = baseUrl(`${REST_URL}/events`)
|
||||
if (localStorage.getItem('token')) {
|
||||
url = url + `?jwt=${localStorage.getItem('token')}`
|
||||
}
|
||||
return es
|
||||
return new EventSource(url)
|
||||
}
|
||||
|
||||
// Reestablish the event stream after 20 secs of inactivity
|
||||
const setTimeout = (value) => {
|
||||
currentIntervalCheck = value
|
||||
if (timeout) {
|
||||
window.clearTimeout(timeout)
|
||||
}
|
||||
timeout = window.setTimeout(async () => {
|
||||
es?.close()
|
||||
es = null
|
||||
await startEventStream()
|
||||
}, currentIntervalCheck)
|
||||
}
|
||||
|
||||
const stopEventStream = () => {
|
||||
es?.close()
|
||||
es = null
|
||||
if (timeout) {
|
||||
window.clearTimeout(timeout)
|
||||
}
|
||||
timeout = null
|
||||
console.log('eventSource closed') // TODO For debug purposes. Remove later
|
||||
}
|
||||
|
||||
const setDispatch = (dispatchFunc) => {
|
||||
dispatch = dispatchFunc
|
||||
}
|
||||
|
||||
const eventHandler = (event) => {
|
||||
const eventHandler = (dispatchFn) => (event) => {
|
||||
const data = JSON.parse(event.data)
|
||||
if (event.type !== 'keepAlive') {
|
||||
dispatch(processEvent(event.type, data))
|
||||
dispatchFn(processEvent(event.type, data))
|
||||
}
|
||||
setTimeout(defaultIntervalCheck) // Reset timeout on every received message
|
||||
}
|
||||
|
||||
const throttledEventHandler = throttle(eventHandler, 100, { trailing: true })
|
||||
const throttledEventHandler = (dispatchFn) =>
|
||||
throttle(eventHandler(dispatchFn), 100, { trailing: true })
|
||||
|
||||
const startEventStream = async () => {
|
||||
setTimeout(currentIntervalCheck)
|
||||
const startEventStream = async (dispatchFn) => {
|
||||
if (!localStorage.getItem('is-authenticated')) {
|
||||
return Promise.resolve()
|
||||
}
|
||||
return getEventStream()
|
||||
return newEventStream()
|
||||
.then((newStream) => {
|
||||
newStream.addEventListener('serverStart', eventHandler)
|
||||
newStream.addEventListener('scanStatus', throttledEventHandler)
|
||||
newStream.addEventListener('refreshResource', eventHandler)
|
||||
newStream.addEventListener('keepAlive', eventHandler)
|
||||
newStream.addEventListener('serverStart', eventHandler(dispatchFn))
|
||||
newStream.addEventListener(
|
||||
'scanStatus',
|
||||
throttledEventHandler(dispatchFn)
|
||||
)
|
||||
newStream.addEventListener('refreshResource', eventHandler(dispatchFn))
|
||||
newStream.addEventListener('keepAlive', eventHandler(dispatchFn))
|
||||
newStream.onerror = (e) => {
|
||||
console.log('EventStream error', e)
|
||||
es?.close()
|
||||
es = null
|
||||
setTimeout(reconnectIntervalCheck)
|
||||
dispatch(serverDown())
|
||||
dispatchFn(serverDown())
|
||||
}
|
||||
return newStream
|
||||
})
|
||||
@@ -86,4 +45,4 @@ const startEventStream = async () => {
|
||||
})
|
||||
}
|
||||
|
||||
export { setDispatch, startEventStream, stopEventStream }
|
||||
export { startEventStream }
|
||||
|
||||
Reference in New Issue
Block a user