Kafka的简略使用
Kafka的简单使用
首先maven导入kafka的包:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
先来Producer生产者:
然后就是消费者:
MySeri源码:
首先maven导入kafka的包:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
先来Producer生产者:
public static void main(String[] args) { //kafka的配置 Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); //key的序列化方式 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value的序列化方式一般是json方式,这里我自己写了个对象序列化方式 props.put("value.serializer", "com.zfsy.syyx.kafka.MySeri"); Producer<String, RoleBean> producer = new KafkaProducer<String, RoleBean>(props); RoleBean bean = new RoleBean(); bean.setDm("test"); bean.setMc("测试2"); //topic为linlin ProducerRecord<String, RoleBean> record = new ProducerRecord<String, RoleBean>("linlin", "test03", bean); //发送到kafka客户端 Future<RecordMetadata> future = producer.send(record); try { System.out.println(future.get().partition()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }
然后就是消费者:
//消费者的配置 Properties props2 = new Properties(); props2.put("bootstrap.servers", "localhost:9092"); props2.put("group.id", "test"); props2.put("enable.auto.commit", "false"); props2.put("auto.commit.interval.ms", "1000"); props2.put("session.timeout.ms", "30000"); //key的反序列化方式 props2.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //value的反序列化方式 props2.put("value.deserializer", "com.zfsy.syyx.kafka.MySeri"); Consumer<String, RoleBean> consumer = new KafkaConsumer<String, RoleBean>(props2); //topic为linlin consumer.subscribe(Lists.newArrayList("linlin")); //一直获取 while(true){ ConsumerRecords<String, RoleBean> recoreds = consumer.poll(1000); Iterator<ConsumerRecord<String, RoleBean>> iterator = recoreds.iterator(); while(iterator.hasNext()){ ConsumerRecord<String, RoleBean> record2 = iterator.next(); System.out.println(record2.value().getDm()); System.out.println(record2.value().getMc()); } } }
MySeri源码:
public class MySeri implements Deserializer<Object>, Serializer<Object> { @Override public void configure(Map<String, ?> configs, boolean isKey) { // TODO Auto-generated method stub } @Override public Object deserialize(String topic, byte[] data) { ByteArrayInputStream in = new ByteArrayInputStream(data); try { ObjectInputStream objectInputStream = new ObjectInputStream(in); return objectInputStream.readObject(); } catch (Exception e) { e.printStackTrace(); } return null; } @Override public void close() { // TODO Auto-generated method stub } @Override public byte[] serialize(String topic, Object data) { ByteArrayOutputStream out = new ByteArrayOutputStream(); try { ObjectOutputStream objectOutputStream = new ObjectOutputStream(out); objectOutputStream.writeObject(data); return out.toByteArray(); } catch (IOException e) { e.printStackTrace(); } return null; }