mirror of
https://github.com/TecharoHQ/anubis.git
synced 2026-04-30 12:02:43 +00:00
feat(store/valkey): add Redis(R) Sentinel support
Signed-off-by: Xe Iaso <me@xeiaso.net>
This commit is contained in:
+94
-13
@@ -7,6 +7,7 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/TecharoHQ/anubis/internal"
|
||||
"github.com/TecharoHQ/anubis/lib/store"
|
||||
valkey "github.com/redis/go-redis/v9"
|
||||
"github.com/redis/go-redis/v9/maintnotifications"
|
||||
@@ -16,26 +17,89 @@ func init() {
|
||||
store.Register("valkey", Factory{})
|
||||
}
|
||||
|
||||
// Errors kept as-is so other code/tests still pass.
|
||||
var (
|
||||
ErrNoURL = errors.New("valkey.Config: no URL defined")
|
||||
ErrBadURL = errors.New("valkey.Config: URL is invalid")
|
||||
|
||||
// Sentinel validation errors
|
||||
ErrSentinelMasterNameRequired = errors.New("valkey.Sentinel: masterName is required")
|
||||
ErrSentinelAddrRequired = errors.New("valkey.Sentinel: addr is required")
|
||||
ErrSentinelPasswordRequired = errors.New("valkey.Sentinel: password is required")
|
||||
ErrSentinelAddrEmpty = errors.New("valkey.Sentinel: addr cannot be empty")
|
||||
)
|
||||
|
||||
// Config is what Anubis unmarshals from the "parameters" JSON.
|
||||
type Config struct {
|
||||
URL string `json:"url"`
|
||||
Cluster bool `json:"cluster,omitempty"`
|
||||
|
||||
Sentinel *Sentinel `json:"sentinel,omitempty"`
|
||||
}
|
||||
|
||||
func (c Config) Valid() error {
|
||||
if c.URL == "" {
|
||||
return ErrNoURL
|
||||
var errs []error
|
||||
|
||||
if c.URL == "" && c.Sentinel == nil {
|
||||
errs = append(errs, ErrNoURL)
|
||||
}
|
||||
|
||||
// Just validate that it's a valid Redis URL.
|
||||
if _, err := valkey.ParseURL(c.URL); err != nil {
|
||||
return fmt.Errorf("%w: %v", ErrBadURL, err)
|
||||
// Validate URL only if provided
|
||||
if c.URL != "" {
|
||||
if _, err := valkey.ParseURL(c.URL); err != nil {
|
||||
errs = append(errs, fmt.Errorf("%w: %v", ErrBadURL, err))
|
||||
}
|
||||
}
|
||||
|
||||
if c.Sentinel != nil {
|
||||
if err := c.Sentinel.Valid(); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type Sentinel struct {
|
||||
MasterName string `json:"masterName"`
|
||||
Addr internal.ListOr[string] `json:"addr"`
|
||||
ClientName string `json:"clientName,omitempty"` // if not set, default to Anubis or anubis.ProductName.
|
||||
Username string `json:"username,omitempty"` // if not set, ignored
|
||||
Password string `json:"password"`
|
||||
}
|
||||
|
||||
func (s Sentinel) Valid() error {
|
||||
var errs []error
|
||||
|
||||
if s.MasterName == "" {
|
||||
errs = append(errs, ErrSentinelMasterNameRequired)
|
||||
}
|
||||
|
||||
if len(s.Addr) == 0 {
|
||||
errs = append(errs, ErrSentinelAddrRequired)
|
||||
} else {
|
||||
// Check if all addresses in the list are empty
|
||||
allEmpty := true
|
||||
for _, addr := range s.Addr {
|
||||
if addr != "" {
|
||||
allEmpty = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if allEmpty {
|
||||
errs = append(errs, ErrSentinelAddrEmpty)
|
||||
}
|
||||
}
|
||||
|
||||
if s.Password == "" {
|
||||
errs = append(errs, ErrSentinelPasswordRequired)
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -68,14 +132,15 @@ func (Factory) Build(ctx context.Context, data json.RawMessage) (store.Interface
|
||||
return nil, err
|
||||
}
|
||||
|
||||
opts, err := valkey.ParseURL(cfg.URL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("valkey.Factory: %w", err)
|
||||
}
|
||||
|
||||
var client redisClient
|
||||
|
||||
if cfg.Cluster {
|
||||
switch {
|
||||
case cfg.Cluster:
|
||||
opts, err := valkey.ParseURL(cfg.URL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("valkey.Factory: %w", err)
|
||||
}
|
||||
|
||||
// Cluster mode: use the parsed Addr as the seed node.
|
||||
clusterOpts := &valkey.ClusterOptions{
|
||||
Addrs: []string{opts.Addr},
|
||||
@@ -86,7 +151,23 @@ func (Factory) Build(ctx context.Context, data json.RawMessage) (store.Interface
|
||||
},
|
||||
}
|
||||
client = valkey.NewClusterClient(clusterOpts)
|
||||
} else {
|
||||
case cfg.Sentinel != nil:
|
||||
opts := &valkey.FailoverOptions{
|
||||
MasterName: cfg.Sentinel.MasterName,
|
||||
SentinelAddrs: cfg.Sentinel.Addr,
|
||||
SentinelUsername: cfg.Sentinel.Username,
|
||||
SentinelPassword: cfg.Sentinel.Password,
|
||||
Username: cfg.Sentinel.Username,
|
||||
Password: cfg.Sentinel.Password,
|
||||
ClientName: cfg.Sentinel.ClientName,
|
||||
}
|
||||
client = valkey.NewFailoverClusterClient(opts)
|
||||
default:
|
||||
opts, err := valkey.ParseURL(cfg.URL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("valkey.Factory: %w", err)
|
||||
}
|
||||
|
||||
opts.MaintNotificationsConfig = &maintnotifications.Config{
|
||||
Mode: maintnotifications.ModeDisabled,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user