diff --git a/server/events/sse.go b/server/events/sse.go index e548a3ec..97462590 100644 --- a/server/events/sse.go +++ b/server/events/sse.go @@ -95,16 +95,22 @@ func (b *broker) prepareMessage(ctx context.Context, event Event) message { var errWriteTimeOut = errors.New("write timeout") -// writeEvent Write to the ResponseWriter, Server Sent Events compatible +// writeEvent Write to the ResponseWriter, Server Sent Events compatible, and sends it +// right away, by flushing the writer (if it is a Flusher). It waits for the message to be flushed +// or times out after the specified timeout func writeEvent(w io.Writer, event message, timeout time.Duration) (err error) { - flusher, _ := w.(http.Flusher) complete := make(chan struct{}, 1) - go func() { - _, err = fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", event.id, event.event, event.data) - // Flush the data immediately instead of buffering it for later. - flusher.Flush() + flusher, ok := w.(http.Flusher) + if ok { + go func() { + _, err = fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", event.id, event.event, event.data) + // Flush the data immediately instead of buffering it for later. + flusher.Flush() + complete <- struct{}{} + }() + } else { complete <- struct{}{} - }() + } select { case <-complete: return