0.10.1.1で正常に動作していたKafka Streamsアプリケーションがありました。新しいブローカと一緒に新しい0.10.2.0 Kafka Streamsライブラリに更新されました(ただし、新しいライブラリは0.10.1.1と下位互換性があります)。クイック背景Kafkaがバージョン0.10.2.0でアプリケーションの問題を修正しました
- 私はそれがローカルストアを検索しても(KafkaStreams.allMetadataForStore方法により得られたStreamsMetadataを使用して)リモートストアを照会インタラクティブクエリメタデータベースのAPI
- の上に構築されたREST APIを持って
- これを有効にするにはapplication.serverの設定パラメータを使用してください。
アプリケーションは1つのアプリケーションインスタンスで正常に動作しています。すぐに私は別のインスタンスを起動し、REST経由ストアを照会として、私は次の問題が発生する
私が最初に始めたのノード上で検索を実行すると、ローカルストアの検索では、この例外に
SEVERE: Error - the state store, my-store, may have migrated to another instance.
org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, in-memory-avg-store, may have migrated to another instance.
at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:49)
at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:55)
at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:699)
**at mycode.getLocalMetrics(myclass.java:121)**
**at mycode.remote(myclass.java:98)**
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
を失敗しました私は??が欠落することができるもの
私は新しいノード上で検索を実行すると、ローカルストア検索は大丈夫だったが、私は.forEach(新しい消費者(でnullポインタを参照してください)ライン
ks.allMetadataForStore(storeName)
.stream()
.filter(sm -> !(sm.host().equals(thisInstance.host()) && sm.port() == thisInstance.port())) //only query remote node stores
.forEach(new Consumer<StreamsMetadata>() {
@Override
public void accept(StreamsMetadata t) {
//some logic
}
});
下回っストリームstatelistener内のストリームストアを読み込むことができます。http://docs.confluent.io/current/streams/faqのFAQエントリを参照してください。 .html#handling-invalidstatestexexception-state-store-may-have-may-migated-to-another-instance詳細については、問題の解決方法を説明してください。 –