2017-03-05 15 views
2

0.10.1.1で正常に動作していたKafka Streamsアプリケーションがありました。新しいブローカと一緒に新しい0.10.2.0 Kafka Streamsライブラリに更新されました(ただし、新しいライブラリは0.10.1.1と下位互換性があります)。クイック背景Kafkaがバージョン0.10.2.0でアプリケーションの問題を修正しました

  • 私はそれがローカルストアを検索しても(KafkaStreams.allMetadataForStore方法により得られたStreamsMetadataを使用して)リモートストアを照会インタラクティブクエリメタデータベースのAPI
  • の上に構築されたREST APIを持って
  • これを有効にするにはapplication.serverの設定パラメータを使用してください。

アプリケーションは1つのアプリケーションインスタンスで正常に動作しています。すぐに私は別のインスタンスを起動し、REST経由ストアを照会として、私は次の問題が発生する

私が最初に始めたのノード上で検索を実行すると、ローカルストアの検索では、この例外に

SEVERE: Error - the state store, my-store, may have migrated to another instance. 
org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, in-memory-avg-store, may have migrated to another instance. 
     at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:49) 
     at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:55) 
     at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:699) 
     **at mycode.getLocalMetrics(myclass.java:121)** 
     **at mycode.remote(myclass.java:98)** 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
     at java.lang.reflect.Method.invoke(Unknown Source) 
を失敗しました私は??が欠落することができるもの

私は新しいノード上で検索を実行すると、ローカルストア検索は大丈夫だったが、私は.forEach(新しい消費者(でnullポインタを参照してください)ライン

ks.allMetadataForStore(storeName) 
        .stream() 
        .filter(sm -> !(sm.host().equals(thisInstance.host()) && sm.port() == thisInstance.port())) //only query remote node stores 
        .forEach(new Consumer<StreamsMetadata>() { 
         @Override 
         public void accept(StreamsMetadata t) { 
          //some logic 
         } 

        }); 
+0

下回っストリームstatelistener内のストリームストアを読み込むことができます。http://docs.confluent.io/current/streams/faqのFAQエントリを参照してください。 .html#handling-invalidstatestexexception-state-store-may-have-may-migated-to-another-instance詳細については、問題の解決方法を説明してください。 –

答えて

1

複数のインスタンスを起動すると、ストアがあるインスタンスから別のインスタンスに移行する可能性があります。ストアを移行する前にストアの場所に関するメタデータを収集すると、メタデータが正しくなりません。したがって、メタデータをリフレッシュする(つまり、再度収集する)必要があります。

例外org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, in-memory-avg-store, may have migrated to another instanceは、メタデータがストールしていることを示します。

この例外をキャッチして、その例外から回復することができます。

+1

はい。私は以前これを見てきましたが、これはストアの移行中に発生することがわかりました。しかし、0.10.2.0では、この移行は決して終わらないように見える。すなわち、第2ノードが始動するとすぐに、アプリケーションはこの状態になる。また、言及したように、v 0.10.1.1ではすべてがうまく動作します。それが私がちょっと困惑している理由です。 – Abhishek

+0

インスタンスからのログには何が表示されますか?アプリはリバランスしていますか? –

+0

'KafkaStreams.state()'を使ってリバランスが終了し、クエリの実行時に両方のアプリケーションがどちらの状態になっているか確認できますか? –

1

StreamsMetadataの作成にちょっとした「スリープ時間」を追加する必要があるかもしれません。私がThread.sleep(1000L);をコメントアウトすると、同じエラーが発生する可能性があります。 Matthias J. Saxコメントに基づいて

KafkaStreams streams = new KafkaStreams(builder, props); 
    streams.start(); 

    Thread.sleep(1000L);//If commented out,error occur 
    System.out.println(streams.allMetadataForStore("statestore").size()); 
0

、1はFYIような

streams.setStateListener((newState, oldState) -> { 
    if (newState == State.RUNNING && oldState.REBALNCING) { 
     System.out.println(streams.allMetadataForStore("statestore").size()); 
    }}); 
関連する問題