Kafka安装步骤 基本概念 采用伪集群的方式部署Kafka集群 后台启动内置的zookeeper 关闭zookeeper 开启kafk集群(伪集群),shell脚本 关闭kafk集群(伪集群),shell脚本 查看当前服务器中的所有 topic 创建 topic 删除 topic(需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除) 发送消息 消费消息(默认当前,不是从头开始) 消费消息,从头开始 查看某个 Topic 的详情 修改分区数 自带压测命令 安装Kafka Eagle监控Kafka集群

1.Producer:消息生产者,就是向 kafka broker 发消息的客户端
2.Consumer:消息消费者,向 kafka broker 取消息的客户端
3.Consumer Group(CG ):消费者组,由多个 consumer 组成。 消费者组内每个消费者负责消费不同分区的数据, 一个分区只能由一个 组内 消费者消费; 消费者组之间互不影响。 所有的消费者都属于某个消费者组,即 消费者组是逻辑上的一个订阅者。
4.Broker:一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。
5.Topic:可以理解为一个队列, 生产者和消费者面向的都是一个 topic
6.Partition:为了实现扩展性, 一个非常大的 topic 可以分布到多个 broker (即服务器) 上,一个 topic  可以分为多个 partition,每个 partition 是一个有序的队列;
7.Replica: 副本, 为保证集群中的某个节点发生故障时, 该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作, kafka 提供了副本机制, 一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
8.leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
9.follower:每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 follower。

采用伪集群的方式部署Kafka集群

# 下载软件
cd /usr/local/src
wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.3.0/kafka_2.12-2.3.0.tgz
tar -xvf kafka_2.12-2.3.0.tgz
cd kafka_2.12-2.3.0

# 解压缩后进入目录中
# 方法1:使用同一个程序,但是采用不同的配置文件
# 方法2:复制两份目录出来,使用不同的目录区分
# 此处采用的是方法1,在后续步骤采用的是方法2

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties 

# log.dir表示的是队列数据存储的路径,根据实际情况修改

config/server.properties: 
    broker.id=0 
    listeners=PLAINTEXT://:9092
    log.dir=/data/kafka-logs-0
    delete.topic.enable=true #删除 topic 功能
    
config/server-1.properties: 
    broker.id=1 
    listeners=PLAINTEXT://:9093 
    log.dir=/data/kafka-logs-1
    delete.topic.enable=true #删除 topic 功能

config/server-2.properties: 
    broker.id=2 
    listeners=PLAINTEXT://:9094 
    log.dir=/data/kafka-logs-2
    delete.topic.enable=true

后台启动内置的zookeeper

# 测试可以使用Kafka自带的zookeeper。生产上建议使用单独的zookeeper集群
# 若是搭建集群的话,方法同Kafka
# 此处先启动一个zookeeper,后续采用同Kafka的方法2搭建集群使用
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

关闭zookeeper

bin/zookeeper-server-stop.sh stop

开启kafk集群(伪集群),shell脚本

bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties

关闭kafk集群(伪集群),shell脚本

bin/zookeeper-server-stop.sh stop

查看当前服务器中的所有 topic

bin/kafka-topics.sh --list --zookeeper localhost:2181 

创建 topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic first

删除 topic(需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除)

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic first

发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first

消费消息(默认当前,不是从头开始)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first

消费消息,从头开始

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic second --from-beginning

查看某个 Topic 的详情

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic first

修改分区数

bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic first --partitions 6

自带压测命令

bin/kafka-producer-perf-test.sh --topic first --num-records 1000 --record-size 100 --throughput 1000  --producer-props bootstrap.servers=localhost:9092

安装Kafka Eagle监控Kafka集群

1.修改 kafka-server-start.sh 命令
因为使用的是伪集群,所以,需要把目录复制三份,每个目录下该文件的JMX_PORT值都不一样才行

正好利用这种情况,每个目录下都可以启动自带的zookeeper,修改端口号,从而构建一个zookeeper伪集群

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="9999"
#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

2.下载Kafka Eagle软件

cd /usr/local/src/
wget https://github.com/smartloli/kafka-eagle-bin/archive/v1.3.9.tar.gz
tar -zxvf v1.3.9.tar.gz
cd kafka-eagle-bin-1.3.9/
cp kafka-eagle-web-1.3.9-bin.tar.gz ../
tar -zxvf kafka-eagle-web-1.3.9-bin.tar.gz 
cd kafka-eagle-web-1.3.9/conf
vim system-config.properties

3.设置
若是使用zookeeper伪集群,做相应的增加

vim system-config.properties

kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181

cluster1.kafka.eagle.offset.storage=kafka

kafka.eagle.metrics.charts=true
kafka.eagle.sql.fix.error=false

# 使用sqlite数据库,注意保存路径,没有的话需要新建,可以换成MySQL
kafka.eagle.driver=org.sqlite.JDBC
kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
kafka.eagle.username=root
kafka.eagle.password=www.kafka-eagle.org

4.安装maven

yum install maven

5.配置环境变量

vim /etc/profile.d/ke.sh
export KE_HOME=/usr/local/src/kafka-eagle-web-1.3.9
export PATH=$PATH:$KE_HOME/bin

vim /etc/profile.d/java.sh
export JAVA_HOME=//usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.el7_6.x86_64
export JRE_HOME=$JAVA_HOME/jre
export CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin

source /etc/profile.d/ke.sh
soucer /etc/profile.d/ke.sh

6.启动

cd bin
bash ke.sh start.sh

*******************************************************************
* Kafka Eagle system monitor port successful... 
*******************************************************************
[2019-09-10 16:41:46] INFO: Status Code[0]
[2019-09-10 16:41:46] INFO: [Job done!]
Welcome to
    __ __    ___     ____    __ __    ___            ______    ___    ______    __     ______
   / //_/   /   |   / __/   / //_/   /   |          / ____/   /   |  / ____/   / /    / ____/
  / ,<     / /| |  / /_    / ,<     / /| |         / __/     / /| | / / __    / /    / __/   
 / /| |   / ___ | / __/   / /| |   / ___ |        / /___    / ___ |/ /_/ /   / /___ / /___   
/_/ |_|  /_/  |_|/_/     /_/ |_|  /_/  |_|       /_____/   /_/  |_|\____/   /_____//_____/   
                                                                                             

Version 1.3.9
*******************************************************************
* Kafka Eagle Service has started success.
* Welcome, Now you can visit 'http://127.0.0.1:8048/ke'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************

提示的访问地址是http://127.0.0.1:8048/ke,但是可以用http://192.168.0.145/ke进行访问