Assign event ids in the main loop, to avoid out-of-order events
This commit is contained in:
@@ -95,7 +95,6 @@ func (b *broker) SendMessage(ctx context.Context, evt Event) {
|
|||||||
|
|
||||||
func (b *broker) prepareMessage(ctx context.Context, event Event) message {
|
func (b *broker) prepareMessage(ctx context.Context, event Event) message {
|
||||||
msg := message{}
|
msg := message{}
|
||||||
msg.id = atomic.AddUint32(&eventId, 1)
|
|
||||||
msg.data = event.Data(event)
|
msg.data = event.Data(event)
|
||||||
msg.event = event.Name(event)
|
msg.event = event.Name(event)
|
||||||
msg.senderCtx = ctx
|
msg.senderCtx = ctx
|
||||||
@@ -212,8 +211,9 @@ func (b *broker) listen() {
|
|||||||
log.Debug("Client added to event broker", "numClients", len(clients), "newClient", c.String())
|
log.Debug("Client added to event broker", "numClients", len(clients), "newClient", c.String())
|
||||||
|
|
||||||
// Send a serverStart event to new client
|
// Send a serverStart event to new client
|
||||||
c.diode.put(b.prepareMessage(context.Background(),
|
msg := b.prepareMessage(context.Background(),
|
||||||
&ServerStart{StartTime: consts.ServerStart, Version: consts.Version()}))
|
&ServerStart{StartTime: consts.ServerStart, Version: consts.Version()})
|
||||||
|
c.diode.put(msg)
|
||||||
|
|
||||||
case c := <-b.unsubscribing:
|
case c := <-b.unsubscribing:
|
||||||
// A client has detached and we want to
|
// A client has detached and we want to
|
||||||
@@ -221,14 +221,15 @@ func (b *broker) listen() {
|
|||||||
delete(clients, c)
|
delete(clients, c)
|
||||||
log.Debug("Removed client from event broker", "numClients", len(clients), "client", c.String())
|
log.Debug("Removed client from event broker", "numClients", len(clients), "client", c.String())
|
||||||
|
|
||||||
case event := <-b.publish:
|
case msg := <-b.publish:
|
||||||
log.Trace("Got new published event", "event", event)
|
msg.id = atomic.AddUint32(&eventId, 1)
|
||||||
|
log.Trace("Got new published event", "event", msg)
|
||||||
// We got a new event from the outside!
|
// We got a new event from the outside!
|
||||||
// Send event to all connected clients
|
// Send event to all connected clients
|
||||||
for c := range clients {
|
for c := range clients {
|
||||||
if b.shouldSend(event, c) {
|
if b.shouldSend(msg, c) {
|
||||||
log.Trace("Putting event on client's queue", "client", c.String(), "event", event)
|
log.Trace("Putting event on client's queue", "client", c.String(), "event", msg)
|
||||||
c.diode.put(event)
|
c.diode.put(msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user