Webサービスのリクエストと応答に関する詳細を含むJSONメッセージをKafkaトピックに送信しています。私はKafkaストリームを使ってKafkaに到着するごとに各メッセージを処理し、その結果をクライアントが接続されているWebSocketに継続的に更新された要約(JSONメッセージ)として送信したいと考えています。kafka-streamsを使用して複数の集約を含む新しいKStreamを作成する
クライアントはJSONを解析し、Webページにさまざまなカウント/サマリーを表示します。
サンプル入力メッセージは、メッセージのストリームとして
{
"reqrespid":"048df165-71c2-429c-9466-365ad057eacd",
"reqDate":"30-Aug-2017",
"dId":"B198693",
"resp_UID":"N",
"resp_errorcode":"T0001",
"resp_errormsg":"Unable to retrieve id details. DB Procedure error",
"timeTaken":11,
"timeTakenStr":"[0 minutes], [0 seconds], [11 milli-seconds]",
"invocation_result":"T"
}
{
"reqrespid":"f449af2d-1f8e-46bd-bfda-1fe0feea7140",
"reqDate":"30-Aug-2017",
"dId":"G335887",
"resp_UID":"Y",
"resp_errorcode":"N/A",
"resp_errormsg":"N/A",
"timeTaken":23,
"timeTakenStr":"[0 minutes], [0 seconds], [23 milli-seconds]",
"invocation_result":"S"
}
{
"reqrespid":"e71b802d-e78b-4dcd-b100-fb5f542ea2e2",
"reqDate":"30-Aug-2017",
"dId":"X205014",
"resp_UID":"Y",
"resp_errorcode":"N/A",
"resp_errormsg":"N/A",
"timeTaken":18,
"timeTakenStr":"[0 minutes], [0 seconds], [18 milli-seconds]",
"invocation_result":"S"
}
はカフカに入ってくるの下に、私はフライ
**
- 総数に計算することができるようにしたいようですすべての要求数
- invocation_resultが 'S'に等しい要求の総数 「S」に等しいinvocation_resultと要求の「Y」
- 総数に等しい「S」「S」に等しいinvocation_resultと要求の
- 総数とUID に等しくないinvocation_resultと要求の
- 総数平均時間、すなわち平均(timeTaken)
**
とreqdate値に新しいキーセットと新しい値以前
{
"total_cnt":3, "num_succ":2, "num_fail":1, "num_succ_data":2,
"num_succ_nodata":0, "num_fail_biz":0, "num_fail_tech":1,
"min_timeTaken":11, "max_timeTaken":23, "avg_timeTaken":17.3
}
示す3つのメッセージを使用して、以下のように計算された値が含まれているJSONメッセージでKStreamにそれらを書き出しますカフカの新しいストリームです。どのように私は、複数のカウントを行い、1つの異なる列の異なるステップのチェーンとしてですか? Apache Flinkまたは方解石は、KTableの私の理解が、あなたがキーしか持たないことを示唆しているように、より適切であると思われます。 30-AUG-2017であり、その後、単一の列値、例えば、となる。 1つのキーと複数のカウント値を持つ結果のテーブル構造が必要です。
すべてのサポートは大変ありがとうございます。
私はそれをまともなショットと元に戻します。 – managbo