2017-05-25 10 views
0

このフォーラムで質問するのが正しいかどうかはわかりません。ストームKafkaSpoutコネクタを使用してカフカのトピックから消費していました。これまではうまくいっていました。今度はバージョン0.10.xで実行されている同じStorm envからアップグレードされたバージョン0.10.xを持つ新しいKafkaクラスタに接続することになっています。ストーム0.10.x(KafkaSpout)を使用しているKafka 0.10.xトピックから消耗します

ストームのドキュメント(http://storm.apache.org/releases/1.1.0/storm-kafka-client.html)から、私は、嵐1.1.0が、新しいカフカコンシューマAPIをサポートするKafka 0.10.xと互換性があることがわかります。しかし、その場合、私は私の最後にトポロジを実行することができません(私は間違っている場合は私を修正してください)。

これに対応する回避策はありますか? 新しいKafka Consumer APIによってZooKeeperの依存関係が削除されたとしても、新しい–bootstrap-serverフラグ(推奨)の代わりに--zookeeperフラグを渡して古いKafka-console-consumer.shを使用してメッセージを消費することができます。私はカフカ0.9を使用してからこのコマンドを実行し、カフカ上でホストされているトピックから消費することができ、我々は、以下の例外になっ接続しようとしているとき

を0.10.x:

java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /brokers/topics/mytopic/partitions 
at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) ~[stormjar.jar:?] 
at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) ~[stormjar.jar:?] 

をしかし、我々はに接続することができますリモートZKサーバーとパスが存在することを検証:

 ./zkCli.sh -server remoteZKServer:2181 

     [zk: remoteZKServer:2181(CONNECTED) 5] ls /brokers/topics/mytopic/partitions 
     [3, 2, 1, 0] 

我々はトピックがその中に4つのパーティションを持っているとして、それが私たちに期待される出力を与えていることが、上記見ることができるように。この時点で

以下の質問があります。

1)は、嵐のバージョン0.10.xを使用してカフカ0.10.xに接続することで、すべての可能ですか?これを試しましたか?

2)私たちが消費することができても、トポロジーのシャットダウン/再起動の場合にメッセージオフセットを取得するためにコードを変更する必要があります。私は古いKafkaSpoutバージョンでサポートされているブローカー情報の代わりに、Zkクラスターの詳細を渡すので、これを求めています。ここでのオプションの不足

は、任意のポインタが高く評価されるだろう

UPDATE:
我々は接続することができますし、ローカルに実行している日食を使用しながら、リモートカフカのトピックから消費します。ストームがメモリ内のzkを使用しないようにするには、オーバーロードされたコンストラクタLocalCluster("zkServer",port)を使用しました。うまくいき、データが出力されるのがわかります。これにより、ここではバージョンの互換性が問題ではないと結論づけられました。

ただし、トポロジをクラスタに展開すると運が悪くなります。 私たちは、この時点で

は本当にここにいくつかのポインタを必要とし、おそらくこれの何が問題である可能性があり、我々はそれをどのようにデバッグするのですか?...のznodeも罰金だzkservers に嵐ボックスからの接続性を確認しましたかKafka 0.10xと一緒に働いたことがないので、正確に何が欠けているか分かりません。

は本当に

答えて

0

ストーム0.10xがカフカ0.10xと互換性がありますいくつかの助けと提案を感謝しています。zookeeperベースのオフセット保存メカニズムに依存する古いKafkaSpoutを引き続き使用できます。

私たちの側からの接続を許可/受け入れないリモートのカフカクラスターに到達しようとしていたため、接続の喪失の例外が発生していました。接続が確立できるように特定のファイアウォールポートを開く必要があります。トポロジがクラスタモードで実行されている間は、すべてのスーパバイザノードが飼い主に話すことができるはずなので、ファイアウォールはそれぞれのノードに対してオープンしている必要があります。

関連する問題