Kafka Installation and Testing
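Kafka's performance is well covered elsewhere (a quick search turns up plenty), so I won't repeat it here. This post walks through installing and testing Kafka: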
-
Install Kafka:
# tar -xzf kafka_2.9.2-0.8.1.tgz
# mv kafka_2.9.2-0.8.1 kafka
# cd kafka
-
Start the ZooKeeper service:
bin/zookeeper-server-start.sh config/zookeeper.properties
-
Start the Kafka service:
bin/kafka-server-start.sh config/server.properties
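Before moving on to topics, it can help to confirm that both services are actually accepting connections. The snippet below is a minimal sketch, not part of Kafka itself: the class name PortCheck and the method isListening are made up for illustration, and it assumes the ports used in this post (2181 for ZooKeeper, 9092 for the broker). It only checks that a TCP connection succeeds, not that the service behind the port is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not a Kafka API): checks whether a TCP port accepts connections.
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports assumed from the commands in this post
        System.out.println("ZooKeeper (2181): " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka broker (9092): " + isListening("localhost", 9092, 1000));
    }
}
```

If either line prints false, check the console output of the corresponding start script before creating topics.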
-
Create a topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
For the full list of kafka-topics.sh options, run it with --help.
-
List the topics in the Kafka service:
bin/kafka-topics.sh --list --zookeeper localhost:2181
# the topics are listed as follows
test
The shell scripts may differ in versions before 2.8.
-
Start the console producer and send messages to the test topic:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
xxxxxxxxxxxxxxxxx
# type a message and press Enter to send it
-
Start the console consumer and read the contents of the test topic:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
xxxxxxxxxxxxxxxxx
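Kafka is a service framework written in Scala, so writing producer and consumer applications in Scala should be very convenient, but I could not find suitable examples. Kafka also supports client applications written in Java, Python, and C. Below are some Java snippets (reposted from the web):
-
Consumer: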
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerTest extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;

    public static void main(String[] args) {
        // Read the same "test" topic that ProducerTest writes to
        ConsumerTest consumerThread = new ConsumerTest("test");
        consumerThread.start();
    }

    public ConsumerTest(String topic) {
        consumer = kafka.consumer.Consumer
                .createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "master:2181"); // ZooKeeper address
        props.put("group.id", "0");                    // consumer group id
        props.put("zookeeper.session.timeout.ms", "400000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        // Request one stream for the topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        // hasNext() blocks until a message arrives, so this loop keeps running
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}
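The consumer above hard-codes its settings. As a rough sketch of an alternative, the same keys could be kept in properties text and loaded with plain java.util.Properties before being handed to ConsumerConfig. The class name ConsumerSettings and its load method are made up for illustration, and the check assumes that zookeeper.connect and group.id are the two settings this consumer cannot run without.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper (not a Kafka API): loads consumer settings from properties text.
final class ConsumerSettings {

    /** Parses key=value lines and fails early if an assumed-required key is missing. */
    static Properties load(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        // Assumption: these two keys must be present for the consumer in this post
        for (String required : new String[] {"zookeeper.connect", "group.id"}) {
            if (props.getProperty(required) == null) {
                throw new IllegalArgumentException("missing setting: " + required);
            }
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Same values as the hard-coded consumer configuration in this post
        String text = "zookeeper.connect=master:2181\n"
                + "group.id=0\n"
                + "auto.commit.interval.ms=1000\n";
        Properties props = load(text);
        System.out.println("zookeeper.connect = " + props.getProperty("zookeeper.connect"));
        System.out.println("group.id = " + props.getProperty("group.id"));
    }
}
```

The returned Properties object could then be passed to new ConsumerConfig(props) in place of the hand-built one.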
Producer:
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zk.connect", "master:2181"); // address of one ZooKeeper node
        props.put("serializer.class", "kafka.serializer.StringEncoder"); // how messages are serialized
        props.put("metadata.broker.list", "master:9092"); // broker used to fetch metadata
        props.put("request.required.acks", "1"); // wait for the leader to acknowledge
        //props.put("partitioner.class", "com.xq.SimplePartitioner");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        String msg = "this is a messageuuu! XXXmessDageuuu";
        KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", msg);
        // Send the same message five times
        for (int i = 0; i < 5; i++) {
            System.out.println("send" + i);
            producer.send(data);
        }
        producer.close();
    }
}
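The producer has a commented-out partitioner.class setting that points at com.xq.SimplePartitioner, whose source is not shown. The snippet below is only a guess at the kind of logic such a class might contain: hash the message key and take it modulo the partition count. It is a standalone sketch that does not implement Kafka's partitioner interface (so it compiles without the Kafka jar), and the class name SimplePartitionerSketch is made up for illustration.

```java
// Hypothetical stand-in for a key-based partitioner; not the com.xq.SimplePartitioner
// referenced in this post and not tied to Kafka's partitioner interface.
final class SimplePartitionerSketch {

    /** Maps a key to a partition index in the range [0, numPartitions). */
    static int partition(Object key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (key == null) {
            return 0; // assumption: messages without a key all go to partition 0
        }
        // Clear the sign bit instead of using Math.abs, which stays negative
        // for Integer.MIN_VALUE
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        String[] keys = {"user-1", "user-2", "user-3"};
        for (String key : keys) {
            System.out.println(key + " -> partition " + partition(key, 4));
        }
    }
}
```

Note that the producer in this post builds its KeyedMessage without a key, so key-based logic like this would only matter once a key is passed along with the topic and message.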
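Run the consumer and the producer separately, and the messages being sent and received will show up on the console.
-
Later posts will cover documentation of Kafka's configuration parameters, as well as integration with Spark and with Flume.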