首页主机资讯RabbitMQ消息确认机制在CentOS上的应用

RabbitMQ消息确认机制在CentOS上的应用

时间2025-10-22 15:54:03发布访客分类主机资讯浏览1026
导读:RabbitMQ消息确认机制在CentOS上的应用实践 一、环境准备 在CentOS上应用RabbitMQ消息确认机制前,需完成基础环境配置: 安装RabbitMQ:通过yum包管理器安装(sudo yum install rabbitm...

RabbitMQ消息确认机制在CentOS上的应用实践

一、环境准备

在CentOS上应用RabbitMQ消息确认机制前,需完成基础环境配置:

  1. 安装RabbitMQ:通过yum包管理器安装(sudo yum install rabbitmq-server),启动服务(sudo systemctl start rabbitmq-server)并设置开机自启(sudo systemctl enable rabbitmq-server)。
  2. 配置权限:添加用户并授权(sudo rabbitmqctl add_user admin 123456sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*")。
  3. 连接工具:确保生产者(如Java/Spring Boot应用)与消费者(如Python/Java应用)能访问CentOS服务器的5672端口(RabbitMQ默认端口)。

二、生产者端:发送确认机制(Confirm模式)

生产者确认机制用于确保消息成功到达RabbitMQ Broker正确路由到目标队列,分为同步确认批量确认异步确认三种方式。

1. 同步确认(最严谨,性能低)

每发送一条消息后,调用waitForConfirms()等待Broker返回确认结果(成功返回true,失败返回false)。
示例代码(Java)

Channel channel = connection.createChannel();
    
channel.queueDeclare("confirm_queue", true, false, false, null);
     // 队列持久化
channel.confirmSelect();
     // 开启Confirm模式

for (int i = 0;
     i <
     10;
 i++) {
    
    String message = "Message-" + i;
    
    channel.basicPublish("", "confirm_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

    if (channel.waitForConfirms()) {
    
        System.out.println("消息[" + message + "]发送成功");

    }
 else {
    
        System.out.println("消息[" + message + "]发送失败,需重试");

    }

}
    

适用场景:对消息可靠性要求极高的场景(如金融交易),但性能较差。

2. 批量确认(平衡可靠性与性能)

每发送N条消息后,调用一次waitForConfirms()批量确认。
示例代码(Java)

Channel channel = connection.createChannel();
    
channel.queueDeclare("confirm_queue", true, false, false, null);
    
channel.confirmSelect();
    

int batchSize = 100;
    
for (int i = 0;
     i <
     1000;
 i++) {
    
    String message = "Message-" + i;
    
    channel.basicPublish("", "confirm_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

    if (i % batchSize == 0) {
    
        channel.waitForConfirms();
     // 批量确认
        System.out.println("批量确认完成:" + i + "-" + (i + batchSize - 1));

    }

}
    

适用场景:兼顾可靠性与性能的场景(如日志收集),减少网络往返次数。

3. 异步确认(高性能,推荐)

通过addConfirmListener注册回调函数,处理成功(handleAck)与失败(handleNack)的情况。需维护未确认消息缓存(如ConcurrentSkipListMap),记录消息ID与内容,便于失败后重发。
示例代码(Java)

Channel channel = connection.createChannel();
    
channel.queueDeclare("confirm_queue", true, false, false, null);
    
channel.confirmSelect();
    

// 存储未确认消息(key: deliveryTag, value: message)
ConcurrentSkipListMap<
    Long, String>
     outstandingConfirms = new ConcurrentSkipListMap<
    >
    ();
    

// 成功回调
ConfirmCallback ackCallback = (deliveryTag, multiple) ->
 {

    if (multiple) {
    
        // 批量删除已确认的消息
        ConcurrentNavigableMap<
    Long, String>
     confirmed = outstandingConfirms.headMap(deliveryTag, true);
    
        confirmed.clear();

    }
 else {
    
        outstandingConfirms.remove(deliveryTag);

    }
    
    System.out.println("消息确认成功,deliveryTag:" + deliveryTag);

}
    ;
    

// 失败回调
ConfirmCallback nackCallback = (deliveryTag, multiple) ->
 {
    
    String message = outstandingConfirms.get(deliveryTag);
    
    System.out.println("消息确认失败,需重发:" + message + ", deliveryTag:" + deliveryTag);

    // 重发逻辑(如重新调用basicPublish)
}
    ;
    

channel.addConfirmListener(ackCallback, nackCallback);
    

for (int i = 0;
     i <
     1000;
 i++) {
    
    String message = "Message-" + i;
    
    channel.basicPublish("", "confirm_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    
    outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
 // 记录未确认消息
}

适用场景:高并发生产环境(如电商订单系统),性能最优。

三、消费者端:接收确认机制(ACK模式)

消费者确认机制用于确保消息被正确处理后再从队列中移除,避免消息丢失。需将autoAck设置为false(关闭自动确认),手动调用basicAck(成功)、basicNack(失败重试)、basicReject(失败丢弃)。

1. 手动确认(推荐)

示例代码(Python)

import pika

def callback(ch, method, properties, body):
    try:
        print(f"收到消息:{
body.decode()}
")
        # 模拟业务处理(如数据库操作)
        # 如果处理成功,发送ACK
        ch.basic_ack(delivery_tag=method.delivery_tag)
        print("消息确认成功")
    except Exception as e:
        print(f"消息处理失败:{
e}
    ")
        # 失败后重新入队(可设置requeue=True)
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

# 连接RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.230.131'))
channel = connection.channel()

# 声明队列(确保队列存在)
channel.queue_declare(queue='confirm_queue', durable=True)

# 关闭自动确认,设置prefetch_count(避免消费者过载)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='confirm_queue', on_message_callback=callback)

print("等待消息...")
channel.start_consuming()

关键参数说明

  • autoAck=False:关闭自动确认,需手动发送ACK。
  • basic_ack:确认消息处理成功,RabbitMQ将消息从队列中移除。
  • basicNackrequeue=True表示重新入队(由其他消费者处理),requeue=False表示丢弃或进入死信队列。
  • basicReject:与basicNack类似,但不支持批量拒绝。

2. 自动确认(不推荐)

autoAck=True(默认值),消费者收到消息后立即发送ACK,RabbitMQ立即移除消息。若消费者处理过程中发生异常,消息会丢失。
适用场景:测试环境或对可靠性要求极低的场景(如实时通知)。

四、补充:队列与消息持久化

为确保RabbitMQ宕机后消息不丢失,需配合队列持久化消息持久化

  1. 队列持久化:声明队列时设置durable=True(如channel.queueDeclare("confirm_queue", true, false, false, null))。
  2. 消息持久化:发送消息时设置deliveryMode=2(如MessageProperties.PERSISTENT_TEXT_PLAIN)。
    注意:仅开启队列持久化不够,消息仍可能因未刷盘而丢失,需同时设置消息持久化。

五、常见问题排查

  1. 消息未到达队列:检查生产者是否开启Confirm模式(channel.confirmSelect()),确认回调中是否有ack=false的情况(需重发或记录日志)。
  2. 消费者未确认:检查消费者autoAck是否为false,确认业务处理逻辑是否有异常(如数据库连接失败)。
  3. 性能瓶颈:异步确认模式下,未确认消息缓存过大可能导致内存溢出,需定期清理或扩容内存。

通过以上步骤,可在CentOS上实现RabbitMQ消息确认机制,确保消息从生产者到消费者的全链路可靠性。

声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!


若转载请注明出处: RabbitMQ消息确认机制在CentOS上的应用
本文地址: https://pptw.com/jishu/732395.html
jellyfin在centos运行慢怎么办 CentOS RabbitMQ性能调优实战

游客 回复需填写必要信息