2017-09-01 4 views
1

Webサービスのリクエストと応答に関する詳細を含むJSONメッセージをKafkaトピックに送信しています。私はKafkaストリームを使ってKafkaに到着するごとに各メッセージを処理し、その結果をクライアントが接続されているWebSocketに継続的に更新された要約(JSONメッセージ)として送信したいと考えています。kafka-streamsを使用して複数の集約を含む新しいKStreamを作成する

クライアントはJSONを解析し、Webページにさまざまなカウント/サマリーを表示します。

サンプル入力メッセージは、メッセージのストリームとして

{ 
    "reqrespid":"048df165-71c2-429c-9466-365ad057eacd", 
    "reqDate":"30-Aug-2017", 
    "dId":"B198693", 
    "resp_UID":"N", 
    "resp_errorcode":"T0001", 
    "resp_errormsg":"Unable to retrieve id details. DB Procedure error", 
    "timeTaken":11, 
    "timeTakenStr":"[0 minutes], [0 seconds], [11 milli-seconds]", 
    "invocation_result":"T" 
} 

{ 
    "reqrespid":"f449af2d-1f8e-46bd-bfda-1fe0feea7140", 
    "reqDate":"30-Aug-2017", 
    "dId":"G335887", 
    "resp_UID":"Y", 
    "resp_errorcode":"N/A", 
    "resp_errormsg":"N/A", 
    "timeTaken":23, 
    "timeTakenStr":"[0 minutes], [0 seconds], [23 milli-seconds]", 
    "invocation_result":"S" 
} 

{ 
    "reqrespid":"e71b802d-e78b-4dcd-b100-fb5f542ea2e2", 
    "reqDate":"30-Aug-2017", 
    "dId":"X205014", 
    "resp_UID":"Y", 
    "resp_errorcode":"N/A", 
    "resp_errormsg":"N/A", 
    "timeTaken":18, 
    "timeTakenStr":"[0 minutes], [0 seconds], [18 milli-seconds]", 
    "invocation_result":"S" 
} 

はカフカに入ってくるの下に、私はフライ

**

  • 総数に計算することができるようにしたいようですすべての要求数
  • invocation_resultが 'S'に等しい要求の総数
  • 「S」に等しいinvocation_resultと要求の「Y」
  • 総数に等しい「S」「S」に等しいinvocation_resultと要求の
  • 総数とUID に等しくないinvocation_resultと要求の
  • 総数平均時間、すなわち平均(timeTaken)
を取ら
  • 最大時間すなわちMAX(timeTaken)採取
  • すなわち分(timeTaken) 'Y'
  • 最小時間が取らに等しく、UID

    **

    とreqdate値に新しいキーセットと新しい値以前

    { 
        "total_cnt":3, "num_succ":2, "num_fail":1, "num_succ_data":2, 
        "num_succ_nodata":0, "num_fail_biz":0, "num_fail_tech":1, 
        "min_timeTaken":11, "max_timeTaken":23, "avg_timeTaken":17.3 
    } 
    

    示す3つのメッセージを使用して、以下のように計算された値が含まれているJSONメッセージでKStreamにそれらを書き出しますカフカの新しいストリームです。どのように私は、複数のカウントを行い、1つの異なる列の異なるステップのチェーンとしてですか? Apache Flinkまたは方解石は、KTableの私の理解が、あなたがキーしか持たないことを示唆しているように、より適切であると思われます。 30-AUG-2017であり、その後、単一の列値、例えば、となる。 1つのキーと複数のカウント値を持つ結果のテーブル構造が必要です。

    すべてのサポートは大変ありがとうございます。

  • 答えて

    0

    複雑な集計ステップを実行するだけで、すべてを一度に計算できます。私はちょうど考えをスケッチしています:

    class AggResult { 
        long total_cnt = 0; 
        long num_succ = 0; 
        // and many more 
    } 
    
    stream.groupBy(...).aggregate(
        new Initializer<AggResult>() { 
         public AggResult apply() { 
          return new AggResult(); 
         } 
        }, 
        new Aggregator<KeyType, JSON, AggResult> { 
         AggResult apply(KeyType key, JSON value, AggResult aggregate) { 
          ++aggregate.total_cnt; 
          if (value.get("success").equals("true")) { 
           ++aggregate.num_succ; 
          } 
          // add more conditions to get all the other aggregate results 
          return aggregate; 
         } 
        }, 
        // other parameters omitted for brevity 
    ) 
    .to("result-topic"); 
    
    +0

    私はそれをまともなショットと元に戻します。 – managbo

    関連する問題