refactor: new transcoding engine. third (fourth?) time is a charm!
This commit is contained in:
+1
-1
@@ -105,7 +105,7 @@ func FromMediaFile(mf *model.MediaFile) Entry {
|
||||
e.Created = mf.CreatedAt
|
||||
e.AlbumId = mf.AlbumID
|
||||
e.ArtistId = mf.ArtistID
|
||||
e.Type = "music" // TODO Hardcoded for now
|
||||
e.Type = "music"
|
||||
e.PlayCount = int32(mf.PlayCount)
|
||||
e.Starred = mf.StarredAt
|
||||
e.UserRating = mf.Rating
|
||||
|
||||
+113
-116
@@ -4,10 +4,9 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"mime"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/deluan/navidrome/conf"
|
||||
@@ -16,11 +15,11 @@ import (
|
||||
"github.com/deluan/navidrome/log"
|
||||
"github.com/deluan/navidrome/model"
|
||||
"github.com/deluan/navidrome/utils"
|
||||
"gopkg.in/djherbis/fscache.v0"
|
||||
"github.com/djherbis/fscache"
|
||||
)
|
||||
|
||||
type MediaStreamer interface {
|
||||
NewFileSystem(ctx context.Context, maxBitRate int, format string) (http.FileSystem, error)
|
||||
NewStream(ctx context.Context, id string, maxBitRate int, format string) (*Stream, error)
|
||||
}
|
||||
|
||||
func NewMediaStreamer(ds model.DataStore, ffm ffmpeg.FFmpeg, cache fscache.Cache) MediaStreamer {
|
||||
@@ -33,30 +32,117 @@ type mediaStreamer struct {
|
||||
cache fscache.Cache
|
||||
}
|
||||
|
||||
func (ms *mediaStreamer) NewFileSystem(ctx context.Context, maxBitRate int, format string) (http.FileSystem, error) {
|
||||
return &mediaFileSystem{ctx: ctx, ds: ms.ds, ffm: ms.ffm, cache: ms.cache, maxBitRate: maxBitRate, format: format}, nil
|
||||
func (ms *mediaStreamer) NewStream(ctx context.Context, id string, maxBitRate int, reqFormat string) (*Stream, error) {
|
||||
mf, err := ms.ds.MediaFile(ctx).Get(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
format, bitRate := selectTranscodingOptions(mf, maxBitRate, reqFormat)
|
||||
s := &Stream{ctx: ctx, mf: mf, format: format, bitRate: bitRate}
|
||||
|
||||
if format == "raw" {
|
||||
log.Debug(ctx, "Streaming raw file", "id", mf.ID, "path", mf.Path,
|
||||
"requestBitrate", maxBitRate, "requestFormat", reqFormat,
|
||||
"originalBitrate", mf.BitRate, "originalFormat", mf.Suffix)
|
||||
f, err := os.Open(mf.Path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.Reader = f
|
||||
s.Closer = f
|
||||
s.Seeker = f
|
||||
s.format = mf.Suffix
|
||||
return s, nil
|
||||
}
|
||||
|
||||
key := cacheKey(id, bitRate, format)
|
||||
r, w, err := ms.cache.Get(key)
|
||||
if err != nil {
|
||||
log.Error(ctx, "Error creating stream caching buffer", "id", mf.ID, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If this is a brand new transcoding request, not in the cache, start transcoding
|
||||
if w != nil {
|
||||
log.Trace(ctx, "Cache miss. Starting new transcoding session", "id", mf.ID)
|
||||
out, err := ms.ffm.StartTranscoding(ctx, mf.Path, bitRate, format)
|
||||
if err != nil {
|
||||
log.Error(ctx, "Error starting transcoder", "id", mf.ID, err)
|
||||
return nil, os.ErrInvalid
|
||||
}
|
||||
go copyAndClose(ctx, w, out)()
|
||||
}
|
||||
|
||||
// If it is in the cache, check if the stream is done being written. If so, return a ReaderSeeker
|
||||
if w == nil {
|
||||
size := getFinalCachedSize(r)
|
||||
if size > 0 {
|
||||
log.Debug(ctx, "Streaming cached file", "id", mf.ID, "path", mf.Path,
|
||||
"requestBitrate", maxBitRate, "requestFormat", reqFormat,
|
||||
"originalBitrate", mf.BitRate, "originalFormat", mf.Suffix, "size", size)
|
||||
sr := io.NewSectionReader(r, 0, size)
|
||||
s.Reader = sr
|
||||
s.Closer = r
|
||||
s.Seeker = sr
|
||||
s.format = format
|
||||
return s, nil
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug(ctx, "Streaming transcoded file", "id", mf.ID, "path", mf.Path,
|
||||
"requestBitrate", maxBitRate, "requestFormat", reqFormat,
|
||||
"originalBitrate", mf.BitRate, "originalFormat", mf.Suffix)
|
||||
// All other cases, just return a ReadCloser, without Seek capabilities
|
||||
s.Reader = r
|
||||
s.Closer = r
|
||||
s.format = format
|
||||
return s, nil
|
||||
}
|
||||
|
||||
type mediaFileSystem struct {
|
||||
ctx context.Context
|
||||
ds model.DataStore
|
||||
maxBitRate int
|
||||
format string
|
||||
ffm ffmpeg.FFmpeg
|
||||
cache fscache.Cache
|
||||
func copyAndClose(ctx context.Context, w io.WriteCloser, r io.ReadCloser) func() {
|
||||
return func() {
|
||||
_, err := io.Copy(w, r)
|
||||
if err != nil {
|
||||
log.Error(ctx, "Error copying data to cache", err)
|
||||
}
|
||||
err = r.Close()
|
||||
if err != nil {
|
||||
log.Error(ctx, "Error closing transcode output", err)
|
||||
}
|
||||
err = w.Close()
|
||||
if err != nil {
|
||||
log.Error(ctx, "Error closing cache", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (fs *mediaFileSystem) selectTranscodingOptions(mf *model.MediaFile) (string, int) {
|
||||
type Stream struct {
|
||||
ctx context.Context
|
||||
mf *model.MediaFile
|
||||
bitRate int
|
||||
format string
|
||||
io.Reader
|
||||
io.Closer
|
||||
io.Seeker
|
||||
}
|
||||
|
||||
func (s *Stream) Seekable() bool { return s.Seeker != nil }
|
||||
func (s *Stream) Duration() float32 { return s.mf.Duration }
|
||||
func (s *Stream) ContentType() string { return mime.TypeByExtension("." + s.format) }
|
||||
func (s *Stream) Name() string { return s.mf.Path }
|
||||
func (s *Stream) ModTime() time.Time { return s.mf.UpdatedAt }
|
||||
|
||||
func selectTranscodingOptions(mf *model.MediaFile, maxBitRate int, format string) (string, int) {
|
||||
var bitRate int
|
||||
var format string
|
||||
|
||||
if fs.format == "raw" || !conf.Server.EnableDownsampling {
|
||||
if format == "raw" || !conf.Server.EnableDownsampling {
|
||||
return "raw", bitRate
|
||||
} else {
|
||||
if fs.maxBitRate == 0 {
|
||||
if maxBitRate == 0 {
|
||||
bitRate = mf.BitRate
|
||||
} else {
|
||||
bitRate = utils.MinInt(mf.BitRate, fs.maxBitRate)
|
||||
bitRate = utils.MinInt(mf.BitRate, maxBitRate)
|
||||
}
|
||||
format = "mp3" //mf.Suffix
|
||||
}
|
||||
@@ -70,110 +156,21 @@ func (fs *mediaFileSystem) selectTranscodingOptions(mf *model.MediaFile) (string
|
||||
return format, bitRate
|
||||
}
|
||||
|
||||
func (fs *mediaFileSystem) Open(name string) (http.File, error) {
|
||||
id := strings.Trim(name, "/")
|
||||
mf, err := fs.ds.MediaFile(fs.ctx).Get(id)
|
||||
if err == model.ErrNotFound {
|
||||
return nil, os.ErrNotExist
|
||||
}
|
||||
if err != nil {
|
||||
log.Error("Error opening mediaFile", "id", id, err)
|
||||
return nil, os.ErrInvalid
|
||||
}
|
||||
|
||||
format, bitRate := fs.selectTranscodingOptions(mf)
|
||||
if format == "raw" {
|
||||
log.Debug(fs.ctx, "Streaming raw file", "id", mf.ID, "path", mf.Path,
|
||||
"requestBitrate", bitRate, "requestFormat", format,
|
||||
"originalBitrate", mf.BitRate, "originalFormat", mf.Suffix)
|
||||
return os.Open(mf.Path)
|
||||
}
|
||||
|
||||
log.Debug(fs.ctx, "Streaming transcoded file", "id", mf.ID, "path", mf.Path,
|
||||
"requestBitrate", bitRate, "requestFormat", format,
|
||||
"originalBitrate", mf.BitRate, "originalFormat", mf.Suffix)
|
||||
|
||||
return fs.transcodeFile(mf, bitRate, format)
|
||||
func cacheKey(id string, bitRate int, format string) string {
|
||||
return fmt.Sprintf("%s.%d.%s", id, bitRate, format)
|
||||
}
|
||||
|
||||
func (fs *mediaFileSystem) transcodeFile(mf *model.MediaFile, bitRate int, format string) (*transcodingFile, error) {
|
||||
key := fmt.Sprintf("%s.%d.%s", mf.ID, bitRate, format)
|
||||
r, w, err := fs.cache.Get(key)
|
||||
if err != nil {
|
||||
log.Error("Error creating stream caching buffer", "id", mf.ID, err)
|
||||
return nil, os.ErrInvalid
|
||||
}
|
||||
|
||||
// If it is a new file (not found in the cached), start a new transcoding session
|
||||
if w != nil {
|
||||
log.Debug("File not found in cache. Starting new transcoding session", "id", mf.ID)
|
||||
out, err := fs.ffm.StartTranscoding(fs.ctx, mf.Path, bitRate, format)
|
||||
if err != nil {
|
||||
log.Error("Error starting transcoder", "id", mf.ID, err)
|
||||
return nil, os.ErrInvalid
|
||||
func getFinalCachedSize(r fscache.ReadAtCloser) int64 {
|
||||
cr, ok := r.(*fscache.CacheReader)
|
||||
if ok {
|
||||
size, final, err := cr.Size()
|
||||
if final && err == nil {
|
||||
return size
|
||||
}
|
||||
go func() {
|
||||
io.Copy(w, out)
|
||||
out.Close()
|
||||
w.Close()
|
||||
}()
|
||||
} else {
|
||||
log.Debug("Reading transcoded file from cache", "id", mf.ID)
|
||||
}
|
||||
|
||||
return newTranscodingFile(fs.ctx, r, mf, bitRate), nil
|
||||
return -1
|
||||
}
|
||||
|
||||
// transcodingFile Implements http.File interface, required for the FileSystem. It needs a Closer, a Reader and
|
||||
// a Seeker for the same stream. Because the fscache package only provides a ReaderAtCloser (without the Seek()
|
||||
// method), we wrap that reader with a SectionReader, which provides a Seek(). But we still need the original
|
||||
// reader, as we need to close the stream when the transfer is complete
|
||||
func newTranscodingFile(ctx context.Context, reader fscache.ReadAtCloser,
|
||||
mf *model.MediaFile, bitRate int) *transcodingFile {
|
||||
|
||||
size := int64(mf.Duration*float32(bitRate*1000)) / 8
|
||||
return &transcodingFile{
|
||||
ctx: ctx,
|
||||
mf: mf,
|
||||
bitRate: bitRate,
|
||||
size: size,
|
||||
closer: reader,
|
||||
ReadSeeker: io.NewSectionReader(reader, 0, size),
|
||||
}
|
||||
}
|
||||
|
||||
type transcodingFile struct {
|
||||
ctx context.Context
|
||||
mf *model.MediaFile
|
||||
bitRate int
|
||||
size int64
|
||||
closer io.Closer
|
||||
io.ReadSeeker
|
||||
}
|
||||
|
||||
func (tf *transcodingFile) Stat() (os.FileInfo, error) {
|
||||
return &streamHandlerFileInfo{f: tf}, nil
|
||||
}
|
||||
|
||||
func (tf *transcodingFile) Close() error {
|
||||
return tf.closer.Close()
|
||||
}
|
||||
|
||||
func (tf *transcodingFile) Readdir(count int) ([]os.FileInfo, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type streamHandlerFileInfo struct {
|
||||
f *transcodingFile
|
||||
}
|
||||
|
||||
func (fi *streamHandlerFileInfo) Name() string { return fi.f.mf.Title }
|
||||
func (fi *streamHandlerFileInfo) ModTime() time.Time { return fi.f.mf.UpdatedAt }
|
||||
func (fi *streamHandlerFileInfo) Size() int64 { return fi.f.size }
|
||||
func (fi *streamHandlerFileInfo) Mode() os.FileMode { return os.FileMode(0777) }
|
||||
func (fi *streamHandlerFileInfo) IsDir() bool { return false }
|
||||
func (fi *streamHandlerFileInfo) Sys() interface{} { return nil }
|
||||
|
||||
func NewTranscodingCache() (fscache.Cache, error) {
|
||||
lru := fscache.NewLRUHaunter(0, conf.Server.MaxTranscodingCacheSize, 10*time.Minute)
|
||||
h := fscache.NewLRUHaunterStrategy(lru)
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
@@ -12,53 +11,82 @@ import (
|
||||
"github.com/deluan/navidrome/log"
|
||||
"github.com/deluan/navidrome/model"
|
||||
"github.com/deluan/navidrome/persistence"
|
||||
"github.com/djherbis/fscache"
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
"gopkg.in/djherbis/fscache.v0"
|
||||
)
|
||||
|
||||
var _ = Describe("MediaStreamer", func() {
|
||||
|
||||
var streamer MediaStreamer
|
||||
var ds model.DataStore
|
||||
var cache fscache.Cache
|
||||
var tempDir string
|
||||
ffmpeg := &fakeFFmpeg{}
|
||||
ctx := log.NewContext(nil)
|
||||
|
||||
BeforeSuite(func() {
|
||||
tempDir, _ = ioutil.TempDir("", "stream_tests")
|
||||
fs, _ := fscache.NewFs(tempDir, 0755)
|
||||
cache, _ = fscache.NewCache(fs, nil)
|
||||
})
|
||||
|
||||
BeforeEach(func() {
|
||||
conf.Server.EnableDownsampling = true
|
||||
fs := fscache.NewMemFs()
|
||||
cache, _ := fscache.NewCache(fs, nil)
|
||||
ds = &persistence.MockDataStore{}
|
||||
ds.MediaFile(ctx).(*persistence.MockMediaFile).SetData(`[{"id": "123", "path": "tests/fixtures/test.mp3", "bitRate": 128}]`, 1)
|
||||
streamer = NewMediaStreamer(ds, &fakeFFmpeg{}, cache)
|
||||
ds.MediaFile(ctx).(*persistence.MockMediaFile).SetData(`[{"id": "123", "path": "tests/fixtures/test.mp3", "bitRate": 128, "duration": 257.0}]`, 1)
|
||||
streamer = NewMediaStreamer(ds, ffmpeg, cache)
|
||||
})
|
||||
|
||||
getFile := func(id string, maxBitRate int, format string) (http.File, error) {
|
||||
fs, _ := streamer.NewFileSystem(ctx, maxBitRate, format)
|
||||
return fs.Open(id)
|
||||
}
|
||||
AfterSuite(func() {
|
||||
os.RemoveAll(tempDir)
|
||||
})
|
||||
|
||||
Context("NewFileSystem", func() {
|
||||
It("returns a File if format is 'raw'", func() {
|
||||
Expect(getFile("123", 0, "raw")).To(BeAssignableToTypeOf(&os.File{}))
|
||||
It("returns a seekable stream if format is 'raw'", func() {
|
||||
s, err := streamer.NewStream(ctx, "123", 0, "raw")
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(s.Seekable()).To(BeTrue())
|
||||
})
|
||||
It("returns a File if maxBitRate is 0", func() {
|
||||
Expect(getFile("123", 0, "mp3")).To(BeAssignableToTypeOf(&os.File{}))
|
||||
It("returns a seekable stream if maxBitRate is 0", func() {
|
||||
s, err := streamer.NewStream(ctx, "123", 0, "mp3")
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(s.Seekable()).To(BeTrue())
|
||||
})
|
||||
It("returns a File if maxBitRate is higher than file bitRate", func() {
|
||||
Expect(getFile("123", 256, "mp3")).To(BeAssignableToTypeOf(&os.File{}))
|
||||
It("returns a seekable stream if maxBitRate is higher than file bitRate", func() {
|
||||
s, err := streamer.NewStream(ctx, "123", 320, "mp3")
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(s.Seekable()).To(BeTrue())
|
||||
})
|
||||
It("returns a transcodingFile if maxBitRate is lower than file bitRate", func() {
|
||||
s, err := getFile("123", 64, "mp3")
|
||||
It("returns a NON seekable stream if transcode is required", func() {
|
||||
s, err := streamer.NewStream(ctx, "123", 64, "mp3")
|
||||
Expect(err).To(BeNil())
|
||||
Expect(s).To(BeAssignableToTypeOf(&transcodingFile{}))
|
||||
Expect(s.(*transcodingFile).bitRate).To(Equal(64))
|
||||
Expect(s.Seekable()).To(BeFalse())
|
||||
Expect(s.Duration()).To(Equal(float32(257.0)))
|
||||
})
|
||||
It("returns a seekable stream if the file is complete in the cache", func() {
|
||||
Eventually(func() bool { return ffmpeg.closed }).Should(BeTrue())
|
||||
s, err := streamer.NewStream(ctx, "123", 64, "mp3")
|
||||
Expect(err).To(BeNil())
|
||||
Expect(s.Seekable()).To(BeTrue())
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
type fakeFFmpeg struct {
|
||||
r io.Reader
|
||||
closed bool
|
||||
}
|
||||
|
||||
func (ff *fakeFFmpeg) StartTranscoding(ctx context.Context, path string, maxBitRate int, format string) (f io.ReadCloser, err error) {
|
||||
return ioutil.NopCloser(strings.NewReader("fake data")), nil
|
||||
ff.r = strings.NewReader("fake data")
|
||||
return ff, nil
|
||||
}
|
||||
|
||||
func (ff *fakeFFmpeg) Read(p []byte) (n int, err error) {
|
||||
return ff.r.Read(p)
|
||||
}
|
||||
|
||||
func (ff *fakeFFmpeg) Close() error {
|
||||
ff.closed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user