私はKafka DirectStreamを消費し、各パーティションのRDDを処理し、処理された値をDBに書き込もうとしています。 reduceByKeyを実行しようとすると(シャッフルなしでパーティションごとに)、次のエラーが発生します。通常、ドライバノードでは、sc.parallelize(Iterator)を使用してこの問題を解決できます。しかし、私はスパークストリーミングでそれを解決したいと思います。スパークストリーミング - イテレータのパーティション内でreduceByKeyを使用する方法
value reduceByKey is not a member of Iterator[((String, String), (Int, Int))]
パーティション内のイテレータで変換を実行する方法はありますか?
myKafkaDS
.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val commonIter = rdd.mapPartitionsWithIndex ((i,iter) => {
val offset = offsetRanges(i)
val records = iter.filter(item => {
(some_filter_condition)
}).map(r1 => {
// Some processing
((field2, field2), (field3, field4))
})
val records.reduceByKey((a,b) => (a._1+b._1, a._2+b._2)) // Getting reduceByKey() is not a member of Iterator
// Code to write to DB
Iterator.empty // I just want to store the processed records in DB. So returning empty iterator
})
}
(パーティションごとにプロセスカフカRDDSとDBに保存)これを行うにはよりエレガントな方法はありますか?
保存しようとしているデータベースはありますか?多くのデータベースで使用可能なspark dbコネクタAPIがあり、RDDをデータベースに簡単に保存することができます。 – Shankar
@Shankar私の懸念はDBに格納されていません。しかし、同じカフカオフセットに属するRDDを処理し、オフセットとデータの両方を保存して、処理されたオフセットを追跡することができます。 – santhosh