Fix writeEvents race condition.
This required removing the compress middleware from the /events route.
This commit is contained in:
+28
-32
@@ -3,7 +3,6 @@ package events
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
@@ -93,38 +92,35 @@ func (b *broker) prepareMessage(ctx context.Context, event Event) message {
|
||||
return msg
|
||||
}
|
||||
|
||||
var errWriteTimeOut = errors.New("write timeout")
|
||||
|
||||
// writeEvent writes a message to the given io.Writer, formatted as a Server-Sent Event.
|
||||
// If the writer is an http.Flusher, it flushes the data immediately instead of buffering it.
|
||||
// The function waits for the message to be written or times out after the specified timeout.
|
||||
func writeEvent(w io.Writer, event message, timeout time.Duration) error {
|
||||
// Create a context with a timeout based on the event's sender context.
|
||||
ctx, cancel := context.WithTimeout(event.senderCtx, timeout)
|
||||
defer cancel()
|
||||
func writeEvent(ctx context.Context, w io.Writer, event message, timeout time.Duration) error {
|
||||
if err := setWriteTimeout(w, timeout); err != nil {
|
||||
log.Debug(ctx, "Error setting write timeout", err)
|
||||
}
|
||||
|
||||
// Create a channel to signal the completion of writing.
|
||||
errC := make(chan error, 1)
|
||||
|
||||
// Start a goroutine to write the event and optionally flush the writer.
|
||||
go func() {
|
||||
_, err := fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", event.id, event.event, event.data)
|
||||
|
||||
// If the writer is an http.Flusher, flush the data immediately.
|
||||
if flusher, ok := w.(http.Flusher); ok && flusher != nil {
|
||||
flusher.Flush()
|
||||
}
|
||||
|
||||
// Signal that writing is complete.
|
||||
errC <- err
|
||||
}()
|
||||
|
||||
// Wait for either the write completion or the context to time out.
|
||||
select {
|
||||
case err := <-errC:
|
||||
_, err := fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", event.id, event.event, event.data)
|
||||
if err != nil {
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
return errWriteTimeOut
|
||||
}
|
||||
|
||||
// If the writer is an http.Flusher, flush the data immediately.
|
||||
if flusher, ok := w.(http.Flusher); ok && flusher != nil {
|
||||
flusher.Flush()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func setWriteTimeout(rw io.Writer, timeout time.Duration) error {
|
||||
for {
|
||||
switch t := rw.(type) {
|
||||
case interface{ SetWriteDeadline(time.Time) error }:
|
||||
return t.SetWriteDeadline(time.Now().Add(timeout))
|
||||
case interface{ Unwrap() http.ResponseWriter }:
|
||||
rw = t.Unwrap()
|
||||
default:
|
||||
return fmt.Errorf("%T - %w", rw, http.ErrNotSupported)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -160,9 +156,9 @@ func (b *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
log.Trace(ctx, "Sending event to client", "event", *event, "client", c.String())
|
||||
if err := writeEvent(w, *event, writeTimeOut); errors.Is(err, errWriteTimeOut) {
|
||||
log.Debug(ctx, "Timeout sending event to client", "event", *event, "client", c.String())
|
||||
return
|
||||
err := writeEvent(ctx, w, *event, writeTimeOut)
|
||||
if err != nil {
|
||||
log.Debug(ctx, "Error sending event to client", "event", *event, "client", c.String(), err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user