RabbitMQ消息持久化Ubuntu如何操作
导读:Ubuntu上实现RabbitMQ消息持久化的完整步骤 一 环境准备与安装 更新软件源并安装 RabbitMQ: sudo apt update sudo apt install rabbitmq-server 启动服务并设为开机自...
Ubuntu上实现RabbitMQ消息持久化的完整步骤
一 环境准备与安装
- 更新软件源并安装 RabbitMQ:
- sudo apt update
- sudo apt install rabbitmq-server
- 启动服务并设为开机自启:
- sudo systemctl start rabbitmq-server
- sudo systemctl enable rabbitmq-server
- 启用管理插件并开放管理端口(便于可视化验证):
- sudo rabbitmq-plugins enable rabbitmq_management
- sudo ufw allow 15672 # 如启用防火墙,按需开放管理界面端口
二 持久化的关键配置
- 队列与交换机需设置为持久化(durable),消息需设置投递模式为持久化(delivery_mode=2)。
- 生产者发布确认(Publisher Confirm)可进一步确保消息写入磁盘后再返回确认。
- 消费者建议关闭自动确认(auto_ack=False),在处理完成后手动确认,避免意外丢失。
示例(Python + Pika,含发布确认与手动确认):
- 安装依赖:pip install pika
- 生产者(持久化 + 发布确认)
- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost’)) channel = connection.channel() channel.queue_declare(queue=‘task_queue’, durable=True) channel.confirm_delivery() # 开启发布确认 try: channel.basic_publish( exchange=‘’, routing_key=‘task_queue’, body=‘Hello, Persistent!’, properties=pika.BasicProperties(delivery_mode=2), ) print(" [x] Sent") except pika.exceptions.UnroutableError: print(" [x] Message was returned (nack)") connection.close()
- 消费者(手动确认)
- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost’)) channel = connection.channel() channel.queue_declare(queue=‘task_queue’, durable=True) channel.basic_qos(prefetch_count=1) # 一次只处理一条,提升可靠性 def callback(ch, method, properties, body): print(f" [x] Received { body} ") ch.basic_ack(delivery_tag=method.delivery_tag) # 处理完成后手动确认 channel.basic_consume(queue=‘task_queue’, on_message_callback=callback, auto_ack=False) print(’ [*] Waiting for messages. To exit press CTRL+C’) channel.start_consuming()
三 命令行与运维操作
- 使用 rabbitmqadmin 创建持久化队列/交换机并绑定(便于无代码快速验证):
- 持久化队列:sudo rabbitmqadmin declare queue name=my_durable_queue durable=true
- 持久化交换机:sudo rabbitmqadmin declare exchange name=my_durable_exchange type=direct durable=true
- 绑定关系:sudo rabbitmqadmin declare binding source=my_durable_exchange destination=my_durable_queue routing_key=my_routing_key
- 查看队列状态:sudo rabbitmqctl list_queues name messages consumers
四 验证持久化
- 重启服务后检查队列与消息是否仍在:
- sudo systemctl restart rabbitmq-server
- 通过管理界面(http://服务器IP:15672)或命令行查看队列与待处理消息是否保留。
- 注意:仅设置队列/消息持久化并不能保证“零丢失”。在崩溃或磁盘压力等场景下仍可能丢失未确认消息;生产环境建议同时开启发布确认、合理设置 QoS 与重试机制。
五 Docker部署的持久化补充
- 使用数据卷持久化节点数据(避免容器重建后元数据丢失):
- docker run -d
–name rabbitmq-prod
-p 5672:5672
-p 15672:15672
-v /data/rabbitmq/data:/var/lib/rabbitmq
-v /data/rabbitmq/logs:/var/log/rabbitmq
-e RABBITMQ_DEFAULT_USER=admin
-e RABBITMQ_DEFAULT_PASS=StrongPassword@2025
rabbitmq:3.13-management
- docker run -d
- 集群场景可挂载各自数据卷并加入集群,确保节点间元数据与队列定义持久化。
声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!
若转载请注明出处: RabbitMQ消息持久化Ubuntu如何操作
本文地址: https://pptw.com/jishu/757628.html
