2017-05-26 12 views
0
KeyValueStore<String, Long> kvStore=(KeyValueStore<String, Long>) 
Stores.create("InterWindowStore1").withKeys(Serdes.String()) 
       .withValues(Serdes.Long()) 
       .persistent() 
       .build().get();` 

私はあなたのコメントで説明したようあなたは基本的には、ウィンドウの集約を行っている上記のコードに示すようにstatestoreを作成し、kvStore.put(key, value);に挿入しようとするが、それは私にNPEDSLにKeyValueStore状態ストアを使用する方法は?

Caused by: java.lang.NullPointerException 
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:117) 
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) 
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202) 
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42) 
    at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:103) 
    at org.apache.kafka.streams.state.internals.CachingWindowStore.access$200(CachingWindowStore.java:34) 
    at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:86) 
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:131) 
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:95) 
+0

これは問題ありません。 'kvStore'が呼び出しの時点でヌルでないか、またはkey/valueがヌルでないことを確かめますか? – Abhishek

+0

はいkvstoreとkeyの両方の値がnullではない –

+0

タイトルに "プロセッサを追加しない"と書かれているように、全体的なシナリオを理解しているかどうかわかりません - "外部"から "put"あなたのカフカストリームアプリケーションの?この用途のために設計されていない店舗です。 (どのバージョンを使用しているのかわからないので、NPEを詳細に追跡できませんでした。スタックトレースに表示されている行は0.10.2ではNPEを投げないようです)。 - だから実際に何を達成しようとしているのですか? –

答えて

1

を投げされています:

KStream stream = ... 
KTable table = stream.groupByKey().aggregate(..., TimeWindow.of(...)); 

KTableストリームには、あなたのウィンドウ集計の更新が含まれている可能性があるため、このストリームを変更する必要があります。このために、あなたはステートフルトランスや価値の変圧器を使用することができます。

StateStoreSupplier myState = State.create("nameOfMyState")....; 

KStream result = table.toStream().transform(..., "nameOfMyState"); 

最後に、あなたが出力トピックにあなたの結果を書き込むことができます。

result.to("output-topic"); 

あなたはtransformに提供あなたのTransformerを得ることができます状態をinit()の指定されたコンテキストを介して受け取り、ウィンドウ出力が生成/更新されるたびにtransform()の範囲内で使用します。

関連する問題