Add tests to diode
This commit is contained in:
@@ -20,6 +20,14 @@ func (d *diode) set(data message) {
|
|||||||
d.d.Set(diodes.GenericDataType(&data))
|
d.d.Set(diodes.GenericDataType(&data))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *diode) tryNext() (*message, bool) {
|
||||||
|
data, ok := d.d.TryNext()
|
||||||
|
if !ok {
|
||||||
|
return nil, ok
|
||||||
|
}
|
||||||
|
return (*message)(data), true
|
||||||
|
}
|
||||||
|
|
||||||
func (d *diode) next() *message {
|
func (d *diode) next() *message {
|
||||||
data := d.d.Next()
|
data := d.d.Next()
|
||||||
return (*message)(data)
|
return (*message)(data)
|
||||||
|
|||||||
@@ -0,0 +1,51 @@
|
|||||||
|
package events
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"code.cloudfoundry.org/go-diodes"
|
||||||
|
. "github.com/onsi/ginkgo"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ = Describe("diode", func() {
|
||||||
|
var diode *diode
|
||||||
|
var ctx context.Context
|
||||||
|
var ctxCancel context.CancelFunc
|
||||||
|
var missed int
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
missed = 0
|
||||||
|
ctx, ctxCancel = context.WithCancel(context.Background())
|
||||||
|
diode = newDiode(ctx, 2, diodes.AlertFunc(func(m int) { missed = m }))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("enqueues the data correctly", func() {
|
||||||
|
diode.set(message{Data: "1"})
|
||||||
|
diode.set(message{Data: "2"})
|
||||||
|
Expect(diode.next()).To(Equal(&message{Data: "1"}))
|
||||||
|
Expect(diode.next()).To(Equal(&message{Data: "2"}))
|
||||||
|
Expect(missed).To(BeZero())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("drops messages when diode is full", func() {
|
||||||
|
diode.set(message{Data: "1"})
|
||||||
|
diode.set(message{Data: "2"})
|
||||||
|
diode.set(message{Data: "3"})
|
||||||
|
next, ok := diode.tryNext()
|
||||||
|
Expect(ok).To(BeTrue())
|
||||||
|
Expect(next).To(Equal(&message{Data: "3"}))
|
||||||
|
|
||||||
|
_, ok = diode.tryNext()
|
||||||
|
Expect(ok).To(BeFalse())
|
||||||
|
|
||||||
|
Expect(missed).To(Equal(2))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("returns nil when diode is empty and the context is canceled", func() {
|
||||||
|
diode.set(message{Data: "1"})
|
||||||
|
ctxCancel()
|
||||||
|
Expect(diode.next()).To(Equal(&message{Data: "1"}))
|
||||||
|
Expect(diode.next()).To(BeNil())
|
||||||
|
})
|
||||||
|
})
|
||||||
@@ -148,7 +148,7 @@ func (b *broker) subscribe(r *http.Request) client {
|
|||||||
address: r.RemoteAddr,
|
address: r.RemoteAddr,
|
||||||
userAgent: r.UserAgent(),
|
userAgent: r.UserAgent(),
|
||||||
}
|
}
|
||||||
c.diode = newDiode(r.Context(), 1000, diodes.AlertFunc(func(missed int) {
|
c.diode = newDiode(r.Context(), 1024, diodes.AlertFunc(func(missed int) {
|
||||||
log.Trace("Dropped SSE events", "client", c.String(), "missed", missed)
|
log.Trace("Dropped SSE events", "client", c.String(), "missed", missed)
|
||||||
}))
|
}))
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user