Refactor: Consolidate scrobbling logic in play_tracker
This commit is contained in:
@@ -5,6 +5,8 @@ import (
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/navidrome/navidrome/server/events"
|
||||
|
||||
"github.com/navidrome/navidrome/log"
|
||||
|
||||
"github.com/ReneKroon/ttlcache/v2"
|
||||
@@ -23,28 +25,34 @@ type NowPlayingInfo struct {
|
||||
PlayerName string
|
||||
}
|
||||
|
||||
type Broker interface {
|
||||
NowPlaying(ctx context.Context, playerId string, playerName string, trackId string) error
|
||||
GetNowPlaying(ctx context.Context) ([]NowPlayingInfo, error)
|
||||
Submit(ctx context.Context, trackId string, playTime time.Time) error
|
||||
type Submission struct {
|
||||
TrackID string
|
||||
Timestamp time.Time
|
||||
}
|
||||
|
||||
type broker struct {
|
||||
type PlayTracker interface {
|
||||
NowPlaying(ctx context.Context, playerId string, playerName string, trackId string) error
|
||||
GetNowPlaying(ctx context.Context) ([]NowPlayingInfo, error)
|
||||
Submit(ctx context.Context, submissions []Submission) error
|
||||
}
|
||||
|
||||
type playTracker struct {
|
||||
ds model.DataStore
|
||||
broker events.Broker
|
||||
playMap *ttlcache.Cache
|
||||
}
|
||||
|
||||
func GetBroker(ds model.DataStore) Broker {
|
||||
instance := singleton.Get(broker{}, func() interface{} {
|
||||
func GetPlayTracker(ds model.DataStore, broker events.Broker) PlayTracker {
|
||||
instance := singleton.Get(playTracker{}, func() interface{} {
|
||||
m := ttlcache.NewCache()
|
||||
m.SkipTTLExtensionOnHit(true)
|
||||
_ = m.SetTTL(nowPlayingExpire)
|
||||
return &broker{ds: ds, playMap: m}
|
||||
return &playTracker{ds: ds, playMap: m, broker: broker}
|
||||
})
|
||||
return instance.(*broker)
|
||||
return instance.(*playTracker)
|
||||
}
|
||||
|
||||
func (s *broker) NowPlaying(ctx context.Context, playerId string, playerName string, trackId string) error {
|
||||
func (p *playTracker) NowPlaying(ctx context.Context, playerId string, playerName string, trackId string) error {
|
||||
user, _ := request.UserFrom(ctx)
|
||||
info := NowPlayingInfo{
|
||||
TrackID: trackId,
|
||||
@@ -53,13 +61,13 @@ func (s *broker) NowPlaying(ctx context.Context, playerId string, playerName str
|
||||
PlayerId: playerId,
|
||||
PlayerName: playerName,
|
||||
}
|
||||
_ = s.playMap.Set(playerId, info)
|
||||
s.dispatchNowPlaying(ctx, user.ID, trackId)
|
||||
_ = p.playMap.Set(playerId, info)
|
||||
p.dispatchNowPlaying(ctx, user.ID, trackId)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *broker) dispatchNowPlaying(ctx context.Context, userId string, trackId string) {
|
||||
t, err := s.ds.MediaFile(ctx).Get(trackId)
|
||||
func (p *playTracker) dispatchNowPlaying(ctx context.Context, userId string, trackId string) {
|
||||
t, err := p.ds.MediaFile(ctx).Get(trackId)
|
||||
if err != nil {
|
||||
log.Error(ctx, "Error retrieving mediaFile", "id", trackId, err)
|
||||
return
|
||||
@@ -67,7 +75,7 @@ func (s *broker) dispatchNowPlaying(ctx context.Context, userId string, trackId
|
||||
// TODO Parallelize
|
||||
for name, constructor := range scrobblers {
|
||||
err := func() error {
|
||||
s := constructor(s.ds)
|
||||
s := constructor(p.ds)
|
||||
if !s.IsAuthorized(ctx, userId) {
|
||||
return nil
|
||||
}
|
||||
@@ -81,10 +89,10 @@ func (s *broker) dispatchNowPlaying(ctx context.Context, userId string, trackId
|
||||
}
|
||||
}
|
||||
|
||||
func (s *broker) GetNowPlaying(ctx context.Context) ([]NowPlayingInfo, error) {
|
||||
func (p *playTracker) GetNowPlaying(ctx context.Context) ([]NowPlayingInfo, error) {
|
||||
var res []NowPlayingInfo
|
||||
for _, playerId := range s.playMap.GetKeys() {
|
||||
value, err := s.playMap.Get(playerId)
|
||||
for _, playerId := range p.playMap.GetKeys() {
|
||||
value, err := p.playMap.Get(playerId)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
@@ -97,18 +105,56 @@ func (s *broker) GetNowPlaying(ctx context.Context) ([]NowPlayingInfo, error) {
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (s *broker) Submit(ctx context.Context, trackId string, playTime time.Time) error {
|
||||
u, _ := request.UserFrom(ctx)
|
||||
t, err := s.ds.MediaFile(ctx).Get(trackId)
|
||||
if err != nil {
|
||||
log.Error(ctx, "Error retrieving mediaFile", "id", trackId, err)
|
||||
return err
|
||||
func (p *playTracker) Submit(ctx context.Context, submissions []Submission) error {
|
||||
username, _ := request.UsernameFrom(ctx)
|
||||
event := &events.RefreshResource{}
|
||||
success := 0
|
||||
|
||||
for _, s := range submissions {
|
||||
mf, err := p.ds.MediaFile(ctx).Get(s.TrackID)
|
||||
if err != nil {
|
||||
log.Error("Cannot find track for scrobbling", "id", s.TrackID, "user", username, err)
|
||||
continue
|
||||
}
|
||||
err = p.incPlay(ctx, mf, s.Timestamp)
|
||||
if err != nil {
|
||||
log.Error("Error updating play counts", "id", mf.ID, "track", mf.Title, "user", username, err)
|
||||
} else {
|
||||
success++
|
||||
event.With("song", mf.ID).With("album", mf.AlbumID).With("artist", mf.AlbumArtistID)
|
||||
log.Info("Scrobbled", "title", mf.Title, "artist", mf.Artist, "user", username)
|
||||
_ = p.dispatchScrobble(ctx, mf, s.Timestamp)
|
||||
}
|
||||
}
|
||||
|
||||
if success > 0 {
|
||||
p.broker.SendMessage(ctx, event)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *playTracker) incPlay(ctx context.Context, track *model.MediaFile, timestamp time.Time) error {
|
||||
return p.ds.WithTx(func(tx model.DataStore) error {
|
||||
err := p.ds.MediaFile(ctx).IncPlayCount(track.ID, timestamp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = p.ds.Album(ctx).IncPlayCount(track.AlbumID, timestamp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = p.ds.Artist(ctx).IncPlayCount(track.ArtistID, timestamp)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func (p *playTracker) dispatchScrobble(ctx context.Context, t *model.MediaFile, playTime time.Time) error {
|
||||
u, _ := request.UserFrom(ctx)
|
||||
scrobbles := []Scrobble{{MediaFile: *t, TimeStamp: playTime}}
|
||||
// TODO Parallelize
|
||||
for name, constructor := range scrobblers {
|
||||
err := func() error {
|
||||
s := constructor(s.ds)
|
||||
s := constructor(p.ds)
|
||||
if !s.IsAuthorized(ctx, u.ID) {
|
||||
return nil
|
||||
}
|
||||
@@ -2,8 +2,11 @@ package scrobbler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/navidrome/navidrome/server/events"
|
||||
|
||||
"github.com/navidrome/navidrome/model"
|
||||
"github.com/navidrome/navidrome/model/request"
|
||||
"github.com/navidrome/navidrome/tests"
|
||||
@@ -11,17 +14,19 @@ import (
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("Broker", func() {
|
||||
var _ = Describe("PlayTracker", func() {
|
||||
var ctx context.Context
|
||||
var ds model.DataStore
|
||||
var broker Broker
|
||||
var broker PlayTracker
|
||||
var track model.MediaFile
|
||||
var album model.Album
|
||||
var artist model.Artist
|
||||
var fake *fakeScrobbler
|
||||
BeforeEach(func() {
|
||||
ctx = context.Background()
|
||||
ctx = request.WithUser(ctx, model.User{ID: "u-1"})
|
||||
ds = &tests.MockDataStore{}
|
||||
broker = GetBroker(ds)
|
||||
broker = GetPlayTracker(ds, events.GetBroker())
|
||||
fake = &fakeScrobbler{Authorized: true}
|
||||
Register("fake", func(ds model.DataStore) Scrobbler {
|
||||
return fake
|
||||
@@ -31,13 +36,19 @@ var _ = Describe("Broker", func() {
|
||||
ID: "123",
|
||||
Title: "Track Title",
|
||||
Album: "Track Album",
|
||||
AlbumID: "al-1",
|
||||
Artist: "Track Artist",
|
||||
ArtistID: "ar-1",
|
||||
AlbumArtist: "Track AlbumArtist",
|
||||
TrackNumber: 1,
|
||||
Duration: 180,
|
||||
MbzTrackID: "mbz-123",
|
||||
}
|
||||
_ = ds.MediaFile(ctx).Put(&track)
|
||||
artist = model.Artist{ID: "ar-1"}
|
||||
_ = ds.Artist(ctx).Put(&artist)
|
||||
album = model.Album{ID: "al-1"}
|
||||
_ = ds.Album(ctx).(*tests.MockAlbumRepo).Put(&album)
|
||||
})
|
||||
|
||||
Describe("NowPlaying", func() {
|
||||
@@ -50,9 +61,11 @@ var _ = Describe("Broker", func() {
|
||||
})
|
||||
It("does not send track to agent if user has not authorized", func() {
|
||||
fake.Authorized = false
|
||||
|
||||
err := broker.NowPlaying(ctx, "player-1", "player-one", "123")
|
||||
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(fake.NowPlayingCalled).ToNot(BeTrue())
|
||||
Expect(fake.NowPlayingCalled).To(BeFalse())
|
||||
})
|
||||
})
|
||||
|
||||
@@ -90,7 +103,7 @@ var _ = Describe("Broker", func() {
|
||||
ctx = request.WithUser(ctx, model.User{ID: "u-1", UserName: "user-1"})
|
||||
ts := time.Now()
|
||||
|
||||
err := broker.Submit(ctx, "123", ts)
|
||||
err := broker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: ts}})
|
||||
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(fake.ScrobbleCalled).To(BeTrue())
|
||||
@@ -98,11 +111,38 @@ var _ = Describe("Broker", func() {
|
||||
Expect(fake.Scrobbles[0].ID).To(Equal("123"))
|
||||
})
|
||||
|
||||
It("increments play counts in the DB", func() {
|
||||
ctx = request.WithUser(ctx, model.User{ID: "u-1", UserName: "user-1"})
|
||||
ts := time.Now()
|
||||
|
||||
err := broker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: ts}})
|
||||
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(track.PlayCount).To(Equal(int64(1)))
|
||||
Expect(album.PlayCount).To(Equal(int64(1)))
|
||||
Expect(artist.PlayCount).To(Equal(int64(1)))
|
||||
})
|
||||
|
||||
It("does not send track to agent if user has not authorized", func() {
|
||||
fake.Authorized = false
|
||||
err := broker.Submit(ctx, "123", time.Now())
|
||||
|
||||
err := broker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: time.Now()}})
|
||||
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(fake.ScrobbleCalled).ToNot(BeTrue())
|
||||
Expect(fake.ScrobbleCalled).To(BeFalse())
|
||||
})
|
||||
|
||||
It("increments play counts even if it cannot scrobble", func() {
|
||||
fake.Error = errors.New("error")
|
||||
|
||||
err := broker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: time.Now()}})
|
||||
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(fake.ScrobbleCalled).To(BeFalse())
|
||||
|
||||
Expect(track.PlayCount).To(Equal(int64(1)))
|
||||
Expect(album.PlayCount).To(Equal(int64(1)))
|
||||
Expect(artist.PlayCount).To(Equal(int64(1)))
|
||||
})
|
||||
|
||||
})
|
||||
@@ -18,6 +18,6 @@ var Set = wire.NewSet(
|
||||
NewPlayers,
|
||||
agents.New,
|
||||
transcoder.New,
|
||||
scrobbler.GetBroker,
|
||||
scrobbler.GetPlayTracker,
|
||||
NewShare,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user