首页主机资讯RabbitMQ消息路由Linux如何实现

RabbitMQ消息路由Linux如何实现

时间2025-12-05 17:32:04发布访客分类主机资讯浏览944
导读:Linux上实现RabbitMQ消息路由的完整步骤 一 环境准备与安装 准备:选择较新的发行版(如 Ubuntu 20.04/22.04、CentOS 7/8、RHEL 8),生产建议内存≥4GB、/var 预留≥10GB、内核≥3.10...

Linux上实现RabbitMQ消息路由的完整步骤

一 环境准备与安装

  • 准备:选择较新的发行版(如 Ubuntu 20.04/22.04CentOS 7/8RHEL 8),生产建议内存≥4GB、/var 预留≥10GB、内核≥3.10+。安装依赖与 Erlang(RabbitMQ 3.12+ 要求 Erlang 25.x)。
  • Ubuntu/Debian:
    • 安装 Erlang:
      curl -fsSL https://packages.erlang-solutions.com/erlang-solutions_2.0_all.deb -o erlang.deb
      sudo dpkg -i erlang.deb & & sudo apt update & & sudo apt install -y esl-erlang
    • 安装 RabbitMQ(使用官方仓库脚本):
      curl -1sLf ‘https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/setup.deb.sh’ | sudo -E bash
      sudo apt install -y rabbitmq-server
  • CentOS/RHEL:
    • 安装 Erlang:
      yum install -y https://packages.erlang-solutions.com/erlang-solutions-2.0-1.noarch.rpm
      yum install -y esl-erlang
    • 安装 RabbitMQ(使用官方仓库脚本):
      curl -1sLf ‘https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/setup.rpm.sh’ | sudo -E bash
      yum install -y rabbitmq-server
  • 启动与开机自启:
    sudo systemctl start rabbitmq-server & & sudo systemctl enable rabbitmq-server
    sudo systemctl status rabbitmq-server(应显示 active (running))
  • 管理插件与访问:
    sudo rabbitmq-plugins enable rabbitmq_management
    访问 http://服务器IP:15672,默认账号 guest/guest(默认仅本地访问)。

二 核心概念与路由类型

  • 核心概念:消息先发送到 Exchange(交换机),通过 Binding(绑定)Queue(队列) 关联;投递依据 Routing Key(路由键) 与绑定规则匹配。
  • 路由类型与匹配规则:
    • Direct:精确匹配 Routing Key = Binding Key
    • Fanout:忽略路由键,广播到所有绑定队列。
    • Topic:按模式匹配路由键,支持通配符 “*”(一个单词)“#”(零个或多个单词)
    • Headers:按消息头部属性匹配(较少使用)。

三 命令行快速配置路由

  • 创建 vhost 与用户并赋权(示例:vhost=/myvhost,用户=admin):
    sudo rabbitmqctl add_vhost /myvhost
    sudo rabbitmqctl add_user admin StrongPass!
    sudo rabbitmqctl set_permissions -p /myvhost admin “." ".” “.*”
    sudo rabbitmqctl set_user_tags admin administrator
  • 声明交换机、队列与绑定(示例:交换机 logs,队列 q.infoq.error,路由键 infoerror):
    sudo rabbitmqadmin declare exchange name=logs type=direct durable=true
    sudo rabbitmqadmin declare queue name=q.info durable=true
    sudo rabbitmqadmin declare queue name=q.error durable=true
    sudo rabbitmqadmin declare binding source=logs destination=q.info routing_key=info
    sudo rabbitmqadmin declare binding source=logs destination=q.error routing_key=error
  • 说明:也可在管理界面(15672)完成上述对象与绑定创建。

四 代码示例 Python实现直连与主题路由

  • 直连路由 Direct(按级别投递到不同队列)
    • 生产者 send_direct.py: import pika conn = pika.BlockingConnection(pika.ConnectionParameters(‘localhost’, 5672, ‘/myvhost’, credentials=pika.PlainCredentials(‘admin’, ‘StrongPass!’))) ch = conn.channel() ch.exchange_declare(exchange=‘logs’, exchange_type=‘direct’, durable=True) for level in (‘info’, ‘error’, ‘debug’): ch.basic_publish(exchange=‘logs’, routing_key=level, body=f’[{ level} ] Hello Direct’) conn.close()
    • 消费者 recv_direct.py(消费 error 队列): import pika conn = pika.BlockingConnection(pika.ConnectionParameters(‘localhost’, 5672, ‘/myvhost’, credentials=pika.PlainCredentials(‘admin’, ‘StrongPass!’))) ch = conn.channel() ch.exchange_declare(exchange=‘logs’, exchange_type=‘direct’, durable=True) q = ch.queue_declare(queue=‘q.error’, durable=True) ch.queue_bind(exchange=‘logs’, queue=q.method.queue, routing_key=‘error’) def cb(ch, method, props, body): print(f" [x] { method.routing_key} => { body.decode()} ") ch.basic_ack(delivery_tag=method.delivery_tag) ch.basic_consume(queue=q.method.queue, on_message_callback=cb, auto_ack=False) print(’ [*] Waiting for error logs…') ch.start_consuming()
  • 主题路由 Topic(按模式匹配)
    • 生产者 send_topic.py: import pika conn = pika.BlockingConnection(pika.ConnectionParameters(‘localhost’, 5672, ‘/myvhost’, credentials=pika.PlainCredentials(‘admin’, ‘StrongPass!’))) ch = conn.channel() ch.exchange_declare(exchange=‘topic_logs’, exchange_type=‘topic’, durable=True) keys = [‘app.error’, ‘app.warning’, ‘audit.info’, ‘audit.security’] for k in keys: ch.basic_publish(exchange=‘topic_logs’, routing_key=k, body=f’[{ k} ] Topic message’) conn.close()
    • 消费者 recv_topic.py(接收 app.* 与 audit.#): import pika conn = pika.BlockingConnection(pika.ConnectionParameters(‘localhost’, 5672, ‘/myvhost’, credentials=pika.PlainCredentials(‘admin’, ‘admin’))) ch = conn.channel() ch.exchange_declare(exchange=‘topic_logs’, exchange_type=‘topic’, durable=True) q = ch.queue_declare(queue=‘’, exclusive=True) for b in (‘app.‘, ‘audit.#’): ch.queue_bind(exchange=‘topic_logs’, queue=q.method.queue, routing_key=b) def cb(ch, method, props, body): print(f" [x] { method.routing_key} => { body.decode()} ") ch.basic_ack(delivery_tag=method.delivery_tag) ch.basic_consume(queue=q.method.queue, on_message_callback=cb, auto_ack=False) print(’ [] Waiting for topic logs…’) ch.start_consuming()
  • 运行要点:先启动消费者,再运行生产者;Topic 中 “*” 匹配一个单词、“#” 匹配零个或多个单词。

五 生产环境与安全加固

  • 远程访问与账户:默认 guest/guest 仅本地可用;创建专用管理员并赋权,必要时在配置中调整 loopback_users(示例见下)。
  • 配置文件与目录(示例):
    • /etc/rabbitmq/rabbitmq.config(允许远程管理员登录):
      [
      { rabbit, [
      { loopback_users, []}
      ]}
      ].
    • /etc/rabbitmq/rabbitmq-env.conf(数据与日志目录):
      RABBITMQ_MNESIA_BASE=/data/rabbitmq/mnesia
      RABBITMQ_LOG_BASE=/data/rabbitmq/log
    • 目录与权限:
      mkdir -pv /data/rabbitmq/{ mnesia,log}
      chown -R rabbitmq:rabbitmq /data/rabbitmq
  • 防火墙:开放 5672(AMQP)、15672(管理界面)
    • Ubuntu/Debian:sudo ufw allow 5672/tcp & & sudo ufw allow 15672/tcp
  • 资源与高可用:
    • 资源阈值(/etc/rabbitmq/rabbitmq.conf):
      vm_memory_high_watermark.relative = 0.6
      disk_free_limit.absolute = 2GB
    • 建议:开启镜像队列/仲裁队列、监控磁盘与内存、按业务设置队列与消息 TTL、开启确认机制与重试策略。

声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!


若转载请注明出处: RabbitMQ消息路由Linux如何实现
本文地址: https://pptw.com/jishu/764913.html
Debian Exploit攻击有哪些案例 Debian系统Telnet连接速度慢怎么办

游客 回复需填写必要信息