0

私はカフカと嵐に新しいです。私はカフカと嵐を統合するJavaの例を実装しようとしていました。私はオンラインの例を見つけました。 Eclipse IDEでJavaプログラムを実行しようとしています。私はmavenを使用していません。Kafka StormとJavaの統合。 kafka.api.OffsetRequest.DefaultClientId()Ljava/lang/String;エラー

私は、外部ジャーとしてstorm-kafka-0.10.0.jar,kafka-0.6.jar,scala-library-2.10.3.jarおよびstorm-core-0.10.0.jarを持っています。

ここに私のJavaコードです。

KafkaStormSample.java

import backtype.storm.Config; 
import backtype.storm.LocalCluster; 
import backtype.storm.topology.TopologyBuilder; 

import java.util.UUID; 

import backtype.storm.spout.SchemeAsMultiScheme; 
import storm.kafka.ZkHosts; 
import storm.kafka.BrokerHosts; 
import storm.kafka.SpoutConfig; 
import storm.kafka.KafkaSpout; 
import storm.kafka.StringScheme; 

public class KafkaStormSample { 
    public static void main(String[] args) throws Exception{ 
     Config config = new Config(); 
     config.setDebug(true); 
     config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); 
     String zkConnString = "localhost:2181"; 
     String topic = "my-first-topic"; 
     BrokerHosts hosts = new ZkHosts(zkConnString); 

     SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,  
     UUID.randomUUID().toString()); 
     kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4; 
     kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4; 
    //kafkaSpoutConfig.forceFromStart = true; 
     kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 

     TopologyBuilder builder = new TopologyBuilder(); 
     builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig)); 
    //builder.setBolt("word-spitter", new SplitBolt()).shuffleGrouping("kafka-spout"); 
     builder.setBolt("word-counter", new CountBolt()).shuffleGrouping("word-spitter"); 

     LocalCluster cluster = new LocalCluster(); 
     cluster.submitTopology("KafkaStormSample", config, builder.createTopology()); 

     Thread.sleep(10000); 

     cluster.shutdown(); 
    } 
} 

CountBolt.java

import java.util.Map; 
import java.util.HashMap; 

import backtype.storm.tuple.Tuple; 
import backtype.storm.task.OutputCollector; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.IRichBolt; 
import backtype.storm.task.TopologyContext; 

public class CountBolt implements IRichBolt{ 
    Map<String, Integer> counters; 
    private OutputCollector collector; 

    @Override 
    public void prepare(Map stormConf, TopologyContext context, 
    OutputCollector collector) { 
     this.counters = new HashMap<String, Integer>(); 
     this.collector = collector; 
    } 

    @Override 
    public void execute(Tuple input) { 
     String str = input.getString(0); 

     if(!counters.containsKey(str)){ 
     counters.put(str, 1); 
     }else { 
     Integer c = counters.get(str) +1; 
     counters.put(str, c); 
     } 

     collector.ack(input); 
    } 

    @Override 
    public void cleanup() { 
     for(Map.Entry<String, Integer> entry:counters.entrySet()){ 
     System.out.println(entry.getKey()+" : " + entry.getValue()); 
     } 
    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 

    } 

    @Override 
    public Map<String, Object> getComponentConfiguration() { 
     return null; 
    } 
} 

私は、私は以下のエラーを取得しておくkafkaStormSample.javaを実行してみてください。

Exception in thread "main" java.lang.NoSuchMethodError: kafka.api.OffsetRequest.DefaultClientId()Ljava/lang/String; 
    at storm.kafka.KafkaConfig.<init>(KafkaConfig.java:43) 
    at storm.kafka.SpoutConfig.<init>(SpoutConfig.java:40) 
    at KafkaStormSample.main(KafkaStormSample.java:23) 

私はすべての必要な瓶があることを確認しました。しかし、まだ私は瓶がないと思う。

ご協力いただければ幸いです。

ありがとうございます!

答えて

0

私はこれらのシステムについてはよく分かりませんが、私にはライブラリのバージョンの不一致があります。

ライブラリの1つ(このケースではStorm)は、そのメソッドが定義されている別のバージョンのkafkaに対してコンパイルされました。依存関係を確認してください。

依存関係管理システムが役立つ理由の1つです。

更新:彼らはこれを提供彼らのドキュメントから は、Mavenの上に設定:

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.10</artifactId> 
     <version>0.8.1.1</version> 
     <exclusions> 
      <exclusion> 
       <groupId>org.apache.zookeeper</groupId> 
       <artifactId>zookeeper</artifactId> 
      </exclusion> 
      <exclusion> 
       <groupId>log4j</groupId> 
       <artifactId>log4j</artifactId> 
      </exclusion> 
     </exclusions> 
    </dependency> 

は、あなたのカフカのバージョンが古すぎるようです。