kafka java 事例

kafka java 例子

下载kafka包链接地址:http://download.csdn.net/detail/qinyanbin123/8959049

1 、新建项目 需要导入以下包

kafka java 事例

2、建类  生产者


 

import java.util.Properties; 
   
import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 
   
public class TestProducer {   
     
        public static void main(String[] args) {   
            Properties props = new Properties();   
            props.setProperty("metadata.broker.list","10.XX.XX.XX:9092");   
            props.setProperty("serializer.class","kafka.serializer.StringEncoder");   
            props.put("request.required.acks","1");   
            ProducerConfig config = new ProducerConfig(props);   
            Producer<String, String> producer = new Producer<String, String>(config);   
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("mykafka","test-kafka");   
            try {   
                int i =1
                while(i < 1000){ 
                       
                    producer.send(data);   
                
            } catch (Exception e) {   
                e.printStackTrace();   
            }   
            producer.close();   
        }   
}

 

3、消费者

 

import java.util.HashMap; 
import java.util.List;   
import java.util.Map;   
import java.util.Properties;   
     
import kafka.consumer.ConsumerConfig;   
import kafka.consumer.ConsumerIterator;   
import kafka.consumer.KafkaStream;   
import kafka.javaapi.consumer.ConsumerConnector;  
   
public class TestConsumer extends Thread{   
        private final ConsumerConnector consumer;   
        private final String topic;   
     
        public static void main(String[] args) {   
            TestConsumer consumerThread = new TestConsumer("mykafka");   
            consumerThread.start();   
        }   
        public TestConsumer(String topic) {   
            consumer =kafka.consumer.Consumer   
                    .createJavaConsumerConnector(createConsumerConfig());   
            this.topic =topic;   
        }   
     
    private static ConsumerConfig createConsumerConfig() {   
        Properties props = new Properties();   
        props.put("zookeeper.connect","10.XX.XX.XX:2181,10.XX.XX.XX:2181,10.XX.XX.XX:2181");   
        props.put("group.id", "0");   
        props.put("zookeeper.session.timeout.ms","10000");   
        return new ConsumerConfig(props);   
    }   
     
    public void run(){   
        Map<String,Integer> topickMap = new HashMap<String, Integer>();   
        topickMap.put(topic, 1);   
        Map<String, List<KafkaStream<byte[],byte[]>>>  streamMap =consumer.createMessageStreams(topickMap);   
        KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);   
        ConsumerIterator<byte[],byte[]> it =stream.iterator();   
        System.out.println("*********Results********");   
        while(true){   
            if(it.hasNext()){ 
                   
                System.err.println("get data:" +new String(it.next().message()));   
            
            try {   
                Thread.sleep(1000);   
            } catch (InterruptedException e) {   
                e.printStackTrace();   
            }   
        }   
    }   
}

 

4、分别启动生产者和消费者,在消费者输出中看到下日志即成功

 
?
1
2
3
4
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties). 
log4j:WARN Please initialize the log4j system properly. 
*********Results******** 

 

5、启动生产者如果报错如下:

?
1
2
3
4
5
6
7
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties). 
log4j:WARN Please initialize the log4j system properly. 
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. 
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90
    at kafka.producer.Producer.send(Producer.scala:76
    at kafka.javaapi.producer.Producer.send(Producer.scala:33
    at ProducerTest.main(TestProducer.java:21)


需要改动config文件夹下的server.properties中的以下两个属性

?
1
2
3
zookeeper.connect=localhost:2181改成zookeeper.connect=你的ip地址:2181  
以及默认注释掉的 
#host.name=localhost改成host.name=你的ip地址

 

版权声明:本文为博主原创文章,未经博主允许不得转载。