2017-08-30 7 views
1

ドッカーの単一インスタンス環境で別のアプリケーションからKTableにアクセスする方法をテストする2つのJavaアプリケーション(App1、App2)があります。StateStoreとは別のアプリケーションからKTableにアクセスできない

最初のApp(App1)は、次のコードでKTableに書き込みます。

public static void main(String[] args) 
    { 
     final Properties props = new Properties(); 

    props.put(StreamsConfig.APPLICATION_ID_CONFIG,"gateway-service"); 
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.11:9092"); 
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName()); 
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, ServiceTransactionSerde.class); 


    KStreamBuilder builder = new KStreamBuilder(); 


    KStream<String,ServiceTransaction> source = builder.stream("gateway_request_processed"); 


    KStream<String, Long> countByApi = source.groupBy((key,value)-> value.getApiId().toString()).count("Counts").toStream(); 

    countByApi.to(Serdes.String(), Serdes.Long(),"countByApi"); 

    countByApi.print(); 

    final KafkaStreams streams = new KafkaStreams(builder,props); 

    streams.start(); 
    System.out.println(streams.state()); 

    System.out.println(streams.allMetadata()); 
    System.out.println(streams.allMetadataForStore("countByApi")); 

    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { 


     @Override 
     public void run() { 
      System.out.println(streams.allMetadata()); 
      streams.close(); 
     } 
    })); 
} 

私は私のプロデューサーを実行すると、私はこれは私の状態= RUNNINGを示しApp1の

RUNNING 
[] 
[] 
[KTABLE-TOSTREAM-0000000006]: c00af5ee-3c2d-4d12-9c4b-3b55c1284dd6, 19 

でコードの出力を次しまいました。ストアのメタデータも空です。しかし、要求は処理され、KTableに正常に格納されます(String、Long)。

私が実行したときkafka-topics.sh --list --zookeeper:2181 次のトピックがあります。

bash-4.3# kafka-topics.sh --list --zookeeper zookeeper:2181 
__consumer_offsets 
countByApi 
gateway-Counts-changelog 
gateway-Counts-repartition 
gateway-service-Counts-changelog 
gateway-service-Counts-repartition 
gateway_request_processed 

これは、KTableが何らかの形で新しいトピックで保持されていることを示しています。

次に、このKTableに状態ストア(ReadOnlyKeyValueStore)としてアクセスしてアクセスしようとする次のコードを持つコマンドラインアプリケーション(App2)があります。

RUNNING 
Exception in thread "main" org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, countByApi, may have migrated to another instance. 
    at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60) 
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:728) 
    at com.comp.streamtable.App.main(App.java:37) 

残念ながら、私は1つだけのインスタンスを持っていないと私は状態が等しい「RUNNING」であることを確認します。私は2アプリケーションを実行すると

public static void main(String[] args) 
    { 
     final Properties props = new Properties(); 
     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "gateway-service-table-client"); 
     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.11:9092"); 


     KStreamBuilder builder = new KStreamBuilder(); 
     KafkaStreams streams = new KafkaStreams(builder,props); 

     streams.cleanUp(); 
     streams.start(); 
     System.out.println("Hello World!"); 
     System.out.println(streams.state()); 

     ReadOnlyKeyValueStore<String,Long> keyValueStore = 
     streams.store("countByApi", QueryableStoreTypes.keyValueStore()); 

     final KeyValueIterator<String,Long> range = keyValueStore.all(); 

     while(range.hasNext()){ 
      KeyValue<String,Long> next = range.next(); 
      System.out.println(String.format("key: %s | value: %s", next.key,next.value)); 

     } 

     Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { 


         @Override 
         public void run() { 
          System.out.println(streams.allMetadata()); 
          streams.close(); 
         } 
     })); 

    } 

私はエラーメッセージを得ますか。

注:これは別の例外として、アプリケーションごとに異なるapplication.idを選択する必要がありました。これが興味深いかもしれないので、これを指摘したかっただけです。

他のアプリから私のKTableにアクセスするにはここで迷っていますか?

答えて

1

両方のアプリケーションに2種類の異なるapplication.idを使用しています。したがって、両方のアプリケーションは完全に分離されています。

インタラクティブクエリは同じアプリの異なるインスタンス用に設計されており、アプリケーション間では動作しません。

このブログの記事は役立つかもしれない:https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/

+1

明確化を:(上記アプリケーション2でのご 'streams.storeのような(...)'コール)「にアクセスローカル状態」インタラクティブクエリのためのAPIのメソッドを使用してのみこれらのローカル状態ストアを作成/維持する同じアプリケーションで動作します。もちろん、インタラクティブクエリを使用して、さまざまなアプリケーション間でこの状態データにアクセスすることができますが、対話型クエリと組み合わせてRPCレイヤーを使用する必要があります。http://docs.confluent.io/current/streams/developer -guide.html#querying-remote-state-for-the-entire-applicationを使用します。 –

関連する問題