私はKafkaからのメッセージを読み込み、リアルタイムでStormで処理するPOCに取り組んでいます。私は地元の動物園とカフカを始めました。私はトピック(作成者:test
)を作成し、プロデューサとコンシューマはコマンドプロンプトから正常に動作しています。今度は、Stormを使ってトピックからのメッセージを読みたいと思っていました。以下のコードを実行しようとすると、Storm吐き気がKafka/Zookeeperに接続されていません。 localhostまたは2181の記述がどこにもないので、これはログから明らかです。 /テスト/嵐/ partition_0 - >ヌルApache Storm(ローカル)Apache Kafka(ローカル)に接続していません
6939 [スレッド-15-eventsEmitter-キュータ[2 2]] INFO oaskPartitionManagerで失敗します
public class TestTopology { public static void main(String[] args) { BrokerHosts zkHosts = new ZkHosts("localhost:2181"); SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "test", "/test", "storm"); kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("eventsEmitter", kafkaSpout, 1); builder.setBolt("eventsProcessor", new WordCountBolt(), 1).shuffleGrouping("eventsEmitter"); Config config = new Config(); config.setMaxTaskParallelism(5); /* * config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2); * * config.put(Config.STORM_ZOOKEEPER_PORT, 2181); * config.put(Config.STORM_ZOOKEEPER_SERVERS, * Arrays.asList("localhost")); */ try { ILocalCluster cls = new LocalCluster(); cls.submitTopology("my-topology", config, builder.createTopology()); } catch (Exception e) { throw new IllegalStateException("Couldn't initialize the topology", e); } } }
これは、1つのその作成及びませんが、親切にあなたがより多くの情報が必要な場合は私に知らせカフカ
4632 [Thread-11] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting 4632 [Thread-11] INFO o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 wa[email protected]acd1da 4633 [Thread-11-SendThread(127.0.0.1:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error) 4634 [Thread-11-SendThread(127.0.0.1:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2000, initiating session 4634 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:62287 4634 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:62287 4635 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x154d458c4130011 with negotiated timeout 20000 for client /127.0.0.1:62287 4635 [Thread-11-SendThread(127.0.0.1:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2000, sessionid = 0x154d458c4130011, negotiated timeout = 20000 4635 [Thread-11-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED
を実行しているローカルのZooKeeperを接続しています。
あなたはどの例外を体験しますか? – theDima