跳到主要内容

RabbitMQ 速查表 — 90+ 条命令、模式与常见坑

RabbitMQ 速查,90+ 条命令和模式,涵盖队列、交换机、集群和常见坑。

  • 本地处理
  • 分类 开发运维
  • 适合 格式化、校验、压缩或检查和代码相关的文本。
分类:
80
CLI 命令 (33)
rabbitmqctl status

显示 broker 状态:运行中的应用、版本、OS 和 Erlang 版本、内存/磁盘用量。

例子
rabbitmqctl status
rabbitmqctl status --formatter=json
rabbitmqctl list_queues name messages consumers unacked_messages

列出队列及消息数、消费者数、未确认消息数。加 -p /vhost 限定 vhost。

例子
rabbitmqctl list_queues name messages consumers
rabbitmqctl list_queues -p /production name messages consumers unacked_messages memory
rabbitmqctl list_exchanges name type durable auto_delete

列出所有交换机及类型、持久化、自动删除标志。预声明的系统交换机名称为空。

例子
rabbitmqctl list_exchanges name type durable
rabbitmqctl list_exchanges -p /myapp name type
rabbitmqctl list_bindings source_name source_kind destination_name routing_key

列出所有交换机到队列的绑定关系及路由键,用于排查路由不匹配问题。

例子
rabbitmqctl list_bindings
rabbitmqctl list_bindings -p /myapp source_name destination_name routing_key
rabbitmqctl list_connections name state user vhost recv_oct send_oct

列出所有 AMQP 连接,含状态、用户、vhost、收发字节数。用 state 字段找僵死/阻塞连接。

常见坑

连接状态为 "blocked" 表示 broker 触发了资源告警(内存或磁盘)。客户端正在被限流。

例子
rabbitmqctl list_connections name state user
rabbitmqctl list_connections name state user vhost recv_oct send_oct
rabbitmqctl list_channels name number user connection state consumer_count

列出所有 AMQP 通道。每条连接可以复用多个通道。单连接通道数过多是代码异味。

例子
rabbitmqctl list_channels
rabbitmqctl list_channels name number consumer_count
rabbitmqctl list_consumers queue_name consumer_tag ack_required prefetch_count

列出所有消费者及队列名、消费者标签、确认模式、预取数。

例子
rabbitmqctl list_consumers
rabbitmqctl list_consumers queue_name consumer_tag ack_required prefetch_count
rabbitmqctl add_user <username> <password>

创建新 RabbitMQ 用户。新用户在任何 vhost 上都没有权限。

例子
rabbitmqctl add_user alice secretpassword
rabbitmqctl add_user ci-agent "$(openssl rand -base64 32)"
rabbitmqctl set_user_tags <username> <tag...>

设置管理界面角色。Tags: administrator(全权)、monitoring(只读)、management(基础界面)、policymaker(仅策略)。

例子
rabbitmqctl set_user_tags alice administrator
rabbitmqctl set_user_tags ci-agent monitoring
rabbitmqctl set_user_tags alice   # clears all tags
rabbitmqctl set_permissions -p <vhost> <user> <conf> <write> <read>

授予用户在 vhost 上的权限。三个正则:configure(声明/删除)、write(发布)、read(消费/获取)。`".*"` 匹配所有。

例子
rabbitmqctl set_permissions -p / alice ".*" ".*" ".*"
rabbitmqctl set_permissions -p /prod alice "^app-" "^app-" "^app-"
rabbitmqctl list_users

列出所有用户及其 tag。

例子
rabbitmqctl list_users
rabbitmqctl delete_user <username>

删除用户及其所有权限。

例子
rabbitmqctl delete_user alice
rabbitmqctl change_password <username> <newpassword>

修改指定用户的登录密码,立即生效。

例子
rabbitmqctl change_password alice newpass123
rabbitmqctl list_permissions -p <vhost>

列出某 vhost 下所有权限条目。

例子
rabbitmqctl list_permissions -p /
rabbitmqctl list_permissions -p /production
rabbitmqctl add_vhost <vhost>

创建虚拟主机。vhost 在租户或环境之间提供逻辑隔离。

例子
rabbitmqctl add_vhost /production
rabbitmqctl add_vhost /staging
rabbitmqctl list_vhosts

列出 broker 上所有已创建的虚拟主机。

例子
rabbitmqctl list_vhosts
rabbitmqctl list_vhosts name tracing
rabbitmqctl delete_vhost <vhost>

删除 vhost 及其所有队列、交换机、绑定和策略。

常见坑

这是不可逆的。如需备份,先用 export_definitions 导出定义。

例子
rabbitmqctl delete_vhost /staging
rabbitmqctl purge_queue <queuename>

清空队列所有消息,但保留队列本身。

常见坑

清空是永久的,消息无法恢复。生产环境跑之前确认你真的想这么做。

例子
rabbitmqctl purge_queue payment-queue
rabbitmqctl purge_queue -p /prod orders
rabbitmqctl delete_queue <queuename>

删除队列及其所有消息。

例子
rabbitmqctl delete_queue old-test-queue
rabbitmqctl delete_queue -p /prod deadletter
rabbitmqctl delete_exchange <exchangename>

删除交换机。已路由到队列的飞行中消息不受影响。

例子
rabbitmqctl delete_exchange old-events
rabbitmqctl delete_exchange -p /prod notifications
rabbitmqctl export_definitions /path/to/defs.json

把所有 broker 定义(vhost、队列、交换机、绑定、用户、策略)导出为 JSON 文件。用于备份和迁移。

例子
rabbitmqctl export_definitions /tmp/rabbitmq-backup.json
rabbitmqctl export_definitions - | jq .   # stream to stdout
rabbitmqctl import_definitions /path/to/defs.json

从 JSON 文件导入 broker 定义。与现有配置合并,不会删除文件里没有的内容。

例子
rabbitmqctl import_definitions /tmp/rabbitmq-backup.json
curl -s http://localhost:15672/api/definitions -u guest:guest | rabbitmqctl import_definitions -
rabbitmqctl cluster_status

显示集群成员、节点类型和分区状态。健康集群的 "partitions" 字段应始终为空。

例子
rabbitmqctl cluster_status
rabbitmqctl cluster_status --formatter=json | jq .partitions
rabbitmqctl join_cluster rabbit@<nodename>

加入现有集群。必须先跑 `rabbitmqctl stop_app`。节点须通过 Erlang cookie 和主机名可达。

例子
rabbitmqctl stop_app && rabbitmqctl join_cluster rabbit@rabbit2 && rabbitmqctl start_app
rabbitmqctl join_cluster --ram rabbit@rabbit1   # RAM node (not recommended)
rabbitmqctl forget_cluster_node rabbit@<deadnode>

从集群元数据中移除永久离线的节点。只在该节点不可能再上线时使用。

常见坑

只有在确定该节点永远不会再加入时才执行。移除还持有仲裁队列数据的节点会导致数据丢失。

例子
rabbitmqctl forget_cluster_node rabbit@dead-node
rabbitmqctl forget_cluster_node --offline rabbit@dead-node
rabbitmqctl reset

危险:把节点重置为出厂状态,清除所有数据、定义和集群成员关系。需要先执行 stop_app。

常见坑

不可逆。所有队列、交换机、绑定、消息、用户(除默认外)全部消失。

例子
rabbitmqctl stop_app && rabbitmqctl reset && rabbitmqctl start_app
rabbitmq-plugins enable <plugin>

启用插件。大多数插件无需重启即时生效。常用插件:rabbitmq_management、rabbitmq_shovel、rabbitmq_federation、rabbitmq_stream。

例子
rabbitmq-plugins enable rabbitmq_management
rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management
rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management
rabbitmq-plugins list

列出所有插件及启用/禁用状态。[E] 表示显式启用,[e] 表示作为依赖启用。

例子
rabbitmq-plugins list
rabbitmq-plugins list --enabled
rabbitmq-diagnostics check_running

健康检查:broker 运行且内部健康检查通过时退出 0,否则非零。适合做就绪探针。

例子
rabbitmq-diagnostics check_running && echo "healthy"
rabbitmq-diagnostics check_port_connectivity && rabbitmq-diagnostics check_running
rabbitmq-diagnostics memory_breakdown

按类别显示内存用量(队列、连接、通道、绑定、mnesia 等)。诊断内存压力的必备命令。

例子
rabbitmq-diagnostics memory_breakdown
rabbitmq-diagnostics memory_breakdown --formatter=json | jq .
rabbitmq-diagnostics check_port_connectivity

验证 AMQP、管理界面和集群端口是否可达。

例子
rabbitmq-diagnostics check_port_connectivity
rabbitmqctl set_policy <name> <pattern> <definition> --apply-to <queues|exchanges|all>

对名称匹配正则的所有队列/交换机应用策略。策略优先于每个队列的参数,且可实时修改。

例子
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}' --apply-to queues
rabbitmqctl set_policy ttl-1h "^transient\." '{"message-ttl":3600000}' --apply-to queues
rabbitmqctl set_policy max-length "^bounded\." '{"max-length":10000,"overflow":"reject-publish-dlx"}' --apply-to queues
rabbitmqctl eval '<Erlang expression>'

在运行中的节点上执行 Erlang 表达式。适合标准 CLI 命令未暴露的一次性生产诊断。

常见坑

仅在紧急情况下谨慎使用——错误的 Erlang 表达式可能导致节点崩溃。日常运维优先使用 rabbitmq-diagnostics 命令。

例子
rabbitmqctl eval 'rabbit_nodes:all_running().'
rabbitmqctl eval 'length(rabbit_amqqueue:list()).'
rabbitmqctl eval 'mnesia:system_info(all).'
核心概念 (20)
Direct exchange (type: direct)

按绑定键与消息路由键的精确匹配路由消息。一个键→一个(或多个)有相同绑定的队列。

例子
# Declare: channel.exchangeDeclare("events", "direct", true)
# Bind:    channel.queueBind("order-created-queue", "events", "order.created")
# Publish: channel.basicPublish("events", "order.created", null, body)
Topic exchange (type: topic)

用通配符模式路由。`*` 匹配且仅匹配一个点分隔的单词。`#` 匹配零个或多个单词。如 `user.#` 能匹配 `user.login`、`user.profile.updated`。

例子
# Binding "*.error" catches order.error, payment.error but not order.payment.error
# Binding "order.#" catches order.created, order.item.added, order.fulfilled
# Binding "#" catches everything (same as fanout but slower)
Fanout exchange (type: fanout)

把每条消息的副本发给所有绑定的队列,完全忽略路由键。适合广播/发布-订阅场景。

常见坑

路由键被忽略,不小心把队列绑到 fanout 交换机上就会静默接收所有消息。

例子
# Declare: channel.exchangeDeclare("broadcast", "fanout", true)
# Each subscriber declares its own queue and binds it — no routing key needed
# Publish: channel.basicPublish("broadcast", "", null, body)  // key ignored
Headers exchange (type: headers)

按消息头的键值对路由,而非路由键。绑定时指定所需的头值,以及 x-match: all(AND)或 any(OR)。

例子
# Bind with: {"format":"pdf","type":"report","x-match":"all"}
# Matches messages with BOTH format=pdf AND type=report headers
Default exchange (name: "")

内置的无名 direct 交换机。每个队列都以自身名字作为路由键自动绑定上去。点对点时很方便,但无法做 pub/sub。

常见坑

默认交换机看起来是捷径,但把发送者和队列名耦合在一起了。用命名交换机才能做到真正解耦。

例子
# channel.basicPublish("", "my-queue", null, body) sends direct to my-queue
Dead Letter Exchange (x-dead-letter-exchange)

接收源队列无法投递的消息的交换机:TTL 到期、队列满(溢出模式为 reject)、或消费者 nack 且 requeue=false。

例子
# Declare source queue with DLX:
# channel.queueDeclare("orders", true, false, false, {"x-dead-letter-exchange": "dlx", "x-dead-letter-routing-key": "orders.dead"})
# Bind a "parking lot" queue to dlx with routing key "orders.dead"
Alternate Exchange (x-alternate-exchange)

接收无法路由(没有队列匹配路由键)的消息的交换机。防止路由不匹配时静默丢消息。

例子
# Declare exchange with AE:
# channel.exchangeDeclare("main", "topic", true, false, {"alternate-exchange": "unrouted"})
# channel.exchangeDeclare("unrouted", "fanout", true)
Classic queue

标准 AMQP 队列,由 Mnesia 支撑。除非配置经典镜像策略(3.9 弃用,4.0 移除)否则不做复制。仅用于不重要的临时工作负载。

例子
# channel.queueDeclare("task-queue", true, false, false, null)
# durable=true means the queue survives broker restart
Quorum queue (x-queue-type: quorum)

用 Raft 共识复制的队列。少数节点故障不丢数据。推荐用于所有生产队列。支持 DLX、单消息 TTL、x-max-length 和投递次数限制。

常见坑

仲裁队列始终持久化。不支持优先级队列或全局 QoS。每个队列每个节点选一个 leader,小集群上避免几百个队列。

例子
# channel.queueDeclare("orders", true, false, false, {"x-queue-type": "quorum"})
# Set leader election strategy via policy: {"queue-leader-locator": "balanced"}
Stream queue (x-queue-type: stream)

不可变追加日志。消费者可从任意偏移量重放。按配置的大小或时间保留数据,而非等消费者 ack。需要 rabbitmq_stream 插件。

例子
# channel.queueDeclare("events", true, false, false, {"x-queue-type": "stream", "x-max-age": "7D", "x-stream-max-segment-size-bytes": 536870912})
# Consumer subscribes with offset: {"x-stream-offset": "first"} or {"x-stream-offset": <timestamp>}
Message TTL (x-message-ttl)

队列级别的 TTL,单位毫秒。超时消息被死信化(如果配了 DLX)或丢弃。在队列声明时设置。

例子
# {"x-message-ttl": 60000}  // 60 seconds
# Or via policy: rabbitmqctl set_policy ttl "^cache\." '{"message-ttl":5000}' --apply-to queues
Per-message TTL (expiration header)

单消息的过期时间,单位毫秒,设在消息属性里。如果队列级别 TTL 也设了,取两者中较小的。

例子
# AMQP props: { expiration: "30000" }  // 30 seconds
# Note: expiration is a STRING in the AMQP spec, not an integer
Queue TTL (x-expires)

队列没有消费者且没有消息投递 N 毫秒后自动删除。适合自动清理临时的回复队列。

例子
# {"x-expires": 3600000}  // queue disappears after 1 hour idle
Publisher confirms (channel.confirmSelect)

Broker 逐条确认发布的消息。basic.ack = 已接受(持久化消息已写盘)。basic.nack = 已丢弃(队列满、资源告警)。实现至少一次发布保证。

常见坑

单消息确认有 5–10% 吞吐损耗。批量确认(发 N 条等 N 个 ack)能找回大部分性能。

例子
# channel.confirmSelect();
# channel.waitForConfirmsOrDie(5000);  // Java: wait up to 5s for all outstanding acks
# async: channel.addConfirmListener((tag, multiple) => { /* handle ack */ }, (tag, multiple) => { /* handle nack */ })
Consumer acknowledgement modes

三种确认模式:basic.ack(成功,从队列删除)、basic.nack(失败,可选 requeue=true/false)、basic.reject(同 nack 但只针对单条)。手动 ack 始终比 auto-ack 安全。

常见坑

basic.nack 配 requeue=true 会立即重投,如果处理始终失败,消息会无限循环。改为路由到 DLX。

例子
# channel.basicAck(deliveryTag, false)
# channel.basicNack(deliveryTag, false, false)  // nack, no requeue → DLX
# channel.basicReject(deliveryTag, false)       // reject single message
Basic.QoS / prefetch (channel.basicQos)

限制同时推给消费者的未确认消息数。不设的话 broker 会把消息都塞给最快的消费者。在 basicConsume 前设置。作用范围:每通道(默认)或每消费者。

常见坑

prefetch=0 表示不限制。prefetch=1 表示消费者一次只处理一条消息——严格顺序时正确,高吞吐时性能差。

例子
# channel.basicQos(10)  // at most 10 unacked messages per channel
# channel.basicQos(1, true)  // prefetch=1, global=true (applies per-channel not per-consumer)
Durable queue + delivery-mode=2 (persistent message)

消息要在 broker 重启后存活,必须同时满足两个条件:(1) 队列声明为 durable,(2) 消息发布时 delivery-mode=2。

常见坑

持久化队列里的非持久化消息(delivery-mode=1)在 broker 重启后会消失。这是最常见的持久化误区。

例子
# queue: durable=true
# message props: deliveryMode=2
# AMQP props: { deliveryMode: 2, contentType: "application/json" }
x-max-length and x-overflow

按消息数(x-max-length)或字节数(x-max-length-bytes)限制队列深度。x-overflow 控制达到上限时的行为:drop-head(默认,丢弃最旧)、reject-publish(nack 发布者)、reject-publish-dlx(nack 发布者 + 被丢消息走 DLX)。

例子
# {"x-max-length": 10000, "x-overflow": "reject-publish"}
# Via policy: rabbitmqctl set_policy bounded "^bnd\." '{"max-length":5000,"overflow":"reject-publish-dlx"}' --apply-to queues
Priority queue (x-max-priority)

最多 255 个优先级的队列。数字越大优先级越高。消息必须携带数字型 `priority` 属性。不配消费者优先级的话,优先级只在单消费者时有效。

常见坑

x-max-priority > 5 浪费内存——RabbitMQ 为每个优先级级别创建内部队列。大多数场景只需 2–5 个级别。

例子
# {"x-max-priority": 5}
# Publish high-priority: { priority: 5 }
# Publish normal: { priority: 1 }
Auto-delete and exclusive queues

自动删除:最后一个消费者断连后队列自动删除。独占:仅对声明它的连接可见,连接关闭后删除。两者都适合临时工作或 RPC 回复队列。

例子
# exclusive=true: channel.queueDeclare("", false, true, true, null)  // "" = server-generated name
# auto-delete=true: channel.queueDeclare("reply-12345", false, false, true, null)
消息模式 (13)
Work queue (competing consumers)

多个消费者订阅同一个队列。broker 轮询分发消息。每条消息恰好由一个消费者处理。后台任务水平扩展的经典模式。

例子
# Declare one shared durable queue
# Start N worker processes each calling channel.basicConsume("tasks", ...)
# Set prefetch=1 to prevent work imbalance: channel.basicQos(1)
Pub/Sub with fanout exchange

每个订阅者声明自己的独占队列并绑定到 fanout 交换机。所有订阅者独立接收每条消息。适合事件广播。

例子
# Publisher: channel.basicPublish("events-fanout", "", null, body)
# Subscriber A: queueDeclare("", ..., true, true, ...) → queueBind(q, "events-fanout", "")
# Subscriber B: queueDeclare("", ..., true, true, ...) → queueBind(q, "events-fanout", "")
Topic routing (wildcard routing key)

用 topic 交换机让消费者按模式订阅。如日志消费者绑定 `#.error` 接收所有服务的错误;审计消费者绑定 `order.#` 接收所有订单事件。

例子
# channel.exchangeDeclare("events", "topic", true)
# Bind error queue: channel.queueBind("errors", "events", "#.error")
# Bind order queue: channel.queueBind("order-audit", "events", "order.#")
# Publish: channel.basicPublish("events", "order.payment.failed", null, body)
RPC over AMQP (reply_to + correlation_id)

客户端发消息到服务端队列,携带 reply_to=<独占队列名> 和唯一 correlation_id。服务端处理后把响应发到 reply_to 队列,携带相同 correlation_id。

常见坑

AMQP RPC 是合法模式但有延迟。只有在栈里已有 RabbitMQ 且需要请求路由或背压时才用它代替 HTTP。

例子
# Client: channel.basicPublish("", "rpc-server-queue", {replyTo: "amq.rabbitmq.reply-to", correlationId: uuid}, body)
# Use "amq.rabbitmq.reply-to" as the reply-to address for the fast direct reply protocol
# Server: on message, channel.basicPublish("", msg.properties.replyTo, {correlationId: msg.properties.correlationId}, responseBody)
Dead letter queue (DLQ parking lot)

源队列把失败/过期消息发给 DLX。绑定到 DLX 的"停车场"队列保存这些消息供检查、重放或手动丢弃。运维团队用它调试故障而不丢消息。

例子
# 1. Declare DLX: channel.exchangeDeclare("dlx", "direct", true)
# 2. Bind parking lot: channel.queueDeclare("parking-lot", true, ...); channel.queueBind("parking-lot", "dlx", "parking")
# 3. Source queue: {"x-dead-letter-exchange": "dlx", "x-dead-letter-routing-key": "parking"}
# 4. When consumer nacks with requeue=false, message lands in parking-lot
Exponential backoff retry via TTL + DLX chain

链路:source-queue → nack → dlx → wait-1s-queue(TTL=1000,DLX→source) → 到期 → 回到 source。加 wait-10s-queue 做第二次重试。用消息头追踪尝试次数。

例子
# wait-1s-queue: {"x-dead-letter-exchange": "main-exchange", "x-message-ttl": 1000}
# wait-10s-queue: {"x-dead-letter-exchange": "main-exchange", "x-message-ttl": 10000}
# After max retries, nack to a final parking-lot queue instead
Delayed message (per-message TTL + DLX)

给消息设 expiration,路由到无消费者的暂存队列。TTL 到期后 DLX 重新路由到实际目标。

常见坑

生产环境用 rabbitmq-delayed-message-exchange 插件更干净,它按节点存储消息,支持精确延迟,不用创建额外队列。

例子
# Install: rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# channel.exchangeDeclare("delayed", "x-delayed-message", true, false, {"x-delayed-type": "direct"})
# Publish with header: {"x-delay": 5000}  // delay 5 seconds
Shovel plugin (move messages between brokers)

Shovel 动态地把消息从源队列/交换机移到同一个或不同 broker 上的目标。用于跨数据中心复制或 vhost 迁移。

例子
# Enable: rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management
# Set shovel via policy or management API: PUT /api/parameters/shovel/%2F/my-shovel
# {"src-uri":"amqp://","src-queue":"old-queue","dest-uri":"amqp://remote-host","dest-queue":"new-queue"}
Idempotent consumer (message deduplication)

处理前把 message-id(或业务键)存入 Redis/DB。如果已存在,ack 后跳过。这把至少一次投递变成有效一次处理。

例子
# 1. Receive message with unique messageId in properties
# 2. if redis.set(messageId, "1", NX, EX, 86400): process and ack
# 3. else: ack without processing (already handled)
Outbox pattern (atomic write + publish)

和业务变更在同一个 DB 事务里把事件写入 outbox 表。后台轮询器读 outbox 并发布到 RabbitMQ。保证服务在 DB 写入后、发布前崩溃也不丢消息。

例子
# 1. BEGIN TRANSACTION
# 2. UPDATE orders SET status="paid" WHERE id=123
# 3. INSERT INTO outbox(event_type, payload) VALUES ("order.paid", "...")
# 4. COMMIT
# 5. Poller: SELECT * FROM outbox WHERE published_at IS NULL; → publish; UPDATE outbox SET published_at=NOW()
Circuit breaker via x-overflow: reject-publish

在源队列设 x-max-length + x-overflow: reject-publish。队列达到容量上限时,发布者收到 nack 并应停止发送(熔断器开)。消费者清空队列后,发布者重新收到 ack(熔断器关)。

例子
# {"x-max-length": 1000, "x-overflow": "reject-publish"}
# Publisher must handle nack: pause publishing or fall back to local buffer
Event sourcing with stream queues

用流式队列做不可变的事件日志。消费者从任意偏移量订阅(first、last、特定时间戳或字节偏移)。多个消费组独立重放互不干扰。

例子
# {"x-queue-type": "stream", "x-max-age": "30D"}
# Subscribe from beginning: consumer args {"x-stream-offset": "first"}
# Subscribe from timestamp: consumer args {"x-stream-offset": <unix-timestamp-ms>}
Blue/green deployment via vhosts

把流量路由到 /blue 或 /green vhost。新版本对 /green 部署,/blue 继续服务生产。切换负载均衡器或应用配置指向 /green;保留 /blue 用于即时回滚。

例子
# rabbitmqctl add_vhost /green
# rabbitmqctl set_permissions -p /green deploy ".*" ".*" ".*"
# Configure app to use AMQP_VHOST=/green for the new deployment
常见坑 (14)
autoAck=true → silent data loss on consumer crash

autoAck=true 时 broker 投递消息的瞬间就把它从队列删除。消费者在处理完之前崩溃或抛出异常,消息就永久消失了。

常见坑

始终用手动 ack。成功处理后才调 basicAck。失败时调 basicNack/basicReject。

例子
# Wrong: channel.basicConsume("tasks", true, callback)  // autoAck=true
# Right: channel.basicConsume("tasks", false, callback) // manual ack
# In callback: try { process(msg); channel.basicAck(tag, false); } catch { channel.basicNack(tag, false, false); }
Missing prefetch → memory balloon + consumer starvation

没有预取限制时,broker 把所有排队消息都推给第一个可用的消费者。这个消费者在内存里缓存了成千上万条消息,而其他消费者空闲等待。

常见坑

在 channel.basicConsume() 前始终调 channel.basicQos()。工作队列:预取 10–100。慢处理:预取 1。

例子
# channel.basicQos(10)  // then channel.basicConsume(...)
Re-using a channel after an error closes it

AMQP 通道在 broker 报错后会失效(权限不对、声明冲突的队列、ack 未知的投递 tag)。channel 级别出错后必须创建新通道,旧通道已永久关闭。

常见坑

Channel 错误在日志里看起来像连接错误。通过错误码区分:4xx 是 channel 错误,5xx 是连接错误。

例子
# Wrong: ignore the error and keep using the same channel object
# Right: listen for channel.close event; create a new channel in the handler
# channel.on("error", (err) => { channel = connection.createChannel(); })
Classic mirrored queues (deprecated since 3.9)

经典镜像队列(ha-policy)在网络分区下不可靠,同步时可能丢消息,节点故障需要手动运维介入。3.9 中弃用,4.0 中移除。

常见坑

现在就迁移到仲裁队列。用 x-queue-type: quorum 声明。它们自动处理脑裂和节点故障。

例子
# Remove ha-all policy: rabbitmqctl clear_policy ha-all
# Migrate: drain classic queue, delete it, redeclare with x-queue-type: quorum
Durable queue + non-persistent messages → data lost on restart

持久化队列在 broker 重启后存活,但消息不会,除非 delivery-mode=2(持久化)。持久化队列里的 delivery-mode=1 消息在重启后悄悄消失。

常见坑

两个条件必须同时满足:queue.durable=true 且 message.deliveryMode=2。检查发布代码,不只是队列声明。

例子
# Wrong: channel.basicPublish("ex", "key", null, body)  // default deliveryMode=1
# Right: channel.basicPublish("ex", "key", {deliveryMode: 2}, body)
Memory watermark alarm blocks ALL publishers

当 broker 内存用量超过高水位线(默认可用内存的 40%)时,所有进入发布路径的连接都被阻塞。broker 向客户端发送 Connection.Blocked。

常见坑

症状:发布操作无限期挂起,消费者还在正常消费。修复:加内存、提高水位线或加速消费。跑 `rabbitmq-diagnostics memory_breakdown` 找原因。

例子
# Temporary relief: rabbitmqctl set_vm_memory_high_watermark 0.6
# Check current: rabbitmq-diagnostics memory_breakdown
# Long-term: add RAM, enable lazy queues (page to disk), or reduce queue depth
Too many connections (connection storm)

每条 AMQP 连接在 broker 上消耗约 100KB 内存和一个文件描述符。连接风暴(如每个 Lambda 调用都创建新连接)会迅速耗尽 broker 资源。

常见坑

用连接池或单例连接模式。每进程共享一条连接,通过该连接上的多个通道多路复用工作。

例子
# Wrong: create a new connection per message (serverless anti-pattern)
# Right: create one connection at startup; reuse across all handlers
# Check: rabbitmqctl list_connections | wc -l
Missing heartbeat → ghost connections accumulate

没有心跳的话,一条因防火墙超时或 NAT 表清除而失去底层路由的 TCP 连接在双方看来都是活的,直到有人尝试发送。broker 上积累的僵尸连接永远不会消失。

常见坑

始终协商心跳。RabbitMQ 默认 60 秒。大多数客户端默认也是 60 秒。显式设置以避免不匹配。

例子
# Node.js amqplib: amqp.connect("amqp://localhost?heartbeat=60")
# Java Spring AMQP: factory.setRequestedHeartbeat(60)
# Check ghost conns: rabbitmqctl list_connections name state  // look for "blocking" with no traffic
Routing key mismatch → silent message black-hole

如果用一个不匹配任何绑定的路由键发布消息,且没有配置备用交换机,消息会被静默丢弃。没有错误,没有日志条目。

常见坑

发布时使用 mandatory 标志。mandatory=true 时,不可路由的消息会触发 basic.return 回调,而不是悄悄消失。

例子
# channel.basicPublish("events", "wrong.key", {mandatory: true}, body)
# channel.on("return", (msg) => { console.error("Unroutable:", msg.fields.routingKey) })
# Or configure AE: {"alternate-exchange": "unrouted-ae"}
nack with requeue=true → infinite retry loop

basicNack(tag, false, requeue=true) 把消息立即重新投递到队列前面。如果处理始终失败,消息以 100% CPU 无限循环,阻塞新消息。

常见坑

不要无条件使用 requeue=true。用消息头计数重试次数;达到最大次数后,用 requeue=false nack,把消息送到 DLX。

例子
# Wrong: channel.basicNack(tag, false, true)  // will loop on poison message
# Right: check x-death header count; after 3 retries: channel.basicNack(tag, false, false)
Not setting DLX → failed messages silently vanish

没有配置 DLX 的话,过期或 requeue=false 被拒绝的消息会被 broker 丢弃。你对丢失的内容和原因一无所知。

常见坑

处理重要工作的队列始终配置 DLX。路由到停车场队列或另一个处理队列。绝不让消息悄悄消失。

例子
# Minimum DLX setup:
# channel.exchangeDeclare("dlx", "fanout", true)
# channel.queueDeclare("dead-letters", true, false, false, null)
# channel.queueBind("dead-letters", "dlx", "")
# source queue: {"x-dead-letter-exchange": "dlx"}
Connection vs channel confusion

连接是到 broker 的一个 TCP socket。通道是在其上多路复用的轻量级虚拟连接。每进程一条连接;用多个通道实现并行。

常见坑

通道在大多数客户端库里不是线程安全的。每个线程创建一个通道。开关通道很便宜,开关连接很昂贵。

例子
# Wrong: share one channel across multiple threads
# Right: one connection → N channels (one per thread/goroutine/coroutine)
Publishing with transactions instead of publisher confirms

AMQP 事务(tx.select / tx.commit)提供精确一次语义,但代价是发布确认吞吐量的 5–10 倍。改用确认模式——以极小的代价实现至少一次保证。

常见坑

事务还会阻塞通道直到提交,增加延迟。实践中,发布确认 + 幂等消费者能以远好得多的吞吐量实现同样的有效保证。

例子
# Avoid: channel.txSelect(); channel.basicPublish(...); channel.txCommit();
# Use: channel.confirmSelect(); channel.basicPublish(...); channel.waitForConfirmsOrDie();
Pitfall: ignoring connection-level errors

很多 SDK 分别暴露通道级和连接级错误事件。通道在协议错误时关闭,底层连接可能仍存活。不监听连接错误会导致 broker 重启或网络抖动时静默丢消息。

常见坑

在连接对象(而非仅通道)上注册 `on("error")` 处理程序,并在连接层面实现重连逻辑(指数退避),而不只是重开通道。

例子
// Bad: only handles channel errors
channel.on('error', (err) => console.error('channel error', err));
// Good: handle both
connection.on('error', (err) => { reconnect(); });
channel.on('error', (err) => { reopenChannel(); });

这个工具能做什么

可搜索的 RabbitMQ 速查,90+ 条,覆盖日常 RabbitMQ 工作的方方面面。 CLI 一节:rabbitmqctl 管理队列、交换机、绑定、连接、通道、用户、权限、 vhost、清空队列、导出/导入定义、集群管理(加入、查状态、移除死节点、 重置),以及 rabbitmq-plugins(启用 management/shovel/federation/streams)和 rabbitmq-diagnostics(check_running、memory_breakdown、 check_port_connectivity)。概念一节:全部四种交换机类型(direct、topic、 fanout、headers)、默认交换机及为何不适合做 pub/sub、死信交换机(DLX)和 备用交换机(AE)、经典/仲裁/流式三种队列类型、消息 TTL 和队列 TTL、 发布确认、消费者确认模式(ack/nack/reject)、Basic.QoS 预取、持久化队列配 合 delivery-mode=2、自动删除和独占队列、x-max-length 与溢出策略、优先级 队列。模式一节:竞争消费者工作队列、fanout pub/sub、direct 路由、topic 通配路由、带 reply_to/correlation_id 的 AMQP RPC、死信队列停车场、TTL+DLX 循环实现指数退避重试、延迟消息、铲子(Shovel)和联邦(Federation)、幂等消 费者、Outbox 模式、x-overflow 实现熔断、用流做事件溯源、Saga 协调。常见 坑一节:auto-ack 数据丢失、缺少预取导致内存爆炸、channel 出错后复用、经典 镜像队列 vs 仲裁队列、非持久化消息放持久化队列、内存水位报警停止所有 发布者、连接风暴、缺心跳、路由键黑洞、DLX 配置错误。每条都含中英文 双语说明、可直接复制的命令或配置、真实场景背景。搜索跨命令/说明/例子/ 坑四个维度一起过滤,分类胶囊切 CLI / 概念 / 模式 / 常见坑。 完全在浏览器里跑,零请求,断网可用。

工具细节

输入
文本
页面会根据工具类型展示文本框、数值控件、文件选择或结构化输入。
输出
即时结果 + 复制
结果区优先给出可操作结果,支持项会显示复制、下载或可视化预览。
隐私
浏览器本地处理
主工具逻辑未发现外部 API 调用,输入通常留在当前标签页内处理。
保存 / 分享
可分享链接状态
关键设置会进入 URL,复制链接后别人能复现同一组参数。
性能预算
首屏 JS ≤ 32 KB
没有声明 WASM 依赖,适合快速打开和移动端使用。
适用场景
开发运维 · 程序员
分类和职业标签用于推荐相关工具、组织内链,并帮助用户快速判断是否适合当前任务。

怎么用

  1. 1. 输入

    把内容粘贴或拖入工具面板。

  2. 2. 处理

    点击按钮,在浏览器内本地处理,文件不上传。

  3. 3. 复制 / 下载

    一键复制结果或下载到本地。

RabbitMQ 速查表 适合怎么用

适合穿插在写代码、查问题、做 Review、上线前的小任务里。

适合开发场景

  • 格式化、校验、压缩或检查和代码相关的文本。
  • 把片段整理好再放进文档、工单、提交或交接材料。
  • 不切换工具,快速检查一个小 payload。

开发检查项

  • 压缩、混淆这类不可逆处理,先对副本操作。
  • 除非确认工具本地处理,不要粘贴密钥和敏感片段。
  • 转换后的代码上线前,仍要跑自己的测试或 lint。

下一步可以接着做

这些入口会把当前任务接到更完整的工具链里。

  1. 1 Docker 命令速查 Docker 命令速查,80+ 条命令带真实例子和常见坑,含 Compose 章节。 打开
  2. 2 Redis 速查表 Redis 速查表,80+ 命令覆盖字符串/哈希/列表/集合/有序集合/发布订阅/Streams/脚本,带真实例子和坑。 打开
  3. 3 kubectl 命令速查 kubectl 命令速查,100+ 条 K8s 命令,真实例子和常见坑,含 YAML 片段。 打开

真实使用场景

  • 凌晨两点调查生产队列积压

    告警响了,支付处理队列有 5 万条未确认消息,消费者数降为零。你翻开速查, 跑 `rabbitmqctl list_queues name messages consumers unacked_messages` 确认, 再跑 `rabbitmqctl list_connections name state` 找到那个僵死连接。 cluster_status 条目确认节点还活着,memory_breakdown 条目指向真正的元凶: 一个没关闭 channel 就崩掉的消费者。

  • 大版本发布前把经典镜像队列迁成仲裁队列

    团队到处都是 ha-all 的经典镜像队列,要升到 3.9 以上得先迁。速查里仲裁 队列概念条目给你 x-queue-type 声明方法,export_definitions 条目让你先给 当前拓扑打个快照。按维护窗口逐队列迁移,每迁一个就用 cluster_status 确 认复制状态。

  • 设计带指数退避的重试管道

    你的订单处理器有时会碰到返回 503 的下游 API,需要带退避的重试,不能无限 循环。速查里"TTL + DLX 指数退避"模式条目给你完整的队列链:order-queue → reject → dlx → retry-1s → TTL 到期 → dlx → retry-10s → … → dead-letter-停车场。DLX 概念条目和 x-dead-letter-exchange 参数条目给你 所有需要的原语。

常见踩坑

  • 声明了 durable=true 的队列,却用 deliveryMode=1(瞬态)发布消息。消息在重启后存活需要两个条件同时满足:队列持久化 AND 消息 delivery-mode=2(持久化)。

  • 在 RabbitMQ 3.9+ 里继续用经典镜像队列(ha-policy)。这个特性已弃用,在网络分区下不可靠,4.0 中已彻底移除。改用仲裁队列。

  • 开了 autoAck=true 然后做耗时处理。消费者收到消息后崩了、处理还没完成,消息就没了。始终用手动确认,处理成功后再 ack。

隐私说明

每条命令、每个概念、每种模式、每个坑,全都在页面加载时一次性放进静态内存 数组里。搜索框、分类胶囊和复制按钮完全在你的浏览器里运行。你搜的、拷的、 打的任何内容都不会发到服务器,也不会写进 URL。打开 DevTools 的 Network 一 边搜索一边看:零出站请求。在公司代理后面、飞机上、或者离网的机房机器上都 照常用,恰好是你最需要 RabbitMQ 速查的场景。

常见问题

类似工具组合

做你这行的人, 还会一起用这些。

Made by Toolora · 100% client-side · Updated 2026-07-01