springboot kafka 同一服务的多个实例,如何设置成不同的消费组?以实现对同一个topic的多次消费
问题描述:
场景如下:我的一个微服务A会在本地缓存一些业务配置数据,配置更新时由相关服务B发送一个变动消息。A收到消息更新本地缓存。那么问题来了,同一个服务的多个实例如何多次消费同一个topic消息
答
已经解决!通过 redis 的 setIfAbsent() 加代码动态配置 groupId:不同实例获取不同的 groupId。启动的时候会配置 kafka 消费工厂 ConsumerFactory,就在这个时候生成 groupId。
具体做法是调用 redis 的 setIfAbsent():如果写入失败,说明该 groupId 已被其他实例占用,就继续生成下一个,直到成功为止。
package com.jieshun.open.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * Kafka consumer configuration that gives each instance of this service its own
 * consumer group id, so every instance receives every message on a topic
 * (publish/subscribe semantics instead of competing-consumer semantics).
 *
 * <p>Group-id suffixes are serialized across instances through a Redis
 * {@code setIfAbsent} lease: each instance claims the first free suffix
 * (0, 1, 2, ...) and keeps the claim alive with a periodic expire-refresh, so a
 * crashed instance's suffix is automatically released once its lease times out.
 *
 * @Date 2021-7-23 16:04
 * @Created by yyk
 */
@Slf4j
@Component
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String BROKERS;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean ENABLE_AUTO_COMMIT;

    @Value("${spring.kafka.consumer.auto-commit-interval-ms}")
    private String AUTO_COMMIT_INTERVAL_MS;

    @Value("${spring.kafka.consumer.session-timeout-ms}")
    private Integer SESSION_TIMEOUT_MS;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String AUTO_OFFSET_RESET;

    @Value("${spring.kafka.consumer.group-id}")
    private String GROUP_ID;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private String MAX_POLL_RECORDS;

    /** Prefix of the Redis keys under which group-id leases are stored. */
    private final String CACHE_GROUP_NAME_PREFIX = "jop:gateway:group:";

    /** Lease duration in seconds; must comfortably exceed the renewal period. */
    private static final long LEASE_SECONDS = 30L;

    /** Interval in seconds at which the lease is refreshed while this instance is alive. */
    private static final long RENEW_PERIOD_SECONDS = 10L;

    /** Upper bound on suffix probing, so a misbehaving Redis cannot loop forever. */
    private static final int MAX_GROUP_ATTEMPTS = 1024;

    /** Group id claimed by this instance; assigned once when the consumer factory is built. */
    private String CURRENT_INSTANCE_GROUP_ID;

    @Autowired
    private StringRedisTemplate redisTemplate;

    /** Single-thread scheduler that periodically renews this instance's group-id lease. */
    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

    /**
     * Builds the Kafka listener container factory backed by the per-instance consumer factory.
     *
     * @return the listener container factory used by {@code @KafkaListener} methods
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    /**
     * Claims the first free group-id suffix at or after {@code currValue} via a Redis
     * {@code setIfAbsent} lease.
     *
     * <p>Fixes over the original implementation: the returned {@link Boolean} is
     * null-checked instead of auto-unboxed (it may be {@code null} per the Spring Data
     * contract); a TTL is placed on the claimed key and renewed periodically, so a dead
     * instance's suffix is eventually freed (the scheduler existed before but was never
     * used, leaking one suffix per restart); and the unbounded recursion is replaced by
     * a bounded loop.
     *
     * @param currValue first suffix to probe (typically 0)
     * @return the configured group-id prefix concatenated with the claimed suffix
     * @throws IllegalStateException if no suffix could be claimed within the probe bound
     */
    public String getSerializeGroupId(Integer currValue) {
        for (int i = currValue; i < currValue + MAX_GROUP_ATTEMPTS; i++) {
            String suffix = String.valueOf(i);
            String key = CACHE_GROUP_NAME_PREFIX.concat(suffix);
            // setIfAbsent may return null (e.g. inside a pipeline/transaction);
            // treat anything other than TRUE as "already taken".
            Boolean acquired = redisTemplate.opsForValue().setIfAbsent(key, suffix);
            if (Boolean.TRUE.equals(acquired)) {
                // Bound the claim so a crashed instance releases its suffix, and keep
                // renewing the lease for as long as this instance stays up.
                redisTemplate.expire(key, LEASE_SECONDS, TimeUnit.SECONDS);
                scheduledExecutorService.scheduleAtFixedRate(
                        () -> redisTemplate.expire(key, LEASE_SECONDS, TimeUnit.SECONDS),
                        RENEW_PERIOD_SECONDS, RENEW_PERIOD_SECONDS, TimeUnit.SECONDS);
                return GROUP_ID.concat(suffix);
            }
        }
        throw new IllegalStateException("Unable to claim a Kafka group id after "
                + MAX_GROUP_ATTEMPTS + " attempts (key prefix: " + CACHE_GROUP_NAME_PREFIX + ")");
    }

    /**
     * Initializes the consumer factory configuration, dynamically assigning this
     * instance its own consumer group so that every instance consumes the topic.
     *
     * @return the consumer factory carrying the per-instance group id
     */
    private ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ENABLE_AUTO_COMMIT);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, AUTO_COMMIT_INTERVAL_MS);
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Each deployed instance gets a distinct groupId, turning the shared topic
        // into publish/subscribe: every instance sees every message.
        CURRENT_INSTANCE_GROUP_ID = getSerializeGroupId(0);
        log.info("当前实例 group_id:{}", CURRENT_INSTANCE_GROUP_ID);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, CURRENT_INSTANCE_GROUP_ID);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET);
        return new DefaultKafkaConsumerFactory<String, String>(properties);
    }
}