Move utilitarian/generic packages to utils: lastfm, spotify, gravatar, cache, and pool
This commit is contained in:
Vendored
+17
@@ -0,0 +1,17 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/navidrome/navidrome/log"
|
||||
"github.com/navidrome/navidrome/tests"
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
func TestCache(t *testing.T) {
|
||||
tests.Init(t, false)
|
||||
log.SetLevel(log.LevelCritical)
|
||||
RegisterFailHandler(Fail)
|
||||
RunSpecs(t, "Cache Suite")
|
||||
}
|
||||
Vendored
+214
@@ -0,0 +1,214 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/djherbis/fscache"
|
||||
"github.com/dustin/go-humanize"
|
||||
"github.com/navidrome/navidrome/conf"
|
||||
"github.com/navidrome/navidrome/consts"
|
||||
"github.com/navidrome/navidrome/log"
|
||||
)
|
||||
|
||||
type Item interface {
|
||||
Key() string
|
||||
}
|
||||
|
||||
type ReadFunc func(ctx context.Context, item Item) (io.Reader, error)
|
||||
|
||||
type FileCache interface {
|
||||
Get(ctx context.Context, item Item) (*CachedStream, error)
|
||||
Ready(ctx context.Context) bool
|
||||
Available(ctx context.Context) bool
|
||||
}
|
||||
|
||||
func NewFileCache(name, cacheSize, cacheFolder string, maxItems int, getReader ReadFunc) *fileCache {
|
||||
fc := &fileCache{
|
||||
name: name,
|
||||
cacheSize: cacheSize,
|
||||
cacheFolder: filepath.FromSlash(cacheFolder),
|
||||
maxItems: maxItems,
|
||||
getReader: getReader,
|
||||
mutex: &sync.RWMutex{},
|
||||
}
|
||||
|
||||
go func() {
|
||||
start := time.Now()
|
||||
cache, err := newFSCache(fc.name, fc.cacheSize, fc.cacheFolder, fc.maxItems)
|
||||
fc.mutex.Lock()
|
||||
defer fc.mutex.Unlock()
|
||||
if err == nil {
|
||||
fc.cache = cache
|
||||
fc.disabled = cache == nil
|
||||
}
|
||||
log.Info("Finished initializing cache", "cache", fc.name, "maxSize", fc.cacheSize, "elapsedTime", time.Since(start))
|
||||
fc.ready = true
|
||||
if fc.disabled {
|
||||
log.Debug("Cache DISABLED", "cache", fc.name, "size", fc.cacheSize)
|
||||
}
|
||||
}()
|
||||
|
||||
return fc
|
||||
}
|
||||
|
||||
type fileCache struct {
|
||||
name string
|
||||
cacheSize string
|
||||
cacheFolder string
|
||||
maxItems int
|
||||
cache fscache.Cache
|
||||
getReader ReadFunc
|
||||
disabled bool
|
||||
ready bool
|
||||
mutex *sync.RWMutex
|
||||
}
|
||||
|
||||
func (fc *fileCache) Ready(ctx context.Context) bool {
|
||||
fc.mutex.RLock()
|
||||
defer fc.mutex.RUnlock()
|
||||
return fc.ready
|
||||
}
|
||||
|
||||
func (fc *fileCache) Available(ctx context.Context) bool {
|
||||
fc.mutex.RLock()
|
||||
defer fc.mutex.RUnlock()
|
||||
|
||||
if !fc.ready {
|
||||
log.Debug(ctx, "Cache not initialized yet", "cache", fc.name)
|
||||
}
|
||||
|
||||
return fc.ready && !fc.disabled
|
||||
}
|
||||
|
||||
func (fc *fileCache) Get(ctx context.Context, arg Item) (*CachedStream, error) {
|
||||
if !fc.Available(ctx) {
|
||||
reader, err := fc.getReader(ctx, arg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &CachedStream{Reader: reader}, nil
|
||||
}
|
||||
|
||||
key := arg.Key()
|
||||
r, w, err := fc.cache.Get(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cached := w == nil
|
||||
|
||||
if !cached {
|
||||
log.Trace(ctx, "Cache MISS", "cache", fc.name, "key", key)
|
||||
reader, err := fc.getReader(ctx, arg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go copyAndClose(ctx, w, reader)
|
||||
}
|
||||
|
||||
// If it is in the cache, check if the stream is done being written. If so, return a ReaderSeeker
|
||||
if cached {
|
||||
size := getFinalCachedSize(r)
|
||||
if size >= 0 {
|
||||
log.Trace(ctx, "Cache HIT", "cache", fc.name, "key", key, "size", size)
|
||||
sr := io.NewSectionReader(r, 0, size)
|
||||
return &CachedStream{
|
||||
Reader: sr,
|
||||
Seeker: sr,
|
||||
Cached: true,
|
||||
}, nil
|
||||
} else {
|
||||
log.Trace(ctx, "Cache HIT", "cache", fc.name, "key", key)
|
||||
}
|
||||
}
|
||||
|
||||
// All other cases, just return a Reader, without Seek capabilities
|
||||
return &CachedStream{Reader: r, Cached: cached}, nil
|
||||
}
|
||||
|
||||
type CachedStream struct {
|
||||
io.Reader
|
||||
io.Seeker
|
||||
Cached bool
|
||||
}
|
||||
|
||||
func (s *CachedStream) Seekable() bool { return s.Seeker != nil }
|
||||
func (s *CachedStream) Close() error {
|
||||
if c, ok := s.Reader.(io.Closer); ok {
|
||||
return c.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getFinalCachedSize(r fscache.ReadAtCloser) int64 {
|
||||
cr, ok := r.(*fscache.CacheReader)
|
||||
if ok {
|
||||
size, final, err := cr.Size()
|
||||
if final && err == nil {
|
||||
return size
|
||||
}
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
||||
func copyAndClose(ctx context.Context, w io.WriteCloser, r io.Reader) {
|
||||
_, err := io.Copy(w, r)
|
||||
if err != nil {
|
||||
log.Error(ctx, "Error copying data to cache", err)
|
||||
}
|
||||
if c, ok := r.(io.Closer); ok {
|
||||
err = c.Close()
|
||||
if err != nil {
|
||||
log.Error(ctx, "Error closing source stream", err)
|
||||
}
|
||||
}
|
||||
err = w.Close()
|
||||
if err != nil {
|
||||
log.Error(ctx, "Error closing cache writer", err)
|
||||
}
|
||||
}
|
||||
|
||||
func newFSCache(name, cacheSize, cacheFolder string, maxItems int) (fscache.Cache, error) {
|
||||
size, err := humanize.ParseBytes(cacheSize)
|
||||
if err != nil {
|
||||
log.Error("Invalid cache size. Using default size", "cache", name, "size", cacheSize,
|
||||
"defaultSize", humanize.Bytes(consts.DefaultCacheSize))
|
||||
size = consts.DefaultCacheSize
|
||||
}
|
||||
if size == 0 {
|
||||
log.Warn(fmt.Sprintf("%s cache disabled", name))
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
lru := fscache.NewLRUHaunter(maxItems, int64(size), consts.DefaultCacheCleanUpInterval)
|
||||
h := fscache.NewLRUHaunterStrategy(lru)
|
||||
cacheFolder = filepath.Join(conf.Server.DataFolder, cacheFolder)
|
||||
|
||||
var fs fscache.FileSystem
|
||||
log.Info(fmt.Sprintf("Creating %s cache", name), "path", cacheFolder, "maxSize", humanize.Bytes(size))
|
||||
if conf.Server.DevOldCacheLayout {
|
||||
fs, err = fscache.NewFs(cacheFolder, 0755)
|
||||
} else {
|
||||
fs, err = NewSpreadFS(cacheFolder, 0755)
|
||||
}
|
||||
if err != nil {
|
||||
log.Error(fmt.Sprintf("Error initializing %s cache", name), err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ck, err := fscache.NewCacheWithHaunter(fs, h)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !conf.Server.DevOldCacheLayout {
|
||||
ck.SetKeyMapper(fs.(*spreadFS).KeyMapper)
|
||||
}
|
||||
|
||||
return ck, nil
|
||||
}
|
||||
Vendored
+99
@@ -0,0 +1,99 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/navidrome/navidrome/conf"
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
// Call NewFileCache and wait for it to be ready
|
||||
func callNewFileCache(name, cacheSize, cacheFolder string, maxItems int, getReader ReadFunc) *fileCache {
|
||||
fc := NewFileCache(name, cacheSize, cacheFolder, maxItems, getReader)
|
||||
Eventually(func() bool { return fc.Ready(context.TODO()) }).Should(BeTrue())
|
||||
return fc
|
||||
}
|
||||
|
||||
var _ = Describe("File Caches", func() {
|
||||
BeforeEach(func() {
|
||||
conf.Server.DataFolder, _ = ioutil.TempDir("", "file_caches")
|
||||
})
|
||||
AfterEach(func() {
|
||||
os.RemoveAll(conf.Server.DataFolder)
|
||||
})
|
||||
|
||||
Describe("NewFileCache", func() {
|
||||
It("creates the cache folder", func() {
|
||||
Expect(callNewFileCache("test", "1k", "test", 0, nil)).ToNot(BeNil())
|
||||
|
||||
_, err := os.Stat(filepath.Join(conf.Server.DataFolder, "test"))
|
||||
Expect(os.IsNotExist(err)).To(BeFalse())
|
||||
})
|
||||
|
||||
It("creates the cache folder with invalid size", func() {
|
||||
fc := callNewFileCache("test", "abc", "test", 0, nil)
|
||||
Expect(fc.cache).ToNot(BeNil())
|
||||
Expect(fc.disabled).To(BeFalse())
|
||||
})
|
||||
|
||||
It("returns empty if cache size is '0'", func() {
|
||||
fc := callNewFileCache("test", "0", "test", 0, nil)
|
||||
Expect(fc.cache).To(BeNil())
|
||||
Expect(fc.disabled).To(BeTrue())
|
||||
})
|
||||
})
|
||||
|
||||
Describe("FileCache", func() {
|
||||
It("caches data if cache is enabled", func() {
|
||||
called := false
|
||||
fc := callNewFileCache("test", "1KB", "test", 0, func(ctx context.Context, arg Item) (io.Reader, error) {
|
||||
called = true
|
||||
return strings.NewReader(arg.Key()), nil
|
||||
})
|
||||
// First call is a MISS
|
||||
s, err := fc.Get(context.TODO(), &testArg{"test"})
|
||||
Expect(err).To(BeNil())
|
||||
Expect(s.Cached).To(BeFalse())
|
||||
Expect(ioutil.ReadAll(s)).To(Equal([]byte("test")))
|
||||
|
||||
// Second call is a HIT
|
||||
called = false
|
||||
s, err = fc.Get(context.TODO(), &testArg{"test"})
|
||||
Expect(err).To(BeNil())
|
||||
Expect(ioutil.ReadAll(s)).To(Equal([]byte("test")))
|
||||
Expect(s.Cached).To(BeTrue())
|
||||
Expect(called).To(BeFalse())
|
||||
})
|
||||
|
||||
It("does not cache data if cache is disabled", func() {
|
||||
called := false
|
||||
fc := callNewFileCache("test", "0", "test", 0, func(ctx context.Context, arg Item) (io.Reader, error) {
|
||||
called = true
|
||||
return strings.NewReader(arg.Key()), nil
|
||||
})
|
||||
// First call is a MISS
|
||||
s, err := fc.Get(context.TODO(), &testArg{"test"})
|
||||
Expect(err).To(BeNil())
|
||||
Expect(s.Cached).To(BeFalse())
|
||||
Expect(ioutil.ReadAll(s)).To(Equal([]byte("test")))
|
||||
|
||||
// Second call is also a MISS
|
||||
called = false
|
||||
s, err = fc.Get(context.TODO(), &testArg{"test"})
|
||||
Expect(err).To(BeNil())
|
||||
Expect(ioutil.ReadAll(s)).To(Equal([]byte("test")))
|
||||
Expect(s.Cached).To(BeFalse())
|
||||
Expect(called).To(BeTrue())
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
type testArg struct{ s string }
|
||||
|
||||
func (t *testArg) Key() string { return t.s }
|
||||
Vendored
+87
@@ -0,0 +1,87 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"crypto/sha1"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/djherbis/fscache"
|
||||
"github.com/karrick/godirwalk"
|
||||
"gopkg.in/djherbis/atime.v1"
|
||||
"gopkg.in/djherbis/stream.v1"
|
||||
)
|
||||
|
||||
type spreadFS struct {
|
||||
root string
|
||||
mode os.FileMode
|
||||
init func() error
|
||||
}
|
||||
|
||||
// NewSpreadFS returns a FileSystem rooted at directory dir. It
|
||||
// Dir is created with perms if it doesn't exist.
|
||||
func NewSpreadFS(dir string, mode os.FileMode) (fscache.FileSystem, error) {
|
||||
fs := &spreadFS{root: dir, mode: mode, init: func() error {
|
||||
return os.MkdirAll(dir, mode)
|
||||
}}
|
||||
return fs, fs.init()
|
||||
}
|
||||
|
||||
func (fs *spreadFS) Reload(f func(key string, name string)) error {
|
||||
return godirwalk.Walk(fs.root, &godirwalk.Options{
|
||||
Callback: func(absoluteFilePath string, de *godirwalk.Dirent) error {
|
||||
path, err := filepath.Rel(fs.root, absoluteFilePath)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Skip if name is not in the format XX/XX/XXXXXXXXXXXX
|
||||
parts := strings.Split(path, string(os.PathSeparator))
|
||||
if len(parts) != 3 || len(parts[0]) != 2 || len(parts[1]) != 2 {
|
||||
return nil
|
||||
}
|
||||
|
||||
f(absoluteFilePath, absoluteFilePath)
|
||||
return nil
|
||||
},
|
||||
Unsorted: true,
|
||||
})
|
||||
}
|
||||
|
||||
func (fs *spreadFS) Create(name string) (stream.File, error) {
|
||||
path := filepath.Dir(name)
|
||||
err := os.MkdirAll(path, fs.mode)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
|
||||
}
|
||||
|
||||
func (fs *spreadFS) Open(name string) (stream.File, error) {
|
||||
return os.Open(name)
|
||||
}
|
||||
|
||||
func (fs *spreadFS) Remove(name string) error {
|
||||
return os.Remove(name)
|
||||
}
|
||||
|
||||
func (fs *spreadFS) Stat(name string) (fscache.FileInfo, error) {
|
||||
stat, err := os.Stat(name)
|
||||
if err != nil {
|
||||
return fscache.FileInfo{}, err
|
||||
}
|
||||
return fscache.FileInfo{FileInfo: stat, Atime: atime.Get(stat)}, nil
|
||||
}
|
||||
|
||||
func (fs *spreadFS) RemoveAll() error {
|
||||
if err := os.RemoveAll(fs.root); err != nil {
|
||||
return err
|
||||
}
|
||||
return fs.init()
|
||||
}
|
||||
|
||||
func (fs *spreadFS) KeyMapper(key string) string {
|
||||
hash := fmt.Sprintf("%x", sha1.Sum([]byte(key)))
|
||||
return filepath.Join(fs.root, hash[0:2], hash[2:4], hash)
|
||||
}
|
||||
Reference in New Issue
Block a user