新聞中心
隨著大數(shù)據(jù)的迅猛發(fā)展,對于時(shí)間序列數(shù)據(jù)的處理變得越來越重要。Apache Kafka stream作為流處理核心框架,有著非常好的支持性,在大數(shù)據(jù)領(lǐng)域得到了廣泛的應(yīng)用。本文將介紹如何搭建和配置Kafka Stream架構(gòu)在linux系統(tǒng)上運(yùn)行程序,以及常見的使用方法。

### 1. 系統(tǒng)要求
Kafka Stream有一些關(guān)鍵的系統(tǒng)要求,如操作系統(tǒng)環(huán)境,使用的Java版本以及用到的Kafka Stream工具集等。搭建環(huán)境前,必須保證系統(tǒng)能夠支持和滿足Kafka Stream系統(tǒng)要求,才能通過后續(xù)配置步驟形成可運(yùn)行程序。
### 2. 集群節(jié)點(diǎn)搭建
在安裝集群節(jié)點(diǎn)之前,需要考慮集群節(jié)點(diǎn)數(shù)配置,確定主節(jié)點(diǎn)和從節(jié)點(diǎn),用于分開承擔(dān)不同的任務(wù),例如主節(jié)點(diǎn)負(fù)責(zé)訂閱消息,從節(jié)點(diǎn)負(fù)責(zé)處理數(shù)據(jù)。在準(zhǔn)備環(huán)境之后,使用以下命令可完成Linux系統(tǒng)的Kafka Stream節(jié)點(diǎn)安裝:
# 設(shè)置Kafka_stream_ home路徑
export KAFKA_STREAMS_HOME=/usr/local/kafka_streams
# 下載安裝包
wget http://download.kafka.apache.org/streams/1.5.2/kafka-streams-1.5.2-bin.tar.gz
# 解壓安裝包
tar -zxvf kafka-streams-1.5.2-bin.tar.gz
# 復(fù)制解壓好的文件到Kafka home
mv kafka-streams-1.5.2/* $KAFKA_STREAMS_HOME
# 刪除壓縮文件
rm kafka-streams-1.5.2-bin.tar.gz
# 根據(jù)節(jié)點(diǎn)類型進(jìn)行配置
# 主節(jié)點(diǎn)配置
# /usr/local/kafka_streams/conf/server.properties
streamConfig.broker= # 設(shè)置broker地址
# 從節(jié)點(diǎn)配置
# /usr/local/kafka_streams/conf/consumer.properties
bootstrap.servers= # 設(shè)置Zookeeper地址
group.id= # 設(shè)置groupid
完成集群節(jié)點(diǎn)的搭建之后,就可以開始利用Kafka Streams節(jié)點(diǎn)搭建Kafka Stream任務(wù)。
### 3. 編寫Stream任務(wù)
Kafka Stream的任務(wù)形式類似于MapReduce,它可以實(shí)現(xiàn)從處理和聚合單詞出現(xiàn)頻度及計(jì)數(shù)等高級功能。在編寫任務(wù)之前,首先需要?jiǎng)?chuàng)建Topic,使用以下命令:
# 創(chuàng)建主題
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test-topic
# 檢查主題
$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181
Kafka Stream的任務(wù)編寫就是一個(gè)實(shí)體類,可以使用Java和Scala等編程語言編寫類,內(nèi)部實(shí)現(xiàn)Streams API:
“`Java
public class StreamExample {
public static void main(String[] args) {
// 配置文件
final Properties props = new Properties();
// 設(shè)置應(yīng)用的ID
props.put(StreamsConfig.APPLICATION_ID_CONFIG, “stream-example-app”);
// 設(shè)置應(yīng)用的Broker
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, “l(fā)ocalhost:9092”);
// 設(shè)置Client ID
props.put(StreamsConfig.CLIENT_ID_CONFIG, “stream-example-client”);
// 創(chuàng)建StreamsBuilder
final StreamsBuilder builder = new StreamsBuilder();
// 從topic獲取流
final KStream source = builder.stream(“test-topic”);
// 進(jìn)行聚合
final KTable counts = stream.flatMap((key, value) ->
Arrays.asList(value.split(” “)).iterator())
.map((key, value) -> new KeyValue(value, value))
.countByKey(“counts”);
// 輸出到另一個(gè)topic
counts.toStream().to(“streams-wordcount-output”);
// 創(chuàng)建Topology
final Topology topology = builder.build();
// 寫入控制臺
System.out.println(topology.describe());
//初始化一個(gè)KafkaStream對象
final KafkaStreams streams = new KafkaStreams(topology, props);
//啟動(dòng)程序
streams.start();
}
}
### 4. 實(shí)時(shí)流數(shù)據(jù)分析
任務(wù)編寫好之后,OK!Kafka Stream的搭建以及配置和使用就完成啦。
創(chuàng)新互聯(lián)服務(wù)器托管擁有成都T3+級標(biāo)準(zhǔn)機(jī)房資源,具備完善的安防設(shè)施、三線及BGP網(wǎng)絡(luò)接入帶寬達(dá)10T,機(jī)柜接入千兆交換機(jī),能夠有效保證服務(wù)器托管業(yè)務(wù)安全、可靠、穩(wěn)定、高效運(yùn)行;創(chuàng)新互聯(lián)專注于成都服務(wù)器托管租用十余年,得到成都等地區(qū)行業(yè)客戶的一致認(rèn)可。
網(wǎng)頁題目:Linux下搭建KafkaStream架構(gòu)的實(shí)踐(linuxkafka)
網(wǎng)頁路徑:http://fisionsoft.com.cn/article/dpgoogh.html


咨詢
建站咨詢
