Implement Scrobble buffering/retrying

This commit is contained in:
Deluan
2021-06-24 00:01:05 -04:00
parent fb183e58e9
commit 289da56f64
17 changed files with 513 additions and 96 deletions
+33 -26
View File
@@ -160,9 +160,10 @@ func (l *lastfmAgent) callArtistGetTopTracks(ctx context.Context, artistName, mb
func (l *lastfmAgent) NowPlaying(ctx context.Context, userId string, track *model.MediaFile) error {
sk, err := l.sessionKeys.get(ctx, userId)
if err != nil {
return err
if err != nil || sk == "" {
return scrobbler.ErrNotAuthorized
}
err = l.client.UpdateNowPlaying(ctx, sk, ScrobbleInfo{
artist: track.Artist,
track: track.Title,
@@ -173,38 +174,44 @@ func (l *lastfmAgent) NowPlaying(ctx context.Context, userId string, track *mode
albumArtist: track.AlbumArtist,
})
if err != nil {
return err
log.Warn(ctx, "Last.fm client.updateNowPlaying returned error", "track", track.Title, err)
return scrobbler.ErrUnrecoverable
}
return nil
}
func (l *lastfmAgent) Scrobble(ctx context.Context, userId string, scrobbles []scrobbler.Scrobble) error {
func (l *lastfmAgent) Scrobble(ctx context.Context, userId string, s scrobbler.Scrobble) error {
sk, err := l.sessionKeys.get(ctx, userId)
if err != nil {
return err
if err != nil || sk == "" {
return scrobbler.ErrNotAuthorized
}
// TODO Implement batch scrobbling
for _, s := range scrobbles {
if s.Duration <= 30 {
log.Debug(ctx, "Skipping Last.fm scrobble for short song", "track", s.Title, "duration", s.Duration)
continue
}
err = l.client.Scrobble(ctx, sk, ScrobbleInfo{
artist: s.Artist,
track: s.Title,
album: s.Album,
trackNumber: s.TrackNumber,
mbid: s.MbzTrackID,
duration: int(s.Duration),
albumArtist: s.AlbumArtist,
timestamp: s.TimeStamp,
})
if err != nil {
return err
}
if s.Duration <= 30 {
log.Debug(ctx, "Skipping Last.fm scrobble for short song", "track", s.Title, "duration", s.Duration)
return nil
}
return nil
err = l.client.Scrobble(ctx, sk, ScrobbleInfo{
artist: s.Artist,
track: s.Title,
album: s.Album,
trackNumber: s.TrackNumber,
mbid: s.MbzTrackID,
duration: int(s.Duration),
albumArtist: s.AlbumArtist,
timestamp: s.TimeStamp,
})
if err == nil {
return nil
}
lfErr, isLastFMError := err.(*lastFMError)
if !isLastFMError {
log.Warn(ctx, "Last.fm client.scrobble returned error", "track", s.Title, err)
return scrobbler.ErrRetryLater
}
if lfErr.Code == 11 || lfErr.Code == 16 {
return scrobbler.ErrRetryLater
}
return scrobbler.ErrUnrecoverable
}
func (l *lastfmAgent) IsAuthorized(ctx context.Context, userId string) bool {
+52 -4
View File
@@ -264,15 +264,19 @@ var _ = Describe("lastfmAgent", func() {
Expect(sentParams.Get("duration")).To(Equal(strconv.FormatFloat(float64(track.Duration), 'G', -1, 32)))
Expect(sentParams.Get("mbid")).To(Equal(track.MbzTrackID))
})
It("returns ErrNotAuthorized if user is not linked", func() {
err := agent.NowPlaying(ctx, "user-2", track)
Expect(err).To(MatchError(scrobbler.ErrNotAuthorized))
})
})
Describe("Scrobble", func() {
It("calls Last.fm with correct params", func() {
ts := time.Now()
scrobbles := []scrobbler.Scrobble{{MediaFile: *track, TimeStamp: ts}}
httpClient.Res = http.Response{Body: ioutil.NopCloser(bytes.NewBufferString("{}")), StatusCode: 200}
err := agent.Scrobble(ctx, "user-1", scrobbles)
err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: ts})
Expect(err).ToNot(HaveOccurred())
Expect(httpClient.SavedRequest.Method).To(Equal(http.MethodPost))
@@ -291,14 +295,58 @@ var _ = Describe("lastfmAgent", func() {
It("skips songs with less than 31 seconds", func() {
track.Duration = 29
scrobbles := []scrobbler.Scrobble{{MediaFile: *track, TimeStamp: time.Now()}}
httpClient.Res = http.Response{Body: ioutil.NopCloser(bytes.NewBufferString("{}")), StatusCode: 200}
err := agent.Scrobble(ctx, "user-1", scrobbles)
err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()})
Expect(err).ToNot(HaveOccurred())
Expect(httpClient.SavedRequest).To(BeNil())
})
It("returns ErrNotAuthorized if user is not linked", func() {
err := agent.Scrobble(ctx, "user-2", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()})
Expect(err).To(MatchError(scrobbler.ErrNotAuthorized))
})
It("returns ErrRetryLater on error 11", func() {
httpClient.Res = http.Response{
Body: ioutil.NopCloser(bytes.NewBufferString(`{"error":11,"message":"Service Offline - This service is temporarily offline. Try again later."}`)),
StatusCode: 400,
}
err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()})
Expect(err).To(MatchError(scrobbler.ErrRetryLater))
})
It("returns ErrRetryLater on error 16", func() {
httpClient.Res = http.Response{
Body: ioutil.NopCloser(bytes.NewBufferString(`{"error":16,"message":"There was a temporary error processing your request. Please try again"}`)),
StatusCode: 400,
}
err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()})
Expect(err).To(MatchError(scrobbler.ErrRetryLater))
})
It("returns ErrRetryLater on http errors", func() {
httpClient.Res = http.Response{
Body: ioutil.NopCloser(bytes.NewBufferString(`internal server error`)),
StatusCode: 500,
}
err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()})
Expect(err).To(MatchError(scrobbler.ErrRetryLater))
})
It("returns ErrUnrecoverable on other errors", func() {
httpClient.Res = http.Response{
Body: ioutil.NopCloser(bytes.NewBufferString(`{"error":8,"message":"Operation failed - Something else went wrong"}`)),
StatusCode: 400,
}
err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()})
Expect(err).To(MatchError(scrobbler.ErrUnrecoverable))
})
})
})
+115
View File
@@ -0,0 +1,115 @@
package scrobbler
import (
"context"
"time"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model"
)
func NewBufferedScrobbler(ds model.DataStore, s Scrobbler, service string) *bufferedScrobbler {
b := &bufferedScrobbler{ds: ds, wrapped: s, service: service}
b.wakeSignal = make(chan struct{}, 1)
go b.run()
return b
}
type bufferedScrobbler struct {
ds model.DataStore
wrapped Scrobbler
service string
wakeSignal chan struct{}
}
func (b *bufferedScrobbler) IsAuthorized(ctx context.Context, userId string) bool {
return b.wrapped.IsAuthorized(ctx, userId)
}
func (b *bufferedScrobbler) NowPlaying(ctx context.Context, userId string, track *model.MediaFile) error {
return b.wrapped.NowPlaying(ctx, userId, track)
}
func (b *bufferedScrobbler) Scrobble(ctx context.Context, userId string, s Scrobble) error {
err := b.ds.ScrobbleBuffer(ctx).Enqueue(b.service, userId, s.ID, s.TimeStamp)
if err != nil {
return err
}
b.sendWakeSignal()
return nil
}
func (b *bufferedScrobbler) sendWakeSignal() {
// Don't block if the previous signal was not read yet
select {
case b.wakeSignal <- struct{}{}:
default:
}
}
func (b *bufferedScrobbler) run() {
ctx := context.Background()
for {
if !b.processQueue(ctx) {
time.AfterFunc(5*time.Second, func() {
b.sendWakeSignal()
})
}
<-b.wakeSignal
}
}
func (b *bufferedScrobbler) processQueue(ctx context.Context) bool {
buffer := b.ds.ScrobbleBuffer(ctx)
userIds, err := buffer.UserIDs(b.service)
if err != nil {
log.Error(ctx, "Error retrieving userIds from scrobble buffer", "scrobbler", b.service, err)
return false
}
result := true
for _, userId := range userIds {
if !b.processUserQueue(ctx, userId) {
result = false
}
}
return result
}
func (b *bufferedScrobbler) processUserQueue(ctx context.Context, userId string) bool {
buffer := b.ds.ScrobbleBuffer(ctx)
for {
entry, err := buffer.Next(b.service, userId)
if err != nil {
log.Error(ctx, "Error reading from scrobble buffer", "scrobbler", b.service, err)
return false
}
if entry == nil {
return true
}
log.Debug(ctx, "Sending scrobble", "scrobbler", b.service, "track", entry.Title, "artist", entry.Artist)
err = b.wrapped.Scrobble(ctx, entry.UserID, Scrobble{
MediaFile: entry.MediaFile,
TimeStamp: entry.PlayTime,
})
if err != nil {
switch err {
case ErrRetryLater:
log.Warn(ctx, "Could not send scrobble. Will be retried", "userId", entry.UserID,
"track", entry.Title, "artist", entry.Artist, "scrobbler", b.service, err)
return false
default:
log.Error(ctx, "Error sending scrobble to service. Discarding", "scrobbler", b.service,
"userId", entry.UserID, "artist", entry.Artist, "track", entry.Title, err)
}
}
err = buffer.Dequeue(entry)
if err != nil {
log.Error(ctx, "Error removing entry from scrobble buffer", "userId", entry.UserID,
"track", entry.Title, "artist", entry.Artist, "scrobbler", b.service, err)
return false
}
}
}
var _ Scrobbler = (*bufferedScrobbler)(nil)
+8 -1
View File
@@ -2,6 +2,7 @@ package scrobbler
import (
"context"
"errors"
"time"
"github.com/navidrome/navidrome/model"
@@ -12,10 +13,16 @@ type Scrobble struct {
TimeStamp time.Time
}
var (
ErrNotAuthorized = errors.New("not authorized")
ErrRetryLater = errors.New("retry later")
ErrUnrecoverable = errors.New("unrecoverable")
)
type Scrobbler interface {
IsAuthorized(ctx context.Context, userId string) bool
NowPlaying(ctx context.Context, userId string, track *model.MediaFile) error
Scrobble(ctx context.Context, userId string, scrobbles []Scrobble) error
Scrobble(ctx context.Context, userId string, s Scrobble) error
}
type Constructor func(ds model.DataStore) Scrobbler
+29 -28
View File
@@ -39,9 +39,10 @@ type PlayTracker interface {
}
type playTracker struct {
ds model.DataStore
broker events.Broker
playMap *ttlcache.Cache
ds model.DataStore
broker events.Broker
playMap *ttlcache.Cache
scrobblers map[string]Scrobbler
}
func GetPlayTracker(ds model.DataStore, broker events.Broker) PlayTracker {
@@ -49,7 +50,14 @@ func GetPlayTracker(ds model.DataStore, broker events.Broker) PlayTracker {
m := ttlcache.NewCache()
m.SkipTTLExtensionOnHit(true)
_ = m.SetTTL(nowPlayingExpire)
return &playTracker{ds: ds, playMap: m, broker: broker}
p := &playTracker{ds: ds, playMap: m, broker: broker}
p.scrobblers = make(map[string]Scrobbler)
for name, constructor := range constructors {
s := constructor(ds)
s = NewBufferedScrobbler(ds, s, name)
p.scrobblers[name] = s
}
return p
})
return instance.(*playTracker)
}
@@ -78,15 +86,12 @@ func (p *playTracker) dispatchNowPlaying(ctx context.Context, userId string, tra
return
}
// TODO Parallelize
for name, constructor := range scrobblers {
err := func() error {
s := constructor(p.ds)
if !s.IsAuthorized(ctx, userId) {
return nil
}
log.Debug(ctx, "Sending NowPlaying info", "scrobbler", name, "track", t.Title, "artist", t.Artist)
return s.NowPlaying(ctx, userId, t)
}()
for name, s := range p.scrobblers {
if !s.IsAuthorized(ctx, userId) {
continue
}
log.Debug(ctx, "Sending NowPlaying info", "scrobbler", name, "track", t.Title, "artist", t.Artist)
err := s.NowPlaying(ctx, userId, t)
if err != nil {
log.Error(ctx, "Error sending NowPlayingInfo", "scrobbler", name, "track", t.Title, "artist", t.Artist, err)
return
@@ -161,17 +166,13 @@ func (p *playTracker) incPlay(ctx context.Context, track *model.MediaFile, times
func (p *playTracker) dispatchScrobble(ctx context.Context, t *model.MediaFile, playTime time.Time) error {
u, _ := request.UserFrom(ctx)
scrobbles := []Scrobble{{MediaFile: *t, TimeStamp: playTime}}
// TODO Parallelize
for name, constructor := range scrobblers {
err := func() error {
s := constructor(p.ds)
if !s.IsAuthorized(ctx, u.ID) {
return nil
}
log.Debug(ctx, "Sending Scrobble", "scrobbler", name, "track", t.Title, "artist", t.Artist)
return s.Scrobble(ctx, u.ID, scrobbles)
}()
scrobble := Scrobble{MediaFile: *t, TimeStamp: playTime}
for name, s := range p.scrobblers {
if !s.IsAuthorized(ctx, u.ID) {
continue
}
log.Debug(ctx, "Buffering scrobble", "scrobbler", name, "track", t.Title, "artist", t.Artist)
err := s.Scrobble(ctx, u.ID, scrobble)
if err != nil {
log.Error(ctx, "Error sending Scrobble", "scrobbler", name, "track", t.Title, "artist", t.Artist, err)
return err
@@ -180,14 +181,14 @@ func (p *playTracker) dispatchScrobble(ctx context.Context, t *model.MediaFile,
return nil
}
var scrobblers map[string]Constructor
var constructors map[string]Constructor
func Register(name string, init Constructor) {
if !conf.Server.DevEnableScrobble {
return
}
if scrobblers == nil {
scrobblers = make(map[string]Constructor)
if constructors == nil {
constructors = make(map[string]Constructor)
}
scrobblers[name] = init
constructors[name] = init
}
+28 -23
View File
@@ -6,11 +6,9 @@ import (
"time"
"github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/server/events"
"github.com/navidrome/navidrome/model"
"github.com/navidrome/navidrome/model/request"
"github.com/navidrome/navidrome/server/events"
"github.com/navidrome/navidrome/tests"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@@ -19,11 +17,11 @@ import (
var _ = Describe("PlayTracker", func() {
var ctx context.Context
var ds model.DataStore
var broker PlayTracker
var tracker PlayTracker
var track model.MediaFile
var album model.Album
var artist model.Artist
var fake *fakeScrobbler
var fake fakeScrobbler
BeforeEach(func() {
conf.Server.DevEnableScrobble = true
@@ -31,11 +29,18 @@ var _ = Describe("PlayTracker", func() {
ctx = request.WithUser(ctx, model.User{ID: "u-1"})
ctx = request.WithPlayer(ctx, model.Player{ScrobbleEnabled: true})
ds = &tests.MockDataStore{}
broker = GetPlayTracker(ds, events.GetBroker())
fake = &fakeScrobbler{Authorized: true}
fake = fakeScrobbler{Authorized: true}
Register("fake", func(ds model.DataStore) Scrobbler {
return fake
return &fake
})
tracker = GetPlayTracker(ds, events.GetBroker())
// Remove buffering to simplify tests
for i, s := range tracker.(*playTracker).scrobblers {
if bs, ok := s.(*bufferedScrobbler); ok {
tracker.(*playTracker).scrobblers[i] = bs.wrapped
}
}
track = model.MediaFile{
ID: "123",
@@ -58,7 +63,7 @@ var _ = Describe("PlayTracker", func() {
Describe("NowPlaying", func() {
It("sends track to agent", func() {
err := broker.NowPlaying(ctx, "player-1", "player-one", "123")
err := tracker.NowPlaying(ctx, "player-1", "player-one", "123")
Expect(err).ToNot(HaveOccurred())
Expect(fake.NowPlayingCalled).To(BeTrue())
Expect(fake.UserID).To(Equal("u-1"))
@@ -67,7 +72,7 @@ var _ = Describe("PlayTracker", func() {
It("does not send track to agent if user has not authorized", func() {
fake.Authorized = false
err := broker.NowPlaying(ctx, "player-1", "player-one", "123")
err := tracker.NowPlaying(ctx, "player-1", "player-one", "123")
Expect(err).ToNot(HaveOccurred())
Expect(fake.NowPlayingCalled).To(BeFalse())
@@ -75,7 +80,7 @@ var _ = Describe("PlayTracker", func() {
It("does not send track to agent if player is not enabled to send scrobbles", func() {
ctx = request.WithPlayer(ctx, model.Player{ScrobbleEnabled: false})
err := broker.NowPlaying(ctx, "player-1", "player-one", "123")
err := tracker.NowPlaying(ctx, "player-1", "player-one", "123")
Expect(err).ToNot(HaveOccurred())
Expect(fake.NowPlayingCalled).To(BeFalse())
@@ -91,11 +96,11 @@ var _ = Describe("PlayTracker", func() {
track2.ID = "456"
_ = ds.MediaFile(ctx).Put(&track)
ctx = request.WithUser(ctx, model.User{UserName: "user-1"})
_ = broker.NowPlaying(ctx, "player-1", "player-one", "123")
_ = tracker.NowPlaying(ctx, "player-1", "player-one", "123")
ctx = request.WithUser(ctx, model.User{UserName: "user-2"})
_ = broker.NowPlaying(ctx, "player-2", "player-two", "456")
_ = tracker.NowPlaying(ctx, "player-2", "player-two", "456")
playing, err := broker.GetNowPlaying(ctx)
playing, err := tracker.GetNowPlaying(ctx)
Expect(err).ToNot(HaveOccurred())
Expect(playing).To(HaveLen(2))
@@ -116,19 +121,19 @@ var _ = Describe("PlayTracker", func() {
ctx = request.WithUser(ctx, model.User{ID: "u-1", UserName: "user-1"})
ts := time.Now()
err := broker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: ts}})
err := tracker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: ts}})
Expect(err).ToNot(HaveOccurred())
Expect(fake.ScrobbleCalled).To(BeTrue())
Expect(fake.UserID).To(Equal("u-1"))
Expect(fake.Scrobbles[0].ID).To(Equal("123"))
Expect(fake.LastScrobble.ID).To(Equal("123"))
})
It("increments play counts in the DB", func() {
ctx = request.WithUser(ctx, model.User{ID: "u-1", UserName: "user-1"})
ts := time.Now()
err := broker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: ts}})
err := tracker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: ts}})
Expect(err).ToNot(HaveOccurred())
Expect(track.PlayCount).To(Equal(int64(1)))
@@ -139,7 +144,7 @@ var _ = Describe("PlayTracker", func() {
It("does not send track to agent if user has not authorized", func() {
fake.Authorized = false
err := broker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: time.Now()}})
err := tracker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: time.Now()}})
Expect(err).ToNot(HaveOccurred())
Expect(fake.ScrobbleCalled).To(BeFalse())
@@ -148,7 +153,7 @@ var _ = Describe("PlayTracker", func() {
It("does not send track to agent player is not enabled to send scrobbles", func() {
ctx = request.WithPlayer(ctx, model.Player{ScrobbleEnabled: false})
err := broker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: time.Now()}})
err := tracker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: time.Now()}})
Expect(err).ToNot(HaveOccurred())
Expect(fake.ScrobbleCalled).To(BeFalse())
@@ -157,7 +162,7 @@ var _ = Describe("PlayTracker", func() {
It("increments play counts even if it cannot scrobble", func() {
fake.Error = errors.New("error")
err := broker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: time.Now()}})
err := tracker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: time.Now()}})
Expect(err).ToNot(HaveOccurred())
Expect(fake.ScrobbleCalled).To(BeFalse())
@@ -177,7 +182,7 @@ type fakeScrobbler struct {
ScrobbleCalled bool
UserID string
Track *model.MediaFile
Scrobbles []Scrobble
LastScrobble Scrobble
Error error
}
@@ -195,12 +200,12 @@ func (f *fakeScrobbler) NowPlaying(ctx context.Context, userId string, track *mo
return nil
}
func (f *fakeScrobbler) Scrobble(ctx context.Context, userId string, scrobbles []Scrobble) error {
func (f *fakeScrobbler) Scrobble(ctx context.Context, userId string, s Scrobble) error {
f.ScrobbleCalled = true
if f.Error != nil {
return f.Error
}
f.UserID = userId
f.Scrobbles = scrobbles
f.LastScrobble = s
return nil
}