Don't ever stop the listen go routine
This commit is contained in:
+13
-11
@@ -53,9 +53,6 @@ type broker struct {
|
|||||||
|
|
||||||
// Closed client connections
|
// Closed client connections
|
||||||
unsubscribing clientsChan
|
unsubscribing clientsChan
|
||||||
|
|
||||||
// Client connections registry
|
|
||||||
clients map[client]struct{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBroker() Broker {
|
func NewBroker() Broker {
|
||||||
@@ -64,7 +61,6 @@ func NewBroker() Broker {
|
|||||||
publish: make(messageChan, 100),
|
publish: make(messageChan, 100),
|
||||||
subscribing: make(clientsChan, 1),
|
subscribing: make(clientsChan, 1),
|
||||||
unsubscribing: make(clientsChan, 1),
|
unsubscribing: make(clientsChan, 1),
|
||||||
clients: make(map[client]struct{}),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set it running - listening and broadcasting events
|
// Set it running - listening and broadcasting events
|
||||||
@@ -160,13 +156,15 @@ func (b *broker) listen() {
|
|||||||
keepAlive := time.NewTicker(keepAliveFrequency)
|
keepAlive := time.NewTicker(keepAliveFrequency)
|
||||||
defer keepAlive.Stop()
|
defer keepAlive.Stop()
|
||||||
|
|
||||||
|
clients := map[client]struct{}{}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case c := <-b.subscribing:
|
case c := <-b.subscribing:
|
||||||
// A new client has connected.
|
// A new client has connected.
|
||||||
// Register their message channel
|
// Register their message channel
|
||||||
b.clients[c] = struct{}{}
|
clients[c] = struct{}{}
|
||||||
log.Debug("Client added to event broker", "numClients", len(b.clients), "newClient", c.String())
|
log.Debug("Client added to event broker", "numClients", len(clients), "newClient", c.String())
|
||||||
|
|
||||||
// Send a serverStart event to new client
|
// Send a serverStart event to new client
|
||||||
c.channel <- b.preparePackage(&ServerStart{serverStart})
|
c.channel <- b.preparePackage(&ServerStart{serverStart})
|
||||||
@@ -175,20 +173,24 @@ func (b *broker) listen() {
|
|||||||
// A client has detached and we want to
|
// A client has detached and we want to
|
||||||
// stop sending them messages.
|
// stop sending them messages.
|
||||||
close(c.channel)
|
close(c.channel)
|
||||||
delete(b.clients, c)
|
delete(clients, c)
|
||||||
log.Debug("Removed client from event broker", "numClients", len(b.clients), "client", c.String())
|
log.Debug("Removed client from event broker", "numClients", len(clients), "client", c.String())
|
||||||
|
|
||||||
case event := <-b.publish:
|
case event := <-b.publish:
|
||||||
// We got a new event from the outside!
|
// We got a new event from the outside!
|
||||||
// Send event to all connected clients
|
// Send event to all connected clients
|
||||||
for c := range b.clients {
|
for c := range clients {
|
||||||
log.Trace("Putting event on client's queue", "client", c.String(), "event", event)
|
log.Trace("Putting event on client's queue", "client", c.String(), "event", event)
|
||||||
// Use non-blocking send. If cannot send, terminate the client's connection
|
// Use non-blocking send. If cannot send, terminate the client's connection
|
||||||
select {
|
select {
|
||||||
case c.channel <- event:
|
case c.channel <- event:
|
||||||
default:
|
default:
|
||||||
log.Warn("Could not send message to client", "client", c.String(), "event", event)
|
log.Warn("Could not send event to client", "client", c.String(), "event", event)
|
||||||
c.done <- struct{}{}
|
select {
|
||||||
|
case c.done <- struct{}{}:
|
||||||
|
default:
|
||||||
|
log.Warn("Could not ask client to end", "client", c.String())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user