+13
-6
@@ -9,16 +9,18 @@ import (
|
||||
)
|
||||
|
||||
type Event interface {
|
||||
Prepare(Event) string
|
||||
Name(Event) string
|
||||
Data(Event) string
|
||||
}
|
||||
|
||||
type baseEvent struct {
|
||||
Name string `json:"name"`
|
||||
}
|
||||
type baseEvent struct{}
|
||||
|
||||
func (e *baseEvent) Prepare(evt Event) string {
|
||||
func (e *baseEvent) Name(evt Event) string {
|
||||
str := strings.TrimPrefix(reflect.TypeOf(evt).String(), "*events.")
|
||||
e.Name = str[:0] + string(unicode.ToLower(rune(str[0]))) + str[1:]
|
||||
return str[:0] + string(unicode.ToLower(rune(str[0]))) + str[1:]
|
||||
}
|
||||
|
||||
func (e *baseEvent) Data(evt Event) string {
|
||||
data, _ := json.Marshal(evt)
|
||||
return string(data)
|
||||
}
|
||||
@@ -35,6 +37,11 @@ type KeepAlive struct {
|
||||
TS int64 `json:"ts"`
|
||||
}
|
||||
|
||||
type RefreshResource struct {
|
||||
baseEvent
|
||||
Resource string `json:"resource"`
|
||||
}
|
||||
|
||||
type ServerStart struct {
|
||||
baseEvent
|
||||
StartTime time.Time `json:"startTime"`
|
||||
|
||||
@@ -8,8 +8,10 @@ import (
|
||||
var _ = Describe("Event", func() {
|
||||
It("marshals Event to JSON", func() {
|
||||
testEvent := TestEvent{Test: "some data"}
|
||||
json := testEvent.Prepare(&testEvent)
|
||||
Expect(json).To(Equal(`{"name":"testEvent","Test":"some data"}`))
|
||||
data := testEvent.Data(&testEvent)
|
||||
Expect(data).To(Equal(`{"Test":"some data"}`))
|
||||
name := testEvent.Name(&testEvent)
|
||||
Expect(name).To(Equal("testEvent"))
|
||||
})
|
||||
})
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"code.cloudfoundry.org/go-diodes"
|
||||
@@ -26,12 +27,15 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
eventId uint32
|
||||
errWriteTimeOut = errors.New("write timeout")
|
||||
)
|
||||
|
||||
type (
|
||||
message struct {
|
||||
Data string
|
||||
ID uint32
|
||||
Event string
|
||||
Data string
|
||||
}
|
||||
messageChan chan message
|
||||
clientsChan chan client
|
||||
@@ -81,7 +85,9 @@ func (b *broker) SendMessage(evt Event) {
|
||||
|
||||
func (b *broker) prepareMessage(event Event) message {
|
||||
msg := message{}
|
||||
msg.Data = event.Prepare(event)
|
||||
msg.ID = atomic.AddUint32(&eventId, 1)
|
||||
msg.Data = event.Data(event)
|
||||
msg.Event = event.Name(event)
|
||||
return msg
|
||||
}
|
||||
|
||||
@@ -90,7 +96,7 @@ func writeEvent(w io.Writer, event message, timeout time.Duration) (err error) {
|
||||
flusher, _ := w.(http.Flusher)
|
||||
complete := make(chan struct{}, 1)
|
||||
go func() {
|
||||
_, err = fmt.Fprintf(w, "data: %s\n\n", event.Data)
|
||||
_, err = fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", event.ID, event.Event, event.Data)
|
||||
// Flush the data immediately instead of buffering it for later.
|
||||
flusher.Flush()
|
||||
complete <- struct{}{}
|
||||
|
||||
Reference in New Issue
Block a user