Only send events to clients who need it

- User events (star, rating, plays) only sent to same user
- Don't send to the client (browser window) that originated the event
This commit is contained in:
Deluan
2021-06-15 18:35:08 -04:00
parent 5f6f74ff2d
commit b65e76293a
13 changed files with 197 additions and 63 deletions
+51 -25
View File
@@ -2,6 +2,7 @@
package events
import (
"context"
"errors"
"fmt"
"io"
@@ -18,7 +19,7 @@ import (
type Broker interface {
http.Handler
SendMessage(event Event)
SendMessage(ctx context.Context, event Event)
}
const (
@@ -33,23 +34,28 @@ var (
type (
message struct {
ID uint32
Event string
Data string
id uint32
event string
data string
senderCtx context.Context
}
messageChan chan message
clientsChan chan client
client struct {
id string
address string
username string
userAgent string
diode *diode
id string
address string
username string
userAgent string
clientUniqueId string
diode *diode
}
)
func (c client) String() string {
return fmt.Sprintf("%s (%s - %s - %s)", c.id, c.username, c.address, c.userAgent)
if log.CurrentLevel() >= log.LevelTrace {
return fmt.Sprintf("%s (%s - %s - %s - %s)", c.id, c.username, c.address, c.clientUniqueId, c.userAgent)
}
return fmt.Sprintf("%s (%s - %s - %s)", c.id, c.username, c.address, c.clientUniqueId)
}
type broker struct {
@@ -77,17 +83,18 @@ func NewBroker() Broker {
return broker
}
func (b *broker) SendMessage(evt Event) {
func (b *broker) SendMessage(ctx context.Context, evt Event) {
msg := b.prepareMessage(evt)
msg.senderCtx = ctx
log.Trace("Broker received new event", "event", msg)
b.publish <- msg
}
func (b *broker) prepareMessage(event Event) message {
msg := message{}
msg.ID = atomic.AddUint32(&eventId, 1)
msg.Data = event.Data(event)
msg.Event = event.Name(event)
msg.id = atomic.AddUint32(&eventId, 1)
msg.data = event.Data(event)
msg.event = event.Name(event)
return msg
}
@@ -96,7 +103,7 @@ func writeEvent(w io.Writer, event message, timeout time.Duration) (err error) {
flusher, _ := w.(http.Flusher)
complete := make(chan struct{}, 1)
go func() {
_, err = fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", event.ID, event.Event, event.Data)
_, err = fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", event.id, event.event, event.data)
// Flush the data immediately instead of buffering it for later.
flusher.Flush()
complete <- struct{}{}
@@ -149,14 +156,17 @@ func (b *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
func (b *broker) subscribe(r *http.Request) client {
user, _ := request.UserFrom(r.Context())
ctx := r.Context()
user, _ := request.UserFrom(ctx)
clientUniqueId, _ := request.ClientUniqueIdFrom(ctx)
c := client{
id: uuid.NewString(),
username: user.UserName,
address: r.RemoteAddr,
userAgent: r.UserAgent(),
id: uuid.NewString(),
username: user.UserName,
address: r.RemoteAddr,
userAgent: r.UserAgent(),
clientUniqueId: clientUniqueId,
}
c.diode = newDiode(r.Context(), 1024, diodes.AlertFunc(func(missed int) {
c.diode = newDiode(ctx, 1024, diodes.AlertFunc(func(missed int) {
log.Trace("Dropped SSE events", "client", c.String(), "missed", missed)
}))
@@ -169,6 +179,20 @@ func (b *broker) unsubscribe(c client) {
b.unsubscribing <- c
}
func (b *broker) shouldSend(msg message, c client) bool {
clientUniqueId, originatedFromClient := request.ClientUniqueIdFrom(msg.senderCtx)
if !originatedFromClient {
return true
}
if c.clientUniqueId == clientUniqueId {
return false
}
if username, ok := request.UsernameFrom(msg.senderCtx); ok {
return username == c.username
}
return true
}
func (b *broker) listen() {
keepAlive := time.NewTicker(keepAliveFrequency)
defer keepAlive.Stop()
@@ -184,7 +208,7 @@ func (b *broker) listen() {
log.Debug("Client added to event broker", "numClients", len(clients), "newClient", c.String())
// Send a serverStart event to new client
c.diode.set(b.prepareMessage(&ServerStart{StartTime: consts.ServerStart}))
c.diode.put(b.prepareMessage(&ServerStart{StartTime: consts.ServerStart}))
case c := <-b.unsubscribing:
// A client has detached and we want to
@@ -196,13 +220,15 @@ func (b *broker) listen() {
// We got a new event from the outside!
// Send event to all connected clients
for c := range clients {
log.Trace("Putting event on client's queue", "client", c.String(), "event", event)
c.diode.set(event)
if b.shouldSend(event, c) {
log.Trace("Putting event on client's queue", "client", c.String(), "event", event)
c.diode.put(event)
}
}
case ts := <-keepAlive.C:
// Send a keep alive message every 15 seconds
b.SendMessage(&KeepAlive{TS: ts.Unix()})
b.SendMessage(context.Background(), &KeepAlive{TS: ts.Unix()})
}
}
}