2017-12-15 31 views
1

私はAというカフカのトピックを持っています。トピック内のデータのカフカから最新の値を取得

形式は次のとおりです。今

{ id : 1, name:stackoverflow, created_at:2017-09-28 22:30:00.000} 
{ id : 2, name:confluent, created_at:2017-09-28 22:00:00.000} 
{ id : 3, name:kafka, created_at:2017-09-28 24:42:00.000} 
{ id : 4, name:apache, created_at:2017-09-28 24:41:00.000} 

消費者側では、私は一時間ウィンドウの最新のデータのみを取得したいが、私は上のベース話題から最新の値を取得する必要が1時間ごとに意味しますcreated_at

私の予想される出力は次のようになります。

{ id : 1, name:stackoverflow, created_at:2017-09-28 22:30:00.000} 
{ id : 3, name:kafka, created_at:2017-09-28 24:42:00.000} 

私はこれがksqlが、イムわからないことによって解決することができると思います。私を助けてください。

ありがとうございます。

+0

あなたのキーは何ですか? –

+0

キーがメッセージ1、メッセージ2などであると考えてください。上記の値は – shakeel

+1

Coolです。 Kafka Streamsはキーに基づいてすべてを集約/グループ化するため、キーを念頭に置いておく必要があります。 –

答えて

3

はい、これにはKSQLを使用できます。次のことを試してみてください。

CREATE STREAM S1 (id BIGINT, name VARCHAR, created_at VARCHAT) WITH (kafka_topic = 'topic_name', value_format = 'JSON');

CREATE TABLE maxRow AS SELECT id, name, max(STRINGTOTIMESTAMP(created_at, 'yyyy-mm-dd hh:mm:ss.SSS')) AS creted_at FROM s1 WINDOW TUMBLING (size 1 hour) GROUP BY id, name;

結果は、Linuxのタイムスタンプ形式でcreated_at時間を持つことになります。新しいクエリでTIMESTAMPTOSTRING udfを使用して、目的の形式に変更できます。 問題が見つかった場合はお知らせください。

+0

あなたの回答をありがとう、私は10分に1時間のウィンドウを減らすこともできます、それは任意のパフォーマンスの問題に行くのですか? – shakeel

+0

もちろん、 '(size 10 minutes)'を使うことができます。重大なパフォーマンス上の問題はありません。 – Hojjat

+0

あなたの回答をありがとう、もう1つの質問は、メモリまたはディスクにデータを格納するksqlですか? – shakeel

関連する問題