Disconnect the client if the output buffer fills up
This commit is contained in:
+11
-6
@@ -36,6 +36,7 @@ type (
|
||||
username string
|
||||
userAgent string
|
||||
channel messageChan
|
||||
done chan struct{}
|
||||
}
|
||||
)
|
||||
|
||||
@@ -124,9 +125,12 @@ func (b *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// Flush the data immediately instead of buffering it for later.
|
||||
flusher.Flush()
|
||||
case <-ctx.Done():
|
||||
case <-c.done:
|
||||
log.Trace(ctx, "Closing event stream connection", "client", c.String())
|
||||
return
|
||||
case <-ctx.Done():
|
||||
log.Trace(ctx, "Client closed the connection", "client", c.String())
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -176,13 +180,14 @@ func (b *broker) listen() {
|
||||
case event := <-b.publish:
|
||||
// We got a new event from the outside!
|
||||
// Send event to all connected clients
|
||||
for client := range b.clients {
|
||||
log.Trace("Putting event on client's queue", "client", client.String(), "event", event)
|
||||
// Use non-blocking send
|
||||
for c := range b.clients {
|
||||
log.Trace("Putting event on client's queue", "client", c.String(), "event", event)
|
||||
// Use non-blocking send. If cannot send, terminate the client's connection
|
||||
select {
|
||||
case client.channel <- event:
|
||||
case c.channel <- event:
|
||||
default:
|
||||
log.Warn("Could not send message to client", "client", client.String(), "event", event)
|
||||
log.Warn("Could not send message to client", "client", c.String(), "event", event)
|
||||
c.done <- struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user