fix(server): memory leak in cache warmer (#4095)
* Prevent cache warmer memory leak when cache disabled * refactor(tests): replace disabledCache with mockFileCache in CacheWarmer tests Signed-off-by: Deluan <deluan@navidrome.org> * test(cache): enhance CacheWarmer tests for initialization, buffer management, and error handling Signed-off-by: Deluan <deluan@navidrome.org> --------- Signed-off-by: Deluan <deluan@navidrome.org>
This commit is contained in:
@@ -31,6 +31,12 @@ func NewCacheWarmer(artwork Artwork, cache cache.FileCache) CacheWarmer {
|
|||||||
return &noopCacheWarmer{}
|
return &noopCacheWarmer{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If the file cache is disabled, return a NOOP implementation
|
||||||
|
if cache.Disabled(context.Background()) {
|
||||||
|
log.Debug("Image cache disabled. Cache warmer will not run")
|
||||||
|
return &noopCacheWarmer{}
|
||||||
|
}
|
||||||
|
|
||||||
a := &cacheWarmer{
|
a := &cacheWarmer{
|
||||||
artwork: artwork,
|
artwork: artwork,
|
||||||
cache: cache,
|
cache: cache,
|
||||||
@@ -53,6 +59,9 @@ type cacheWarmer struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *cacheWarmer) PreCache(artID model.ArtworkID) {
|
func (a *cacheWarmer) PreCache(artID model.ArtworkID) {
|
||||||
|
if a.cache.Disabled(context.Background()) {
|
||||||
|
return
|
||||||
|
}
|
||||||
a.mutex.Lock()
|
a.mutex.Lock()
|
||||||
defer a.mutex.Unlock()
|
defer a.mutex.Unlock()
|
||||||
a.buffer[artID] = struct{}{}
|
a.buffer[artID] = struct{}{}
|
||||||
@@ -74,6 +83,17 @@ func (a *cacheWarmer) run(ctx context.Context) {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if a.cache.Disabled(ctx) {
|
||||||
|
a.mutex.Lock()
|
||||||
|
pending := len(a.buffer)
|
||||||
|
a.buffer = make(map[model.ArtworkID]struct{})
|
||||||
|
a.mutex.Unlock()
|
||||||
|
if pending > 0 {
|
||||||
|
log.Trace(ctx, "Cache disabled, discarding precache buffer", "bufferLen", pending)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// If cache not available, keep waiting
|
// If cache not available, keep waiting
|
||||||
if !a.cache.Available(ctx) {
|
if !a.cache.Available(ctx) {
|
||||||
if len(a.buffer) > 0 {
|
if len(a.buffer) > 0 {
|
||||||
|
|||||||
@@ -0,0 +1,216 @@
|
|||||||
|
package artwork
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/navidrome/navidrome/conf"
|
||||||
|
"github.com/navidrome/navidrome/conf/configtest"
|
||||||
|
"github.com/navidrome/navidrome/model"
|
||||||
|
"github.com/navidrome/navidrome/utils/cache"
|
||||||
|
. "github.com/onsi/ginkgo/v2"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ = Describe("CacheWarmer", func() {
|
||||||
|
var (
|
||||||
|
fc *mockFileCache
|
||||||
|
aw *mockArtwork
|
||||||
|
)
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
DeferCleanup(configtest.SetupConfig())
|
||||||
|
fc = &mockFileCache{}
|
||||||
|
aw = &mockArtwork{}
|
||||||
|
})
|
||||||
|
|
||||||
|
Context("initialization", func() {
|
||||||
|
It("returns noop when cache is disabled", func() {
|
||||||
|
fc.SetDisabled(true)
|
||||||
|
cw := NewCacheWarmer(aw, fc)
|
||||||
|
_, ok := cw.(*noopCacheWarmer)
|
||||||
|
Expect(ok).To(BeTrue())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("returns noop when ImageCacheSize is 0", func() {
|
||||||
|
conf.Server.ImageCacheSize = "0"
|
||||||
|
cw := NewCacheWarmer(aw, fc)
|
||||||
|
_, ok := cw.(*noopCacheWarmer)
|
||||||
|
Expect(ok).To(BeTrue())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("returns noop when EnableArtworkPrecache is false", func() {
|
||||||
|
conf.Server.EnableArtworkPrecache = false
|
||||||
|
cw := NewCacheWarmer(aw, fc)
|
||||||
|
_, ok := cw.(*noopCacheWarmer)
|
||||||
|
Expect(ok).To(BeTrue())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("returns real implementation when properly configured", func() {
|
||||||
|
conf.Server.ImageCacheSize = "100MB"
|
||||||
|
conf.Server.EnableArtworkPrecache = true
|
||||||
|
fc.SetDisabled(false)
|
||||||
|
cw := NewCacheWarmer(aw, fc)
|
||||||
|
_, ok := cw.(*cacheWarmer)
|
||||||
|
Expect(ok).To(BeTrue())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
Context("buffer management", func() {
|
||||||
|
BeforeEach(func() {
|
||||||
|
conf.Server.ImageCacheSize = "100MB"
|
||||||
|
conf.Server.EnableArtworkPrecache = true
|
||||||
|
fc.SetDisabled(false)
|
||||||
|
})
|
||||||
|
|
||||||
|
It("drops buffered items when cache becomes disabled", func() {
|
||||||
|
cw := NewCacheWarmer(aw, fc).(*cacheWarmer)
|
||||||
|
cw.PreCache(model.MustParseArtworkID("al-test"))
|
||||||
|
fc.SetDisabled(true)
|
||||||
|
Eventually(func() int {
|
||||||
|
cw.mutex.Lock()
|
||||||
|
defer cw.mutex.Unlock()
|
||||||
|
return len(cw.buffer)
|
||||||
|
}).Should(Equal(0))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("adds multiple items to buffer", func() {
|
||||||
|
cw := NewCacheWarmer(aw, fc).(*cacheWarmer)
|
||||||
|
cw.PreCache(model.MustParseArtworkID("al-1"))
|
||||||
|
cw.PreCache(model.MustParseArtworkID("al-2"))
|
||||||
|
cw.mutex.Lock()
|
||||||
|
defer cw.mutex.Unlock()
|
||||||
|
Expect(len(cw.buffer)).To(Equal(2))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("deduplicates items in buffer", func() {
|
||||||
|
cw := NewCacheWarmer(aw, fc).(*cacheWarmer)
|
||||||
|
cw.PreCache(model.MustParseArtworkID("al-1"))
|
||||||
|
cw.PreCache(model.MustParseArtworkID("al-1"))
|
||||||
|
cw.mutex.Lock()
|
||||||
|
defer cw.mutex.Unlock()
|
||||||
|
Expect(len(cw.buffer)).To(Equal(1))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
Context("error handling", func() {
|
||||||
|
BeforeEach(func() {
|
||||||
|
conf.Server.ImageCacheSize = "100MB"
|
||||||
|
conf.Server.EnableArtworkPrecache = true
|
||||||
|
fc.SetDisabled(false)
|
||||||
|
})
|
||||||
|
|
||||||
|
It("continues processing after artwork retrieval error", func() {
|
||||||
|
aw.err = errors.New("artwork error")
|
||||||
|
cw := NewCacheWarmer(aw, fc).(*cacheWarmer)
|
||||||
|
cw.PreCache(model.MustParseArtworkID("al-error"))
|
||||||
|
cw.PreCache(model.MustParseArtworkID("al-1"))
|
||||||
|
|
||||||
|
Eventually(func() int {
|
||||||
|
cw.mutex.Lock()
|
||||||
|
defer cw.mutex.Unlock()
|
||||||
|
return len(cw.buffer)
|
||||||
|
}).Should(Equal(0))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("continues processing after cache error", func() {
|
||||||
|
fc.err = errors.New("cache error")
|
||||||
|
cw := NewCacheWarmer(aw, fc).(*cacheWarmer)
|
||||||
|
cw.PreCache(model.MustParseArtworkID("al-error"))
|
||||||
|
cw.PreCache(model.MustParseArtworkID("al-1"))
|
||||||
|
|
||||||
|
Eventually(func() int {
|
||||||
|
cw.mutex.Lock()
|
||||||
|
defer cw.mutex.Unlock()
|
||||||
|
return len(cw.buffer)
|
||||||
|
}).Should(Equal(0))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
Context("background processing", func() {
|
||||||
|
BeforeEach(func() {
|
||||||
|
conf.Server.ImageCacheSize = "100MB"
|
||||||
|
conf.Server.EnableArtworkPrecache = true
|
||||||
|
fc.SetDisabled(false)
|
||||||
|
})
|
||||||
|
|
||||||
|
It("processes items in batches", func() {
|
||||||
|
cw := NewCacheWarmer(aw, fc).(*cacheWarmer)
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
cw.PreCache(model.MustParseArtworkID(fmt.Sprintf("al-%d", i)))
|
||||||
|
}
|
||||||
|
|
||||||
|
Eventually(func() int {
|
||||||
|
cw.mutex.Lock()
|
||||||
|
defer cw.mutex.Unlock()
|
||||||
|
return len(cw.buffer)
|
||||||
|
}).Should(Equal(0))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("wakes up on new items", func() {
|
||||||
|
cw := NewCacheWarmer(aw, fc).(*cacheWarmer)
|
||||||
|
|
||||||
|
// Add first batch
|
||||||
|
cw.PreCache(model.MustParseArtworkID("al-1"))
|
||||||
|
Eventually(func() int {
|
||||||
|
cw.mutex.Lock()
|
||||||
|
defer cw.mutex.Unlock()
|
||||||
|
return len(cw.buffer)
|
||||||
|
}).Should(Equal(0))
|
||||||
|
|
||||||
|
// Add second batch
|
||||||
|
cw.PreCache(model.MustParseArtworkID("al-2"))
|
||||||
|
Eventually(func() int {
|
||||||
|
cw.mutex.Lock()
|
||||||
|
defer cw.mutex.Unlock()
|
||||||
|
return len(cw.buffer)
|
||||||
|
}).Should(Equal(0))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
type mockArtwork struct {
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockArtwork) Get(ctx context.Context, artID model.ArtworkID, size int, square bool) (io.ReadCloser, time.Time, error) {
|
||||||
|
if m.err != nil {
|
||||||
|
return nil, time.Time{}, m.err
|
||||||
|
}
|
||||||
|
return io.NopCloser(strings.NewReader("test")), time.Now(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockArtwork) GetOrPlaceholder(ctx context.Context, id string, size int, square bool) (io.ReadCloser, time.Time, error) {
|
||||||
|
return m.Get(ctx, model.ArtworkID{}, size, square)
|
||||||
|
}
|
||||||
|
|
||||||
|
type mockFileCache struct {
|
||||||
|
disabled atomic.Bool
|
||||||
|
ready atomic.Bool
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *mockFileCache) Get(ctx context.Context, item cache.Item) (*cache.CachedStream, error) {
|
||||||
|
if f.err != nil {
|
||||||
|
return nil, f.err
|
||||||
|
}
|
||||||
|
return &cache.CachedStream{Reader: io.NopCloser(strings.NewReader("cached"))}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *mockFileCache) Available(ctx context.Context) bool {
|
||||||
|
return f.ready.Load() && !f.disabled.Load()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *mockFileCache) Disabled(ctx context.Context) bool {
|
||||||
|
return f.disabled.Load()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *mockFileCache) SetDisabled(v bool) {
|
||||||
|
f.disabled.Store(v)
|
||||||
|
f.ready.Store(true)
|
||||||
|
}
|
||||||
Vendored
+10
@@ -54,6 +54,9 @@ type FileCache interface {
|
|||||||
|
|
||||||
// Available checks if the cache is available
|
// Available checks if the cache is available
|
||||||
Available(ctx context.Context) bool
|
Available(ctx context.Context) bool
|
||||||
|
|
||||||
|
// Disabled reports if the cache has been permanently disabled
|
||||||
|
Disabled(ctx context.Context) bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFileCache creates a new FileCache. This function initializes the cache and starts it in the background.
|
// NewFileCache creates a new FileCache. This function initializes the cache and starts it in the background.
|
||||||
@@ -119,6 +122,13 @@ func (fc *fileCache) Available(_ context.Context) bool {
|
|||||||
return fc.ready.Load() && !fc.disabled
|
return fc.ready.Load() && !fc.disabled
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (fc *fileCache) Disabled(_ context.Context) bool {
|
||||||
|
fc.mutex.RLock()
|
||||||
|
defer fc.mutex.RUnlock()
|
||||||
|
|
||||||
|
return fc.disabled
|
||||||
|
}
|
||||||
|
|
||||||
func (fc *fileCache) invalidate(ctx context.Context, key string) error {
|
func (fc *fileCache) invalidate(ctx context.Context, key string) error {
|
||||||
if !fc.Available(ctx) {
|
if !fc.Available(ctx) {
|
||||||
log.Debug(ctx, "Cache not initialized yet. Cannot invalidate key", "cache", fc.name, "key", key)
|
log.Debug(ctx, "Cache not initialized yet. Cannot invalidate key", "cache", fc.name, "key", key)
|
||||||
|
|||||||
Vendored
+7
@@ -50,6 +50,13 @@ var _ = Describe("File Caches", func() {
|
|||||||
Expect(fc.cache).To(BeNil())
|
Expect(fc.cache).To(BeNil())
|
||||||
Expect(fc.disabled).To(BeTrue())
|
Expect(fc.disabled).To(BeTrue())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("reports when cache is disabled", func() {
|
||||||
|
fc := callNewFileCache("test", "0", "test", 0, nil)
|
||||||
|
Expect(fc.Disabled(context.Background())).To(BeTrue())
|
||||||
|
fc = callNewFileCache("test", "1KB", "test", 0, nil)
|
||||||
|
Expect(fc.Disabled(context.Background())).To(BeFalse())
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
Describe("FileCache", func() {
|
Describe("FileCache", func() {
|
||||||
|
|||||||
Reference in New Issue
Block a user