Skip to content

Queue Drivers

Queue drivers adapt generated sdk.QueueJob handlers to broker-specific delivery.

Install only the queue driver your backend uses:

Terminal window
go get github.com/TDB-Group/anvil-queue/memory
go get github.com/TDB-Group/anvil-queue/redis
go get github.com/TDB-Group/anvil-queue/nats
go get github.com/TDB-Group/anvil-queue/sqs

All official queue drivers implement sdk.QueueTransport. They register generated jobs through RegisterQueueJob and execute HandleQueue middleware before the job handler.

Redis, NATS, and SQS drivers fill a nil ErrorPipeline from the owning Anvil app when installed through Driver. Their DriverFrom helpers install an existing transport and set that transport’s error pipeline to the app pipeline. The memory driver has no error pipeline because failures are returned directly from Publish, Dispatch, or its testbed runner.

Each broker-backed Driver(...) and New(...) constructor accepts at most one options value. More than one options value is stored as a startup/configuration error and the transport fails before it begins consuming messages.

Queue transports are process-lifecycle objects, not restartable worker pools. Create a new transport for a new app run. Start blocks until shutdown or a configuration failure, and Shutdown signals the running transport. Official queue drivers guard startup and shutdown with one-shot state.

Queue jobs embed sdk.QueueJob:

type ReindexProjects struct {
sdk.QueueJob `queue:"projects.reindex"`
Store project.Store `inject:""`
}
func (j *ReindexProjects) Handle(ctx context.Context, message sdk.QueueMessage) error {
return j.Store.Reindex(ctx, message.ID)
}

Generated metadata includes the queue name, job name, handler, and middleware chain.

All official queue drivers reject generated jobs with an empty queue name, an empty job name, a nil handler, duplicate (queue, job) registration, or a middleware value that does not implement sdk.QueueMiddleware.

Drivers convert broker messages into sdk.QueueMessage:

type QueueMessage struct {
ID string
Queue string
Body []byte
Headers map[string]string
Attempt int
}

The Queue field is set by the driver before delivery. Drivers clone message bodies and headers before passing them to handlers so application code does not mutate broker-owned buffers.

Driver-specific message IDs are documented below. Treat QueueMessage.ID as a driver-provided identifier; Redis, NATS, JetStream, SQS, and the memory driver each expose the identifier that belongs to their delivery model.

Queue middleware uses HandleQueue:

func (QueueTrace) HandleQueue(ctx sdk.QueueCtx) error {
message := ctx.Message()
_ = message
return ctx.Next()
}

HandleQueue wraps the generated job handler after the selected queue driver has converted a broker message into sdk.QueueMessage. It is not a broker ack callback. Ack, nack, retry, and visibility behavior stay in the selected driver.

Attach queue middleware to a parent group:

type V1 struct {
sdk.Group `path:"/v1"`
_ sdk.Use[middleware.QueueTrace]
Reindex *jobs.ReindexProjects
}

Generated queue middleware is group-scoped. Put job-wide middleware on the nearest group that contains the job. The SDK has no per-job queue policy marker.

ctx.Next() can be called once. Calling it twice returns an internal sdk.Failure.

Queue middleware is useful for delivery logging, idempotency, tenant loading, retry metadata, and dead-letter preparation. Broker-specific behavior stays in the driver or application composition root.

Queue middleware has no response body to shape. Put delivery setup before ctx.Next() and retry, audit, or cleanup logic after it returns. Returning an error leaves the outcome to the selected queue driver:

  • Memory joins and returns handler errors to the caller.
  • Redis publishes a mapped handler error and does not acknowledge the stream entry.
  • Core NATS publishes a mapped handler error; core NATS itself has no ack or retry contract.
  • NATS JetStream publishes a mapped handler error and negatively acknowledges the message.
  • SQS publishes a mapped handler error and leaves the message in SQS.

Queue drivers recover panics from middleware and handlers and turn them into ordinary errors before applying the driver-specific delivery outcome above. For broker-backed drivers, those recovered panics are published as handler-phase failures, so event.Recovered is false. The memory driver has no error pipeline and returns the recovered error to the caller.

Read Middleware for execution order and sdk.QueueCtx.

The memory driver is deterministic and in-process. It is intended for tests, local development, and as the reference queue implementation.

queue := memory.New()
app := anvil.New(memory.DriverFrom(queue))

Behavior:

  • Publish(ctx, queue, message) dispatches the message to every job registered on that queue.
  • Dispatch(ctx, queue, job, message) sends the message to one registered job.
  • Publish requires a non-nil context, a non-empty queue name, and at least one registered job for the queue.
  • Dispatch requires a non-nil context and a registered (queue, job) route.
  • Jobs are sorted by job name before Publish dispatches them.
  • Handler errors are joined and returned to the caller.
  • Start blocks until Shutdown closes the transport. It is one-shot; after shutdown, create a new memory transport for another run.
  • Message bodies and headers are cloned before handler execution.
  • The memory driver has no error pipeline. Handler and middleware failures are returned to the caller of Publish or Dispatch.

Middleware runs before the selected job handler. If middleware returns without calling ctx.Next(), the job handler does not run and the returned error value decides the dispatch result. A nil error means the dispatch succeeded.

The memory driver also provides the in-process queue testbed runner:

report, err := memory.RunTestbed(ctx, suite, queue)

RunTestbed dispatches each queue case to the named queue and job through Dispatch. It marks the case as passed when the presence or absence of a handler error matches the case expectation. A nil testbed context is replaced with context.Background() before cases run.

The Redis driver uses Redis Streams.

app := anvil.New(
redisqueue.Driver(redisqueue.Options{
ClientOptions: &redis.Options{Addr: "127.0.0.1:6379"},
ConsumerGroup: "portal",
ConsumerName: "api-1",
ReadCount: 10,
ReadBlock: time.Second,
}),
)

Behavior:

  • Defaults to 127.0.0.1:6379 when no client or client options are supplied.
  • Provide either Client or ClientOptions, not both.
  • Defaults ConsumerGroup and ConsumerName to anvil after trimming whitespace.
  • Defaults ReadCount to 10 and ReadBlock to 1s.
  • ReadCount: 0 and ReadBlock: 0 select those defaults.
  • ReadCount and ReadBlock cannot be negative.
  • Each generated job subscribes with consumer group <ConsumerGroup>.<JobName>.
  • Handler errors are mapped and published to the error pipeline. Successful handlers acknowledge the Redis stream entry; failed handlers leave the entry available for the configured retry behavior.
  • Publish requires a non-nil context, appends to one Redis stream, and returns the Redis message ID.
  • Start(addr) overrides the Redis address when the driver is using ClientOptions and the broker has not been opened. Caller-owned Redis clients keep their own address.
  • Publish opens the Redis broker on first use when the broker is not already open. If application code publishes before Start, a later Start(addr) uses the existing broker and does not change its address.
  • Start fails when no queue jobs are registered.
  • Redis read errors inside the consume loop are retried after a one-second delay. Handler errors are not acknowledged.
  • Shutdown closes active Redis subscriptions and the driver-owned Redis client from the running Start path. A caller-owned Redis client is not closed.

Redis middleware runs after a stream entry is converted into sdk.QueueMessage and before the generated job handler. If middleware returns an error, the driver maps and publishes that error, and the Redis stream entry is left unacknowledged.

Message fields:

  • Body is stored in the body field.
  • QueueMessage.ID is stored in the Redis id field when it is non-empty. Incoming messages use that field when present and fall back to the Redis stream entry ID when it is absent.
  • QueueMessage.Attempt is stored only when it is greater than zero. Incoming negative or invalid attempt values become 0.
  • Headers are stored with a header. prefix.

The NATS driver can use core NATS subjects or JetStream.

app := anvil.New(
natsqueue.Driver(natsqueue.Options{
URL: nats.DefaultURL,
QueueGroup: "portal",
}),
)

Behavior:

  • Start(addr) uses addr as the NATS URL when the broker has not been opened. If the broker already exists, Start keeps the existing connection.
  • Connection lets the application provide a caller-owned *nats.Conn.
  • ConnectOptions are passed to nats.Connect.
  • ConnectOptions cannot contain nil options.
  • QueueGroup is trimmed. An empty queue group disables core NATS queue subscriptions.
  • New() with no options defaults QueueGroup to anvil.
  • Driver() without options and Driver(Options{}) leave QueueGroup empty because the app constructor injects only the shared error pipeline. Set QueueGroup explicitly when core NATS queue-subscription behavior is required in an Anvil app.
  • QueueGroup controls core NATS queue subscriptions. When it is empty, the driver uses a normal NATS subscription instead of a queue subscription.
  • When QueueGroup is set, each generated job subscribes with queue group <QueueGroup>.<JobName>.
  • Handler errors are mapped and published to the error pipeline.
  • Publish sends one message to the NATS subject named by the queue.
  • Publish requires a non-nil context and sends one message to the NATS subject named by the queue.
  • Core NATS Publish calls FlushWithContext, so the publish context controls how long the driver waits for NATS to flush the message. JetStream wraps the context with OperationTimeout.
  • Publish opens the NATS broker on first use when the broker is not already open. If application code publishes before Start, a later Start(addr) uses the existing broker and does not change its URL.
  • Start fails when no queue jobs are registered.
  • Core NATS invokes job handlers with context.Background(). Use message headers or injected services for delivery metadata that must survive broker boundaries.
  • Shutdown unsubscribes active subscriptions and drains a driver-owned NATS connection from the running Start path. A caller-owned connection is not closed by the core NATS broker.

Core NATS message mapping:

  • QueueMessage.Body is copied to nats.Msg.Data.
  • QueueMessage.Headers are copied to NATS headers.
  • QueueMessage.ID is stored in the Anvil-Message-Id header when it is non-empty.
  • Incoming core NATS messages read QueueMessage.ID only from the Anvil-Message-Id header. There is no generated fallback ID for core NATS messages without that header.
  • QueueMessage.Attempt is stored in the Anvil-Attempt header when it is greater than zero.
  • Incoming headers include all NATS headers. When a header has multiple values, Anvil keeps the first value.
  • Incoming negative or invalid attempt values become 0.

NATS middleware runs after the message is converted into sdk.QueueMessage. For core NATS, handler and middleware errors are mapped and published, but core NATS does not provide an ack or retry decision for the driver to control. For JetStream, the same returned error causes the driver to Nak the message.

When QueueGroup is empty, no <QueueGroup>.<JobName> value is sent to core NATS. JetStream still derives durable consumer names from the queue group when one is provided, and falls back to anvil when it needs a non-empty durable token.

JetStream is enabled with Options.JetStream:

app := anvil.New(
natsqueue.Driver(natsqueue.Options{
JetStream: &natsqueue.JetStreamOptions{
Stream: "PROJECTS",
StreamConfig: natsjs.StreamConfig{
Subjects: []string{"projects.*"},
},
AutoCreateStream: true,
},
}),
)

JetStream behavior:

  • Consumer name, durable name, and filter subject are derived from the generated route so jobs stay deterministic.
  • Consumer names are sanitized for JetStream token rules. Long names are truncated to JetStream’s supported length and get a hash suffix so the generated name stays deterministic.
  • Ack policy is explicit.
  • Handler success calls Ack.
  • Handler failure calls Nak.
  • OperationTimeout defaults to 5s.
  • Negative OperationTimeout values are rejected.
  • Stream and StreamConfig.Name can both be set only when they match.
  • When AutoCreateStream is true, stream subjects must be configured.
  • JetStream, publish, and consume option slices cannot contain nil options.
  • JetStream consumer drain waits up to OperationTimeout during unsubscribe; if the drain does not complete, the driver stops the consumer and returns an error.
  • Publish uses QueueMessage.ID as the JetStream message ID when it is non-empty.
  • Incoming JetStream messages read QueueMessage.ID from the Anvil-Message-Id header. When producers need the de-duplication ID visible to handlers, they should also put that value in the Anvil message ID header.
  • JetStream delivery attempts come from message metadata. If metadata is not available, QueueMessage.Attempt is 0.

The SQS driver uses a caller-owned AWS SQS client.

app := anvil.New(
sqsqueue.Driver(sqsqueue.Options{
Client: awsSQSClient,
QueueURLs: map[string]string{
"projects.reindex": "https://sqs.eu-west-1.amazonaws.com/123/projects-reindex",
},
WaitTimeSeconds: 10,
MaxMessages: 10,
}),
)

Behavior:

  • Client is required.
  • QueueURLs maps Anvil queue names to SQS queue URLs.
  • WaitTimeSeconds defaults to 10; setting it to 0 selects that default. Explicit non-zero values must be between 1 and 20.
  • MaxMessages defaults to 10; setting it to 0 selects that default. Explicit non-zero values must be between 1 and 10.
  • PollInterval defaults to 250ms; setting it to 0 selects that default.
  • VisibilityTimeout cannot be negative.
  • Polling starts one goroutine per queue.
  • A successful handler deletes the SQS message.
  • A failed handler leaves the message in SQS and publishes a mapped error event.
  • Publish requires a non-nil context, a non-empty queue name, a non-empty message body, and a configured queue URL.
  • Start fails when no queue jobs are registered or a registered queue has no configured URL.
  • Receive/delete/routing failures publish transport-phase error events.
  • Start ignores the public Anvil address. SQS is a polling transport, not a listener.
  • Shutdown cancels polling. It does not delete in-flight messages itself; normal SQS visibility timeout behavior decides when unprocessed messages are visible again.
  • Start is one-shot. After shutdown, create a new SQS transport for another app run.

SQS middleware runs after a received message is routed to a generated job. A nil result from middleware and handler deletes the SQS message. Any returned error is mapped and published, and the message remains in SQS for normal visibility-timeout redelivery.

When more than one job is registered on the same queue, the SQS message must include the Anvil-Job message attribute. If only one job is registered for a queue, the driver can route messages without that attribute.

Publish sends:

  • Message body from QueueMessage.Body
  • Message attributes from QueueMessage.Headers
  • Anvil-Message-Id from QueueMessage.ID
  • FIFO deduplication ID from QueueMessage.ID when present
  • FIFO message group ID from the Anvil-Message-Group-Id header or DefaultMessageGroupID

Incoming SQS messages expose ApproximateReceiveCount as QueueMessage.Attempt. Missing, invalid, or negative values become 0. Incoming QueueMessage.ID is the SQS MessageId; the Anvil message ID remains available in QueueMessage.Headers["Anvil-Message-Id"] when the SQS message attribute exists.

Broker-backed drivers publish mapped error events for handler failures.

SQS also publishes transport-phase events for receive/delete/routing failures that happen before a job handler runs. Redis and NATS publish handler-phase events when job execution fails.