fix(scanner): increase watcher channel buffers to prevent dropped filesystem events
When files were moved between libraries, the small channel buffers (size 1) throughout the watcher pipeline caused backpressure that led to dropped filesystem events. This meant only some of the affected folders were scanned, preventing cross-library move detection from working correctly. Increase all watcher channel buffers to 500 and switch to blocking sends to ensure no filesystem events are silently dropped.
This commit is contained in:
@@ -17,8 +17,8 @@ func (s *localStorage) Start(ctx context.Context) (<-chan string, error) {
|
|||||||
if !s.watching.CompareAndSwap(false, true) {
|
if !s.watching.CompareAndSwap(false, true) {
|
||||||
return nil, errors.New("watcher already started")
|
return nil, errors.New("watcher already started")
|
||||||
}
|
}
|
||||||
input := make(chan notify.EventInfo, 1)
|
input := make(chan notify.EventInfo, 500)
|
||||||
output := make(chan string, 1)
|
output := make(chan string, 500)
|
||||||
|
|
||||||
started := make(chan struct{})
|
started := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
|
|||||||
+3
-7
@@ -48,7 +48,7 @@ func GetWatcher(ds model.DataStore, s model.Scanner) Watcher {
|
|||||||
ds: ds,
|
ds: ds,
|
||||||
scanner: s,
|
scanner: s,
|
||||||
triggerWait: conf.Server.Scanner.WatcherWait,
|
triggerWait: conf.Server.Scanner.WatcherWait,
|
||||||
watcherNotify: make(chan scanNotification, 1),
|
watcherNotify: make(chan scanNotification, 500),
|
||||||
libraryWatchers: make(map[int]*libraryWatcherInstance),
|
libraryWatchers: make(map[int]*libraryWatcherInstance),
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@@ -272,12 +272,8 @@ func (w *watcher) processLibraryEvents(ctx context.Context, lib *model.Library,
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notify the main watcher of changes
|
// Notify the main watcher of changes. This will trigger a scan after the debounce period.
|
||||||
select {
|
w.watcherNotify <- scanNotification{Library: lib, FolderPath: folderPath}
|
||||||
case w.watcherNotify <- scanNotification{Library: lib, FolderPath: folderPath}:
|
|
||||||
default:
|
|
||||||
// Channel is full, notification already pending
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ var _ = Describe("Watcher", func() {
|
|||||||
DeferCleanup(configtest.SetupConfig())
|
DeferCleanup(configtest.SetupConfig())
|
||||||
conf.Server.Scanner.WatcherWait = 50 * time.Millisecond // Short wait for tests
|
conf.Server.Scanner.WatcherWait = 50 * time.Millisecond // Short wait for tests
|
||||||
|
|
||||||
ctx, cancel = context.WithCancel(context.Background())
|
ctx, cancel = context.WithCancel(GinkgoT().Context())
|
||||||
DeferCleanup(cancel)
|
DeferCleanup(cancel)
|
||||||
|
|
||||||
lib = &model.Library{
|
lib = &model.Library{
|
||||||
|
|||||||
Reference in New Issue
Block a user