Use a RWMutex instead of an AtomicBool, to reduce contention
This commit is contained in:
+30
-18
@@ -5,12 +5,12 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/deluan/navidrome/conf"
|
"github.com/deluan/navidrome/conf"
|
||||||
"github.com/deluan/navidrome/consts"
|
"github.com/deluan/navidrome/consts"
|
||||||
"github.com/deluan/navidrome/log"
|
"github.com/deluan/navidrome/log"
|
||||||
"github.com/deluan/navidrome/utils"
|
|
||||||
"github.com/djherbis/fscache"
|
"github.com/djherbis/fscache"
|
||||||
"github.com/dustin/go-humanize"
|
"github.com/dustin/go-humanize"
|
||||||
)
|
)
|
||||||
@@ -29,17 +29,21 @@ func NewFileCache(name, cacheSize, cacheFolder string, maxItems int, getReader R
|
|||||||
cacheFolder: cacheFolder,
|
cacheFolder: cacheFolder,
|
||||||
maxItems: maxItems,
|
maxItems: maxItems,
|
||||||
getReader: getReader,
|
getReader: getReader,
|
||||||
disabled: utils.AtomicBool{},
|
mutex: &sync.RWMutex{},
|
||||||
ready: utils.AtomicBool{},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
cache, err := newFSCache(fc.name, fc.cacheSize, fc.cacheFolder, fc.maxItems)
|
cache, err := newFSCache(fc.name, fc.cacheSize, fc.cacheFolder, fc.maxItems)
|
||||||
|
fc.mutex.Lock()
|
||||||
|
defer fc.mutex.Unlock()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
fc.cache = cache
|
fc.cache = cache
|
||||||
fc.disabled.Set(cache == nil)
|
fc.disabled = cache == nil
|
||||||
|
}
|
||||||
|
fc.ready = true
|
||||||
|
if fc.disabled {
|
||||||
|
log.Debug("Cache disabled", "cache", fc.name, "size", fc.cacheSize)
|
||||||
}
|
}
|
||||||
fc.ready.Set(true)
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return fc
|
return fc
|
||||||
@@ -52,18 +56,30 @@ type fileCache struct {
|
|||||||
maxItems int
|
maxItems int
|
||||||
cache fscache.Cache
|
cache fscache.Cache
|
||||||
getReader ReadFunc
|
getReader ReadFunc
|
||||||
disabled utils.AtomicBool
|
disabled bool
|
||||||
ready utils.AtomicBool
|
ready bool
|
||||||
|
mutex *sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fc *fileCache) Ready() bool {
|
||||||
|
fc.mutex.RLock()
|
||||||
|
defer fc.mutex.RUnlock()
|
||||||
|
return fc.ready
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fc *fileCache) available(ctx context.Context) bool {
|
||||||
|
fc.mutex.RLock()
|
||||||
|
defer fc.mutex.RUnlock()
|
||||||
|
|
||||||
|
if !fc.ready {
|
||||||
|
log.Debug(ctx, "Cache not initialized yet", "cache", fc.name)
|
||||||
|
}
|
||||||
|
|
||||||
|
return fc.ready && !fc.disabled
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fc *fileCache) Get(ctx context.Context, arg fmt.Stringer) (*CachedStream, error) {
|
func (fc *fileCache) Get(ctx context.Context, arg fmt.Stringer) (*CachedStream, error) {
|
||||||
if !fc.Ready() {
|
if !fc.available(ctx) {
|
||||||
log.Debug(ctx, "Cache not initialized yet", "cache", fc.name)
|
|
||||||
}
|
|
||||||
if fc.disabled.Get() {
|
|
||||||
log.Debug(ctx, "Cache disabled", "cache", fc.name)
|
|
||||||
}
|
|
||||||
if fc.disabled.Get() || !fc.Ready() {
|
|
||||||
reader, err := fc.getReader(ctx, arg)
|
reader, err := fc.getReader(ctx, arg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -108,10 +124,6 @@ func (fc *fileCache) Get(ctx context.Context, arg fmt.Stringer) (*CachedStream,
|
|||||||
return &CachedStream{Reader: r, Cached: cached}, nil
|
return &CachedStream{Reader: r, Cached: cached}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fc *fileCache) Ready() bool {
|
|
||||||
return fc.ready.Get()
|
|
||||||
}
|
|
||||||
|
|
||||||
type CachedStream struct {
|
type CachedStream struct {
|
||||||
io.Reader
|
io.Reader
|
||||||
io.Seeker
|
io.Seeker
|
||||||
|
|||||||
@@ -40,13 +40,13 @@ var _ = Describe("File Caches", func() {
|
|||||||
It("creates the cache folder with invalid size", func() {
|
It("creates the cache folder with invalid size", func() {
|
||||||
fc := callNewFileCache("test", "abc", "test", 0, nil)
|
fc := callNewFileCache("test", "abc", "test", 0, nil)
|
||||||
Expect(fc.cache).ToNot(BeNil())
|
Expect(fc.cache).ToNot(BeNil())
|
||||||
Expect(fc.disabled.Get()).To(BeFalse())
|
Expect(fc.disabled).To(BeFalse())
|
||||||
})
|
})
|
||||||
|
|
||||||
It("returns empty if cache size is '0'", func() {
|
It("returns empty if cache size is '0'", func() {
|
||||||
fc := callNewFileCache("test", "0", "test", 0, nil)
|
fc := callNewFileCache("test", "0", "test", 0, nil)
|
||||||
Expect(fc.cache).To(BeNil())
|
Expect(fc.cache).To(BeNil())
|
||||||
Expect(fc.disabled.Get()).To(BeTrue())
|
Expect(fc.disabled).To(BeTrue())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
@@ -70,8 +70,7 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, id string, reqFormat str
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
s.Reader = f
|
s.ReadCloser = f
|
||||||
s.Closer = f
|
|
||||||
s.Seeker = f
|
s.Seeker = f
|
||||||
s.format = mf.Suffix
|
s.format = mf.Suffix
|
||||||
return s, nil
|
return s, nil
|
||||||
@@ -93,10 +92,9 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, id string, reqFormat str
|
|||||||
log.Debug(ctx, "Streaming TRANSCODED file", "id", mf.ID, "path", mf.Path,
|
log.Debug(ctx, "Streaming TRANSCODED file", "id", mf.ID, "path", mf.Path,
|
||||||
"requestBitrate", reqBitRate, "requestFormat", reqFormat,
|
"requestBitrate", reqBitRate, "requestFormat", reqFormat,
|
||||||
"originalBitrate", mf.BitRate, "originalFormat", mf.Suffix,
|
"originalBitrate", mf.BitRate, "originalFormat", mf.Suffix,
|
||||||
"selectedBitrate", bitRate, "selectedFormat", format, "cached", cached)
|
"selectedBitrate", bitRate, "selectedFormat", format, "cached", cached, "seekable", s.Seekable())
|
||||||
|
|
||||||
s.Reader = r
|
s.ReadCloser = r
|
||||||
s.Closer = r
|
|
||||||
if r.Seekable() {
|
if r.Seekable() {
|
||||||
s.Seeker = r
|
s.Seeker = r
|
||||||
}
|
}
|
||||||
@@ -109,8 +107,7 @@ type Stream struct {
|
|||||||
mf *model.MediaFile
|
mf *model.MediaFile
|
||||||
bitRate int
|
bitRate int
|
||||||
format string
|
format string
|
||||||
io.Reader
|
io.ReadCloser
|
||||||
io.Closer
|
|
||||||
io.Seeker
|
io.Seeker
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -32,8 +32,10 @@ func (c *StreamController) Stream(w http.ResponseWriter, r *http.Request) (*resp
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Make sure the stream will be closed at the end, to avoid leakage
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := stream.Close(); err != nil {
|
if err := stream.Close(); err != nil && log.CurrentLevel() >= log.LevelDebug {
|
||||||
log.Error("Error closing stream", "id", id, "file", stream.Name(), err)
|
log.Error("Error closing stream", "id", id, "file", stream.Name(), err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|||||||
Reference in New Issue
Block a user