在Ubuntu上搭建Kafka集群的步骤如下:
1. 环境准备
- 安装JDK:
sudo apt update sudo apt install openjdk-8-jdk java -version
- 下载并解压Kafka:
wget https://downloads.apache.org/kafka/2.5.0/kafka_2.13-2.5.0.tgz tar -xvf kafka_2.13-2.5.0.tgz cd kafka_2.13-2.5.0
2. 配置Zookeeper
- 创建Zookeeper目录:
sudo mkdir -p /opt/zookeeper/data sudo mkdir -p /opt/zookeeper/log
- 修改Zookeeper配置文件:
cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg nano /opt/zookeeper/conf/zoo.cfg
修改以下配置:dataDir=/opt/zookeeper/data dataLogDir=/opt/zookeeper/log clientPort=2181 server.1=zookeeper1:2888:3888 server.2=zookeeper2:2888:3888 server.3=zookeeper3:2888:3888
- 启动Zookeeper:
bin/zookeeper-server-start.sh config/zoo.cfg
3. 配置Kafka
-
修改Kafka配置文件:
cp config/server.properties config/server-1.properties cp config/server.properties config/server-2.properties cp config/server.properties config/server-3.properties
修改以下配置:
broker.id
:每台机器不同listeners
:例如listeners=PLAINTEXT://192.168.1.100:9092
advertised.listeners
:例如advertised.listeners=PLAINTEXT://192.168.1.100:9092
zookeeper.connect
:例如zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
log.dirs
:例如/tmp/kafka-logs
num.network.threads
:例如3
num.io.threads
:例如8
socket.send.buffer.bytes
:例如1048576
socket.receive.buffer.bytes
:例如1048576
socket.request.max.bytes
:例如104857600
log.flush.interval.messages
:例如1000
log.flush.interval.ms
:例如1000
log.segment.bytes
:例如1073741824
log.retention.hours
:例如168
log.retention.check.interval.ms
:例如300000
log.cleaner.min.compaction.lag.ms
:例如10000
log.cleaner.max.compaction.lag.ms
:例如900000
log.cleaner.花椒.max.size
:例如10737418240
num.partitions
:例如1
default.replication.factor
:例如1
min.insync.replicas
:例如1
replica.lag.time.max.ms
:例如10000
replica.lag.max.messages
:例如100000
message.max.bytes
:例如1000000
replica.fetch.max.bytes
:例如1000000
fetch.min.bytes
:例如1048576
fetch.max.wait.ms
:例如100
max.partition.fetch.bytes
:例如1048576
num.recovery.threads.per.data.dir
:例如1
transaction.timeout.ms
:例如180000
transaction.state.log.replication.factor
:例如1
transaction.state.log.min.isr
:例如1
transaction.state.log.max.size.mb
:例如100
transactional.id.max.bytes
:例如900000000
auto.commit.interval.ms
:例如1000
auto.commit.delta.bytes
:例如1048576
group.initial.rebalance.delay.ms
:例如0
partition.assignment.strategy
:例如org.apache.kafka.common.cluster.assign.RangeAssignor
num.network.threads
:例如3
num.io.threads
:例如8
socket.send.buffer.bytes
:例如1048576
socket.receive.buffer.bytes
:例如1048576
socket.request.max.bytes
:例如104857600
log.flush.interval.messages
:例如1000
log.flush.interval.ms
:例如1000
log.segment.bytes
:例如1073741824
log.retention.hours
:例如168
log.retention.check.interval.ms
:例如300000
log.cleaner.min.compaction.lag.ms
:例如10000
log.cleaner.max.compaction.lag.ms
:例如900000
log.cleaner.花椒.max.size
:例如10737418240
num.partitions
:例如1
default.replication.factor
:例如1
min.insync.replicas
:例如1
replica.lag.time.max.ms
:例如10000
replica.lag.max.messages
:例如100000
message.max.bytes
:例如1000000
replica.fetch.max.bytes
:例如1000000
fetch.min.bytes
:例如1048576
fetch.max.wait.ms
:例如100
max.partition.fetch.bytes
:例如1048576
num.recovery.threads.per.data.dir
:例如1
transaction.timeout.ms
:例如180000
transaction.state.log.replication.factor
:例如1
transaction.state.log.min.isr
:例如1
transaction.state.log.max.size.mb
:例如100
transactional.id.max.bytes
:例如900000000
auto.commit.interval.ms
:例如1000
auto.commit.delta.bytes
:例如1048576
group.initial.rebalance.delay.ms
:例如0
partition.assignment.strategy
:例如org.apache.kafka.common.cluster.assign.RangeAssignor
-
启动Kafka:
bin/kafka-server-start.sh config/