117.info
人生若只如初见

Ubuntu Kafka集群搭建步骤

在Ubuntu上搭建Kafka集群的步骤如下:

1. 环境准备

  • 安装JDK
    sudo apt update
    sudo apt install openjdk-8-jdk
    java -version
    
  • 下载并解压Kafka
    wget https://downloads.apache.org/kafka/2.5.0/kafka_2.13-2.5.0.tgz
    tar -xvf kafka_2.13-2.5.0.tgz
    cd kafka_2.13-2.5.0
    

2. 配置Zookeeper

  • 创建Zookeeper目录
    sudo mkdir -p /opt/zookeeper/data
    sudo mkdir -p /opt/zookeeper/log
    
  • 修改Zookeeper配置文件
    cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg
    nano /opt/zookeeper/conf/zoo.cfg
    
    修改以下配置:
    dataDir=/opt/zookeeper/data
    dataLogDir=/opt/zookeeper/log
    clientPort=2181
    server.1=zookeeper1:2888:3888
    server.2=zookeeper2:2888:3888
    server.3=zookeeper3:2888:3888
    
  • 启动Zookeeper
    bin/zookeeper-server-start.sh config/zoo.cfg
    

3. 配置Kafka

  • 修改Kafka配置文件

    cp config/server.properties config/server-1.properties
    cp config/server.properties config/server-2.properties
    cp config/server.properties config/server-3.properties
    

    修改以下配置:

    • broker.id:每台机器不同
    • listeners:例如 listeners=PLAINTEXT://192.168.1.100:9092
    • advertised.listeners:例如 advertised.listeners=PLAINTEXT://192.168.1.100:9092
    • zookeeper.connect:例如 zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
    • log.dirs:例如 /tmp/kafka-logs
    • num.network.threads:例如 3
    • num.io.threads:例如 8
    • socket.send.buffer.bytes:例如 1048576
    • socket.receive.buffer.bytes:例如 1048576
    • socket.request.max.bytes:例如 104857600
    • log.flush.interval.messages:例如 1000
    • log.flush.interval.ms:例如 1000
    • log.segment.bytes:例如 1073741824
    • log.retention.hours:例如 168
    • log.retention.check.interval.ms:例如 300000
    • log.cleaner.min.compaction.lag.ms:例如 10000
    • log.cleaner.max.compaction.lag.ms:例如 900000
    • log.cleaner.花椒.max.size:例如 10737418240
    • num.partitions:例如 1
    • default.replication.factor:例如 1
    • min.insync.replicas:例如 1
    • replica.lag.time.max.ms:例如 10000
    • replica.lag.max.messages:例如 100000
    • message.max.bytes:例如 1000000
    • replica.fetch.max.bytes:例如 1000000
    • fetch.min.bytes:例如 1048576
    • fetch.max.wait.ms:例如 100
    • max.partition.fetch.bytes:例如 1048576
    • num.recovery.threads.per.data.dir:例如 1
    • transaction.timeout.ms:例如 180000
    • transaction.state.log.replication.factor:例如 1
    • transaction.state.log.min.isr:例如 1
    • transaction.state.log.max.size.mb:例如 100
    • transactional.id.max.bytes:例如 900000000
    • auto.commit.interval.ms:例如 1000
    • auto.commit.delta.bytes:例如 1048576
    • group.initial.rebalance.delay.ms:例如 0
    • partition.assignment.strategy:例如 org.apache.kafka.common.cluster.assign.RangeAssignor
    • num.network.threads:例如 3
    • num.io.threads:例如 8
    • socket.send.buffer.bytes:例如 1048576
    • socket.receive.buffer.bytes:例如 1048576
    • socket.request.max.bytes:例如 104857600
    • log.flush.interval.messages:例如 1000
    • log.flush.interval.ms:例如 1000
    • log.segment.bytes:例如 1073741824
    • log.retention.hours:例如 168
    • log.retention.check.interval.ms:例如 300000
    • log.cleaner.min.compaction.lag.ms:例如 10000
    • log.cleaner.max.compaction.lag.ms:例如 900000
    • log.cleaner.花椒.max.size:例如 10737418240
    • num.partitions:例如 1
    • default.replication.factor:例如 1
    • min.insync.replicas:例如 1
    • replica.lag.time.max.ms:例如 10000
    • replica.lag.max.messages:例如 100000
    • message.max.bytes:例如 1000000
    • replica.fetch.max.bytes:例如 1000000
    • fetch.min.bytes:例如 1048576
    • fetch.max.wait.ms:例如 100
    • max.partition.fetch.bytes:例如 1048576
    • num.recovery.threads.per.data.dir:例如 1
    • transaction.timeout.ms:例如 180000
    • transaction.state.log.replication.factor:例如 1
    • transaction.state.log.min.isr:例如 1
    • transaction.state.log.max.size.mb:例如 100
    • transactional.id.max.bytes:例如 900000000
    • auto.commit.interval.ms:例如 1000
    • auto.commit.delta.bytes:例如 1048576
    • group.initial.rebalance.delay.ms:例如 0
    • partition.assignment.strategy:例如 org.apache.kafka.common.cluster.assign.RangeAssignor
  • 启动Kafka

    bin/kafka-server-start.sh config/

未经允许不得转载 » 本文链接:https://www.117.info/ask/fec20AzsKCQBeB1w.html

推荐文章

  • Ubuntu Tomcat日志中如何定位问题

    在Ubuntu系统中,Tomcat的日志文件通常位于/var/log/tomcat或者/opt/tomcat/logs目录下。要定位问题,你可以使用以下几种方法: 查看日志文件:
    使用文本编...

  • ubuntu如何优化php-fpm的响应时间

    要优化 Ubuntu 上 PHP-FPM 的响应时间,您可以采取以下措施: 选择合适的 PHP 版本:确保您正在使用最新的 PHP 版本,因为它通常包含性能改进和错误修复。 使用 ...

  • ubuntu镜像更新后如何同步数据

    在Ubuntu镜像更新后,同步数据的方法取决于你的具体需求和环境。以下是几种常见的方法:
    使用NFS、GlusterFS、Ceph、Lustre等分布式文件系统进行数据同步 N...

  • 如何在Ubuntu上使用FileZilla断点续传

    FileZilla本身不支持断点续传功能,但你可以通过使用rsync命令来实现这一需求。以下是具体步骤: 打开终端(Terminal)。
    使用rsync命令进行断点续传: rsy...

  • Node.js在CentOS上如何安装

    在CentOS上安装Node.js有多种方法,以下是几种常见的安装方式:
    1. 使用NodeSource存储库安装
    这是最推荐的方法,因为它可以轻松安装最新版本的Node.j...

  • Ubuntu vsftp如何配置虚拟用户

    在Ubuntu上配置vsftpd以使用虚拟用户需要以下几个步骤: 安装vsftpd:
    在终端中运行以下命令来安装vsftpd:
    sudo apt-get update
    sudo apt-get i...

  • Linux backlog如何优化性能

    在Linux系统中,backlog是指监听套接字的未完成连接队列,它的大小决定了系统能够容纳多少个未完成的连接请求。优化backlog参数可以帮助提高服务器的性能和稳定性...

  • Ubuntu SELinux状态查看

    在 Ubuntu 系统中,SELinux(Security-Enhanced Linux)的默认状态通常是“disabled”。不过,如果你的系统上安装了 SELinux,你可以通过以下命令来检查其状态:...