kafka和zookeeper集群部署

kafka和zookeeper集群部署

(集群的3台机器都需要如下操作)

cd /home/app

上传 kafkazookeeper的压缩包,并解压(安装方式很多)

部署zookeeper集群

cd /data/app/zookeeper**

mkdir logs data

修改配置文件

cd /data/app/zookeeper****/config

mv zoo_sample.cfg zoo.cfg

vim zoo.cfg

clientPort=2181
maxClientCnxns=0
tickTime=2000    
initLimit=5
syncLimit=2
dataDir=/data/app/zookeeper***/data 
dataLogDir=/data/app/zookeeper***/logs
server.1=***-1:2888:3888
server.2=***-2:2888:3888 
server.3=***-3:2888:3888

(***-1  是每太机器的hostname,如果没做映射就写服务器ip)

做映射如下

vim /etc/hosts添加以下内容:
ip ***-1
ip ***-2
ip ***-3
***换成机器实际的hostname

添加id

cd /data/app/zookeeper****/data

vim myid

第一台服务器输入1,第二台输入2,第三台输入3,要与配置文件server.1,server.2,server.3对应

保存

到此zookeeper部署完成,暂时可以不启动

 

部署kafka集群

cd /data/app/kafka****

mkdir kafka-logs

修改配置文件

cd /data/app/app/kafka***/config

vim server.properties

#每一个broker都需要一个标识符,使用broker.id来表示。它的默认值是0,也可以被设置成任意其它整数。这个值在整个kafka集群里必须是唯一的。这个值可以任意选定。建议把他们设置成语机器名具有相关性的整数,这样在维护时,将ID号映射到机器名就没有那么麻烦了。
broker.id=0  //三台机器要不一样 

#kafka把所有的消息都保存在磁盘上,存放这些日志片段的目录是通过log.dirs指定的。它是一组用逗号分隔的本地文件系统路径。如果指定了多个路径,那么Broker会根据
#“最少使用”原则,把同一个分区的日志片段保存到同一个路径下。要注意,broker会往拥有最好数目分区的路径新增分区,而不是往用于最小磁盘空间的路径新增分区。

log.dirs=/data/app/kafka***/kafka-logs


listeners=PLAINTEXT://本机ip:9092

##如果有端口映射等情况,可以填写对应的访问ip
advertised.listeners=PLAINTEXT://本机ip:9092 

 

#num.partitions参数指定了新创建的主题将包含多少个分区。如果启用了主题自动创建功能(该功能是默认启用的),主题分区的个数就是该参数指定的值。
#该参数的默认值是1。注意,我们可以增加主题分区的个数,但不能减少分区的个数。所以,如果要让一个主题的分区数个数少于num.partitions指定的值,需要手动创建该主题。
#kafka集群通过分区对主题进行横向扩展,所以当有新的Broker加入集群时,可以通过分区的个数来实现集群负载的均衡。为了能让分区分布到所有broker上,主题分区的个数必须要大于Broker的个数
#拥有大量消息的主题如果要进行负载均衡,就需要大量的分区。分区的大小限制在25GB以内可以得到比较理想的效果
#分区数量的估算可以通过主题每秒钟的吞吐量/消费者每秒钟的吞吐量来进行估算

num.partitions=4

#我们知道segment文件默认会被保留7天的时间,超时的话就

#会被清理,那么清理这件事情就需要有一些线程来做。这里就是

#用来设置恢复和清理data下数据的线程数量
#对以下3种情况,kafka会使用可配置的线程池来处理日志片段
#服务器正常启动,用于打开每个分区的日志片段;
#服务器崩溃后重启,用于检查和截短每个分区的日志片段;
#服务器正常关闭,用于关闭日志片段
#默认情况下,每个日志目录只使用一个线程,因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到并行操作的目的。特别是对于包含大量分区的服务器来说
#一旦发生崩溃,在进行恢复时使用并行操作可能会省下数小时的时间。设置此参数时需要注意,所配置的数字对应的是log.dirs指定的单个目录,也就是说该项配置为8,并且log.dirs
#指定了3个路径,那么总共需要24个线程

num.recovery.threads.per.data.dir=1


offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
zookeeper.connect=ip1:2181, ip2:2181, ip3:2181   ip1  ip2   ip3分别是zookeeper集群的三台服务器ip

 

#zookeeper链接超时时间
zookeeper.connection.timeout.ms=6000

#默认情况下,kafka会在如下情况下自动创建主题:
#当一个生产者开始往主题写入消息时
#当一个消费者开始从主题读取消息时
#当任意一个客户端向主题发送元数据请求时
#很多时候,这些行为都是非预期的,而且,根据kafka协议,如果一个主题不先被创建,根本无法知道它是否已经存在。如果显示的创建主题,不管是手动创建还是通过其它配置系统来创建
#都可以把auto.create.topics.enable设为false
auto.create.topics.enable=true

#segment文件保留的最长时间,默认保留7天(168小时),
#超时将被删除,也就是说7天之前的数据将被清理掉。
#Kafka可以根据时间来决定数据可以被保留多久。默认使用log.retention.hours参数来配置时间,默认是168小时。
#除此之外还有其它两个参数log.retention.minuteslog.retention.ms。使用log.retention.ms为宜
#如果指定了不知一个参数,Kafka会优先使用具有最小值的那个参数
#根据时间保留数据是通过检查磁盘上日志片段文件的最后修改时间来实现的。一般来说,最后修改时间指的就是日志片段关闭的时间,也就是文件里最后一个消息的时间戳。
#不过如果使用管理工具在服务器间移动分区,最后修改时间就不准确了,时间误差可能会导致这些分区过多的保留数据
log.retention.hours=168
#log.retention.bytes,这是另一种决定日志片段是否被删除的方式。(这两种方式都是作用在日志片段上的,而不是作用在单个消息上)
#通过保留的消息的字节数来判断是否过期。它的值通过log.retention.bytes来指定,作用在每一个分区上,就是说这个值被设置为了1024,一个主题有8个分区,那么这个主题一共可以保存8K数据
#如果同时指定了log.retention.byteslog.retention.ms,只要任意一个条件满足,消息就会被删除。
log.retention.bytes=1024

#日志文件中每个segment的大小,默认为1G
#当消息到达broker时,它们被追加到分区的当前日志片段上,当日志片段大小达到log.segment.bytes指定的上限时,当前日志片段就会被关闭,一个新的日志片段被打开。
#如果一个新的日志片段被关闭,就开始等待过期。这个参数的值越小,就会越频繁的关闭和分配新文件,从而降低磁盘写入的整体效率。
#如果主题的消息量不大,那么怎么调整这个参数就非常重要了。
#使用时间戳获取偏移量:日志片段大小会影响使用时间戳获取便宜浪。在使用时间戳获取日志偏移量时,kafka会检查分区里最后修改时间大于指定时间戳的日志片段(已经被关闭而定),
#该日志片段的前一个文件的最后修改时间小于指定时间戳。然后kafka返回该日志片段(也就是文件名)开头的偏移量。对于使用时间戳获取偏移量的操作来说,日志片段越小,结果越准确。
log.segment.bytes=1073741824
#控制日志片段关闭的另一个参数是:log.segment.ms,它指定了多长时间之后日志片段就会被关闭。log.segment.byteslog.segment.bytes两个参数,哪个条件先达到就使用哪个条件关闭日志片段
#默认情况下log.segment.ms没有设定值,所以只根据大小来关闭日志片段。如果使用基于时间的日志片段,要着重考虑并行关闭多个日志片段对磁盘性能的影响。

#message.max.bytesbroker通过设置message.max.bytes参数来限制单个消息的大小,默认值是1000000,也就是1MB,如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,
#还会收到broker返回的错误信息。跟其它与字节相关的配置参数一样,该参数指的是压缩后的消息大小,也就是说,只要压缩后消息大小小于message.max.bytes指定的值,消息的实际大小
#可以远大于这个值。这个值对性能有着显著的影响,值越大,那么复杂处理网络连接和请求得到线程就需要花越多的时间处理这些请求。它还会增加磁盘写入块的大小,从而影响IO吞吐量。
#在服务端和客户端之间协调消息大小的配置
#消费者客户端设置的fetch.message.max.bytes必须与服务器端设置的消息大小进行协调。如果这个值比message.max.bytes小,那么消费者就无法读取比较大的消息,导致出现消费者被
#阻塞的情况。在为集群里的broker配置replica.fetch.max.bytes参数时也需要遵守同样的规则。
message.max.bytes=1000000

#以上为主题的默认配置
#处理网络请求的线程数量,也就是接收消息的线程数。
#接收线程会将接收到的消息放到内存中,然后再从内存中写入磁盘。
num.network.threads=3

#消息从内存中写入磁盘是时候使用的线程数量。
#用来处理磁盘IO的线程数量
num.io.threads=8

#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400

#接受套接字的缓冲区大小
socket.receive.buffer.bytes=102400

#请求套接字的缓冲区大小
socket.request.max.bytes=104857600

#滚动生成新的segment文件的最大时间
log.roll.hours=168

#上面的参数设置了每一个segment文件的大小是1G,那么
#就需要有一个东西去定期检查segment文件有没有达到1G
#多长时间去检查一次,就需要设置一个周期性检查文件大小
#的时间(单位是毫秒)。
log.retention.check.interval.ms=300000

#日志清理是否打开
log.cleaner.enable=true

#上面我们说过接收线程会将接收到的消息放到内存中,然后再从内存
#写到磁盘上,那么什么时候将消息从内存中写入磁盘,就有一个
#时间限制(时间阈值)和一个数量限制(数量阈值),这里设置的是
#数量阈值,下一个参数设置的则是时间阈值。
#partion buffer中,消息的条数达到阈值,将触发flush到磁盘。
log.flush.interval.messages=10000

#消息buffer的时间,达到阈值,将触发将消息从内存flush到磁盘,
#单位是毫秒。
log.flush.interval.ms=3000

#删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
delete.topic.enable=true

#此处的host.name为本机IP(重要),如果不改,则客户端会抛出:
#Producer connection to localhost:9092 unsuccessful 错误!
host.name=kafka01

 

到此kafka集群配置完成了

 

启动zookeeper

/data/app/zookeeper***/bin/zkServer.sh start

启动所有kafka
nohup /data/app/kafka_***/bin/kafka-server-start.sh /data/app/kafka***/config/server.properties

 

测试部署结果

cd /data/app/kafka****/bin

./kafka-console-producer.sh --broker-list ip:9092 --topic test

set test 123456(set  key   value)

./kafka-console-consumer.sh --zookeeper ip:2181 --topic test --from-beginning

get test (get key)

ip都换成集群的三台服务器都试试