答えて

0

がTopology.java

import storm.kafka.BrokerHosts; 
import storm.kafka.KafkaSpout; 
import storm.kafka.SpoutConfig; 
import storm.kafka.StringScheme; 
import storm.kafka.ZkHosts; 

public class Topology{ 
    public static void main(String[] args){ 
    TopologyBuilder builder = new TopologyBuilder(); 
    String zkHosts = StringUtils.join("127.0.0.1", ','); 

     BrokerHosts hosts = new ZkHosts(zkHosts); 
     SpoutConfig spoutConfig = new SpoutConfig(hosts, "kafkaTopic_name", "/kafkaTopic_name", "kafkaGroup_name"); 
     spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
     spoutConfig.forceFromStart = forceFromStart; 
     builder.setSpout("events", new KafkaSpout(spoutConfig), 5).setNumTasks(5); 
     //... 
    } 
} 

を定義する基本的に、あなたはkafkaSpoutを作成するためにSpoutConfigを作成する必要があります。

+0

私はあなたを混乱させるかもしれませんが、私はそれがTopologyではなくKafkaSpout.javaである必要があります。これはあなたの新しいものです。 – cutd

関連する問題