新聞中心
storm是grovvy寫的
kafka是scala寫的
storm-kafka storm連接kafka consumer的插件
下載地址:
https://github.com/wurstmeister/storm-kafka-0.8-plus
除了需要storm和kafka相關(guān)jar包還需要google-collections-1.0.jar
以及zookeeper相關(guān)包 curator-framework-1.3.3.jar和curator-client-1.3.3.jar
以前由com.netflix.curator組織開發(fā)現(xiàn)在歸到org.apache.curator下面
1.Kafka Consumer即Storm Spout代碼
package demo; import java.util.ArrayList; import java.util.List; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.topology.TopologyBuilder; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.StringScheme; import storm.kafka.ZkHosts; public class MyKafkaSpout { public static void main(String[] args) { String topic ="track"; ZkHosts zkhosts = new ZkHosts("192.168.1.107:2181,192.168.1.108:2181,192.168.1.109:2181"); SpoutConfig spoutConfig = new SpoutConfig(zkhosts, topic, "/MyKafka", //偏移量offset的根目錄 "MyTrack");//子目錄對應(yīng)一個應(yīng)用 ListzkServers=new ArrayList (); //zkServers.add("192.168.1.107"); //zkServers.add("192.168.1.108"); for(String host:zkhosts.brokerZkStr.split(",")) { zkServers.add(host.split(":")[0]); } spoutConfig.zkServers=zkServers; spoutConfig.zkPort=2181; spoutConfig.forceFromStart=true;//從頭開始消費,實際上是要改成false的 spoutConfig.socketTimeoutMs=60; spoutConfig.scheme=new SchemeAsMultiScheme(new StringScheme());//定義輸出為string類型 TopologyBuilder builder=new TopologyBuilder(); builder.setSpout("spout", new KafkaSpout(spoutConfig),1);//引用spout,并發(fā)度設(shè)為1 builder.setBolt("bolt1", new MyKafkaBolt(),1).shuffleGrouping("spout"); Config config =new Config(); config.setDebug(true);//上線之前都要改成false否則日志會非常多 if(args.length>0){ try { StormSubmitter.submitTopology(args[0], config, builder.createTopology()); } catch (AlreadyAliveException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InvalidTopologyException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else{ LocalCluster localCluster=new LocalCluster(); localCluster.submitTopology("mytopology", config, builder.createTopology()); //本地模式在一個進程里面模擬一個storm集群的所有功能 } } }
2.Bolt代碼只是簡單打印輸出,覆寫execute方法即可
package demo; import java.util.Map; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class MyKafkaBolt implements IBasicBolt { @Override public void declareOutputFields(OutputFieldsDeclarer arg0) { // TODO Auto-generated method stub } @Override public MapgetComponentConfiguration() { // TODO Auto-generated method stub return null; } @Override public void cleanup() { // TODO Auto-generated method stub } @Override public void execute(Tuple input, BasicOutputCollector arg1) { String kafkaMsg =input.getString(0); System.err.println("bolt"+kafkaMsg); } @Override public void prepare(Map arg0, TopologyContext arg1) { // TODO Auto-generated method stub } }
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機、免備案服務(wù)器”等云主機租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。
本文題目:storm-kafka(stormspout作為kafka的消費端)-創(chuàng)新互聯(lián)
地址分享:http://fisionsoft.com.cn/article/edehg.html