Kafka消息队列Linux如何实现
一、安装Java环境(Kafka前置依赖)
Kafka基于Java开发,需先安装Java运行环境(JRE或JDK)。推荐使用OpenJDK 11及以上版本(兼容性更好):
- Ubuntu/Debian系统:执行
sudo apt update & & sudo apt install openjdk-11-jdk -y
; - CentOS/RHEL系统:执行
sudo yum install java-11-openjdk -y
。
安装完成后,通过java -version
验证是否成功(需显示Java版本信息)。
二、下载并解压Kafka
从Apache Kafka官网下载最新稳定版本(如3.6.1),使用wget
命令获取安装包,再用tar
解压到指定目录(如/opt/kafka
):
wget https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
tar -xzf kafka_2.13-3.6.1.tgz
mv kafka_2.13-3.6.1 /opt/kafka # 可选:移动到目标目录
建议将Kafka目录加入系统环境变量(编辑/etc/profile
,添加export KAFKA_HOME=/opt/kafka
和export PATH=$PATH:$KAFKA_HOME/bin
,然后执行source /etc/profile
),方便后续命令调用。
三、配置Kafka(修改server.properties)
进入Kafka配置目录($KAFKA_HOME/config
),编辑server.properties
文件,调整以下关键参数:
- broker.id:集群中Broker的唯一标识(如
broker.id=0
); - listeners:Broker监听的地址和端口(如
listeners=PLAINTEXT://your_server_ip:9092
,需替换为实际IP); - log.dirs:日志存储目录(如
log.dirs=/data/kafka/logs
,需提前创建目录并赋予权限mkdir -p /data/kafka/logs & & chown -R $USER:$USER /data/kafka/logs
); - zookeeper.connect:ZooKeeper集群地址(如
zookeeper.connect=localhost:2181
,单节点ZooKeeper用此配置;集群需列出所有节点,如host1:2181,host2:2181,host3:2181
)。
四、启动ZooKeeper(Kafka依赖的协调服务)
Kafka通过ZooKeeper管理集群元数据(如Topic、分区信息),需先启动ZooKeeper:
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
默认情况下,ZooKeeper会在2181
端口启动。可通过netstat -an | grep 2181
验证是否运行正常。
五、启动Kafka服务器
在另一个终端窗口中,启动Kafka服务(后台运行模式,避免占用当前终端):
$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
启动后,可通过netstat -an | grep 9092
验证Kafka是否在9092
端口监听(客户端通信端口)。
六、创建Topic(消息存储的主题)
使用Kafka命令行工具创建Topic,指定Topic名称、分区数(partitions
,决定并行处理能力)和副本因子(replication-factor
,数据冗余备份数量,单节点集群设为1):
$KAFKA_HOME/bin/kafka-topics.sh --create \
--topic my_topic \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
创建成功后,可通过$KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
查看所有Topic。
七、测试Kafka功能(发送/接收消息)
- 发送消息:启动控制台生产者,向指定Topic发送消息(输入消息后按回车键):
$KAFKA_HOME/bin/kafka-console-producer.sh --topic my_topic --bootstrap-server localhost:9092
- 接收消息:启动控制台消费者,从指定Topic读取消息(
--from-beginning
表示从最早的消息开始读取):$KAFKA_HOME/bin/kafka-console-consumer.sh --topic my_topic --from-beginning --bootstrap-server localhost:9092
在生产者终端输入消息(如Hello Kafka
),消费者终端会同步显示该消息,验证Kafka消息收发功能正常。
八、停止Kafka服务(可选)
测试完成后,可通过以下命令停止服务:
- 停止消费者:在消费者终端按
Ctrl+C
; - 停止生产者:在生产者终端按
Ctrl+C
; - 停止Kafka服务器:执行
$KAFKA_HOME/bin/kafka-server-stop.sh
; - 停止ZooKeeper:执行
$KAFKA_HOME/bin/zookeeper-server-stop.sh
。
注意事项:
- 生产环境中,需配置多节点ZooKeeper集群(提高可靠性)、调整Kafka内存参数(如
log.retention.hours
控制日志保留时间)、启用数据加密(SSL/TLS)和认证机制(SASL)以提升安全性; - 若不想手动安装ZooKeeper,可使用Kafka KRaft模式(无需ZooKeeper,自3.0版本支持),简化部署流程。
声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!
若转载请注明出处: Kafka消息队列Linux如何实现
本文地址: https://pptw.com/jishu/718190.html