RabbitMQ
Integration with RabbitMQ: configuration, topology, and handler semantics.
Overview
Cozy Stack can consume messages from RabbitMQ and dispatch them to Go handlers. The consumer is managed by a background manager which:
- establishes and monitors the AMQP connection (with optional TLS),
- declares exchanges and queues (if configured to do so),
- binds queues to routing keys,
- starts per-queue consumers with QoS and redelivery limits,
- dispatches deliveries to queue-specific handlers.
Configuration
RabbitMQ is configured in cozy.yaml under the rabbitmq key.
Key fields:
- enabled: Enable the consumer.
- url: AMQP URL, e.g. amqp://guest:guest@localhost:5672/.
- tls: Optional TLS settings (root_ca, insecure_skip_validation, server_name).
- exchanges[]: List of exchanges the stack should consume from.
  - name: Exchange name.
  - kind: Exchange type (e.g. topic).
  - durable: Whether the exchange is durable.
  - declare_exchange: If true, the exchange is declared on startup.
  - dlx_name, dlq_name: Optional defaults for queues under this exchange.
  - queues[]: List of queues to consume.
    - name: Queue name.
    - bindings[]: Routing keys to bind to the exchange.
    - declare: If true, declare the queue on startup.
    - prefetch: Per-consumer QoS prefetch.
    - delivery_limit: x-delivery-limit for quorum queues.
    - dlx_name, dlq_name: Optional overrides per queue.
Example:
    rabbitmq:
      enabled: true
      url: amqp://guest:guest@localhost:5672/
      tls:
        # root_ca: /etc/ssl/certs/ca.pem
        insecure_skip_validation: false
        # server_name: rabbit.internal
      exchanges:
        - name: auth
          kind: topic
          durable: true
          declare_exchange: true
          queues:
            - name: user.password.updated
              declare: true
              prefetch: 8
              delivery_limit: 5
              bindings:
                - password.changed
            - name: user.created
              declare: true
              prefetch: 8
              delivery_limit: 5
              bindings:
                - user.created
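The way queue-level settings override exchange-level defaults, and how delivery_limit could translate into queue arguments, can be sketched as follows. This is a minimal illustration and not the stack's actual code: the struct names, the helper functions, and the auth.dlx exchange name are assumptions. The argument keys x-queue-type, x-delivery-limit, and x-dead-letter-exchange are standard RabbitMQ queue arguments.

```go
package main

import "fmt"

// QueueConfig is a hypothetical mirror of one entry under queues[].
type QueueConfig struct {
	Name          string
	DeliveryLimit int
	DLXName       string // optional per-queue override
}

// ExchangeConfig is a hypothetical mirror of one entry under exchanges[].
type ExchangeConfig struct {
	Name    string
	DLXName string // optional default for all queues of this exchange
	Queues  []QueueConfig
}

// effectiveDLX returns the queue-level dlx_name when set,
// otherwise the exchange-level default.
func effectiveDLX(ex ExchangeConfig, q QueueConfig) string {
	if q.DLXName != "" {
		return q.DLXName
	}
	return ex.DLXName
}

// queueArgs builds the optional arguments for declaring a quorum queue.
func queueArgs(ex ExchangeConfig, q QueueConfig) map[string]any {
	args := map[string]any{"x-queue-type": "quorum"}
	if q.DeliveryLimit > 0 {
		args["x-delivery-limit"] = int64(q.DeliveryLimit)
	}
	if dlx := effectiveDLX(ex, q); dlx != "" {
		args["x-dead-letter-exchange"] = dlx
	}
	return args
}

func main() {
	// "auth.dlx" is an illustrative name, not one used by the stack.
	ex := ExchangeConfig{Name: "auth", DLXName: "auth.dlx"}
	q := QueueConfig{Name: "user.created", DeliveryLimit: 5}
	fmt.Println(queueArgs(ex, q))
}
```

With an AMQP client, a map like this would typically be passed as the arguments table of the queue declaration.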
Handlers
Handlers implement a simple interface:
    type Handler interface {
        Handle(ctx context.Context, d amqp.Delivery) error
    }
Returning nil acknowledges the message. Returning a non-nil error causes the message to be requeued (subject to broker policies and delivery limits).
Queue names are mapped to handlers in the stack. For example:
- user.password.updated → updates an instance passphrase when a password.changed routing key is received.
- user.created → validates and processes user creation events.
Message schemas are JSON and validated in the handler. Example payload for user.password.updated:

    {
      "twakeId": "string",
      "iterations": 100000,
      "hash": "base64",
      "publicKey": "base64",
      "privateKey": "cipherString",
      "key": "cipherString",
      "timestamp": 1726040000,
      "domain": "example.cozy.cloud"
    }
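Decoding and validating this payload inside a handler might look like the sketch below. The field names come from the example payload. The type name, the function name, and the choice of required fields (domain, hash, positive iterations) are assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// PasswordUpdated mirrors the example payload; the type name is illustrative.
type PasswordUpdated struct {
	TwakeID    string `json:"twakeId"`
	Iterations int    `json:"iterations"`
	Hash       string `json:"hash"`
	PublicKey  string `json:"publicKey"`
	PrivateKey string `json:"privateKey"`
	Key        string `json:"key"`
	Timestamp  int64  `json:"timestamp"`
	Domain     string `json:"domain"`
}

// decodePasswordUpdated parses the body and applies assumed
// required-field checks before any business logic runs.
func decodePasswordUpdated(body []byte) (PasswordUpdated, error) {
	var msg PasswordUpdated
	if err := json.Unmarshal(body, &msg); err != nil {
		return msg, fmt.Errorf("password updated: invalid JSON: %w", err)
	}
	switch {
	case msg.Domain == "":
		return msg, errors.New("password updated: missing domain")
	case msg.Hash == "":
		return msg, errors.New("password updated: missing hash")
	case msg.Iterations <= 0:
		return msg, errors.New("password updated: iterations must be positive")
	}
	return msg, nil
}

func main() {
	body := []byte(`{"twakeId":"string","iterations":100000,"hash":"base64","domain":"example.cozy.cloud"}`)
	msg, err := decodePasswordUpdated(body)
	fmt.Println(msg.Domain, err)
}
```

Because a returned error leads to a requeue, a payload that can never be valid is redelivered until the delivery limit is reached.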
Example payload for user.created:

    {
      "twakeId": "string",
      "mobile": "string",
      "internalEmail": "string",
      "iterations": 100000,
      "hash": "base64",
      "publicKey": "base64",
      "privateKey": "cipherString",
      "key": "cipherString",
      "timestamp": 1726040000
    }
Lifecycle
On startup, if rabbitmq.enabled is true:
- The manager creates an AMQP connection (TLS if configured) and retries with exponential backoff.
- It declares configured exchanges and queues (if declare_* flags are set).
- It binds queues to their routing keys and starts consumers.
- It exposes a readiness channel internally so tests can wait until consumption is active.
- It monitors the connection and restarts consumers upon reconnection.
Adding a new queue handler
Follow these steps to introduce a new queue and its handler.
1) Define the message schema and the handler
Create a handler type that implements the Handle(ctx, d) method and an accompanying message struct.
    import (
        "context"
        "encoding/json"
        "fmt"

        // Assumed client library; PublishWithContext below is part of its API.
        amqp "github.com/rabbitmq/amqp091-go"
    )

    // ExampleEvent is the message payload consumed from the queue.
    type ExampleEvent struct {
        ID        string `json:"id"`
        Action    string `json:"action"`
        Timestamp int64  `json:"timestamp"`
    }

    // ExampleHandler processes ExampleEvent messages.
    type ExampleHandler struct{}

    func NewExampleHandler() *ExampleHandler { return &ExampleHandler{} }

    func (h *ExampleHandler) Handle(ctx context.Context, d amqp.Delivery) error {
        var msg ExampleEvent
        if err := json.Unmarshal(d.Body, &msg); err != nil {
            return fmt.Errorf("example: invalid payload: %w", err)
        }
        if msg.ID == "" {
            return fmt.Errorf("example: missing id")
        }
        // TODO: implement business logic
        return nil // ack
    }
2) Register the handler for a queue name
Map the queue name to the handler so the manager knows which handler to use. This usually happens where queues are built from config.
    switch configQueue.Name {
    case "example.queue":
        handler = NewExampleHandler()
    }
3) Configure the exchange and queue in cozy.yaml
Add the queue under an exchange with the routing keys to bind.
    rabbitmq:
      enabled: true
      url: amqp://guest:guest@localhost:5672/
      exchanges:
        - name: auth
          kind: topic
          durable: true
          declare_exchange: true
          queues:
            - name: example.queue
              declare: true
              prefetch: 8
              delivery_limit: 5
              bindings:
                - example.created
                - example.updated
4) Publish a message (example)
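    // ch is an open *amqp.Channel and ctx a context.Context.
    err := ch.PublishWithContext(ctx,
        "auth",            // exchange
        "example.created", // routing key
        false,             // mandatory
        false,             // immediate
        amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            ContentType:  "application/json",
            Body:         []byte(`{"id":"123","action":"created","timestamp":1726040000}`),
        },
    )
    if err != nil {
        log.Printf("publish failed: %v", err)
    }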