Queue Drivers
Queue drivers adapt generated sdk.QueueJob handlers to broker-specific
delivery.
Install only the queue driver your backend uses:
    go get github.com/TDB-Group/anvil-queue/memory
    go get github.com/TDB-Group/anvil-queue/redis
    go get github.com/TDB-Group/anvil-queue/nats
    go get github.com/TDB-Group/anvil-queue/sqs

All official queue drivers implement sdk.QueueTransport. They register
generated jobs through RegisterQueueJob and execute HandleQueue middleware
before the job handler.
Redis, NATS, and SQS drivers fill a nil ErrorPipeline from the owning Anvil
app when installed through Driver. Their DriverFrom helpers install an
existing transport and set that transport’s error pipeline to the app pipeline.
The memory driver has no error pipeline because failures are returned directly
from Publish, Dispatch, or its testbed runner.
Each broker-backed Driver(...) and New(...) constructor accepts at most one
options value. More than one options value is stored as a startup/configuration
error and the transport fails before it begins consuming messages.
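The at-most-one-options rule can be pictured with a variadic constructor that records a configuration error instead of failing immediately. The following is a minimal sketch, not the drivers' source: the Options, Transport, New, and Start names here are local stand-ins, and the error text is invented for illustration.

```go
package main

import "fmt"

// Options is a hypothetical stand-in for a broker driver's options struct.
type Options struct {
	ConsumerGroup string
}

// Transport is a hypothetical stand-in for a broker-backed transport.
type Transport struct {
	opts      Options
	configErr error // recorded at construction, surfaced by Start
}

// New accepts at most one Options value. Extra values are stored as a
// configuration error rather than causing a panic in the constructor.
func New(opts ...Options) *Transport {
	t := &Transport{}
	switch len(opts) {
	case 0:
		// Zero-value options; a real driver would apply its defaults here.
	case 1:
		t.opts = opts[0]
	default:
		t.configErr = fmt.Errorf("queue: expected at most one options value, got %d", len(opts))
	}
	return t
}

// Start returns the stored configuration error before consuming anything.
// The consume loop itself is omitted from this sketch.
func (t *Transport) Start() error {
	if t.configErr != nil {
		return t.configErr
	}
	return nil
}

func main() {
	fmt.Println(New(Options{}, Options{}).Start())
}
```

Deferring the failure to Start keeps the constructor signature free of an error return while still stopping the transport before it reads a message.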
Queue transports are process-lifecycle objects, not restartable worker pools.
Create a new transport for a new app run. Start blocks until shutdown or a
configuration failure, and Shutdown signals the running transport. Official
queue drivers guard startup and shutdown with one-shot state.
Job Shape
Queue jobs embed sdk.QueueJob:

    type ReindexProjects struct {
        sdk.QueueJob `queue:"projects.reindex"`

        Store project.Store `inject:""`
    }

    func (j *ReindexProjects) Handle(ctx context.Context, message sdk.QueueMessage) error {
        return j.Store.Reindex(ctx, message.ID)
    }

Generated metadata includes the queue name, job name, handler, and middleware chain.
All official queue drivers reject generated jobs with an empty queue name, an
empty job name, a nil handler, duplicate (queue, job) registration, or a
middleware value that does not implement sdk.QueueMiddleware.
Message Contract
Drivers convert broker messages into sdk.QueueMessage:

    type QueueMessage struct {
        ID      string
        Queue   string
        Body    []byte
        Headers map[string]string
        Attempt int
    }

The Queue field is set by the driver before delivery. Drivers clone message
bodies and headers before passing them to handlers so application code does not
mutate broker-owned buffers.
Driver-specific message IDs are documented below. Treat QueueMessage.ID as a
driver-provided identifier; Redis, NATS, JetStream, SQS, and the memory driver
each expose the identifier that belongs to their delivery model.
Middleware
Queue middleware uses HandleQueue:

    func (QueueTrace) HandleQueue(ctx sdk.QueueCtx) error {
        message := ctx.Message()
        _ = message

        return ctx.Next()
    }

HandleQueue wraps the generated job handler after the selected queue driver
has converted a broker message into sdk.QueueMessage. It is not a broker ack
callback. Ack, nack, retry, and visibility behavior stay in the selected
driver.
Attach queue middleware to a parent group:
    type V1 struct {
        sdk.Group `path:"/v1"`
        _         sdk.Use[middleware.QueueTrace]

        Reindex *jobs.ReindexProjects
    }

Generated queue middleware is group-scoped. Put job-wide middleware on the nearest group that contains the job. The SDK has no per-job queue policy marker.
ctx.Next() can be called once. Calling it twice returns an internal
sdk.Failure.
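The single-call rule for ctx.Next() amounts to a guard flag on the context. The sketch below uses a local queueCtx type and a plain sentinel error as stand-ins; the real sdk.QueueCtx returns an internal sdk.Failure, whose shape is not shown here.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNextCalledTwice is a hypothetical stand-in for the internal sdk.Failure.
var ErrNextCalledTwice = errors.New("queue middleware: Next called more than once")

// queueCtx is a simplified stand-in for sdk.QueueCtx.
type queueCtx struct {
	called bool
	next   func() error // the rest of the chain, ending in the job handler
}

// Next runs the rest of the chain on the first call and refuses later calls.
func (c *queueCtx) Next() error {
	if c.called {
		return ErrNextCalledTwice
	}
	c.called = true
	return c.next()
}

func main() {
	runs := 0
	ctx := &queueCtx{next: func() error { runs++; return nil }}

	first := ctx.Next()
	second := ctx.Next()
	fmt.Println(first, second, runs)
}
```

Because the guard sits on the context rather than in the handler, a middleware that retries by calling Next again gets an error instead of a duplicate job execution.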
Queue middleware is useful for delivery logging, idempotency, tenant loading, retry metadata, and dead-letter preparation. Broker-specific behavior stays in the driver or application composition root.
Queue middleware has no response body to shape. Put delivery setup before
ctx.Next() and retry, audit, or cleanup logic after it returns. Returning an
error leaves the outcome to the selected queue driver:
- Memory joins and returns handler errors to the caller.
- Redis publishes a mapped handler error and does not acknowledge the stream entry.
- Core NATS publishes a mapped handler error; core NATS itself has no ack or retry contract.
- NATS JetStream publishes a mapped handler error and negatively acknowledges the message.
- SQS publishes a mapped handler error and leaves the message in SQS.
Queue drivers recover panics from middleware and handlers and turn them into
ordinary errors before applying the driver-specific delivery outcome above.
For broker-backed drivers, those recovered panics are published as handler-phase
failures, so event.Recovered is false. The memory driver has no error
pipeline and returns the recovered error to the caller.
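Turning a panic into an ordinary error is typically done with a deferred recover around the handler call. This is a generic Go sketch of that technique; safeCall and the error wording are illustrative names, not part of the Anvil SDK.

```go
package main

import "fmt"

// safeCall runs handler and converts a panic into a returned error, so the
// caller can apply the same delivery outcome it uses for handler errors.
func safeCall(handler func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("queue handler panicked: %v", r)
		}
	}()
	return handler()
}

func main() {
	err := safeCall(func() error { panic("boom") })
	fmt.Println(err)
}
```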
Read Middleware for execution order and
sdk.QueueCtx.
Memory Driver
The memory driver is deterministic and in-process. It is intended for tests, local development, and as the reference queue implementation.

    queue := memory.New()
    app := anvil.New(memory.DriverFrom(queue))

Behavior:
- Publish(ctx, queue, message) dispatches the message to every job registered on that queue.
- Dispatch(ctx, queue, job, message) sends the message to one registered job.
- Publish requires a non-nil context, a non-empty queue name, and at least one registered job for the queue.
- Dispatch requires a non-nil context and a registered (queue, job) route.
- Jobs are sorted by job name before Publish dispatches them.
- Handler errors are joined and returned to the caller.
- Start blocks until Shutdown closes the transport. It is one-shot; after shutdown, create a new memory transport for another run.
- Message bodies and headers are cloned before handler execution.
- The memory driver has no error pipeline. Handler and middleware failures are returned to the caller of Publish or Dispatch.
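The ordering, cloning, and error-joining rules above can be sketched with a small fan-out function. It is a simplified model, not the memory driver itself: the handler type and publish function are local stand-ins, and only the message body is modelled.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// handler is a simplified stand-in for a generated job handler.
type handler func(body []byte) error

// publish dispatches body to every job in name order and joins the errors.
func publish(jobs map[string]handler, body []byte) error {
	if len(jobs) == 0 {
		return errors.New("queue: no jobs registered for queue")
	}

	names := make([]string, 0, len(jobs))
	for name := range jobs {
		names = append(names, name)
	}
	sort.Strings(names) // deterministic dispatch order

	var errs []error
	for _, name := range names {
		// Clone the body so one handler cannot mutate what the next one sees.
		clone := append([]byte(nil), body...)
		if err := jobs[name](clone); err != nil {
			errs = append(errs, fmt.Errorf("%s: %w", name, err))
		}
	}
	return errors.Join(errs...) // nil when no handler failed
}

func main() {
	jobs := map[string]handler{
		"reindex": func([]byte) error { return errors.New("store unavailable") },
		"audit":   func([]byte) error { return nil },
	}
	fmt.Println(publish(jobs, []byte("payload")))
}
```

Sorting by job name is what makes a test run reproducible even though Go map iteration order is random.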
Middleware runs before the selected job handler. If middleware returns without
calling ctx.Next(), the job handler does not run and the returned error value
decides the dispatch result. A nil error means the dispatch succeeded.
The memory driver also provides the in-process queue testbed runner:
    report, err := memory.RunTestbed(ctx, suite, queue)

RunTestbed dispatches each queue case to the named queue and job through
Dispatch. It marks the case as passed when the presence or absence of a
handler error matches the case expectation. A nil testbed context is replaced
with context.Background() before cases run.
Redis Driver
The Redis driver uses Redis Streams.

    app := anvil.New(
        redisqueue.Driver(redisqueue.Options{
            ClientOptions: &redis.Options{Addr: "127.0.0.1:6379"},
            ConsumerGroup: "portal",
            ConsumerName:  "api-1",
            ReadCount:     10,
            ReadBlock:     time.Second,
        }),
    )

Behavior:
- Defaults to 127.0.0.1:6379 when no client or client options are supplied.
- Provide either Client or ClientOptions, not both.
- Defaults ConsumerGroup and ConsumerName to anvil after trimming whitespace.
- Defaults ReadCount to 10 and ReadBlock to 1s.
- ReadCount: 0 and ReadBlock: 0 select those defaults.
- ReadCount and ReadBlock cannot be negative.
- Each generated job subscribes with consumer group <ConsumerGroup>.<JobName>.
- Handler errors are mapped and published to the error pipeline. Successful handlers acknowledge the Redis stream entry; failed handlers leave the entry available for the configured retry behavior.
- Publish requires a non-nil context, appends to one Redis stream, and returns the Redis message ID.
- Start(addr) overrides the Redis address when the driver is using ClientOptions and the broker has not been opened. Caller-owned Redis clients keep their own address.
- Publish opens the Redis broker on first use when the broker is not already open. If application code publishes before Start, a later Start(addr) uses the existing broker and does not change its address.
- Start fails when no queue jobs are registered.
- Redis read errors inside the consume loop are retried after a one-second delay. Handler errors are not acknowledged.
- Shutdown closes active Redis subscriptions and the driver-owned Redis client from the running Start path. A caller-owned Redis client is not closed.
Redis middleware runs after a stream entry is converted into sdk.QueueMessage
and before the generated job handler. If middleware returns an error, the
driver maps and publishes that error, and the Redis stream entry is left
unacknowledged.
Message fields:
- Body is stored in the body field.
- QueueMessage.ID is stored in the Redis id field when it is non-empty. Incoming messages use that field when present and fall back to the Redis stream entry ID when it is absent.
- QueueMessage.Attempt is stored only when it is greater than zero. Incoming negative or invalid attempt values become 0.
- Headers are stored with a header. prefix.
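The field layout above can be modelled as a pair of encode and decode functions over a flat string map, which is how a Redis stream entry presents its values. This is a sketch under assumptions: the QueueMessage struct is a local copy, encode and decode are hypothetical helpers, and the attempt field name is a guess because the text does not name it.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// QueueMessage is a local copy of the documented message shape.
type QueueMessage struct {
	ID      string
	Queue   string
	Body    []byte
	Headers map[string]string
	Attempt int
}

const headerPrefix = "header."

// encode flattens a message into stream entry fields.
// The "attempt" field name is an assumption made for this sketch.
func encode(m QueueMessage) map[string]string {
	fields := map[string]string{"body": string(m.Body)}
	if m.ID != "" {
		fields["id"] = m.ID
	}
	if m.Attempt > 0 {
		fields["attempt"] = strconv.Itoa(m.Attempt)
	}
	for k, v := range m.Headers {
		fields[headerPrefix+k] = v
	}
	return fields
}

// decode rebuilds a message, falling back to the stream entry ID when the
// id field is absent and treating bad attempt values as 0.
func decode(queue, entryID string, fields map[string]string) QueueMessage {
	m := QueueMessage{
		ID:      entryID,
		Queue:   queue,
		Body:    []byte(fields["body"]),
		Headers: map[string]string{},
	}
	if id := fields["id"]; id != "" {
		m.ID = id
	}
	if n, err := strconv.Atoi(fields["attempt"]); err == nil && n > 0 {
		m.Attempt = n
	}
	for k, v := range fields {
		if name, ok := strings.CutPrefix(k, headerPrefix); ok {
			m.Headers[name] = v
		}
	}
	return m
}

func main() {
	in := QueueMessage{Body: []byte("hi"), Headers: map[string]string{"tenant": "acme"}}
	out := decode("projects.reindex", "1700000000000-0", encode(in))
	fmt.Println(out.ID, out.Headers["tenant"], out.Attempt)
}
```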
NATS Driver
The NATS driver can use core NATS subjects or JetStream.

    app := anvil.New(
        natsqueue.Driver(natsqueue.Options{
            URL:        nats.DefaultURL,
            QueueGroup: "portal",
        }),
    )

Behavior:
- Start(addr) uses addr as the NATS URL when the broker has not been opened. If the broker already exists, Start keeps the existing connection.
- Connection lets the application provide a caller-owned *nats.Conn.
- ConnectOptions are passed to nats.Connect.
- ConnectOptions cannot contain nil options.
- QueueGroup is trimmed. An empty queue group disables core NATS queue subscriptions.
- New() with no options defaults QueueGroup to anvil. Driver() without options and Driver(Options{}) leave QueueGroup empty because the app constructor injects only the shared error pipeline. Set QueueGroup explicitly when core NATS queue-subscription behavior is required in an Anvil app.
- QueueGroup controls core NATS queue subscriptions. When it is empty, the driver uses a normal NATS subscription instead of a queue subscription.
- When QueueGroup is set, each generated job subscribes with queue group <QueueGroup>.<JobName>.
- Handler errors are mapped and published to the error pipeline.
- Publish requires a non-nil context and sends one message to the NATS subject named by the queue.
- Core NATS Publish calls FlushWithContext, so the publish context controls how long the driver waits for NATS to flush the message. JetStream wraps the context with OperationTimeout.
- Publish opens the NATS broker on first use when the broker is not already open. If application code publishes before Start, a later Start(addr) uses the existing broker and does not change its URL.
- Start fails when no queue jobs are registered.
- Core NATS invokes job handlers with context.Background(). Use message headers or injected services for delivery metadata that must survive broker boundaries.
- Shutdown unsubscribes active subscriptions and drains a driver-owned NATS connection from the running Start path. A caller-owned connection is not closed by the core NATS broker.
Core NATS message mapping:
- QueueMessage.Body is copied to nats.Msg.Data.
- QueueMessage.Headers are copied to NATS headers.
- QueueMessage.ID is stored in the Anvil-Message-Id header when it is non-empty.
- Incoming core NATS messages read QueueMessage.ID only from the Anvil-Message-Id header. There is no generated fallback ID for core NATS messages without that header.
- QueueMessage.Attempt is stored in the Anvil-Attempt header when it is greater than zero.
- Incoming headers include all NATS headers. When a header has multiple values, Anvil keeps the first value.
- Incoming negative or invalid attempt values become 0.
NATS middleware runs after the message is converted into sdk.QueueMessage.
For core NATS, handler and middleware errors are mapped and published, but core
NATS does not provide an ack or retry decision for the driver to control. For
JetStream, the same returned error causes the driver to Nak the message.
When QueueGroup is empty, no <QueueGroup>.<JobName> value is sent to core
NATS. JetStream still derives durable consumer names from the queue group when
one is provided, and falls back to anvil when it needs a non-empty durable
token.
JetStream is enabled with Options.JetStream:
    app := anvil.New(
        natsqueue.Driver(natsqueue.Options{
            JetStream: &natsqueue.JetStreamOptions{
                Stream: "PROJECTS",
                StreamConfig: natsjs.StreamConfig{
                    Subjects: []string{"projects.*"},
                },
                AutoCreateStream: true,
            },
        }),
    )

JetStream behavior:
- Consumer name, durable name, and filter subject are derived from the generated route so jobs stay deterministic.
- Consumer names are sanitized for JetStream token rules. Long names are truncated to JetStream’s supported length and get a hash suffix so the generated name stays deterministic.
- Ack policy is explicit.
- Handler success calls Ack.
- Handler failure calls Nak.
- OperationTimeout defaults to 5s.
- Negative OperationTimeout values are rejected.
- Stream and StreamConfig.Name can both be set only when they match.
- When AutoCreateStream is true, stream subjects must be configured.
- JetStream, publish, and consume option slices cannot contain nil options.
- JetStream consumer drain waits up to OperationTimeout during unsubscribe; if the drain does not complete, the driver stops the consumer and returns an error.
- Publish uses QueueMessage.ID as the JetStream message ID when it is non-empty.
- Incoming JetStream messages read QueueMessage.ID from the Anvil-Message-Id header. When producers need the de-duplication ID visible to handlers, they should also put that value in the Anvil message ID header.
- JetStream delivery attempts come from message metadata. If metadata is not available, QueueMessage.Attempt is 0.
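The sanitize, truncate, and hash-suffix idea for consumer names can be illustrated as follows. Everything specific in this sketch is an assumption: the driver's real character set, separator, length limit, and hash function are not stated in this document, so maxLen, the underscore replacement, and the SHA-256 suffix are placeholders chosen for the example.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// maxLen is an assumed limit for this sketch, not the driver's actual value.
const maxLen = 32

// consumerName builds a deterministic token from route parts.
func consumerName(parts ...string) string {
	raw := strings.Join(parts, "_")

	// Replace anything outside a conservative token alphabet.
	var b strings.Builder
	for _, r := range raw {
		switch {
		case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9', r == '-', r == '_':
			b.WriteRune(r)
		default:
			b.WriteByte('_')
		}
	}
	name := b.String()
	if len(name) <= maxLen {
		return name
	}

	// Hash the unsanitized input so two long names that share a prefix
	// still get different, stable suffixes.
	sum := sha256.Sum256([]byte(raw))
	suffix := hex.EncodeToString(sum[:4]) // 8 hex characters
	return name[:maxLen-len(suffix)-1] + "-" + suffix
}

func main() {
	fmt.Println(consumerName("portal", "projects.reindex", "ReindexProjects"))
}
```

Hashing the full input rather than the truncated name is the design choice that keeps truncated names from colliding.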
SQS Driver
The SQS driver uses a caller-owned AWS SQS client.

    app := anvil.New(
        sqsqueue.Driver(sqsqueue.Options{
            Client: awsSQSClient,
            QueueURLs: map[string]string{
                "projects.reindex": "https://sqs.eu-west-1.amazonaws.com/123/projects-reindex",
            },
            WaitTimeSeconds: 10,
            MaxMessages:     10,
        }),
    )

Behavior:
- Client is required.
- QueueURLs maps Anvil queue names to SQS queue URLs.
- WaitTimeSeconds defaults to 10; setting it to 0 selects that default. Explicit non-zero values must be between 1 and 20.
- MaxMessages defaults to 10; setting it to 0 selects that default. Explicit non-zero values must be between 1 and 10.
- PollInterval defaults to 250ms; setting it to 0 selects that default.
- VisibilityTimeout cannot be negative.
- Polling starts one goroutine per queue.
- A successful handler deletes the SQS message.
- A failed handler leaves the message in SQS and publishes a mapped error event.
- Publish requires a non-nil context, a non-empty queue name, a non-empty message body, and a configured queue URL.
- Start fails when no queue jobs are registered or a registered queue has no configured URL.
- Receive/delete/routing failures publish transport-phase error events.
- Start ignores the public Anvil address. SQS is a polling transport, not a listener.
- Shutdown cancels polling. It does not delete in-flight messages itself; normal SQS visibility timeout behavior decides when unprocessed messages are visible again.
- Start is one-shot. After shutdown, create a new SQS transport for another app run.
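The default and range rules for the SQS polling options can be expressed as one normalization step. This sketch mirrors the documented option names, but the Options struct here is a local stand-in with assumed field types, and normalize is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Options mirrors the documented option names; field types are assumptions.
type Options struct {
	WaitTimeSeconds   int
	MaxMessages       int
	PollInterval      time.Duration
	VisibilityTimeout int
}

// normalize fills zero values with the documented defaults and rejects
// explicit values outside the documented ranges.
func normalize(o Options) (Options, error) {
	if o.WaitTimeSeconds == 0 {
		o.WaitTimeSeconds = 10
	}
	if o.WaitTimeSeconds < 1 || o.WaitTimeSeconds > 20 {
		return o, errors.New("sqs: WaitTimeSeconds must be between 1 and 20")
	}
	if o.MaxMessages == 0 {
		o.MaxMessages = 10
	}
	if o.MaxMessages < 1 || o.MaxMessages > 10 {
		return o, errors.New("sqs: MaxMessages must be between 1 and 10")
	}
	if o.PollInterval == 0 {
		o.PollInterval = 250 * time.Millisecond
	}
	if o.VisibilityTimeout < 0 {
		return o, errors.New("sqs: VisibilityTimeout cannot be negative")
	}
	return o, nil
}

func main() {
	o, err := normalize(Options{})
	fmt.Println(o.WaitTimeSeconds, o.MaxMessages, o.PollInterval, err)
}
```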
SQS middleware runs after a received message is routed to a generated job. A nil result from middleware and handler deletes the SQS message. Any returned error is mapped and published, and the message remains in SQS for normal visibility-timeout redelivery.
When more than one job is registered on the same queue, the SQS message must
include the Anvil-Job message attribute. If only one job is registered for a
queue, the driver can route messages without that attribute.
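The routing rule for queues with several jobs can be sketched as a small lookup. The route function and its error messages are illustrative only; in the driver, a routing failure is published as a transport-phase error event rather than returned like this.

```go
package main

import (
	"errors"
	"fmt"
)

// route picks the job for an incoming message on one queue.
// jobs holds the job names registered on that queue; attrs holds the
// message attributes flattened to strings.
func route(jobs map[string]bool, attrs map[string]string) (string, error) {
	if name := attrs["Anvil-Job"]; name != "" {
		if !jobs[name] {
			return "", fmt.Errorf("sqs: job %q is not registered on this queue", name)
		}
		return name, nil
	}
	// Without the attribute, routing is only unambiguous for a single job.
	if len(jobs) == 1 {
		for name := range jobs {
			return name, nil
		}
	}
	return "", errors.New("sqs: Anvil-Job attribute is required when a queue has several jobs")
}

func main() {
	jobs := map[string]bool{"ReindexProjects": true, "AuditProjects": true}
	fmt.Println(route(jobs, map[string]string{"Anvil-Job": "AuditProjects"}))
	fmt.Println(route(jobs, nil))
}
```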
Publish sends:
- Message body from QueueMessage.Body
- Message attributes from QueueMessage.Headers
- Anvil-Message-Id from QueueMessage.ID
- FIFO deduplication ID from QueueMessage.ID when present
- FIFO message group ID from the Anvil-Message-Group-Id header or DefaultMessageGroupID
Incoming SQS messages expose ApproximateReceiveCount as
QueueMessage.Attempt. Missing, invalid, or negative values become 0.
Incoming QueueMessage.ID is the SQS MessageId; the Anvil message ID remains
available in QueueMessage.Headers["Anvil-Message-Id"] when the SQS message
attribute exists.
Error Events
Broker-backed drivers publish mapped error events for handler failures.
SQS also publishes transport-phase events for receive/delete/routing failures that happen before a job handler runs. Redis and NATS publish handler-phase events when job execution fails.