新聞中心
Apache Kafka是一種分布式流處理平臺,可以處理來自多個來源的數(shù)據(jù)流,使其能夠以實時和快速的方式處理和存儲大量的數(shù)據(jù)。與其他流式處理平臺不同的是,Kafka采用發(fā)布和訂閱的架構(gòu),使其更加具有可擴展性和可靠性,能夠長時間運行而不會導(dǎo)致系統(tǒng)崩潰或數(shù)據(jù)丟失。本文將介紹如何使用Kafka發(fā)送ON數(shù)據(jù)到數(shù)據(jù)庫中。

1. 安裝Kafka
要運行Kafka,您需要在計算機上安裝Kafka的服務(wù)器和客戶端。可以通過使用以下命令來安裝Kafka:
“`
wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.7.0/kafka_2.13-2.7.0.tgz
“`
下載完成之后,解壓文件并進(jìn)入Kafka目錄。
2. 創(chuàng)建主題
Kafka中的數(shù)據(jù)是按照主題劃分的,因此我們需要創(chuàng)建一個新的主題來存儲ON數(shù)據(jù)??梢允褂靡韵旅顏韯?chuàng)建一個名為“json-topic”的主題:
“`
./bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic json-topic
“`
3. 發(fā)送ON數(shù)據(jù)
現(xiàn)在我們可以使用Kafka的生產(chǎn)者API來發(fā)送ON數(shù)據(jù)。以下是一個示例生產(chǎn)者代碼,用于將ON數(shù)據(jù)發(fā)送到“json-topic”主題:
“`java
Properties props = new Properties();
props.put(“bootstrap.servers”, “l(fā)ocalhost:9092”);
props.put(“acks”, “all”);
props.put(“retries”, 0);
props.put(“batch.size”, 16384);
props.put(“l(fā)inger.ms”, 1);
props.put(“buffer.memory”, 33554432);
props.put(“key.serializer”, “org.apache.kafkmon.serialization.StringSerializer”);
props.put(“value.serializer”, “org.apache.kafkmon.serialization.StringSerializer”);
Producer producer = new KafkaProducer(props);
String jsonString = “{\”name\”:\”John Smith\”, \”age\”:30, \”city\”:\”New York\”}”;
producer.send(new ProducerRecord(“json-topic”, jsonString));
producer.close();
“`
以上代碼通過Kafka生產(chǎn)者API,將一個ON數(shù)據(jù)字符串發(fā)送到名為“json-topic”的主題中。在實際應(yīng)用中,可以根據(jù)業(yè)務(wù)需求,開發(fā)相應(yīng)的生產(chǎn)者代碼。
4. 接收ON數(shù)據(jù)
一旦我們成功發(fā)送ON數(shù)據(jù)之后,就需要從Kafka中拉取數(shù)據(jù),并將其存儲到數(shù)據(jù)庫中。以下是一個示例消費者代碼,用于從“json-topic”主題中拉取數(shù)據(jù),然后將其存儲到SQL數(shù)據(jù)庫中:
“`java
Properties props = new Properties();
props.put(“bootstrap.servers”, “l(fā)ocalhost:9092”);
props.put(“acks”, “all”);
props.put(“retries”, 0);
props.put(“batch.size”, 16384);
props.put(“l(fā)inger.ms”, 1);
props.put(“buffer.memory”, 33554432);
props.put(“key.deserializer”, “org.apache.kafkmon.serialization.StringDeserializer”);
props.put(“value.deserializer”, “org.apache.kafkmon.serialization.StringDeserializer”);
props.put(“group.id”, “test”);
Consumer consumer = new KafkaConsumer(props);
consumer.subscribe(Collections.singletonList(“json-topic”));
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord record : records) {
String jsonString = record.value();
// 解析ON數(shù)據(jù)并存儲到SQL數(shù)據(jù)庫中
// …
}
}
consumer.close();
“`
在實際應(yīng)用中,我們可以根據(jù)業(yè)務(wù)需求,開發(fā)相應(yīng)的消費者代碼,將數(shù)據(jù)存儲到數(shù)據(jù)庫中。
5. 結(jié)論
成都網(wǎng)站建設(shè)公司-創(chuàng)新互聯(lián)為您提供網(wǎng)站建設(shè)、網(wǎng)站制作、網(wǎng)頁設(shè)計及定制高端網(wǎng)站建設(shè)服務(wù)!
flink處理數(shù)據(jù)從kafka到另外一個kafka
需求就是將流量數(shù)據(jù)(json格式)中某個接口數(shù)據(jù)抽取一下。如:有個identityUri=”yiyang/user/getById/13782″ , 這里的13782,是個userId,我們需要將其處理成 identityUri=”yiyang/user/getById/{}”
實際上我們生產(chǎn)中是將二者接口使用的。先使用2,如果沒有匹配到,在使用1
這里是演示flink kafka的用法,我們簡單使用正則處理
注意:kafka消費的方式是: kafkaConsumer.setStartFromGroupOffsets();
看下上面的啟動日志,有這樣的信息:Resetting offset for partition yiyang-0 to offset 22.
我們另外啟動一個程序,發(fā)送消息,并消費兩個topic中的數(shù)據(jù)
看下 ConsumeKafkaTest 中的日志
在看下另外一個服務(wù)(消費兩個topic數(shù)據(jù))的日志:
說明已經(jīng)成功的把處理好的消息發(fā)送到另外一個topic中了
關(guān)于數(shù)據(jù)處理,如果只是簡單的增加字段,減少字段,正則替換,也可以使用logstash工具
kafka 發(fā)送json數(shù)據(jù)庫的介紹就聊到這里吧,感謝你花時間閱讀本站內(nèi)容,更多關(guān)于kafka 發(fā)送json數(shù)據(jù)庫,使用Kafka發(fā)送ON到數(shù)據(jù)庫,flink處理數(shù)據(jù)從kafka到另外一個kafka的信息別忘了在本站進(jìn)行查找喔。
香港云服務(wù)器機房,創(chuàng)新互聯(lián)(www.cdcxhl.com)專業(yè)云服務(wù)器廠商,回大陸優(yōu)化帶寬,安全/穩(wěn)定/低延遲.創(chuàng)新互聯(lián)助力企業(yè)出海業(yè)務(wù),提供一站式解決方案。香港服務(wù)器-免備案低延遲-雙向CN2+BGP極速互訪!
分享標(biāo)題:使用Kafka發(fā)送ON到數(shù)據(jù)庫(kafka發(fā)送json數(shù)據(jù)庫)
本文網(wǎng)址:http://fisionsoft.com.cn/article/dhooddd.html


咨詢
建站咨詢
