このフォーラムで質問するのが正しいかどうかはわかりません。ストームKafkaSpoutコネクタを使用してカフカのトピックから消費していました。これまではうまくいっていました。今度はバージョン0.10.x
で実行されている同じStorm envからアップグレードされたバージョン0.10.x
を持つ新しいKafkaクラスタに接続することになっています。ストーム0.10.x(KafkaSpout)を使用しているKafka 0.10.xトピックから消耗します
ストームのドキュメント(http://storm.apache.org/releases/1.1.0/storm-kafka-client.html)から、私は、嵐1.1.0
が、新しいカフカコンシューマAPIをサポートするKafka 0.10.x
と互換性があることがわかります。しかし、その場合、私は私の最後にトポロジを実行することができません(私は間違っている場合は私を修正してください)。
これに対応する回避策はありますか? 新しいKafka Consumer APIによってZooKeeperの依存関係が削除されたとしても、新しい–bootstrap-server
フラグ(推奨)の代わりに--zookeeper
フラグを渡して古いKafka-console-consumer.sh
を使用してメッセージを消費することができます。私はカフカ0.9を使用してからこのコマンドを実行し、カフカ上でホストされているトピックから消費することができ、我々は、以下の例外になっ接続しようとしているとき
を0.10.x:
java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /brokers/topics/mytopic/partitions
at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) ~[stormjar.jar:?]
at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) ~[stormjar.jar:?]
をしかし、我々はに接続することができますリモートZKサーバーとパスが存在することを検証:
./zkCli.sh -server remoteZKServer:2181
[zk: remoteZKServer:2181(CONNECTED) 5] ls /brokers/topics/mytopic/partitions
[3, 2, 1, 0]
我々はトピックがその中に4つのパーティションを持っているとして、それが私たちに期待される出力を与えていることが、上記見ることができるように。この時点で
以下の質問があります。
1)は、嵐のバージョン0.10.x
を使用してカフカ0.10.x
に接続することで、すべての可能ですか?これを試しましたか?
2)私たちが消費することができても、トポロジーのシャットダウン/再起動の場合にメッセージオフセットを取得するためにコードを変更する必要があります。私は古いKafkaSpoutバージョンでサポートされているブローカー情報の代わりに、Zkクラスターの詳細を渡すので、これを求めています。ここでのオプションの不足
は、任意のポインタが高く評価されるだろう
UPDATE:
我々は接続することができますし、ローカルに実行している日食を使用しながら、リモートカフカのトピックから消費します。ストームがメモリ内のzkを使用しないようにするには、オーバーロードされたコンストラクタLocalCluster("zkServer",port)
を使用しました。うまくいき、データが出力されるのがわかります。これにより、ここではバージョンの互換性が問題ではないと結論づけられました。
ただし、トポロジをクラスタに展開すると運が悪くなります。 私たちは、この時点で
は本当にここにいくつかのポインタを必要とし、おそらくこれの何が問題である可能性があり、我々はそれをどのようにデバッグするのですか?...のznodeも罰金だzkservers に嵐ボックスからの接続性を確認しましたかKafka 0.10xと一緒に働いたことがないので、正確に何が欠けているか分かりません。
は本当に