diff --git a/plugins/capabilities/taskworker.go b/plugins/capabilities/taskworker.go
new file mode 100644
index 00000000..c53d5017
--- /dev/null
+++ b/plugins/capabilities/taskworker.go
@@ -0,0 +1,27 @@
+package capabilities
+
+// TaskWorker provides task execution handling.
+// This capability allows plugins to receive callbacks when their queued tasks
+// are ready to execute. Plugins that use the taskqueue host service must
+// implement this capability.
+//
+//nd:capability name=taskworker
+type TaskWorker interface {
+	// OnTaskExecute is called when a queued task is ready to run.
+	// The returned string is a status/result message stored in the tasks table.
+	// Return an error to trigger retry (if retries are configured).
+	//nd:export name=nd_task_execute
+	OnTaskExecute(TaskExecuteRequest) (string, error)
+}
+
+// TaskExecuteRequest is the request provided when a task is ready to execute.
+type TaskExecuteRequest struct {
+	// QueueName is the name of the queue this task belongs to.
+	QueueName string `json:"queueName"`
+	// TaskID is the unique identifier for this task.
+	TaskID string `json:"taskId"`
+	// Payload is the opaque data provided when the task was enqueued.
+	Payload []byte `json:"payload"`
+	// Attempt is the current attempt number (1-based: first attempt = 1).
+	Attempt int32 `json:"attempt"`
+}
diff --git a/plugins/capabilities/taskworker.yaml b/plugins/capabilities/taskworker.yaml
new file mode 100644
index 00000000..f10fd079
--- /dev/null
+++ b/plugins/capabilities/taskworker.yaml
@@ -0,0 +1,37 @@
+version: v1-draft
+exports:
+  nd_task_execute:
+    description: |-
+      OnTaskExecute is called when a queued task is ready to run.
+      The returned string is a status/result message stored in the tasks table.
+      Return an error to trigger retry (if retries are configured).
+    input:
+      $ref: '#/components/schemas/TaskExecuteRequest'
+      contentType: application/json
+    output:
+      type: string
+      contentType: application/json
+components:
+  schemas:
+    TaskExecuteRequest:
+      description: TaskExecuteRequest is the request provided when a task is ready to execute.
+      properties:
+        queueName:
+          type: string
+          description: QueueName is the name of the queue this task belongs to.
+        taskId:
+          type: string
+          description: TaskID is the unique identifier for this task.
+        payload:
+          # Go []byte marshals to a base64-encoded JSON string, not an array of
+          # objects; "type: string, format: byte" is the JSON-Schema/OpenAPI
+          # representation for base64 binary data.
+          type: string
+          format: byte
+          description: Payload is the opaque data provided when the task was enqueued.
+        attempt:
+          type: integer
+          format: int32
+          description: 'Attempt is the current attempt number (1-based: first attempt = 1).'
+      required:
+        - queueName
+        - taskId
+        - payload
+        - attempt
diff --git a/plugins/host/task.go b/plugins/host/task.go
new file mode 100644
index 00000000..dcaf7197
--- /dev/null
+++ b/plugins/host/task.go
@@ -0,0 +1,73 @@
+package host
+
+import "context"
+
+// TaskInfo holds the current state of a task.
+type TaskInfo struct {
+	// Status is the current task status: "pending", "running",
+	// "completed", "failed", or "cancelled".
+	Status string `json:"status"`
+	// Message is the status/result message returned by the plugin callback.
+	Message string `json:"message"`
+	// Attempt is the current or last attempt number (1-based).
+	Attempt int32 `json:"attempt"`
+}
+
+// QueueConfig holds configuration for a task queue.
+type QueueConfig struct {
+	// Concurrency is the max number of parallel workers. Default: 1.
+	// Capped by the plugin's manifest maxConcurrency.
+	Concurrency int32 `json:"concurrency"`
+
+	// MaxRetries is the number of times to retry a failed task. Default: 0.
+	MaxRetries int32 `json:"maxRetries"`
+
+	// BackoffMs is the initial backoff between retries in milliseconds.
+	// Doubles each retry (exponential: backoffMs * 2^(attempt-1)). Default: 1000.
+	BackoffMs int64 `json:"backoffMs"`
+
+	// DelayMs is the minimum delay between starting consecutive tasks
+	// in milliseconds. Useful for rate limiting. Default: 0.
+	DelayMs int64 `json:"delayMs"`
+
+	// RetentionMs is how long completed/failed/cancelled tasks are kept
+	// in milliseconds. Default: 3600000 (1h). Min: 60000 (1m). Max: 604800000 (1w).
+	RetentionMs int64 `json:"retentionMs"`
+}
+
+// TaskService provides persistent task queues for plugins.
+//
+// This service allows plugins to create named queues with configurable concurrency,
+// retry policies, and rate limiting. Tasks are persisted to SQLite and survive
+// server restarts. When a task is ready to execute, the host calls the plugin's
+// nd_task_execute callback function.
+//
+//nd:hostservice name=Task permission=taskqueue
+type TaskService interface {
+	// CreateQueue creates a named task queue with the given configuration.
+	// Zero-value fields in config use sensible defaults.
+	// If a queue with the same name already exists, returns an error.
+	// On startup, this also recovers any stale "running" tasks from a previous crash.
+	//nd:hostfunc
+	CreateQueue(ctx context.Context, name string, config QueueConfig) error
+
+	// Enqueue adds a task to the named queue. Returns the task ID.
+	// payload is opaque bytes passed back to the plugin on execution.
+	//nd:hostfunc
+	Enqueue(ctx context.Context, queueName string, payload []byte) (string, error)
+
+	// Get returns the current state of a task including its status,
+	// message, and attempt count.
+	//nd:hostfunc
+	Get(ctx context.Context, taskID string) (*TaskInfo, error)
+
+	// Cancel cancels a pending task. Returns error if already
+	// running, completed, or failed.
+	//nd:hostfunc
+	Cancel(ctx context.Context, taskID string) error
+
+	// ClearQueue removes all pending tasks from the named queue.
+	// Running tasks are not affected. Returns the number of tasks removed.
+	//nd:hostfunc
+	ClearQueue(ctx context.Context, queueName string) (int64, error)
+}
diff --git a/plugins/host/task_gen.go b/plugins/host/task_gen.go
new file mode 100644
index 00000000..e4864bb5
--- /dev/null
+++ b/plugins/host/task_gen.go
@@ -0,0 +1,266 @@
+// Code generated by ndpgen. DO NOT EDIT.
+
+package host
+
+import (
+	"context"
+	"encoding/json"
+
+	extism "github.com/extism/go-sdk"
+)
+
+// TaskCreateQueueRequest is the request type for Task.CreateQueue.
+type TaskCreateQueueRequest struct {
+	Name   string      `json:"name"`
+	Config QueueConfig `json:"config"`
+}
+
+// TaskCreateQueueResponse is the response type for Task.CreateQueue.
+type TaskCreateQueueResponse struct {
+	Error string `json:"error,omitempty"`
+}
+
+// TaskEnqueueRequest is the request type for Task.Enqueue.
+type TaskEnqueueRequest struct {
+	QueueName string `json:"queueName"`
+	Payload   []byte `json:"payload"`
+}
+
+// TaskEnqueueResponse is the response type for Task.Enqueue.
+type TaskEnqueueResponse struct {
+	Result string `json:"result,omitempty"`
+	Error  string `json:"error,omitempty"`
+}
+
+// TaskGetRequest is the request type for Task.Get.
+type TaskGetRequest struct {
+	TaskID string `json:"taskId"`
+}
+
+// TaskGetResponse is the response type for Task.Get.
+type TaskGetResponse struct {
+	Result *TaskInfo `json:"result,omitempty"`
+	Error  string    `json:"error,omitempty"`
+}
+
+// TaskCancelRequest is the request type for Task.Cancel.
+type TaskCancelRequest struct {
+	TaskID string `json:"taskId"`
+}
+
+// TaskCancelResponse is the response type for Task.Cancel.
+type TaskCancelResponse struct {
+	Error string `json:"error,omitempty"`
+}
+
+// TaskClearQueueRequest is the request type for Task.ClearQueue.
+type TaskClearQueueRequest struct {
+	QueueName string `json:"queueName"`
+}
+
+// TaskClearQueueResponse is the response type for Task.ClearQueue.
+type TaskClearQueueResponse struct {
+	Result int64  `json:"result,omitempty"`
+	Error  string `json:"error,omitempty"`
+}
+
+// RegisterTaskHostFunctions registers Task service host functions.
+// The returned host functions should be added to the plugin's configuration.
+func RegisterTaskHostFunctions(service TaskService) []extism.HostFunction {
+	return []extism.HostFunction{
+		newTaskCreateQueueHostFunction(service),
+		newTaskEnqueueHostFunction(service),
+		newTaskGetHostFunction(service),
+		newTaskCancelHostFunction(service),
+		newTaskClearQueueHostFunction(service),
+	}
+}
+
+func newTaskCreateQueueHostFunction(service TaskService) extism.HostFunction {
+	return extism.NewHostFunctionWithStack(
+		"task_createqueue",
+		func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
+			// Read JSON request from plugin memory
+			reqBytes, err := p.ReadBytes(stack[0])
+			if err != nil {
+				taskWriteError(p, stack, err)
+				return
+			}
+			var req TaskCreateQueueRequest
+			if err := json.Unmarshal(reqBytes, &req); err != nil {
+				taskWriteError(p, stack, err)
+				return
+			}
+
+			// Call the service method
+			if svcErr := service.CreateQueue(ctx, req.Name, req.Config); svcErr != nil {
+				taskWriteError(p, stack, svcErr)
+				return
+			}
+
+			// Write JSON response to plugin memory
+			resp := TaskCreateQueueResponse{}
+			taskWriteResponse(p, stack, resp)
+		},
+		[]extism.ValueType{extism.ValueTypePTR},
+		[]extism.ValueType{extism.ValueTypePTR},
+	)
+}
+
+func newTaskEnqueueHostFunction(service TaskService) extism.HostFunction {
+	return extism.NewHostFunctionWithStack(
+		"task_enqueue",
+		func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
+			// Read JSON request from plugin memory
+			reqBytes, err := p.ReadBytes(stack[0])
+			if err != nil {
+				taskWriteError(p, stack, err)
+				return
+			}
+			var req TaskEnqueueRequest
+			if err := json.Unmarshal(reqBytes, &req); err != nil {
+				taskWriteError(p, stack, err)
+				return
+			}
+
+			// Call the service method
+			result, svcErr := service.Enqueue(ctx, req.QueueName, req.Payload)
+			if svcErr != nil {
+				taskWriteError(p, stack, svcErr)
+				return
+			}
+
+			// Write JSON response to plugin memory
+			resp := TaskEnqueueResponse{
+				Result: result,
+			}
+			taskWriteResponse(p, stack, resp)
+		},
+		[]extism.ValueType{extism.ValueTypePTR},
+		[]extism.ValueType{extism.ValueTypePTR},
+	)
+}
+
+func newTaskGetHostFunction(service TaskService) extism.HostFunction {
+	return extism.NewHostFunctionWithStack(
+		"task_get",
+		func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
+			// Read JSON request from plugin memory
+			reqBytes, err := p.ReadBytes(stack[0])
+			if err != nil {
+				taskWriteError(p, stack, err)
+				return
+			}
+			var req TaskGetRequest
+			if err := json.Unmarshal(reqBytes, &req); err != nil {
+				taskWriteError(p, stack, err)
+				return
+			}
+
+			// Call the service method
+			result, svcErr := service.Get(ctx, req.TaskID)
+			if svcErr != nil {
+				taskWriteError(p, stack, svcErr)
+				return
+			}
+
+			// Write JSON response to plugin memory
+			resp := TaskGetResponse{
+				Result: result,
+			}
+			taskWriteResponse(p, stack, resp)
+		},
+		[]extism.ValueType{extism.ValueTypePTR},
+		[]extism.ValueType{extism.ValueTypePTR},
+	)
+}
+
+func newTaskCancelHostFunction(service TaskService) extism.HostFunction {
+	return extism.NewHostFunctionWithStack(
+		"task_cancel",
+		func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
+			// Read JSON request from plugin memory
+			reqBytes, err := p.ReadBytes(stack[0])
+			if err != nil {
+				taskWriteError(p, stack, err)
+				return
+			}
+			var req TaskCancelRequest
+			if err := json.Unmarshal(reqBytes, &req); err != nil {
+				taskWriteError(p, stack, err)
+				return
+			}
+
+			// Call the service method
+			if svcErr := service.Cancel(ctx, req.TaskID); svcErr != nil {
+				taskWriteError(p, stack, svcErr)
+				return
+			}
+
+			// Write JSON response to plugin memory
+			resp := TaskCancelResponse{}
+			taskWriteResponse(p, stack, resp)
+		},
+		[]extism.ValueType{extism.ValueTypePTR},
+		[]extism.ValueType{extism.ValueTypePTR},
+	)
+}
+
+func newTaskClearQueueHostFunction(service TaskService) extism.HostFunction {
+	return extism.NewHostFunctionWithStack(
+		"task_clearqueue",
+		func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
+			// Read JSON request from plugin memory
+			reqBytes, err := p.ReadBytes(stack[0])
+			if err != nil {
+				taskWriteError(p, stack, err)
+				return
+			}
+			var req TaskClearQueueRequest
+			if err := json.Unmarshal(reqBytes, &req); err != nil {
+				taskWriteError(p, stack, err)
+				return
+			}
+
+			// Call the service method
+			result, svcErr := service.ClearQueue(ctx, req.QueueName)
+			if svcErr != nil {
+				taskWriteError(p, stack, svcErr)
+				return
+			}
+
+			// Write JSON response to plugin memory
+			resp := TaskClearQueueResponse{
+				Result: result,
+			}
+			taskWriteResponse(p, stack, resp)
+		},
+		[]extism.ValueType{extism.ValueTypePTR},
+		[]extism.ValueType{extism.ValueTypePTR},
+	)
+}
+
+// taskWriteResponse writes a JSON response to plugin memory.
+func taskWriteResponse(p *extism.CurrentPlugin, stack []uint64, resp any) {
+	respBytes, err := json.Marshal(resp)
+	if err != nil {
+		taskWriteError(p, stack, err)
+		return
+	}
+	respPtr, err := p.WriteBytes(respBytes)
+	if err != nil {
+		stack[0] = 0
+		return
+	}
+	stack[0] = respPtr
+}
+
+// taskWriteError writes an error response to plugin memory.
+func taskWriteError(p *extism.CurrentPlugin, stack []uint64, err error) {
+	errResp := struct {
+		Error string `json:"error"`
+	}{Error: err.Error()}
+	respBytes, _ := json.Marshal(errResp)
+	respPtr, _ := p.WriteBytes(respBytes)
+	stack[0] = respPtr
+}
diff --git a/plugins/host_taskqueue.go b/plugins/host_taskqueue.go
new file mode 100644
index 00000000..283bc963
--- /dev/null
+++ b/plugins/host_taskqueue.go
@@ -0,0 +1,595 @@
+package plugins
+
+import (
+	"context"
+	"database/sql"
+	"errors"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"sync"
+	"time"
+
+	_ "github.com/mattn/go-sqlite3"
+	"github.com/navidrome/navidrome/conf"
+	"github.com/navidrome/navidrome/log"
+	"github.com/navidrome/navidrome/model/id"
+	"github.com/navidrome/navidrome/plugins/capabilities"
+	"github.com/navidrome/navidrome/plugins/host"
+	"golang.org/x/time/rate"
+)
+
+const (
+	defaultConcurrency  int32 = 1
+	defaultBackoffMs    int64 = 1000
+	defaultRetentionMs  int64 = 3_600_000   // 1 hour
+	minRetentionMs      int64 = 60_000      // 1 minute
+	maxRetentionMs      int64 = 604_800_000 // 1 week
+	maxQueueNameLength        = 128
+	maxPayloadSize            = 1 * 1024 * 1024 // 1MB
+	maxBackoffMs        int64 = 3_600_000       // 1 hour
+	taskCleanupInterval       = 5 * time.Minute
+	pollInterval              = 5 * time.Second
+	shutdownTimeout           = 10 * time.Second
+
+	taskStatusPending   = "pending"
+	taskStatusRunning   = "running"
+	taskStatusCompleted = "completed"
+	taskStatusFailed    = "failed"
+	taskStatusCancelled = "cancelled"
+)
+
+// CapabilityTaskWorker indicates the plugin can receive task execution callbacks.
+const CapabilityTaskWorker Capability = "TaskWorker"
+
+const FuncTaskWorkerCallback = "nd_task_execute"
+
+func init() {
+	registerCapability(CapabilityTaskWorker, FuncTaskWorkerCallback)
+}
+
+type queueState struct {
+	config  host.QueueConfig
+	signal  chan struct{}
+	limiter *rate.Limiter
+}
+
+// notifyWorkers sends a non-blocking signal to wake up queue workers.
+func (qs *queueState) notifyWorkers() {
+	select {
+	case qs.signal <- struct{}{}:
+	default:
+	}
+}
+
+// taskQueueServiceImpl implements host.TaskQueueService with SQLite persistence
+// and background worker goroutines for task execution.
+type taskQueueServiceImpl struct {
+	pluginName     string
+	manager        *Manager
+	maxConcurrency int32
+	db             *sql.DB
+	ctx            context.Context
+	cancel         context.CancelFunc
+	wg             sync.WaitGroup
+	mu             sync.Mutex
+	queues         map[string]*queueState
+
+	// For testing: override how callbacks are invoked
+	invokeCallbackFn func(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) (string, error)
+}
+
+// newTaskQueueService creates a new taskQueueServiceImpl with its own SQLite database.
+func newTaskQueueService(pluginName string, manager *Manager, maxConcurrency int32) (*taskQueueServiceImpl, error) {
+	dataDir := filepath.Join(conf.Server.DataFolder, "plugins", pluginName)
+	if err := os.MkdirAll(dataDir, 0700); err != nil {
+		return nil, fmt.Errorf("creating plugin data directory: %w", err)
+	}
+
+	dbPath := filepath.Join(dataDir, "taskqueue.db")
+	db, err := sql.Open("sqlite3", dbPath+"?_busy_timeout=5000&_journal_mode=WAL&_foreign_keys=off")
+	if err != nil {
+		return nil, fmt.Errorf("opening taskqueue database: %w", err)
+	}
+
+	db.SetMaxOpenConns(3)
+	db.SetMaxIdleConns(1)
+
+	if err := createTaskQueueSchema(db); err != nil {
+		db.Close()
+		return nil, fmt.Errorf("creating taskqueue schema: %w", err)
+	}
+
+	ctx, cancel := context.WithCancel(manager.ctx)
+
+	s := &taskQueueServiceImpl{
+		pluginName:     pluginName,
+		manager:        manager,
+		maxConcurrency: maxConcurrency,
+		db:             db,
+		ctx:            ctx,
+		cancel:         cancel,
+		queues:         make(map[string]*queueState),
+	}
+	s.invokeCallbackFn = s.defaultInvokeCallback
+
+	s.wg.Go(s.cleanupLoop)
+
+	log.Debug("Initialized plugin taskqueue", "plugin", pluginName, "path", dbPath, "maxConcurrency", maxConcurrency)
+	return s, nil
+}
+
+// createTaskQueueSchema applies schema migrations to the taskqueue database.
+// New migrations must be appended at the end of the slice.
+func createTaskQueueSchema(db *sql.DB) error {
+	return migrateDB(db, []string{
+		`CREATE TABLE IF NOT EXISTS queues (
+			name TEXT PRIMARY KEY,
+			concurrency INTEGER NOT NULL DEFAULT 1,
+			max_retries INTEGER NOT NULL DEFAULT 0,
+			backoff_ms INTEGER NOT NULL DEFAULT 1000,
+			delay_ms INTEGER NOT NULL DEFAULT 0,
+			retention_ms INTEGER NOT NULL DEFAULT 3600000
+		)`,
+		`CREATE TABLE IF NOT EXISTS tasks (
+			id TEXT PRIMARY KEY,
+			queue_name TEXT NOT NULL REFERENCES queues(name),
+			payload BLOB NOT NULL,
+			status TEXT NOT NULL DEFAULT 'pending',
+			attempt INTEGER NOT NULL DEFAULT 0,
+			max_retries INTEGER NOT NULL,
+			next_run_at INTEGER NOT NULL,
+			created_at INTEGER NOT NULL,
+			updated_at INTEGER NOT NULL,
+			message TEXT NOT NULL DEFAULT ''
+		)`,
+		`CREATE INDEX IF NOT EXISTS idx_tasks_dequeue ON tasks(queue_name, status, next_run_at)`,
+	})
+}
+
+// applyConfigDefaults fills zero-value config fields with sensible defaults
+// and clamps values to valid ranges, logging warnings for clamped values.
+func (s *taskQueueServiceImpl) applyConfigDefaults(ctx context.Context, name string, config *host.QueueConfig) {
+	if config.Concurrency <= 0 {
+		config.Concurrency = defaultConcurrency
+	}
+	if config.BackoffMs <= 0 {
+		config.BackoffMs = defaultBackoffMs
+	}
+	if config.RetentionMs <= 0 {
+		config.RetentionMs = defaultRetentionMs
+	}
+
+	if config.RetentionMs < minRetentionMs {
+		log.Warn(ctx, "TaskQueue retention clamped to minimum", "plugin", s.pluginName, "queue", name,
+			"requested", config.RetentionMs, "min", minRetentionMs)
+		config.RetentionMs = minRetentionMs
+	}
+	if config.RetentionMs > maxRetentionMs {
+		log.Warn(ctx, "TaskQueue retention clamped to maximum", "plugin", s.pluginName, "queue", name,
+			"requested", config.RetentionMs, "max", maxRetentionMs)
+		config.RetentionMs = maxRetentionMs
+	}
+}
+
+// clampConcurrency reduces config.Concurrency if it exceeds the remaining budget.
+// Returns an error when the concurrency budget is fully exhausted.
+// Must be called with s.mu held.
+func (s *taskQueueServiceImpl) clampConcurrency(ctx context.Context, name string, config *host.QueueConfig) error {
+	var allocated int32
+	for _, qs := range s.queues {
+		allocated += qs.config.Concurrency
+	}
+	available := s.maxConcurrency - allocated
+	if available <= 0 {
+		log.Warn(ctx, "TaskQueue concurrency budget exhausted", "plugin", s.pluginName, "queue", name,
+			"allocated", allocated, "maxConcurrency", s.maxConcurrency)
+		return fmt.Errorf("concurrency budget exhausted (%d/%d allocated)", allocated, s.maxConcurrency)
+	}
+	if config.Concurrency > available {
+		log.Warn(ctx, "TaskQueue concurrency clamped", "plugin", s.pluginName, "queue", name,
+			"requested", config.Concurrency, "available", available, "maxConcurrency", s.maxConcurrency)
+		config.Concurrency = available
+	}
+	return nil
+}
+
+func (s *taskQueueServiceImpl) CreateQueue(ctx context.Context, name string, config host.QueueConfig) error {
+	if len(name) == 0 {
+		return fmt.Errorf("queue name cannot be empty")
+	}
+	if len(name) > maxQueueNameLength {
+		return fmt.Errorf("queue name exceeds maximum length of %d bytes", maxQueueNameLength)
+	}
+
+	s.applyConfigDefaults(ctx, name, &config)
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if err := s.clampConcurrency(ctx, name, &config); err != nil {
+		return err
+	}
+
+	if _, exists := s.queues[name]; exists {
+		return fmt.Errorf("queue %q already exists", name)
+	}
+
+	// Upsert into queues table (idempotent across restarts)
+	_, err := s.db.ExecContext(ctx, `
+		INSERT INTO queues (name, concurrency, max_retries, backoff_ms, delay_ms, retention_ms)
+		VALUES (?, ?, ?, ?, ?, ?)
+		ON CONFLICT(name) DO UPDATE SET
+			concurrency = excluded.concurrency,
+			max_retries = excluded.max_retries,
+			backoff_ms = excluded.backoff_ms,
+			delay_ms = excluded.delay_ms,
+			retention_ms = excluded.retention_ms
+	`, name, config.Concurrency, config.MaxRetries, config.BackoffMs, config.DelayMs, config.RetentionMs)
+	if err != nil {
+		return fmt.Errorf("creating queue: %w", err)
+	}
+
+	// Reset stale running tasks from previous crash
+	now := time.Now().UnixMilli()
+	_, err = s.db.ExecContext(ctx, `
+		UPDATE tasks SET status = ?, updated_at = ? WHERE queue_name = ? AND status = ?
+	`, taskStatusPending, now, name, taskStatusRunning)
+	if err != nil {
+		return fmt.Errorf("resetting stale tasks: %w", err)
+	}
+
+	qs := &queueState{
+		config: config,
+		signal: make(chan struct{}, 1),
+	}
+	if config.DelayMs > 0 {
+		// Rate limit dispatches to enforce delay between tasks.
+		// Burst of 1 allows one immediate dispatch, then enforces the delay interval.
+		qs.limiter = rate.NewLimiter(rate.Every(time.Duration(config.DelayMs)*time.Millisecond), 1)
+	}
+	s.queues[name] = qs
+
+	for i := int32(0); i < config.Concurrency; i++ {
+		s.wg.Go(func() { s.worker(name, qs) })
+	}
+
+	log.Debug(ctx, "Created task queue", "plugin", s.pluginName, "queue", name,
+		"concurrency", config.Concurrency, "maxRetries", config.MaxRetries,
+		"backoffMs", config.BackoffMs, "delayMs", config.DelayMs, "retentionMs", config.RetentionMs)
+	return nil
+}
+
+func (s *taskQueueServiceImpl) Enqueue(ctx context.Context, queueName string, payload []byte) (string, error) {
+	s.mu.Lock()
+	qs, exists := s.queues[queueName]
+	s.mu.Unlock()
+
+	if !exists {
+		return "", fmt.Errorf("queue %q does not exist", queueName)
+	}
+	if len(payload) > maxPayloadSize {
+		return "", fmt.Errorf("payload size %d exceeds maximum of %d bytes", len(payload), maxPayloadSize)
+	}
+
+	taskID := id.NewRandom()
+	now := time.Now().UnixMilli()
+
+	_, err := s.db.ExecContext(ctx, `
+		INSERT INTO tasks (id, queue_name, payload, status, attempt, max_retries, next_run_at, created_at, updated_at)
+		VALUES (?, ?, ?, ?, 0, ?, ?, ?, ?)
+	`, taskID, queueName, payload, taskStatusPending, qs.config.MaxRetries, now, now, now)
+	if err != nil {
+		return "", fmt.Errorf("enqueuing task: %w", err)
+	}
+
+	qs.notifyWorkers()
+	log.Trace(ctx, "Enqueued task", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
+	return taskID, nil
+}
+
+// Get returns the current state of a task.
+func (s *taskQueueServiceImpl) Get(ctx context.Context, taskID string) (*host.TaskInfo, error) {
+	var info host.TaskInfo
+	err := s.db.QueryRowContext(ctx, `SELECT status, message, attempt FROM tasks WHERE id = ?`, taskID).
+		Scan(&info.Status, &info.Message, &info.Attempt)
+	if errors.Is(err, sql.ErrNoRows) {
+		return nil, fmt.Errorf("task %q not found", taskID)
+	}
+	if err != nil {
+		return nil, fmt.Errorf("getting task info: %w", err)
+	}
+	return &info, nil
+}
+
+// Cancel cancels a pending task.
+func (s *taskQueueServiceImpl) Cancel(ctx context.Context, taskID string) error {
+	now := time.Now().UnixMilli()
+	result, err := s.db.ExecContext(ctx, `
+		UPDATE tasks SET status = ?, updated_at = ? WHERE id = ? AND status = ?
+	`, taskStatusCancelled, now, taskID, taskStatusPending)
+	if err != nil {
+		return fmt.Errorf("cancelling task: %w", err)
+	}
+
+	rowsAffected, err := result.RowsAffected()
+	if err != nil {
+		return fmt.Errorf("checking cancel result: %w", err)
+	}
+
+	if rowsAffected == 0 {
+		// Check if task exists at all
+		var status string
+		err := s.db.QueryRowContext(ctx, `SELECT status FROM tasks WHERE id = ?`, taskID).Scan(&status)
+		if errors.Is(err, sql.ErrNoRows) {
+			return fmt.Errorf("task %q not found", taskID)
+		}
+		if err != nil {
+			return fmt.Errorf("checking task existence: %w", err)
+		}
+		return fmt.Errorf("task %q cannot be cancelled (status: %s)", taskID, status)
+	}
+
+	log.Trace(ctx, "Cancelled task", "plugin", s.pluginName, "taskID", taskID)
+	return nil
+}
+
+// ClearQueue removes all pending tasks from the named queue.
+// Running tasks are not affected. Returns the number of tasks removed.
+func (s *taskQueueServiceImpl) ClearQueue(ctx context.Context, queueName string) (int64, error) {
+	s.mu.Lock()
+	_, exists := s.queues[queueName]
+	s.mu.Unlock()
+
+	if !exists {
+		return 0, fmt.Errorf("queue %q does not exist", queueName)
+	}
+
+	now := time.Now().UnixMilli()
+	result, err := s.db.ExecContext(ctx, `
+		UPDATE tasks SET status = ?, updated_at = ? WHERE queue_name = ? AND status = ?
+	`, taskStatusCancelled, now, queueName, taskStatusPending)
+	if err != nil {
+		return 0, fmt.Errorf("clearing queue: %w", err)
+	}
+
+	cleared, err := result.RowsAffected()
+	if err != nil {
+		return 0, fmt.Errorf("checking clear result: %w", err)
+	}
+
+	if cleared > 0 {
+		log.Debug(ctx, "Cleared pending tasks from queue", "plugin", s.pluginName, "queue", queueName, "cleared", cleared)
+	}
+	return cleared, nil
+}
+
+// worker is the main loop for a single worker goroutine.
+func (s *taskQueueServiceImpl) worker(queueName string, qs *queueState) {
+	// Process any existing pending tasks immediately on startup
+	s.drainQueue(queueName, qs)
+
+	ticker := time.NewTicker(pollInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-s.ctx.Done():
+			return
+		case <-qs.signal:
+			s.drainQueue(queueName, qs)
+		case <-ticker.C:
+			s.drainQueue(queueName, qs)
+		}
+	}
+}
+
+func (s *taskQueueServiceImpl) drainQueue(queueName string, qs *queueState) {
+	for s.ctx.Err() == nil && s.processTask(queueName, qs) {
+	}
+}
+
+// processTask dequeues and processes a single task. Returns true if a task was processed.
+func (s *taskQueueServiceImpl) processTask(queueName string, qs *queueState) bool {
+	now := time.Now().UnixMilli()
+
+	// Atomically dequeue a task
+	var taskID string
+	var payload []byte
+	var attempt, maxRetries int32
+	err := s.db.QueryRowContext(s.ctx, `
+		UPDATE tasks SET status = ?, attempt = attempt + 1, updated_at = ?
+		WHERE id = (
+			SELECT id FROM tasks
+			WHERE queue_name = ? AND status = ? AND next_run_at <= ?
+			ORDER BY next_run_at, created_at LIMIT 1
+		)
+		RETURNING id, payload, attempt, max_retries
+	`, taskStatusRunning, now, queueName, taskStatusPending, now).Scan(&taskID, &payload, &attempt, &maxRetries)
+	if errors.Is(err, sql.ErrNoRows) {
+		return false
+	}
+	if err != nil {
+		log.Error(s.ctx, "Failed to dequeue task", "plugin", s.pluginName, "queue", queueName, err)
+		return false
+	}
+
+	// Enforce delay between task dispatches using a rate limiter.
+	// This is done after dequeue so that empty polls don't consume rate tokens.
+	if qs.limiter != nil {
+		if err := qs.limiter.Wait(s.ctx); err != nil {
+			// Context cancelled during wait — revert task to pending for recovery
+			s.revertTaskToPending(taskID)
+			return false
+		}
+	}
+
+	// Invoke callback
+	log.Debug(s.ctx, "Executing task", "plugin", s.pluginName, "queue", queueName, "taskID", taskID, "attempt", attempt)
+	message, callbackErr := s.invokeCallbackFn(s.ctx, queueName, taskID, payload, attempt)
+
+	// If context was cancelled (shutdown), revert task to pending for recovery
+	if s.ctx.Err() != nil {
+		s.revertTaskToPending(taskID)
+		return false
+	}
+
+	if callbackErr == nil {
+		s.completeTask(queueName, taskID, message)
+	} else {
+		s.handleTaskFailure(queueName, taskID, attempt, maxRetries, qs, callbackErr, message)
+	}
+	return true
+}
+
+func (s *taskQueueServiceImpl) completeTask(queueName, taskID, message string) {
+	now := time.Now().UnixMilli()
+	if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, message = ?, updated_at = ? WHERE id = ?`, taskStatusCompleted, message, now, taskID); err != nil {
+		log.Error(s.ctx, "Failed to mark task as completed", "plugin", s.pluginName, "taskID", taskID, err)
+	}
+	log.Debug(s.ctx, "Task completed", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
+}
+
+func (s *taskQueueServiceImpl) handleTaskFailure(queueName, taskID string, attempt, maxRetries int32, qs *queueState, callbackErr error, message string) {
+	log.Warn(s.ctx, "Task execution failed", "plugin", s.pluginName, "queue", queueName,
+		"taskID", taskID, "attempt", attempt, "maxRetries", maxRetries, "err", callbackErr)
+
+	// Use error message as fallback if no message was provided
+	if message == "" {
+		message = callbackErr.Error()
+	}
+
+	now := time.Now().UnixMilli()
+	if attempt > maxRetries {
+		if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, message = ?, updated_at = ? WHERE id = ?`, taskStatusFailed, message, now, taskID); err != nil {
+			log.Error(s.ctx, "Failed to mark task as failed", "plugin", s.pluginName, "taskID", taskID, err)
+		}
+		log.Warn(s.ctx, "Task failed after all retries", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
+		return
+	}
+
+	// Exponential backoff: backoffMs * 2^(attempt-1)
+	backoff := qs.config.BackoffMs << (attempt - 1)
+	if backoff <= 0 || backoff > maxBackoffMs {
+		backoff = maxBackoffMs
+	}
+	nextRunAt := now + backoff
+	if _, err := s.db.ExecContext(s.ctx, `
+		UPDATE tasks SET status = ?, next_run_at = ?, updated_at = ? WHERE id = ?
+	`, taskStatusPending, nextRunAt, now, taskID); err != nil {
+		log.Error(s.ctx, "Failed to reschedule task for retry", "plugin", s.pluginName, "taskID", taskID, err)
+	}
+
+	// Wake worker after backoff expires
+	time.AfterFunc(time.Duration(backoff)*time.Millisecond, func() {
+		qs.notifyWorkers()
+	})
+}
+
+// revertTaskToPending puts a running task back to pending status and decrements the attempt
+// counter (used during shutdown to ensure the interrupted attempt doesn't count).
+func (s *taskQueueServiceImpl) revertTaskToPending(taskID string) {
+	now := time.Now().UnixMilli()
+	_, err := s.db.Exec(`UPDATE tasks SET status = ?, attempt = MAX(attempt - 1, 0), updated_at = ? WHERE id = ? AND status = ?`, taskStatusPending, now, taskID, taskStatusRunning)
+	if err != nil {
+		log.Error("Failed to revert task to pending", "plugin", s.pluginName, "taskID", taskID, err)
+	}
+}
+
+// defaultInvokeCallback calls the plugin's nd_task_execute function.
+func (s *taskQueueServiceImpl) defaultInvokeCallback(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) (string, error) {
+	s.manager.mu.RLock()
+	p, ok := s.manager.plugins[s.pluginName]
+	s.manager.mu.RUnlock()
+
+	if !ok {
+		return "", fmt.Errorf("plugin %s not loaded", s.pluginName)
+	}
+
+	input := capabilities.TaskExecuteRequest{
+		QueueName: queueName,
+		TaskID:    taskID,
+		Payload:   payload,
+		Attempt:   attempt,
+	}
+
+	message, err := callPluginFunction[capabilities.TaskExecuteRequest, string](ctx, p, FuncTaskWorkerCallback, input)
+	if err != nil {
+		return "", err
+	}
+	return message, nil
+}
+
+// cleanupLoop periodically removes terminal tasks past their retention period.
+func (s *taskQueueServiceImpl) cleanupLoop() {
+	ticker := time.NewTicker(taskCleanupInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-s.ctx.Done():
+			return
+		case <-ticker.C:
+			s.runCleanup()
+		}
+	}
+}
+
+// runCleanup deletes terminal tasks past their retention period.
+func (s *taskQueueServiceImpl) runCleanup() {
+	s.mu.Lock()
+	queues := make(map[string]*queueState, len(s.queues))
+	for k, v := range s.queues {
+		queues[k] = v
+	}
+	s.mu.Unlock()
+
+	now := time.Now().UnixMilli()
+	for name, qs := range queues {
+		result, err := s.db.ExecContext(s.ctx, `
+			DELETE FROM tasks WHERE queue_name = ? AND status IN (?, ?, ?) AND updated_at + ? < ?
+		`, name, taskStatusCompleted, taskStatusFailed, taskStatusCancelled, qs.config.RetentionMs, now)
+		if err != nil {
+			log.Error(s.ctx, "Failed to cleanup tasks", "plugin", s.pluginName, "queue", name, err)
+			continue
+		}
+		if deleted, _ := result.RowsAffected(); deleted > 0 {
+			log.Debug(s.ctx, "Cleaned up terminal tasks", "plugin", s.pluginName, "queue", name, "deleted", deleted)
+		}
+	}
+}
+
+// Close shuts down the task queue service, stopping all workers and closing the database.
+func (s *taskQueueServiceImpl) Close() error {
+	// Cancel context to signal all goroutines (workers, cleanupLoop)
+	s.cancel()
+
+	// Wait for goroutines with timeout
+	done := make(chan struct{})
+	go func() {
+		s.wg.Wait()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+	case <-time.After(shutdownTimeout):
+		// Proceed with shutdown anyway; any straggling workers are abandoned.
+		log.Warn("TaskQueue shutdown timed out", "plugin", s.pluginName)
+	}
+
+	// Mark running tasks as pending for recovery on next startup.
+	// NOTE(review): unlike revertTaskToPending, this bulk reset does not
+	// decrement the attempt counter — confirm whether interrupted attempts
+	// should count against retries on restart.
+	if s.db != nil {
+		now := time.Now().UnixMilli()
+		if _, err := s.db.Exec(`UPDATE tasks SET status = ?, updated_at = ? WHERE status = ?`, taskStatusPending, now, taskStatusRunning); err != nil {
+			log.Error("Failed to reset running tasks on shutdown", "plugin", s.pluginName, err)
+		}
+		log.Debug("Closing plugin taskqueue", "plugin", s.pluginName)
+		return s.db.Close()
+	}
+	return nil
+}
+
+// Compile-time verification
+var _ host.TaskService = (*taskQueueServiceImpl)(nil)
+var _ io.Closer = (*taskQueueServiceImpl)(nil)
diff --git a/plugins/host_taskqueue_test.go b/plugins/host_taskqueue_test.go
new file mode 100644
index 00000000..c3ab8d11
--- /dev/null
+++ b/plugins/host_taskqueue_test.go
@@ -0,0 +1,1221 @@
+//go:build !windows
+
+package plugins
+
+import (
+	"context"
+	"crypto/sha256"
+	"encoding/hex"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"net/http"
+	"os"
+	"path/filepath"
+	"sort"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/navidrome/navidrome/conf"
+	"github.com/navidrome/navidrome/conf/configtest"
+	"github.com/navidrome/navidrome/model"
+	"github.com/navidrome/navidrome/plugins/host"
+	"github.com/navidrome/navidrome/tests"
+	. "github.com/onsi/ginkgo/v2"
+	.
"github.com/onsi/gomega" +) + +var _ = Describe("TaskQueueService", func() { + var tmpDir string + var service *taskQueueServiceImpl + var ctx context.Context + var manager *Manager + + BeforeEach(func() { + ctx = GinkgoT().Context() + var err error + tmpDir, err = os.MkdirTemp("", "taskqueue-test-*") + Expect(err).ToNot(HaveOccurred()) + + DeferCleanup(configtest.SetupConfig()) + conf.Server.DataFolder = tmpDir + + // Create a mock manager with context + managerCtx, cancel := context.WithCancel(ctx) + manager = &Manager{ + plugins: make(map[string]*plugin), + ctx: managerCtx, + } + DeferCleanup(cancel) + + service, err = newTaskQueueService("test_plugin", manager, 5) + Expect(err).ToNot(HaveOccurred()) + }) + + AfterEach(func() { + if service != nil { + service.Close() + } + os.RemoveAll(tmpDir) + }) + + Describe("CreateQueue", func() { + It("creates a queue successfully", func() { + err := service.CreateQueue(ctx, "my-queue", host.QueueConfig{ + Concurrency: 2, + MaxRetries: 3, + BackoffMs: 2000, + RetentionMs: 7200000, + }) + Expect(err).ToNot(HaveOccurred()) + + service.mu.Lock() + qs, exists := service.queues["my-queue"] + service.mu.Unlock() + Expect(exists).To(BeTrue()) + Expect(qs.config.Concurrency).To(Equal(int32(2))) + Expect(qs.config.MaxRetries).To(Equal(int32(3))) + Expect(qs.config.BackoffMs).To(Equal(int64(2000))) + Expect(qs.config.RetentionMs).To(Equal(int64(7200000))) + }) + + It("returns error for duplicate queue name", func() { + err := service.CreateQueue(ctx, "dup-queue", host.QueueConfig{}) + Expect(err).ToNot(HaveOccurred()) + + err = service.CreateQueue(ctx, "dup-queue", host.QueueConfig{}) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("already exists")) + }) + }) + + Describe("CreateQueue name validation", func() { + It("rejects empty queue name", func() { + err := service.CreateQueue(ctx, "", host.QueueConfig{}) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("queue name cannot be 
empty")) + }) + + It("rejects over-length queue name", func() { + longName := strings.Repeat("a", maxQueueNameLength+1) + err := service.CreateQueue(ctx, longName, host.QueueConfig{}) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("exceeds maximum length")) + }) + + It("accepts queue name at maximum length", func() { + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { + return "", nil + } + exactName := strings.Repeat("a", maxQueueNameLength) + err := service.CreateQueue(ctx, exactName, host.QueueConfig{}) + Expect(err).ToNot(HaveOccurred()) + }) + }) + + Describe("CreateQueue defaults", func() { + It("applies defaults for zero-value config", func() { + err := service.CreateQueue(ctx, "defaults-queue", host.QueueConfig{}) + Expect(err).ToNot(HaveOccurred()) + + service.mu.Lock() + qs := service.queues["defaults-queue"] + service.mu.Unlock() + Expect(qs.config.Concurrency).To(Equal(defaultConcurrency)) + Expect(qs.config.BackoffMs).To(Equal(defaultBackoffMs)) + Expect(qs.config.RetentionMs).To(Equal(defaultRetentionMs)) + }) + }) + + Describe("CreateQueue defaults with negative values", func() { + It("applies default RetentionMs for negative value", func() { + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { + return "", nil + } + err := service.CreateQueue(ctx, "neg-retention", host.QueueConfig{ + RetentionMs: -500, + }) + Expect(err).ToNot(HaveOccurred()) + + service.mu.Lock() + qs := service.queues["neg-retention"] + service.mu.Unlock() + Expect(qs.config.RetentionMs).To(Equal(defaultRetentionMs)) + }) + }) + + Describe("CreateQueue clamping", func() { + It("clamps concurrency exceeding maxConcurrency", func() { + // maxConcurrency is 5; request 10 + err := service.CreateQueue(ctx, "clamped-queue", host.QueueConfig{ + Concurrency: 10, + }) + Expect(err).ToNot(HaveOccurred()) + + service.mu.Lock() + qs := 
service.queues["clamped-queue"] + service.mu.Unlock() + Expect(qs.config.Concurrency).To(Equal(int32(5))) + }) + + It("returns error when concurrency budget is exhausted", func() { + // maxConcurrency is 5; create a queue that uses all 5 + err := service.CreateQueue(ctx, "full-budget", host.QueueConfig{ + Concurrency: 5, + }) + Expect(err).ToNot(HaveOccurred()) + + // Next queue should fail — no budget remaining + err = service.CreateQueue(ctx, "over-budget", host.QueueConfig{ + Concurrency: 1, + }) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("concurrency budget exhausted")) + }) + + It("clamps retention below minimum", func() { + err := service.CreateQueue(ctx, "low-retention", host.QueueConfig{ + RetentionMs: 100, // below minRetentionMs + }) + Expect(err).ToNot(HaveOccurred()) + + service.mu.Lock() + qs := service.queues["low-retention"] + service.mu.Unlock() + Expect(qs.config.RetentionMs).To(Equal(minRetentionMs)) + }) + + It("clamps retention above maximum", func() { + err := service.CreateQueue(ctx, "high-retention", host.QueueConfig{ + RetentionMs: 999_999_999_999, // above maxRetentionMs + }) + Expect(err).ToNot(HaveOccurred()) + + service.mu.Lock() + qs := service.queues["high-retention"] + service.mu.Unlock() + Expect(qs.config.RetentionMs).To(Equal(maxRetentionMs)) + }) + }) + + Describe("Enqueue", func() { + BeforeEach(func() { + // Use a no-op callback to prevent actual execution attempts + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { + return "", nil + } + err := service.CreateQueue(ctx, "enqueue-test", host.QueueConfig{}) + Expect(err).ToNot(HaveOccurred()) + }) + + It("enqueues a task and returns task ID", func() { + taskID, err := service.Enqueue(ctx, "enqueue-test", []byte("payload")) + Expect(err).ToNot(HaveOccurred()) + Expect(taskID).ToNot(BeEmpty()) + }) + + It("returns error for non-existent queue", func() { + _, err := service.Enqueue(ctx, 
"no-such-queue", []byte("payload")) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("does not exist")) + }) + + It("rejects payload exceeding maximum size", func() { + bigPayload := make([]byte, maxPayloadSize+1) + _, err := service.Enqueue(ctx, "enqueue-test", bigPayload) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("exceeds maximum")) + }) + + It("accepts payload at maximum size", func() { + exactPayload := make([]byte, maxPayloadSize) + taskID, err := service.Enqueue(ctx, "enqueue-test", exactPayload) + Expect(err).ToNot(HaveOccurred()) + Expect(taskID).ToNot(BeEmpty()) + }) + }) + + Describe("GetTaskStatus", func() { + BeforeEach(func() { + // Use a callback that blocks until context is cancelled so tasks stay pending + service.invokeCallbackFn = func(ctx context.Context, _, _ string, _ []byte, _ int32) (string, error) { + <-ctx.Done() + return "", ctx.Err() + } + }) + + It("returns pending for a new task", func() { + err := service.CreateQueue(ctx, "status-test", host.QueueConfig{}) + Expect(err).ToNot(HaveOccurred()) + + taskID, err := service.Enqueue(ctx, "status-test", []byte("data")) + Expect(err).ToNot(HaveOccurred()) + + // The task may get picked up quickly; check initial status + // Since the callback blocks, it should be either pending or running + info, err := service.Get(ctx, taskID) + Expect(err).ToNot(HaveOccurred()) + Expect(info).ToNot(BeNil()) + Expect(info.Status).To(BeElementOf("pending", "running")) + }) + + It("returns error for unknown task ID", func() { + _, err := service.Get(ctx, "nonexistent-id") + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("not found")) + }) + }) + + Describe("CancelTask", func() { + BeforeEach(func() { + // Block callback so tasks stay in pending/running + service.invokeCallbackFn = func(ctx context.Context, _, _ string, _ []byte, _ int32) (string, error) { + <-ctx.Done() + return "", ctx.Err() + } + }) + + It("cancels a pending 
task", func() { + // Block the callback so the first task occupies the worker + started := make(chan struct{}) + service.invokeCallbackFn = func(ctx context.Context, _, _ string, _ []byte, _ int32) (string, error) { + close(started) + <-ctx.Done() + return "", ctx.Err() + } + + err := service.CreateQueue(ctx, "cancel-test", host.QueueConfig{ + Concurrency: 1, + }) + Expect(err).ToNot(HaveOccurred()) + + // Enqueue a blocker task to occupy the single worker + _, err = service.Enqueue(ctx, "cancel-test", []byte("blocker")) + Expect(err).ToNot(HaveOccurred()) + + // Wait for the blocker task to start running + Eventually(started).WithTimeout(5 * time.Second).Should(BeClosed()) + + // Enqueue a second task — it stays pending since the worker is busy + taskID, err := service.Enqueue(ctx, "cancel-test", []byte("cancel-me")) + Expect(err).ToNot(HaveOccurred()) + + err = service.Cancel(ctx, taskID) + Expect(err).ToNot(HaveOccurred()) + + info, err := service.Get(ctx, taskID) + Expect(err).ToNot(HaveOccurred()) + Expect(info.Status).To(Equal("cancelled")) + }) + + It("returns error for unknown task ID", func() { + err := service.Cancel(ctx, "nonexistent-id") + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("not found")) + }) + + It("returns error for non-pending task", func() { + // Create a queue where tasks complete immediately + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { + return "", nil + } + err := service.CreateQueue(ctx, "completed-test", host.QueueConfig{}) + Expect(err).ToNot(HaveOccurred()) + + taskID, err := service.Enqueue(ctx, "completed-test", []byte("data")) + Expect(err).ToNot(HaveOccurred()) + + // Wait for task to complete + Eventually(func() string { + info, err := service.Get(ctx, taskID) + if err != nil || info == nil { + return "" + } + return info.Status + }).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed")) + + // Try to cancel 
completed task + err = service.Cancel(ctx, taskID) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("cannot be cancelled")) + }) + }) + + Describe("ClearQueue", func() { + It("clears all pending tasks from a queue", func() { + // Block the callback so the first task occupies the worker + started := make(chan struct{}) + service.invokeCallbackFn = func(ctx context.Context, _, _ string, _ []byte, _ int32) (string, error) { + close(started) + <-ctx.Done() + return "", ctx.Err() + } + + err := service.CreateQueue(ctx, "clear-test", host.QueueConfig{ + Concurrency: 1, + }) + Expect(err).ToNot(HaveOccurred()) + + // Enqueue a blocker task to occupy the single worker + _, err = service.Enqueue(ctx, "clear-test", []byte("blocker")) + Expect(err).ToNot(HaveOccurred()) + + // Wait for the blocker task to start running + Eventually(started).WithTimeout(5 * time.Second).Should(BeClosed()) + + // Enqueue several more tasks — they stay pending since the worker is busy + var pendingIDs []string + for i := 0; i < 3; i++ { + taskID, err := service.Enqueue(ctx, "clear-test", []byte(fmt.Sprintf("task-%d", i))) + Expect(err).ToNot(HaveOccurred()) + pendingIDs = append(pendingIDs, taskID) + } + + // Clear the queue + cleared, err := service.ClearQueue(ctx, "clear-test") + Expect(err).ToNot(HaveOccurred()) + Expect(cleared).To(Equal(int64(3))) + + // Verify all pending tasks are now cancelled + for _, id := range pendingIDs { + info, err := service.Get(ctx, id) + Expect(err).ToNot(HaveOccurred()) + Expect(info.Status).To(Equal("cancelled")) + } + }) + + It("returns zero when queue has no pending tasks", func() { + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { + return "", nil + } + err := service.CreateQueue(ctx, "empty-clear", host.QueueConfig{}) + Expect(err).ToNot(HaveOccurred()) + + cleared, err := service.ClearQueue(ctx, "empty-clear") + Expect(err).ToNot(HaveOccurred()) + 
Expect(cleared).To(Equal(int64(0))) + }) + + It("returns error for non-existent queue", func() { + _, err := service.ClearQueue(ctx, "no-such-queue") + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("does not exist")) + }) + + It("does not affect running tasks", func() { + // Block the callback so tasks stay running + started := make(chan struct{}, 1) + service.invokeCallbackFn = func(ctx context.Context, _, _ string, _ []byte, _ int32) (string, error) { + select { + case started <- struct{}{}: + default: + } + <-ctx.Done() + return "", ctx.Err() + } + + err := service.CreateQueue(ctx, "clear-running", host.QueueConfig{ + Concurrency: 1, + }) + Expect(err).ToNot(HaveOccurred()) + + // Enqueue a task that will start running + runningID, err := service.Enqueue(ctx, "clear-running", []byte("running-task")) + Expect(err).ToNot(HaveOccurred()) + + // Wait for it to start running + Eventually(started).WithTimeout(5 * time.Second).Should(Receive()) + + // Clear the queue — should not affect the running task + cleared, err := service.ClearQueue(ctx, "clear-running") + Expect(err).ToNot(HaveOccurred()) + Expect(cleared).To(Equal(int64(0))) + + // Verify the running task is still running + info, err := service.Get(ctx, runningID) + Expect(err).ToNot(HaveOccurred()) + Expect(info.Status).To(Equal("running")) + }) + }) + + Describe("Worker execution", func() { + It("invokes callback and completes task", func() { + var callCount atomic.Int32 + var receivedQueueName, receivedTaskID string + var receivedPayload []byte + var receivedAttempt int32 + + service.invokeCallbackFn = func(_ context.Context, queueName, taskID string, payload []byte, attempt int32) (string, error) { + callCount.Add(1) + receivedQueueName = queueName + receivedTaskID = taskID + receivedPayload = payload + receivedAttempt = attempt + return "", nil + } + + err := service.CreateQueue(ctx, "worker-test", host.QueueConfig{}) + Expect(err).ToNot(HaveOccurred()) + + taskID, err := 
service.Enqueue(ctx, "worker-test", []byte("test-payload")) + Expect(err).ToNot(HaveOccurred()) + + Eventually(func() string { + info, err := service.Get(ctx, taskID) + if err != nil || info == nil { + return "" + } + return info.Status + }).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed")) + + Expect(callCount.Load()).To(Equal(int32(1))) + Expect(receivedQueueName).To(Equal("worker-test")) + Expect(receivedTaskID).To(Equal(taskID)) + Expect(receivedPayload).To(Equal([]byte("test-payload"))) + Expect(receivedAttempt).To(Equal(int32(1))) + }) + }) + + Describe("Message storage", func() { + It("stores message on successful completion", func() { + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { + return "task completed successfully", nil + } + + err := service.CreateQueue(ctx, "msg-success", host.QueueConfig{}) + Expect(err).ToNot(HaveOccurred()) + + taskID, err := service.Enqueue(ctx, "msg-success", []byte("data")) + Expect(err).ToNot(HaveOccurred()) + + Eventually(func() string { + info, err := service.Get(ctx, taskID) + if err != nil || info == nil { + return "" + } + return info.Status + }).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed")) + + info, err := service.Get(ctx, taskID) + Expect(err).ToNot(HaveOccurred()) + Expect(info.Message).To(Equal("task completed successfully")) + Expect(info.Attempt).To(Equal(int32(1))) + }) + + It("stores error message on failure", func() { + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { + return "", fmt.Errorf("something went wrong") + } + + err := service.CreateQueue(ctx, "msg-fail", host.QueueConfig{ + MaxRetries: 0, + }) + Expect(err).ToNot(HaveOccurred()) + + taskID, err := service.Enqueue(ctx, "msg-fail", []byte("data")) + Expect(err).ToNot(HaveOccurred()) + + Eventually(func() string { + info, err := service.Get(ctx, taskID) + if err 
!= nil || info == nil { + return "" + } + return info.Status + }).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("failed")) + + info, err := service.Get(ctx, taskID) + Expect(err).ToNot(HaveOccurred()) + Expect(info.Message).To(Equal("something went wrong")) + }) + + It("uses explicit message over error message on failure", func() { + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { + return "partial progress made", fmt.Errorf("timeout exceeded") + } + + err := service.CreateQueue(ctx, "msg-fail-with-msg", host.QueueConfig{ + MaxRetries: 0, + }) + Expect(err).ToNot(HaveOccurred()) + + taskID, err := service.Enqueue(ctx, "msg-fail-with-msg", []byte("data")) + Expect(err).ToNot(HaveOccurred()) + + Eventually(func() string { + info, err := service.Get(ctx, taskID) + if err != nil || info == nil { + return "" + } + return info.Status + }).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("failed")) + + info, err := service.Get(ctx, taskID) + Expect(err).ToNot(HaveOccurred()) + Expect(info.Message).To(Equal("partial progress made")) + }) + }) + + Describe("Retry on failure", func() { + It("retries and eventually fails after exhausting retries", func() { + var callCount atomic.Int32 + + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { + callCount.Add(1) + return "", fmt.Errorf("task failed") + } + + err := service.CreateQueue(ctx, "retry-test", host.QueueConfig{ + MaxRetries: 2, + BackoffMs: 10, // Very short for testing + }) + Expect(err).ToNot(HaveOccurred()) + + taskID, err := service.Enqueue(ctx, "retry-test", []byte("retry-payload")) + Expect(err).ToNot(HaveOccurred()) + + Eventually(func() string { + info, err := service.Get(ctx, taskID) + if err != nil || info == nil { + return "" + } + return info.Status + }).WithTimeout(10 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("failed")) + + // 1 
initial attempt + 2 retries = 3 total calls + Expect(callCount.Load()).To(Equal(int32(3))) + }) + }) + + Describe("Retry then succeed", func() { + It("retries and succeeds on second attempt", func() { + var callCount atomic.Int32 + + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, attempt int32) (string, error) { + callCount.Add(1) + if attempt == 1 { + return "", fmt.Errorf("temporary error") + } + return "success", nil + } + + err := service.CreateQueue(ctx, "retry-succeed", host.QueueConfig{ + MaxRetries: 1, + BackoffMs: 10, // Very short for testing + }) + Expect(err).ToNot(HaveOccurred()) + + taskID, err := service.Enqueue(ctx, "retry-succeed", []byte("data")) + Expect(err).ToNot(HaveOccurred()) + + Eventually(func() string { + info, err := service.Get(ctx, taskID) + if err != nil || info == nil { + return "" + } + return info.Status + }).WithTimeout(10 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed")) + + Expect(callCount.Load()).To(Equal(int32(2))) + }) + }) + + Describe("Backoff overflow cap", func() { + It("caps backoff at maxRetentionMs to prevent overflow", func() { + var callCount atomic.Int32 + + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { + callCount.Add(1) + return "", fmt.Errorf("always fail") + } + + err := service.CreateQueue(ctx, "backoff-overflow", host.QueueConfig{ + MaxRetries: 3, + BackoffMs: 1_000_000_000, // Very large backoff to trigger overflow on exponentiation + }) + Expect(err).ToNot(HaveOccurred()) + + taskID, err := service.Enqueue(ctx, "backoff-overflow", []byte("overflow-test")) + Expect(err).ToNot(HaveOccurred()) + + // Wait for first attempt to fail + Eventually(func() int32 { + return callCount.Load() + }).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(BeNumerically(">=", int32(1))) + + // Check next_run_at is positive and reasonable (capped at maxRetentionMs from now) + var nextRunAt int64 + err 
= service.db.QueryRow(`SELECT next_run_at FROM tasks WHERE id = ?`, taskID).Scan(&nextRunAt) + Expect(err).ToNot(HaveOccurred()) + + now := time.Now().UnixMilli() + Expect(nextRunAt).To(BeNumerically(">", int64(0)), "next_run_at should be positive") + Expect(nextRunAt).To(BeNumerically("<=", now+maxBackoffMs+1000), "next_run_at should be at most maxBackoffMs from now") + }) + }) + + Describe("Delay enforcement with concurrent workers", func() { + It("enforces delay between dispatches even with multiple workers", func() { + var mu sync.Mutex + var dispatchTimes []time.Time + + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { + mu.Lock() + dispatchTimes = append(dispatchTimes, time.Now()) + mu.Unlock() + return "", nil + } + + err := service.CreateQueue(ctx, "delay-concurrent", host.QueueConfig{ + Concurrency: 3, + DelayMs: 200, + }) + Expect(err).ToNot(HaveOccurred()) + + // Enqueue 5 tasks + for i := 0; i < 5; i++ { + _, err := service.Enqueue(ctx, "delay-concurrent", []byte(fmt.Sprintf("task-%d", i))) + Expect(err).ToNot(HaveOccurred()) + } + + // Wait for all tasks to complete + Eventually(func() int { + mu.Lock() + defer mu.Unlock() + return len(dispatchTimes) + }).WithTimeout(10 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal(5)) + + // Sort dispatch times and verify gaps + mu.Lock() + sort.Slice(dispatchTimes, func(i, j int) bool { + return dispatchTimes[i].Before(dispatchTimes[j]) + }) + times := make([]time.Time, len(dispatchTimes)) + copy(times, dispatchTimes) + mu.Unlock() + + // Consecutive dispatches should have at least ~160ms gap (80% of 200ms) + for i := 1; i < len(times); i++ { + gap := times[i].Sub(times[i-1]) + Expect(gap).To(BeNumerically(">=", 160*time.Millisecond), + fmt.Sprintf("gap between dispatch %d and %d was %v, expected >= 160ms", i-1, i, gap)) + } + }) + }) + + Describe("Shutdown recovery", func() { + It("resets stale running tasks on CreateQueue", func() { + // Create a 
first service and queue, enqueue a task + service.invokeCallbackFn = func(ctx context.Context, _, _ string, _ []byte, _ int32) (string, error) { + <-ctx.Done() + return "", ctx.Err() + } + err := service.CreateQueue(ctx, "recovery-queue", host.QueueConfig{}) + Expect(err).ToNot(HaveOccurred()) + + taskID, err := service.Enqueue(ctx, "recovery-queue", []byte("stale-task")) + Expect(err).ToNot(HaveOccurred()) + + // Wait for the task to start running + Eventually(func() string { + info, err := service.Get(ctx, taskID) + if err != nil || info == nil { + return "" + } + return info.Status + }).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("running")) + + // Close the service (simulates crash - tasks left in running state) + service.Close() + + // Create a new service pointing to the same DB + managerCtx2, cancel2 := context.WithCancel(ctx) + DeferCleanup(cancel2) + manager2 := &Manager{ + plugins: make(map[string]*plugin), + ctx: managerCtx2, + } + + service, err = newTaskQueueService("test_plugin", manager2, 5) + Expect(err).ToNot(HaveOccurred()) + + // Override callback to succeed + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { + return "", nil + } + + // Re-create the queue - the upsert handles the existing row from the old service + err = service.CreateQueue(ctx, "recovery-queue", host.QueueConfig{}) + Expect(err).ToNot(HaveOccurred()) + + // The stale running task should now be reset to pending and eventually completed + Eventually(func() string { + info, err := service.Get(ctx, taskID) + if err != nil || info == nil { + return "" + } + return info.Status + }).WithTimeout(10 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed")) + }) + }) + + Describe("Close", func() { + It("prevents subsequent operations after close", func() { + err := service.CreateQueue(ctx, "close-test", host.QueueConfig{}) + Expect(err).ToNot(HaveOccurred()) + + service.Close() + + // 
After close, operations should fail + _, err = service.Enqueue(ctx, "close-test", []byte("data")) + Expect(err).To(HaveOccurred()) + }) + }) + + Describe("Plugin isolation", func() { + It("uses separate databases for different plugins", func() { + managerCtx2, cancel2 := context.WithCancel(ctx) + DeferCleanup(cancel2) + manager2 := &Manager{ + plugins: make(map[string]*plugin), + ctx: managerCtx2, + } + + service2, err := newTaskQueueService("other_plugin", manager2, 5) + Expect(err).ToNot(HaveOccurred()) + defer service2.Close() + + // Check that separate database files exist + _, err = os.Stat(filepath.Join(tmpDir, "plugins", "test_plugin", "taskqueue.db")) + Expect(err).ToNot(HaveOccurred()) + _, err = os.Stat(filepath.Join(tmpDir, "plugins", "other_plugin", "taskqueue.db")) + Expect(err).ToNot(HaveOccurred()) + + // Both services should be able to create queues with the same name independently + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { return "", nil } + service2.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { return "", nil } + + err = service.CreateQueue(ctx, "shared-name", host.QueueConfig{}) + Expect(err).ToNot(HaveOccurred()) + err = service2.CreateQueue(ctx, "shared-name", host.QueueConfig{}) + Expect(err).ToNot(HaveOccurred()) + + // Enqueue to each and verify they work independently + taskID1, err := service.Enqueue(ctx, "shared-name", []byte("plugin1")) + Expect(err).ToNot(HaveOccurred()) + taskID2, err := service2.Enqueue(ctx, "shared-name", []byte("plugin2")) + Expect(err).ToNot(HaveOccurred()) + + Expect(taskID1).ToNot(Equal(taskID2)) + + // Both should complete + Eventually(func() string { + info, err := service.Get(ctx, taskID1) + if err != nil || info == nil { + return "" + } + return info.Status + }).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed")) + + Eventually(func() string { + info, err := 
service2.Get(ctx, taskID2) + if err != nil || info == nil { + return "" + } + return info.Status + }).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed")) + }) + }) +}) + +var _ = Describe("TaskQueueService Integration", Ordered, func() { + var manager *Manager + var tmpDir string + + BeforeAll(func() { + var err error + tmpDir, err = os.MkdirTemp("", "taskqueue-integration-test-*") + Expect(err).ToNot(HaveOccurred()) + + // Copy the test-taskqueue plugin + srcPath := filepath.Join(testdataDir, "test-taskqueue"+PackageExtension) + destPath := filepath.Join(tmpDir, "test-taskqueue"+PackageExtension) + data, err := os.ReadFile(srcPath) + Expect(err).ToNot(HaveOccurred()) + err = os.WriteFile(destPath, data, 0600) + Expect(err).ToNot(HaveOccurred()) + + // Compute SHA256 for the plugin + hash := sha256.Sum256(data) + hashHex := hex.EncodeToString(hash[:]) + + // Setup config + DeferCleanup(configtest.SetupConfig()) + conf.Server.Plugins.Enabled = true + conf.Server.Plugins.Folder = tmpDir + conf.Server.Plugins.AutoReload = false + conf.Server.CacheFolder = filepath.Join(tmpDir, "cache") + conf.Server.DataFolder = tmpDir + + // Setup mock DataStore with pre-enabled plugin + mockPluginRepo := tests.CreateMockPluginRepo() + mockPluginRepo.Permitted = true + mockPluginRepo.SetData(model.Plugins{{ + ID: "test-taskqueue", + Path: destPath, + SHA256: hashHex, + Enabled: true, + }}) + dataStore := &tests.MockDataStore{MockedPlugin: mockPluginRepo} + + // Create and start manager + manager = &Manager{ + plugins: make(map[string]*plugin), + ds: dataStore, + metrics: noopMetricsRecorder{}, + subsonicRouter: http.NotFoundHandler(), + } + err = manager.Start(GinkgoT().Context()) + Expect(err).ToNot(HaveOccurred()) + + DeferCleanup(func() { + _ = manager.Stop() + _ = os.RemoveAll(tmpDir) + }) + }) + + // Helper types for calling the test plugin + type testQueueConfig struct { + Concurrency int32 `json:"concurrency,omitempty"` + MaxRetries int32 
`json:"maxRetries,omitempty"` + BackoffMs int64 `json:"backoffMs,omitempty"` + DelayMs int64 `json:"delayMs,omitempty"` + RetentionMs int64 `json:"retentionMs,omitempty"` + } + + type testTaskQueueInput struct { + Operation string `json:"operation"` + QueueName string `json:"queueName,omitempty"` + Config *testQueueConfig `json:"config,omitempty"` + Payload []byte `json:"payload,omitempty"` + TaskID string `json:"taskId,omitempty"` + } + + type testTaskQueueOutput struct { + TaskID string `json:"taskId,omitempty"` + Status string `json:"status,omitempty"` + Message string `json:"message,omitempty"` + Attempt int32 `json:"attempt,omitempty"` + Cleared int64 `json:"cleared,omitempty"` + Error *string `json:"error,omitempty"` + } + + callTestTaskQueue := func(ctx context.Context, input testTaskQueueInput) (*testTaskQueueOutput, error) { + manager.mu.RLock() + p := manager.plugins["test-taskqueue"] + manager.mu.RUnlock() + + instance, err := p.instance(ctx) + if err != nil { + return nil, err + } + defer instance.Close(ctx) + + inputBytes, _ := json.Marshal(input) + _, outputBytes, err := instance.Call("nd_test_taskqueue", inputBytes) + if err != nil { + return nil, err + } + + var output testTaskQueueOutput + if err := json.Unmarshal(outputBytes, &output); err != nil { + return nil, err + } + if output.Error != nil { + return nil, errors.New(*output.Error) + } + return &output, nil + } + + Describe("Plugin Loading", func() { + It("should load plugin with taskqueue permission and TaskWorker capability", func() { + manager.mu.RLock() + p, ok := manager.plugins["test-taskqueue"] + manager.mu.RUnlock() + Expect(ok).To(BeTrue()) + Expect(p.manifest.Permissions).ToNot(BeNil()) + Expect(p.manifest.Permissions.Taskqueue).ToNot(BeNil()) + Expect(p.manifest.Permissions.Taskqueue.MaxConcurrency).To(Equal(10)) + Expect(p.capabilities).To(ContainElement(CapabilityTaskWorker)) + }) + }) + + Describe("Create Queue", func() { + It("should create a queue without error", func() { + ctx 
:= GinkgoT().Context() + _, err := callTestTaskQueue(ctx, testTaskQueueInput{ + Operation: "create_queue", + QueueName: "test-create", + }) + Expect(err).ToNot(HaveOccurred()) + }) + + It("should return error for duplicate queue name", func() { + ctx := GinkgoT().Context() + _, err := callTestTaskQueue(ctx, testTaskQueueInput{ + Operation: "create_queue", + QueueName: "test-dup", + }) + Expect(err).ToNot(HaveOccurred()) + + _, err = callTestTaskQueue(ctx, testTaskQueueInput{ + Operation: "create_queue", + QueueName: "test-dup", + }) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("already exists")) + }) + }) + + Describe("Enqueue and Task Completion", func() { + It("should enqueue a task and complete successfully", func() { + ctx := GinkgoT().Context() + + // Create queue + _, err := callTestTaskQueue(ctx, testTaskQueueInput{ + Operation: "create_queue", + QueueName: "test-complete", + }) + Expect(err).ToNot(HaveOccurred()) + + // Enqueue task with payload "hello" + output, err := callTestTaskQueue(ctx, testTaskQueueInput{ + Operation: "enqueue", + QueueName: "test-complete", + Payload: []byte("hello"), + }) + Expect(err).ToNot(HaveOccurred()) + Expect(output.TaskID).ToNot(BeEmpty()) + + taskID := output.TaskID + + // Poll until completed + Eventually(func() string { + out, err := callTestTaskQueue(ctx, testTaskQueueInput{ + Operation: "get_task_status", + TaskID: taskID, + }) + if err != nil { + return "error" + } + return out.Status + }).WithTimeout(5 * time.Second).WithPolling(100 * time.Millisecond).Should(Equal("completed")) + }) + }) + + Describe("Enqueue with Failure, No Retries", func() { + It("should fail when payload is 'fail' and maxRetries is 0", func() { + ctx := GinkgoT().Context() + + // Create queue with no retries + _, err := callTestTaskQueue(ctx, testTaskQueueInput{ + Operation: "create_queue", + QueueName: "test-fail-no-retry", + Config: &testQueueConfig{ + MaxRetries: 0, + }, + }) + Expect(err).ToNot(HaveOccurred()) + 
+ // Enqueue task that will fail + output, err := callTestTaskQueue(ctx, testTaskQueueInput{ + Operation: "enqueue", + QueueName: "test-fail-no-retry", + Payload: []byte("fail"), + }) + Expect(err).ToNot(HaveOccurred()) + + taskID := output.TaskID + + // Poll until failed + Eventually(func() string { + out, err := callTestTaskQueue(ctx, testTaskQueueInput{ + Operation: "get_task_status", + TaskID: taskID, + }) + if err != nil { + return "error" + } + return out.Status + }).WithTimeout(5 * time.Second).WithPolling(100 * time.Millisecond).Should(Equal("failed")) + }) + }) + + Describe("Enqueue with Retry Then Success", func() { + It("should retry and eventually succeed with 'fail-then-succeed' payload", func() { + ctx := GinkgoT().Context() + + // Create queue with retries and short backoff + _, err := callTestTaskQueue(ctx, testTaskQueueInput{ + Operation: "create_queue", + QueueName: "test-retry-succeed", + Config: &testQueueConfig{ + MaxRetries: 2, + BackoffMs: 100, + }, + }) + Expect(err).ToNot(HaveOccurred()) + + // Enqueue task that fails on attempt < 2, then succeeds + output, err := callTestTaskQueue(ctx, testTaskQueueInput{ + Operation: "enqueue", + QueueName: "test-retry-succeed", + Payload: []byte("fail-then-succeed"), + }) + Expect(err).ToNot(HaveOccurred()) + + taskID := output.TaskID + + // Poll until completed + Eventually(func() string { + out, err := callTestTaskQueue(ctx, testTaskQueueInput{ + Operation: "get_task_status", + TaskID: taskID, + }) + if err != nil { + return "error" + } + return out.Status + }).WithTimeout(5 * time.Second).WithPolling(100 * time.Millisecond).Should(Equal("completed")) + }) + }) + + Describe("Cancel Pending Task", func() { + It("should cancel a pending task", func() { + ctx := GinkgoT().Context() + + // Create queue with concurrency=1 and a large delay between dispatches. + // The first task completes immediately (burst token), the second is dequeued + // but blocks on the rate limiter. 
Tasks 3+ remain in 'pending' and can be cancelled. + _, err := callTestTaskQueue(ctx, testTaskQueueInput{ + Operation: "create_queue", + QueueName: "test-cancel", + Config: &testQueueConfig{ + Concurrency: 1, + DelayMs: 60000, + }, + }) + Expect(err).ToNot(HaveOccurred()) + + // Enqueue several tasks - the first will complete immediately, + // the second will be dequeued but block on the rate limiter (status=running), + // the rest will stay pending. + var taskIDs []string + for i := 0; i < 5; i++ { + output, err := callTestTaskQueue(ctx, testTaskQueueInput{ + Operation: "enqueue", + QueueName: "test-cancel", + Payload: []byte("hello"), + }) + Expect(err).ToNot(HaveOccurred()) + taskIDs = append(taskIDs, output.TaskID) + } + + // Wait for the first task to complete (it has no delay) + Eventually(func() string { + out, err := callTestTaskQueue(ctx, testTaskQueueInput{ + Operation: "get_task_status", + TaskID: taskIDs[0], + }) + if err != nil { + return "error" + } + return out.Status + }).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed")) + + // Give the worker a moment to dequeue the second task (which will + // block on the delay) so tasks 3+ stay in 'pending' + time.Sleep(100 * time.Millisecond) + + // Cancel the last task - it should still be pending + lastTaskID := taskIDs[len(taskIDs)-1] + _, err = callTestTaskQueue(ctx, testTaskQueueInput{ + Operation: "cancel_task", + TaskID: lastTaskID, + }) + Expect(err).ToNot(HaveOccurred()) + + // Verify status is cancelled + statusOut, err := callTestTaskQueue(ctx, testTaskQueueInput{ + Operation: "get_task_status", + TaskID: lastTaskID, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(statusOut.Status).To(Equal("cancelled")) + }) + }) + + Describe("Enqueue to Non-Existent Queue", func() { + It("should return error when enqueueing to a queue that does not exist", func() { + ctx := GinkgoT().Context() + + _, err := callTestTaskQueue(ctx, testTaskQueueInput{ + Operation: "enqueue", 
+ QueueName: "nonexistent-queue", + Payload: []byte("payload"), + }) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("does not exist")) + }) + }) + + Describe("Clear Queue", func() { + It("should clear pending tasks and return the count", func() { + ctx := GinkgoT().Context() + + // Create queue with large delay so tasks stay pending after the first completes + _, err := callTestTaskQueue(ctx, testTaskQueueInput{ + Operation: "create_queue", + QueueName: "test-clear", + Config: &testQueueConfig{ + Concurrency: 1, + DelayMs: 60000, + }, + }) + Expect(err).ToNot(HaveOccurred()) + + // Enqueue several tasks + for i := 0; i < 4; i++ { + _, err := callTestTaskQueue(ctx, testTaskQueueInput{ + Operation: "enqueue", + QueueName: "test-clear", + Payload: []byte(fmt.Sprintf("task-%d", i)), + }) + Expect(err).ToNot(HaveOccurred()) + } + + // Wait for the first task to complete (burst token) + time.Sleep(200 * time.Millisecond) + + // Clear the queue — should cancel remaining pending tasks + output, err := callTestTaskQueue(ctx, testTaskQueueInput{ + Operation: "clear_queue", + QueueName: "test-clear", + }) + Expect(err).ToNot(HaveOccurred()) + Expect(output.Cleared).To(BeNumerically(">=", int64(1))) + }) + + It("should return error for non-existent queue", func() { + ctx := GinkgoT().Context() + + _, err := callTestTaskQueue(ctx, testTaskQueueInput{ + Operation: "clear_queue", + QueueName: "nonexistent-clear", + }) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("does not exist")) + }) + }) +}) diff --git a/plugins/manager_loader.go b/plugins/manager_loader.go index 610dbd02..59f48453 100644 --- a/plugins/manager_loader.go +++ b/plugins/manager_loader.go @@ -128,6 +128,23 @@ var hostServices = []hostServiceEntry{ return host.RegisterHTTPHostFunctions(service), nil }, }, + { + name: "Task", + hasPermission: func(p *Permissions) bool { return p != nil && p.Taskqueue != nil }, + create: func(ctx *serviceContext) 
([]extism.HostFunction, io.Closer) { + perm := ctx.permissions.Taskqueue + maxConcurrency := int32(1) + if perm.MaxConcurrency > 0 { + maxConcurrency = int32(perm.MaxConcurrency) + } + service, err := newTaskQueueService(ctx.pluginName, ctx.manager, maxConcurrency) + if err != nil { + log.Error("Failed to create Task service", "plugin", ctx.pluginName, err) + return nil, nil + } + return host.RegisterTaskHostFunctions(service), service + }, + }, } // extractManifest reads manifest from an .ndp package and computes its SHA-256 hash. diff --git a/plugins/manifest-schema.json b/plugins/manifest-schema.json index 4e64ca6e..8daf88cc 100644 --- a/plugins/manifest-schema.json +++ b/plugins/manifest-schema.json @@ -110,6 +110,9 @@ }, "users": { "$ref": "#/$defs/UsersPermission" + }, + "taskqueue": { + "$ref": "#/$defs/TaskQueuePermission" } } }, @@ -224,6 +227,23 @@ } } }, + "TaskQueuePermission": { + "type": "object", + "description": "Task queue permissions for background task processing", + "additionalProperties": false, + "properties": { + "reason": { + "type": "string", + "description": "Explanation for why task queue access is needed" + }, + "maxConcurrency": { + "type": "integer", + "description": "Maximum total concurrent workers across all queues. 
Default: 1", + "minimum": 1, + "default": 1 + } + } + }, "UsersPermission": { "type": "object", "description": "Users service permissions for accessing user information", diff --git a/plugins/manifest.go b/plugins/manifest.go index 3ca2657c..375e73e7 100644 --- a/plugins/manifest.go +++ b/plugins/manifest.go @@ -72,6 +72,13 @@ func ValidateWithCapabilities(m *Manifest, capabilities []Capability) error { } } + // Task (taskqueue) permission requires TaskWorker capability + if m.Permissions != nil && m.Permissions.Taskqueue != nil { + if !hasCapability(capabilities, CapabilityTaskWorker) { + return fmt.Errorf("'taskqueue' permission requires plugin to export '%s' function", FuncTaskWorkerCallback) + } + } + return nil } diff --git a/plugins/manifest_gen.go b/plugins/manifest_gen.go index 27c3c067..a565ed3d 100644 --- a/plugins/manifest_gen.go +++ b/plugins/manifest_gen.go @@ -181,6 +181,9 @@ type Permissions struct { // Subsonicapi corresponds to the JSON schema field "subsonicapi". Subsonicapi *SubsonicAPIPermission `json:"subsonicapi,omitempty" yaml:"subsonicapi,omitempty" mapstructure:"subsonicapi,omitempty"` + // Taskqueue corresponds to the JSON schema field "taskqueue". + Taskqueue *TaskQueuePermission `json:"taskqueue,omitempty" yaml:"taskqueue,omitempty" mapstructure:"taskqueue,omitempty"` + // Users corresponds to the JSON schema field "users". Users *UsersPermission `json:"users,omitempty" yaml:"users,omitempty" mapstructure:"users,omitempty"` @@ -200,6 +203,36 @@ type SubsonicAPIPermission struct { Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"` } +// Task queue permissions for background task processing +type TaskQueuePermission struct { + // Maximum total concurrent workers across all queues. 
Default: 1 + MaxConcurrency int `json:"maxConcurrency,omitempty" yaml:"maxConcurrency,omitempty" mapstructure:"maxConcurrency,omitempty"` + + // Explanation for why task queue access is needed + Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"` +} + +// UnmarshalJSON implements json.Unmarshaler. +func (j *TaskQueuePermission) UnmarshalJSON(value []byte) error { + var raw map[string]interface{} + if err := json.Unmarshal(value, &raw); err != nil { + return err + } + type Plain TaskQueuePermission + var plain Plain + if err := json.Unmarshal(value, &plain); err != nil { + return err + } + if v, ok := raw["maxConcurrency"]; !ok || v == nil { + plain.MaxConcurrency = 1.0 + } + if 1 > plain.MaxConcurrency { + return fmt.Errorf("field %s: must be >= %v", "maxConcurrency", 1) + } + *j = TaskQueuePermission(plain) + return nil +} + // Enable experimental WebAssembly threads support type ThreadsFeature struct { // Explanation for why threads support is needed diff --git a/plugins/pdk/go/host/doc.go b/plugins/pdk/go/host/doc.go index b801db44..5781a04c 100644 --- a/plugins/pdk/go/host/doc.go +++ b/plugins/pdk/go/host/doc.go @@ -43,6 +43,7 @@ The following host services are available: - Library: provides access to music library metadata for plugins. - Scheduler: provides task scheduling capabilities for plugins. - SubsonicAPI: provides access to Navidrome's Subsonic API from plugins. + - Task: provides persistent task queues for plugins. - Users: provides access to user information for plugins. - WebSocket: provides WebSocket communication capabilities for plugins. diff --git a/plugins/pdk/go/host/nd_host_task.go b/plugins/pdk/go/host/nd_host_task.go new file mode 100644 index 00000000..92a41c5b --- /dev/null +++ b/plugins/pdk/go/host/nd_host_task.go @@ -0,0 +1,277 @@ +// Code generated by ndpgen. DO NOT EDIT. +// +// This file contains client wrappers for the Task host service. 
+// It is intended for use in Navidrome plugins built with TinyGo. +// +//go:build wasip1 + +package host + +import ( + "encoding/json" + "errors" + + "github.com/navidrome/navidrome/plugins/pdk/go/pdk" +) + +// QueueConfig represents the QueueConfig data structure. +// QueueConfig holds configuration for a task queue. +type QueueConfig struct { + Concurrency int32 `json:"concurrency"` + MaxRetries int32 `json:"maxRetries"` + BackoffMs int64 `json:"backoffMs"` + DelayMs int64 `json:"delayMs"` + RetentionMs int64 `json:"retentionMs"` +} + +// TaskInfo represents the TaskInfo data structure. +// TaskInfo holds the current state of a task. +type TaskInfo struct { + Status string `json:"status"` + Message string `json:"message"` + Attempt int32 `json:"attempt"` +} + +// task_createqueue is the host function provided by Navidrome. +// +//go:wasmimport extism:host/user task_createqueue +func task_createqueue(uint64) uint64 + +// task_enqueue is the host function provided by Navidrome. +// +//go:wasmimport extism:host/user task_enqueue +func task_enqueue(uint64) uint64 + +// task_get is the host function provided by Navidrome. +// +//go:wasmimport extism:host/user task_get +func task_get(uint64) uint64 + +// task_cancel is the host function provided by Navidrome. +// +//go:wasmimport extism:host/user task_cancel +func task_cancel(uint64) uint64 + +// task_clearqueue is the host function provided by Navidrome. 
+// +//go:wasmimport extism:host/user task_clearqueue +func task_clearqueue(uint64) uint64 + +type taskCreateQueueRequest struct { + Name string `json:"name"` + Config QueueConfig `json:"config"` +} + +type taskEnqueueRequest struct { + QueueName string `json:"queueName"` + Payload []byte `json:"payload"` +} + +type taskEnqueueResponse struct { + Result string `json:"result,omitempty"` + Error string `json:"error,omitempty"` +} + +type taskGetRequest struct { + TaskID string `json:"taskId"` +} + +type taskGetResponse struct { + Result *TaskInfo `json:"result,omitempty"` + Error string `json:"error,omitempty"` +} + +type taskCancelRequest struct { + TaskID string `json:"taskId"` +} + +type taskClearQueueRequest struct { + QueueName string `json:"queueName"` +} + +type taskClearQueueResponse struct { + Result int64 `json:"result,omitempty"` + Error string `json:"error,omitempty"` +} + +// TaskCreateQueue calls the task_createqueue host function. +// CreateQueue creates a named task queue with the given configuration. +// Zero-value fields in config use sensible defaults. +// If a queue with the same name already exists, returns an error. +// On startup, this also recovers any stale "running" tasks from a previous crash. 
+func TaskCreateQueue(name string, config QueueConfig) error { + // Marshal request to JSON + req := taskCreateQueueRequest{ + Name: name, + Config: config, + } + reqBytes, err := json.Marshal(req) + if err != nil { + return err + } + reqMem := pdk.AllocateBytes(reqBytes) + defer reqMem.Free() + + // Call the host function + responsePtr := task_createqueue(reqMem.Offset()) + + // Read the response from memory + responseMem := pdk.FindMemory(responsePtr) + responseBytes := responseMem.ReadBytes() + + // Parse error-only response + var response struct { + Error string `json:"error,omitempty"` + } + if err := json.Unmarshal(responseBytes, &response); err != nil { + return err + } + if response.Error != "" { + return errors.New(response.Error) + } + return nil +} + +// TaskEnqueue calls the task_enqueue host function. +// Enqueue adds a task to the named queue. Returns the task ID. +// payload is opaque bytes passed back to the plugin on execution. +func TaskEnqueue(queueName string, payload []byte) (string, error) { + // Marshal request to JSON + req := taskEnqueueRequest{ + QueueName: queueName, + Payload: payload, + } + reqBytes, err := json.Marshal(req) + if err != nil { + return "", err + } + reqMem := pdk.AllocateBytes(reqBytes) + defer reqMem.Free() + + // Call the host function + responsePtr := task_enqueue(reqMem.Offset()) + + // Read the response from memory + responseMem := pdk.FindMemory(responsePtr) + responseBytes := responseMem.ReadBytes() + + // Parse the response + var response taskEnqueueResponse + if err := json.Unmarshal(responseBytes, &response); err != nil { + return "", err + } + + // Convert Error field to Go error + if response.Error != "" { + return "", errors.New(response.Error) + } + + return response.Result, nil +} + +// TaskGet calls the task_get host function. +// Get returns the current state of a task including its status, +// message, and attempt count. 
+func TaskGet(taskID string) (*TaskInfo, error) { + // Marshal request to JSON + req := taskGetRequest{ + TaskID: taskID, + } + reqBytes, err := json.Marshal(req) + if err != nil { + return nil, err + } + reqMem := pdk.AllocateBytes(reqBytes) + defer reqMem.Free() + + // Call the host function + responsePtr := task_get(reqMem.Offset()) + + // Read the response from memory + responseMem := pdk.FindMemory(responsePtr) + responseBytes := responseMem.ReadBytes() + + // Parse the response + var response taskGetResponse + if err := json.Unmarshal(responseBytes, &response); err != nil { + return nil, err + } + + // Convert Error field to Go error + if response.Error != "" { + return nil, errors.New(response.Error) + } + + return response.Result, nil +} + +// TaskCancel calls the task_cancel host function. +// Cancel cancels a pending task. Returns error if already +// running, completed, or failed. +func TaskCancel(taskID string) error { + // Marshal request to JSON + req := taskCancelRequest{ + TaskID: taskID, + } + reqBytes, err := json.Marshal(req) + if err != nil { + return err + } + reqMem := pdk.AllocateBytes(reqBytes) + defer reqMem.Free() + + // Call the host function + responsePtr := task_cancel(reqMem.Offset()) + + // Read the response from memory + responseMem := pdk.FindMemory(responsePtr) + responseBytes := responseMem.ReadBytes() + + // Parse error-only response + var response struct { + Error string `json:"error,omitempty"` + } + if err := json.Unmarshal(responseBytes, &response); err != nil { + return err + } + if response.Error != "" { + return errors.New(response.Error) + } + return nil +} + +// TaskClearQueue calls the task_clearqueue host function. +// ClearQueue removes all pending tasks from the named queue. +// Running tasks are not affected. Returns the number of tasks removed. 
+func TaskClearQueue(queueName string) (int64, error) { + // Marshal request to JSON + req := taskClearQueueRequest{ + QueueName: queueName, + } + reqBytes, err := json.Marshal(req) + if err != nil { + return 0, err + } + reqMem := pdk.AllocateBytes(reqBytes) + defer reqMem.Free() + + // Call the host function + responsePtr := task_clearqueue(reqMem.Offset()) + + // Read the response from memory + responseMem := pdk.FindMemory(responsePtr) + responseBytes := responseMem.ReadBytes() + + // Parse the response + var response taskClearQueueResponse + if err := json.Unmarshal(responseBytes, &response); err != nil { + return 0, err + } + + // Convert Error field to Go error + if response.Error != "" { + return 0, errors.New(response.Error) + } + + return response.Result, nil +} diff --git a/plugins/pdk/go/host/nd_host_task_stub.go b/plugins/pdk/go/host/nd_host_task_stub.go new file mode 100644 index 00000000..4dde0e23 --- /dev/null +++ b/plugins/pdk/go/host/nd_host_task_stub.go @@ -0,0 +1,105 @@ +// Code generated by ndpgen. DO NOT EDIT. +// +// This file contains mock implementations for non-WASM builds. +// These mocks allow IDE support, compilation, and unit testing on non-WASM platforms. +// Plugin authors can use the exported mock instances to set expectations in tests. +// +//go:build !wasip1 + +package host + +import "github.com/stretchr/testify/mock" + +// QueueConfig represents the QueueConfig data structure. +// QueueConfig holds configuration for a task queue. +type QueueConfig struct { + Concurrency int32 `json:"concurrency"` + MaxRetries int32 `json:"maxRetries"` + BackoffMs int64 `json:"backoffMs"` + DelayMs int64 `json:"delayMs"` + RetentionMs int64 `json:"retentionMs"` +} + +// TaskInfo represents the TaskInfo data structure. +// TaskInfo holds the current state of a task. 
+type TaskInfo struct { + Status string `json:"status"` + Message string `json:"message"` + Attempt int32 `json:"attempt"` +} + +// mockTaskService is the mock implementation for testing. +type mockTaskService struct { + mock.Mock +} + +// TaskMock is the auto-instantiated mock instance for testing. +// Use this to set expectations: host.TaskMock.On("MethodName", args...).Return(values...) +var TaskMock = &mockTaskService{} + +// CreateQueue is the mock method for TaskCreateQueue. +func (m *mockTaskService) CreateQueue(name string, config QueueConfig) error { + args := m.Called(name, config) + return args.Error(0) +} + +// TaskCreateQueue delegates to the mock instance. +// CreateQueue creates a named task queue with the given configuration. +// Zero-value fields in config use sensible defaults. +// If a queue with the same name already exists, returns an error. +// On startup, this also recovers any stale "running" tasks from a previous crash. +func TaskCreateQueue(name string, config QueueConfig) error { + return TaskMock.CreateQueue(name, config) +} + +// Enqueue is the mock method for TaskEnqueue. +func (m *mockTaskService) Enqueue(queueName string, payload []byte) (string, error) { + args := m.Called(queueName, payload) + return args.String(0), args.Error(1) +} + +// TaskEnqueue delegates to the mock instance. +// Enqueue adds a task to the named queue. Returns the task ID. +// payload is opaque bytes passed back to the plugin on execution. +func TaskEnqueue(queueName string, payload []byte) (string, error) { + return TaskMock.Enqueue(queueName, payload) +} + +// Get is the mock method for TaskGet. +func (m *mockTaskService) Get(taskID string) (*TaskInfo, error) { + args := m.Called(taskID) + return args.Get(0).(*TaskInfo), args.Error(1) +} + +// TaskGet delegates to the mock instance. +// Get returns the current state of a task including its status, +// message, and attempt count. 
+func TaskGet(taskID string) (*TaskInfo, error) { + return TaskMock.Get(taskID) +} + +// Cancel is the mock method for TaskCancel. +func (m *mockTaskService) Cancel(taskID string) error { + args := m.Called(taskID) + return args.Error(0) +} + +// TaskCancel delegates to the mock instance. +// Cancel cancels a pending task. Returns error if already +// running, completed, or failed. +func TaskCancel(taskID string) error { + return TaskMock.Cancel(taskID) +} + +// ClearQueue is the mock method for TaskClearQueue. +func (m *mockTaskService) ClearQueue(queueName string) (int64, error) { + args := m.Called(queueName) + return args.Get(0).(int64), args.Error(1) +} + +// TaskClearQueue delegates to the mock instance. +// ClearQueue removes all pending tasks from the named queue. +// Running tasks are not affected. Returns the number of tasks removed. +func TaskClearQueue(queueName string) (int64, error) { + return TaskMock.ClearQueue(queueName) +} diff --git a/plugins/pdk/go/taskworker/taskworker.go b/plugins/pdk/go/taskworker/taskworker.go new file mode 100644 index 00000000..5d09a320 --- /dev/null +++ b/plugins/pdk/go/taskworker/taskworker.go @@ -0,0 +1,79 @@ +// Code generated by ndpgen. DO NOT EDIT. +// +// This file contains export wrappers for the TaskWorker capability. +// It is intended for use in Navidrome plugins built with TinyGo. +// +//go:build wasip1 + +package taskworker + +import ( + "github.com/navidrome/navidrome/plugins/pdk/go/pdk" +) + +// TaskExecuteRequest is the request provided when a task is ready to execute. +type TaskExecuteRequest struct { + // QueueName is the name of the queue this task belongs to. + QueueName string `json:"queueName"` + // TaskID is the unique identifier for this task. + TaskID string `json:"taskId"` + // Payload is the opaque data provided when the task was enqueued. + Payload []byte `json:"payload"` + // Attempt is the current attempt number (1-based: first attempt = 1). 
+ Attempt int32 `json:"attempt"` +} + +// TaskWorker is the marker interface for taskworker plugins. +// Implement one or more of the provider interfaces below. +// TaskWorker provides task execution handling. +// This capability allows plugins to receive callbacks when their queued tasks +// are ready to execute. Plugins that use the taskqueue host service must +// implement this capability. +type TaskWorker interface{} + +// TaskExecuteProvider provides the OnTaskExecute function. +type TaskExecuteProvider interface { + OnTaskExecute(TaskExecuteRequest) (string, error) +} // Internal implementation holders +var ( + taskExecuteImpl func(TaskExecuteRequest) (string, error) +) + +// Register registers a taskworker implementation. +// The implementation is checked for optional provider interfaces. +func Register(impl TaskWorker) { + if p, ok := impl.(TaskExecuteProvider); ok { + taskExecuteImpl = p.OnTaskExecute + } +} + +// NotImplementedCode is the standard return code for unimplemented functions. +// The host recognizes this and skips the plugin gracefully. +const NotImplementedCode int32 = -2 + +//go:wasmexport nd_task_execute +func _NdTaskExecute() int32 { + if taskExecuteImpl == nil { + // Return standard code - host will skip this plugin gracefully + return NotImplementedCode + } + + var input TaskExecuteRequest + if err := pdk.InputJSON(&input); err != nil { + pdk.SetError(err) + return -1 + } + + output, err := taskExecuteImpl(input) + if err != nil { + pdk.SetError(err) + return -1 + } + + if err := pdk.OutputJSON(output); err != nil { + pdk.SetError(err) + return -1 + } + + return 0 +} diff --git a/plugins/pdk/go/taskworker/taskworker_stub.go b/plugins/pdk/go/taskworker/taskworker_stub.go new file mode 100644 index 00000000..e45054e8 --- /dev/null +++ b/plugins/pdk/go/taskworker/taskworker_stub.go @@ -0,0 +1,41 @@ +// Code generated by ndpgen. DO NOT EDIT. +// +// This file provides stub implementations for non-WASM platforms. 
+// It allows Go plugins to compile and run tests outside of WASM, +// but the actual functionality is only available in WASM builds. +// +//go:build !wasip1 + +package taskworker + +// TaskExecuteRequest is the request provided when a task is ready to execute. +type TaskExecuteRequest struct { + // QueueName is the name of the queue this task belongs to. + QueueName string `json:"queueName"` + // TaskID is the unique identifier for this task. + TaskID string `json:"taskId"` + // Payload is the opaque data provided when the task was enqueued. + Payload []byte `json:"payload"` + // Attempt is the current attempt number (1-based: first attempt = 1). + Attempt int32 `json:"attempt"` +} + +// TaskWorker is the marker interface for taskworker plugins. +// Implement one or more of the provider interfaces below. +// TaskWorker provides task execution handling. +// This capability allows plugins to receive callbacks when their queued tasks +// are ready to execute. Plugins that use the taskqueue host service must +// implement this capability. +type TaskWorker interface{} + +// TaskExecuteProvider provides the OnTaskExecute function. +type TaskExecuteProvider interface { + OnTaskExecute(TaskExecuteRequest) (string, error) +} + +// NotImplementedCode is the standard return code for unimplemented functions. +const NotImplementedCode int32 = -2 + +// Register is a no-op on non-WASM platforms. +// This stub allows code to compile outside of WASM. +func Register(_ TaskWorker) {} diff --git a/plugins/pdk/python/host/nd_host_task.py b/plugins/pdk/python/host/nd_host_task.py new file mode 100644 index 00000000..5d6e7474 --- /dev/null +++ b/plugins/pdk/python/host/nd_host_task.py @@ -0,0 +1,188 @@ +# Code generated by ndpgen. DO NOT EDIT. +# +# This file contains client wrappers for the Task host service. +# It is intended for use in Navidrome plugins built with extism-py. +# +# IMPORTANT: Due to a limitation in extism-py, you cannot import this file directly. 
+# The @extism.import_fn decorators are only detected when defined in the plugin's +# main __init__.py file. Copy the needed functions from this file into your plugin. + +from dataclasses import dataclass +from typing import Any + +import extism +import json +import base64 + + +class HostFunctionError(Exception): + """Raised when a host function returns an error.""" + pass + + +@extism.import_fn("extism:host/user", "task_createqueue") +def _task_createqueue(offset: int) -> int: + """Raw host function - do not call directly.""" + ... + + +@extism.import_fn("extism:host/user", "task_enqueue") +def _task_enqueue(offset: int) -> int: + """Raw host function - do not call directly.""" + ... + + +@extism.import_fn("extism:host/user", "task_get") +def _task_get(offset: int) -> int: + """Raw host function - do not call directly.""" + ... + + +@extism.import_fn("extism:host/user", "task_cancel") +def _task_cancel(offset: int) -> int: + """Raw host function - do not call directly.""" + ... + + +@extism.import_fn("extism:host/user", "task_clearqueue") +def _task_clearqueue(offset: int) -> int: + """Raw host function - do not call directly.""" + ... + + +def task_create_queue(name: str, config: Any) -> None: + """CreateQueue creates a named task queue with the given configuration. +Zero-value fields in config use sensible defaults. +If a queue with the same name already exists, returns an error. +On startup, this also recovers any stale "running" tasks from a previous crash. + + Args: + name: str parameter. + config: Any parameter. + + Raises: + HostFunctionError: If the host function returns an error. 
+ """ + request = { + "name": name, + "config": config, + } + request_bytes = json.dumps(request).encode("utf-8") + request_mem = extism.memory.alloc(request_bytes) + response_offset = _task_createqueue(request_mem.offset) + response_mem = extism.memory.find(response_offset) + response = json.loads(extism.memory.string(response_mem)) + + if response.get("error"): + raise HostFunctionError(response["error"]) + + + +def task_enqueue(queue_name: str, payload: bytes) -> str: + """Enqueue adds a task to the named queue. Returns the task ID. +payload is opaque bytes passed back to the plugin on execution. + + Args: + queue_name: str parameter. + payload: bytes parameter. + + Returns: + str: The result value. + + Raises: + HostFunctionError: If the host function returns an error. + """ + request = { + "queueName": queue_name, + "payload": base64.b64encode(payload).decode("ascii"), + } + request_bytes = json.dumps(request).encode("utf-8") + request_mem = extism.memory.alloc(request_bytes) + response_offset = _task_enqueue(request_mem.offset) + response_mem = extism.memory.find(response_offset) + response = json.loads(extism.memory.string(response_mem)) + + if response.get("error"): + raise HostFunctionError(response["error"]) + + return response.get("result", "") + + +def task_get(task_id: str) -> Any: + """Get returns the current state of a task including its status, +message, and attempt count. + + Args: + task_id: str parameter. + + Returns: + Any: The result value. + + Raises: + HostFunctionError: If the host function returns an error. 
+ """ + request = { + "taskId": task_id, + } + request_bytes = json.dumps(request).encode("utf-8") + request_mem = extism.memory.alloc(request_bytes) + response_offset = _task_get(request_mem.offset) + response_mem = extism.memory.find(response_offset) + response = json.loads(extism.memory.string(response_mem)) + + if response.get("error"): + raise HostFunctionError(response["error"]) + + return response.get("result", None) + + +def task_cancel(task_id: str) -> None: + """Cancel cancels a pending task. Returns error if already +running, completed, or failed. + + Args: + task_id: str parameter. + + Raises: + HostFunctionError: If the host function returns an error. + """ + request = { + "taskId": task_id, + } + request_bytes = json.dumps(request).encode("utf-8") + request_mem = extism.memory.alloc(request_bytes) + response_offset = _task_cancel(request_mem.offset) + response_mem = extism.memory.find(response_offset) + response = json.loads(extism.memory.string(response_mem)) + + if response.get("error"): + raise HostFunctionError(response["error"]) + + + +def task_clear_queue(queue_name: str) -> int: + """ClearQueue removes all pending tasks from the named queue. +Running tasks are not affected. Returns the number of tasks removed. + + Args: + queue_name: str parameter. + + Returns: + int: The result value. + + Raises: + HostFunctionError: If the host function returns an error. 
+ """ + request = { + "queueName": queue_name, + } + request_bytes = json.dumps(request).encode("utf-8") + request_mem = extism.memory.alloc(request_bytes) + response_offset = _task_clearqueue(request_mem.offset) + response_mem = extism.memory.find(response_offset) + response = json.loads(extism.memory.string(response_mem)) + + if response.get("error"): + raise HostFunctionError(response["error"]) + + return response.get("result", 0) diff --git a/plugins/pdk/rust/nd-pdk-capabilities/src/lib.rs b/plugins/pdk/rust/nd-pdk-capabilities/src/lib.rs index 0f0daf80..06c2c5c0 100644 --- a/plugins/pdk/rust/nd-pdk-capabilities/src/lib.rs +++ b/plugins/pdk/rust/nd-pdk-capabilities/src/lib.rs @@ -9,4 +9,5 @@ pub mod lifecycle; pub mod metadata; pub mod scheduler; pub mod scrobbler; +pub mod taskworker; pub mod websocket; diff --git a/plugins/pdk/rust/nd-pdk-capabilities/src/taskworker.rs b/plugins/pdk/rust/nd-pdk-capabilities/src/taskworker.rs new file mode 100644 index 00000000..e8aa106a --- /dev/null +++ b/plugins/pdk/rust/nd-pdk-capabilities/src/taskworker.rs @@ -0,0 +1,102 @@ +// Code generated by ndpgen. DO NOT EDIT. +// +// This file contains export wrappers for the TaskWorker capability. +// It is intended for use in Navidrome plugins built with extism-pdk. 
+
+use serde::{Deserialize, Serialize};
+use base64::Engine as _;
+use base64::engine::general_purpose::STANDARD as BASE64;
+
+mod base64_bytes {
+    use serde::{self, Deserialize, Deserializer, Serializer};
+    use base64::Engine as _;
+    use base64::engine::general_purpose::STANDARD as BASE64;
+
+    pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        serializer.serialize_str(&BASE64.encode(bytes))
+    }
+
+    pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        let s = String::deserialize(deserializer)?;
+        BASE64.decode(&s).map_err(serde::de::Error::custom)
+    }
+}
+
+// Helper functions for skip_serializing_if with numeric types
+#[allow(dead_code)]
+fn is_zero_i32(value: &i32) -> bool { *value == 0 }
+#[allow(dead_code)]
+fn is_zero_u32(value: &u32) -> bool { *value == 0 }
+#[allow(dead_code)]
+fn is_zero_i64(value: &i64) -> bool { *value == 0 }
+#[allow(dead_code)]
+fn is_zero_u64(value: &u64) -> bool { *value == 0 }
+#[allow(dead_code)]
+fn is_zero_f32(value: &f32) -> bool { *value == 0.0 }
+#[allow(dead_code)]
+fn is_zero_f64(value: &f64) -> bool { *value == 0.0 }
+/// TaskExecuteRequest is the request provided when a task is ready to execute.
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TaskExecuteRequest {
+    /// QueueName is the name of the queue this task belongs to.
+    #[serde(default)]
+    pub queue_name: String,
+    /// TaskID is the unique identifier for this task.
+    #[serde(default)]
+    pub task_id: String,
+    /// Payload is the opaque data provided when the task was enqueued.
+    #[serde(default)]
+    #[serde(with = "base64_bytes")]
+    pub payload: Vec<u8>,
+    /// Attempt is the current attempt number (1-based: first attempt = 1).
+    #[serde(default)]
+    pub attempt: i32,
+}
+
+/// Error represents an error from a capability method.
+#[derive(Debug)]
+pub struct Error {
+    pub message: String,
+}
+
+impl std::fmt::Display for Error {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.message)
+    }
+}
+
+impl std::error::Error for Error {}
+
+impl Error {
+    pub fn new(message: impl Into<String>) -> Self {
+        Self { message: message.into() }
+    }
+}
+
+/// TaskExecuteProvider provides the OnTaskExecute function.
+pub trait TaskExecuteProvider {
+    fn on_task_execute(&self, req: TaskExecuteRequest) -> Result<String, Error>;
+}
+
+/// Register the on_task_execute export.
+/// This macro generates the WASM export function for this method.
+#[macro_export]
+macro_rules! register_taskworker_task_execute {
+    ($plugin_type:ty) => {
+        #[extism_pdk::plugin_fn]
+        pub fn nd_task_execute(
+            req: extism_pdk::Json<$crate::taskworker::TaskExecuteRequest>
+        ) -> extism_pdk::FnResult<extism_pdk::Json<String>> {
+            let plugin = <$plugin_type>::default();
+            let result = $crate::taskworker::TaskExecuteProvider::on_task_execute(&plugin, req.into_inner())?;
+            Ok(extism_pdk::Json(result))
+        }
+    };
+}
diff --git a/plugins/pdk/rust/nd-pdk-host/src/lib.rs b/plugins/pdk/rust/nd-pdk-host/src/lib.rs
index 52a3a86c..3a31bc48 100644
--- a/plugins/pdk/rust/nd-pdk-host/src/lib.rs
+++ b/plugins/pdk/rust/nd-pdk-host/src/lib.rs
@@ -40,6 +40,7 @@
 //! - [`library`] - provides access to music library metadata for plugins.
 //! - [`scheduler`] - provides task scheduling capabilities for plugins.
 //! - [`subsonicapi`] - provides access to Navidrome's Subsonic API from plugins.
+//! - [`task`] - provides persistent task queues for plugins.
 //! - [`users`] - provides access to user information for plugins.
 //! - [`websocket`] - provides WebSocket communication capabilities for plugins.
 
@@ -99,6 +100,13 @@ pub mod subsonicapi {
     pub use super::nd_host_subsonicapi::*;
 }
 
+#[doc(hidden)]
+mod nd_host_task;
+/// provides persistent task queues for plugins.
+pub mod task {
+    pub use super::nd_host_task::*;
+}
+
 #[doc(hidden)]
 mod nd_host_users;
 /// provides access to user information for plugins.
diff --git a/plugins/pdk/rust/nd-pdk-host/src/nd_host_task.rs b/plugins/pdk/rust/nd-pdk-host/src/nd_host_task.rs
new file mode 100644
index 00000000..4f43e165
--- /dev/null
+++ b/plugins/pdk/rust/nd-pdk-host/src/nd_host_task.rs
@@ -0,0 +1,258 @@
+// Code generated by ndpgen. DO NOT EDIT.
+//
+// This file contains client wrappers for the Task host service.
+// It is intended for use in Navidrome plugins built with extism-pdk.
+
+use extism_pdk::*;
+use serde::{Deserialize, Serialize};
+use base64::Engine as _;
+use base64::engine::general_purpose::STANDARD as BASE64;
+
+mod base64_bytes {
+    use serde::{self, Deserialize, Deserializer, Serializer};
+    use base64::Engine as _;
+    use base64::engine::general_purpose::STANDARD as BASE64;
+
+    pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        serializer.serialize_str(&BASE64.encode(bytes))
+    }
+
+    pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        let s = String::deserialize(deserializer)?;
+        BASE64.decode(&s).map_err(serde::de::Error::custom)
+    }
+}
+
+/// QueueConfig holds configuration for a task queue.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct QueueConfig {
+    pub concurrency: i32,
+    pub max_retries: i32,
+    pub backoff_ms: i64,
+    pub delay_ms: i64,
+    pub retention_ms: i64,
+}
+
+/// TaskInfo holds the current state of a task.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TaskInfo {
+    pub status: String,
+    pub message: String,
+    pub attempt: i32,
+}
+
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "camelCase")]
+struct TaskCreateQueueRequest {
+    name: String,
+    config: QueueConfig,
+}
+
+#[derive(Debug, Clone, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct TaskCreateQueueResponse {
+    #[serde(default)]
+    error: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "camelCase")]
+struct TaskEnqueueRequest {
+    queue_name: String,
+    #[serde(with = "base64_bytes")]
+    payload: Vec<u8>,
+}
+
+#[derive(Debug, Clone, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct TaskEnqueueResponse {
+    #[serde(default)]
+    result: String,
+    #[serde(default)]
+    error: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "camelCase")]
+struct TaskGetRequest {
+    task_id: String,
+}
+
+#[derive(Debug, Clone, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct TaskGetResponse {
+    #[serde(default)]
+    result: Option<TaskInfo>,
+    #[serde(default)]
+    error: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "camelCase")]
+struct TaskCancelRequest {
+    task_id: String,
+}
+
+#[derive(Debug, Clone, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct TaskCancelResponse {
+    #[serde(default)]
+    error: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "camelCase")]
+struct TaskClearQueueRequest {
+    queue_name: String,
+}
+
+#[derive(Debug, Clone, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct TaskClearQueueResponse {
+    #[serde(default)]
+    result: i64,
+    #[serde(default)]
+    error: Option<String>,
+}
+
+#[host_fn]
+extern "ExtismHost" {
+    fn task_createqueue(input: Json<TaskCreateQueueRequest>) -> Json<TaskCreateQueueResponse>;
+    fn task_enqueue(input: Json<TaskEnqueueRequest>) -> Json<TaskEnqueueResponse>;
+    fn task_get(input: Json<TaskGetRequest>) -> Json<TaskGetResponse>;
+    fn task_cancel(input: Json<TaskCancelRequest>) -> Json<TaskCancelResponse>;
+    fn task_clearqueue(input: Json<TaskClearQueueRequest>) -> Json<TaskClearQueueResponse>;
+}
+
+/// CreateQueue creates a named task queue with the given configuration.
+/// Zero-value fields in config use sensible defaults.
+/// If a queue with the same name already exists, returns an error.
+/// On startup, this also recovers any stale "running" tasks from a previous crash.
+///
+/// # Arguments
+/// * `name` - String parameter.
+/// * `config` - QueueConfig parameter.
+///
+/// # Errors
+/// Returns an error if the host function call fails.
+pub fn create_queue(name: &str, config: QueueConfig) -> Result<(), Error> {
+    let response = unsafe {
+        task_createqueue(Json(TaskCreateQueueRequest {
+            name: name.to_owned(),
+            config: config,
+        }))?
+    };
+
+    if let Some(err) = response.0.error {
+        return Err(Error::msg(err));
+    }
+
+    Ok(())
+}
+
+/// Enqueue adds a task to the named queue. Returns the task ID.
+/// payload is opaque bytes passed back to the plugin on execution.
+///
+/// # Arguments
+/// * `queue_name` - String parameter.
+/// * `payload` - Vec<u8> parameter.
+///
+/// # Returns
+/// The result value.
+///
+/// # Errors
+/// Returns an error if the host function call fails.
+pub fn enqueue(queue_name: &str, payload: Vec<u8>) -> Result<String, Error> {
+    let response = unsafe {
+        task_enqueue(Json(TaskEnqueueRequest {
+            queue_name: queue_name.to_owned(),
+            payload: payload,
+        }))?
+    };
+
+    if let Some(err) = response.0.error {
+        return Err(Error::msg(err));
+    }
+
+    Ok(response.0.result)
+}
+
+/// Get returns the current state of a task including its status,
+/// message, and attempt count.
+///
+/// # Arguments
+/// * `task_id` - String parameter.
+///
+/// # Returns
+/// The result value.
+///
+/// # Errors
+/// Returns an error if the host function call fails.
+pub fn get(task_id: &str) -> Result<Option<TaskInfo>, Error> {
+    let response = unsafe {
+        task_get(Json(TaskGetRequest {
+            task_id: task_id.to_owned(),
+        }))?
+    };
+
+    if let Some(err) = response.0.error {
+        return Err(Error::msg(err));
+    }
+
+    Ok(response.0.result)
+}
+
+/// Cancel cancels a pending task. Returns error if already
+/// running, completed, or failed.
+///
+/// # Arguments
+/// * `task_id` - String parameter.
+///
+/// # Errors
+/// Returns an error if the host function call fails.
+pub fn cancel(task_id: &str) -> Result<(), Error> {
+    let response = unsafe {
+        task_cancel(Json(TaskCancelRequest {
+            task_id: task_id.to_owned(),
+        }))?
+    };
+
+    if let Some(err) = response.0.error {
+        return Err(Error::msg(err));
+    }
+
+    Ok(())
+}
+
+/// ClearQueue removes all pending tasks from the named queue.
+/// Running tasks are not affected. Returns the number of tasks removed.
+///
+/// # Arguments
+/// * `queue_name` - String parameter.
+///
+/// # Returns
+/// The result value.
+///
+/// # Errors
+/// Returns an error if the host function call fails.
+pub fn clear_queue(queue_name: &str) -> Result<i64, Error> {
+    let response = unsafe {
+        task_clearqueue(Json(TaskClearQueueRequest {
+            queue_name: queue_name.to_owned(),
+        }))?
+    };
+
+    if let Some(err) = response.0.error {
+        return Err(Error::msg(err));
+    }
+
+    Ok(response.0.result)
+}
diff --git a/plugins/testdata/test-taskqueue/go.mod b/plugins/testdata/test-taskqueue/go.mod
new file mode 100644
index 00000000..37f857e5
--- /dev/null
+++ b/plugins/testdata/test-taskqueue/go.mod
@@ -0,0 +1,16 @@
+module test-taskqueue
+
+go 1.25
+
+require github.com/navidrome/navidrome/plugins/pdk/go v0.0.0
+
+require (
+	github.com/davecgh/go-spew v1.1.1 // indirect
+	github.com/extism/go-pdk v1.1.3 // indirect
+	github.com/pmezard/go-difflib v1.0.0 // indirect
+	github.com/stretchr/objx v0.5.2 // indirect
+	github.com/stretchr/testify v1.11.1 // indirect
+	gopkg.in/yaml.v3 v3.0.1 // indirect
+)
+
+replace github.com/navidrome/navidrome/plugins/pdk/go => ../../pdk/go
diff --git a/plugins/testdata/test-taskqueue/go.sum b/plugins/testdata/test-taskqueue/go.sum
new file mode 100644
index 00000000..af880eb5
--- /dev/null
+++ b/plugins/testdata/test-taskqueue/go.sum
@@ -0,0 +1,14 @@
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/extism/go-pdk v1.1.3 h1:hfViMPWrqjN6u67cIYRALZTZLk/enSPpNKa+rZ9X2SQ=
+github.com/extism/go-pdk v1.1.3/go.mod h1:Gz+LIU/YCKnKXhgge8yo5Yu1F/lbv7KtKFkiCSzW/P4=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
+github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
+github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
+github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/plugins/testdata/test-taskqueue/main.go b/plugins/testdata/test-taskqueue/main.go
new file mode 100644
index 00000000..9b734ed6
--- /dev/null
+++ b/plugins/testdata/test-taskqueue/main.go
@@ -0,0 +1,114 @@
+// Test TaskQueue plugin for Navidrome plugin system integration tests.
+// Build with: tinygo build -o ../test-taskqueue.wasm -target wasip1 -buildmode=c-shared .
+package main + +import ( + "fmt" + + "github.com/navidrome/navidrome/plugins/pdk/go/host" + "github.com/navidrome/navidrome/plugins/pdk/go/pdk" + "github.com/navidrome/navidrome/plugins/pdk/go/taskworker" +) + +func init() { + taskworker.Register(&handler{}) +} + +type handler struct{} + +func (h *handler) OnTaskExecute(req taskworker.TaskExecuteRequest) (string, error) { + payload := string(req.Payload) + if payload == "fail" { + return "", fmt.Errorf("task failed as instructed") + } + if payload == "fail-then-succeed" && req.Attempt < 2 { + return "", fmt.Errorf("transient failure") + } + return "completed successfully", nil +} + +// Test helper types +type TestInput struct { + Operation string `json:"operation"` + QueueName string `json:"queueName,omitempty"` + Config *host.QueueConfig `json:"config,omitempty"` + Payload []byte `json:"payload,omitempty"` + TaskID string `json:"taskId,omitempty"` +} + +type TestOutput struct { + TaskID string `json:"taskId,omitempty"` + Status string `json:"status,omitempty"` + Message string `json:"message,omitempty"` + Attempt int32 `json:"attempt,omitempty"` + Cleared int64 `json:"cleared,omitempty"` + Error *string `json:"error,omitempty"` +} + +//go:wasmexport nd_test_taskqueue +func ndTestTaskQueue() int32 { + var input TestInput + if err := pdk.InputJSON(&input); err != nil { + errStr := err.Error() + pdk.OutputJSON(TestOutput{Error: &errStr}) + return 0 + } + + switch input.Operation { + case "create_queue": + config := host.QueueConfig{} + if input.Config != nil { + config = *input.Config + } + err := host.TaskCreateQueue(input.QueueName, config) + if err != nil { + errStr := err.Error() + pdk.OutputJSON(TestOutput{Error: &errStr}) + return 0 + } + pdk.OutputJSON(TestOutput{}) + + case "enqueue": + taskID, err := host.TaskEnqueue(input.QueueName, input.Payload) + if err != nil { + errStr := err.Error() + pdk.OutputJSON(TestOutput{Error: &errStr}) + return 0 + } + pdk.OutputJSON(TestOutput{TaskID: taskID}) + + case 
"get_task_status": + info, err := host.TaskGet(input.TaskID) + if err != nil { + errStr := err.Error() + pdk.OutputJSON(TestOutput{Error: &errStr}) + return 0 + } + pdk.OutputJSON(TestOutput{Status: info.Status, Message: info.Message, Attempt: info.Attempt}) + + case "cancel_task": + err := host.TaskCancel(input.TaskID) + if err != nil { + errStr := err.Error() + pdk.OutputJSON(TestOutput{Error: &errStr}) + return 0 + } + pdk.OutputJSON(TestOutput{}) + + case "clear_queue": + cleared, err := host.TaskClearQueue(input.QueueName) + if err != nil { + errStr := err.Error() + pdk.OutputJSON(TestOutput{Error: &errStr}) + return 0 + } + pdk.OutputJSON(TestOutput{Cleared: cleared}) + + default: + errStr := "unknown operation: " + input.Operation + pdk.OutputJSON(TestOutput{Error: &errStr}) + } + return 0 +} + +func main() {} diff --git a/plugins/testdata/test-taskqueue/manifest.json b/plugins/testdata/test-taskqueue/manifest.json new file mode 100644 index 00000000..3cd3b0f0 --- /dev/null +++ b/plugins/testdata/test-taskqueue/manifest.json @@ -0,0 +1,12 @@ +{ + "name": "Test TaskQueue Plugin", + "author": "Navidrome Test", + "version": "1.0.0", + "description": "A test plugin for TaskQueue integration testing", + "permissions": { + "taskqueue": { + "reason": "For testing task queue operations", + "maxConcurrency": 10 + } + } +}