0
likes storm-kafka-client、私はstorm-kafka-clientに使用されていましたが、うまく動作せず、新しいspoutも書き込めません。 サンプルカフカの噴出口を書くのを助けることができる人は誰ですか?新しいカフカ消費者向けAPIを使用してサンプルカフカの噴出口を書き込む方法は?
likes storm-kafka-client、私はstorm-kafka-clientに使用されていましたが、うまく動作せず、新しいspoutも書き込めません。 サンプルカフカの噴出口を書くのを助けることができる人は誰ですか?新しいカフカ消費者向けAPIを使用してサンプルカフカの噴出口を書き込む方法は?
がTopology.java
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
public class Topology{
public static void main(String[] args){
TopologyBuilder builder = new TopologyBuilder();
String zkHosts = StringUtils.join("127.0.0.1", ',');
BrokerHosts hosts = new ZkHosts(zkHosts);
SpoutConfig spoutConfig = new SpoutConfig(hosts, "kafkaTopic_name", "/kafkaTopic_name", "kafkaGroup_name");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.forceFromStart = forceFromStart;
builder.setSpout("events", new KafkaSpout(spoutConfig), 5).setNumTasks(5);
//...
}
}
を定義する基本的に、あなたはkafkaSpoutを作成するためにSpoutConfigを作成する必要があります。
私はあなたを混乱させるかもしれませんが、私はそれがTopologyではなくKafkaSpout.javaである必要があります。これはあなたの新しいものです。 – cutd