KafKa单机、集群、控制台搭建


环境准备

java版本:1.8

操作系统:CentOS7

zookeeper:3.5.8

安装包:https://kafka.apache.org/downloads

我这里选择的是kafka_2.11-2.4.1版本的KafKa,2.11是scala的版本,2.4.1是kafka的版本

KafKa单机搭建

Kafka是用Scala语言开发,需要JVM环境,所以需要安装Java。

KafKa依赖zookeeper,所以需要安装zookeeper

对于zookeeper和Java环境的安装这里就不写了

Kafka下载

Kafka下载有多种方式,可以选择官网也可以选择镜像

## 镜像下载
wget https://mirror.bit.edu.cn/apache/kafka/2.4.1/kafka_2.11-2.4.1.tgz
## 解压
tar -zxvf kafka_2.11-2.4.1.tgz -C  /
## 软链接
ln -s /kafka_2.11-2.4.1/ /kafka
cd /kafka

配置修改

vim config/server.properties

这里需要说几个关键配置

属性 默认值 解释
broker.id 0 每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯一的即可
log.dirs /tmp/kafka-logs kafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行
listeners PLAINTEXT://:9092 server接受客户端连接的端口,ip配置kafka本机ip即可
zookeeper.connect localhost:2181 zooKeeper连接字符串的格式为:hostname:port,此处hostname和port分别是ZooKeeper集群中某个节点的host和port;zookeeper如果是集群,连接方式为hostname1:port1, hostname2:port2, hostname3:port3
log.retention.hours 168 每个日志文件删除之前保存的时间。默认数据保存时间对所有topic都一样
num.partitions 1 创建topic的默认分区数
default.replication.factor 1 自动创建topic的默认副本数量,建议设置为大于等于2
min.insync.replicas 1 当producer设置acks为-1时,min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息会产生
delete.topic.enable false 是否允许删除主题

简版:

#broker.id属性在kafka集群中必须要是唯一
broker.id=0
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://192.168.186.131:9092
#kafka的消息存储文件
log.dir=/usr/local/data/kafka‐logs
#kafka连接zookeeper的地址
zookeeper.connect=localhost:2181

启动服务

## -daemon 表示后台启动
bin/kafka-server-start.sh -daemon config/server.properties

后台启动日志会打印在logs目录的server.log

测试

主题创建一个名字为“test”的Topic,partition=1,备份因子=1

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

发送消息

bin/kafka-console-producer.sh --broker-list 192.168.186.131:9092 --topic test
## 命令中输入要发送的消息的内容

消费Topic为test的消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.186.131:9092 --topic test

KafKa集群搭建

与单机搭建基本一致,主要改一下配置文件

我们有3个配置文件server.properties,server-1.properties,server-2.properties

server-1.properties:

#broker.id属性在kafka集群中必须要是唯一
broker.id=1
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://192.168.186.131:9093
log.dir=/data/kafka/kafka‐logs‐1
#kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同
zookeeper.connect=localhost:2181

server-2.properties:

#broker.id属性在kafka集群中必须要是唯一
broker.id=2
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://192.168.186.131:9094
log.dir=/data/kafka/kafka‐logs‐2
#kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同
zookeeper.connect=localhost:2181

启动

bin/kafka‐server‐start.sh ‐daemon config/server‐1.properties
bin/kafka‐server‐start.sh ‐daemon config/server‐2.properties

Kafka客户端常用命令

创建一个新的topic,副本数设置为3,分区数设置为2

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 2 --topic my-repl-topic

查看topic分区情况

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-repl-topic

发消息

bin/kafka-console-producer.sh --broker-list 192.168.186.131:9092,192.168.186.131:9093,192.168.186.131:9094 --topic my-repl-topic

消费消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.186.131:9092,192.168.186.131:9093,192.168.186.131:9094 --from-beginning --topic my-repl-topic

Kafka控制台搭建

Kafka官方是没有开发自己的控制台。yahoo构建了一个Kafka控制台管理后台(CMAK)。这个管理工具可以查看Kafka集群的当前运行状态包括broker的状态

项目地址:https://github.com/yahoo/kafka-manager

如果觉得编译过程太复杂,可以这里直接下载然后直接走安装后面的步骤

下载链接:

https://blog-dm.oss-cn-shanghai.aliyuncs.com/zip/cmak-3.0.0.5.zip (需要jdk11,否则报错)

jdk11没有安装的可以使用老版本的kafka-manager,以下链接为kafka-manager1.3.3.7的版本,测试kafka_2.11-2.4.1可正常使用

https://blog-dm.oss-cn-shanghai.aliyuncs.com/zip/kafka-manager-1.3.3.7.zip (jdk8版本可用)

下载管理工具CMAK

## 下载
wget https://github.com/yahoo/kafka-manager/archive/3.0.0.5.zip
## 解压
unzip CMAK-3.0.0.5.zip -d /

sbt编译

yum安装sbt:

这个后台管理工具和Kafka一样是scala语言编写需要编译

curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo
mv bintray-sbt-rpm.repo /etc/yum.repos.d/
yum install sbt

修改仓库地址:(sbt 默认下载库文件很慢)填上阿里云的镜像

 vim ~/.sbt/repositories

文件内容:

[repositories]
  local
  aliyun-nexus: https://maven.aliyun.com/nexus/content/groups/public/
  jcenter: https://jcenter.bintray.com/
  typesafe-ivy-releases: https://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
  maven-central

验证sbt安装成功

sbt -version

编译CMAK(前身叫Kafka-manager)

## 来到刚刚解压目录下执行
./sbt clean dist
## 编译如果还是不行报Download failed. Obtain the jar manually and place it at /root/.sbt/launchers/0.13.9/sbt-launch.jar
## 可以修改解压目录下sbt编译文件下的http://repo.typesafe.com为https://repo.typesafe.com

安装

## 重新解压编译后的代码
unzip cmak-3.0.0.5.zip -d /
ln -s /cmak-3.0.0.5/ /cmak
cd /cmak
## 修改配置文件
vim conf/application.conf

修改kafka-manager.zkhosts列表为自己的zk节点

cmak.zkhosts="192.168.186.131:2181"  (3.0.0.5 - jdk11)
cmak.zkhosts="192.168.186.131:2181" (1.3.3.7 - jdk8)

启动

后台启动,日志存储nohup.out,默认端口9000,-Dhttp.port=9999可修改端口

nohup bin/cmak -Dconfig.file=conf/application.conf &   (3.0.0.5 - jdk11)
nohup bin/kafka-manager -Dconfig.file=conf/application.conf &   (1.3.3.7 - jdk8)

打开:ip:9000

添加一个集群

主界面:

Broker:

Topic列表:

Topic详情:


文章作者: dm
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 dm !
评论
 上一篇
Kafka生产实践与性能优化 Kafka生产实践与性能优化
Kafka如何保证消息不丢失首先要想保证全链路的消息不丢失,要从生产端和消费端来考虑 生产端控制参数ack的设置 acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最
2024-03-05
下一篇 
Kafka基本使用及设计原理 Kafka基本使用及设计原理
背景Kafka是一个分布式基于Zookeeper的分布式消息系统,支持多分区,多副本。是由Scala语言开发,现在是Apache基金会顶级开源项目。它有着高吞吐、低延迟的特性。这种特性主要应用于大数据场景,日志收集场景。不适合一些复杂的业务
2024-02-01
  目录