RabbitMQ¶
Integration with RabbitMQ: configuration, topology, and handler semantics.
Overview¶
Cozy Stack can consume messages from RabbitMQ and dispatch them to Go handlers. The consumers are managed by background managers which:
- establish and monitor the AMQP connection with their node (with optional TLS),
- declare exchanges and queues (if configured),
- bind queues to routing keys,
- start per-queue consumers with QoS and redelivery limits,
- dispatch deliveries to queue-specific handlers.
Configuration¶
RabbitMQ is configured in cozy.yaml under the rabbitmq key.
Key fields:
- enabled: Enable communication with RabbitMQ nodes.
- nodes: Map of RabbitMQ node configuration by context.
  - <context>:
    - url: AMQP URL, e.g. amqp://guest:guest@localhost:5672/.
    - tls: Optional TLS settings (root_ca, insecure_skip_validation, server_name).
- exchanges[]: List of exchanges the stack should consume from.
  - name: Exchange name.
  - kind: Exchange type (e.g. topic).
  - durable: Whether the exchange is durable.
  - declare_exchange: If true, the exchange is declared on startup.
  - dlx_name, dlq_name: Optional defaults for queues under this exchange.
  - queues[]: List of queues to consume.
    - name: Queue name.
    - bindings[]: Routing keys to bind to the exchange.
    - declare: If true, declare the queue on startup.
    - prefetch: Per-consumer QoS prefetch.
    - delivery_limit: x-delivery-limit for quorum queues.
    - declare_dlx: If true, declare the Dead Letter Exchange (DLX) on startup.
    - declare_dlq: If true, declare the Dead Letter Queue (DLQ) on startup.
    - dlx_name, dlq_name: Optional overrides per queue.
Example:
rabbitmq:
  enabled: true
  nodes:
    default:
      url: amqp://guest:guest@localhost:5672/
      tls:
        # root_ca: /etc/ssl/certs/ca.pem
        insecure_skip_validation: false
        # server_name: rabbit.internal
  exchanges:
    - name: auth
      kind: topic
      durable: true
      declare_exchange: true
      dlx_name: auth.dlx
      dlq_name: auth.dlq
      queues:
        - name: user.password.updated
          declare: true
          declare_dlx: true
          declare_dlq: true
          prefetch: 8
          delivery_limit: 5
          bindings:
            - password.changed
        - name: user.created
          declare: true
          declare_dlx: false
          declare_dlq: true
          prefetch: 8
          delivery_limit: 5
          bindings:
            - user.created
Dead Letter Exchange (DLX) and Dead Letter Queue (DLQ)¶
The RabbitMQ integration supports Dead Letter Exchange (DLX) and Dead Letter Queue (DLQ) functionality for handling failed messages:
- DLX (Dead Letter Exchange): An exchange to which messages are republished when they are rejected or exceed the delivery limit of their original queue.
- DLQ (Dead Letter Queue): A queue bound to the DLX that receives failed messages for analysis or reprocessing.
Configuration¶
DLX and DLQ can be configured at both exchange and queue levels:
- Exchange level: Set dlx_name and dlq_name in the exchange configuration to provide defaults for all queues under that exchange.
- Queue level: Set dlx_name and dlq_name in the queue configuration to override the exchange defaults.
- Declaration control: Use the declare_dlx and declare_dlq flags to control whether the DLX and DLQ are automatically declared on startup.
Example with DLX/DLQ¶
rabbitmq:
  enabled: true
  nodes:
    default:
      url: amqp://guest:guest@localhost:5672/
  exchanges:
    - name: auth
      kind: topic
      durable: true
      declare_exchange: true
      dlx_name: auth.dlx
      dlq_name: auth.dlq
      queues:
        - name: user.password.updated
          declare: true
          declare_dlx: true
          declare_dlq: true
          prefetch: 8
          delivery_limit: 5
          bindings:
            - password.changed
        - name: user.created
          declare: true
          declare_dlx: false
          declare_dlq: true
          dlq_name: user.created.dlq # Override exchange default
          prefetch: 8
          delivery_limit: 5
          bindings:
            - user.created
Behavior¶
- When declare_dlx: true and a dlx_name is provided, the DLX is declared as a fanout exchange on startup.
- When declare_dlq: true and a dlq_name is provided, the DLQ is declared and bound to the DLX on startup.
- If queue-level dlx_name/dlq_name are not specified, the exchange-level defaults are used.
- Messages that exceed the delivery_limit or are rejected are sent to the DLX and routed to the DLQ.
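The fallback rule and the resulting queue arguments can be sketched as follows. This is only an illustration under stated assumptions: the ExchangeConfig and QueueConfig types and the resolveDeadLetter and queueArgs helpers are hypothetical names, not the stack's actual code, and the sketch assumes the queue is declared as a quorum queue. The argument keys x-queue-type, x-delivery-limit and x-dead-letter-exchange are standard RabbitMQ queue arguments.

```go
package main

import "fmt"

// ExchangeConfig and QueueConfig are hypothetical types that mirror
// the dlx_name / dlq_name / delivery_limit keys from cozy.yaml.
type ExchangeConfig struct {
	Name    string
	DLXName string
	DLQName string
}

type QueueConfig struct {
	Name          string
	DLXName       string
	DLQName       string
	DeliveryLimit int
}

// firstNonEmpty returns the first non-empty string, or "" if none is set.
func firstNonEmpty(values ...string) string {
	for _, v := range values {
		if v != "" {
			return v
		}
	}
	return ""
}

// resolveDeadLetter applies the documented rule: queue-level names
// override the exchange-level defaults.
func resolveDeadLetter(ex ExchangeConfig, q QueueConfig) (dlx, dlq string) {
	return firstNonEmpty(q.DLXName, ex.DLXName), firstNonEmpty(q.DLQName, ex.DLQName)
}

// queueArgs builds declaration arguments for a quorum queue (assumption).
func queueArgs(dlx string, deliveryLimit int) map[string]interface{} {
	args := map[string]interface{}{"x-queue-type": "quorum"}
	if deliveryLimit > 0 {
		args["x-delivery-limit"] = deliveryLimit
	}
	if dlx != "" {
		args["x-dead-letter-exchange"] = dlx
	}
	return args
}

func main() {
	ex := ExchangeConfig{Name: "auth", DLXName: "auth.dlx", DLQName: "auth.dlq"}
	q := QueueConfig{Name: "user.created", DLQName: "user.created.dlq", DeliveryLimit: 5}

	dlx, dlq := resolveDeadLetter(ex, q)
	fmt.Println(dlx, dlq) // auth.dlx user.created.dlq
	fmt.Println(queueArgs(dlx, q.DeliveryLimit))
}
```

With the example configuration above, user.created inherits auth.dlx from the exchange but uses its own user.created.dlq.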
Handlers¶
Handlers implement a simple interface:
type Handler interface {
	Handle(ctx context.Context, d amqp.Delivery) error
}
Returning nil acknowledges the message. Returning a non-nil error causes the message to be requeued (subject to broker policies and delivery limits).
Queue names are mapped to handlers in the stack. For example:
- user.password.updated → updates an instance passphrase when a password.changed routing key is received.
- user.created → validates and processes user creation events.
- user.phone.updated → updates the phone number stored in user settings.
Message schemas are JSON and validated in the handler. Example payload for user.password.updated:
{
  "twakeId": "string",
  "iterations": 100000,
  "hash": "base64",
  "publicKey": "base64",
  "privateKey": "cipherString",
  "key": "cipherString",
  "timestamp": 1726040000,
  "domain": "example.cozy.cloud"
}
Example payload for user.created:
{
  "twakeId": "string",
  "mobile": "string",
  "internalEmail": "string",
  "iterations": 100000,
  "hash": "base64",
  "publicKey": "base64",
  "privateKey": "cipherString",
  "key": "cipherString",
  "timestamp": 1726040000
}
Example payload for user.phone.updated:
{
  "twakeId": "string",
  "mobile": "+33123456789",
  "internalEmail": "string",
  "workplaceFqdn": "example.twake.app"
}
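Decoding and validating one of these payloads in a handler could look like the sketch below, which uses the user.password.updated schema. The PasswordUpdated type and decodePasswordUpdated function are hypothetical names, and the set of required fields is an assumption; the source does not list which fields the real handler enforces.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// PasswordUpdated mirrors the documented user.password.updated payload.
// The type name is hypothetical.
type PasswordUpdated struct {
	TwakeID    string `json:"twakeId"`
	Iterations int    `json:"iterations"`
	Hash       string `json:"hash"`
	PublicKey  string `json:"publicKey"`
	PrivateKey string `json:"privateKey"`
	Key        string `json:"key"`
	Timestamp  int64  `json:"timestamp"`
	Domain     string `json:"domain"`
}

// decodePasswordUpdated parses the JSON body and checks a few fields.
// Which fields are mandatory is an assumption made for this sketch.
func decodePasswordUpdated(body []byte) (*PasswordUpdated, error) {
	var msg PasswordUpdated
	if err := json.Unmarshal(body, &msg); err != nil {
		return nil, fmt.Errorf("password updated: invalid payload: %w", err)
	}
	switch {
	case msg.TwakeID == "":
		return nil, errors.New("password updated: missing twakeId")
	case msg.Domain == "":
		return nil, errors.New("password updated: missing domain")
	case msg.Hash == "":
		return nil, errors.New("password updated: missing hash")
	case msg.Iterations <= 0:
		return nil, errors.New("password updated: iterations must be positive")
	}
	return &msg, nil
}

func main() {
	body := []byte(`{"twakeId":"abc","iterations":100000,"hash":"aGFzaA==","timestamp":1726040000,"domain":"example.cozy.cloud"}`)
	msg, err := decodePasswordUpdated(body)
	if err != nil {
		fmt.Println("rejected:", err)
		return
	}
	fmt.Println(msg.Domain, msg.Iterations) // example.cozy.cloud 100000

	_, err = decodePasswordUpdated([]byte(`{"twakeId":"abc"}`))
	fmt.Println(err) // password updated: missing domain
}
```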
Lifecycle¶
On startup, if rabbitmq.enabled is true:
- The RabbitMQ service starts a manager for every context with configuration.
- The managers create an AMQP connection with their node (using TLS if configured) and retry with exponential backoff.
- They declare configured exchanges and queues (if declare_* flags are set).
- They declare Dead Letter Exchanges and Dead Letter Queues (if declare_dlx/declare_dlq flags are set).
- They bind queues to their routing keys and start consumers.
- They expose a readiness channel internally so tests can wait until consumption is active.
- They monitor the connection and restart consumers upon reconnection.
Adding a new queue handler¶
Follow these steps to introduce a new queue and its handler.
1) Define the message schema and the handler
Create a handler type that implements the Handle(ctx, d) method and an accompanying message struct.
import (
	"context"
	"encoding/json"
	"fmt"

	// Assumed import path for the amqp package used throughout this page.
	amqp "github.com/rabbitmq/amqp091-go"
)

// Example message payload consumed from the queue
type ExampleEvent struct {
	ID        string `json:"id"`
	Action    string `json:"action"`
	Timestamp int64  `json:"timestamp"`
}

// ExampleHandler processes ExampleEvent messages
type ExampleHandler struct{}

func NewExampleHandler() *ExampleHandler { return &ExampleHandler{} }

func (h *ExampleHandler) Handle(ctx context.Context, d amqp.Delivery) error {
	var msg ExampleEvent
	if err := json.Unmarshal(d.Body, &msg); err != nil {
		return fmt.Errorf("example: invalid payload: %w", err)
	}
	if msg.ID == "" {
		return fmt.Errorf("example: missing id")
	}
	// TODO: implement business logic
	return nil // ack
}
2) Register the handler for a queue name
Map the queue name to the handler so the manager knows which handler to use. This usually happens where queues are built from config.
switch configQueue.Name {
case "example.queue":
	handler = NewExampleHandler()
}
3) Configure the exchange and queue in cozy.yaml
Add the queue under an exchange with the routing keys to bind.
rabbitmq:
  enabled: true
  nodes:
    default:
      url: amqp://guest:guest@localhost:5672/
  exchanges:
    - name: auth
      kind: topic
      durable: true
      declare_exchange: true
      queues:
        - name: example.queue
          declare: true
          prefetch: 8
          delivery_limit: 5
          bindings:
            - example.created
            - example.updated
4) Publish a message (example)
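// ch is assumed to be an open *amqp.Channel.
err := ch.PublishWithContext(ctx,
	"auth",            // exchange
	"example.created", // routing key
	false,             // mandatory
	false,             // immediate
	amqp.Publishing{
		DeliveryMode: amqp.Persistent,
		ContentType:  "application/json",
		Body:         []byte(`{"id":"123","action":"created","timestamp":1726040000}`),
	},
)
if err != nil {
	log.Printf("example: publish failed: %v", err)
}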
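Rather than writing the JSON body by hand, a publisher can marshal the same struct the handler decodes, which keeps both sides in sync. A minimal sketch, where encodeExampleEvent is a hypothetical helper and ExampleEvent is repeated from step 1 so the block stands alone:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ExampleEvent is the message struct from step 1.
type ExampleEvent struct {
	ID        string `json:"id"`
	Action    string `json:"action"`
	Timestamp int64  `json:"timestamp"`
}

// encodeExampleEvent builds the body to place in the Body field
// of the published message.
func encodeExampleEvent(id, action string, ts int64) ([]byte, error) {
	return json.Marshal(ExampleEvent{ID: id, Action: action, Timestamp: ts})
}

func main() {
	body, err := encodeExampleEvent("123", "created", 1726040000)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body)) // {"id":"123","action":"created","timestamp":1726040000}
}
```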