2016-10-07 8 views
0

私はKafka DirectStreamを消費し、各パーティションのRDDを処理し、処理された値をDBに書き込もうとしています。 reduceByKeyを実行しようとすると(シャッフルなしでパーティションごとに)、次のエラーが発生します。通常、ドライバノードでは、sc.parallelize(Iterator)を使用してこの問題を解決できます。しかし、私はスパークストリーミングでそれを解決したいと思います。スパークストリーミング - イテレータのパーティション内でreduceByKeyを使用する方法

value reduceByKey is not a member of Iterator[((String, String), (Int, Int))] 

パーティション内のイテレータで変換を実行する方法はありますか?

myKafkaDS 
    .foreachRDD { rdd => 
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
    val commonIter = rdd.mapPartitionsWithIndex ((i,iter) => { 

     val offset = offsetRanges(i) 

     val records = iter.filter(item => { 
     (some_filter_condition) 
     }).map(r1 => { 
     // Some processing 
     ((field2, field2), (field3, field4)) 
     }) 

     val records.reduceByKey((a,b) => (a._1+b._1, a._2+b._2)) // Getting reduceByKey() is not a member of Iterator 
     // Code to write to DB  
     Iterator.empty // I just want to store the processed records in DB. So returning empty iterator 
    }) 
} 

(パーティションごとにプロセスカフカRDDSとDBに保存)これを行うにはよりエレガントな方法はありますか?

+0

保存しようとしているデータベースはありますか?多くのデータベースで使用可能なspark dbコネクタAPIがあり、RDDをデータベースに簡単に保存することができます。 – Shankar

+0

@Shankar私の懸念はDBに格納されていません。しかし、同じカフカオフセットに属するRDDを処理し、オフセットとデータの両方を保存して、処理されたオフセットを追跡することができます。 – santhosh

答えて

0

あなたのレコードの値はイテレータであり、RDDではありません。したがって、レコード関係ではreduceByKeyを呼び出すことはできません。

0

構文の問題:

1)reduceByKeyロジックはOKに見えますが、文の前のvalを削除してください(タイプミスではない場合)&は(reduceByKeyを添付)マップの後:

.map(r1 => { 
    // Some processing 
    ((field2, field2), (field3, field4)) 
    }).reduceByKey((a,b) => (a._1+b._1, a._2+b._2)) 

2)の後にiter.nextを追加各反復の終わり。

3)iter.emptyが間違って配置されています。 mapPartitionsWithIndexから出てきた後に置く()

4)安全のために、イテレータの条件を追加します。

val commonIter = rdd.mapPartitionsWithIndex ((i,iter) => if (i == 0 && iter.hasNext){ 
.... 
}else iter),true) 
0

そう...私たちはmapPartitionsWithIndex内火花変換を使用することはできません。しかし、スカラー変換を使用してgroupbyのようなメソッドを減らすと、私はこの問題を解決するのに役立ちました。

関連する問題