Kafka Installation and Basic Usage

I. Environment
OS: Windows 10
ZooKeeper version: apache-zookeeper-3.4.9
Kafka version: kafka_2.11-0.10.1.1
JDK version: 1.8.0_121

II. Procedure
1. Download and extract Kafka
http://mirrors.cnnic.cn/apache/kafka/0.10.1.1/kafka_2.11-0.10.1.1.zip

2. Kafka directory layout
/bin    executable scripts for operating Kafka, including the Windows scripts under /bin/windows
/config configuration files
/libs   dependency libraries
/logs   log data directory; Kafka splits its server-side logs into five types: server, request, state-change, log-cleaner, and controller

3. Configuration
Configure the ZooKeeper connection
Go to the Kafka installation root and edit config/server.properties.
The three most important Kafka settings are broker.id, log.dirs, and zookeeper.connect; a minimal sketch of these server-side settings follows.
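A minimal sketch of config/server.properties covering just those three settings (values are illustrative; they match the stock defaults and the cluster section below):

broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181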

4. Starting and stopping Kafka
Start
From the Kafka directory (with ZooKeeper already running; see the note after the command), run: start bin/windows/kafka-server-start.bat config/server.properties
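Note: the broker expects a running ZooKeeper at the address given in zookeeper.connect. If none is up yet, start the standalone apache-zookeeper-3.4.9 first (a sketch, assuming you have created conf\zoo.cfg from the bundled zoo_sample.cfg):

start bin\zkServer.cmd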
Check ports 2181 and 9092:

V:\program\service>netstat -ano | findstr "2181 9092"
  TCP    0.0.0.0:2181           0.0.0.0:0              LISTENING       9416
  TCP    0.0.0.0:9092           0.0.0.0:0              LISTENING       12112

Notes:
The Kafka process has PID 12112 and listens on port 9092.
The ZooKeeper instance has PID 9416 and listens on port 2181.

Stop
bin/windows/kafka-server-stop.bat
Deleting instance \\DESKTOP-0AAM5SB\ROOT\CIMV2:Win32_Process.Handle="11152"
Instance deletion successful.
Deleting instance \\DESKTOP-0AAM5SB\ROOT\CIMV2:Win32_Process.Handle="9228"
Instance deletion successful.
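The "Deleting instance" lines are WMI output: on Windows, kafka-server-stop.bat finds the broker JVMs and terminates them through wmic, roughly like this (a sketch, not the script's exact command):

wmic process where "commandline like '%kafka.Kafka%'" delete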

5. Single-node connectivity test
Create a topic: kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Delete a topic: kafka-topics.bat --delete --zookeeper localhost:2181 --topic test
If the server.properties that Kafka loaded at startup does not set delete.topic.enable=true, this delete is not a real delete; the topic is merely flagged as "marked for deletion".
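To make topic deletion take real effect, set the flag in config/server.properties before starting the broker:

delete.topic.enable=true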
If a topic is already marked for deletion and you want to remove it for real, you can do it by hand (a full session sketch follows below):
(1) Open a ZooKeeper client: zkCli.cmd
(2) Find the topic's node: ls /brokers/topics
(3) Delete the node of the topic in question: rmr /brokers/topics/test. The topic is now gone for good.
Topics marked for deletion are also visible in the ZooKeeper client, under /admin/delete_topics (e.g. ls /admin/delete_topics/test);
deleting the node there makes the "marked for deletion" flag disappear.
ZooKeeper additionally keeps per-topic configuration under /config/topics (e.g. ls /config/topics/test).
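Putting those steps together, a sketch of the full manual cleanup session for the test topic (assuming the default local ZooKeeper):

zkCli.cmd -server localhost:2181
ls /brokers/topics
rmr /brokers/topics/test
rmr /admin/delete_topics/test
rmr /config/topics/test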

Open two command prompts: one for the producer to send messages, one for the consumer to receive them.
Run the producer and type a few characters; each line you enter is sent to the topic as a message.
kafka-console-producer.bat --broker-list localhost:9092 --topic test
Run the consumer, and the messages just sent are printed back.
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
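By default the console consumer only shows messages produced after it starts; to replay everything already in the topic, add --from-beginning (the same flag reappears in the failure test later):

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning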

So far this is only a single broker; next we try a cluster with multiple brokers.

6. Building a multi-broker pseudo-cluster
We just ran a single broker; now we start a cluster of two brokers, both on the local machine.
(1) Give each broker its own configuration file
First, the contents of config/server0.properties:

broker.id=0
listeners=PLAINTEXT://:9093
port=9093
host.name=localhost
num.network.threads=4
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=5
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=192.168.1.181:2181
zookeeper.connection.timeout.ms=6000
queued.max.requests=500
log.cleanup.policy=delete

Notes:
broker.id uniquely identifies a node within the cluster; because both brokers run on the same machine, each must be given its own port and log directory so that they do not overwrite each other's data.
This also explains why Kafka listened on port 9092 in the single-broker test: the port comes from the listeners/port settings in server.properties, and 9092 is the stock default.
How the Kafka cluster talks to ZooKeeper is visible here as well, in the zookeeper.connect setting.

Now, following the same pattern, here is config/server1.properties for the second broker:

broker.id=1
listeners=PLAINTEXT://:9094
port=9094
host.name=localhost
num.network.threads=4
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs1
num.partitions=5
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=192.168.1.181:2181
zookeeper.connection.timeout.ms=6000
queued.max.requests=500
log.cleanup.policy=delete
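For quick reference, the only lines that differ between server0.properties and server1.properties are the broker identity, the port, and the log directory:

broker.id=0                   vs  broker.id=1
listeners=PLAINTEXT://:9093   vs  listeners=PLAINTEXT://:9094  (and port)
log.dirs=/tmp/kafka-logs      vs  log.dirs=/tmp/kafka-logs1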

(2) Start all the brokers

The commands are as follows (note the file names match the two configs above):

start bin/windows/kafka-server-start.bat config/server0.properties   # start broker0
start bin/windows/kafka-server-start.bat config/server1.properties   # start broker1

Check ports 2181, 9093, and 9094:

V:\program\service>netstat -ano | findstr "2181 9093 9094"
  TCP    0.0.0.0:2181           0.0.0.0:0              LISTENING       9416
  TCP    0.0.0.0:9093           0.0.0.0:0              LISTENING       12112
  TCP    0.0.0.0:9094           0.0.0.0:0              LISTENING       1316

One ZooKeeper instance listens on port 2181, and the two Kafka brokers listen on ports 9093 and 9094 respectively.
(3) Create a topic

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
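A replicated topic is created the same way with a higher --replication-factor. The topic_1/topic_2/topic_3 topics that appear in the failure test below were presumably created like this (note: their describe output was captured from a run with three live brokers, which is what makes a replication factor of 3 possible):

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic topic_1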

Check that the topic was created:

kafka-topics.bat --list --zookeeper localhost:2181
V:\program\service>kafka-topics.bat --describe --zookeeper localhost:2181
Topic:__consumer_offsets        PartitionCount:50       ReplicationFactor:2     Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
        Topic: __consumer_offsets       Partition: 0    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 2    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 3    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 4    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 5    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 6    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 7    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 8    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 9    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 10   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 11   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 12   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 13   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 14   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 15   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 16   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 17   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 18   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 19   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 20   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 21   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 22   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 23   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 24   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 25   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 26   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 27   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 28   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 29   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 30   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 31   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 32   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 33   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 34   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 35   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 36   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 37   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 38   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 39   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 40   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 41   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 42   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 43   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 44   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 45   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 46   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 47   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: __consumer_offsets       Partition: 48   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: __consumer_offsets       Partition: 49   Leader: 1       Replicas: 1,2   Isr: 1,2
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 2       Replicas: 2     Isr: 2
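A side note on __consumer_offsets: nobody created it by hand. The broker creates this internal topic automatically the first time a client using the new consumer API commits offsets, and its shape is governed by broker settings (a sketch of the stock 0.10.x defaults; this Kafka version caps the replication factor at the number of live brokers, which is why the listing shows ReplicationFactor:2):

offsets.topic.num.partitions=50
offsets.topic.replication.factor=3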

(4) Simulate a client sending and receiving messages
Send messages:

kafka-console-producer.bat --broker-list localhost:9093,localhost:9094 --topic test

Receive messages:

kafka-console-consumer.bat --bootstrap-server localhost:9093,localhost:9094 --topic test

Note that the producer now publishes to a topic hosted across two brokers, so the setup starts to feel distributed. The --broker-list (like the consumer's --bootstrap-server) is only an initial contact list; the client discovers the rest of the cluster from the metadata those brokers return.

(5) Kill a broker

Kill broker (id=0).

First, from the configuration above, broker 0 (server0.properties) listens on port 9093, which is enough to pin down its PID, as sketched below.
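A sketch of locating and killing that broker by PID (use whatever PID your own netstat reports; 12112 matches the earlier output):

netstat -ano | findstr "9093"
taskkill /PID 12112 /F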

Topic state in the Kafka cluster before broker0 is killed:

kafka-topics.bat --describe --zookeeper localhost:2181
Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic:topic_1	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: topic_1	Partition: 0	Leader: 2	Replicas: 2,1,0	Isr: 2,1,0
Topic:topic_2	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: topic_2	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0
Topic:topic_3	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: topic_3	Partition: 0	Leader: 2	Replicas: 0,2,1	Isr: 2,1,0

After the kill, run describe again and compare. The main change is clearly in the Isr (in-sync replicas) column: the replicated topics dropped broker 0 from their Isr and kept a live leader, while topic test, whose only replica lived on broker 0, is left with Leader: -1 and an empty Isr, i.e. it is unavailable. We will analyze this in more detail later.

kafka-topics.bat --describe --zookeeper localhost:2181
Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: -1	Replicas: 0	Isr: 
Topic:topic_1	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: topic_1	Partition: 0	Leader: 2	Replicas: 2,1,0	Isr: 2,1
Topic:topic_2	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: topic_2	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2
Topic:topic_3	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: topic_3	Partition: 0	Leader: 2	Replicas: 0,2,1	Isr: 2,1

Now test whether sending and receiving messages is affected.
Send messages:

kafka-console-producer.bat --topic topic_1 --broker-list localhost:9093,localhost:9094

Receive messages:

kafka-console-consumer.bat --topic topic_1 --zookeeper localhost:2181 --from-beginning
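Note that this consumer uses the old ZooKeeper-based consumer (--zookeeper). The new consumer works here too; an equivalent invocation against the surviving broker would be (a sketch, assuming broker1 on port 9094 is still up):

kafka-console-consumer.bat --topic topic_1 --bootstrap-server localhost:9094 --from-beginning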

As you can see, Kafka's replication mechanism gives it solid fault tolerance: with one broker down, the replicated topics remain fully usable.
