apache-flink

    0

    1答えて

    私は2つのrawストリームを持っています。これらのストリームに参加しています。そして、参加したイベントの総数と、ない。 joinedEventDataStream.map(new RichMapFunction<JoinedEvent, Object>() { @Override public Object map(JoinedEvent joinedEvent) t

    0

    1答えて

    FlinkのDataStream APIを使用してMisraGriesアルゴリズムを実装しています。 kカウンタに増分または減分のデータサマリを記録させます。 DataStream APIを使用してアルゴリズムを実装する場合、このようなカウンタを格納する最良の方法は何ですか?今私はオペレータの変数HashMapを宣言しました。これは正しいアプローチですか、あるいは私は州のようないくつかの他の機能を

    0

    1答えて

    私はFlinkコミュニティには初めてのので、ストリーミングデータのためのFlinkのパフォーマンスをキャプチャするための実験的な調査をしようとしています。 これは数時間かけてジョブを実行する統計を収集しようとしています。しかし、FlinkのUIを使用すると、過去5分間の統計しか見ることができません。 私はRest APIをヒットしようとしましたが、読み書きされたバイト以外の統計データは含まれていま

    0

    1答えて

    からAzure IoT Hubでメッセージを受信し、異なるイベントハブまたはサービスバスのトピックにルーティングしたいと考えています。 IoTハブのルートとエンドポイントは、データがバイナリデータ(protobuf)であり、10種類のエンドポイントしか存在しないため(オプションが必要です)、オプションはありません。ペイロードに基づいて異なるエンドポイントへのルーティングメッセージ を分割 は(多分

    0

    1答えて

    env.readCsvFile(location).pojoType(dynClass, arr);で指定された動的リターンタイプを生成するカスタムCSVリーダを作成します。dynClassはByteBuddyで作成され、arrは配列です列名の org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch:

    0

    1答えて

    ガイ私はいくつかの共通鍵(我々はいくつかのRESTサービスの呼び出しで複数のSQLテーブルについて話している)によって、いくつかのRESTサービスのJSONといくつかの巨大なSQLのテーブルを結合することになっています。事は、このデータは、リアルタイム/無限ストリームとも私は結合列でRESTサービスの出力を注文することができないと思いますではないです。今、愚かな方法は、すべてのデータを持参して、行

    1

    1答えて

    Apache Flink 1.3.2およびScala 2.10で反復処理を実行する際に、現在の反復インデックスにアクセスできますか?これまでの私の検索に基づいて val initialData: DataSet[(ItemSet[T], Int)] initialData.iterate(maxIterations) { current: DataSet[(ItemSet[T], I

    2

    1答えて

    大きな状態(メモリに収まらない)のカスタム演算子を実装しています。私はこの目的のためにListStateを使用しようとしています。 snapshotStateのhttps://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-operator-state 実装(で述べたよう

    0

    1答えて

    私はストリーミング分析シナリオでFlinkを評価しており、従来のETLセットアップの方法を満たす方法については十分な情報がありません今日のシステムです。 非常に一般的なシナリオは、我々がキーしていること、遅いスループットで、メタデータは、我々は、高スループットのデータ・ストリーム上での濃縮のために使用することをストリーム、の行で何か: このFlinkに関する2つの質問を提起します:時間ウィンドウが

    1

    2答えて

    Flinkに弾性プロデューサーを使用したいが、私は認証のためにいくつかの問題がある: 私は弾性検索クラスタの前にNginxを持っており、nginxで基本認証を使用する。 しかし、弾性検索コネクタで、私は(なぜならたInetSocketAddressの)私のURLでベーシック認証を追加することはできませんは あなたは基本的な認証とelasticsearchコネクタを使用するためのアイデアを持っていま