fix(decaymap): serialize use of decaymap, make decaying happen once per minute

Signed-off-by: Xe Iaso <me@xeiaso.net>
This commit is contained in:
Xe Iaso
2025-12-28 18:30:37 -05:00
parent d748dc9da8
commit a75d9066fd

View File

@@ -19,7 +19,7 @@ type Impl[K comparable, V any] struct {
// stopCh stops the background cleanup worker. // stopCh stops the background cleanup worker.
stopCh chan struct{} stopCh chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
lock sync.RWMutex lock sync.Mutex
} }
type decayMapEntry[V any] struct { type decayMapEntry[V any] struct {
@@ -70,9 +70,14 @@ func (m *Impl[K, V]) Delete(key K) bool {
// Use a single write lock to avoid RUnlock->Lock convoy. // Use a single write lock to avoid RUnlock->Lock convoy.
m.lock.Lock() m.lock.Lock()
defer m.lock.Unlock() defer m.lock.Unlock()
_, ok := m.data[key] value, ok := m.data[key]
if ok { if ok {
delete(m.data, key) select {
// Defer decay deletion to the background worker to avoid convoy.
case m.deleteCh <- deleteReq[K]{key: key, expiry: value.expiry}:
default:
// Channel full: drop request; a future Cleanup() or Get will retry.
}
} }
return ok return ok
} }
@@ -81,9 +86,9 @@ func (m *Impl[K, V]) Delete(key K) bool {
// //
// If a value has expired, forcibly delete it if it was not updated. // If a value has expired, forcibly delete it if it was not updated.
func (m *Impl[K, V]) Get(key K) (V, bool) { func (m *Impl[K, V]) Get(key K) (V, bool) {
m.lock.RLock() m.lock.Lock()
defer m.lock.Unlock()
value, ok := m.data[key] value, ok := m.data[key]
m.lock.RUnlock()
if !ok { if !ok {
return Zilch[V](), false return Zilch[V](), false
@@ -129,8 +134,8 @@ func (m *Impl[K, V]) Cleanup() {
// Len returns the number of entries in the DecayMap. // Len returns the number of entries in the DecayMap.
func (m *Impl[K, V]) Len() int { func (m *Impl[K, V]) Len() int {
m.lock.RLock() m.lock.Lock()
defer m.lock.RUnlock() defer m.lock.Unlock()
return len(m.data) return len(m.data)
} }
@@ -146,7 +151,7 @@ func (m *Impl[K, V]) Close() {
func (m *Impl[K, V]) cleanupWorker() { func (m *Impl[K, V]) cleanupWorker() {
defer m.wg.Done() defer m.wg.Done()
batch := make([]deleteReq[K], 0, 64) batch := make([]deleteReq[K], 0, 64)
ticker := time.NewTicker(10 * time.Millisecond) ticker := time.NewTicker(time.Minute)
defer ticker.Stop() defer ticker.Stop()
flush := func() { flush := func() {