Enqueue external metadata refreshes
This commit is contained in:
+55
-28
@@ -25,6 +25,9 @@ import (
|
|||||||
const (
|
const (
|
||||||
unavailableArtistID = "-1"
|
unavailableArtistID = "-1"
|
||||||
maxSimilarArtists = 100
|
maxSimilarArtists = 100
|
||||||
|
refreshDelay = 5 * time.Second
|
||||||
|
refreshTimeout = 15 * time.Second
|
||||||
|
refreshQueueLength = 2000
|
||||||
)
|
)
|
||||||
|
|
||||||
type ExternalMetadata interface {
|
type ExternalMetadata interface {
|
||||||
@@ -39,6 +42,8 @@ type ExternalMetadata interface {
|
|||||||
type externalMetadata struct {
|
type externalMetadata struct {
|
||||||
ds model.DataStore
|
ds model.DataStore
|
||||||
ag *agents.Agents
|
ag *agents.Agents
|
||||||
|
artistQueue chan<- *auxArtist
|
||||||
|
albumQueue chan<- *auxAlbum
|
||||||
}
|
}
|
||||||
|
|
||||||
type auxAlbum struct {
|
type auxAlbum struct {
|
||||||
@@ -52,7 +57,10 @@ type auxArtist struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewExternalMetadata(ds model.DataStore, agents *agents.Agents) ExternalMetadata {
|
func NewExternalMetadata(ds model.DataStore, agents *agents.Agents) ExternalMetadata {
|
||||||
return &externalMetadata{ds: ds, ag: agents}
|
e := &externalMetadata{ds: ds, ag: agents}
|
||||||
|
e.artistQueue = startRefreshQueue(context.TODO(), e.populateArtistInfo)
|
||||||
|
e.albumQueue = startRefreshQueue(context.TODO(), e.populateAlbumInfo)
|
||||||
|
return e
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *externalMetadata) getAlbum(ctx context.Context, id string) (*auxAlbum, error) {
|
func (e *externalMetadata) getAlbum(ctx context.Context, id string) (*auxAlbum, error) {
|
||||||
@@ -84,7 +92,7 @@ func (e *externalMetadata) UpdateAlbumInfo(ctx context.Context, id string) (*mod
|
|||||||
|
|
||||||
if album.ExternalInfoUpdatedAt.IsZero() {
|
if album.ExternalInfoUpdatedAt.IsZero() {
|
||||||
log.Debug(ctx, "AlbumInfo not cached. Retrieving it now", "updatedAt", album.ExternalInfoUpdatedAt, "id", id, "name", album.Name)
|
log.Debug(ctx, "AlbumInfo not cached. Retrieving it now", "updatedAt", album.ExternalInfoUpdatedAt, "id", id, "name", album.Name)
|
||||||
err = e.refreshAlbumInfo(ctx, album)
|
err = e.populateAlbumInfo(ctx, album)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -92,25 +100,21 @@ func (e *externalMetadata) UpdateAlbumInfo(ctx context.Context, id string) (*mod
|
|||||||
|
|
||||||
if time.Since(album.ExternalInfoUpdatedAt) > conf.Server.DevAlbumInfoTimeToLive {
|
if time.Since(album.ExternalInfoUpdatedAt) > conf.Server.DevAlbumInfoTimeToLive {
|
||||||
log.Debug("Found expired cached AlbumInfo, refreshing in the background", "updatedAt", album.ExternalInfoUpdatedAt, "name", album.Name)
|
log.Debug("Found expired cached AlbumInfo, refreshing in the background", "updatedAt", album.ExternalInfoUpdatedAt, "name", album.Name)
|
||||||
go func() {
|
enqueueRefresh(e.albumQueue, album)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
|
||||||
defer cancel()
|
|
||||||
err := e.refreshAlbumInfo(ctx, album)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("Error refreshing AlbumInfo", "id", id, "name", album.Name, err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return &album.Album, nil
|
return &album.Album, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *externalMetadata) refreshAlbumInfo(ctx context.Context, album *auxAlbum) error {
|
func (e *externalMetadata) populateAlbumInfo(ctx context.Context, album *auxAlbum) error {
|
||||||
|
start := time.Now()
|
||||||
info, err := e.ag.GetAlbumInfo(ctx, album.Name, album.AlbumArtist, album.MbzAlbumID)
|
info, err := e.ag.GetAlbumInfo(ctx, album.Name, album.AlbumArtist, album.MbzAlbumID)
|
||||||
if errors.Is(err, agents.ErrNotFound) {
|
if errors.Is(err, agents.ErrNotFound) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Error("Error refreshing AlbumInfo", "id", album.ID, "name", album.Name, "artist", album.AlbumArtist,
|
||||||
|
"elapsed", time.Since(start), err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -139,10 +143,12 @@ func (e *externalMetadata) refreshAlbumInfo(ctx context.Context, album *auxAlbum
|
|||||||
|
|
||||||
err = e.ds.Album(ctx).Put(&album.Album)
|
err = e.ds.Album(ctx).Put(&album.Album)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(ctx, "Error trying to update album external information", "id", album.ID, "name", album.Name, err)
|
log.Error(ctx, "Error trying to update album external information", "id", album.ID, "name", album.Name,
|
||||||
|
"elapsed", time.Since(start), err)
|
||||||
|
} else {
|
||||||
|
log.Trace(ctx, "AlbumInfo collected", "album", album, "elapsed", time.Since(start))
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Trace(ctx, "AlbumInfo collected", "album", album)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -207,19 +213,13 @@ func (e *externalMetadata) refreshArtistInfo(ctx context.Context, id string) (*a
|
|||||||
// If info is expired, trigger a populateArtistInfo in the background
|
// If info is expired, trigger a populateArtistInfo in the background
|
||||||
if time.Since(artist.ExternalInfoUpdatedAt) > conf.Server.DevArtistInfoTimeToLive {
|
if time.Since(artist.ExternalInfoUpdatedAt) > conf.Server.DevArtistInfoTimeToLive {
|
||||||
log.Debug("Found expired cached ArtistInfo, refreshing in the background", "updatedAt", artist.ExternalInfoUpdatedAt, "name", artist.Name)
|
log.Debug("Found expired cached ArtistInfo, refreshing in the background", "updatedAt", artist.ExternalInfoUpdatedAt, "name", artist.Name)
|
||||||
go func() {
|
enqueueRefresh(e.artistQueue, artist)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
|
||||||
defer cancel()
|
|
||||||
err := e.populateArtistInfo(ctx, artist)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("Error refreshing ArtistInfo", "id", id, "name", artist.Name, err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
return artist, nil
|
return artist, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *externalMetadata) populateArtistInfo(ctx context.Context, artist *auxArtist) error {
|
func (e *externalMetadata) populateArtistInfo(ctx context.Context, artist *auxArtist) error {
|
||||||
|
start := time.Now()
|
||||||
// Get MBID first, if it is not yet available
|
// Get MBID first, if it is not yet available
|
||||||
if artist.MbzArtistID == "" {
|
if artist.MbzArtistID == "" {
|
||||||
mbid, err := e.ag.GetArtistMBID(ctx, artist.ID, artist.Name)
|
mbid, err := e.ag.GetArtistMBID(ctx, artist.ID, artist.Name)
|
||||||
@@ -237,17 +237,18 @@ func (e *externalMetadata) populateArtistInfo(ctx context.Context, artist *auxAr
|
|||||||
})
|
})
|
||||||
|
|
||||||
if utils.IsCtxDone(ctx) {
|
if utils.IsCtxDone(ctx) {
|
||||||
log.Warn(ctx, "ArtistInfo update canceled", ctx.Err())
|
log.Warn(ctx, "ArtistInfo update canceled", "elapsed", "id", artist.ID, "name", artist.Name, time.Since(start), ctx.Err())
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
artist.ExternalInfoUpdatedAt = time.Now()
|
artist.ExternalInfoUpdatedAt = time.Now()
|
||||||
err := e.ds.Artist(ctx).Put(&artist.Artist)
|
err := e.ds.Artist(ctx).Put(&artist.Artist)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(ctx, "Error trying to update artist external information", "id", artist.ID, "name", artist.Name, err)
|
log.Error(ctx, "Error trying to update artist external information", "id", artist.ID, "name", artist.Name,
|
||||||
|
"elapsed", time.Since(start), err)
|
||||||
|
} else {
|
||||||
|
log.Trace(ctx, "ArtistInfo collected", "artist", artist, "elapsed", time.Since(start))
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Trace(ctx, "ArtistInfo collected", "artist", artist)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -434,11 +435,11 @@ func (e *externalMetadata) findMatchingTrack(ctx context.Context, mbid string, a
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (e *externalMetadata) callGetURL(ctx context.Context, agent agents.ArtistURLRetriever, artist *auxArtist) {
|
func (e *externalMetadata) callGetURL(ctx context.Context, agent agents.ArtistURLRetriever, artist *auxArtist) {
|
||||||
url, err := agent.GetArtistURL(ctx, artist.ID, artist.Name, artist.MbzArtistID)
|
artisURL, err := agent.GetArtistURL(ctx, artist.ID, artist.Name, artist.MbzArtistID)
|
||||||
if url == "" || err != nil {
|
if artisURL == "" || err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
artist.ExternalUrl = url
|
artist.ExternalUrl = artisURL
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *externalMetadata) callGetBiography(ctx context.Context, agent agents.ArtistBiographyRetriever, artist *auxArtist) {
|
func (e *externalMetadata) callGetBiography(ctx context.Context, agent agents.ArtistBiographyRetriever, artist *auxArtist) {
|
||||||
@@ -568,3 +569,29 @@ func (e *externalMetadata) loadSimilar(ctx context.Context, artist *auxArtist, c
|
|||||||
artist.SimilarArtists = loaded
|
artist.SimilarArtists = loaded
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func startRefreshQueue[T any](ctx context.Context, processFn func(context.Context, T) error) chan<- T {
|
||||||
|
queue := make(chan T, refreshQueueLength)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
time.Sleep(refreshDelay)
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, refreshTimeout)
|
||||||
|
select {
|
||||||
|
case a := <-queue:
|
||||||
|
_ = processFn(ctx, a)
|
||||||
|
cancel()
|
||||||
|
case <-ctx.Done():
|
||||||
|
cancel()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return queue
|
||||||
|
}
|
||||||
|
|
||||||
|
func enqueueRefresh[T any](queue chan<- T, item T) {
|
||||||
|
select {
|
||||||
|
case queue <- item:
|
||||||
|
default: // It is ok to miss a refresh
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user