0faf744e32
* feat: make NowPlaying dispatch asynchronous with worker pool Implemented asynchronous NowPlaying dispatch using a queue worker pattern similar to cacheWarmer. Instead of dispatching NowPlaying updates synchronously during the HTTP request, they are now queued and processed by background workers at controlled intervals. Key changes: - Added nowPlayingEntry struct to represent queued entries - Added npQueue map (keyed by playerId), npMu mutex, and npSignal channel to playTracker - Implemented enqueueNowPlaying() to add entries to the queue - Implemented nowPlayingWorker() that polls every 100ms, drains queue, and processes entries - Changed NowPlaying() to queue dispatch instead of calling synchronously - Renamed dispatchNowPlaying() to dispatchNowPlayingAsync() and updated it to use background context Benefits: - HTTP handlers return immediately without waiting for scrobbler responses - Deduplication by key: rapid calls (seeking) only dispatch latest state - Fire-and-forget: one-shot attempts with logged failures - Backpressure-free: worker processes at its own pace - Tests updated to use Eventually() assertions for async dispatch Signed-off-by: Deluan <deluan@navidrome.org> * fix(play_tracker): increase timeout duration for signal handling Signed-off-by: Deluan <deluan@navidrome.org> * refactor(play_tracker): simplify queue processing by directly assigning entries Signed-off-by: Deluan <deluan@navidrome.org> --------- Signed-off-by: Deluan <deluan@navidrome.org>
90 lines
2.5 KiB
Go
90 lines
2.5 KiB
Go
package scrobbler
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/navidrome/navidrome/model"
|
|
"github.com/navidrome/navidrome/tests"
|
|
. "github.com/onsi/ginkgo/v2"
|
|
. "github.com/onsi/gomega"
|
|
)
|
|
|
|
var _ = Describe("BufferedScrobbler", func() {
|
|
var ds model.DataStore
|
|
var scr *fakeScrobbler
|
|
var bs *bufferedScrobbler
|
|
var ctx context.Context
|
|
var buffer *tests.MockedScrobbleBufferRepo
|
|
|
|
BeforeEach(func() {
|
|
ctx = context.Background()
|
|
buffer = tests.CreateMockedScrobbleBufferRepo()
|
|
ds = &tests.MockDataStore{
|
|
MockedScrobbleBuffer: buffer,
|
|
}
|
|
scr = &fakeScrobbler{Authorized: true}
|
|
bs = newBufferedScrobbler(ds, scr, "test")
|
|
})
|
|
|
|
It("forwards IsAuthorized calls", func() {
|
|
scr.Authorized = true
|
|
Expect(bs.IsAuthorized(ctx, "user1")).To(BeTrue())
|
|
|
|
scr.Authorized = false
|
|
Expect(bs.IsAuthorized(ctx, "user1")).To(BeFalse())
|
|
})
|
|
|
|
It("forwards NowPlaying calls", func() {
|
|
track := &model.MediaFile{ID: "123", Title: "Test Track"}
|
|
Expect(bs.NowPlaying(ctx, "user1", track, 0)).To(Succeed())
|
|
Expect(scr.GetNowPlayingCalled()).To(BeTrue())
|
|
Expect(scr.GetUserID()).To(Equal("user1"))
|
|
Expect(scr.GetTrack()).To(Equal(track))
|
|
})
|
|
|
|
It("enqueues scrobbles to buffer", func() {
|
|
track := model.MediaFile{ID: "123", Title: "Test Track"}
|
|
now := time.Now()
|
|
scrobble := Scrobble{MediaFile: track, TimeStamp: now}
|
|
Expect(buffer.Length()).To(Equal(int64(0)))
|
|
Expect(scr.ScrobbleCalled.Load()).To(BeFalse())
|
|
|
|
Expect(bs.Scrobble(ctx, "user1", scrobble)).To(Succeed())
|
|
|
|
// Wait for the background goroutine to process the scrobble.
|
|
// We don't check buffer.Length() here because the background goroutine
|
|
// may dequeue the entry before we can observe it.
|
|
Eventually(scr.ScrobbleCalled.Load).Should(BeTrue())
|
|
|
|
lastScrobble := scr.LastScrobble.Load()
|
|
Expect(lastScrobble.MediaFile.ID).To(Equal("123"))
|
|
Expect(lastScrobble.TimeStamp).To(BeTemporally("==", now))
|
|
})
|
|
|
|
It("stops the background goroutine when Stop is called", func() {
|
|
// Replace the real run method with one that signals when it exits
|
|
done := make(chan struct{})
|
|
|
|
// Start our instrumented run function that will signal when it exits
|
|
go func() {
|
|
defer close(done)
|
|
bs.run(bs.ctx)
|
|
}()
|
|
|
|
// Wait a bit to ensure the goroutine is running
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
// Call the real Stop method
|
|
bs.Stop()
|
|
|
|
// Wait for the goroutine to exit or timeout
|
|
select {
|
|
case <-done:
|
|
// Success, goroutine exited
|
|
case <-time.After(100 * time.Millisecond):
|
|
Fail("Goroutine did not exit in time after Stop was called")
|
|
}
|
|
})
|
|
})
|