首页主机资讯Ubuntu RabbitMQ消息确认机制

Ubuntu RabbitMQ消息确认机制

时间2025-12-03 12:55:05发布访客分类主机资讯浏览1311
导读:Ubuntu 上 RabbitMQ 消息确认机制全览 一 核心概念与适用场景 消费者确认(Consumer Acknowledgements):消费者处理完成后显式返回 ack/nack/reject。开启手动确认需将 autoAck=f...

Ubuntu 上 RabbitMQ 消息确认机制全览

一 核心概念与适用场景

  • 消费者确认(Consumer Acknowledgements):消费者处理完成后显式返回 ack/nack/reject。开启手动确认需将 autoAck=false;此时队列会分为 Ready(待投递)与 Unacked(已投递未确认)两部分。若消费者连接断开,RabbitMQ 会把 Unacked 消息重新入队;RabbitMQ 对未确认消息不设超时,这是为了让消费者能进行耗时处理。确认必须在**同一信道(Channel)**完成,跨信道 ack 会触发 “unknown delivery tag” 异常。
  • 发布者确认(Publisher Confirms):生产者开启确认模式后,每条消息会被分配 deliveryTag,Broker 在消息被写入所有匹配队列(持久化则落盘)后返回 Basic.Ack;若内部错误则返回 Basic.Nack。确认是逐条且仅一次的,但到达时间不保证;该机制只保证消息到达 Broker,不保证路由到队列。为提高吞吐,可用批量确认异步监听。此外,事务(txSelect/txCommit/txRollback)与发布者确认互斥,二者不应在同一信道混用。

二 消费者确认配置与要点

  • 启用手动确认:将 autoAck=false,在处理完成后调用 channel.basicAck(deliveryTag, multiple);处理失败可调用 basicNack(deliveryTag, multiple, requeue)basicReject(deliveryTag, requeue)。注意 deliveryTag 的作用域为 Channel,必须在同一信道 ack。
  • 配合限流:使用 channel.basicQos(prefetchCount) 限制未确认消息数量,避免消费者被突发流量压垮。
  • 连接断开重投递:消费者异常或关闭时,RabbitMQ 会把该连接上的 Unacked 消息重新入队,可能造成重复消费,业务需具备幂等性。
  • 运维观测:在管理界面关注队列的 Ready/Unacked 计数,快速判断是否存在堆积或确认滞后。

三 生产者确认配置与要点

  • 开启确认模式:在信道上调用 channel.confirmSelect() 后,所有后续消息进入待确认状态。
  • 确认方式:
    • 同步等待:channel.waitForConfirms() / waitForConfirmsOrDie(),简单但吞吐受限;
    • 批量等待:按批次发送后统一等待,效率高,但一旦出现 Nack/超时,整批需重发,重复量可能上升;
    • 异步监听:通过 addConfirmListener 处理 Ack/Nack,结合有序集合维护未确认序号,效率最佳。
  • 路由失败兜底:开启 mandatory=true,配合 ReturnCallback 捕获无法路由到队列的消息;也可使用备份交换器提升可靠性。
  • 重要边界:发布者确认只覆盖到 Broker 的投递成功与否;若消息到达 Broker 但没有匹配队列,仍会丢失,需结合 mandatory/备份交换器或业务侧路由校验。

四 Ubuntu 快速上手示例

  • 安装与启动(Ubuntu 20.04/22.04 常见做法)
    • 安装:sudo apt update & & sudo apt install rabbitmq-server
    • 启动与管理:sudo systemctl start rabbitmq-server & & sudo systemctl enable rabbitmq-server
    • 启用管理插件(便于观测队列与通道):sudo rabbitmq-plugins enable rabbitmq_management
  • 消费者手动确认示例(Python + pika)
    • 要点:auto_ack=False,处理完成后 basic_ack;建议设置 prefetch 限流。
    • 示例:
      • import pika
      • conn = pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))
      • ch = conn.channel()
      • ch.queue_declare(queue=‘task_q’, durable=True)
      • ch.basic_qos(prefetch_count=1)
      • def callback(ch, method, properties, body):
        • TODO: 业务处理

        • ch.basic_ack(delivery_tag=method.delivery_tag)
        • ch.basic_consume(queue=‘task_q’, on_message_callback=callback, auto_ack=False)
        • ch.start_consuming()
  • 生产者确认示例(Python + pika,异步 Confirm)
    • 要点:开启 confirm_select,维护未确认序号集合,使用 add_confirm_listener 处理 Ack/Nack。
    • 示例:
      • import pika
      • conn = pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))
      • ch = conn.channel()
      • ch.confirm_select()
      • unconfirmed = set()
      • def on_ack(delivery_tag, multiple):
        • removed = range(delivery_tag, delivery_tag+1) if not multiple else
          [t for t in sorted(unconfirmed) if t < = delivery_tag]
        • for t in removed: unconfirmed.remove(t)
        • def on_nack(delivery_tag, multiple):
          • 依据业务选择重发或落库告警

          • pass
        • ch.add_confirm_listener(on_ack, on_nack)
        • for i in range(10):
          • seq = ch.basic_publish(‘’, ‘task_q’, f’msg-{ i} '.encode(), properties=pika.BasicProperties(delivery_mode=2))
          • unconfirmed.add(seq[1]) # delivery_tag[1] 为序号
  • Spring Boot 常用配置(application.yml 片段)
    • spring.rabbitmq.host=127.0.0.1
    • spring.rabbitmq.port=5672
    • spring.rabbitmq.username=guest
    • spring.rabbitmq.password=guest
    • 生产者确认

    • spring.rabbitmq.publisher-confirm-type=correlated
    • spring.rabbitmq.publisher-returns=true
    • 消费者手动确认

    • spring.rabbitmq.listener.simple.acknowledge-mode=manual

五 生产实践与常见问题

  • 可靠性组合建议:队列与消息持久化 + 生产者发布者确认 + 消费者手动确认 + mandatory/备份交换器兜底路由失败。
  • 性能与一致性权衡:追求高吞吐优先选异步确认批量确认;对一致性要求极高时,可在业务侧实现去重(如业务唯一键、幂等表)。
  • 常见误区:
    • 误以为发布者确认=到达队列,实际只到 Broker
    • 忘记 basicQos,导致消费者被大量 Unacked 拖垮;
    • Channel 调用 basicAck,触发 “unknown delivery tag”;
    • 事务与发布者确认混用,导致性能与语义异常。

声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!


若转载请注明出处: Ubuntu RabbitMQ消息确认机制
本文地址: https://pptw.com/jishu/762270.html
Informix在Linux上的内存管理策略是什么 Informix在Linux上如何实现高可用

游客 回复需填写必要信息