feat(scheduler): add crontab(5) random ~ syntax support (#5233)
* feat(scheduler): add CrontabSchedule with crontab(5) random ~ syntax Implement ParseCrontab() that extends robfig/cron with support for the crontab(5) random ~ operator (e.g., 0~30 * * * *). Random values are resolved fresh on each Next() call for load spreading. Supports A~B, ~B, A~, and bare ~ forms in all 6 fields (including seconds). Expressions without ~ delegate to robfig's standard parser with zero overhead. Integrates into scheduler.Add() and conf.validateSchedule() so that scanner.schedule and backup.schedule config values accept ~ syntax. * refactor(scheduler): resolve random ~ values once at parse time Change from per-Next() randomization to per-parse randomization, matching crontab(5) semantics. This prevents double-firing within the same period when random values land after the current time. ParseCrontab now resolves ~ fields to concrete values, substitutes them into the spec string, and delegates to robfig's parser. This eliminates CrontabSchedule, randomField, and resolveField entirely. * test(scheduler): replace WaitGroup with channel for job execution synchronization Signed-off-by: Deluan <deluan@navidrome.org> --------- Signed-off-by: Deluan <deluan@navidrome.org>
This commit is contained in:
+16
-24
@@ -1,19 +1,16 @@
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/navidrome/navidrome/log"
|
||||
"github.com/navidrome/navidrome/tests"
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
"github.com/robfig/cron/v3"
|
||||
)
|
||||
|
||||
func TestScheduler(t *testing.T) {
|
||||
tests.Init(t, false)
|
||||
log.SetLevel(log.LevelFatal)
|
||||
RegisterFailHandler(Fail)
|
||||
RunSpecs(t, "Scheduler Suite")
|
||||
@@ -33,54 +30,49 @@ var _ = Describe("Scheduler", func() {
|
||||
})
|
||||
|
||||
It("adds and executes a job", func() {
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
done := make(chan struct{})
|
||||
|
||||
executed := false
|
||||
id, err := s.Add("@every 100ms", func() {
|
||||
executed = true
|
||||
wg.Done()
|
||||
id, err := s.Add("@every 50ms", func() {
|
||||
close(done)
|
||||
})
|
||||
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(id).ToNot(BeZero())
|
||||
|
||||
wg.Wait()
|
||||
Expect(executed).To(BeTrue())
|
||||
Eventually(done).Should(BeClosed())
|
||||
})
|
||||
|
||||
It("adds a job with random ~ syntax", func() {
|
||||
id, err := s.Add("0~59 * * * *", func() {})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(id).ToNot(BeZero())
|
||||
s.Remove(id)
|
||||
})
|
||||
|
||||
It("removes a job", func() {
|
||||
// Use a WaitGroup to ensure the job executes once
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
done := make(chan struct{})
|
||||
|
||||
counter := 0
|
||||
id, err := s.Add("@every 100ms", func() {
|
||||
id, err := s.Add("@every 50ms", func() {
|
||||
counter++
|
||||
if counter == 1 {
|
||||
wg.Done() // Signal that the job has executed once
|
||||
close(done)
|
||||
}
|
||||
})
|
||||
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(id).ToNot(BeZero())
|
||||
|
||||
// Wait for the job to execute at least once
|
||||
wg.Wait()
|
||||
|
||||
// Verify job executed
|
||||
Eventually(done).Should(BeClosed())
|
||||
Expect(counter).To(Equal(1))
|
||||
|
||||
// Remove the job
|
||||
s.Remove(id)
|
||||
|
||||
// Store the counter value
|
||||
currentCount := counter
|
||||
|
||||
// Wait some time to ensure job doesn't execute again
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Verify counter didn't increase
|
||||
Expect(counter).To(Equal(currentCount))
|
||||
Expect(counter).To(Equal(1))
|
||||
})
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user