feat(plugins): add TTL support, batch operations, and hardening to kvstore (#5127)
* feat(plugins): add expires_at column to kvstore schema * feat(plugins): filter expired keys in kvstore Get, Has, List * feat(plugins): add periodic cleanup of expired kvstore keys * feat(plugins): add SetWithTTL, DeleteByPrefix, and GetMany to kvstore Add three new methods to the KVStore host service: - SetWithTTL: store key-value pairs with automatic expiration - DeleteByPrefix: remove all keys matching a prefix in one operation - GetMany: retrieve multiple values in a single call All methods include comprehensive unit tests covering edge cases, expiration behavior, size tracking, and LIKE-special characters. * feat(plugins): regenerate code and update test plugin for new kvstore methods Regenerate host function wrappers and PDK bindings for Go, Python, and Rust. Update the test-kvstore plugin to exercise SetWithTTL, DeleteByPrefix, and GetMany. * feat(plugins): add integration tests for new kvstore methods Add WASM integration tests for SetWithTTL, DeleteByPrefix, and GetMany operations through the plugin boundary, verifying end-to-end behavior including TTL expiration, prefix deletion, and batch retrieval. * fix(plugins): address lint issues in kvstore implementation Handle tx.Rollback error return and suppress gosec false positive for parameterized SQL query construction in GetMany. * fix(plugins): Set clears expires_at when overwriting a TTL'd key Previously, calling Set() on a key that was stored with SetWithTTL() would leave the expires_at value intact, causing the key to silently expire even though Set implies permanent storage. Also excludes expired keys from currentSize calculation at startup. * refactor(plugins): simplify kvstore by removing in-memory size cache Replaced the in-memory currentSize cache (atomic.Int64), periodic cleanup timer, and mutex with direct database queries for storage accounting. This eliminates race conditions and cache drift issues at negligible performance cost for plugin-sized datasets. Also unified Set and SetWithTTL into a shared setValue method, simplified DeleteByPrefix to use RowsAffected instead of a transaction, and added an index on expires_at for efficient expiration filtering. * feat(plugins): add generic SQLite migration helper and refactor kvstore schema Add a reusable migrateDB helper that tracks schema versions via SQLite's PRAGMA user_version and applies pending migrations transactionally. Replace the ad-hoc createKVStoreSchema function in kvstore with a declarative migrations slice, making it easy to add future schema changes. Remove the now-redundant schema migration test since migrateDB has its own test suite and every kvstore test exercises the migrations implicitly. Signed-off-by: Deluan <deluan@navidrome.org> * fix(plugins): harden kvstore with explicit NULL handling, prefix validation, and cleanup timeout - Use sql.NullString for expires_at to explicitly send NULL instead of relying on datetime('now', '') returning NULL by accident - Reject empty prefix in DeleteByPrefix to prevent accidental data wipe - Add 5s timeout context to cleanupExpired on Close - Replace time.Sleep in unit tests with pre-expired timestamps Signed-off-by: Deluan <deluan@navidrome.org> * refactor(plugins): use batch processing in GetMany Process keys in chunks of 200 using slice.CollectChunks to avoid hitting SQLite's SQLITE_MAX_VARIABLE_NUMBER limit with large key sets. * feat(plugins): add periodic cleanup goroutine for expired kvstore keys Use the manager's context to control a background goroutine that purges expired keys every hour, stopping naturally on shutdown when the context is cancelled. --------- Signed-off-by: Deluan <deluan@navidrome.org>
This commit is contained in:
+193
-67
@@ -7,14 +7,16 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/dustin/go-humanize"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"github.com/navidrome/navidrome/conf"
|
||||
"github.com/navidrome/navidrome/log"
|
||||
"github.com/navidrome/navidrome/plugins/host"
|
||||
"github.com/navidrome/navidrome/utils/slice"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -22,17 +24,22 @@ const (
|
||||
maxKeyLength = 256 // Max key length in bytes
|
||||
)
|
||||
|
||||
// notExpiredFilter is the SQL condition to exclude expired keys.
|
||||
const notExpiredFilter = "(expires_at IS NULL OR expires_at > datetime('now'))"
|
||||
|
||||
const cleanupInterval = 1 * time.Hour
|
||||
|
||||
// kvstoreServiceImpl implements the host.KVStoreService interface.
|
||||
// Each plugin gets its own SQLite database for isolation.
|
||||
type kvstoreServiceImpl struct {
|
||||
pluginName string
|
||||
db *sql.DB
|
||||
maxSize int64
|
||||
currentSize atomic.Int64 // cached total size, updated on Set/Delete
|
||||
pluginName string
|
||||
db *sql.DB
|
||||
maxSize int64
|
||||
}
|
||||
|
||||
// newKVStoreService creates a new kvstoreServiceImpl instance with its own SQLite database.
|
||||
func newKVStoreService(pluginName string, perm *KVStorePermission) (*kvstoreServiceImpl, error) {
|
||||
// The provided context controls the lifetime of the background cleanup goroutine.
|
||||
func newKVStoreService(ctx context.Context, pluginName string, perm *KVStorePermission) (*kvstoreServiceImpl, error) {
|
||||
// Parse max size from permission, default to 1MB
|
||||
maxSize := int64(defaultMaxKVStoreSize)
|
||||
if perm != nil && perm.MaxSize != nil && *perm.MaxSize != "" {
|
||||
@@ -59,46 +66,69 @@ func newKVStoreService(pluginName string, perm *KVStorePermission) (*kvstoreServ
|
||||
db.SetMaxOpenConns(3)
|
||||
db.SetMaxIdleConns(1)
|
||||
|
||||
// Create schema
|
||||
// Apply schema migrations
|
||||
if err := createKVStoreSchema(db); err != nil {
|
||||
db.Close()
|
||||
return nil, fmt.Errorf("creating kvstore schema: %w", err)
|
||||
return nil, fmt.Errorf("migrating kvstore schema: %w", err)
|
||||
}
|
||||
|
||||
// Load current storage size from database
|
||||
var currentSize int64
|
||||
if err := db.QueryRow(`SELECT COALESCE(SUM(size), 0) FROM kvstore`).Scan(¤tSize); err != nil {
|
||||
db.Close()
|
||||
return nil, fmt.Errorf("loading storage size: %w", err)
|
||||
}
|
||||
|
||||
log.Debug("Initialized plugin kvstore", "plugin", pluginName, "path", dbPath, "maxSize", humanize.Bytes(uint64(maxSize)), "currentSize", humanize.Bytes(uint64(currentSize)))
|
||||
log.Debug("Initialized plugin kvstore", "plugin", pluginName, "path", dbPath, "maxSize", humanize.Bytes(uint64(maxSize)))
|
||||
|
||||
svc := &kvstoreServiceImpl{
|
||||
pluginName: pluginName,
|
||||
db: db,
|
||||
maxSize: maxSize,
|
||||
}
|
||||
svc.currentSize.Store(currentSize)
|
||||
go svc.cleanupLoop(ctx)
|
||||
return svc, nil
|
||||
}
|
||||
|
||||
// createKVStoreSchema applies schema migrations to the kvstore database.
|
||||
// New migrations must be appended at the end of the slice.
|
||||
func createKVStoreSchema(db *sql.DB) error {
|
||||
_, err := db.Exec(`
|
||||
CREATE TABLE IF NOT EXISTS kvstore (
|
||||
return migrateDB(db, []string{
|
||||
`CREATE TABLE IF NOT EXISTS kvstore (
|
||||
key TEXT PRIMARY KEY NOT NULL,
|
||||
value BLOB NOT NULL,
|
||||
size INTEGER NOT NULL,
|
||||
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
)
|
||||
`)
|
||||
return err
|
||||
)`,
|
||||
`ALTER TABLE kvstore ADD COLUMN expires_at DATETIME DEFAULT NULL`,
|
||||
`CREATE INDEX idx_kvstore_expires_at ON kvstore(expires_at)`,
|
||||
})
|
||||
}
|
||||
|
||||
// Set stores a byte value with the given key.
|
||||
func (s *kvstoreServiceImpl) Set(ctx context.Context, key string, value []byte) error {
|
||||
// Validate key
|
||||
// storageUsed returns the current total storage used by non-expired keys.
|
||||
func (s *kvstoreServiceImpl) storageUsed(ctx context.Context) (int64, error) {
|
||||
var used int64
|
||||
err := s.db.QueryRowContext(ctx, `SELECT COALESCE(SUM(size), 0) FROM kvstore WHERE `+notExpiredFilter).Scan(&used)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("calculating storage used: %w", err)
|
||||
}
|
||||
return used, nil
|
||||
}
|
||||
|
||||
// checkStorageLimit verifies that adding delta bytes would not exceed the storage limit.
|
||||
func (s *kvstoreServiceImpl) checkStorageLimit(ctx context.Context, delta int64) error {
|
||||
if delta <= 0 {
|
||||
return nil
|
||||
}
|
||||
used, err := s.storageUsed(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
newTotal := used + delta
|
||||
if newTotal > s.maxSize {
|
||||
return fmt.Errorf("storage limit exceeded: would use %s of %s allowed",
|
||||
humanize.Bytes(uint64(newTotal)), humanize.Bytes(uint64(s.maxSize)))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// setValue is the shared implementation for Set and SetWithTTL.
|
||||
// A ttlSeconds of 0 means no expiration.
|
||||
func (s *kvstoreServiceImpl) setValue(ctx context.Context, key string, value []byte, ttlSeconds int64) error {
|
||||
if len(key) == 0 {
|
||||
return fmt.Errorf("key cannot be empty")
|
||||
}
|
||||
@@ -108,46 +138,59 @@ func (s *kvstoreServiceImpl) Set(ctx context.Context, key string, value []byte)
|
||||
|
||||
newValueSize := int64(len(value))
|
||||
|
||||
// Get current size of this key (if it exists) to calculate delta
|
||||
// Get current size of this key (if it exists and not expired) to calculate delta
|
||||
var oldSize int64
|
||||
err := s.db.QueryRowContext(ctx, `SELECT COALESCE(size, 0) FROM kvstore WHERE key = ?`, key).Scan(&oldSize)
|
||||
err := s.db.QueryRowContext(ctx, `SELECT COALESCE(size, 0) FROM kvstore WHERE key = ? AND `+notExpiredFilter, key).Scan(&oldSize)
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return fmt.Errorf("checking existing key: %w", err)
|
||||
}
|
||||
|
||||
// Check size limits using cached total
|
||||
delta := newValueSize - oldSize
|
||||
newTotal := s.currentSize.Load() + delta
|
||||
if newTotal > s.maxSize {
|
||||
return fmt.Errorf("storage limit exceeded: would use %s of %s allowed",
|
||||
humanize.Bytes(uint64(newTotal)), humanize.Bytes(uint64(s.maxSize)))
|
||||
if err := s.checkStorageLimit(ctx, newValueSize-oldSize); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Compute expires_at: sql.NullString{Valid:false} sends NULL (no expiration),
|
||||
// otherwise we send a concrete timestamp.
|
||||
var expiresAt sql.NullString
|
||||
if ttlSeconds > 0 {
|
||||
expiresAt = sql.NullString{String: fmt.Sprintf("+%d seconds", ttlSeconds), Valid: true}
|
||||
}
|
||||
|
||||
// Upsert the value
|
||||
_, err = s.db.ExecContext(ctx, `
|
||||
INSERT INTO kvstore (key, value, size, created_at, updated_at)
|
||||
VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
||||
ON CONFLICT(key) DO UPDATE SET
|
||||
value = excluded.value,
|
||||
INSERT INTO kvstore (key, value, size, created_at, updated_at, expires_at)
|
||||
VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, datetime('now', ?))
|
||||
ON CONFLICT(key) DO UPDATE SET
|
||||
value = excluded.value,
|
||||
size = excluded.size,
|
||||
updated_at = CURRENT_TIMESTAMP
|
||||
`, key, value, newValueSize)
|
||||
updated_at = CURRENT_TIMESTAMP,
|
||||
expires_at = excluded.expires_at
|
||||
`, key, value, newValueSize, expiresAt)
|
||||
if err != nil {
|
||||
return fmt.Errorf("storing value: %w", err)
|
||||
}
|
||||
|
||||
// Update cached size
|
||||
s.currentSize.Add(delta)
|
||||
|
||||
log.Trace(ctx, "KVStore.Set", "plugin", s.pluginName, "key", key, "size", newValueSize)
|
||||
log.Trace(ctx, "KVStore.Set", "plugin", s.pluginName, "key", key, "size", newValueSize, "ttlSeconds", ttlSeconds)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Set stores a byte value with the given key.
|
||||
func (s *kvstoreServiceImpl) Set(ctx context.Context, key string, value []byte) error {
|
||||
return s.setValue(ctx, key, value, 0)
|
||||
}
|
||||
|
||||
// SetWithTTL stores a byte value with the given key and a time-to-live.
|
||||
func (s *kvstoreServiceImpl) SetWithTTL(ctx context.Context, key string, value []byte, ttlSeconds int64) error {
|
||||
if ttlSeconds <= 0 {
|
||||
return fmt.Errorf("ttlSeconds must be greater than 0")
|
||||
}
|
||||
return s.setValue(ctx, key, value, ttlSeconds)
|
||||
}
|
||||
|
||||
// Get retrieves a byte value from storage.
|
||||
func (s *kvstoreServiceImpl) Get(ctx context.Context, key string) ([]byte, bool, error) {
|
||||
var value []byte
|
||||
err := s.db.QueryRowContext(ctx, `SELECT value FROM kvstore WHERE key = ?`, key).Scan(&value)
|
||||
if err == sql.ErrNoRows {
|
||||
err := s.db.QueryRowContext(ctx, `SELECT value FROM kvstore WHERE key = ? AND `+notExpiredFilter, key).Scan(&value)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, false, nil
|
||||
}
|
||||
if err != nil {
|
||||
@@ -160,25 +203,11 @@ func (s *kvstoreServiceImpl) Get(ctx context.Context, key string) ([]byte, bool,
|
||||
|
||||
// Delete removes a value from storage.
|
||||
func (s *kvstoreServiceImpl) Delete(ctx context.Context, key string) error {
|
||||
// Get size of the key being deleted to update cache
|
||||
var oldSize int64
|
||||
err := s.db.QueryRowContext(ctx, `SELECT size FROM kvstore WHERE key = ?`, key).Scan(&oldSize)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
// Key doesn't exist, nothing to delete
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("checking key size: %w", err)
|
||||
}
|
||||
|
||||
_, err = s.db.ExecContext(ctx, `DELETE FROM kvstore WHERE key = ?`, key)
|
||||
_, err := s.db.ExecContext(ctx, `DELETE FROM kvstore WHERE key = ?`, key)
|
||||
if err != nil {
|
||||
return fmt.Errorf("deleting value: %w", err)
|
||||
}
|
||||
|
||||
// Update cached size
|
||||
s.currentSize.Add(-oldSize)
|
||||
|
||||
log.Trace(ctx, "KVStore.Delete", "plugin", s.pluginName, "key", key)
|
||||
return nil
|
||||
}
|
||||
@@ -186,7 +215,7 @@ func (s *kvstoreServiceImpl) Delete(ctx context.Context, key string) error {
|
||||
// Has checks if a key exists in storage.
|
||||
func (s *kvstoreServiceImpl) Has(ctx context.Context, key string) (bool, error) {
|
||||
var count int
|
||||
err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM kvstore WHERE key = ?`, key).Scan(&count)
|
||||
err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM kvstore WHERE key = ? AND `+notExpiredFilter, key).Scan(&count)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("checking key: %w", err)
|
||||
}
|
||||
@@ -200,12 +229,12 @@ func (s *kvstoreServiceImpl) List(ctx context.Context, prefix string) ([]string,
|
||||
var err error
|
||||
|
||||
if prefix == "" {
|
||||
rows, err = s.db.QueryContext(ctx, `SELECT key FROM kvstore ORDER BY key`)
|
||||
rows, err = s.db.QueryContext(ctx, `SELECT key FROM kvstore WHERE `+notExpiredFilter+` ORDER BY key`)
|
||||
} else {
|
||||
// Escape special LIKE characters in prefix
|
||||
escapedPrefix := strings.ReplaceAll(prefix, "%", "\\%")
|
||||
escapedPrefix = strings.ReplaceAll(escapedPrefix, "_", "\\_")
|
||||
rows, err = s.db.QueryContext(ctx, `SELECT key FROM kvstore WHERE key LIKE ? ESCAPE '\' ORDER BY key`, escapedPrefix+"%")
|
||||
rows, err = s.db.QueryContext(ctx, `SELECT key FROM kvstore WHERE key LIKE ? ESCAPE '\' AND `+notExpiredFilter+` ORDER BY key`, escapedPrefix+"%")
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("listing keys: %w", err)
|
||||
@@ -231,16 +260,113 @@ func (s *kvstoreServiceImpl) List(ctx context.Context, prefix string) ([]string,
|
||||
|
||||
// GetStorageUsed returns the total storage used by this plugin in bytes.
|
||||
func (s *kvstoreServiceImpl) GetStorageUsed(ctx context.Context) (int64, error) {
|
||||
used := s.currentSize.Load()
|
||||
used, err := s.storageUsed(ctx)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
log.Trace(ctx, "KVStore.GetStorageUsed", "plugin", s.pluginName, "bytes", used)
|
||||
return used, nil
|
||||
}
|
||||
|
||||
// Close closes the SQLite database connection.
|
||||
// This is called when the plugin is unloaded.
|
||||
// DeleteByPrefix removes all keys matching the given prefix.
|
||||
func (s *kvstoreServiceImpl) DeleteByPrefix(ctx context.Context, prefix string) (int64, error) {
|
||||
if prefix == "" {
|
||||
return 0, fmt.Errorf("prefix cannot be empty")
|
||||
}
|
||||
|
||||
escapedPrefix := strings.ReplaceAll(prefix, "%", "\\%")
|
||||
escapedPrefix = strings.ReplaceAll(escapedPrefix, "_", "\\_")
|
||||
result, err := s.db.ExecContext(ctx, `DELETE FROM kvstore WHERE key LIKE ? ESCAPE '\'`, escapedPrefix+"%")
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("deleting keys: %w", err)
|
||||
}
|
||||
|
||||
count, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("getting deleted count: %w", err)
|
||||
}
|
||||
|
||||
log.Trace(ctx, "KVStore.DeleteByPrefix", "plugin", s.pluginName, "prefix", prefix, "deletedCount", count)
|
||||
return count, nil
|
||||
}
|
||||
|
||||
// GetMany retrieves multiple values in a single call, processing keys in batches.
|
||||
func (s *kvstoreServiceImpl) GetMany(ctx context.Context, keys []string) (map[string][]byte, error) {
|
||||
if len(keys) == 0 {
|
||||
return map[string][]byte{}, nil
|
||||
}
|
||||
|
||||
const batchSize = 200
|
||||
result := make(map[string][]byte)
|
||||
for chunk := range slice.CollectChunks(slices.Values(keys), batchSize) {
|
||||
placeholders := make([]string, len(chunk))
|
||||
args := make([]any, len(chunk))
|
||||
for i, key := range chunk {
|
||||
placeholders[i] = "?"
|
||||
args[i] = key
|
||||
}
|
||||
|
||||
query := `SELECT key, value FROM kvstore WHERE key IN (` + strings.Join(placeholders, ",") + `) AND ` + notExpiredFilter //nolint:gosec // placeholders are always "?"
|
||||
rows, err := s.db.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying values: %w", err)
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
var key string
|
||||
var value []byte
|
||||
if err := rows.Scan(&key, &value); err != nil {
|
||||
rows.Close()
|
||||
return nil, fmt.Errorf("scanning value: %w", err)
|
||||
}
|
||||
result[key] = value
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
rows.Close()
|
||||
return nil, fmt.Errorf("iterating values: %w", err)
|
||||
}
|
||||
rows.Close()
|
||||
}
|
||||
|
||||
log.Trace(ctx, "KVStore.GetMany", "plugin", s.pluginName, "requested", len(keys), "found", len(result))
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// cleanupLoop periodically removes expired keys from the database.
|
||||
// It stops when the provided context is cancelled.
|
||||
func (s *kvstoreServiceImpl) cleanupLoop(ctx context.Context) {
|
||||
ticker := time.NewTicker(cleanupInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
s.cleanupExpired(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// cleanupExpired removes all expired keys from the database to reclaim disk space.
|
||||
func (s *kvstoreServiceImpl) cleanupExpired(ctx context.Context) {
|
||||
result, err := s.db.ExecContext(ctx, `DELETE FROM kvstore WHERE expires_at IS NOT NULL AND expires_at <= datetime('now')`)
|
||||
if err != nil {
|
||||
log.Error(ctx, "KVStore cleanup: failed to delete expired keys", "plugin", s.pluginName, err)
|
||||
return
|
||||
}
|
||||
if count, err := result.RowsAffected(); err == nil && count > 0 {
|
||||
log.Debug("KVStore cleanup completed", "plugin", s.pluginName, "deletedKeys", count)
|
||||
}
|
||||
}
|
||||
|
||||
// Close runs a final cleanup and closes the SQLite database connection.
|
||||
// The cleanup goroutine is stopped by the context passed to newKVStoreService.
|
||||
func (s *kvstoreServiceImpl) Close() error {
|
||||
if s.db != nil {
|
||||
log.Debug("Closing plugin kvstore", "plugin", s.pluginName)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
s.cleanupExpired(ctx)
|
||||
return s.db.Close()
|
||||
}
|
||||
return nil
|
||||
|
||||
Reference in New Issue
Block a user