新聞中心
Kafka Connect如何實(shí)現(xiàn)同步RDS Binlog數(shù)據(jù)?
作者:魚跟貓 2018-05-14 13:51:39
大數(shù)據(jù)
Kafka 本文將演示如何在E-MapReduce上實(shí)現(xiàn)將RDS binlog實(shí)時(shí)同步到Kafka集群中。實(shí)驗(yàn)中使用VPC網(wǎng)絡(luò)環(huán)境,以下實(shí)例創(chuàng)建時(shí)默認(rèn)都是在VPC環(huán)境下。

為企業(yè)提供網(wǎng)站設(shè)計(jì)、網(wǎng)站建設(shè)、網(wǎng)站優(yōu)化、成都營(yíng)銷網(wǎng)站建設(shè)、競(jìng)價(jià)托管、品牌運(yùn)營(yíng)等營(yíng)銷獲客服務(wù)。創(chuàng)新互聯(lián)擁有網(wǎng)絡(luò)營(yíng)銷運(yùn)營(yíng)團(tuán)隊(duì),以豐富的互聯(lián)網(wǎng)營(yíng)銷經(jīng)驗(yàn)助力企業(yè)精準(zhǔn)獲客,真正落地解決中小企業(yè)營(yíng)銷獲客難題,做到“讓獲客更簡(jiǎn)單”。自創(chuàng)立至今,成功用技術(shù)實(shí)力解決了企業(yè)“網(wǎng)站建設(shè)、網(wǎng)絡(luò)品牌塑造、網(wǎng)絡(luò)營(yíng)銷”三大難題,同時(shí)降低了營(yíng)銷成本,提高了有效客戶轉(zhuǎn)化率,獲得了眾多企業(yè)客戶的高度認(rèn)可!
1. 背景
在我們的業(yè)務(wù)開發(fā)中,往往會(huì)碰到下面這個(gè)場(chǎng)景:
- 業(yè)務(wù)更新數(shù)據(jù)寫到數(shù)據(jù)庫(kù)中
- 業(yè)務(wù)更新數(shù)據(jù)需要實(shí)時(shí)傳遞給下游依賴處理
所以傳統(tǒng)的處理架構(gòu)可能會(huì)這樣:
但這個(gè)架構(gòu)也存在著不少弊端:我們需要在項(xiàng)目中維護(hù)很多發(fā)送消息的代碼。新增或者更新消息都會(huì)帶來(lái)不少維護(hù)成本。所以,更好的處理方式應(yīng)該是直接將數(shù)據(jù)庫(kù)的數(shù)據(jù)接入到流式系統(tǒng)中,如下圖:
本文將演示如何在E-MapReduce上實(shí)現(xiàn)將RDS binlog實(shí)時(shí)同步到Kafka集群中。
2. 環(huán)境準(zhǔn)備
實(shí)驗(yàn)中使用VPC網(wǎng)絡(luò)環(huán)境,以下實(shí)例創(chuàng)建時(shí)默認(rèn)都是在VPC環(huán)境下。
2.1 準(zhǔn)備一個(gè)測(cè)試RDS數(shù)據(jù)庫(kù)
創(chuàng)建一個(gè)RDS實(shí)例,版本選擇5.7。這里不贅述如何創(chuàng)建RDS,詳細(xì)流程請(qǐng)參考RDS文檔。創(chuàng)建完如圖:
2.2 準(zhǔn)備一個(gè)Kafka集群
創(chuàng)建一個(gè)E-MapReduce Kafka集群,版本選擇EMR-3.11.0。需要注意,這里必須選擇EMR-3.11.0以上版本,否則不會(huì)默認(rèn)安裝啟動(dòng)Kafka Connect服務(wù)。詳細(xì)創(chuàng)建流程請(qǐng)參考E-MapReduce文檔。創(chuàng)建完如圖:
注意:RDS實(shí)例和E-MapReduce Kafka集群***在同一個(gè)VPC中,否則需要打通兩個(gè)VPC之間的網(wǎng)絡(luò)。
3. Kafka Connect
3.1 Connector
Kafka Connect是一個(gè)用于Kafka和其他數(shù)據(jù)系統(tǒng)之間進(jìn)行數(shù)據(jù)傳輸?shù)墓ぞ撸梢詫?shí)現(xiàn)基于Kafka的數(shù)據(jù)管道,打通上下游數(shù)據(jù)源。我們需要做的就是在Kafka Connect服務(wù)上運(yùn)行一個(gè)Connector,這個(gè)Connector是具體實(shí)現(xiàn)如何從/向數(shù)據(jù)源中讀/寫數(shù)據(jù)。Confluent提供了很多Connector實(shí)現(xiàn),你可以在這里下載。不過(guò)今天我們使用Debezium提供的一個(gè)MySQL Connector插件,下載地址。
下載這個(gè)插件,并將解壓出來(lái)的jar包全部拷貝到kafka lib目錄下。注意:需要將這些jar包拷貝到Kafka集群所有機(jī)器上。
在Kafka集群的服務(wù)列表中重啟Kafka Connect組件。
3.2 啟動(dòng)Connector
在創(chuàng)建connector前,我們需要做一番配置,這里羅列一些Debezium MySQL Connector的主要配置項(xiàng):
登錄到Kafka集群,配置并創(chuàng)建一個(gè)connector,命令如下:
這時(shí),我們可以看到一個(gè)創(chuàng)建好的connector,如圖:
3.3 注意事項(xiàng)
server_id是多少?:你可以在RDS執(zhí)行"SELECT @@server_id;"查到。
創(chuàng)建connector時(shí)可能會(huì)出現(xiàn)連接失敗,請(qǐng)確保RDS的白名單已經(jīng)授權(quán)了Kafka集群機(jī)器訪問(wèn)。
4 測(cè)試
4.1 創(chuàng)建一張表
一會(huì)之后,Kafka集群中會(huì)自動(dòng)創(chuàng)建一個(gè)對(duì)應(yīng)的topic
插入幾條數(shù)據(jù)
查看binlog數(shù)據(jù)
查看fulfillment.mugen.students這個(gè)topic,是否有剛剛新插入的數(shù)據(jù)
結(jié)果如圖所示:
5. 資料
- confluent官方文檔 https://docs.confluent.io
- debezium官網(wǎng) http://debezium.io/
- kafka官方文檔 http://kafka.apache.org/documentation.html
網(wǎng)站標(biāo)題:KafkaConnect如何實(shí)現(xiàn)同步RDSbinlog數(shù)據(jù)?
URL分享:http://fisionsoft.com.cn/article/djhjhgs.html


咨詢
建站咨詢
