diff --git a/core/ffmpeg/ffmpeg.go b/core/ffmpeg/ffmpeg.go index 2b06250a..33d6733c 100644 --- a/core/ffmpeg/ffmpeg.go +++ b/core/ffmpeg/ffmpeg.go @@ -1,6 +1,7 @@ package ffmpeg import ( + "bytes" "context" "encoding/json" "errors" @@ -258,10 +259,11 @@ func (e *ffmpeg) start(ctx context.Context, args []string, input ...io.Reader) ( type ffCmd struct { *io.PipeReader - out *io.PipeWriter - args []string - cmd *exec.Cmd - input io.Reader // optional stdin source + out *io.PipeWriter + args []string + cmd *exec.Cmd + input io.Reader // optional stdin source + stderr *bytes.Buffer } func (j *ffCmd) start(ctx context.Context) error { @@ -270,10 +272,12 @@ func (j *ffCmd) start(ctx context.Context) error { if j.input != nil { cmd.Stdin = j.input } + j.stderr = &bytes.Buffer{} + stderrWriter := &limitedWriter{buf: j.stderr, limit: 4096} if log.IsGreaterOrEqualTo(log.LevelTrace) { - cmd.Stderr = os.Stderr + cmd.Stderr = io.MultiWriter(os.Stderr, stderrWriter) } else { - cmd.Stderr = io.Discard + cmd.Stderr = stderrWriter } j.cmd = cmd @@ -287,7 +291,11 @@ func (j *ffCmd) wait() { if err := j.cmd.Wait(); err != nil { var exitErr *exec.ExitError if errors.As(err, &exitErr) { - _ = j.out.CloseWithError(fmt.Errorf("%s exited with non-zero status code: %d", j.args[0], exitErr.ExitCode())) + errMsg := fmt.Sprintf("%s exited with non-zero status code: %d", j.args[0], exitErr.ExitCode()) + if stderrOutput := strings.TrimSpace(j.stderr.String()); stderrOutput != "" { + errMsg += ": " + stderrOutput + } + _ = j.out.CloseWithError(errors.New(errMsg)) } else { _ = j.out.CloseWithError(fmt.Errorf("waiting %s cmd: %w", j.args[0], err)) } @@ -296,6 +304,26 @@ func (j *ffCmd) wait() { _ = j.out.Close() } +// limitedWriter wraps a bytes.Buffer and stops writing once the limit is reached. +// Writes that would exceed the limit are silently discarded to prevent unbounded memory usage. +type limitedWriter struct { + buf *bytes.Buffer + limit int +} + +func (w *limitedWriter) Write(p []byte) (int, error) { + n := len(p) + remaining := w.limit - w.buf.Len() + if remaining <= 0 { + return n, nil // Discard but report success to avoid breaking the writer + } + if len(p) > remaining { + p = p[:remaining] + } + w.buf.Write(p) + return n, nil // Always report full write to avoid ErrShortWrite from io.MultiWriter +} + // formatCodecMap maps target format to ffmpeg codec flag. var formatCodecMap = map[string]string{ "mp3": "libmp3lame", diff --git a/core/ffmpeg/ffmpeg_test.go b/core/ffmpeg/ffmpeg_test.go index 23e41921..04663828 100644 --- a/core/ffmpeg/ffmpeg_test.go +++ b/core/ffmpeg/ffmpeg_test.go @@ -604,6 +604,46 @@ var _ = Describe("ffmpeg", func() { }) }) + Context("stderr capture", func() { + BeforeEach(func() { + if runtime.GOOS == "windows" { + Skip("stderr capture tests use /bin/sh, skipping on Windows") + } + }) + + It("should include stderr in error when process fails", func() { + ff := &ffmpeg{} + ctx := GinkgoT().Context() + + // Directly call start() with a bash command that writes to stderr and fails + args := []string{"/bin/sh", "-c", "echo 'codec not found: libopus' >&2; exit 1"} + stream, err := ff.start(ctx, args) + Expect(err).ToNot(HaveOccurred()) + defer stream.Close() + + buf := make([]byte, 1024) + _, err = stream.Read(buf) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("codec not found: libopus")) + }) + + It("should not include stderr in error when process succeeds", func() { + ff := &ffmpeg{} + ctx := GinkgoT().Context() + + // Command that writes to stderr but exits successfully + args := []string{"/bin/sh", "-c", "echo 'warning: something' >&2; printf 'output'"} + stream, err := ff.start(ctx, args) + Expect(err).ToNot(HaveOccurred()) + defer stream.Close() + + buf := make([]byte, 1024) + n, err := stream.Read(buf) + Expect(err).ToNot(HaveOccurred()) + Expect(string(buf[:n])).To(Equal("output")) + }) + }) + Context("with mock process behavior", func() { var longRunningCmd string BeforeEach(func() { diff --git a/core/stream/media_streamer.go b/core/stream/media_streamer.go index 062a1388..de03b4d2 100644 --- a/core/stream/media_streamer.go +++ b/core/stream/media_streamer.go @@ -5,8 +5,9 @@ import ( "fmt" "io" "mime" + "net/http" "os" - "strings" + "strconv" "sync" "time" @@ -17,6 +18,7 @@ import ( "github.com/navidrome/navidrome/model" "github.com/navidrome/navidrome/model/request" "github.com/navidrome/navidrome/utils/cache" + "github.com/navidrome/navidrome/utils/req" ) type MediaStreamer interface { @@ -51,6 +53,9 @@ func (j *streamJob) Key() string { return fmt.Sprintf("%s.%s.%d.%d.%d.%d.%s.%d", j.mf.ID, j.mf.UpdatedAt.Format(time.RFC3339Nano), j.bitRate, j.sampleRate, j.bitDepth, j.channels, j.format, j.offset) } +// NewStream creates a Stream for the given MediaFile and Request. It handles both raw streaming (no transcoding) +// and transcoded streaming based on the requested format and bitrate. It also logs detailed information about +// the streaming request and whether the transcoding result was served from cache or not. func (ms *mediaStreamer) NewStream(ctx context.Context, mf *model.MediaFile, req Request) (*Stream, error) { var format string var bitRate int @@ -133,14 +138,59 @@ func (s *Stream) EstimatedContentLength() int { return int(s.mf.Duration * float32(s.bitRate) / 8 * 1024) } -// NewTestStream creates a Stream for testing purposes. -func NewTestStream(mf *model.MediaFile, format string, bitRate int) *Stream { +// Serve writes the stream to the HTTP response. For seekable streams it uses http.ServeContent +// (supporting range requests). For non-seekable streams it writes directly and logs any errors. +// Returns the number of bytes written and an error only when io.Copy fails with 0 bytes written +// (meaning the HTTP 200 status has not been flushed yet and the caller can still send an error response). +// Empty output (0 bytes, no error) is logged but not treated as an error. +func (s *Stream) Serve(ctx context.Context, w http.ResponseWriter, r *http.Request) (int64, error) { + if s.Seekable() { + http.ServeContent(w, r, s.Name(), s.ModTime(), s) + return -1, nil + } + + w.Header().Set("Accept-Ranges", "none") + w.Header().Set("Content-Type", s.ContentType()) + + if req.Params(r).BoolOr("estimateContentLength", false) { + length := strconv.Itoa(s.EstimatedContentLength()) + log.Trace(ctx, "Estimated content-length", "contentLength", length) + w.Header().Set("Content-Length", length) + } + + if r.Method == http.MethodHead { + go func() { _, _ = io.Copy(io.Discard, s) }() + return 0, nil + } + + id := s.mf.ID + c, err := io.Copy(w, s) + if err != nil { + log.Error(ctx, "Error sending transcoded file", "id", id, err) + if c == 0 { + w.Header().Del("Content-Length") + return 0, fmt.Errorf("sending transcoded file: %w", err) + } + return c, nil + } + if c == 0 { + log.Error(ctx, "Transcoding returned empty output, ffmpeg may have failed. "+ + "Check that ffmpeg supports the requested codec. Enable Trace logging for ffmpeg stderr details", + "id", id, "format", s.ContentType()) + } else { + log.Trace(ctx, "Success sending transcoded file", "id", id, "size", c) + } + return c, nil +} + +// NewStream creates a non-seekable Stream from the given components. +func NewStream(mf *model.MediaFile, format string, bitRate int, r io.ReadCloser) *Stream { return &Stream{ ctx: context.Background(), mf: mf, format: format, bitRate: bitRate, - ReadCloser: io.NopCloser(strings.NewReader("")), + ReadCloser: r, } } diff --git a/server/e2e/e2e_suite_test.go b/server/e2e/e2e_suite_test.go index cb851deb..262a5ed3 100644 --- a/server/e2e/e2e_suite_test.go +++ b/server/e2e/e2e_suite_test.go @@ -11,6 +11,7 @@ import ( "net/url" "os" "path/filepath" + "strings" "testing" "testing/fstest" "time" @@ -287,18 +288,28 @@ func (n noopArtwork) GetOrPlaceholder(_ context.Context, _ string, _ int, _ bool // spyStreamer captures the Request passed to NewStream for test assertions, // then returns a minimal fake Stream so the handler completes without error. type spyStreamer struct { - LastRequest stream.Request - LastMediaFile *model.MediaFile + LastRequest stream.Request + LastMediaFile *model.MediaFile + SimulateError error // When set, NewStream returns this error + SimulateEmptyStream bool // When true, returns a 0-byte stream (simulates ffmpeg producing no output) } func (s *spyStreamer) NewStream(_ context.Context, mf *model.MediaFile, req stream.Request) (*stream.Stream, error) { s.LastRequest = req s.LastMediaFile = mf + if s.SimulateError != nil { + return nil, s.SimulateError + } format := req.Format if format == "" || format == "raw" { format = mf.Suffix } - return stream.NewTestStream(mf, format, req.BitRate), nil + content := "fake audio data" + if s.SimulateEmptyStream { + content = "" + } + r := io.NopCloser(strings.NewReader(content)) + return stream.NewStream(mf, format, req.BitRate, r), nil } // noopFFmpeg implements ffmpeg.FFmpeg with no-op methods. diff --git a/server/e2e/subsonic_stream_test.go b/server/e2e/subsonic_stream_test.go index d144dc4e..6a11c174 100644 --- a/server/e2e/subsonic_stream_test.go +++ b/server/e2e/subsonic_stream_test.go @@ -1,9 +1,12 @@ package e2e import ( + "encoding/json" + "errors" "net/http" "github.com/navidrome/navidrome/conf" + "github.com/navidrome/navidrome/server/subsonic/responses" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) @@ -124,4 +127,56 @@ var _ = Describe("stream.view (legacy streaming)", Ordered, func() { Expect(streamerSpy.LastRequest.Offset).To(Equal(30)) }) }) + + Describe("stream creation failure", func() { + BeforeEach(func() { + streamerSpy.SimulateError = errors.New("ffmpeg exited with non-zero status code: 1: Unknown encoder 'libopus'") + }) + AfterEach(func() { + streamerSpy.SimulateError = nil + }) + + It("returns a Subsonic error for stream endpoint", func() { + w := doRawReq("stream", "id", flacTrackID, "format", "opus") + Expect(w.Code).To(Equal(http.StatusOK)) // Subsonic errors are returned as 200 + + var wrapper responses.JsonWrapper + Expect(json.Unmarshal(w.Body.Bytes(), &wrapper)).To(Succeed()) + Expect(wrapper.Subsonic.Status).To(Equal(responses.StatusFailed)) + Expect(wrapper.Subsonic.Error).ToNot(BeNil()) + }) + + It("returns a Subsonic error for download endpoint", func() { + conf.Server.EnableDownloads = true + w := doRawReq("download", "id", flacTrackID, "format", "opus") + Expect(w.Code).To(Equal(http.StatusOK)) + + var wrapper responses.JsonWrapper + Expect(json.Unmarshal(w.Body.Bytes(), &wrapper)).To(Succeed()) + Expect(wrapper.Subsonic.Status).To(Equal(responses.StatusFailed)) + Expect(wrapper.Subsonic.Error).ToNot(BeNil()) + }) + }) + + Describe("empty transcoded output", func() { + BeforeEach(func() { + streamerSpy.SimulateEmptyStream = true + }) + AfterEach(func() { + streamerSpy.SimulateEmptyStream = false + }) + + It("returns 200 with empty body for stream endpoint", func() { + w := doRawReq("stream", "id", flacTrackID, "format", "opus") + Expect(w.Code).To(Equal(http.StatusOK)) + Expect(w.Body.Len()).To(Equal(0)) + }) + + It("returns 200 with empty body for download endpoint", func() { + conf.Server.EnableDownloads = true + w := doRawReq("download", "id", flacTrackID, "format", "opus") + Expect(w.Code).To(Equal(http.StatusOK)) + Expect(w.Body.Len()).To(Equal(0)) + }) + }) }) diff --git a/server/e2e/subsonic_transcode_test.go b/server/e2e/subsonic_transcode_test.go index a9a180dc..f134448d 100644 --- a/server/e2e/subsonic_transcode_test.go +++ b/server/e2e/subsonic_transcode_test.go @@ -1,6 +1,7 @@ package e2e import ( + "errors" "net/http" "time" @@ -602,6 +603,36 @@ var _ = Describe("Transcode Endpoints", Ordered, func() { mf.UpdatedAt = originalUpdatedAt Expect(ds.MediaFile(ctx).Put(mf)).To(Succeed()) }) + + It("returns 500 when stream creation fails", func() { + // Get a valid decision token + resp := doPostReq("getTranscodeDecision", mp3OnlyClient, "mediaId", flacTrackID, "mediaType", "song") + Expect(resp.Status).To(Equal(responses.StatusOK)) + token := resp.TranscodeDecision.TranscodeParams + Expect(token).ToNot(BeEmpty()) + + // Simulate streamer failure (e.g., ffmpeg missing codec) + streamerSpy.SimulateError = errors.New("ffmpeg exited with non-zero status code: 1: Unknown encoder 'libopus'") + defer func() { streamerSpy.SimulateError = nil }() + + w := doRawReq("getTranscodeStream", "mediaId", flacTrackID, "mediaType", "song", "transcodeParams", token) + Expect(w.Code).To(Equal(http.StatusInternalServerError)) + }) + + It("returns 500 when transcoded stream is empty", func() { + // Get a valid decision token + resp := doPostReq("getTranscodeDecision", mp3OnlyClient, "mediaId", flacTrackID, "mediaType", "song") + Expect(resp.Status).To(Equal(responses.StatusOK)) + token := resp.TranscodeDecision.TranscodeParams + Expect(token).ToNot(BeEmpty()) + + // Simulate ffmpeg producing 0 bytes + streamerSpy.SimulateEmptyStream = true + defer func() { streamerSpy.SimulateEmptyStream = false }() + + w := doRawReq("getTranscodeStream", "mediaId", flacTrackID, "mediaType", "song", "transcodeParams", token) + Expect(w.Code).To(Equal(http.StatusInternalServerError)) + }) }) Describe("round-trip: decision then stream", func() { diff --git a/server/public/handle_streams.go b/server/public/handle_streams.go index 6cdf8b44..daa09c37 100644 --- a/server/public/handle_streams.go +++ b/server/public/handle_streams.go @@ -2,7 +2,6 @@ package public import ( "errors" - "io" "net/http" "strconv" @@ -54,34 +53,9 @@ func (pub *Router) handleStream(w http.ResponseWriter, r *http.Request) { w.Header().Set("X-Content-Type-Options", "nosniff") w.Header().Set("X-Content-Duration", strconv.FormatFloat(float64(stream.Duration()), 'G', -1, 32)) - if stream.Seekable() { - http.ServeContent(w, r, stream.Name(), stream.ModTime(), stream) - } else { - // If the stream doesn't provide a size (i.e. is not seekable), we can't support ranges/content-length - w.Header().Set("Accept-Ranges", "none") - w.Header().Set("Content-Type", stream.ContentType()) - - estimateContentLength := p.BoolOr("estimateContentLength", false) - - // if Client requests the estimated content-length, send it - if estimateContentLength { - length := strconv.Itoa(stream.EstimatedContentLength()) - log.Trace(ctx, "Estimated content-length", "contentLength", length) - w.Header().Set("Content-Length", length) - } - - if r.Method == http.MethodHead { - go func() { _, _ = io.Copy(io.Discard, stream) }() - } else { - c, err := io.Copy(w, stream) - if log.IsGreaterOrEqualTo(log.LevelDebug) { - if err != nil { - log.Error(ctx, "Error sending shared transcoded file", "id", info.id, err) - } else { - log.Trace(ctx, "Success sending shared transcode file", "id", info.id, "size", c) - } - } - } + n, err := stream.Serve(ctx, w, r) + if err != nil || n == 0 { + http.Error(w, "internal error", http.StatusInternalServerError) } } diff --git a/server/subsonic/stream.go b/server/subsonic/stream.go index ebebb97f..b49af2b2 100644 --- a/server/subsonic/stream.go +++ b/server/subsonic/stream.go @@ -1,15 +1,12 @@ package subsonic import ( - "context" "fmt" - "io" "net/http" "strconv" "strings" "github.com/navidrome/navidrome/conf" - "github.com/navidrome/navidrome/core/stream" "github.com/navidrome/navidrome/log" "github.com/navidrome/navidrome/model" "github.com/navidrome/navidrome/model/request" @@ -17,38 +14,6 @@ import ( "github.com/navidrome/navidrome/utils/req" ) -func (api *Router) serveStream(ctx context.Context, w http.ResponseWriter, r *http.Request, stream *stream.Stream, id string) { - if stream.Seekable() { - http.ServeContent(w, r, stream.Name(), stream.ModTime(), stream) - } else { - // If the stream doesn't provide a size (i.e. is not seekable), we can't support ranges/content-length - w.Header().Set("Accept-Ranges", "none") - w.Header().Set("Content-Type", stream.ContentType()) - - estimateContentLength := req.Params(r).BoolOr("estimateContentLength", false) - - // if Client requests the estimated content-length, send it - if estimateContentLength { - length := strconv.Itoa(stream.EstimatedContentLength()) - log.Trace(ctx, "Estimated content-length", "contentLength", length) - w.Header().Set("Content-Length", length) - } - - if r.Method == http.MethodHead { - go func() { _, _ = io.Copy(io.Discard, stream) }() - } else { - c, err := io.Copy(w, stream) - if log.IsGreaterOrEqualTo(log.LevelDebug) { - if err != nil { - log.Error(ctx, "Error sending transcoded file", "id", id, err) - } else { - log.Trace(ctx, "Success sending transcode file", "id", id, "size", c) - } - } - } - } -} - func (api *Router) Stream(w http.ResponseWriter, r *http.Request) (*responses.Subsonic, error) { ctx := r.Context() p := req.Params(r) @@ -81,9 +46,8 @@ func (api *Router) Stream(w http.ResponseWriter, r *http.Request) (*responses.Su w.Header().Set("X-Content-Type-Options", "nosniff") w.Header().Set("X-Content-Duration", strconv.FormatFloat(float64(stream.Duration()), 'G', -1, 32)) - api.serveStream(ctx, w, r, stream, id) - - return nil, nil + _, err = stream.Serve(ctx, w, r) + return nil, err } func (api *Router) Download(w http.ResponseWriter, r *http.Request) (*responses.Subsonic, error) { @@ -151,20 +115,18 @@ func (api *Router) Download(w http.ResponseWriter, r *http.Request) (*responses. disposition := fmt.Sprintf("attachment; filename=\"%s\"", stream.Name()) w.Header().Set("Content-Disposition", disposition) - api.serveStream(ctx, w, r, stream, id) - return nil, nil + _, err = stream.Serve(ctx, w, r) + return nil, err case *model.Album: setHeaders(v.Name) - err = api.archiver.ZipAlbum(ctx, id, format, maxBitRate, w) + return nil, api.archiver.ZipAlbum(ctx, id, format, maxBitRate, w) case *model.Artist: setHeaders(v.Name) - err = api.archiver.ZipArtist(ctx, id, format, maxBitRate, w) + return nil, api.archiver.ZipArtist(ctx, id, format, maxBitRate, w) case *model.Playlist: setHeaders(v.Name) - err = api.archiver.ZipPlaylist(ctx, id, format, maxBitRate, w) + return nil, api.archiver.ZipPlaylist(ctx, id, format, maxBitRate, w) default: - err = model.ErrNotFound + return nil, model.ErrNotFound } - - return nil, err } diff --git a/server/subsonic/transcode.go b/server/subsonic/transcode.go index 64e74d46..4e494b32 100644 --- a/server/subsonic/transcode.go +++ b/server/subsonic/transcode.go @@ -395,7 +395,9 @@ func (api *Router) GetTranscodeStream(w http.ResponseWriter, r *http.Request) (* w.Header().Set("X-Content-Type-Options", "nosniff") - api.serveStream(ctx, w, r, stream, mediaID) - + n, err := stream.Serve(ctx, w, r) + if err != nil || n == 0 { + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + } return nil, nil }