rabbitmqctl status显示 broker 状态:运行中的应用、版本、OS 和 Erlang 版本、内存/磁盘用量。
rabbitmqctl status
rabbitmqctl status --formatter=json
RabbitMQ 速查,90+ 条命令和模式,涵盖队列、交换机、集群和常见坑。
rabbitmqctl status显示 broker 状态:运行中的应用、版本、OS 和 Erlang 版本、内存/磁盘用量。
rabbitmqctl status
rabbitmqctl status --formatter=json
rabbitmqctl list_queues name messages consumers unacked_messages列出队列及消息数、消费者数、未确认消息数。加 -p /vhost 限定 vhost。
rabbitmqctl list_queues name messages consumers
rabbitmqctl list_queues -p /production name messages consumers unacked_messages memory
rabbitmqctl list_exchanges name type durable auto_delete列出所有交换机及类型、持久化、自动删除标志。预声明的系统交换机名称为空。
rabbitmqctl list_exchanges name type durable
rabbitmqctl list_exchanges -p /myapp name type
rabbitmqctl list_bindings source_name source_kind destination_name routing_key列出所有交换机到队列的绑定关系及路由键,用于排查路由不匹配问题。
rabbitmqctl list_bindings
rabbitmqctl list_bindings -p /myapp source_name destination_name routing_key
rabbitmqctl list_connections name state user vhost recv_oct send_oct列出所有 AMQP 连接,含状态、用户、vhost、收发字节数。用 state 字段找僵死/阻塞连接。
连接状态为 "blocked" 表示 broker 触发了资源告警(内存或磁盘)。客户端正在被限流。
rabbitmqctl list_connections name state user
rabbitmqctl list_connections name state user vhost recv_oct send_oct
rabbitmqctl list_channels name number user connection state consumer_count列出所有 AMQP 通道。每条连接可以复用多个通道。单连接通道数过多是代码异味。
rabbitmqctl list_channels
rabbitmqctl list_channels name number consumer_count
rabbitmqctl list_consumers queue_name consumer_tag ack_required prefetch_count列出所有消费者及队列名、消费者标签、确认模式、预取数。
rabbitmqctl list_consumers
rabbitmqctl list_consumers queue_name consumer_tag ack_required prefetch_count
rabbitmqctl add_user <username> <password>创建新 RabbitMQ 用户。新用户在任何 vhost 上都没有权限。
rabbitmqctl add_user alice secretpassword
rabbitmqctl add_user ci-agent "$(openssl rand -base64 32)"
rabbitmqctl set_user_tags <username> <tag...>设置管理界面角色。Tags: administrator(全权)、monitoring(只读)、management(基础界面)、policymaker(仅策略)。
rabbitmqctl set_user_tags alice administrator
rabbitmqctl set_user_tags ci-agent monitoring
rabbitmqctl set_user_tags alice # clears all tags
rabbitmqctl set_permissions -p <vhost> <user> <conf> <write> <read>授予用户在 vhost 上的权限。三个正则:configure(声明/删除)、write(发布)、read(消费/获取)。`".*"` 匹配所有。
rabbitmqctl set_permissions -p / alice ".*" ".*" ".*"
rabbitmqctl set_permissions -p /prod alice "^app-" "^app-" "^app-"
rabbitmqctl list_users列出所有用户及其 tag。
rabbitmqctl list_users
rabbitmqctl delete_user <username>删除用户及其所有权限。
rabbitmqctl delete_user alice
rabbitmqctl change_password <username> <newpassword>修改指定用户的登录密码,立即生效。
rabbitmqctl change_password alice newpass123
rabbitmqctl list_permissions -p <vhost>列出某 vhost 下所有权限条目。
rabbitmqctl list_permissions -p /
rabbitmqctl list_permissions -p /production
rabbitmqctl add_vhost <vhost>创建虚拟主机。vhost 在租户或环境之间提供逻辑隔离。
rabbitmqctl add_vhost /production
rabbitmqctl add_vhost /staging
rabbitmqctl list_vhosts列出 broker 上所有已创建的虚拟主机。
rabbitmqctl list_vhosts
rabbitmqctl list_vhosts name tracing
rabbitmqctl delete_vhost <vhost>删除 vhost 及其所有队列、交换机、绑定和策略。
这是不可逆的。如需备份,先用 export_definitions 导出定义。
rabbitmqctl delete_vhost /staging
rabbitmqctl purge_queue <queuename>清空队列所有消息,但保留队列本身。
清空是永久的,消息无法恢复。生产环境跑之前确认你真的想这么做。
rabbitmqctl purge_queue payment-queue
rabbitmqctl purge_queue -p /prod orders
rabbitmqctl delete_queue <queuename>删除队列及其所有消息。
rabbitmqctl delete_queue old-test-queue
rabbitmqctl delete_queue -p /prod deadletter
rabbitmqctl delete_exchange <exchangename>删除交换机。已路由到队列的飞行中消息不受影响。
rabbitmqctl delete_exchange old-events
rabbitmqctl delete_exchange -p /prod notifications
rabbitmqctl export_definitions /path/to/defs.json把所有 broker 定义(vhost、队列、交换机、绑定、用户、策略)导出为 JSON 文件。用于备份和迁移。
rabbitmqctl export_definitions /tmp/rabbitmq-backup.json
rabbitmqctl export_definitions - | jq . # stream to stdout
rabbitmqctl import_definitions /path/to/defs.json从 JSON 文件导入 broker 定义。与现有配置合并,不会删除文件里没有的内容。
rabbitmqctl import_definitions /tmp/rabbitmq-backup.json
curl -s http://localhost:15672/api/definitions -u guest:guest | rabbitmqctl import_definitions -
rabbitmqctl cluster_status显示集群成员、节点类型和分区状态。健康集群的 "partitions" 字段应始终为空。
rabbitmqctl cluster_status
rabbitmqctl cluster_status --formatter=json | jq .partitions
rabbitmqctl join_cluster rabbit@<nodename>加入现有集群。必须先跑 `rabbitmqctl stop_app`。节点须通过 Erlang cookie 和主机名可达。
rabbitmqctl stop_app && rabbitmqctl join_cluster rabbit@rabbit2 && rabbitmqctl start_app
rabbitmqctl join_cluster --ram rabbit@rabbit1 # RAM node (not recommended)
rabbitmqctl forget_cluster_node rabbit@<deadnode>从集群元数据中移除永久离线的节点。只在该节点不可能再上线时使用。
只有在确定该节点永远不会再加入时才执行。移除还持有仲裁队列数据的节点会导致数据丢失。
rabbitmqctl forget_cluster_node rabbit@dead-node
rabbitmqctl forget_cluster_node --offline rabbit@dead-node
rabbitmqctl reset危险:把节点重置为出厂状态,清除所有数据、定义和集群成员关系。需要先执行 stop_app。
不可逆。所有队列、交换机、绑定、消息、用户(除默认外)全部消失。
rabbitmqctl stop_app && rabbitmqctl reset && rabbitmqctl start_app
rabbitmq-plugins enable <plugin>启用插件。大多数插件无需重启即时生效。常用插件:rabbitmq_management、rabbitmq_shovel、rabbitmq_federation、rabbitmq_stream。
rabbitmq-plugins enable rabbitmq_management
rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management
rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management
rabbitmq-plugins list列出所有插件及启用/禁用状态。[E] 表示显式启用,[e] 表示作为依赖启用。
rabbitmq-plugins list
rabbitmq-plugins list --enabled
rabbitmq-diagnostics check_running健康检查:broker 运行且内部健康检查通过时退出 0,否则非零。适合做就绪探针。
rabbitmq-diagnostics check_running && echo "healthy"
rabbitmq-diagnostics check_port_connectivity && rabbitmq-diagnostics check_running
rabbitmq-diagnostics memory_breakdown按类别显示内存用量(队列、连接、通道、绑定、mnesia 等)。诊断内存压力的必备命令。
rabbitmq-diagnostics memory_breakdown
rabbitmq-diagnostics memory_breakdown --formatter=json | jq .
rabbitmq-diagnostics check_port_connectivity验证 AMQP、管理界面和集群端口是否可达。
rabbitmq-diagnostics check_port_connectivity
rabbitmqctl set_policy <name> <pattern> <definition> --apply-to <queues|exchanges|all>对名称匹配正则的所有队列/交换机应用策略。策略优先于每个队列的参数,且可实时修改。
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}' --apply-to queuesrabbitmqctl set_policy ttl-1h "^transient\." '{"message-ttl":3600000}' --apply-to queuesrabbitmqctl set_policy max-length "^bounded\." '{"max-length":10000,"overflow":"reject-publish-dlx"}' --apply-to queuesrabbitmqctl eval '<Erlang expression>'在运行中的节点上执行 Erlang 表达式。适合标准 CLI 命令未暴露的一次性生产诊断。
仅在紧急情况下谨慎使用——错误的 Erlang 表达式可能导致节点崩溃。日常运维优先使用 rabbitmq-diagnostics 命令。
rabbitmqctl eval 'rabbit_nodes:all_running().'
rabbitmqctl eval 'length(rabbit_amqqueue:list()).'
rabbitmqctl eval 'mnesia:system_info(all).'
Direct exchange (type: direct)按绑定键与消息路由键的精确匹配路由消息。一个键→一个(或多个)有相同绑定的队列。
# Declare: channel.exchangeDeclare("events", "direct", true)# Bind: channel.queueBind("order-created-queue", "events", "order.created")# Publish: channel.basicPublish("events", "order.created", null, body)Topic exchange (type: topic)用通配符模式路由。`*` 匹配且仅匹配一个点分隔的单词。`#` 匹配零个或多个单词。如 `user.#` 能匹配 `user.login`、`user.profile.updated`。
# Binding "*.error" catches order.error, payment.error but not order.payment.error
# Binding "order.#" catches order.created, order.item.added, order.fulfilled
# Binding "#" catches everything (same as fanout but slower)
Fanout exchange (type: fanout)把每条消息的副本发给所有绑定的队列,完全忽略路由键。适合广播/发布-订阅场景。
路由键被忽略,不小心把队列绑到 fanout 交换机上就会静默接收所有消息。
# Declare: channel.exchangeDeclare("broadcast", "fanout", true)# Each subscriber declares its own queue and binds it — no routing key needed
# Publish: channel.basicPublish("broadcast", "", null, body) // key ignoredHeaders exchange (type: headers)按消息头的键值对路由,而非路由键。绑定时指定所需的头值,以及 x-match: all(AND)或 any(OR)。
# Bind with: {"format":"pdf","type":"report","x-match":"all"}# Matches messages with BOTH format=pdf AND type=report headers
Default exchange (name: "")内置的无名 direct 交换机。每个队列都以自身名字作为路由键自动绑定上去。点对点时很方便,但无法做 pub/sub。
默认交换机看起来是捷径,但把发送者和队列名耦合在一起了。用命名交换机才能做到真正解耦。
# channel.basicPublish("", "my-queue", null, body) sends direct to my-queueDead Letter Exchange (x-dead-letter-exchange)接收源队列无法投递的消息的交换机:TTL 到期、队列满(溢出模式为 reject)、或消费者 nack 且 requeue=false。
# Declare source queue with DLX:
# channel.queueDeclare("orders", true, false, false, {"x-dead-letter-exchange": "dlx", "x-dead-letter-routing-key": "orders.dead"})# Bind a "parking lot" queue to dlx with routing key "orders.dead"
Alternate Exchange (x-alternate-exchange)接收无法路由(没有队列匹配路由键)的消息的交换机。防止路由不匹配时静默丢消息。
# Declare exchange with AE:
# channel.exchangeDeclare("main", "topic", true, false, {"alternate-exchange": "unrouted"})# channel.exchangeDeclare("unrouted", "fanout", true)Classic queue标准 AMQP 队列,由 Mnesia 支撑。除非配置经典镜像策略(3.9 弃用,4.0 移除)否则不做复制。仅用于不重要的临时工作负载。
# channel.queueDeclare("task-queue", true, false, false, null)# durable=true means the queue survives broker restart
Quorum queue (x-queue-type: quorum)用 Raft 共识复制的队列。少数节点故障不丢数据。推荐用于所有生产队列。支持 DLX、单消息 TTL、x-max-length 和投递次数限制。
仲裁队列始终持久化。不支持优先级队列或全局 QoS。每个队列每个节点选一个 leader,小集群上避免几百个队列。
# channel.queueDeclare("orders", true, false, false, {"x-queue-type": "quorum"})# Set leader election strategy via policy: {"queue-leader-locator": "balanced"}Stream queue (x-queue-type: stream)不可变追加日志。消费者可从任意偏移量重放。按配置的大小或时间保留数据,而非等消费者 ack。需要 rabbitmq_stream 插件。
# channel.queueDeclare("events", true, false, false, {"x-queue-type": "stream", "x-max-age": "7D", "x-stream-max-segment-size-bytes": 536870912})# Consumer subscribes with offset: {"x-stream-offset": "first"} or {"x-stream-offset": <timestamp>}Message TTL (x-message-ttl)队列级别的 TTL,单位毫秒。超时消息被死信化(如果配了 DLX)或丢弃。在队列声明时设置。
# {"x-message-ttl": 60000} // 60 seconds# Or via policy: rabbitmqctl set_policy ttl "^cache\." '{"message-ttl":5000}' --apply-to queuesPer-message TTL (expiration header)单消息的过期时间,单位毫秒,设在消息属性里。如果队列级别 TTL 也设了,取两者中较小的。
# AMQP props: { expiration: "30000" } // 30 seconds# Note: expiration is a STRING in the AMQP spec, not an integer
Queue TTL (x-expires)队列没有消费者且没有消息投递 N 毫秒后自动删除。适合自动清理临时的回复队列。
# {"x-expires": 3600000} // queue disappears after 1 hour idlePublisher confirms (channel.confirmSelect)Broker 逐条确认发布的消息。basic.ack = 已接受(持久化消息已写盘)。basic.nack = 已丢弃(队列满、资源告警)。实现至少一次发布保证。
单消息确认有 5–10% 吞吐损耗。批量确认(发 N 条等 N 个 ack)能找回大部分性能。
# channel.confirmSelect();
# channel.waitForConfirmsOrDie(5000); // Java: wait up to 5s for all outstanding acks
# async: channel.addConfirmListener((tag, multiple) => { /* handle ack */ }, (tag, multiple) => { /* handle nack */ })Consumer acknowledgement modes三种确认模式:basic.ack(成功,从队列删除)、basic.nack(失败,可选 requeue=true/false)、basic.reject(同 nack 但只针对单条)。手动 ack 始终比 auto-ack 安全。
basic.nack 配 requeue=true 会立即重投,如果处理始终失败,消息会无限循环。改为路由到 DLX。
# channel.basicAck(deliveryTag, false)
# channel.basicNack(deliveryTag, false, false) // nack, no requeue → DLX
# channel.basicReject(deliveryTag, false) // reject single message
Basic.QoS / prefetch (channel.basicQos)限制同时推给消费者的未确认消息数。不设的话 broker 会把消息都塞给最快的消费者。在 basicConsume 前设置。作用范围:每通道(默认)或每消费者。
prefetch=0 表示不限制。prefetch=1 表示消费者一次只处理一条消息——严格顺序时正确,高吞吐时性能差。
# channel.basicQos(10) // at most 10 unacked messages per channel
# channel.basicQos(1, true) // prefetch=1, global=true (applies per-channel not per-consumer)
Durable queue + delivery-mode=2 (persistent message)消息要在 broker 重启后存活,必须同时满足两个条件:(1) 队列声明为 durable,(2) 消息发布时 delivery-mode=2。
持久化队列里的非持久化消息(delivery-mode=1)在 broker 重启后会消失。这是最常见的持久化误区。
# queue: durable=true
# message props: deliveryMode=2
# AMQP props: { deliveryMode: 2, contentType: "application/json" }x-max-length and x-overflow按消息数(x-max-length)或字节数(x-max-length-bytes)限制队列深度。x-overflow 控制达到上限时的行为:drop-head(默认,丢弃最旧)、reject-publish(nack 发布者)、reject-publish-dlx(nack 发布者 + 被丢消息走 DLX)。
# {"x-max-length": 10000, "x-overflow": "reject-publish"}# Via policy: rabbitmqctl set_policy bounded "^bnd\." '{"max-length":5000,"overflow":"reject-publish-dlx"}' --apply-to queuesPriority queue (x-max-priority)最多 255 个优先级的队列。数字越大优先级越高。消息必须携带数字型 `priority` 属性。不配消费者优先级的话,优先级只在单消费者时有效。
x-max-priority > 5 浪费内存——RabbitMQ 为每个优先级级别创建内部队列。大多数场景只需 2–5 个级别。
# {"x-max-priority": 5}# Publish high-priority: { priority: 5 }# Publish normal: { priority: 1 }Auto-delete and exclusive queues自动删除:最后一个消费者断连后队列自动删除。独占:仅对声明它的连接可见,连接关闭后删除。两者都适合临时工作或 RPC 回复队列。
# exclusive=true: channel.queueDeclare("", false, true, true, null) // "" = server-generated name# auto-delete=true: channel.queueDeclare("reply-12345", false, false, true, null)Work queue (competing consumers)多个消费者订阅同一个队列。broker 轮询分发消息。每条消息恰好由一个消费者处理。后台任务水平扩展的经典模式。
# Declare one shared durable queue
# Start N worker processes each calling channel.basicConsume("tasks", ...)# Set prefetch=1 to prevent work imbalance: channel.basicQos(1)
Pub/Sub with fanout exchange每个订阅者声明自己的独占队列并绑定到 fanout 交换机。所有订阅者独立接收每条消息。适合事件广播。
# Publisher: channel.basicPublish("events-fanout", "", null, body)# Subscriber A: queueDeclare("", ..., true, true, ...) → queueBind(q, "events-fanout", "")# Subscriber B: queueDeclare("", ..., true, true, ...) → queueBind(q, "events-fanout", "")Topic routing (wildcard routing key)用 topic 交换机让消费者按模式订阅。如日志消费者绑定 `#.error` 接收所有服务的错误;审计消费者绑定 `order.#` 接收所有订单事件。
# channel.exchangeDeclare("events", "topic", true)# Bind error queue: channel.queueBind("errors", "events", "#.error")# Bind order queue: channel.queueBind("order-audit", "events", "order.#")# Publish: channel.basicPublish("events", "order.payment.failed", null, body)RPC over AMQP (reply_to + correlation_id)客户端发消息到服务端队列,携带 reply_to=<独占队列名> 和唯一 correlation_id。服务端处理后把响应发到 reply_to 队列,携带相同 correlation_id。
AMQP RPC 是合法模式但有延迟。只有在栈里已有 RabbitMQ 且需要请求路由或背压时才用它代替 HTTP。
# Client: channel.basicPublish("", "rpc-server-queue", {replyTo: "amq.rabbitmq.reply-to", correlationId: uuid}, body)# Use "amq.rabbitmq.reply-to" as the reply-to address for the fast direct reply protocol
# Server: on message, channel.basicPublish("", msg.properties.replyTo, {correlationId: msg.properties.correlationId}, responseBody)Dead letter queue (DLQ parking lot)源队列把失败/过期消息发给 DLX。绑定到 DLX 的"停车场"队列保存这些消息供检查、重放或手动丢弃。运维团队用它调试故障而不丢消息。
# 1. Declare DLX: channel.exchangeDeclare("dlx", "direct", true)# 2. Bind parking lot: channel.queueDeclare("parking-lot", true, ...); channel.queueBind("parking-lot", "dlx", "parking")# 3. Source queue: {"x-dead-letter-exchange": "dlx", "x-dead-letter-routing-key": "parking"}# 4. When consumer nacks with requeue=false, message lands in parking-lot
Exponential backoff retry via TTL + DLX chain链路:source-queue → nack → dlx → wait-1s-queue(TTL=1000,DLX→source) → 到期 → 回到 source。加 wait-10s-queue 做第二次重试。用消息头追踪尝试次数。
# wait-1s-queue: {"x-dead-letter-exchange": "main-exchange", "x-message-ttl": 1000}# wait-10s-queue: {"x-dead-letter-exchange": "main-exchange", "x-message-ttl": 10000}# After max retries, nack to a final parking-lot queue instead
Delayed message (per-message TTL + DLX)给消息设 expiration,路由到无消费者的暂存队列。TTL 到期后 DLX 重新路由到实际目标。
生产环境用 rabbitmq-delayed-message-exchange 插件更干净,它按节点存储消息,支持精确延迟,不用创建额外队列。
# Install: rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# channel.exchangeDeclare("delayed", "x-delayed-message", true, false, {"x-delayed-type": "direct"})# Publish with header: {"x-delay": 5000} // delay 5 secondsShovel plugin (move messages between brokers)Shovel 动态地把消息从源队列/交换机移到同一个或不同 broker 上的目标。用于跨数据中心复制或 vhost 迁移。
# Enable: rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management
# Set shovel via policy or management API: PUT /api/parameters/shovel/%2F/my-shovel
# {"src-uri":"amqp://","src-queue":"old-queue","dest-uri":"amqp://remote-host","dest-queue":"new-queue"}Idempotent consumer (message deduplication)处理前把 message-id(或业务键)存入 Redis/DB。如果已存在,ack 后跳过。这把至少一次投递变成有效一次处理。
# 1. Receive message with unique messageId in properties
# 2. if redis.set(messageId, "1", NX, EX, 86400): process and ack
# 3. else: ack without processing (already handled)
Outbox pattern (atomic write + publish)和业务变更在同一个 DB 事务里把事件写入 outbox 表。后台轮询器读 outbox 并发布到 RabbitMQ。保证服务在 DB 写入后、发布前崩溃也不丢消息。
# 1. BEGIN TRANSACTION
# 2. UPDATE orders SET status="paid" WHERE id=123
# 3. INSERT INTO outbox(event_type, payload) VALUES ("order.paid", "...")# 4. COMMIT
# 5. Poller: SELECT * FROM outbox WHERE published_at IS NULL; → publish; UPDATE outbox SET published_at=NOW()
Circuit breaker via x-overflow: reject-publish在源队列设 x-max-length + x-overflow: reject-publish。队列达到容量上限时,发布者收到 nack 并应停止发送(熔断器开)。消费者清空队列后,发布者重新收到 ack(熔断器关)。
# {"x-max-length": 1000, "x-overflow": "reject-publish"}# Publisher must handle nack: pause publishing or fall back to local buffer
Event sourcing with stream queues用流式队列做不可变的事件日志。消费者从任意偏移量订阅(first、last、特定时间戳或字节偏移)。多个消费组独立重放互不干扰。
# {"x-queue-type": "stream", "x-max-age": "30D"}# Subscribe from beginning: consumer args {"x-stream-offset": "first"}# Subscribe from timestamp: consumer args {"x-stream-offset": <unix-timestamp-ms>}Blue/green deployment via vhosts把流量路由到 /blue 或 /green vhost。新版本对 /green 部署,/blue 继续服务生产。切换负载均衡器或应用配置指向 /green;保留 /blue 用于即时回滚。
# rabbitmqctl add_vhost /green
# rabbitmqctl set_permissions -p /green deploy ".*" ".*" ".*"
# Configure app to use AMQP_VHOST=/green for the new deployment
autoAck=true → silent data loss on consumer crashautoAck=true 时 broker 投递消息的瞬间就把它从队列删除。消费者在处理完之前崩溃或抛出异常,消息就永久消失了。
始终用手动 ack。成功处理后才调 basicAck。失败时调 basicNack/basicReject。
# Wrong: channel.basicConsume("tasks", true, callback) // autoAck=true# Right: channel.basicConsume("tasks", false, callback) // manual ack# In callback: try { process(msg); channel.basicAck(tag, false); } catch { channel.basicNack(tag, false, false); }Missing prefetch → memory balloon + consumer starvation没有预取限制时,broker 把所有排队消息都推给第一个可用的消费者。这个消费者在内存里缓存了成千上万条消息,而其他消费者空闲等待。
在 channel.basicConsume() 前始终调 channel.basicQos()。工作队列:预取 10–100。慢处理:预取 1。
# channel.basicQos(10) // then channel.basicConsume(...)
Re-using a channel after an error closes itAMQP 通道在 broker 报错后会失效(权限不对、声明冲突的队列、ack 未知的投递 tag)。channel 级别出错后必须创建新通道,旧通道已永久关闭。
Channel 错误在日志里看起来像连接错误。通过错误码区分:4xx 是 channel 错误,5xx 是连接错误。
# Wrong: ignore the error and keep using the same channel object
# Right: listen for channel.close event; create a new channel in the handler
# channel.on("error", (err) => { channel = connection.createChannel(); })Classic mirrored queues (deprecated since 3.9)经典镜像队列(ha-policy)在网络分区下不可靠,同步时可能丢消息,节点故障需要手动运维介入。3.9 中弃用,4.0 中移除。
现在就迁移到仲裁队列。用 x-queue-type: quorum 声明。它们自动处理脑裂和节点故障。
# Remove ha-all policy: rabbitmqctl clear_policy ha-all
# Migrate: drain classic queue, delete it, redeclare with x-queue-type: quorum
Durable queue + non-persistent messages → data lost on restart持久化队列在 broker 重启后存活,但消息不会,除非 delivery-mode=2(持久化)。持久化队列里的 delivery-mode=1 消息在重启后悄悄消失。
两个条件必须同时满足:queue.durable=true 且 message.deliveryMode=2。检查发布代码,不只是队列声明。
# Wrong: channel.basicPublish("ex", "key", null, body) // default deliveryMode=1# Right: channel.basicPublish("ex", "key", {deliveryMode: 2}, body)Memory watermark alarm blocks ALL publishers当 broker 内存用量超过高水位线(默认可用内存的 40%)时,所有进入发布路径的连接都被阻塞。broker 向客户端发送 Connection.Blocked。
症状:发布操作无限期挂起,消费者还在正常消费。修复:加内存、提高水位线或加速消费。跑 `rabbitmq-diagnostics memory_breakdown` 找原因。
# Temporary relief: rabbitmqctl set_vm_memory_high_watermark 0.6
# Check current: rabbitmq-diagnostics memory_breakdown
# Long-term: add RAM, enable lazy queues (page to disk), or reduce queue depth
Too many connections (connection storm)每条 AMQP 连接在 broker 上消耗约 100KB 内存和一个文件描述符。连接风暴(如每个 Lambda 调用都创建新连接)会迅速耗尽 broker 资源。
用连接池或单例连接模式。每进程共享一条连接,通过该连接上的多个通道多路复用工作。
# Wrong: create a new connection per message (serverless anti-pattern)
# Right: create one connection at startup; reuse across all handlers
# Check: rabbitmqctl list_connections | wc -l
Missing heartbeat → ghost connections accumulate没有心跳的话,一条因防火墙超时或 NAT 表清除而失去底层路由的 TCP 连接在双方看来都是活的,直到有人尝试发送。broker 上积累的僵尸连接永远不会消失。
始终协商心跳。RabbitMQ 默认 60 秒。大多数客户端默认也是 60 秒。显式设置以避免不匹配。
# Node.js amqplib: amqp.connect("amqp://localhost?heartbeat=60")# Java Spring AMQP: factory.setRequestedHeartbeat(60)
# Check ghost conns: rabbitmqctl list_connections name state // look for "blocking" with no traffic
Routing key mismatch → silent message black-hole如果用一个不匹配任何绑定的路由键发布消息,且没有配置备用交换机,消息会被静默丢弃。没有错误,没有日志条目。
发布时使用 mandatory 标志。mandatory=true 时,不可路由的消息会触发 basic.return 回调,而不是悄悄消失。
# channel.basicPublish("events", "wrong.key", {mandatory: true}, body)# channel.on("return", (msg) => { console.error("Unroutable:", msg.fields.routingKey) })# Or configure AE: {"alternate-exchange": "unrouted-ae"}nack with requeue=true → infinite retry loopbasicNack(tag, false, requeue=true) 把消息立即重新投递到队列前面。如果处理始终失败,消息以 100% CPU 无限循环,阻塞新消息。
不要无条件使用 requeue=true。用消息头计数重试次数;达到最大次数后,用 requeue=false nack,把消息送到 DLX。
# Wrong: channel.basicNack(tag, false, true) // will loop on poison message
# Right: check x-death header count; after 3 retries: channel.basicNack(tag, false, false)
Not setting DLX → failed messages silently vanish没有配置 DLX 的话,过期或 requeue=false 被拒绝的消息会被 broker 丢弃。你对丢失的内容和原因一无所知。
处理重要工作的队列始终配置 DLX。路由到停车场队列或另一个处理队列。绝不让消息悄悄消失。
# Minimum DLX setup:
# channel.exchangeDeclare("dlx", "fanout", true)# channel.queueDeclare("dead-letters", true, false, false, null)# channel.queueBind("dead-letters", "dlx", "")# source queue: {"x-dead-letter-exchange": "dlx"}Connection vs channel confusion连接是到 broker 的一个 TCP socket。通道是在其上多路复用的轻量级虚拟连接。每进程一条连接;用多个通道实现并行。
通道在大多数客户端库里不是线程安全的。每个线程创建一个通道。开关通道很便宜,开关连接很昂贵。
# Wrong: share one channel across multiple threads
# Right: one connection → N channels (one per thread/goroutine/coroutine)
Publishing with transactions instead of publisher confirmsAMQP 事务(tx.select / tx.commit)提供精确一次语义,但代价是发布确认吞吐量的 5–10 倍。改用确认模式——以极小的代价实现至少一次保证。
事务还会阻塞通道直到提交,增加延迟。实践中,发布确认 + 幂等消费者能以远好得多的吞吐量实现同样的有效保证。
# Avoid: channel.txSelect(); channel.basicPublish(...); channel.txCommit();
# Use: channel.confirmSelect(); channel.basicPublish(...); channel.waitForConfirmsOrDie();
Pitfall: ignoring connection-level errors很多 SDK 分别暴露通道级和连接级错误事件。通道在协议错误时关闭,底层连接可能仍存活。不监听连接错误会导致 broker 重启或网络抖动时静默丢消息。
在连接对象(而非仅通道)上注册 `on("error")` 处理程序,并在连接层面实现重连逻辑(指数退避),而不只是重开通道。
// Bad: only handles channel errors
channel.on('error', (err) => console.error('channel error', err));// Good: handle both
connection.on('error', (err) => { reconnect(); });channel.on('error', (err) => { reopenChannel(); });可搜索的 RabbitMQ 速查,90+ 条,覆盖日常 RabbitMQ 工作的方方面面。 CLI 一节:rabbitmqctl 管理队列、交换机、绑定、连接、通道、用户、权限、 vhost、清空队列、导出/导入定义、集群管理(加入、查状态、移除死节点、 重置),以及 rabbitmq-plugins(启用 management/shovel/federation/streams)和 rabbitmq-diagnostics(check_running、memory_breakdown、 check_port_connectivity)。概念一节:全部四种交换机类型(direct、topic、 fanout、headers)、默认交换机及为何不适合做 pub/sub、死信交换机(DLX)和 备用交换机(AE)、经典/仲裁/流式三种队列类型、消息 TTL 和队列 TTL、 发布确认、消费者确认模式(ack/nack/reject)、Basic.QoS 预取、持久化队列配 合 delivery-mode=2、自动删除和独占队列、x-max-length 与溢出策略、优先级 队列。模式一节:竞争消费者工作队列、fanout pub/sub、direct 路由、topic 通配路由、带 reply_to/correlation_id 的 AMQP RPC、死信队列停车场、TTL+DLX 循环实现指数退避重试、延迟消息、铲子(Shovel)和联邦(Federation)、幂等消 费者、Outbox 模式、x-overflow 实现熔断、用流做事件溯源、Saga 协调。常见 坑一节:auto-ack 数据丢失、缺少预取导致内存爆炸、channel 出错后复用、经典 镜像队列 vs 仲裁队列、非持久化消息放持久化队列、内存水位报警停止所有 发布者、连接风暴、缺心跳、路由键黑洞、DLX 配置错误。每条都含中英文 双语说明、可直接复制的命令或配置、真实场景背景。搜索跨命令/说明/例子/ 坑四个维度一起过滤,分类胶囊切 CLI / 概念 / 模式 / 常见坑。 完全在浏览器里跑,零请求,断网可用。
把内容粘贴或拖入工具面板。
点击按钮,在浏览器内本地处理,文件不上传。
一键复制结果或下载到本地。
适合穿插在写代码、查问题、做 Review、上线前的小任务里。
这些入口会把当前任务接到更完整的工具链里。
告警响了,支付处理队列有 5 万条未确认消息,消费者数降为零。你翻开速查, 跑 `rabbitmqctl list_queues name messages consumers unacked_messages` 确认, 再跑 `rabbitmqctl list_connections name state` 找到那个僵死连接。 cluster_status 条目确认节点还活着,memory_breakdown 条目指向真正的元凶: 一个没关闭 channel 就崩掉的消费者。
团队到处都是 ha-all 的经典镜像队列,要升到 3.9 以上得先迁。速查里仲裁 队列概念条目给你 x-queue-type 声明方法,export_definitions 条目让你先给 当前拓扑打个快照。按维护窗口逐队列迁移,每迁一个就用 cluster_status 确 认复制状态。
你的订单处理器有时会碰到返回 503 的下游 API,需要带退避的重试,不能无限 循环。速查里"TTL + DLX 指数退避"模式条目给你完整的队列链:order-queue → reject → dlx → retry-1s → TTL 到期 → dlx → retry-10s → … → dead-letter-停车场。DLX 概念条目和 x-dead-letter-exchange 参数条目给你 所有需要的原语。
声明了 durable=true 的队列,却用 deliveryMode=1(瞬态)发布消息。消息在重启后存活需要两个条件同时满足:队列持久化 AND 消息 delivery-mode=2(持久化)。
在 RabbitMQ 3.9+ 里继续用经典镜像队列(ha-policy)。这个特性已弃用,在网络分区下不可靠,4.0 中已彻底移除。改用仲裁队列。
开了 autoAck=true 然后做耗时处理。消费者收到消息后崩了、处理还没完成,消息就没了。始终用手动确认,处理成功后再 ack。
每条命令、每个概念、每种模式、每个坑,全都在页面加载时一次性放进静态内存 数组里。搜索框、分类胶囊和复制按钮完全在你的浏览器里运行。你搜的、拷的、 打的任何内容都不会发到服务器,也不会写进 URL。打开 DevTools 的 Network 一 边搜索一边看:零出站请求。在公司代理后面、飞机上、或者离网的机房机器上都 照常用,恰好是你最需要 RabbitMQ 速查的场景。
做你这行的人, 还会一起用这些。