私はカフカと嵐に新しいです。私はカフカと嵐を統合するJavaの例を実装しようとしていました。私はオンラインの例を見つけました。 Eclipse IDEでJavaプログラムを実行しようとしています。私はmavenを使用していません。Kafka StormとJavaの統合。 kafka.api.OffsetRequest.DefaultClientId()Ljava/lang/String;エラー
私は、外部ジャーとしてstorm-kafka-0.10.0.jar
,kafka-0.6.jar
,scala-library-2.10.3.jar
およびstorm-core-0.10.0.jar
を持っています。
ここに私のJavaコードです。
KafkaStormSample.java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import java.util.UUID;
import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.ZkHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;
public class KafkaStormSample {
public static void main(String[] args) throws Exception{
Config config = new Config();
config.setDebug(true);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
String zkConnString = "localhost:2181";
String topic = "my-first-topic";
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,
UUID.randomUUID().toString());
kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
//kafkaSpoutConfig.forceFromStart = true;
kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
//builder.setBolt("word-spitter", new SplitBolt()).shuffleGrouping("kafka-spout");
builder.setBolt("word-counter", new CountBolt()).shuffleGrouping("word-spitter");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("KafkaStormSample", config, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
CountBolt.java
import java.util.Map;
import java.util.HashMap;
import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;
public class CountBolt implements IRichBolt{
Map<String, Integer> counters;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.counters = new HashMap<String, Integer>();
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String str = input.getString(0);
if(!counters.containsKey(str)){
counters.put(str, 1);
}else {
Integer c = counters.get(str) +1;
counters.put(str, c);
}
collector.ack(input);
}
@Override
public void cleanup() {
for(Map.Entry<String, Integer> entry:counters.entrySet()){
System.out.println(entry.getKey()+" : " + entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
私は、私は以下のエラーを取得しておくkafkaStormSample.javaを実行してみてください。
Exception in thread "main" java.lang.NoSuchMethodError: kafka.api.OffsetRequest.DefaultClientId()Ljava/lang/String;
at storm.kafka.KafkaConfig.<init>(KafkaConfig.java:43)
at storm.kafka.SpoutConfig.<init>(SpoutConfig.java:40)
at KafkaStormSample.main(KafkaStormSample.java:23)
私はすべての必要な瓶があることを確認しました。しかし、まだ私は瓶がないと思う。
ご協力いただければ幸いです。
ありがとうございます!