mirror of
https://github.com/TecharoHQ/anubis.git
synced 2026-04-07 09:18:18 +00:00
Compare commits
4 Commits
Xe/decayma
...
Xe/actorif
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
63d557b96e | ||
|
|
d718792881 | ||
|
|
b28840f1a9 | ||
|
|
63591866aa |
4
.github/actions/spelling/expect.txt
vendored
4
.github/actions/spelling/expect.txt
vendored
@@ -1,4 +1,7 @@
|
||||
acs
|
||||
Actorified
|
||||
actorifiedstore
|
||||
actorify
|
||||
Aibrew
|
||||
alibaba
|
||||
alrest
|
||||
@@ -157,6 +160,7 @@ ifm
|
||||
Imagesift
|
||||
imgproxy
|
||||
impressum
|
||||
inbox
|
||||
inp
|
||||
internets
|
||||
IPTo
|
||||
|
||||
@@ -14,6 +14,12 @@ func Zilch[T any]() T {
|
||||
type Impl[K comparable, V any] struct {
|
||||
data map[K]decayMapEntry[V]
|
||||
lock sync.RWMutex
|
||||
|
||||
// deleteCh receives decay-deletion requests from readers.
|
||||
deleteCh chan deleteReq[K]
|
||||
// stopCh stops the background cleanup worker.
|
||||
stopCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
type decayMapEntry[V any] struct {
|
||||
@@ -21,30 +27,38 @@ type decayMapEntry[V any] struct {
|
||||
expiry time.Time
|
||||
}
|
||||
|
||||
// deleteReq is a request to remove a key if its expiry timestamp still matches
|
||||
// the observed one. This prevents racing with concurrent Set updates.
|
||||
type deleteReq[K comparable] struct {
|
||||
key K
|
||||
expiry time.Time
|
||||
}
|
||||
|
||||
// New creates a new DecayMap of key type K and value type V.
|
||||
//
|
||||
// Key types must be comparable to work with maps.
|
||||
func New[K comparable, V any]() *Impl[K, V] {
|
||||
return &Impl[K, V]{
|
||||
data: make(map[K]decayMapEntry[V]),
|
||||
m := &Impl[K, V]{
|
||||
data: make(map[K]decayMapEntry[V]),
|
||||
deleteCh: make(chan deleteReq[K], 1024),
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
m.wg.Add(1)
|
||||
go m.cleanupWorker()
|
||||
return m
|
||||
}
|
||||
|
||||
// expire forcibly expires a key by setting its time-to-live one second in the past.
|
||||
func (m *Impl[K, V]) expire(key K) bool {
|
||||
m.lock.RLock()
|
||||
// Use a single write lock to avoid RUnlock->Lock convoy.
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
val, ok := m.data[key]
|
||||
m.lock.RUnlock()
|
||||
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
m.lock.Lock()
|
||||
val.expiry = time.Now().Add(-1 * time.Second)
|
||||
m.data[key] = val
|
||||
m.lock.Unlock()
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -53,19 +67,14 @@ func (m *Impl[K, V]) expire(key K) bool {
|
||||
// If the value does not exist, return false. Return true after
|
||||
// deletion.
|
||||
func (m *Impl[K, V]) Delete(key K) bool {
|
||||
m.lock.RLock()
|
||||
_, ok := m.data[key]
|
||||
m.lock.RUnlock()
|
||||
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
// Use a single write lock to avoid RUnlock->Lock convoy.
|
||||
m.lock.Lock()
|
||||
delete(m.data, key)
|
||||
m.lock.Unlock()
|
||||
|
||||
return true
|
||||
defer m.lock.Unlock()
|
||||
_, ok := m.data[key]
|
||||
if ok {
|
||||
delete(m.data, key)
|
||||
}
|
||||
return ok
|
||||
}
|
||||
|
||||
// Get gets a value from the DecayMap by key.
|
||||
@@ -81,13 +90,12 @@ func (m *Impl[K, V]) Get(key K) (V, bool) {
|
||||
}
|
||||
|
||||
if time.Now().After(value.expiry) {
|
||||
m.lock.Lock()
|
||||
// Since previously reading m.data[key], the value may have been updated.
|
||||
// Delete the entry only if the expiry time is still the same.
|
||||
if m.data[key].expiry.Equal(value.expiry) {
|
||||
delete(m.data, key)
|
||||
// Defer decay deletion to the background worker to avoid convoy.
|
||||
select {
|
||||
case m.deleteCh <- deleteReq[K]{key: key, expiry: value.expiry}:
|
||||
default:
|
||||
// Channel full: drop request; a future Cleanup() or Get will retry.
|
||||
}
|
||||
m.lock.Unlock()
|
||||
|
||||
return Zilch[V](), false
|
||||
}
|
||||
@@ -125,3 +133,64 @@ func (m *Impl[K, V]) Len() int {
|
||||
defer m.lock.RUnlock()
|
||||
return len(m.data)
|
||||
}
|
||||
|
||||
// Close stops the background cleanup worker. It's optional to call; maps live
|
||||
// for the process lifetime in many cases. Call in tests or when you know you no
|
||||
// longer need the map to avoid goroutine leaks.
|
||||
func (m *Impl[K, V]) Close() {
|
||||
close(m.stopCh)
|
||||
m.wg.Wait()
|
||||
}
|
||||
|
||||
// cleanupWorker batches decay deletions to minimize lock contention.
|
||||
func (m *Impl[K, V]) cleanupWorker() {
|
||||
defer m.wg.Done()
|
||||
batch := make([]deleteReq[K], 0, 64)
|
||||
ticker := time.NewTicker(10 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
|
||||
flush := func() {
|
||||
if len(batch) == 0 {
|
||||
return
|
||||
}
|
||||
m.applyDeletes(batch)
|
||||
// reset batch without reallocating
|
||||
batch = batch[:0]
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case req := <-m.deleteCh:
|
||||
batch = append(batch, req)
|
||||
case <-ticker.C:
|
||||
flush()
|
||||
case <-m.stopCh:
|
||||
// Drain any remaining requests then exit
|
||||
for {
|
||||
select {
|
||||
case req := <-m.deleteCh:
|
||||
batch = append(batch, req)
|
||||
default:
|
||||
flush()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Impl[K, V]) applyDeletes(batch []deleteReq[K]) {
|
||||
now := time.Now()
|
||||
m.lock.Lock()
|
||||
for _, req := range batch {
|
||||
entry, ok := m.data[req.key]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
// Only delete if the expiry is unchanged and already past.
|
||||
if entry.expiry.Equal(req.expiry) && now.After(entry.expiry) {
|
||||
delete(m.data, req.key)
|
||||
}
|
||||
}
|
||||
m.lock.Unlock()
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
|
||||
func TestImpl(t *testing.T) {
|
||||
dm := New[string, string]()
|
||||
t.Cleanup(dm.Close)
|
||||
|
||||
dm.Set("test", "hi", 5*time.Minute)
|
||||
|
||||
@@ -28,10 +29,24 @@ func TestImpl(t *testing.T) {
|
||||
if ok {
|
||||
t.Error("got value even though it was supposed to be expired")
|
||||
}
|
||||
|
||||
// Deletion of expired entries after Get is deferred to a background worker.
|
||||
// Assert it eventually disappears from the map.
|
||||
deadline := time.Now().Add(200 * time.Millisecond)
|
||||
for time.Now().Before(deadline) {
|
||||
if dm.Len() == 0 {
|
||||
break
|
||||
}
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
}
|
||||
if dm.Len() != 0 {
|
||||
t.Fatalf("expected background cleanup to remove expired key; len=%d", dm.Len())
|
||||
}
|
||||
}
|
||||
|
||||
func TestCleanup(t *testing.T) {
|
||||
dm := New[string, string]()
|
||||
t.Cleanup(dm.Close)
|
||||
|
||||
dm.Set("test1", "hi1", 1*time.Second)
|
||||
dm.Set("test2", "hi2", 2*time.Second)
|
||||
|
||||
@@ -13,12 +13,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
<!-- This changes the project to: -->
|
||||
|
||||
- Document missing environment variables in installation guide: `SLOG_LEVEL`, `COOKIE_PREFIX`, `FORCED_LANGUAGE`, and `TARGET_DISABLE_KEEPALIVE` ([#1086](https://github.com/TecharoHQ/anubis/pull/1086))
|
||||
- Add validation warning when persistent storage is used without setting signing keys
|
||||
- Fixed `robots2policy` to properly group consecutive user agents into `any:` instead of only processing the last one ([#925](https://github.com/TecharoHQ/anubis/pull/925))
|
||||
- Fix lock convoy problem in decaymap ([#1103](https://github.com/TecharoHQ/anubis/issues/1103)).
|
||||
- Fix lock convoy problem in bbolt by implementing the actor pattern ([#1103](https://github.com/TecharoHQ/anubis/issues/1103)).
|
||||
- Document missing environment variables in installation guide: `SLOG_LEVEL`, `COOKIE_PREFIX`, `FORCED_LANGUAGE`, and `TARGET_DISABLE_KEEPALIVE` ([#1086](https://github.com/TecharoHQ/anubis/pull/1086)).
|
||||
- Add validation warning when persistent storage is used without setting signing keys.
|
||||
- Fixed `robots2policy` to properly group consecutive user agents into `any:` instead of only processing the last one ([#925](https://github.com/TecharoHQ/anubis/pull/925)).
|
||||
- Add the [`s3api` storage backend](./admin/policies.mdx#s3api) to allow Anubis to use S3 API compatible object storage as its storage backend.
|
||||
- Make `cmd/containerbuild` support commas for separating elements of the `--docker-tags` argument as well as newlines.
|
||||
- Add the `DIFFICULTY_IN_JWT` option, which allows one to add the `difficulty` field in the JWT claims which indicates the difficulty of the token ([#1063](https://github.com/TecharoHQ/anubis/pull/1063))
|
||||
- Add the `DIFFICULTY_IN_JWT` option, which allows one to add the `difficulty` field in the JWT claims which indicates the difficulty of the token ([#1063](https://github.com/TecharoHQ/anubis/pull/1063)).
|
||||
- Ported the client-side JS to TypeScript to avoid egregious errors in the future.
|
||||
- Fixes concurrency problems with very old browsers ([#1082](https://github.com/TecharoHQ/anubis/issues/1082)).
|
||||
|
||||
|
||||
107
internal/actorify/actorify.go
Normal file
107
internal/actorify/actorify.go
Normal file
@@ -0,0 +1,107 @@
|
||||
// Package actorify lets you transform a parallel operation into a serialized
|
||||
// operation via the Actor pattern[1].
|
||||
//
|
||||
// [1]: https://en.wikipedia.org/wiki/Actor_model
|
||||
package actorify
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
)
|
||||
|
||||
func z[Z any]() Z {
|
||||
var z Z
|
||||
return z
|
||||
}
|
||||
|
||||
var (
|
||||
// ErrActorDied is returned when the actor inbox or reply channel was closed.
|
||||
ErrActorDied = errors.New("actorify: the actor inbox or reply channel was closed")
|
||||
)
|
||||
|
||||
// Handler is a function alias for the underlying logic the Actor should call.
|
||||
type Handler[Input, Output any] func(ctx context.Context, input Input) (Output, error)
|
||||
|
||||
// Actor is a serializing wrapper that runs a function in a background goroutine.
|
||||
// Whenever the Call method is invoked, a message is sent to the actor's inbox and then
|
||||
// the callee waits for a response. Depending on how busy the actor is, this may take
|
||||
// a moment.
|
||||
type Actor[Input, Output any] struct {
|
||||
handler Handler[Input, Output]
|
||||
inbox chan *message[Input, Output]
|
||||
}
|
||||
|
||||
type message[Input, Output any] struct {
|
||||
ctx context.Context
|
||||
arg Input
|
||||
reply chan reply[Output]
|
||||
}
|
||||
|
||||
type reply[Output any] struct {
|
||||
output Output
|
||||
err error
|
||||
}
|
||||
|
||||
// New constructs a new Actor and starts its background thread. Cancel the context and you cancel
|
||||
// the Actor.
|
||||
func New[Input, Output any](ctx context.Context, handler Handler[Input, Output]) *Actor[Input, Output] {
|
||||
result := &Actor[Input, Output]{
|
||||
handler: handler,
|
||||
inbox: make(chan *message[Input, Output], 32),
|
||||
}
|
||||
|
||||
go result.handle(ctx)
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func (a *Actor[Input, Output]) handle(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
close(a.inbox)
|
||||
return
|
||||
case msg, ok := <-a.inbox:
|
||||
if !ok {
|
||||
if msg.reply != nil {
|
||||
close(msg.reply)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
result, err := a.handler(msg.ctx, msg.arg)
|
||||
|
||||
reply := reply[Output]{
|
||||
output: result,
|
||||
err: err,
|
||||
}
|
||||
|
||||
msg.reply <- reply
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Call calls the Actor with a given Input and returns the handler's Output.
|
||||
//
|
||||
// This only works with unary functions by design. If you need to have more inputs, define
|
||||
// a struct type to use as a container.
|
||||
func (a *Actor[Input, Output]) Call(ctx context.Context, input Input) (Output, error) {
|
||||
replyCh := make(chan reply[Output])
|
||||
|
||||
a.inbox <- &message[Input, Output]{
|
||||
arg: input,
|
||||
reply: replyCh,
|
||||
}
|
||||
|
||||
select {
|
||||
case reply, ok := <-replyCh:
|
||||
if !ok {
|
||||
return z[Output](), ErrActorDied
|
||||
}
|
||||
|
||||
return reply.output, reply.err
|
||||
case <-ctx.Done():
|
||||
return z[Output](), context.Cause(ctx)
|
||||
}
|
||||
}
|
||||
82
lib/store/actorifiedstore.go
Normal file
82
lib/store/actorifiedstore.go
Normal file
@@ -0,0 +1,82 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/TecharoHQ/anubis/internal/actorify"
|
||||
)
|
||||
|
||||
type unit struct{}
|
||||
|
||||
type ActorifiedStore struct {
|
||||
Interface
|
||||
|
||||
deleteActor *actorify.Actor[string, unit]
|
||||
getActor *actorify.Actor[string, []byte]
|
||||
setActor *actorify.Actor[*actorSetReq, unit]
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
type actorSetReq struct {
|
||||
key string
|
||||
value []byte
|
||||
expiry time.Duration
|
||||
}
|
||||
|
||||
func NewActorifiedStore(backend Interface) *ActorifiedStore {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
result := &ActorifiedStore{
|
||||
Interface: backend,
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
result.deleteActor = actorify.New(ctx, result.actorDelete)
|
||||
result.getActor = actorify.New(ctx, backend.Get)
|
||||
result.setActor = actorify.New(ctx, result.actorSet)
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func (a *ActorifiedStore) Close() { a.cancel() }
|
||||
|
||||
func (a *ActorifiedStore) Delete(ctx context.Context, key string) error {
|
||||
if _, err := a.deleteActor.Call(ctx, key); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *ActorifiedStore) Get(ctx context.Context, key string) ([]byte, error) {
|
||||
return a.getActor.Call(ctx, key)
|
||||
}
|
||||
|
||||
func (a *ActorifiedStore) Set(ctx context.Context, key string, value []byte, expiry time.Duration) error {
|
||||
if _, err := a.setActor.Call(ctx, &actorSetReq{
|
||||
key: key,
|
||||
value: value,
|
||||
expiry: expiry,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *ActorifiedStore) actorDelete(ctx context.Context, key string) (unit, error) {
|
||||
if err := a.Interface.Delete(ctx, key); err != nil {
|
||||
return unit{}, err
|
||||
}
|
||||
|
||||
return unit{}, nil
|
||||
}
|
||||
|
||||
func (a *ActorifiedStore) actorSet(ctx context.Context, req *actorSetReq) (unit, error) {
|
||||
if err := a.Interface.Set(ctx, req.key, req.value, req.expiry); err != nil {
|
||||
return unit{}, err
|
||||
}
|
||||
|
||||
return unit{}, nil
|
||||
}
|
||||
@@ -48,7 +48,7 @@ func (Factory) Build(ctx context.Context, data json.RawMessage) (store.Interface
|
||||
|
||||
go result.cleanupThread(ctx)
|
||||
|
||||
return result, nil
|
||||
return store.NewActorifiedStore(result), nil
|
||||
}
|
||||
|
||||
// Valid parses and validates the bbolt store Config or returns
|
||||
|
||||
Reference in New Issue
Block a user