Handling a racing condition between Register and NowPlaying, when the queue is empty
This commit is contained in:
@@ -48,6 +48,18 @@ func (m *MockNowPlaying) Dequeue(playerId int) (*NowPlayingInfo, error) {
|
|||||||
return &info, nil
|
return &info, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MockNowPlaying) Count(playerId int) (int64, error) {
|
||||||
|
return int64(len(m.data)), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockNowPlaying) GetAll() ([]*NowPlayingInfo, error) {
|
||||||
|
np, err := m.Head(1)
|
||||||
|
if np == nil || err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return []*NowPlayingInfo{np}, err
|
||||||
|
}
|
||||||
|
|
||||||
func (m *MockNowPlaying) Head(playerId int) (*NowPlayingInfo, error) {
|
func (m *MockNowPlaying) Head(playerId int) (*NowPlayingInfo, error) {
|
||||||
if len(m.data) == 0 {
|
if len(m.data) == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
@@ -56,6 +68,14 @@ func (m *MockNowPlaying) Head(playerId int) (*NowPlayingInfo, error) {
|
|||||||
return &info, nil
|
return &info, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MockNowPlaying) Tail(playerId int) (*NowPlayingInfo, error) {
|
||||||
|
if len(m.data) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
info := m.data[len(m.data)-1]
|
||||||
|
return &info, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *MockNowPlaying) ClearAll() {
|
func (m *MockNowPlaying) ClearAll() {
|
||||||
m.data = make([]NowPlayingInfo, 0)
|
m.data = make([]NowPlayingInfo, 0)
|
||||||
m.err = false
|
m.err = false
|
||||||
|
|||||||
+11
-5
@@ -12,17 +12,23 @@ type NowPlayingInfo struct {
|
|||||||
PlayerName string
|
PlayerName string
|
||||||
}
|
}
|
||||||
|
|
||||||
// This repo has the semantics of a FIFO queue, for each playerId
|
// This repo must have the semantics of a FIFO queue, for each playerId
|
||||||
type NowPlayingRepository interface {
|
type NowPlayingRepository interface {
|
||||||
// Insert at the head of the queue
|
// Insert at the head of the queue
|
||||||
Enqueue(playerId int, playerName string, trackId, username string) error
|
Enqueue(playerId int, playerName string, trackId, username string) error
|
||||||
|
|
||||||
// Returns the element at the head of the queue (last inserted one)
|
|
||||||
Head(playerId int) (*NowPlayingInfo, error)
|
|
||||||
|
|
||||||
// Removes and returns the element at the end of the queue
|
// Removes and returns the element at the end of the queue
|
||||||
Dequeue(playerId int) (*NowPlayingInfo, error)
|
Dequeue(playerId int) (*NowPlayingInfo, error)
|
||||||
|
|
||||||
// Returns all heads from all playerIds
|
// Returns the element at the head of the queue (last inserted one)
|
||||||
|
Head(playerId int) (*NowPlayingInfo, error)
|
||||||
|
|
||||||
|
// Returns the element at the end of the queue (first inserted one)
|
||||||
|
Tail(playerId int) (*NowPlayingInfo, error)
|
||||||
|
|
||||||
|
// Size of the queue for the playerId
|
||||||
|
Count(playerId int) (int64, error)
|
||||||
|
|
||||||
|
// Returns all tails from all playerIds
|
||||||
GetAll() ([]*NowPlayingInfo, error)
|
GetAll() ([]*NowPlayingInfo, error)
|
||||||
}
|
}
|
||||||
|
|||||||
+13
-3
@@ -25,10 +25,16 @@ type scrobbler struct {
|
|||||||
npRepo NowPlayingRepository
|
npRepo NowPlayingRepository
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *scrobbler) Register(playerId int, trackId string, playDate time.Time) (*domain.MediaFile, error) {
|
func (s *scrobbler) detectSkipped(playerId int, trackId string) {
|
||||||
for {
|
for {
|
||||||
np, err := s.npRepo.Dequeue(playerId)
|
size, _ := s.npRepo.Count(playerId)
|
||||||
if err != nil || np == nil || np.TrackId == trackId {
|
np, err := s.npRepo.Tail(playerId)
|
||||||
|
if err != nil || np == nil || (size == 1 && np.TrackId != trackId) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
s.npRepo.Dequeue(playerId)
|
||||||
|
if np.TrackId == trackId {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
err = s.itunes.MarkAsSkipped(np.TrackId, np.Start.Add(time.Duration(1)*time.Minute))
|
err = s.itunes.MarkAsSkipped(np.TrackId, np.Start.Add(time.Duration(1)*time.Minute))
|
||||||
@@ -38,6 +44,10 @@ func (s *scrobbler) Register(playerId int, trackId string, playDate time.Time) (
|
|||||||
beego.Debug("Skipped track", np.TrackId)
|
beego.Debug("Skipped track", np.TrackId)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *scrobbler) Register(playerId int, trackId string, playDate time.Time) (*domain.MediaFile, error) {
|
||||||
|
s.detectSkipped(playerId, trackId)
|
||||||
|
|
||||||
mf, err := s.mfRepo.Get(trackId)
|
mf, err := s.mfRepo.Get(trackId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -80,7 +80,7 @@ func TestScrobbler(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
})
|
})
|
||||||
Convey("Given a DB with two songs", t, func() {
|
Convey("Given a DB with three songs", t, func() {
|
||||||
mfRepo.SetData(`[{"Id":"1","Title":"Femme Fatale"},{"Id":"2","Title":"Here She Comes Now"},{"Id":"3","Title":"Lady Godiva's Operation"}]`, 3)
|
mfRepo.SetData(`[{"Id":"1","Title":"Femme Fatale"},{"Id":"2","Title":"Here She Comes Now"},{"Id":"3","Title":"Lady Godiva's Operation"}]`, 3)
|
||||||
itCtrl.skipped = make(map[string]time.Time)
|
itCtrl.skipped = make(map[string]time.Time)
|
||||||
npRepo.ClearAll()
|
npRepo.ClearAll()
|
||||||
@@ -104,6 +104,15 @@ func TestScrobbler(t *testing.T) {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
Convey("When the NowPlaying for the next song happens before the Scrobble", func() {
|
||||||
|
scrobbler.NowPlaying(1, "DSub", "2", "deluan")
|
||||||
|
scrobbler.Register(1, "1", time.Now())
|
||||||
|
Convey("Then the NowPlaying info is not not changed", func() {
|
||||||
|
np, _ := npRepo.GetAll()
|
||||||
|
So(np, ShouldHaveLength, 1)
|
||||||
|
So(np[0].TrackId, ShouldEqual, "2")
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -41,30 +41,6 @@ func (r *nowPlayingRepository) Enqueue(playerId int, playerName, id, username st
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *nowPlayingRepository) Head(playerId int) (*engine.NowPlayingInfo, error) {
|
|
||||||
keyName := []byte(nowPlayingKeyName(playerId))
|
|
||||||
|
|
||||||
val, err := Db().LIndex(keyName, 0)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
info := &engine.NowPlayingInfo{}
|
|
||||||
err = json.Unmarshal(val, info)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
return info, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO Will not work for multiple players
|
|
||||||
func (r *nowPlayingRepository) GetAll() ([]*engine.NowPlayingInfo, error) {
|
|
||||||
np, err := r.Head(1)
|
|
||||||
if np == nil || err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return []*engine.NowPlayingInfo{np}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *nowPlayingRepository) Dequeue(playerId int) (*engine.NowPlayingInfo, error) {
|
func (r *nowPlayingRepository) Dequeue(playerId int) (*engine.NowPlayingInfo, error) {
|
||||||
keyName := []byte(nowPlayingKeyName(playerId))
|
keyName := []byte(nowPlayingKeyName(playerId))
|
||||||
|
|
||||||
@@ -75,8 +51,46 @@ func (r *nowPlayingRepository) Dequeue(playerId int) (*engine.NowPlayingInfo, er
|
|||||||
if val == nil {
|
if val == nil {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
return r.parseInfo(val)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *nowPlayingRepository) Head(playerId int) (*engine.NowPlayingInfo, error) {
|
||||||
|
keyName := []byte(nowPlayingKeyName(playerId))
|
||||||
|
|
||||||
|
val, err := Db().LIndex(keyName, 0)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return r.parseInfo(val)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *nowPlayingRepository) Tail(playerId int) (*engine.NowPlayingInfo, error) {
|
||||||
|
keyName := []byte(nowPlayingKeyName(playerId))
|
||||||
|
|
||||||
|
val, err := Db().LIndex(keyName, -1)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return r.parseInfo(val)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *nowPlayingRepository) Count(playerId int) (int64, error) {
|
||||||
|
keyName := []byte(nowPlayingKeyName(playerId))
|
||||||
|
return Db().LLen(keyName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO Will not work for multiple players
|
||||||
|
func (r *nowPlayingRepository) GetAll() ([]*engine.NowPlayingInfo, error) {
|
||||||
|
np, err := r.Tail(1)
|
||||||
|
if np == nil || err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return []*engine.NowPlayingInfo{np}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *nowPlayingRepository) parseInfo(val []byte) (*engine.NowPlayingInfo, error) {
|
||||||
info := &engine.NowPlayingInfo{}
|
info := &engine.NowPlayingInfo{}
|
||||||
err = json.Unmarshal(val, info)
|
err := json.Unmarshal(val, info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user