refactor: rename chain package to run and update references
Signed-off-by: Deluan <deluan@navidrome.org>
This commit is contained in:
@@ -14,7 +14,7 @@ import (
|
|||||||
"github.com/kr/pretty"
|
"github.com/kr/pretty"
|
||||||
"github.com/navidrome/navidrome/consts"
|
"github.com/navidrome/navidrome/consts"
|
||||||
"github.com/navidrome/navidrome/log"
|
"github.com/navidrome/navidrome/log"
|
||||||
"github.com/navidrome/navidrome/utils/chain"
|
"github.com/navidrome/navidrome/utils/run"
|
||||||
"github.com/robfig/cron/v3"
|
"github.com/robfig/cron/v3"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
)
|
)
|
||||||
@@ -276,7 +276,7 @@ func Load(noConfigDump bool) {
|
|||||||
log.SetLogSourceLine(Server.DevLogSourceLine)
|
log.SetLogSourceLine(Server.DevLogSourceLine)
|
||||||
log.SetRedacting(Server.EnableLogRedacting)
|
log.SetRedacting(Server.EnableLogRedacting)
|
||||||
|
|
||||||
err = chain.RunSequentially(
|
err = run.Sequentially(
|
||||||
validateScanSchedule,
|
validateScanSchedule,
|
||||||
validateBackupSchedule,
|
validateBackupSchedule,
|
||||||
validatePlaylistsPath,
|
validatePlaylistsPath,
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ import (
|
|||||||
|
|
||||||
"github.com/navidrome/navidrome/log"
|
"github.com/navidrome/navidrome/log"
|
||||||
"github.com/navidrome/navidrome/model"
|
"github.com/navidrome/navidrome/model"
|
||||||
"github.com/navidrome/navidrome/utils/chain"
|
"github.com/navidrome/navidrome/utils/run"
|
||||||
"github.com/pressly/goose/v3"
|
"github.com/pressly/goose/v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -25,7 +25,7 @@ func upSupportNewScanner(ctx context.Context, tx *sql.Tx) error {
|
|||||||
execute := createExecuteFunc(ctx, tx)
|
execute := createExecuteFunc(ctx, tx)
|
||||||
addColumn := createAddColumnFunc(ctx, tx)
|
addColumn := createAddColumnFunc(ctx, tx)
|
||||||
|
|
||||||
return chain.RunSequentially(
|
return run.Sequentially(
|
||||||
upSupportNewScanner_CreateTableFolder(ctx, execute),
|
upSupportNewScanner_CreateTableFolder(ctx, execute),
|
||||||
upSupportNewScanner_PopulateTableFolder(ctx, tx),
|
upSupportNewScanner_PopulateTableFolder(ctx, tx),
|
||||||
upSupportNewScanner_UpdateTableMediaFile(ctx, execute, addColumn),
|
upSupportNewScanner_UpdateTableMediaFile(ctx, execute, addColumn),
|
||||||
@@ -213,7 +213,7 @@ update media_file set path = replace(substr(path, %d), '\', '/');`, libPathLen+2
|
|||||||
|
|
||||||
func upSupportNewScanner_UpdateTableMediaFile(_ context.Context, execute execStmtFunc, addColumn addColumnFunc) execFunc {
|
func upSupportNewScanner_UpdateTableMediaFile(_ context.Context, execute execStmtFunc, addColumn addColumnFunc) execFunc {
|
||||||
return func() error {
|
return func() error {
|
||||||
return chain.RunSequentially(
|
return run.Sequentially(
|
||||||
execute(`
|
execute(`
|
||||||
alter table media_file
|
alter table media_file
|
||||||
add column folder_id varchar default '' not null;
|
add column folder_id varchar default '' not null;
|
||||||
@@ -288,7 +288,7 @@ create index if not exists album_mbz_release_group_id
|
|||||||
|
|
||||||
func upSupportNewScanner_UpdateTableArtist(_ context.Context, execute execStmtFunc, addColumn addColumnFunc) execFunc {
|
func upSupportNewScanner_UpdateTableArtist(_ context.Context, execute execStmtFunc, addColumn addColumnFunc) execFunc {
|
||||||
return func() error {
|
return func() error {
|
||||||
return chain.RunSequentially(
|
return run.Sequentially(
|
||||||
execute(`
|
execute(`
|
||||||
alter table artist
|
alter table artist
|
||||||
drop column album_count;
|
drop column album_count;
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ import (
|
|||||||
"github.com/navidrome/navidrome/conf"
|
"github.com/navidrome/navidrome/conf"
|
||||||
"github.com/navidrome/navidrome/log"
|
"github.com/navidrome/navidrome/log"
|
||||||
"github.com/navidrome/navidrome/model"
|
"github.com/navidrome/navidrome/model"
|
||||||
"github.com/navidrome/navidrome/utils/chain"
|
"github.com/navidrome/navidrome/utils/run"
|
||||||
"github.com/pocketbase/dbx"
|
"github.com/pocketbase/dbx"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -151,7 +151,7 @@ func (r *libraryRepository) RefreshStats(id int) error {
|
|||||||
var songsRes, albumsRes, artistsRes, foldersRes, filesRes, missingRes struct{ Count int64 }
|
var songsRes, albumsRes, artistsRes, foldersRes, filesRes, missingRes struct{ Count int64 }
|
||||||
var sizeRes struct{ Sum int64 }
|
var sizeRes struct{ Sum int64 }
|
||||||
|
|
||||||
err := chain.RunParallel(
|
err := run.Parallel(
|
||||||
func() error {
|
func() error {
|
||||||
return r.queryOne(Select("count(*) as count").From("media_file").Where(Eq{"library_id": id, "missing": false}), &songsRes)
|
return r.queryOne(Select("count(*) as count").From("media_file").Where(Eq{"library_id": id, "missing": false}), &songsRes)
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ import (
|
|||||||
"github.com/navidrome/navidrome/db"
|
"github.com/navidrome/navidrome/db"
|
||||||
"github.com/navidrome/navidrome/log"
|
"github.com/navidrome/navidrome/log"
|
||||||
"github.com/navidrome/navidrome/model"
|
"github.com/navidrome/navidrome/model"
|
||||||
"github.com/navidrome/navidrome/utils/chain"
|
"github.com/navidrome/navidrome/utils/run"
|
||||||
"github.com/pocketbase/dbx"
|
"github.com/pocketbase/dbx"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -167,7 +167,7 @@ func (s *SQLStore) GC(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := chain.RunSequentially(
|
err := run.Sequentially(
|
||||||
trace(ctx, "purge empty albums", func() error { return s.Album(ctx).(*albumRepository).purgeEmpty() }),
|
trace(ctx, "purge empty albums", func() error { return s.Album(ctx).(*albumRepository).purgeEmpty() }),
|
||||||
trace(ctx, "purge empty artists", func() error { return s.Artist(ctx).(*artistRepository).purgeEmpty() }),
|
trace(ctx, "purge empty artists", func() error { return s.Artist(ctx).(*artistRepository).purgeEmpty() }),
|
||||||
trace(ctx, "mark missing artists", func() error { return s.Artist(ctx).(*artistRepository).markMissing() }),
|
trace(ctx, "mark missing artists", func() error { return s.Artist(ctx).(*artistRepository).markMissing() }),
|
||||||
|
|||||||
+3
-3
@@ -14,7 +14,7 @@ import (
|
|||||||
"github.com/navidrome/navidrome/db"
|
"github.com/navidrome/navidrome/db"
|
||||||
"github.com/navidrome/navidrome/log"
|
"github.com/navidrome/navidrome/log"
|
||||||
"github.com/navidrome/navidrome/model"
|
"github.com/navidrome/navidrome/model"
|
||||||
"github.com/navidrome/navidrome/utils/chain"
|
"github.com/navidrome/navidrome/utils/run"
|
||||||
)
|
)
|
||||||
|
|
||||||
type scannerImpl struct {
|
type scannerImpl struct {
|
||||||
@@ -75,7 +75,7 @@ func (s *scannerImpl) scanAll(ctx context.Context, fullScan bool, progress chan<
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = chain.RunSequentially(
|
err = run.Sequentially(
|
||||||
// Phase 1: Scan all libraries and import new/updated files
|
// Phase 1: Scan all libraries and import new/updated files
|
||||||
runPhase[*folderEntry](ctx, 1, createPhaseFolders(ctx, &state, s.ds, s.cw, libs)),
|
runPhase[*folderEntry](ctx, 1, createPhaseFolders(ctx, &state, s.ds, s.cw, libs)),
|
||||||
|
|
||||||
@@ -83,7 +83,7 @@ func (s *scannerImpl) scanAll(ctx context.Context, fullScan bool, progress chan<
|
|||||||
runPhase[*missingTracks](ctx, 2, createPhaseMissingTracks(ctx, &state, s.ds)),
|
runPhase[*missingTracks](ctx, 2, createPhaseMissingTracks(ctx, &state, s.ds)),
|
||||||
|
|
||||||
// Phases 3 and 4 can be run in parallel
|
// Phases 3 and 4 can be run in parallel
|
||||||
chain.RunParallel(
|
run.Parallel(
|
||||||
// Phase 3: Refresh all new/changed albums and update artists
|
// Phase 3: Refresh all new/changed albums and update artists
|
||||||
runPhase[*model.Album](ctx, 3, createPhaseRefreshAlbums(ctx, &state, s.ds, libs)),
|
runPhase[*model.Album](ctx, 3, createPhaseRefreshAlbums(ctx, &state, s.ds, libs)),
|
||||||
|
|
||||||
|
|||||||
@@ -1,51 +0,0 @@
|
|||||||
package chain_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/navidrome/navidrome/utils/chain"
|
|
||||||
. "github.com/onsi/ginkgo/v2"
|
|
||||||
. "github.com/onsi/gomega"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestChain(t *testing.T) {
|
|
||||||
RegisterFailHandler(Fail)
|
|
||||||
RunSpecs(t, "chain Suite")
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ = Describe("RunSequentially", func() {
|
|
||||||
It("should return nil if no functions are provided", func() {
|
|
||||||
err := chain.RunSequentially()
|
|
||||||
Expect(err).To(BeNil())
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should return nil if all functions succeed", func() {
|
|
||||||
err := chain.RunSequentially(
|
|
||||||
func() error { return nil },
|
|
||||||
func() error { return nil },
|
|
||||||
)
|
|
||||||
Expect(err).To(BeNil())
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should return the error from the first failing function", func() {
|
|
||||||
expectedErr := errors.New("error in function 2")
|
|
||||||
err := chain.RunSequentially(
|
|
||||||
func() error { return nil },
|
|
||||||
func() error { return expectedErr },
|
|
||||||
func() error { return errors.New("error in function 3") },
|
|
||||||
)
|
|
||||||
Expect(err).To(Equal(expectedErr))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should not run functions after the first failing function", func() {
|
|
||||||
expectedErr := errors.New("error in function 1")
|
|
||||||
var runCount int
|
|
||||||
err := chain.RunSequentially(
|
|
||||||
func() error { runCount++; return expectedErr },
|
|
||||||
func() error { runCount++; return nil },
|
|
||||||
)
|
|
||||||
Expect(err).To(Equal(expectedErr))
|
|
||||||
Expect(runCount).To(Equal(1))
|
|
||||||
})
|
|
||||||
})
|
|
||||||
@@ -1,11 +1,11 @@
|
|||||||
package chain
|
package run
|
||||||
|
|
||||||
import "golang.org/x/sync/errgroup"
|
import "golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
// RunSequentially runs the given functions sequentially,
|
// Sequentially runs the given functions sequentially,
|
||||||
// If any function returns an error, it stops the execution and returns that error.
|
// If any function returns an error, it stops the execution and returns that error.
|
||||||
// If all functions return nil, it returns nil.
|
// If all functions return nil, it returns nil.
|
||||||
func RunSequentially(fs ...func() error) error {
|
func Sequentially(fs ...func() error) error {
|
||||||
for _, f := range fs {
|
for _, f := range fs {
|
||||||
if err := f(); err != nil {
|
if err := f(); err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -14,9 +14,9 @@ func RunSequentially(fs ...func() error) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunParallel runs the given functions in parallel,
|
// Parallel runs the given functions in parallel,
|
||||||
// It waits for all functions to finish and returns the first error encountered.
|
// It waits for all functions to finish and returns the first error encountered.
|
||||||
func RunParallel(fs ...func() error) func() error {
|
func Parallel(fs ...func() error) func() error {
|
||||||
return func() error {
|
return func() error {
|
||||||
g := errgroup.Group{}
|
g := errgroup.Group{}
|
||||||
for _, f := range fs {
|
for _, f := range fs {
|
||||||
@@ -0,0 +1,171 @@
|
|||||||
|
package run_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/navidrome/navidrome/utils/run"
|
||||||
|
. "github.com/onsi/ginkgo/v2"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRun(t *testing.T) {
|
||||||
|
RegisterFailHandler(Fail)
|
||||||
|
RunSpecs(t, "Run Suite")
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ = Describe("Sequentially", func() {
|
||||||
|
It("should return nil if no functions are provided", func() {
|
||||||
|
err := run.Sequentially()
|
||||||
|
Expect(err).To(BeNil())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should return nil if all functions succeed", func() {
|
||||||
|
err := run.Sequentially(
|
||||||
|
func() error { return nil },
|
||||||
|
func() error { return nil },
|
||||||
|
)
|
||||||
|
Expect(err).To(BeNil())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should return the error from the first failing function", func() {
|
||||||
|
expectedErr := errors.New("error in function 2")
|
||||||
|
err := run.Sequentially(
|
||||||
|
func() error { return nil },
|
||||||
|
func() error { return expectedErr },
|
||||||
|
func() error { return errors.New("error in function 3") },
|
||||||
|
)
|
||||||
|
Expect(err).To(Equal(expectedErr))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should not run functions after the first failing function", func() {
|
||||||
|
expectedErr := errors.New("error in function 1")
|
||||||
|
var runCount int
|
||||||
|
err := run.Sequentially(
|
||||||
|
func() error { runCount++; return expectedErr },
|
||||||
|
func() error { runCount++; return nil },
|
||||||
|
)
|
||||||
|
Expect(err).To(Equal(expectedErr))
|
||||||
|
Expect(runCount).To(Equal(1))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
var _ = Describe("Parallel", func() {
|
||||||
|
It("should return a function that returns nil if no functions are provided", func() {
|
||||||
|
parallelFunc := run.Parallel()
|
||||||
|
err := parallelFunc()
|
||||||
|
Expect(err).To(BeNil())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should return a function that returns nil if all functions succeed", func() {
|
||||||
|
parallelFunc := run.Parallel(
|
||||||
|
func() error { return nil },
|
||||||
|
func() error { return nil },
|
||||||
|
func() error { return nil },
|
||||||
|
)
|
||||||
|
err := parallelFunc()
|
||||||
|
Expect(err).To(BeNil())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should return the first error encountered when functions fail", func() {
|
||||||
|
expectedErr := errors.New("parallel error")
|
||||||
|
parallelFunc := run.Parallel(
|
||||||
|
func() error { return nil },
|
||||||
|
func() error { return expectedErr },
|
||||||
|
func() error { return errors.New("another error") },
|
||||||
|
)
|
||||||
|
err := parallelFunc()
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
// Note: We can't guarantee which error will be returned first in parallel execution
|
||||||
|
// but we can ensure an error is returned
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should run all functions in parallel", func() {
|
||||||
|
var runCount atomic.Int32
|
||||||
|
sync := make(chan struct{})
|
||||||
|
|
||||||
|
parallelFunc := run.Parallel(
|
||||||
|
func() error {
|
||||||
|
runCount.Add(1)
|
||||||
|
<-sync
|
||||||
|
runCount.Add(-1)
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
func() error {
|
||||||
|
runCount.Add(1)
|
||||||
|
<-sync
|
||||||
|
runCount.Add(-1)
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
func() error {
|
||||||
|
runCount.Add(1)
|
||||||
|
<-sync
|
||||||
|
runCount.Add(-1)
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
// Run the parallel function in a goroutine
|
||||||
|
go func() {
|
||||||
|
Expect(parallelFunc()).To(Succeed())
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for all functions to start running
|
||||||
|
Eventually(func() int32 { return runCount.Load() }).Should(Equal(int32(3)))
|
||||||
|
|
||||||
|
// Release the functions to complete
|
||||||
|
close(sync)
|
||||||
|
|
||||||
|
// Wait for all functions to finish
|
||||||
|
Eventually(func() int32 { return runCount.Load() }).Should(Equal(int32(0)))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should wait for all functions to complete before returning", func() {
|
||||||
|
var completedCount atomic.Int32
|
||||||
|
|
||||||
|
parallelFunc := run.Parallel(
|
||||||
|
func() error {
|
||||||
|
completedCount.Add(1)
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
func() error {
|
||||||
|
completedCount.Add(1)
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
func() error {
|
||||||
|
completedCount.Add(1)
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
Expect(parallelFunc()).To(Succeed())
|
||||||
|
Expect(completedCount.Load()).To(Equal(int32(3)))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should return an error even if other functions are still running", func() {
|
||||||
|
expectedErr := errors.New("fast error")
|
||||||
|
var slowFunctionCompleted bool
|
||||||
|
|
||||||
|
parallelFunc := run.Parallel(
|
||||||
|
func() error {
|
||||||
|
return expectedErr // Return error immediately
|
||||||
|
},
|
||||||
|
func() error {
|
||||||
|
time.Sleep(50 * time.Millisecond) // Slow function
|
||||||
|
slowFunctionCompleted = true
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
err := parallelFunc()
|
||||||
|
duration := time.Since(start)
|
||||||
|
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
// Should wait for all functions to complete, even if one fails early
|
||||||
|
Expect(duration).To(BeNumerically(">=", 50*time.Millisecond))
|
||||||
|
Expect(slowFunctionCompleted).To(BeTrue())
|
||||||
|
})
|
||||||
|
})
|
||||||
Reference in New Issue
Block a user