flink-streaming

    0

    1答えて

    メッセージをJSON形式で継続的に送信するKAFKAプロデューサからのメッセージを消費する次のコードを記述しました。誰も私はどのように秒あたりのメッセージの数を計算することができます(私のセットアップのスループットになります) public class FlinkStreamingJob { public static void main(String[] args) throws Excep

    3

    1答えて

    IDを持つJSONレコードなどのデータストリームがあります。 同じキーを持つすべてのレコードが同じステートフルタスクによって処理されるようにデータを処理したいとします。 どうすればいいですか?

    1

    2答えて

    私はソケットから読み取り、パターンを検出するフリンクcepコードを持っています。パターン(単語)が「警告」であると言います。アラートが5回以上発生する場合は、アラートを作成する必要があります。しかし、入力ミスマッチエラーが発生しています。 Flinkのバージョンは1.3.0です。前もって感謝します !! package pattern; import org.apache.flink.cep.

    0

    2答えて

    私はKafkaからデータを読み込み、データを処理するためにFlinkストリーミングを使用しています。アプリケーションの開始時にKafkaを使用する前に、DataSet APIを使用してファイルを読み取り、いくつかの基準に基づいてファイルをソートし、そこからリストを作成する必要があります。その後、カフカからストリーミングで消費され始めます。私は、DataSet APIを使ってファイルからデータを読み

    0

    1答えて

    Apache Flinkチェックポイント機能をステートフルな関数で使用する必要がありますか?

    0

    1答えて

    私は8つのキー(keyBy上)があり、スロットの広がりが歪んでいる場合があります。私は3の並列性を持ち、キー操作の普及率は5,2と1であり、5つのキーが1つのスロットに入っています。キー操作の分散が均衡することを保証する方法はありますか? (キーストリームでパーティション操作を行っていません)

    1

    1答えて

    私はデータストリームとしてカフカのトピックを消費しており、FlatMapFunctionを使用してデータを処理しています。この処理は、ストリームから得られるインスタンスを豊富にすることにより、データベースから取得したデータを他のデータベースで取得して出力を収集するが、それが最良のアプローチではないと感じるほどのデータで構成されます。 ドキュメントを読む私はデータベースクエリからDataSetを作成

    0

    1答えて

    私は、分秒のタイムスタンプでキー入力された60秒のデータを最大30秒の遅延で集計しようとしています。 ​​ 私はデータを受け取りました。透かしとタイムスタンプが設定されています。集計されたデータはohlcStreamAggregatedに送信されないため、ログに記録されません。 public TimestampExtractor(Time maxDelayInterval) { if

    1

    1答えて

    Flinkストリーミングジョブからチェックポイントをトリガすることは可能ですか? 私の使用例は次のとおりです。私は、2つのストリームRとSをタンブリング時間ウィンドウに結合する必要があります。ソースはカフカです。イベント時間処理とBoundedOutOfOrdernessGeneratorを使用して、2つのストリームからのイベントが同じウィンドウで終了するようにします。 私の状態は大きく、定期的な

    1

    1答えて

    データストリームをpostgresテーブルにシンクするストリーミングジョブをコーディングしようとしています。完全な情報を得るために、私はJDBCOutputFormatを使うことを提案している記事、つまりhttps://tech.signavio.com/2017/postgres-flink-sinkに基づいて作業しました。 私のコードは次のようになります。 98 ... 99 Strin