189 8069 5689

storm-kafka(stormspout作为kafka的消费端)-创新互联

storm是grovvy写的

10年积累的成都做网站、成都网站制作经验,可以快速应对客户对网站的新想法和需求。提供各种问题对应的解决方案。让选择我们的客户得到更好、更有力的网络服务。我虽然不认识你,你也不认识我。但先做网站后付款的网站建设流程,更有那坡免费网站建设让你可以放心的选择与我们合作。

kafka是scala写的

storm-kafka storm连接kafka consumer的插件

下载地址:

https://github.com/wurstmeister/storm-kafka-0.8-plus

除了需要storm和kafka相关jar包还需要google-collections-1.0.jar

以及zookeeper相关包 curator-framework-1.3.3.jar和curator-client-1.3.3.jar

以前由com.netflix.curator组织开发现在归到org.apache.curator下面

1.Kafka Consumer即Storm Spout代码

package demo;

import java.util.ArrayList;
import java.util.List;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class MyKafkaSpout {
public static void main(String[] args) {
    
    String topic ="track";
    ZkHosts zkhosts  = new ZkHosts("192.168.1.107:2181,192.168.1.108:2181,192.168.1.109:2181");
    
    SpoutConfig spoutConfig = new SpoutConfig(zkhosts, topic,
            "/MyKafka", //偏移量offset的根目录
            "MyTrack");//子目录对应一个应用    
    List zkServers=new ArrayList();
    //zkServers.add("192.168.1.107");
    //zkServers.add("192.168.1.108");
    for(String host:zkhosts.brokerZkStr.split(","))
    {
        zkServers.add(host.split(":")[0]);
    }
    
    spoutConfig.zkServers=zkServers;
    spoutConfig.zkPort=2181;
    spoutConfig.forceFromStart=true;//从头开始消费,实际上是要改成false的
    spoutConfig.socketTimeoutMs=60;
    spoutConfig.scheme=new SchemeAsMultiScheme(new StringScheme());//定义输出为string类型
    
    TopologyBuilder builder=new TopologyBuilder();
    builder.setSpout("spout", new KafkaSpout(spoutConfig),1);//引用spout,并发度设为1
    builder.setBolt("bolt1", new MyKafkaBolt(),1).shuffleGrouping("spout");
    
    Config config =new Config();
    config.setDebug(true);//上线之前都要改成false否则日志会非常多
    if(args.length>0){
        
        try {
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } catch (AlreadyAliveException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InvalidTopologyException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
    }else{
        
        LocalCluster localCluster=new LocalCluster();
        localCluster.submitTopology("mytopology", config,  builder.createTopology());
        //本地模式在一个进程里面模拟一个storm集群的所有功能
    }
    
    
    
}
}

2.Bolt代码只是简单打印输出,覆写execute方法即可

package demo;

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class MyKafkaBolt implements IBasicBolt {

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        // TODO Auto-generated method stub

    }

    @Override
    public Map getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub

    }

    @Override
    public void execute(Tuple input, BasicOutputCollector arg1) {
    String kafkaMsg =input.getString(0);
    System.err.println("bolt"+kafkaMsg);

    }

    @Override
    public void prepare(Map arg0, TopologyContext arg1) {
        // TODO Auto-generated method stub

    }

}

另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


本文标题:storm-kafka(stormspout作为kafka的消费端)-创新互联
URL分享:http://gzruizhi.cn/article/edehg.html

其他资讯