2016-05-21 6 views
0

私はKafkaからのメッセージを読み込み、リアルタイムでStormで処理するPOCに取り組んでいます。私は地元の動物園とカフカを始めました。私はトピック(作成者:test)を作成し、プロデューサとコンシューマはコマンドプロンプトから正常に動作しています。今度は、Stormを使ってトピックからのメッセージを読みたいと思っていました。以下のコードを実行しようとすると、Storm吐き気がKafka/Zookeeperに接続されていません。 localhostまたは2181の記述がどこにもないので、これはログから明らかです。 /テスト/嵐/ partition_0 - >ヌルApache Storm(ローカル)Apache Kafka(ローカル)に接続していません

:パーティション情報を読むから - そしてプロセスは例外

6939 [スレッド-15-eventsEmitter-キュータ[2 2]] INFO oaskPartitionManagerで失敗します

public class TestTopology { 

    public static void main(String[] args) { 

     BrokerHosts zkHosts = new ZkHosts("localhost:2181"); 
     SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "test", "/test", "storm"); 
     kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
     KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig); 
     TopologyBuilder builder = new TopologyBuilder(); 
     builder.setSpout("eventsEmitter", kafkaSpout, 1); 
     builder.setBolt("eventsProcessor", new WordCountBolt(), 1).shuffleGrouping("eventsEmitter"); 
     Config config = new Config(); 
     config.setMaxTaskParallelism(5); 
     /* 
     * config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2); 
     * 
     * config.put(Config.STORM_ZOOKEEPER_PORT, 2181); 
     * config.put(Config.STORM_ZOOKEEPER_SERVERS, 
     * Arrays.asList("localhost")); 
     */ 

     try { 
      ILocalCluster cls = new LocalCluster();   
      cls.submitTopology("my-topology", config, builder.createTopology()); 
     } catch (Exception e) { 
      throw new IllegalStateException("Couldn't initialize the topology", 
        e); 
     } 
    } 

} 

これは、1つのその作成及びませんが、親切にあなたがより多くの情報が必要な場合は私に知らせカフカ

4632 [Thread-11] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting 
4632 [Thread-11] INFO o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 wa[email protected]acd1da 
4633 [Thread-11-SendThread(127.0.0.1:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error) 
4634 [Thread-11-SendThread(127.0.0.1:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2000, initiating session 
4634 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:62287 
4634 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:62287 
4635 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x154d458c4130011 with negotiated timeout 20000 for client /127.0.0.1:62287 
4635 [Thread-11-SendThread(127.0.0.1:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2000, sessionid = 0x154d458c4130011, negotiated timeout = 20000 
4635 [Thread-11-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED 

を実行しているローカルのZooKeeperを接続しています。

+0

あなたはどの例外を体験しますか? – theDima

答えて

0

あなたは次のように、それは例えば、カフカサーバポートを知っているよようにconfigを設定する必要があります。

Properties props = new Properties(); 
//default broker port = 9092 
props.put("metadata.broker.list", "localhost:" + BROKER_PORT); 
props.put("request.required.acks", "1"); 
props.put("serializer.class", "kafka.serializer.StringEncoder"); 

Config config = new Config();   
config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props); 
config.setDebug(true); 
config.setMaxTaskParallelism(5); 
+0

@マティアス、@ディマは私自身の質問に答えました。どうやらそれは私の愚かさでした。コメントしてくれてありがとう。感謝します。 – mavrav

0

を、私はこのための解決策を思い付く神経障るの夜の後。実際問題はコードではなく、ジャーであった。私は、zookeeper、kafka、storm.Butの3つのパッケージすべてからlog4j jarを添付していましたが、このコードでは1つしか期待していませんでした。これは、私が以前無視していた私の食の赤い警告として示していた。私が不要なlog4jsを削除すると、カフカの噴出口が私が作成したカフカの話題から読み始めました。この問題を調査する時間をとっていただき、ありがとうございます。 @Matthias私はそれをZookeeperにリンクしていたので、Zookeeperによって管理されているどのカフカにも接続していると思います。だから、それは地方レベルでは少なくとも必要ではないかもしれません。しかし、とにかくおかげで...

関連する問題