2016-10-18 4 views
4

Apache Sparkの代替/補完として、ストリーム処理のためにApache Flinkを評価しています。 Sparkで通常解決している課題の1つは、データの豊富化です。Flink状態を使用して結合を実行できますか?

私は、センサIDを持つIoTセンサからのデータストリームを持っており、センサメタデータのセットを持っています。入力ストリームをセンサメジャー+センサメタデータのストリームに変換したい。

Sparkでは、RStreamでDStreamに参加できます。

case calss SensorValue(sensorId: Long, ...) 
case class SensorMetadata(sensorId: Long, ...) 
val sensorInput: DStream[SensorValue] = readEventsFromKafka() 
val staticMetadata: RDD[(Long, SensorMetadata)] = 
    spark.read.json(...).as[SensorMetadata] 
.map {s => (s.sensorId, s)}.rdd 
val joined: DStream[(SensorValue, SensorMetadata)] = 
    sensorInput.map{s => (s.sensorId, s)}.transform { rdd: RDD[SensorValue] => 
    rdd.join(staticMetadata) 
    .map { case (_, (s, m)) => (s, m) } // Get rid of nested tuple 
} 

Apache Flinkで同じトリックを実行できますか?私はこれに直接APIを見ません。私が持っている唯一のアイデアは、ステートフルな変換を使用することである - 私は(擬似コード)単一のストリームにメタデータとセンサーのイベントをマージし、メタデータを格納するためにFLINK状態ストレージを使用することができます。

val sensorInput: DataStream[SensorValue] = readEventsFromKafka() 
val statisMetadata: DataStream[SensorMetadata] = readMetadataFromJson() 
val result: DataStream[(SensorValue, SensorMetadata)] = 
    sensorInput.keyBy("sensorId") 
.connect(staticMetadata.keyBy("sensorId")) 
.flatMap {new RichCoFlatMapFunction() { 
    private val ValueState<SensorMetadata> md = _; 
    override def open = ??? // initiate value state 
    def flatMap1(s: SensorEvent, s: Collector(SensorEvent, SensorMetadata)) = 
     collector.collect(s, md.value) 
    def flatMap2(s: SensorMetadata, s: Collector[(SensorEvent, SensorMetadata)]) = 
    md.update(s) 
}} 

はこの正しいアプローチですか?メタデータが1台のマシンに収まらない場合、より大きなスケールで使用できますか?参加するにCoFlatMapFunctionを使用し

おかげ

答えて

3

は、一般的なアプローチです。しかしながら、それは1つの重大な欠点を有する。この関数は、どちらかの入力のタプルが到着し、最初に消費する入力を制御できないときに呼び出されます。したがって、最初は、メタデータが完全に読み込まれていないときにセンサイベントを処理する必要があります。 1つのアプローチは、一方の入力のすべてのイベントを他方の入力が消費されるまでバッファリングすることです。一方、CoFlatMapFunctionアプローチでは、メタデータを動的に更新できるという利点があります。コード例では、両方の入力が結合キーにキー入力されています。これは、入力が分割され、各タスク・スロットが異なるキー・セットを処理していることを意味します。したがって、あなたのメタデータは、マシンが処理できるものよりも大きくなる可能性があります(RocksDB状態のバックエンドを構成すると、ディスクの状態を永続化できるため、メモリのサイズにも縛られません)。

ジョブの開始時にすべてのメタデータが存在する必要があり、メタデータが静的(変更されない)で1台のマシンに収まるように十分小さい場合は、通常のFlatMapFunctionを使用してメタデータファイルからopen()メソッドの場合あなたのアプローチとは対照的に、これはブロードキャスト結合であり、各タスク・スロットはメモリ内に完全なメタデータを持っています。イベントデータが消費されたときにすべてのメタデータを利用できるほか、どのマシンでも結合できるため、イベントデータをシャッフルする必要はありません。

+0

あなたの答えをありがとう。 私には不明な点が残っています - どうすれば入力をバッファできますか?検索するキーワードを教えてください。 –

+1

対応するメタデータ・イベントが到着するまで、イベント・レコードを追加する2番目のキー状態のオブジェクトを作成できます。これが起こると、すべてのイベントに参加して放出し、イベント状態をクリアして、メタデータイベントをメタデータ状態に挿入します。 –

関連する問題