新聞中心
簡介
Debezium 是一個開源的,為捕獲數(shù)據(jù)變更(CDC)提供低延遲的數(shù)據(jù)流平臺。使用 Debezium 來監(jiān)控數(shù)據(jù)庫,應(yīng)用程序可以接收來自數(shù)據(jù)庫的每個行變更事件,只有提交的變更才是可見的,因此無需擔(dān)心事務(wù)或變更的回滾。Debezium 提供了所有變更事件的單一模型,不必?fù)?dān)心每種數(shù)據(jù)庫管理系統(tǒng)的復(fù)雜性。

同時,Debezium 將歷史數(shù)據(jù)變更記錄在了持久化的日志中,因此應(yīng)用程序可以隨時停止并重新啟動,啟動后能夠重新接收在停止運行期間錯過的所有事件。
監(jiān)控數(shù)據(jù)庫并在數(shù)據(jù)變更時收到通知是一件很復(fù)雜的事情,關(guān)系型數(shù)據(jù)庫的觸發(fā)器可能很合適,但是局限于某些數(shù)據(jù)庫,并且通常僅限于更新同一個數(shù)據(jù)庫內(nèi)的狀態(tài),不與外部進(jìn)程通信。一些數(shù)據(jù)庫提供了 API 或者框架來進(jìn)行監(jiān)控,但是沒有一個統(tǒng)一的標(biāo)準(zhǔn),因此每個數(shù)據(jù)庫的方法都不同,還需要大量比較專業(yè)的代碼來實現(xiàn);與此同時,監(jiān)聽數(shù)據(jù)變更后,如何保證這些變更事件的有序性并降低對數(shù)據(jù)庫的影響是非常有挑戰(zhàn)性的。
Debezium 提供了完成這些工作的模塊,一些模塊是通用的,可以與多個數(shù)據(jù)庫管理系統(tǒng)一起使用,但在功能和性能方面也有一些限制。其他模塊是為特定的數(shù)據(jù)庫管理系統(tǒng)定制的,功能更加強(qiáng)大,并且很好地利用了系統(tǒng)的特定功能。
項目地址:
https://github.com/debezium/debezium
基本信息
基礎(chǔ)架構(gòu)
Debezium 利用 Kafka 和 Kafka Connect 實現(xiàn)了自己的持久性、可靠性和容錯性。每一個部署在 Kafka Connect 服務(wù)中的 connector 監(jiān)控一個上游數(shù)據(jù)庫服務(wù)器,捕獲所有的數(shù)據(jù)庫變更,然后記錄到一個或者多個 Kafka topic(通常一個數(shù)據(jù)庫表對應(yīng)一個kafka topic)。Kafka 確保所有這些數(shù)據(jù)更改事件具有多個副本并且總體上有序(Kafka 只能保證一個 topic 的單個分區(qū)內(nèi)有序),這樣,更多的客戶端可以獨立接收同樣的數(shù)據(jù)變更事件而對上游數(shù)據(jù)庫系統(tǒng)造成的影響降到很小(如果 N 個應(yīng)用都直接去監(jiān)控數(shù)據(jù)庫,對數(shù)據(jù)庫的壓力為 N,而用 debezium 匯報數(shù)據(jù)庫更改事件到 kafka,所有的應(yīng)用都去接收 kafka,可以把對數(shù)據(jù)庫的壓力降到1)。另外,客戶端可以隨時停止接收,然后重啟,從上次停止接收的地方接著接收。
對于不需要或者不想要這種容錯級別、性能、可擴(kuò)展性、可靠性的應(yīng)用,他們可以使用內(nèi)嵌的 Debezium connector 引擎來直接在應(yīng)用內(nèi)部運行 connector。這種應(yīng)用仍需要接收數(shù)據(jù)庫更改事件,但更希望 connector 直接傳遞給它,而不是持久化到 Kafka。
常見使用場景
緩存失效
緩存的內(nèi)容在源頭被更改或者被刪除的時候立即讓緩存中的條目失效。如果緩存在一個獨立的進(jìn)程中運行(例 Redis,Memcache,Infinispan 或者其他的),那么簡單的緩存失效邏輯可以放在獨立的進(jìn)程或服務(wù)中,從而簡化主應(yīng)用的邏輯。在一些場景中,緩存失效邏輯可以更復(fù)雜一點,讓它利用更改事件中的更新數(shù)據(jù)去更新緩存中受影響的條目。
簡化單體應(yīng)用
許多應(yīng)用更新數(shù)據(jù)庫,然后在數(shù)據(jù)庫中的更改被提交后,做一些額外的工作:更新搜索索引,更新緩存,發(fā)送通知,運行業(yè)務(wù)邏輯,等等。這種情況通常稱為雙寫(dual-writes),因為應(yīng)用沒有在一個事務(wù)內(nèi)寫多個系統(tǒng)。這樣不僅應(yīng)用邏輯復(fù)雜難以維護(hù),而且雙寫容易丟失數(shù)據(jù)或者在一些系統(tǒng)更新成功而另一些系統(tǒng)沒有更新成功的時候造成不同系統(tǒng)之間的狀態(tài)不一致。使用 CDC,在源數(shù)據(jù)庫的數(shù)據(jù)更改提交后,這些額外的工作可以被放在獨立的線程或者進(jìn)程(服務(wù))中完成。這種實現(xiàn)方式的容錯性更好,不會丟失事件,容易擴(kuò)展,并且更容易支持升級。
共享數(shù)據(jù)庫
當(dāng)多個應(yīng)用共用同一個數(shù)據(jù)庫的時候,一個應(yīng)用提交的更改通常要被另一個應(yīng)用感知到。一種實現(xiàn)方式是使用消息總線,另一種實現(xiàn)方式,即 Debezium:每個應(yīng)用可以直接監(jiān)控數(shù)據(jù)庫的更改,并且響應(yīng)更改。
數(shù)據(jù)集成
數(shù)據(jù)通常被存儲在多個地方,尤其是當(dāng)數(shù)據(jù)被用于不同的目的的時候,會有不同的形式。保持多系統(tǒng)的同步是很有挑戰(zhàn)性的,但是可以通過使用 Debezium 加上簡單的事件處理邏輯來實現(xiàn)簡單的 ETL 類型的解決方案。
命令查詢職責(zé)分離
在命令查詢職責(zé)分離 Command Query Responsibility Separation (CQRS) 架構(gòu)模式中,更新數(shù)據(jù)使用了一種數(shù)據(jù)模型,讀數(shù)據(jù)使用了一種或者多種數(shù)據(jù)模型。由于數(shù)據(jù)更改被記錄在更新側(cè),這些更改將被處理以更新各種讀展示。所以CQRS應(yīng)用通常更復(fù)雜,尤其是他們需要保證可靠性和全序處理。Debezium 和CDC 使這種方式更可行:寫操作被正常記錄,但是 Debezium 捕獲數(shù)據(jù)變更,并且持久化到全序流里,然后供那些需要異步更新只讀視圖的服務(wù)接收。
安裝
使用 Debezium 需要三個獨立的服務(wù):ZooKeeper、Kafka 和 Debezium 連接器服務(wù)。官方推薦使用 Docker 進(jìn)行安裝,數(shù)據(jù)庫以 MySQL 為例。
啟動 ZooKeeper
$ docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:1.9
如果是使用 Podman,運行以下命令:
$ sudo podman pod create --name=dbz --publish "9092,3306,8083"
$ sudo podman run -it --rm --name zookeeper --pod dbz quay.io/debezium/zookeeper:1.9
啟動后看到如下輸出:
Starting up in standalone mode
ZooKeeper JMX enabled by default
Using config: /zookeeper/conf/zoo.cfg
2017-09-21 07:15:55,417 - INFO [main:QuorumPeerConfig@134] - Reading configuration from: /zookeeper/conf/zoo.cfg
2017-09-21 07:15:55,419 - INFO [main:DatadirCleanupManager@78] - autopurge.snapRetainCount set to 3
2017-09-21 07:15:55,419 - INFO [main:DatadirCleanupManager@79] - autopurge.purgeInterval set to 1
...
port 0.0.0.0/0.0.0.0:2181
啟動 Kafka
$ docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:1.9
如果是使用 Podman,運行以下命令:
$ sudo podman run -it --rm --name kafka --pod dbz quay.io/debezium/kafka:1.9
啟動后看到如下輸出:
...
2017-09-21 07:16:59,085 - INFO [main-EventThread:ZkClient@713] - zookeeper state changed (SyncConnected)
2017-09-21 07:16:59,218 - INFO [main:Logging$class@70] - Cluster ID = LPtcBFxzRvOzDSXhc6AamA
...
2017-09-21 07:16:59,649 - INFO [main:Logging$class@70] - [Kafka Server 1], started
啟動 MySQL
該容器運行一個預(yù)先配置有 inventory 數(shù)據(jù)庫的 MySQL 數(shù)據(jù)庫服務(wù)器:
$ docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
如果是使用 Podman,運行以下命令:
$ sudo podman run -it --rm --name mysql --pod dbz -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
啟動后看到如下輸出:
...
[System] [MY-010931] [Server] /usr/sbin/mysqld: ready for connections. Version: '8.0.27' socket: '/var/run/mysqld/mysqld.sock' port: 3306 MySQL Community Server - GPL.
[System] [MY-011323] [Server] X Plugin ready for connections. Bind-address: '::' port: 33060, socket: /var/run/mysqld/mysqlx.sock
啟動 Kafka Connector
該服務(wù)公開了一個 REST API 來管理 Debezium MySQL 連接器:
$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link kafka:kafka --link mysql:mysql quay.io/debezium/connect:1.9
如果是使用 Podman,運行以下命令:
$ sudo podman run -it --rm --name connect --pod dbz -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses quay.io/debezium/connect:1.9
啟動后看到如下輸出:
...
2020-02-06 15:48:33,939 INFO || Kafka version: 3.0.0 [org.apache.kafka.common.utils.AppInfoParser]
...
2020-02-06 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset -1 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2020-02-06 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
注冊 MySQL 連接器
通過注冊 Debezium MySQL 連接器,連接器將開始監(jiān)控 MySQL 數(shù)據(jù)庫服務(wù)器的 binlog,記錄數(shù)據(jù)庫的 binlog 所有事務(wù)(例如對單個行的變更)。當(dāng)數(shù)據(jù)庫中的一行發(fā)生變更時,Debezium 會生成一個變更事件。
配置如下:
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.include.list": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
}使用 curl 命令進(jìn)行注冊:
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'
更新數(shù)據(jù)庫并查看變更事件
使用 watch-topic 可以查看 dbserver1.inventory.customers 主題:
$ docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:1.9 watch-topic -a -k dbserver1.inventory.customers
在 MySQL 客戶端做一次數(shù)據(jù)變更:
mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
Query OK, 1 row affected (0.05 sec)
Rows matched: 1 Changed: 1 Warnings: 0
查改修改后的值:
mysql> SELECT * FROM customers;
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | [email protected] |
| 1002 | George | Bailey | [email protected] |
| 1003 | Edward | Walker | [email protected] |
| 1004 | Anne Marie | Kretchmar | [email protected] |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
切換到 watch-topic 終端以查看事件:通過比較 before 和 after 結(jié)構(gòu),可以確定由于提交而在受影響的行中實際更改的內(nèi)容。
{
"schema": {...},
"payload": {
"before": {
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "[email protected]"
},
"after": {
"id": 1004,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "[email protected]"
},
"source": {
"name": "1.9.5.Final",
"name": "dbserver1",
"server_id": 223344,
"ts_sec": 1486501486,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 364,
"row": 0,
"snapshot": null,
"thread": 3,
"db": "inventory",
"table": "customers"
},
"op": "u",
"ts_ms": 1486501486308
}
} 分享標(biāo)題:7.2KStar!把數(shù)據(jù)庫的每一行都監(jiān)控到的強(qiáng)大數(shù)據(jù)流平臺
鏈接分享:http://fisionsoft.com.cn/article/ccdcooj.html


咨詢
建站咨詢
