SpringBoot:整合Kafka

helloworld

依赖:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.13</artifactId>
            <version>2.6.0</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.11.0</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.11.0</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.11.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

配置文件application.properties

spring.kafka.bootstrap-servers=192.168.1.51:9092
spring.kafka.consumer.group-id=myGroup

测试:

@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaBootController {

    private static final String TOPIC = "wj";

    private final KafkaTemplate kafkaTemplate;

    @Autowired
    public KafkaBootController(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
     //消息发送
    @GetMapping("/send")
    public String send(){
        kafkaTemplate.send(TOPIC,"hello boot");
        return "success";
    }
    //消息监听
    @KafkaListener(topics = TOPIC)
    public void listener(String content){
        log.info(content);
    }
}

SpringBoot:整合Kafka

executeInTransaction事务

开启事务支持:application.properties

spring.kafka.producer.transaction-id-prefix=kafka_tx.
@GetMapping("/send/{input}")
public String send(@PathVariable String input){
    kafkaTemplate.executeInTransaction(t->{
        t.send(TOPIC,"hello boot");
        if("tx".equals(input)){
            throw new RuntimeException("异常");
        }
        t.send(TOPIC,"hello boot");
        return true;
    });
    return "success";
}

@KafkaListener(topics = TOPIC)
public void listener(String content){
    log.info(content);
}

访问:http://localhost:8080/kafka/send/tx

SpringBoot:整合Kafka

出现图中所示红框内容,则事务控制成功。

注解事务

@GetMapping("/send2/{input}")
@Transactional(rollbackFor = RuntimeException.class)
public String send2(@PathVariable String input){
    kafkaTemplate.send(TOPIC,"hello boot");
    if("tx".equals(input)){
        throw new RuntimeException("异常");
    }
    kafkaTemplate.send(TOPIC,"hello boot");
    return "success";
}

访问:http://localhost:8080/kafka/send2/tx

SpringBoot:整合Kafka