ドッカーの単一インスタンス環境で別のアプリケーションからKTable
にアクセスする方法をテストする2つのJavaアプリケーション(App1、App2)があります。StateStoreとは別のアプリケーションからKTableにアクセスできない
最初のApp(App1)は、次のコードでKTable
に書き込みます。
public static void main(String[] args)
{
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"gateway-service");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.11:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, ServiceTransactionSerde.class);
KStreamBuilder builder = new KStreamBuilder();
KStream<String,ServiceTransaction> source = builder.stream("gateway_request_processed");
KStream<String, Long> countByApi = source.groupBy((key,value)-> value.getApiId().toString()).count("Counts").toStream();
countByApi.to(Serdes.String(), Serdes.Long(),"countByApi");
countByApi.print();
final KafkaStreams streams = new KafkaStreams(builder,props);
streams.start();
System.out.println(streams.state());
System.out.println(streams.allMetadata());
System.out.println(streams.allMetadataForStore("countByApi"));
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
System.out.println(streams.allMetadata());
streams.close();
}
}));
}
私は私のプロデューサーを実行すると、私はこれは私の状態= RUNNINGを示しApp1の
RUNNING
[]
[]
[KTABLE-TOSTREAM-0000000006]: c00af5ee-3c2d-4d12-9c4b-3b55c1284dd6, 19
でコードの出力を次しまいました。ストアのメタデータも空です。しかし、要求は処理され、KTableに正常に格納されます(String、Long)。
私が実行したときkafka-topics.sh --list --zookeeper:2181
次のトピックがあります。
bash-4.3# kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
countByApi
gateway-Counts-changelog
gateway-Counts-repartition
gateway-service-Counts-changelog
gateway-service-Counts-repartition
gateway_request_processed
これは、KTableが何らかの形で新しいトピックで保持されていることを示しています。
次に、このKTable
に状態ストア(ReadOnlyKeyValueStore
)としてアクセスしてアクセスしようとする次のコードを持つコマンドラインアプリケーション(App2)があります。
RUNNING
Exception in thread "main" org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, countByApi, may have migrated to another instance.
at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:728)
at com.comp.streamtable.App.main(App.java:37)
残念ながら、私は1つだけのインスタンスを持っていないと私は状態が等しい「RUNNING」であることを確認します。私は2アプリケーションを実行すると
public static void main(String[] args)
{
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "gateway-service-table-client");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.11:9092");
KStreamBuilder builder = new KStreamBuilder();
KafkaStreams streams = new KafkaStreams(builder,props);
streams.cleanUp();
streams.start();
System.out.println("Hello World!");
System.out.println(streams.state());
ReadOnlyKeyValueStore<String,Long> keyValueStore =
streams.store("countByApi", QueryableStoreTypes.keyValueStore());
final KeyValueIterator<String,Long> range = keyValueStore.all();
while(range.hasNext()){
KeyValue<String,Long> next = range.next();
System.out.println(String.format("key: %s | value: %s", next.key,next.value));
}
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
System.out.println(streams.allMetadata());
streams.close();
}
}));
}
私はエラーメッセージを得ますか。
注:これは別の例外として、アプリケーションごとに異なるapplication.idを選択する必要がありました。これが興味深いかもしれないので、これを指摘したかっただけです。
他のアプリから私のKTableにアクセスするにはここで迷っていますか?
明確化を:(上記アプリケーション2でのご 'streams.storeのような(...)'コール)「にアクセスローカル状態」インタラクティブクエリのためのAPIのメソッドを使用してのみこれらのローカル状態ストアを作成/維持する同じアプリケーションで動作します。もちろん、インタラクティブクエリを使用して、さまざまなアプリケーション間でこの状態データにアクセスすることができますが、対話型クエリと組み合わせてRPCレイヤーを使用する必要があります。http://docs.confluent.io/current/streams/developer -guide.html#querying-remote-state-for-the-entire-applicationを使用します。 –