spring boot自动配置方式整合

spring boot具有许多自动化配置,对于kafka的自动化配置当然也包含在内,基于spring boot自动配置方式整合kafka,需要做以下步骤。

1. 引入kafka的pom依赖包


<!-- https://mvnrepository/artifact/org.springframework.kafka/spring-kafka -->

<dependency>

<groupId>org.springframework.kafka</groupId>

<artifactId>spring-kafka</artifactId>

<version>2.2.2.RELEASE</version>

</dependency>

2. 在配置文件中配置kafka相关属性配置,分别配置生产者和消费者的属性,在程序启动时,spring boot框架会自动读取这些配置的属性,创建相关的生产者、消费者等。下面展示一个简单的配置。 


#kafka默认消费者配置

spring.kafka.consumer.bootstrap-servers=192.168.0.15:9092

spring.kafka.consumer.enable-auto-commit=false

spring.kafka.consumer.auto-offset-reset=earliest

#kafka默认生产者配置

spring.kafka.producer.bootstrap-servers=192.168.0.15:9092

spring.kafka.producer.acks=-1

spring.kafka.client-id=kafka-producer

spring.kafka.producer.batch-size=5

当然,在实际生产中的配置肯定比上面的配置复杂,需要一些定制化的操作,那么spring boot的自动化配置创建的生产者或者消费者都不能满足我们时,应该需要自定义化相关配置,这个在后续举例,这里先分析自动化配置。

在进行了如上配置之后,需要生产者时,使用方式为下代码所示。


@RunWith(SpringRunner.class)

@SpringBootTest(classes = {UserSSOApplication.class})

public class UserSSOApplicationTests {


@Resource

//注入kafkatemplete,这个由spring boot自动创建

KafkaTemplate kafkaTemplate;

@Test

public void testKafkaSendMsg() {

//发送消息

kafkaTemplate.send("test", 0,12,"1222");

}


}

消费者的使用注解方式整合,代码如下。 


@Component

@Slf4j

public class KafkaMessageReceiver2 {

//指定监听的topic,当前消费者组id

@KafkaListener(topics = {"test"}, groupId = "receiver")

public void registryReceiver(ConsumerRecord<Integer, String> integerStringConsumerRecords) {

log.info(integerStringConsumerRecords.value());

}



}

上面是最简单的配置,实现的一个简单例子,如果需要更加定制化的配置,可以参考类
org.springframework.boot.autoconfigure.kafka.KafkaProperties这里面包含了大部分需要的kafka配置。针对配置,在properties文件中添加即可。


spring boot自动配置的不足


上面是依赖spring boot自动化配置完成的整合方式,实际上所有的配置实现都是在org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration中完成。可以看出个是依赖于@Configuration完成bean配置,这种配置方式基本能够实现大部分情况,只要熟悉org.springframework.boot.autoconfigure.kafka.KafkaProperties中的配置即可。
但是这种方式还有一个问题,就是org.springframework.boot.autoconfigure.kafka.KafkaProperties中并没有涵盖所有的org.apache.kafka.clients.producer.ProducerConfig中的配置,这就导致某些特殊配置不能依赖spring boot自动创建,需要我们手动创建Poducer和comsumer。


@Configuration

@ConditionalOnClass(KafkaTemplate.class)

@EnableConfigurationProperties(KafkaProperties.class)

@Import(KafkaAnnotationDrivenConfiguration.class)

public class KafkaAutoConfiguration {


private final KafkaProperties properties;


private final RecordMessageConverter messageConverter;


public KafkaAutoConfiguration(KafkaProperties properties,

ObjectProvider<RecordMessageConverter> messageConverter) {

this.properties = properties;

this.messageConverter = messageConverter.getIfUnique();

}


@Bean

@ConditionalOnMissingBean(KafkaTemplate.class)

public KafkaTemplate<?, ?> kafkaTemplate(

ProducerFactory<Object, Object> kafkaProducerFactory,

ProducerListener<Object, Object> kafkaProducerListener) {

KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(

kafkaProducerFactory);

if (this.messageConverter != null) {

kafkaTemplate.setMessageConverter(this.messageConverter);

}

kafkaTemplate.setProducerListener(kafkaProducerListener);

kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());

return kafkaTemplate;

}


@Bean

@ConditionalOnMissingBean(ConsumerFactory.class)

public ConsumerFactory<?, ?> kafkaConsumerFactory() {

return new DefaultKafkaConsumerFactory<>(

this.properties.buildConsumerProperties());

}


@Bean

@ConditionalOnMissingBean(ProducerFactory.class)

public ProducerFactory<?, ?> kafkaProducerFactory() {

DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(

this.properties.buildProducerProperties());

String transactionIdPrefix = this.properties.getProducer()

.getTransactionIdPrefix();

if (transactionIdPrefix != null) {

factory.setTransactionIdPrefix(transactionIdPrefix);

}

return factory;

}


//略略略


}

spring boot下手动配置kafka


由于需要对某些特殊配置进行配置,我们可能需要手动配置kafka相关的bean,创建一个配置类如下,类似于
org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration,这里创建了对应类型的bean之后,org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration中的对应Bean定义将不起作用。
所有的生产者配置可以参考ProducerConfig类,所有的消费者配置可以参考ConsumerConfig类。


/**

* kafka配置,实际上,在KafkaAutoConfiguration中已经有默认的根据配置文件信息创建配置,但是自动配置属性没有涵盖所有

* 我们可以自定义创建相关bean,进行如下配置

*

* @author zhoujy

* @date 2018年12月17日

**/

@Configuration

public class KafkaConfig {


@Value("${spring.kafka.consumer.bootstrap-servers}")

private String bootstrapServers;


//构造消费者属性map,ConsumerConfig中的可配置属性比spring boot自动配置要多

private Map<String, Object> consumerProperties(){

Map<String, Object> props = new HashMap<>();

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);

props.put(ConsumerConfig.GROUP_ID_CONFIG, "activity-service");

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

return props;

}


/**

* 不使用spring boot默认方式创建的DefaultKafkaConsumerFactory,重新定义创建方式

* @return

*/

@Bean("consumerFactory")

public DefaultKafkaConsumerFactory consumerFactory(){

return new DefaultKafkaConsumerFactory(consumerProperties());

}



@Bean("listenerContainerFactory")

//个性化定义消费者

public ConcurrentKafkaListenerContainerFactory listenerContainerFactory(DefaultKafkaConsumerFactory consumerFactory) {

//指定使用DefaultKafkaConsumerFactory

ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();

factory.setConsumerFactory(consumerFactory);


//设置消费者ack模式为手动,看需求设置

factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

//设置可批量拉取消息消费,拉取数量一次3,看需求设置

factory.setConcurrency(3);

factory.setBatchListener(true);

return factory;

}


/* @Bean

//代码创建方式topic

public NewTopic batchTopic() {

return new NewTopic("topic.quick.batch", 8, (short) 1);

}*/


//创建生产者配置map,ProducerConfig中的可配置属性比spring boot自动配置要多

private Map<String, Object> producerProperties(){

Map<String, Object> props = new HashMap<>();

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

props.put(ProducerConfig.ACKS_CONFIG, "-1");

props.put(ProducerConfig.BATCH_SIZE_CONFIG, 5);

props.put(ProducerConfig.LINGER_MS_CONFIG, 500);

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

return props;

}


/**

* 不使用spring boot的KafkaAutoConfiguration默认方式创建的DefaultKafkaProducerFactory,重新定义

* @return

*/

@Bean("produceFactory")

public DefaultKafkaProducerFactory produceFactory(){

return new DefaultKafkaProducerFactory(producerProperties());

}



/**

* 不使用spring boot的KafkaAutoConfiguration默认方式创建的KafkaTemplate,重新定义

* @param produceFactory

* @return

*/

@Bean

public KafkaTemplate kafkaTemplate(DefaultKafkaProducerFactory produceFactory){

return new KafkaTemplate(produceFactory);

}

}

生产者的使用方式,跟自动配置一样,直接注入KafkaTemplate即可。主要是消费者的使用有些不同。 

批量消费消息

上面的消费者配置配置了一个bean,@Bean(“listenerContainerFactory”),这个bean可以指定为消费者,注解方式中是如下的使用方式。
containerFactory = "listenerContainerFactory"指定了使用listenerContainerFactory作为消费者。

1. 注意registryReceiver中的参数,ConsumerRecord对比之前的消费者,因为设置listenerContainerFactory是批量消费,因此ConsumerRecord是一个List,如果不是批量消费的话,相对应就是一个对象。
2. 注意第二个参数Acknowledgment,这个参数只有在设置消费者的ack应答模式为AckMode.MANUAL_IMMEDIATE才能注入,意思是需要手动ack。


如果不想要批量消费消息,那就可以另外定义一个bean类似于@Bean(“listenerContainerFactory”),如下,只要不设置批量消费即可。


@Bean("listenerContainerFactory2")

//个性化定义消费者

public ConcurrentKafkaListenerContainerFactory listenerContainerFactory2(DefaultKafkaConsumerFactory consumerFactory) {

//指定使用DefaultKafkaConsumerFactory

ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();

factory.setConsumerFactory(consumerFactory);


//设置消费者ack模式为手动,看需求设置

factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

return factory;

}

 

更多推荐

spring boot整合kafka(springBoot默认自动配置和自定义手动配置)