Apache Sparkの代替/補完として、ストリーム処理のためにApache Flinkを評価しています。 Sparkで通常解決している課題の1つは、データの豊富化です。Flink状態を使用して結合を実行できますか?
私は、センサIDを持つIoTセンサからのデータストリームを持っており、センサメタデータのセットを持っています。入力ストリームをセンサメジャー+センサメタデータのストリームに変換したい。
Sparkでは、RStreamでDStreamに参加できます。
case calss SensorValue(sensorId: Long, ...)
case class SensorMetadata(sensorId: Long, ...)
val sensorInput: DStream[SensorValue] = readEventsFromKafka()
val staticMetadata: RDD[(Long, SensorMetadata)] =
spark.read.json(...).as[SensorMetadata]
.map {s => (s.sensorId, s)}.rdd
val joined: DStream[(SensorValue, SensorMetadata)] =
sensorInput.map{s => (s.sensorId, s)}.transform { rdd: RDD[SensorValue] =>
rdd.join(staticMetadata)
.map { case (_, (s, m)) => (s, m) } // Get rid of nested tuple
}
Apache Flinkで同じトリックを実行できますか?私はこれに直接APIを見ません。私が持っている唯一のアイデアは、ステートフルな変換を使用することである - 私は(擬似コード)単一のストリームにメタデータとセンサーのイベントをマージし、メタデータを格納するためにFLINK状態ストレージを使用することができます。
val sensorInput: DataStream[SensorValue] = readEventsFromKafka()
val statisMetadata: DataStream[SensorMetadata] = readMetadataFromJson()
val result: DataStream[(SensorValue, SensorMetadata)] =
sensorInput.keyBy("sensorId")
.connect(staticMetadata.keyBy("sensorId"))
.flatMap {new RichCoFlatMapFunction() {
private val ValueState<SensorMetadata> md = _;
override def open = ??? // initiate value state
def flatMap1(s: SensorEvent, s: Collector(SensorEvent, SensorMetadata)) =
collector.collect(s, md.value)
def flatMap2(s: SensorMetadata, s: Collector[(SensorEvent, SensorMetadata)]) =
md.update(s)
}}
はこの正しいアプローチですか?メタデータが1台のマシンに収まらない場合、より大きなスケールで使用できますか?参加するにCoFlatMapFunction
を使用し
おかげ
あなたの答えをありがとう。 私には不明な点が残っています - どうすれば入力をバッファできますか?検索するキーワードを教えてください。 –
対応するメタデータ・イベントが到着するまで、イベント・レコードを追加する2番目のキー状態のオブジェクトを作成できます。これが起こると、すべてのイベントに参加して放出し、イベント状態をクリアして、メタデータイベントをメタデータ状態に挿入します。 –