diff --git a/plugins/host/kvstore.go b/plugins/host/kvstore.go index 4d9dafd2..aa2597f2 100644 --- a/plugins/host/kvstore.go +++ b/plugins/host/kvstore.go @@ -23,6 +23,20 @@ type KVStoreService interface { //nd:hostfunc Set(ctx context.Context, key string, value []byte) error + // SetWithTTL stores a byte value with the given key and a time-to-live. + // + // After ttlSeconds, the key is treated as non-existent and will be + // cleaned up lazily. ttlSeconds must be greater than 0. + // + // Parameters: + // - key: The storage key (max 256 bytes, UTF-8) + // - value: The byte slice to store + // - ttlSeconds: Time-to-live in seconds (must be > 0) + // + // Returns an error if the storage limit would be exceeded or the operation fails. + //nd:hostfunc + SetWithTTL(ctx context.Context, key string, value []byte, ttlSeconds int64) error + // Get retrieves a byte value from storage. // // Parameters: @@ -32,14 +46,15 @@ type KVStoreService interface { //nd:hostfunc Get(ctx context.Context, key string) (value []byte, exists bool, err error) - // Delete removes a value from storage. + // GetMany retrieves multiple values in a single call. // // Parameters: - // - key: The storage key + // - keys: The storage keys to retrieve // - // Returns an error if the operation fails. Does not return an error if the key doesn't exist. + // Returns a map of key to value for keys that exist and have not expired. + // Missing or expired keys are omitted from the result. //nd:hostfunc - Delete(ctx context.Context, key string) error + GetMany(ctx context.Context, keys []string) (values map[string][]byte, err error) // Has checks if a key exists in storage. // @@ -59,6 +74,24 @@ type KVStoreService interface { //nd:hostfunc List(ctx context.Context, prefix string) (keys []string, err error) + // Delete removes a value from storage. + // + // Parameters: + // - key: The storage key + // + // Returns an error if the operation fails. Does not return an error if the key doesn't exist. + //nd:hostfunc + Delete(ctx context.Context, key string) error + + // DeleteByPrefix removes all keys matching the given prefix. + // + // Parameters: + // - prefix: Key prefix to match (must not be empty) + // + // Returns the number of keys deleted. Includes expired keys. + //nd:hostfunc + DeleteByPrefix(ctx context.Context, prefix string) (deletedCount int64, err error) + // GetStorageUsed returns the total storage used by this plugin in bytes. //nd:hostfunc GetStorageUsed(ctx context.Context) (bytes int64, err error) diff --git a/plugins/host/kvstore_gen.go b/plugins/host/kvstore_gen.go index 2ad24959..44ee3b13 100644 --- a/plugins/host/kvstore_gen.go +++ b/plugins/host/kvstore_gen.go @@ -20,6 +20,18 @@ type KVStoreSetResponse struct { Error string `json:"error,omitempty"` } +// KVStoreSetWithTTLRequest is the request type for KVStore.SetWithTTL. +type KVStoreSetWithTTLRequest struct { + Key string `json:"key"` + Value []byte `json:"value"` + TtlSeconds int64 `json:"ttlSeconds"` +} + +// KVStoreSetWithTTLResponse is the response type for KVStore.SetWithTTL. +type KVStoreSetWithTTLResponse struct { + Error string `json:"error,omitempty"` +} + // KVStoreGetRequest is the request type for KVStore.Get. type KVStoreGetRequest struct { Key string `json:"key"` @@ -32,14 +44,15 @@ type KVStoreGetResponse struct { Error string `json:"error,omitempty"` } -// KVStoreDeleteRequest is the request type for KVStore.Delete. -type KVStoreDeleteRequest struct { - Key string `json:"key"` +// KVStoreGetManyRequest is the request type for KVStore.GetMany. +type KVStoreGetManyRequest struct { + Keys []string `json:"keys"` } -// KVStoreDeleteResponse is the response type for KVStore.Delete. -type KVStoreDeleteResponse struct { - Error string `json:"error,omitempty"` +// KVStoreGetManyResponse is the response type for KVStore.GetMany. +type KVStoreGetManyResponse struct { + Values map[string][]byte `json:"values,omitempty"` + Error string `json:"error,omitempty"` } // KVStoreHasRequest is the request type for KVStore.Has. @@ -64,6 +77,27 @@ type KVStoreListResponse struct { Error string `json:"error,omitempty"` } +// KVStoreDeleteRequest is the request type for KVStore.Delete. +type KVStoreDeleteRequest struct { + Key string `json:"key"` +} + +// KVStoreDeleteResponse is the response type for KVStore.Delete. +type KVStoreDeleteResponse struct { + Error string `json:"error,omitempty"` +} + +// KVStoreDeleteByPrefixRequest is the request type for KVStore.DeleteByPrefix. +type KVStoreDeleteByPrefixRequest struct { + Prefix string `json:"prefix"` +} + +// KVStoreDeleteByPrefixResponse is the response type for KVStore.DeleteByPrefix. +type KVStoreDeleteByPrefixResponse struct { + DeletedCount int64 `json:"deletedCount,omitempty"` + Error string `json:"error,omitempty"` +} + // KVStoreGetStorageUsedResponse is the response type for KVStore.GetStorageUsed. type KVStoreGetStorageUsedResponse struct { Bytes int64 `json:"bytes,omitempty"` @@ -75,10 +109,13 @@ type KVStoreGetStorageUsedResponse struct { func RegisterKVStoreHostFunctions(service KVStoreService) []extism.HostFunction { return []extism.HostFunction{ newKVStoreSetHostFunction(service), + newKVStoreSetWithTTLHostFunction(service), newKVStoreGetHostFunction(service), - newKVStoreDeleteHostFunction(service), + newKVStoreGetManyHostFunction(service), newKVStoreHasHostFunction(service), newKVStoreListHostFunction(service), + newKVStoreDeleteHostFunction(service), + newKVStoreDeleteByPrefixHostFunction(service), newKVStoreGetStorageUsedHostFunction(service), } } @@ -114,6 +151,37 @@ func newKVStoreSetHostFunction(service KVStoreService) extism.HostFunction { ) } +func newKVStoreSetWithTTLHostFunction(service KVStoreService) extism.HostFunction { + return extism.NewHostFunctionWithStack( + "kvstore_setwithttl", + func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) { + // Read JSON request from plugin memory + reqBytes, err := p.ReadBytes(stack[0]) + if err != nil { + kvstoreWriteError(p, stack, err) + return + } + var req KVStoreSetWithTTLRequest + if err := json.Unmarshal(reqBytes, &req); err != nil { + kvstoreWriteError(p, stack, err) + return + } + + // Call the service method + if svcErr := service.SetWithTTL(ctx, req.Key, req.Value, req.TtlSeconds); svcErr != nil { + kvstoreWriteError(p, stack, svcErr) + return + } + + // Write JSON response to plugin memory + resp := KVStoreSetWithTTLResponse{} + kvstoreWriteResponse(p, stack, resp) + }, + []extism.ValueType{extism.ValueTypePTR}, + []extism.ValueType{extism.ValueTypePTR}, + ) +} + func newKVStoreGetHostFunction(service KVStoreService) extism.HostFunction { return extism.NewHostFunctionWithStack( "kvstore_get", @@ -149,9 +217,9 @@ func newKVStoreGetHostFunction(service KVStoreService) extism.HostFunction { ) } -func newKVStoreDeleteHostFunction(service KVStoreService) extism.HostFunction { +func newKVStoreGetManyHostFunction(service KVStoreService) extism.HostFunction { return extism.NewHostFunctionWithStack( - "kvstore_delete", + "kvstore_getmany", func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) { // Read JSON request from plugin memory reqBytes, err := p.ReadBytes(stack[0]) @@ -159,20 +227,23 @@ func newKVStoreDeleteHostFunction(service KVStoreService) extism.HostFunction { kvstoreWriteError(p, stack, err) return } - var req KVStoreDeleteRequest + var req KVStoreGetManyRequest if err := json.Unmarshal(reqBytes, &req); err != nil { kvstoreWriteError(p, stack, err) return } // Call the service method - if svcErr := service.Delete(ctx, req.Key); svcErr != nil { + values, svcErr := service.GetMany(ctx, req.Keys) + if svcErr != nil { kvstoreWriteError(p, stack, svcErr) return } // Write JSON response to plugin memory - resp := KVStoreDeleteResponse{} + resp := KVStoreGetManyResponse{ + Values: values, + } kvstoreWriteResponse(p, stack, resp) }, []extism.ValueType{extism.ValueTypePTR}, @@ -248,6 +319,71 @@ func newKVStoreListHostFunction(service KVStoreService) extism.HostFunction { ) } +func newKVStoreDeleteHostFunction(service KVStoreService) extism.HostFunction { + return extism.NewHostFunctionWithStack( + "kvstore_delete", + func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) { + // Read JSON request from plugin memory + reqBytes, err := p.ReadBytes(stack[0]) + if err != nil { + kvstoreWriteError(p, stack, err) + return + } + var req KVStoreDeleteRequest + if err := json.Unmarshal(reqBytes, &req); err != nil { + kvstoreWriteError(p, stack, err) + return + } + + // Call the service method + if svcErr := service.Delete(ctx, req.Key); svcErr != nil { + kvstoreWriteError(p, stack, svcErr) + return + } + + // Write JSON response to plugin memory + resp := KVStoreDeleteResponse{} + kvstoreWriteResponse(p, stack, resp) + }, + []extism.ValueType{extism.ValueTypePTR}, + []extism.ValueType{extism.ValueTypePTR}, + ) +} + +func newKVStoreDeleteByPrefixHostFunction(service KVStoreService) extism.HostFunction { + return extism.NewHostFunctionWithStack( + "kvstore_deletebyprefix", + func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) { + // Read JSON request from plugin memory + reqBytes, err := p.ReadBytes(stack[0]) + if err != nil { + kvstoreWriteError(p, stack, err) + return + } + var req KVStoreDeleteByPrefixRequest + if err := json.Unmarshal(reqBytes, &req); err != nil { + kvstoreWriteError(p, stack, err) + return + } + + // Call the service method + deletedcount, svcErr := service.DeleteByPrefix(ctx, req.Prefix) + if svcErr != nil { + kvstoreWriteError(p, stack, svcErr) + return + } + + // Write JSON response to plugin memory + resp := KVStoreDeleteByPrefixResponse{ + DeletedCount: deletedcount, + } + kvstoreWriteResponse(p, stack, resp) + }, + []extism.ValueType{extism.ValueTypePTR}, + []extism.ValueType{extism.ValueTypePTR}, + ) +} + func newKVStoreGetStorageUsedHostFunction(service KVStoreService) extism.HostFunction { return extism.NewHostFunctionWithStack( "kvstore_getstorageused", diff --git a/plugins/host_kvstore.go b/plugins/host_kvstore.go index 53d4da92..9aa37f7e 100644 --- a/plugins/host_kvstore.go +++ b/plugins/host_kvstore.go @@ -7,14 +7,16 @@ import ( "fmt" "os" "path/filepath" + "slices" "strings" - "sync/atomic" + "time" "github.com/dustin/go-humanize" _ "github.com/mattn/go-sqlite3" "github.com/navidrome/navidrome/conf" "github.com/navidrome/navidrome/log" "github.com/navidrome/navidrome/plugins/host" + "github.com/navidrome/navidrome/utils/slice" ) const ( @@ -22,17 +24,22 @@ const ( maxKeyLength = 256 // Max key length in bytes ) +// notExpiredFilter is the SQL condition to exclude expired keys. +const notExpiredFilter = "(expires_at IS NULL OR expires_at > datetime('now'))" + +const cleanupInterval = 1 * time.Hour + // kvstoreServiceImpl implements the host.KVStoreService interface. // Each plugin gets its own SQLite database for isolation. type kvstoreServiceImpl struct { - pluginName string - db *sql.DB - maxSize int64 - currentSize atomic.Int64 // cached total size, updated on Set/Delete + pluginName string + db *sql.DB + maxSize int64 } // newKVStoreService creates a new kvstoreServiceImpl instance with its own SQLite database. -func newKVStoreService(pluginName string, perm *KVStorePermission) (*kvstoreServiceImpl, error) { +// The provided context controls the lifetime of the background cleanup goroutine. +func newKVStoreService(ctx context.Context, pluginName string, perm *KVStorePermission) (*kvstoreServiceImpl, error) { // Parse max size from permission, default to 1MB maxSize := int64(defaultMaxKVStoreSize) if perm != nil && perm.MaxSize != nil && *perm.MaxSize != "" { @@ -59,46 +66,69 @@ func newKVStoreService(pluginName string, perm *KVStorePermission) (*kvstoreServ db.SetMaxOpenConns(3) db.SetMaxIdleConns(1) - // Create schema + // Apply schema migrations if err := createKVStoreSchema(db); err != nil { db.Close() - return nil, fmt.Errorf("creating kvstore schema: %w", err) + return nil, fmt.Errorf("migrating kvstore schema: %w", err) } - // Load current storage size from database - var currentSize int64 - if err := db.QueryRow(`SELECT COALESCE(SUM(size), 0) FROM kvstore`).Scan(¤tSize); err != nil { - db.Close() - return nil, fmt.Errorf("loading storage size: %w", err) - } - - log.Debug("Initialized plugin kvstore", "plugin", pluginName, "path", dbPath, "maxSize", humanize.Bytes(uint64(maxSize)), "currentSize", humanize.Bytes(uint64(currentSize))) + log.Debug("Initialized plugin kvstore", "plugin", pluginName, "path", dbPath, "maxSize", humanize.Bytes(uint64(maxSize))) svc := &kvstoreServiceImpl{ pluginName: pluginName, db: db, maxSize: maxSize, } - svc.currentSize.Store(currentSize) + go svc.cleanupLoop(ctx) return svc, nil } +// createKVStoreSchema applies schema migrations to the kvstore database. +// New migrations must be appended at the end of the slice. func createKVStoreSchema(db *sql.DB) error { - _, err := db.Exec(` - CREATE TABLE IF NOT EXISTS kvstore ( + return migrateDB(db, []string{ + `CREATE TABLE IF NOT EXISTS kvstore ( key TEXT PRIMARY KEY NOT NULL, value BLOB NOT NULL, size INTEGER NOT NULL, created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP - ) - `) - return err + )`, + `ALTER TABLE kvstore ADD COLUMN expires_at DATETIME DEFAULT NULL`, + `CREATE INDEX idx_kvstore_expires_at ON kvstore(expires_at)`, + }) } -// Set stores a byte value with the given key. -func (s *kvstoreServiceImpl) Set(ctx context.Context, key string, value []byte) error { - // Validate key +// storageUsed returns the current total storage used by non-expired keys. +func (s *kvstoreServiceImpl) storageUsed(ctx context.Context) (int64, error) { + var used int64 + err := s.db.QueryRowContext(ctx, `SELECT COALESCE(SUM(size), 0) FROM kvstore WHERE `+notExpiredFilter).Scan(&used) + if err != nil { + return 0, fmt.Errorf("calculating storage used: %w", err) + } + return used, nil +} + +// checkStorageLimit verifies that adding delta bytes would not exceed the storage limit. +func (s *kvstoreServiceImpl) checkStorageLimit(ctx context.Context, delta int64) error { + if delta <= 0 { + return nil + } + used, err := s.storageUsed(ctx) + if err != nil { + return err + } + newTotal := used + delta + if newTotal > s.maxSize { + return fmt.Errorf("storage limit exceeded: would use %s of %s allowed", + humanize.Bytes(uint64(newTotal)), humanize.Bytes(uint64(s.maxSize))) + } + return nil +} + +// setValue is the shared implementation for Set and SetWithTTL. +// A ttlSeconds of 0 means no expiration. +func (s *kvstoreServiceImpl) setValue(ctx context.Context, key string, value []byte, ttlSeconds int64) error { if len(key) == 0 { return fmt.Errorf("key cannot be empty") } @@ -108,46 +138,59 @@ func (s *kvstoreServiceImpl) Set(ctx context.Context, key string, value []byte) newValueSize := int64(len(value)) - // Get current size of this key (if it exists) to calculate delta + // Get current size of this key (if it exists and not expired) to calculate delta var oldSize int64 - err := s.db.QueryRowContext(ctx, `SELECT COALESCE(size, 0) FROM kvstore WHERE key = ?`, key).Scan(&oldSize) + err := s.db.QueryRowContext(ctx, `SELECT COALESCE(size, 0) FROM kvstore WHERE key = ? AND `+notExpiredFilter, key).Scan(&oldSize) if err != nil && !errors.Is(err, sql.ErrNoRows) { return fmt.Errorf("checking existing key: %w", err) } - // Check size limits using cached total - delta := newValueSize - oldSize - newTotal := s.currentSize.Load() + delta - if newTotal > s.maxSize { - return fmt.Errorf("storage limit exceeded: would use %s of %s allowed", - humanize.Bytes(uint64(newTotal)), humanize.Bytes(uint64(s.maxSize))) + if err := s.checkStorageLimit(ctx, newValueSize-oldSize); err != nil { + return err + } + + // Compute expires_at: sql.NullString{Valid:false} sends NULL (no expiration), + // otherwise we send a concrete timestamp. + var expiresAt sql.NullString + if ttlSeconds > 0 { + expiresAt = sql.NullString{String: fmt.Sprintf("+%d seconds", ttlSeconds), Valid: true} } - // Upsert the value _, err = s.db.ExecContext(ctx, ` - INSERT INTO kvstore (key, value, size, created_at, updated_at) - VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT(key) DO UPDATE SET - value = excluded.value, + INSERT INTO kvstore (key, value, size, created_at, updated_at, expires_at) + VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, datetime('now', ?)) + ON CONFLICT(key) DO UPDATE SET + value = excluded.value, size = excluded.size, - updated_at = CURRENT_TIMESTAMP - `, key, value, newValueSize) + updated_at = CURRENT_TIMESTAMP, + expires_at = excluded.expires_at + `, key, value, newValueSize, expiresAt) if err != nil { return fmt.Errorf("storing value: %w", err) } - // Update cached size - s.currentSize.Add(delta) - - log.Trace(ctx, "KVStore.Set", "plugin", s.pluginName, "key", key, "size", newValueSize) + log.Trace(ctx, "KVStore.Set", "plugin", s.pluginName, "key", key, "size", newValueSize, "ttlSeconds", ttlSeconds) return nil } +// Set stores a byte value with the given key. +func (s *kvstoreServiceImpl) Set(ctx context.Context, key string, value []byte) error { + return s.setValue(ctx, key, value, 0) +} + +// SetWithTTL stores a byte value with the given key and a time-to-live. +func (s *kvstoreServiceImpl) SetWithTTL(ctx context.Context, key string, value []byte, ttlSeconds int64) error { + if ttlSeconds <= 0 { + return fmt.Errorf("ttlSeconds must be greater than 0") + } + return s.setValue(ctx, key, value, ttlSeconds) +} + // Get retrieves a byte value from storage. func (s *kvstoreServiceImpl) Get(ctx context.Context, key string) ([]byte, bool, error) { var value []byte - err := s.db.QueryRowContext(ctx, `SELECT value FROM kvstore WHERE key = ?`, key).Scan(&value) - if err == sql.ErrNoRows { + err := s.db.QueryRowContext(ctx, `SELECT value FROM kvstore WHERE key = ? AND `+notExpiredFilter, key).Scan(&value) + if errors.Is(err, sql.ErrNoRows) { return nil, false, nil } if err != nil { @@ -160,25 +203,11 @@ func (s *kvstoreServiceImpl) Get(ctx context.Context, key string) ([]byte, bool, // Delete removes a value from storage. func (s *kvstoreServiceImpl) Delete(ctx context.Context, key string) error { - // Get size of the key being deleted to update cache - var oldSize int64 - err := s.db.QueryRowContext(ctx, `SELECT size FROM kvstore WHERE key = ?`, key).Scan(&oldSize) - if errors.Is(err, sql.ErrNoRows) { - // Key doesn't exist, nothing to delete - return nil - } - if err != nil { - return fmt.Errorf("checking key size: %w", err) - } - - _, err = s.db.ExecContext(ctx, `DELETE FROM kvstore WHERE key = ?`, key) + _, err := s.db.ExecContext(ctx, `DELETE FROM kvstore WHERE key = ?`, key) if err != nil { return fmt.Errorf("deleting value: %w", err) } - // Update cached size - s.currentSize.Add(-oldSize) - log.Trace(ctx, "KVStore.Delete", "plugin", s.pluginName, "key", key) return nil } @@ -186,7 +215,7 @@ func (s *kvstoreServiceImpl) Delete(ctx context.Context, key string) error { // Has checks if a key exists in storage. func (s *kvstoreServiceImpl) Has(ctx context.Context, key string) (bool, error) { var count int - err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM kvstore WHERE key = ?`, key).Scan(&count) + err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM kvstore WHERE key = ? AND `+notExpiredFilter, key).Scan(&count) if err != nil { return false, fmt.Errorf("checking key: %w", err) } @@ -200,12 +229,12 @@ func (s *kvstoreServiceImpl) List(ctx context.Context, prefix string) ([]string, var err error if prefix == "" { - rows, err = s.db.QueryContext(ctx, `SELECT key FROM kvstore ORDER BY key`) + rows, err = s.db.QueryContext(ctx, `SELECT key FROM kvstore WHERE `+notExpiredFilter+` ORDER BY key`) } else { // Escape special LIKE characters in prefix escapedPrefix := strings.ReplaceAll(prefix, "%", "\\%") escapedPrefix = strings.ReplaceAll(escapedPrefix, "_", "\\_") - rows, err = s.db.QueryContext(ctx, `SELECT key FROM kvstore WHERE key LIKE ? ESCAPE '\' ORDER BY key`, escapedPrefix+"%") + rows, err = s.db.QueryContext(ctx, `SELECT key FROM kvstore WHERE key LIKE ? ESCAPE '\' AND `+notExpiredFilter+` ORDER BY key`, escapedPrefix+"%") } if err != nil { return nil, fmt.Errorf("listing keys: %w", err) @@ -231,16 +260,113 @@ func (s *kvstoreServiceImpl) List(ctx context.Context, prefix string) ([]string, // GetStorageUsed returns the total storage used by this plugin in bytes. func (s *kvstoreServiceImpl) GetStorageUsed(ctx context.Context) (int64, error) { - used := s.currentSize.Load() + used, err := s.storageUsed(ctx) + if err != nil { + return 0, err + } log.Trace(ctx, "KVStore.GetStorageUsed", "plugin", s.pluginName, "bytes", used) return used, nil } -// Close closes the SQLite database connection. -// This is called when the plugin is unloaded. +// DeleteByPrefix removes all keys matching the given prefix. +func (s *kvstoreServiceImpl) DeleteByPrefix(ctx context.Context, prefix string) (int64, error) { + if prefix == "" { + return 0, fmt.Errorf("prefix cannot be empty") + } + + escapedPrefix := strings.ReplaceAll(prefix, "%", "\\%") + escapedPrefix = strings.ReplaceAll(escapedPrefix, "_", "\\_") + result, err := s.db.ExecContext(ctx, `DELETE FROM kvstore WHERE key LIKE ? ESCAPE '\'`, escapedPrefix+"%") + if err != nil { + return 0, fmt.Errorf("deleting keys: %w", err) + } + + count, err := result.RowsAffected() + if err != nil { + return 0, fmt.Errorf("getting deleted count: %w", err) + } + + log.Trace(ctx, "KVStore.DeleteByPrefix", "plugin", s.pluginName, "prefix", prefix, "deletedCount", count) + return count, nil +} + +// GetMany retrieves multiple values in a single call, processing keys in batches. +func (s *kvstoreServiceImpl) GetMany(ctx context.Context, keys []string) (map[string][]byte, error) { + if len(keys) == 0 { + return map[string][]byte{}, nil + } + + const batchSize = 200 + result := make(map[string][]byte) + for chunk := range slice.CollectChunks(slices.Values(keys), batchSize) { + placeholders := make([]string, len(chunk)) + args := make([]any, len(chunk)) + for i, key := range chunk { + placeholders[i] = "?" + args[i] = key + } + + query := `SELECT key, value FROM kvstore WHERE key IN (` + strings.Join(placeholders, ",") + `) AND ` + notExpiredFilter //nolint:gosec // placeholders are always "?" + rows, err := s.db.QueryContext(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("querying values: %w", err) + } + + for rows.Next() { + var key string + var value []byte + if err := rows.Scan(&key, &value); err != nil { + rows.Close() + return nil, fmt.Errorf("scanning value: %w", err) + } + result[key] = value + } + if err := rows.Err(); err != nil { + rows.Close() + return nil, fmt.Errorf("iterating values: %w", err) + } + rows.Close() + } + + log.Trace(ctx, "KVStore.GetMany", "plugin", s.pluginName, "requested", len(keys), "found", len(result)) + return result, nil +} + +// cleanupLoop periodically removes expired keys from the database. +// It stops when the provided context is cancelled. +func (s *kvstoreServiceImpl) cleanupLoop(ctx context.Context) { + ticker := time.NewTicker(cleanupInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.cleanupExpired(ctx) + } + } +} + +// cleanupExpired removes all expired keys from the database to reclaim disk space. +func (s *kvstoreServiceImpl) cleanupExpired(ctx context.Context) { + result, err := s.db.ExecContext(ctx, `DELETE FROM kvstore WHERE expires_at IS NOT NULL AND expires_at <= datetime('now')`) + if err != nil { + log.Error(ctx, "KVStore cleanup: failed to delete expired keys", "plugin", s.pluginName, err) + return + } + if count, err := result.RowsAffected(); err == nil && count > 0 { + log.Debug("KVStore cleanup completed", "plugin", s.pluginName, "deletedKeys", count) + } +} + +// Close runs a final cleanup and closes the SQLite database connection. +// The cleanup goroutine is stopped by the context passed to newKVStoreService. func (s *kvstoreServiceImpl) Close() error { if s.db != nil { log.Debug("Closing plugin kvstore", "plugin", s.pluginName) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + s.cleanupExpired(ctx) return s.db.Close() } return nil diff --git a/plugins/host_kvstore_test.go b/plugins/host_kvstore_test.go index 3e2cbd01..b900a659 100644 --- a/plugins/host_kvstore_test.go +++ b/plugins/host_kvstore_test.go @@ -12,6 +12,7 @@ import ( "os" "path/filepath" "strings" + "time" "github.com/navidrome/navidrome/conf" "github.com/navidrome/navidrome/conf/configtest" @@ -37,7 +38,7 @@ var _ = Describe("KVStoreService", func() { // Create service with 1KB limit for testing maxSize := "1KB" - service, err = newKVStoreService("test_plugin", &KVStorePermission{MaxSize: &maxSize}) + service, err = newKVStoreService(ctx, "test_plugin", &KVStorePermission{MaxSize: &maxSize}) Expect(err).ToNot(HaveOccurred()) }) @@ -253,7 +254,7 @@ var _ = Describe("KVStoreService", func() { Expect(service.Close()).To(Succeed()) maxSize := "1KB" - service2, err := newKVStoreService("test_plugin", &KVStorePermission{MaxSize: &maxSize}) + service2, err := newKVStoreService(ctx, "test_plugin", &KVStorePermission{MaxSize: &maxSize}) Expect(err).ToNot(HaveOccurred()) defer service2.Close() @@ -302,7 +303,7 @@ var _ = Describe("KVStoreService", func() { Describe("Plugin Isolation", func() { It("isolates data between plugins", func() { - service2, err := newKVStoreService("other_plugin", &KVStorePermission{}) + service2, err := newKVStoreService(ctx, "other_plugin", &KVStorePermission{}) Expect(err).ToNot(HaveOccurred()) defer service2.Close() @@ -321,7 +322,7 @@ var _ = Describe("KVStoreService", func() { }) It("creates separate database files per plugin", func() { - service2, err := newKVStoreService("other_plugin", &KVStorePermission{}) + service2, err := newKVStoreService(ctx, "other_plugin", &KVStorePermission{}) Expect(err).ToNot(HaveOccurred()) defer service2.Close() @@ -343,6 +344,309 @@ var _ = Describe("KVStoreService", func() { Expect(err).To(HaveOccurred()) }) }) + + Describe("TTL Expiration", func() { + It("Get returns not-exists for expired keys", func() { + _, err := service.db.Exec(` + INSERT INTO kvstore (key, value, size, expires_at) + VALUES ('expired_key', 'old', 3, datetime('now', '-1 seconds')) + `) + Expect(err).ToNot(HaveOccurred()) + value, exists, err := service.Get(ctx, "expired_key") + Expect(err).ToNot(HaveOccurred()) + Expect(exists).To(BeFalse()) + Expect(value).To(BeNil()) + }) + It("Has returns false for expired keys", func() { + _, err := service.db.Exec(` + INSERT INTO kvstore (key, value, size, expires_at) + VALUES ('expired_has', 'old', 3, datetime('now', '-1 seconds')) + `) + Expect(err).ToNot(HaveOccurred()) + exists, err := service.Has(ctx, "expired_has") + Expect(err).ToNot(HaveOccurred()) + Expect(exists).To(BeFalse()) + }) + It("List excludes expired keys", func() { + Expect(service.Set(ctx, "live:1", []byte("alive"))).To(Succeed()) + _, err := service.db.Exec(` + INSERT INTO kvstore (key, value, size, expires_at) + VALUES ('live:expired', 'dead', 4, datetime('now', '-1 seconds')) + `) + Expect(err).ToNot(HaveOccurred()) + keys, err := service.List(ctx, "live:") + Expect(err).ToNot(HaveOccurred()) + Expect(keys).To(HaveLen(1)) + Expect(keys).To(ContainElement("live:1")) + }) + It("Get returns value for non-expired keys with TTL", func() { + _, err := service.db.Exec(` + INSERT INTO kvstore (key, value, size, expires_at) + VALUES ('future_key', 'still alive', 11, datetime('now', '+3600 seconds')) + `) + Expect(err).ToNot(HaveOccurred()) + value, exists, err := service.Get(ctx, "future_key") + Expect(err).ToNot(HaveOccurred()) + Expect(exists).To(BeTrue()) + Expect(value).To(Equal([]byte("still alive"))) + }) + It("Set clears expires_at from a key previously set with TTL", func() { + // Insert a key with a TTL that has already expired + _, err := service.db.Exec(` + INSERT INTO kvstore (key, value, size, expires_at) + VALUES ('ttl_then_set', 'temp', 4, datetime('now', '-1 seconds')) + `) + Expect(err).ToNot(HaveOccurred()) + + // Overwrite with Set (no TTL) — should become permanent + err = service.Set(ctx, "ttl_then_set", []byte("permanent")) + Expect(err).ToNot(HaveOccurred()) + + // Should exist because Set cleared expires_at + value, exists, err := service.Get(ctx, "ttl_then_set") + Expect(err).ToNot(HaveOccurred()) + Expect(exists).To(BeTrue()) + Expect(value).To(Equal([]byte("permanent"))) + + // Verify expires_at is actually NULL + var expiresAt *string + Expect(service.db.QueryRow(`SELECT expires_at FROM kvstore WHERE key = 'ttl_then_set'`).Scan(&expiresAt)).To(Succeed()) + Expect(expiresAt).To(BeNil()) + }) + It("expired keys are not counted in storage used", func() { + _, err := service.db.Exec(` + INSERT INTO kvstore (key, value, size, expires_at) + VALUES ('expired_key', '12345', 5, datetime('now', '-1 seconds')) + `) + Expect(err).ToNot(HaveOccurred()) + + // Expired keys should not be counted + used, err := service.GetStorageUsed(ctx) + Expect(err).ToNot(HaveOccurred()) + Expect(used).To(Equal(int64(0))) + }) + It("cleanup removes expired rows from disk", func() { + _, err := service.db.Exec(` + INSERT INTO kvstore (key, value, size, expires_at) + VALUES ('cleanup_me', '12345', 5, datetime('now', '-1 seconds')) + `) + Expect(err).ToNot(HaveOccurred()) + + // Row exists in DB but is logically expired + var count int + Expect(service.db.QueryRow(`SELECT COUNT(*) FROM kvstore`).Scan(&count)).To(Succeed()) + Expect(count).To(Equal(1)) + + service.cleanupExpired(ctx) + + // Row should be physically deleted + Expect(service.db.QueryRow(`SELECT COUNT(*) FROM kvstore`).Scan(&count)).To(Succeed()) + Expect(count).To(Equal(0)) + }) + }) + + Describe("SetWithTTL", func() { + It("stores value that is retrievable before expiry", func() { + err := service.SetWithTTL(ctx, "ttl_key", []byte("ttl_value"), 3600) + Expect(err).ToNot(HaveOccurred()) + + value, exists, err := service.Get(ctx, "ttl_key") + Expect(err).ToNot(HaveOccurred()) + Expect(exists).To(BeTrue()) + Expect(value).To(Equal([]byte("ttl_value"))) + }) + + It("value is not retrievable after expiry", func() { + // Insert a key with an already-expired TTL + _, err := service.db.Exec(` + INSERT INTO kvstore (key, value, size, expires_at) + VALUES ('short_ttl', 'gone_soon', 9, datetime('now', '-1 seconds')) + `) + Expect(err).ToNot(HaveOccurred()) + + _, exists, err := service.Get(ctx, "short_ttl") + Expect(err).ToNot(HaveOccurred()) + Expect(exists).To(BeFalse()) + }) + + It("rejects ttlSeconds <= 0", func() { + err := service.SetWithTTL(ctx, "bad_ttl", []byte("value"), 0) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("ttlSeconds must be greater than 0")) + + err = service.SetWithTTL(ctx, "bad_ttl", []byte("value"), -5) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("ttlSeconds must be greater than 0")) + }) + + It("validates key same as Set", func() { + err := service.SetWithTTL(ctx, "", []byte("value"), 60) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("key cannot be empty")) + }) + + It("enforces size limits same as Set", func() { + bigValue := make([]byte, 2048) + err := service.SetWithTTL(ctx, "big_ttl", bigValue, 60) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("storage limit exceeded")) + }) + + It("overwrites existing key and updates TTL", func() { + // Insert a key with an already-expired TTL + _, err := service.db.Exec(` + INSERT INTO kvstore (key, value, size, expires_at) + VALUES ('overwrite_ttl', 'first', 5, datetime('now', '-1 seconds')) + `) + Expect(err).ToNot(HaveOccurred()) + + // Overwrite with a long TTL — should be retrievable + err = service.SetWithTTL(ctx, "overwrite_ttl", []byte("second"), 3600) + Expect(err).ToNot(HaveOccurred()) + + value, exists, err := service.Get(ctx, "overwrite_ttl") + Expect(err).ToNot(HaveOccurred()) + Expect(exists).To(BeTrue()) + Expect(value).To(Equal([]byte("second"))) + }) + + It("tracks storage correctly", func() { + err := service.SetWithTTL(ctx, "sized_ttl", []byte("12345"), 3600) + Expect(err).ToNot(HaveOccurred()) + + used, err := service.GetStorageUsed(ctx) + Expect(err).ToNot(HaveOccurred()) + Expect(used).To(Equal(int64(5))) + }) + }) + + Describe("DeleteByPrefix", func() { + BeforeEach(func() { + Expect(service.Set(ctx, "cache:user:1", []byte("Alice"))).To(Succeed()) + Expect(service.Set(ctx, "cache:user:2", []byte("Bob"))).To(Succeed()) + Expect(service.Set(ctx, "cache:item:1", []byte("Widget"))).To(Succeed()) + Expect(service.Set(ctx, "data:important", []byte("keep"))).To(Succeed()) + }) + + It("deletes all keys with the given prefix", func() { + deleted, err := service.DeleteByPrefix(ctx, "cache:user:") + Expect(err).ToNot(HaveOccurred()) + Expect(deleted).To(Equal(int64(2))) + + keys, err := service.List(ctx, "") + Expect(err).ToNot(HaveOccurred()) + Expect(keys).To(HaveLen(2)) + Expect(keys).To(ContainElements("cache:item:1", "data:important")) + }) + + It("rejects empty prefix", func() { + _, err := service.DeleteByPrefix(ctx, "") + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("prefix cannot be empty")) + }) + + It("returns 0 when no keys match", func() { + deleted, err := service.DeleteByPrefix(ctx, "nonexistent:") + Expect(err).ToNot(HaveOccurred()) + Expect(deleted).To(Equal(int64(0))) + }) + + It("updates storage size correctly", func() { + usedBefore, _ := service.GetStorageUsed(ctx) + Expect(usedBefore).To(BeNumerically(">", 0)) + + _, err := service.DeleteByPrefix(ctx, "cache:") + Expect(err).ToNot(HaveOccurred()) + + usedAfter, _ := service.GetStorageUsed(ctx) + Expect(usedAfter).To(Equal(int64(4))) + }) + + It("handles special LIKE characters in prefix", func() { + Expect(service.Set(ctx, "test%special", []byte("v1"))).To(Succeed()) + Expect(service.Set(ctx, "test_special", []byte("v2"))).To(Succeed()) + Expect(service.Set(ctx, "testXspecial", []byte("v3"))).To(Succeed()) + + deleted, err := service.DeleteByPrefix(ctx, "test%") + Expect(err).ToNot(HaveOccurred()) + Expect(deleted).To(Equal(int64(1))) + + exists, _ := service.Has(ctx, "test_special") + Expect(exists).To(BeTrue()) + exists, _ = service.Has(ctx, "testXspecial") + Expect(exists).To(BeTrue()) + }) + + It("also deletes expired keys matching prefix", func() { + _, err := service.db.Exec(` + INSERT INTO kvstore (key, value, size, expires_at) + VALUES ('cache:expired', 'old', 3, datetime('now', '-1 seconds')) + `) + Expect(err).ToNot(HaveOccurred()) + + deleted, err := service.DeleteByPrefix(ctx, "cache:") + Expect(err).ToNot(HaveOccurred()) + Expect(deleted).To(Equal(int64(4))) + }) + }) + + Describe("GetMany", func() { + BeforeEach(func() { + Expect(service.Set(ctx, "key1", []byte("value1"))).To(Succeed()) + Expect(service.Set(ctx, "key2", []byte("value2"))).To(Succeed()) + Expect(service.Set(ctx, "key3", []byte("value3"))).To(Succeed()) + }) + + It("retrieves multiple values at once", func() { + values, err := service.GetMany(ctx, []string{"key1", "key2", "key3"}) + Expect(err).ToNot(HaveOccurred()) + Expect(values).To(HaveLen(3)) + Expect(values["key1"]).To(Equal([]byte("value1"))) + Expect(values["key2"]).To(Equal([]byte("value2"))) + Expect(values["key3"]).To(Equal([]byte("value3"))) + }) + + It("omits missing keys from result", func() { + values, err := service.GetMany(ctx, []string{"key1", "missing", "key3"}) + Expect(err).ToNot(HaveOccurred()) + Expect(values).To(HaveLen(2)) + Expect(values["key1"]).To(Equal([]byte("value1"))) + Expect(values["key3"]).To(Equal([]byte("value3"))) + _, hasMissing := values["missing"] + Expect(hasMissing).To(BeFalse()) + }) + + It("returns empty map for empty keys slice", func() { + values, err := service.GetMany(ctx, []string{}) + Expect(err).ToNot(HaveOccurred()) + Expect(values).To(BeEmpty()) + }) + + It("returns empty map for nil keys slice", func() { + values, err := service.GetMany(ctx, nil) + Expect(err).ToNot(HaveOccurred()) + Expect(values).To(BeEmpty()) + }) + + It("excludes expired keys", func() { + _, err := service.db.Exec(` + INSERT INTO kvstore (key, value, size, expires_at) + VALUES ('expired_many', 'old', 3, datetime('now', '-1 seconds')) + `) + Expect(err).ToNot(HaveOccurred()) + + values, err := service.GetMany(ctx, []string{"key1", "expired_many"}) + Expect(err).ToNot(HaveOccurred()) + Expect(values).To(HaveLen(1)) + Expect(values["key1"]).To(Equal([]byte("value1"))) + }) + + It("handles all keys missing", func() { + values, err := service.GetMany(ctx, []string{"nope1", "nope2"}) + Expect(err).ToNot(HaveOccurred()) + Expect(values).To(BeEmpty()) + }) + }) }) var _ = Describe("KVStoreService Integration", Ordered, func() { @@ -416,17 +720,21 @@ var _ = Describe("KVStoreService Integration", Ordered, func() { Describe("KVStore Operations via Plugin", func() { type testKVStoreInput struct { - Operation string `json:"operation"` - Key string `json:"key"` - Value []byte `json:"value,omitempty"` - Prefix string `json:"prefix,omitempty"` + Operation string `json:"operation"` + Key string `json:"key"` + Value []byte `json:"value,omitempty"` + Prefix string `json:"prefix,omitempty"` + TTLSeconds int64 `json:"ttl_seconds,omitempty"` + Keys []string `json:"keys,omitempty"` } type testKVStoreOutput struct { - Value []byte `json:"value,omitempty"` - Exists bool `json:"exists,omitempty"` - Keys []string `json:"keys,omitempty"` - StorageUsed int64 `json:"storage_used,omitempty"` - Error *string `json:"error,omitempty"` + Value []byte `json:"value,omitempty"` + Values map[string][]byte `json:"values,omitempty"` + Exists bool `json:"exists,omitempty"` + Keys []string `json:"keys,omitempty"` + StorageUsed int64 `json:"storage_used,omitempty"` + DeletedCount int64 `json:"deleted_count,omitempty"` + Error *string `json:"error,omitempty"` } callTestKVStore := func(ctx context.Context, input testKVStoreInput) (*testKVStoreOutput, error) { @@ -594,6 +902,107 @@ var _ = Describe("KVStoreService Integration", Ordered, func() { Expect(output.Exists).To(BeTrue()) Expect(output.Value).To(Equal(binaryData)) }) + + It("should set value with TTL and expire it", func() { + ctx := GinkgoT().Context() + + // Set value with 1 second TTL + _, err := callTestKVStore(ctx, testKVStoreInput{ + Operation: "set_with_ttl", + Key: "ttl_key", + Value: []byte("temporary"), + TTLSeconds: 1, + }) + Expect(err).ToNot(HaveOccurred()) + + // Immediately should exist + output, err := callTestKVStore(ctx, testKVStoreInput{ + Operation: "get", + Key: "ttl_key", + }) + Expect(err).ToNot(HaveOccurred()) + Expect(output.Exists).To(BeTrue()) + Expect(output.Value).To(Equal([]byte("temporary"))) + + // Wait for expiration + time.Sleep(2 * time.Second) + + // Should no longer exist + output, err = callTestKVStore(ctx, testKVStoreInput{ + Operation: "get", + Key: "ttl_key", + }) + Expect(err).ToNot(HaveOccurred()) + Expect(output.Exists).To(BeFalse()) + }) + + It("should delete keys by prefix", func() { + ctx := GinkgoT().Context() + + // Set multiple keys with shared prefix + for _, key := range []string{"del_prefix:a", "del_prefix:b", "keep:c"} { + _, err := callTestKVStore(ctx, testKVStoreInput{ + Operation: "set", + Key: key, + Value: []byte("value"), + }) + Expect(err).ToNot(HaveOccurred()) + } + + // Delete by prefix + output, err := callTestKVStore(ctx, testKVStoreInput{ + Operation: "delete_by_prefix", + Prefix: "del_prefix:", + }) + Expect(err).ToNot(HaveOccurred()) + Expect(output.DeletedCount).To(Equal(int64(2))) + + // Verify remaining key + getOutput, err := callTestKVStore(ctx, testKVStoreInput{ + Operation: "has", + Key: "keep:c", + }) + Expect(err).ToNot(HaveOccurred()) + Expect(getOutput.Exists).To(BeTrue()) + + // Verify deleted keys are gone + getOutput, err = callTestKVStore(ctx, testKVStoreInput{ + Operation: "has", + Key: "del_prefix:a", + }) + Expect(err).ToNot(HaveOccurred()) + Expect(getOutput.Exists).To(BeFalse()) + }) + + It("should get many values at once", func() { + ctx := GinkgoT().Context() + + // Set multiple keys + for _, kv := range []struct{ k, v string }{ + {"many:1", "val1"}, + {"many:2", "val2"}, + {"many:3", "val3"}, + } { + _, err := callTestKVStore(ctx, testKVStoreInput{ + Operation: "set", + Key: kv.k, + Value: []byte(kv.v), + }) + Expect(err).ToNot(HaveOccurred()) + } + + // Get many, including a missing key + output, err := callTestKVStore(ctx, testKVStoreInput{ + Operation: "get_many", + Keys: []string{"many:1", "many:3", "many:missing"}, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(output.Values).To(HaveLen(2)) + Expect(output.Values["many:1"]).To(Equal([]byte("val1"))) + Expect(output.Values["many:3"]).To(Equal([]byte("val3"))) + _, hasMissing := output.Values["many:missing"] + Expect(hasMissing).To(BeFalse()) + }) }) Describe("Database Isolation", func() { diff --git a/plugins/manager_loader.go b/plugins/manager_loader.go index 688c4519..610dbd02 100644 --- a/plugins/manager_loader.go +++ b/plugins/manager_loader.go @@ -103,7 +103,7 @@ var hostServices = []hostServiceEntry{ hasPermission: func(p *Permissions) bool { return p != nil && p.Kvstore != nil }, create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) { perm := ctx.permissions.Kvstore - service, err := newKVStoreService(ctx.pluginName, perm) + service, err := newKVStoreService(ctx.manager.ctx, ctx.pluginName, perm) if err != nil { log.Error("Failed to create KVStore service", "plugin", ctx.pluginName, err) return nil, nil diff --git a/plugins/migrate.go b/plugins/migrate.go new file mode 100644 index 00000000..332e3483 --- /dev/null +++ b/plugins/migrate.go @@ -0,0 +1,47 @@ +package plugins + +import ( + "database/sql" + "fmt" +) + +// migrateDB applies schema migrations to a SQLite database. +// +// Each entry in migrations is a single SQL statement. The current schema version +// is tracked using SQLite's built-in PRAGMA user_version. Only statements after +// the current version are executed, within a single transaction. +func migrateDB(db *sql.DB, migrations []string) error { + var version int + if err := db.QueryRow(`PRAGMA user_version`).Scan(&version); err != nil { + return fmt.Errorf("reading schema version: %w", err) + } + + if version >= len(migrations) { + return nil + } + + tx, err := db.Begin() + if err != nil { + return fmt.Errorf("starting migration transaction: %w", err) + } + defer func() { _ = tx.Rollback() }() + + for i := version; i < len(migrations); i++ { + if _, err := tx.Exec(migrations[i]); err != nil { + return fmt.Errorf("migration %d failed: %w", i+1, err) + } + } + + // PRAGMA statements cannot be executed inside a transaction in some SQLite + // drivers, but with mattn/go-sqlite3 this works. We set it inside the tx + // so that a failed commit leaves the version unchanged. + if _, err := tx.Exec(fmt.Sprintf(`PRAGMA user_version = %d`, len(migrations))); err != nil { + return fmt.Errorf("updating schema version: %w", err) + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("committing migrations: %w", err) + } + + return nil +} diff --git a/plugins/migrate_test.go b/plugins/migrate_test.go new file mode 100644 index 00000000..17ed43c5 --- /dev/null +++ b/plugins/migrate_test.go @@ -0,0 +1,99 @@ +//go:build !windows + +package plugins + +import ( + "database/sql" + + _ "github.com/mattn/go-sqlite3" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("migrateDB", func() { + var db *sql.DB + + BeforeEach(func() { + var err error + db, err = sql.Open("sqlite3", ":memory:") + Expect(err).ToNot(HaveOccurred()) + }) + + AfterEach(func() { + if db != nil { + db.Close() + } + }) + + getUserVersion := func() int { + var version int + Expect(db.QueryRow(`PRAGMA user_version`).Scan(&version)).To(Succeed()) + return version + } + + It("applies all migrations on a fresh database", func() { + migrations := []string{ + `CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)`, + `ALTER TABLE test ADD COLUMN email TEXT`, + } + + Expect(migrateDB(db, migrations)).To(Succeed()) + Expect(getUserVersion()).To(Equal(2)) + + // Verify schema + _, err := db.Exec(`INSERT INTO test (id, name, email) VALUES (1, 'Alice', 'alice@test.com')`) + Expect(err).ToNot(HaveOccurred()) + }) + + It("skips already applied migrations", func() { + migrations1 := []string{ + `CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)`, + } + Expect(migrateDB(db, migrations1)).To(Succeed()) + Expect(getUserVersion()).To(Equal(1)) + + // Add a new migration + migrations2 := []string{ + `CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)`, + `ALTER TABLE test ADD COLUMN email TEXT`, + } + Expect(migrateDB(db, migrations2)).To(Succeed()) + Expect(getUserVersion()).To(Equal(2)) + + // Verify the new column exists + _, err := db.Exec(`INSERT INTO test (id, name, email) VALUES (1, 'Alice', 'alice@test.com')`) + Expect(err).ToNot(HaveOccurred()) + }) + + It("is a no-op when all migrations are applied", func() { + migrations := []string{ + `CREATE TABLE test (id INTEGER PRIMARY KEY)`, + } + Expect(migrateDB(db, migrations)).To(Succeed()) + Expect(migrateDB(db, migrations)).To(Succeed()) + Expect(getUserVersion()).To(Equal(1)) + }) + + It("is a no-op with empty migrations slice", func() { + Expect(migrateDB(db, nil)).To(Succeed()) + Expect(getUserVersion()).To(Equal(0)) + }) + + It("rolls back on failure", func() { + migrations := []string{ + `CREATE TABLE test (id INTEGER PRIMARY KEY)`, + `INVALID SQL STATEMENT`, + } + + err := migrateDB(db, migrations) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("migration 2 failed")) + + // Version should remain 0 (rolled back) + Expect(getUserVersion()).To(Equal(0)) + + // Table should not exist (rolled back) + _, err = db.Exec(`INSERT INTO test (id) VALUES (1)`) + Expect(err).To(HaveOccurred()) + }) +}) diff --git a/plugins/pdk/go/host/nd_host_kvstore.go b/plugins/pdk/go/host/nd_host_kvstore.go index 92ac9d77..15e1e366 100644 --- a/plugins/pdk/go/host/nd_host_kvstore.go +++ b/plugins/pdk/go/host/nd_host_kvstore.go @@ -19,15 +19,20 @@ import ( //go:wasmimport extism:host/user kvstore_set func kvstore_set(uint64) uint64 +// kvstore_setwithttl is the host function provided by Navidrome. +// +//go:wasmimport extism:host/user kvstore_setwithttl +func kvstore_setwithttl(uint64) uint64 + // kvstore_get is the host function provided by Navidrome. // //go:wasmimport extism:host/user kvstore_get func kvstore_get(uint64) uint64 -// kvstore_delete is the host function provided by Navidrome. +// kvstore_getmany is the host function provided by Navidrome. // -//go:wasmimport extism:host/user kvstore_delete -func kvstore_delete(uint64) uint64 +//go:wasmimport extism:host/user kvstore_getmany +func kvstore_getmany(uint64) uint64 // kvstore_has is the host function provided by Navidrome. // @@ -39,6 +44,16 @@ func kvstore_has(uint64) uint64 //go:wasmimport extism:host/user kvstore_list func kvstore_list(uint64) uint64 +// kvstore_delete is the host function provided by Navidrome. +// +//go:wasmimport extism:host/user kvstore_delete +func kvstore_delete(uint64) uint64 + +// kvstore_deletebyprefix is the host function provided by Navidrome. +// +//go:wasmimport extism:host/user kvstore_deletebyprefix +func kvstore_deletebyprefix(uint64) uint64 + // kvstore_getstorageused is the host function provided by Navidrome. // //go:wasmimport extism:host/user kvstore_getstorageused @@ -49,6 +64,12 @@ type kVStoreSetRequest struct { Value []byte `json:"value"` } +type kVStoreSetWithTTLRequest struct { + Key string `json:"key"` + Value []byte `json:"value"` + TtlSeconds int64 `json:"ttlSeconds"` +} + type kVStoreGetRequest struct { Key string `json:"key"` } @@ -59,8 +80,13 @@ type kVStoreGetResponse struct { Error string `json:"error,omitempty"` } -type kVStoreDeleteRequest struct { - Key string `json:"key"` +type kVStoreGetManyRequest struct { + Keys []string `json:"keys"` +} + +type kVStoreGetManyResponse struct { + Values map[string][]byte `json:"values,omitempty"` + Error string `json:"error,omitempty"` } type kVStoreHasRequest struct { @@ -81,6 +107,19 @@ type kVStoreListResponse struct { Error string `json:"error,omitempty"` } +type kVStoreDeleteRequest struct { + Key string `json:"key"` +} + +type kVStoreDeleteByPrefixRequest struct { + Prefix string `json:"prefix"` +} + +type kVStoreDeleteByPrefixResponse struct { + DeletedCount int64 `json:"deletedCount,omitempty"` + Error string `json:"error,omitempty"` +} + type kVStoreGetStorageUsedResponse struct { Bytes int64 `json:"bytes,omitempty"` Error string `json:"error,omitempty"` @@ -127,6 +166,52 @@ func KVStoreSet(key string, value []byte) error { return nil } +// KVStoreSetWithTTL calls the kvstore_setwithttl host function. +// SetWithTTL stores a byte value with the given key and a time-to-live. +// +// After ttlSeconds, the key is treated as non-existent and will be +// cleaned up lazily. ttlSeconds must be greater than 0. +// +// Parameters: +// - key: The storage key (max 256 bytes, UTF-8) +// - value: The byte slice to store +// - ttlSeconds: Time-to-live in seconds (must be > 0) +// +// Returns an error if the storage limit would be exceeded or the operation fails. +func KVStoreSetWithTTL(key string, value []byte, ttlSeconds int64) error { + // Marshal request to JSON + req := kVStoreSetWithTTLRequest{ + Key: key, + Value: value, + TtlSeconds: ttlSeconds, + } + reqBytes, err := json.Marshal(req) + if err != nil { + return err + } + reqMem := pdk.AllocateBytes(reqBytes) + defer reqMem.Free() + + // Call the host function + responsePtr := kvstore_setwithttl(reqMem.Offset()) + + // Read the response from memory + responseMem := pdk.FindMemory(responsePtr) + responseBytes := responseMem.ReadBytes() + + // Parse error-only response + var response struct { + Error string `json:"error,omitempty"` + } + if err := json.Unmarshal(responseBytes, &response); err != nil { + return err + } + if response.Error != "" { + return errors.New(response.Error) + } + return nil +} + // KVStoreGet calls the kvstore_get host function. // Get retrieves a byte value from storage. // @@ -167,43 +252,45 @@ func KVStoreGet(key string) ([]byte, bool, error) { return response.Value, response.Exists, nil } -// KVStoreDelete calls the kvstore_delete host function. -// Delete removes a value from storage. +// KVStoreGetMany calls the kvstore_getmany host function. +// GetMany retrieves multiple values in a single call. // // Parameters: -// - key: The storage key +// - keys: The storage keys to retrieve // -// Returns an error if the operation fails. Does not return an error if the key doesn't exist. -func KVStoreDelete(key string) error { +// Returns a map of key to value for keys that exist and have not expired. +// Missing or expired keys are omitted from the result. +func KVStoreGetMany(keys []string) (map[string][]byte, error) { // Marshal request to JSON - req := kVStoreDeleteRequest{ - Key: key, + req := kVStoreGetManyRequest{ + Keys: keys, } reqBytes, err := json.Marshal(req) if err != nil { - return err + return nil, err } reqMem := pdk.AllocateBytes(reqBytes) defer reqMem.Free() // Call the host function - responsePtr := kvstore_delete(reqMem.Offset()) + responsePtr := kvstore_getmany(reqMem.Offset()) // Read the response from memory responseMem := pdk.FindMemory(responsePtr) responseBytes := responseMem.ReadBytes() - // Parse error-only response - var response struct { - Error string `json:"error,omitempty"` - } + // Parse the response + var response kVStoreGetManyResponse if err := json.Unmarshal(responseBytes, &response); err != nil { - return err + return nil, err } + + // Convert Error field to Go error if response.Error != "" { - return errors.New(response.Error) + return nil, errors.New(response.Error) } - return nil + + return response.Values, nil } // KVStoreHas calls the kvstore_has host function. @@ -286,6 +373,85 @@ func KVStoreList(prefix string) ([]string, error) { return response.Keys, nil } +// KVStoreDelete calls the kvstore_delete host function. +// Delete removes a value from storage. +// +// Parameters: +// - key: The storage key +// +// Returns an error if the operation fails. Does not return an error if the key doesn't exist. +func KVStoreDelete(key string) error { + // Marshal request to JSON + req := kVStoreDeleteRequest{ + Key: key, + } + reqBytes, err := json.Marshal(req) + if err != nil { + return err + } + reqMem := pdk.AllocateBytes(reqBytes) + defer reqMem.Free() + + // Call the host function + responsePtr := kvstore_delete(reqMem.Offset()) + + // Read the response from memory + responseMem := pdk.FindMemory(responsePtr) + responseBytes := responseMem.ReadBytes() + + // Parse error-only response + var response struct { + Error string `json:"error,omitempty"` + } + if err := json.Unmarshal(responseBytes, &response); err != nil { + return err + } + if response.Error != "" { + return errors.New(response.Error) + } + return nil +} + +// KVStoreDeleteByPrefix calls the kvstore_deletebyprefix host function. +// DeleteByPrefix removes all keys matching the given prefix. +// +// Parameters: +// - prefix: Key prefix to match (must not be empty) +// +// Returns the number of keys deleted. Includes expired keys. +func KVStoreDeleteByPrefix(prefix string) (int64, error) { + // Marshal request to JSON + req := kVStoreDeleteByPrefixRequest{ + Prefix: prefix, + } + reqBytes, err := json.Marshal(req) + if err != nil { + return 0, err + } + reqMem := pdk.AllocateBytes(reqBytes) + defer reqMem.Free() + + // Call the host function + responsePtr := kvstore_deletebyprefix(reqMem.Offset()) + + // Read the response from memory + responseMem := pdk.FindMemory(responsePtr) + responseBytes := responseMem.ReadBytes() + + // Parse the response + var response kVStoreDeleteByPrefixResponse + if err := json.Unmarshal(responseBytes, &response); err != nil { + return 0, err + } + + // Convert Error field to Go error + if response.Error != "" { + return 0, errors.New(response.Error) + } + + return response.DeletedCount, nil +} + // KVStoreGetStorageUsed calls the kvstore_getstorageused host function. // GetStorageUsed returns the total storage used by this plugin in bytes. func KVStoreGetStorageUsed() (int64, error) { diff --git a/plugins/pdk/go/host/nd_host_kvstore_stub.go b/plugins/pdk/go/host/nd_host_kvstore_stub.go index 1b3ff1e8..83b55d3a 100644 --- a/plugins/pdk/go/host/nd_host_kvstore_stub.go +++ b/plugins/pdk/go/host/nd_host_kvstore_stub.go @@ -37,6 +37,28 @@ func KVStoreSet(key string, value []byte) error { return KVStoreMock.Set(key, value) } +// SetWithTTL is the mock method for KVStoreSetWithTTL. +func (m *mockKVStoreService) SetWithTTL(key string, value []byte, ttlSeconds int64) error { + args := m.Called(key, value, ttlSeconds) + return args.Error(0) +} + +// KVStoreSetWithTTL delegates to the mock instance. +// SetWithTTL stores a byte value with the given key and a time-to-live. +// +// After ttlSeconds, the key is treated as non-existent and will be +// cleaned up lazily. ttlSeconds must be greater than 0. +// +// Parameters: +// - key: The storage key (max 256 bytes, UTF-8) +// - value: The byte slice to store +// - ttlSeconds: Time-to-live in seconds (must be > 0) +// +// Returns an error if the storage limit would be exceeded or the operation fails. +func KVStoreSetWithTTL(key string, value []byte, ttlSeconds int64) error { + return KVStoreMock.SetWithTTL(key, value, ttlSeconds) +} + // Get is the mock method for KVStoreGet. func (m *mockKVStoreService) Get(key string) ([]byte, bool, error) { args := m.Called(key) @@ -54,21 +76,22 @@ func KVStoreGet(key string) ([]byte, bool, error) { return KVStoreMock.Get(key) } -// Delete is the mock method for KVStoreDelete. -func (m *mockKVStoreService) Delete(key string) error { - args := m.Called(key) - return args.Error(0) +// GetMany is the mock method for KVStoreGetMany. +func (m *mockKVStoreService) GetMany(keys []string) (map[string][]byte, error) { + args := m.Called(keys) + return args.Get(0).(map[string][]byte), args.Error(1) } -// KVStoreDelete delegates to the mock instance. -// Delete removes a value from storage. +// KVStoreGetMany delegates to the mock instance. +// GetMany retrieves multiple values in a single call. // // Parameters: -// - key: The storage key +// - keys: The storage keys to retrieve // -// Returns an error if the operation fails. Does not return an error if the key doesn't exist. -func KVStoreDelete(key string) error { - return KVStoreMock.Delete(key) +// Returns a map of key to value for keys that exist and have not expired. +// Missing or expired keys are omitted from the result. +func KVStoreGetMany(keys []string) (map[string][]byte, error) { + return KVStoreMock.GetMany(keys) } // Has is the mock method for KVStoreHas. @@ -105,6 +128,40 @@ func KVStoreList(prefix string) ([]string, error) { return KVStoreMock.List(prefix) } +// Delete is the mock method for KVStoreDelete. +func (m *mockKVStoreService) Delete(key string) error { + args := m.Called(key) + return args.Error(0) +} + +// KVStoreDelete delegates to the mock instance. +// Delete removes a value from storage. +// +// Parameters: +// - key: The storage key +// +// Returns an error if the operation fails. Does not return an error if the key doesn't exist. +func KVStoreDelete(key string) error { + return KVStoreMock.Delete(key) +} + +// DeleteByPrefix is the mock method for KVStoreDeleteByPrefix. +func (m *mockKVStoreService) DeleteByPrefix(prefix string) (int64, error) { + args := m.Called(prefix) + return args.Get(0).(int64), args.Error(1) +} + +// KVStoreDeleteByPrefix delegates to the mock instance. +// DeleteByPrefix removes all keys matching the given prefix. +// +// Parameters: +// - prefix: Key prefix to match (must not be empty) +// +// Returns the number of keys deleted. Includes expired keys. +func KVStoreDeleteByPrefix(prefix string) (int64, error) { + return KVStoreMock.DeleteByPrefix(prefix) +} + // GetStorageUsed is the mock method for KVStoreGetStorageUsed. func (m *mockKVStoreService) GetStorageUsed() (int64, error) { args := m.Called() diff --git a/plugins/pdk/python/host/nd_host_kvstore.py b/plugins/pdk/python/host/nd_host_kvstore.py index 3c3e61f5..33eaffc5 100644 --- a/plugins/pdk/python/host/nd_host_kvstore.py +++ b/plugins/pdk/python/host/nd_host_kvstore.py @@ -26,14 +26,20 @@ def _kvstore_set(offset: int) -> int: ... +@extism.import_fn("extism:host/user", "kvstore_setwithttl") +def _kvstore_setwithttl(offset: int) -> int: + """Raw host function - do not call directly.""" + ... + + @extism.import_fn("extism:host/user", "kvstore_get") def _kvstore_get(offset: int) -> int: """Raw host function - do not call directly.""" ... -@extism.import_fn("extism:host/user", "kvstore_delete") -def _kvstore_delete(offset: int) -> int: +@extism.import_fn("extism:host/user", "kvstore_getmany") +def _kvstore_getmany(offset: int) -> int: """Raw host function - do not call directly.""" ... @@ -50,6 +56,18 @@ def _kvstore_list(offset: int) -> int: ... +@extism.import_fn("extism:host/user", "kvstore_delete") +def _kvstore_delete(offset: int) -> int: + """Raw host function - do not call directly.""" + ... + + +@extism.import_fn("extism:host/user", "kvstore_deletebyprefix") +def _kvstore_deletebyprefix(offset: int) -> int: + """Raw host function - do not call directly.""" + ... + + @extism.import_fn("extism:host/user", "kvstore_getstorageused") def _kvstore_getstorageused(offset: int) -> int: """Raw host function - do not call directly.""" @@ -94,6 +112,43 @@ Returns an error if the storage limit would be exceeded or the operation fails. +def kvstore_set_with_ttl(key: str, value: bytes, ttl_seconds: int) -> None: + """SetWithTTL stores a byte value with the given key and a time-to-live. + +After ttlSeconds, the key is treated as non-existent and will be +cleaned up lazily. ttlSeconds must be greater than 0. + +Parameters: + - key: The storage key (max 256 bytes, UTF-8) + - value: The byte slice to store + - ttlSeconds: Time-to-live in seconds (must be > 0) + +Returns an error if the storage limit would be exceeded or the operation fails. + + Args: + key: str parameter. + value: bytes parameter. + ttl_seconds: int parameter. + + Raises: + HostFunctionError: If the host function returns an error. + """ + request = { + "key": key, + "value": base64.b64encode(value).decode("ascii"), + "ttlSeconds": ttl_seconds, + } + request_bytes = json.dumps(request).encode("utf-8") + request_mem = extism.memory.alloc(request_bytes) + response_offset = _kvstore_setwithttl(request_mem.offset) + response_mem = extism.memory.find(response_offset) + response = json.loads(extism.memory.string(response_mem)) + + if response.get("error"): + raise HostFunctionError(response["error"]) + + + def kvstore_get(key: str) -> KVStoreGetResult: """Get retrieves a byte value from storage. @@ -129,32 +184,37 @@ Returns the value and whether the key exists. ) -def kvstore_delete(key: str) -> None: - """Delete removes a value from storage. +def kvstore_get_many(keys: Any) -> Any: + """GetMany retrieves multiple values in a single call. Parameters: - - key: The storage key + - keys: The storage keys to retrieve -Returns an error if the operation fails. Does not return an error if the key doesn't exist. +Returns a map of key to value for keys that exist and have not expired. +Missing or expired keys are omitted from the result. Args: - key: str parameter. + keys: Any parameter. + + Returns: + Any: The result value. Raises: HostFunctionError: If the host function returns an error. """ request = { - "key": key, + "keys": keys, } request_bytes = json.dumps(request).encode("utf-8") request_mem = extism.memory.alloc(request_bytes) - response_offset = _kvstore_delete(request_mem.offset) + response_offset = _kvstore_getmany(request_mem.offset) response_mem = extism.memory.find(response_offset) response = json.loads(extism.memory.string(response_mem)) if response.get("error"): raise HostFunctionError(response["error"]) + return response.get("values", None) def kvstore_has(key: str) -> bool: @@ -221,6 +281,66 @@ Returns a slice of matching keys. return response.get("keys", None) +def kvstore_delete(key: str) -> None: + """Delete removes a value from storage. + +Parameters: + - key: The storage key + +Returns an error if the operation fails. Does not return an error if the key doesn't exist. + + Args: + key: str parameter. + + Raises: + HostFunctionError: If the host function returns an error. + """ + request = { + "key": key, + } + request_bytes = json.dumps(request).encode("utf-8") + request_mem = extism.memory.alloc(request_bytes) + response_offset = _kvstore_delete(request_mem.offset) + response_mem = extism.memory.find(response_offset) + response = json.loads(extism.memory.string(response_mem)) + + if response.get("error"): + raise HostFunctionError(response["error"]) + + + +def kvstore_delete_by_prefix(prefix: str) -> int: + """DeleteByPrefix removes all keys matching the given prefix. + +Parameters: + - prefix: Key prefix to match (must not be empty) + +Returns the number of keys deleted. Includes expired keys. + + Args: + prefix: str parameter. + + Returns: + int: The result value. + + Raises: + HostFunctionError: If the host function returns an error. + """ + request = { + "prefix": prefix, + } + request_bytes = json.dumps(request).encode("utf-8") + request_mem = extism.memory.alloc(request_bytes) + response_offset = _kvstore_deletebyprefix(request_mem.offset) + response_mem = extism.memory.find(response_offset) + response = json.loads(extism.memory.string(response_mem)) + + if response.get("error"): + raise HostFunctionError(response["error"]) + + return response.get("deletedCount", 0) + + def kvstore_get_storage_used() -> int: """GetStorageUsed returns the total storage used by this plugin in bytes. diff --git a/plugins/pdk/rust/nd-pdk-host/src/nd_host_kvstore.rs b/plugins/pdk/rust/nd-pdk-host/src/nd_host_kvstore.rs index 20fe18c6..a85e7289 100644 --- a/plugins/pdk/rust/nd-pdk-host/src/nd_host_kvstore.rs +++ b/plugins/pdk/rust/nd-pdk-host/src/nd_host_kvstore.rs @@ -44,6 +44,22 @@ struct KVStoreSetResponse { error: Option, } +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +struct KVStoreSetWithTTLRequest { + key: String, + #[serde(with = "base64_bytes")] + value: Vec, + ttl_seconds: i64, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +struct KVStoreSetWithTTLResponse { + #[serde(default)] + error: Option, +} + #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "camelCase")] struct KVStoreGetRequest { @@ -64,13 +80,15 @@ struct KVStoreGetResponse { #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "camelCase")] -struct KVStoreDeleteRequest { - key: String, +struct KVStoreGetManyRequest { + keys: Vec, } #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "camelCase")] -struct KVStoreDeleteResponse { +struct KVStoreGetManyResponse { + #[serde(default)] + values: std::collections::HashMap>, #[serde(default)] error: Option, } @@ -105,6 +123,34 @@ struct KVStoreListResponse { error: Option, } +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +struct KVStoreDeleteRequest { + key: String, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +struct KVStoreDeleteResponse { + #[serde(default)] + error: Option, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +struct KVStoreDeleteByPrefixRequest { + prefix: String, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +struct KVStoreDeleteByPrefixResponse { + #[serde(default)] + deleted_count: i64, + #[serde(default)] + error: Option, +} + #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "camelCase")] struct KVStoreGetStorageUsedResponse { @@ -117,10 +163,13 @@ struct KVStoreGetStorageUsedResponse { #[host_fn] extern "ExtismHost" { fn kvstore_set(input: Json) -> Json; + fn kvstore_setwithttl(input: Json) -> Json; fn kvstore_get(input: Json) -> Json; - fn kvstore_delete(input: Json) -> Json; + fn kvstore_getmany(input: Json) -> Json; fn kvstore_has(input: Json) -> Json; fn kvstore_list(input: Json) -> Json; + fn kvstore_delete(input: Json) -> Json; + fn kvstore_deletebyprefix(input: Json) -> Json; fn kvstore_getstorageused(input: Json) -> Json; } @@ -153,6 +202,41 @@ pub fn set(key: &str, value: Vec) -> Result<(), Error> { Ok(()) } +/// SetWithTTL stores a byte value with the given key and a time-to-live. +/// +/// After ttlSeconds, the key is treated as non-existent and will be +/// cleaned up lazily. ttlSeconds must be greater than 0. +/// +/// Parameters: +/// - key: The storage key (max 256 bytes, UTF-8) +/// - value: The byte slice to store +/// - ttlSeconds: Time-to-live in seconds (must be > 0) +/// +/// Returns an error if the storage limit would be exceeded or the operation fails. +/// +/// # Arguments +/// * `key` - String parameter. +/// * `value` - Vec parameter. +/// * `ttl_seconds` - i64 parameter. +/// +/// # Errors +/// Returns an error if the host function call fails. +pub fn set_with_ttl(key: &str, value: Vec, ttl_seconds: i64) -> Result<(), Error> { + let response = unsafe { + kvstore_setwithttl(Json(KVStoreSetWithTTLRequest { + key: key.to_owned(), + value: value, + ttl_seconds: ttl_seconds, + }))? + }; + + if let Some(err) = response.0.error { + return Err(Error::msg(err)); + } + + Ok(()) +} + /// Get retrieves a byte value from storage. /// /// Parameters: @@ -186,22 +270,26 @@ pub fn get(key: &str) -> Result>, Error> { } } -/// Delete removes a value from storage. +/// GetMany retrieves multiple values in a single call. /// /// Parameters: -/// - key: The storage key +/// - keys: The storage keys to retrieve /// -/// Returns an error if the operation fails. Does not return an error if the key doesn't exist. +/// Returns a map of key to value for keys that exist and have not expired. +/// Missing or expired keys are omitted from the result. /// /// # Arguments -/// * `key` - String parameter. +/// * `keys` - Vec parameter. +/// +/// # Returns +/// The values value. /// /// # Errors /// Returns an error if the host function call fails. -pub fn delete(key: &str) -> Result<(), Error> { +pub fn get_many(keys: Vec) -> Result>, Error> { let response = unsafe { - kvstore_delete(Json(KVStoreDeleteRequest { - key: key.to_owned(), + kvstore_getmany(Json(KVStoreGetManyRequest { + keys: keys, }))? }; @@ -209,7 +297,7 @@ pub fn delete(key: &str) -> Result<(), Error> { return Err(Error::msg(err)); } - Ok(()) + Ok(response.0.values) } /// Has checks if a key exists in storage. @@ -270,6 +358,61 @@ pub fn list(prefix: &str) -> Result, Error> { Ok(response.0.keys) } +/// Delete removes a value from storage. +/// +/// Parameters: +/// - key: The storage key +/// +/// Returns an error if the operation fails. Does not return an error if the key doesn't exist. +/// +/// # Arguments +/// * `key` - String parameter. +/// +/// # Errors +/// Returns an error if the host function call fails. +pub fn delete(key: &str) -> Result<(), Error> { + let response = unsafe { + kvstore_delete(Json(KVStoreDeleteRequest { + key: key.to_owned(), + }))? + }; + + if let Some(err) = response.0.error { + return Err(Error::msg(err)); + } + + Ok(()) +} + +/// DeleteByPrefix removes all keys matching the given prefix. +/// +/// Parameters: +/// - prefix: Key prefix to match (must not be empty) +/// +/// Returns the number of keys deleted. Includes expired keys. +/// +/// # Arguments +/// * `prefix` - String parameter. +/// +/// # Returns +/// The deleted_count value. +/// +/// # Errors +/// Returns an error if the host function call fails. +pub fn delete_by_prefix(prefix: &str) -> Result { + let response = unsafe { + kvstore_deletebyprefix(Json(KVStoreDeleteByPrefixRequest { + prefix: prefix.to_owned(), + }))? + }; + + if let Some(err) = response.0.error { + return Err(Error::msg(err)); + } + + Ok(response.0.deleted_count) +} + /// GetStorageUsed returns the total storage used by this plugin in bytes. /// /// # Returns diff --git a/plugins/testdata/test-kvstore/main.go b/plugins/testdata/test-kvstore/main.go index 9c289df0..7fba14ab 100644 --- a/plugins/testdata/test-kvstore/main.go +++ b/plugins/testdata/test-kvstore/main.go @@ -9,19 +9,23 @@ import ( // TestKVStoreInput is the input for nd_test_kvstore callback. type TestKVStoreInput struct { - Operation string `json:"operation"` // "set", "get", "delete", "has", "list", "get_storage_used" - Key string `json:"key"` // Storage key - Value []byte `json:"value"` // For set operations - Prefix string `json:"prefix"` // For list operation + Operation string `json:"operation"` // "set", "get", "delete", "has", "list", "get_storage_used", "set_with_ttl", "delete_by_prefix", "get_many" + Key string `json:"key"` // Storage key + Value []byte `json:"value"` // For set operations + Prefix string `json:"prefix"` // For list/delete_by_prefix operations + TTLSeconds int64 `json:"ttl_seconds,omitempty"` // For set_with_ttl + Keys []string `json:"keys,omitempty"` // For get_many } // TestKVStoreOutput is the output from nd_test_kvstore callback. type TestKVStoreOutput struct { - Value []byte `json:"value,omitempty"` - Exists bool `json:"exists,omitempty"` - Keys []string `json:"keys,omitempty"` - StorageUsed int64 `json:"storage_used,omitempty"` - Error *string `json:"error,omitempty"` + Value []byte `json:"value,omitempty"` + Values map[string][]byte `json:"values,omitempty"` + Exists bool `json:"exists,omitempty"` + Keys []string `json:"keys,omitempty"` + StorageUsed int64 `json:"storage_used,omitempty"` + DeletedCount int64 `json:"deleted_count,omitempty"` + Error *string `json:"error,omitempty"` } // nd_test_kvstore is the test callback that tests the kvstore host functions. @@ -96,6 +100,36 @@ func ndTestKVStore() int32 { pdk.OutputJSON(TestKVStoreOutput{StorageUsed: bytesUsed}) return 0 + case "set_with_ttl": + err := host.KVStoreSetWithTTL(input.Key, input.Value, input.TTLSeconds) + if err != nil { + errStr := err.Error() + pdk.OutputJSON(TestKVStoreOutput{Error: &errStr}) + return 0 + } + pdk.OutputJSON(TestKVStoreOutput{}) + return 0 + + case "delete_by_prefix": + deletedCount, err := host.KVStoreDeleteByPrefix(input.Prefix) + if err != nil { + errStr := err.Error() + pdk.OutputJSON(TestKVStoreOutput{Error: &errStr}) + return 0 + } + pdk.OutputJSON(TestKVStoreOutput{DeletedCount: deletedCount}) + return 0 + + case "get_many": + values, err := host.KVStoreGetMany(input.Keys) + if err != nil { + errStr := err.Error() + pdk.OutputJSON(TestKVStoreOutput{Error: &errStr}) + return 0 + } + pdk.OutputJSON(TestKVStoreOutput{Values: values}) + return 0 + default: errStr := "unknown operation: " + input.Operation pdk.OutputJSON(TestKVStoreOutput{Error: &errStr})