新聞中心
SpringBoot 整合 Kafka 實現(xiàn)數(shù)據(jù)高吞吐
作者:鴨血粉絲Tang 2022-04-28 07:31:41
云計算
Kafka 本文主要以SpringBoot技術(shù)框架為背景,結(jié)合實際業(yè)務(wù)需求,采用 kafka 進(jìn)行數(shù)據(jù)消費,實現(xiàn)數(shù)據(jù)量的高吞吐,在下篇文章中,我們會介紹消費失敗的處理流程。

為成都等地區(qū)用戶提供了全套網(wǎng)頁設(shè)計制作服務(wù),及成都網(wǎng)站建設(shè)行業(yè)解決方案。主營業(yè)務(wù)為成都網(wǎng)站建設(shè)、成都做網(wǎng)站、成都網(wǎng)站設(shè)計,以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠的服務(wù)。我們深信只要達(dá)到每一位用戶的要求,就會得到認(rèn)可,從而選擇與我們長期合作。這樣,我們也可以走得更遠(yuǎn)!
一、介紹
在上篇文章中,我們詳細(xì)的介紹了 kafka 的架構(gòu)模型,在集群環(huán)境中,kafka 可以通過設(shè)置分區(qū)數(shù)來加快數(shù)據(jù)的消費速度。
光知道理論還不行,我們得真真切切的實踐起來才行!
下面,我將結(jié)合生產(chǎn)環(huán)境的真實案例,以SpringBoot技術(shù)框架為基礎(chǔ),向大家介紹 kafka 的使用以及如何實現(xiàn)數(shù)據(jù)高吞吐!
二、程序?qū)嵺`
最近,公司大數(shù)據(jù)團(tuán)隊每天凌晨會將客戶的訂單數(shù)據(jù)進(jìn)行統(tǒng)計計算,然后把業(yè)績數(shù)據(jù)推送給我們,以便銷售人員每天能看到昨天的業(yè)績數(shù)據(jù),數(shù)據(jù)的體量大約在 1000 多萬條,以下是我對接的過程!
2.1、添加 kafka 依賴包
本次項目的SpringBoot版本為2.1.5.RELEASE,依賴的 kafka 的版本為2.2.6.RELEASE。
https://back-media.51cto.com/editor?id=707646/h6e90be6-7EV6kJbV
2.2、添加 kafka 配置變量
當(dāng)添加完了依賴包之后,我們只需要在application.properties中添加 kafka 配置變量,基本上就可以正常使用了。
# 指定kafka server的地址,集群配多個,中間,逗號隔開
spring.kafka.bootstrap-servers=197.168.25.196:9092
#重試次數(shù)
spring.kafka.producer.retries=3
#批量發(fā)送的消息數(shù)量
spring.kafka.producer.batch-size=1000
#32MB的批處理緩沖區(qū)
spring.kafka.producer.buffer-memory=33554432
#默認(rèn)消費者組
spring.kafka.consumer.group-id=crm-microservice-newperformance
#最早未被消費的offset
spring.kafka.consumer.auto-offset-reset=earliest
#批量一次最大拉取數(shù)據(jù)量
spring.kafka.consumer.max-poll-records=4000
#是否自動提交
spring.kafka.consumer.enable-auto-commit=true
#自動提交時間間隔,單位ms
spring.kafka.consumer.auto-commit-interval=1000
2.3、創(chuàng)建一個消費者
@Component
public class BigDataTopicListener {
private static final Logger log = LoggerFactory.getLogger(BigDataTopicListener.class);
/**
* 監(jiān)聽kafka數(shù)據(jù)
* @param consumerRecords
* @param ack
*/
@KafkaListener(topics = {"big_data_topic"})
public void consumer(ConsumerRecord, ?> consumerRecord) {
log.info("收到bigData推送的數(shù)據(jù)'{}'", consumerRecord.toString());
//...
//db.save(consumerRecord);//插入或者更新數(shù)據(jù)
}
}
2.4、模擬對方推送數(shù)據(jù)測試
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerTest {
@Autowired
private KafkaTemplatekafkaTemplate;
@Test
public void testSend(){
for (int i = 0; i < 5000; i++) {
Mapmap = new LinkedHashMap<>();
map.put("datekey", 20210610);
map.put("userid", i);
map.put("salaryAmount", i);
//向kafka的big_data_topic主題推送數(shù)據(jù)
kafkaTemplate.send("big_data_topic", JSONObject.toJSONString(map));
}
}
}
起初,通過這種單條數(shù)據(jù)消費方式,進(jìn)行測試程序沒太大毛病!
但是,當(dāng)上到生產(chǎn)之后,發(fā)現(xiàn)一個很大的問題,就是消費1000萬條數(shù)據(jù),至少需要3個小時,結(jié)果導(dǎo)致數(shù)據(jù)看板一直沒數(shù)據(jù)。
第二天痛定思痛,決定改成批量消費模型,怎么操作呢,請看下面!
2.5、將 kafka 的消費模式改成批量消費
首先,創(chuàng)建一個KafkaConfiguration配置類,內(nèi)容如下!
@Configuration
public class KafkaConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.producer.retries}")
private Integer retries;
@Value("${spring.kafka.producer.batch-size}")
private Integer batchSize;
@Value("${spring.kafka.producer.buffer-memory}")
private Integer bufferMemory;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${spring.kafka.consumer.max-poll-records}")
private Integer maxPollRecords;
@Value("${spring.kafka.consumer.batch.concurrency}")
private Integer batchConcurrency;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private Boolean autoCommit;
@Value("${spring.kafka.consumer.auto-commit-interval}")
private Integer autoCommitInterval;
/**
* 生產(chǎn)者配置信息
*/
@Bean
public MapproducerConfigs() {
Mapprops = new HashMap<>();
props.put(ProducerConfig.ACKS_CONFIG, "0");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
/**
* 生產(chǎn)者工廠
*/
@Bean
public ProducerFactoryproducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
/**
* 生產(chǎn)者模板
*/
@Bean
public KafkaTemplatekafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
/**
* 消費者配置信息
*/
@Bean
public MapconsumerConfigs() {
Mapprops = new HashMap<>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
/**
* 消費者批量工廠
*/
@Bean
public KafkaListenerContainerFactory> batchFactory() {
ConcurrentKafkaListenerContainerFactoryfactory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
//設(shè)置并發(fā)量,小于或等于Topic的分區(qū)數(shù)
factory.setConcurrency(batchConcurrency);
factory.getContainerProperties().setPollTimeout(1500);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
//設(shè)置為批量消費,每個批次數(shù)量在Kafka配置參數(shù)中設(shè)置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.setBatchListener(true);
return factory;
}
}
同時,新增一個spring.kafka.consumer.batch.concurrency變量,用來設(shè)置并發(fā)數(shù),通過這個參數(shù)我們可以指定幾個線程來實現(xiàn)消費。
在application.properties配置文件中,添加如下變量:
#批消費并發(fā)量,小于或等于Topic的分區(qū)數(shù)
spring.kafka.consumer.batch.concurrency = 3
#設(shè)置每次批量拉取的最大數(shù)量為4000
spring.kafka.consumer.max-poll-records=4000
#設(shè)置自動提交改成false
spring.kafka.consumer.enable-auto-commit=false
最后,將單個消費方法改成批量消費方法模式。
@Component
public class BigDataTopicListener {
private static final Logger log = LoggerFactory.getLogger(BigDataTopicListener.class);
/**
* 監(jiān)聽kafka數(shù)據(jù)(批量消費)
* @param consumerRecords
* @param ack
*/
@KafkaListener(topics = {"big_data_topic"}, containerFactory = "batchFactory")
public void batchConsumer(List> consumerRecords, Acknowledgment ack) {
long start = System.currentTimeMillis();
//...
//db.batchSave(consumerRecords);//批量插入或者批量更新數(shù)據(jù)
//手動提交
ack.acknowledge();
log.info("收到bigData推送的數(shù)據(jù),拉取數(shù)據(jù)量:{},消費時間:{}ms", consumerRecords.size(), (System.currentTimeMillis() - start));
}
}
此時,消費性能大大的提升,數(shù)據(jù)處理的非??欤?00萬條數(shù)據(jù),最多 30 分鐘就全部消費完畢了。
本例中的消費微服務(wù),生產(chǎn)環(huán)境部署了3臺服務(wù)器,同時big_data_topic主題的分區(qū)數(shù)為3,因此并發(fā)數(shù)設(shè)置為3比較合適。
隨著推送的數(shù)據(jù)量不斷增加,如果你覺得消費速度還不夠,你可以重新設(shè)置每次批量拉取的最大數(shù)量,活著橫向擴(kuò)展微服務(wù)的集群實例數(shù)量和 topic 的分區(qū)數(shù),以此來加快數(shù)據(jù)的消費速度。
但是,如果在單臺機(jī)器中,每次批量拉取的最大數(shù)量過大,大對象也會很大,會造成頻繁的 gc 告警!
因此,在實際的使用過程中,每次批量拉取的最大數(shù)量并不是越大越好,根據(jù)當(dāng)前服務(wù)器的硬件配置,調(diào)節(jié)到合適的閥值,才是最優(yōu)的選擇!
三、小結(jié)
本文主要以SpringBoot技術(shù)框架為背景,結(jié)合實際業(yè)務(wù)需求,采用 kafka 進(jìn)行數(shù)據(jù)消費,實現(xiàn)數(shù)據(jù)量的高吞吐,在下篇文章中,我們會介紹消費失敗的處理流程。
網(wǎng)頁標(biāo)題:SpringBoot整合Kafka實現(xiàn)數(shù)據(jù)高吞吐
URL地址:http://fisionsoft.com.cn/article/dpeshpe.html


咨詢
建站咨詢
