1
reduceByKeyが機能しないようなコードスニペットがあります。reduceByKeyがスパークストリーミングで動作しない
val myKafkaMessageStream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topicsSet, kafkaParams)
)
myKafkaMessageStream
.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val myIter = rdd.mapPartitionsWithIndex { (i, iter) =>
val offset = offsetRanges(i)
iter.map(item => {
(offset.fromOffset, offset.untilOffset, offset.topic, offset.partition, item)
})
}
val myRDD = myIter.filter((<filter_condition>)).map(row => {
//Process row
((field1, field2, field3) , (field4, field5))
})
val result = myRDD.reduceByKey((a,b) => (a._1+b._1, a._2+b._2))
result.foreachPartition { partitionOfRecords =>
//I don't get the reduced result here
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
私に何かが不足していますか?
ここには詳細がほとんどありません。これをコアストリームの例(定数ストリームまたはキューストリーム)に減らすことはできますか?それはどういう意味ですか?うまくいかないのですか?例外をスローしますか?レコードをグループ化しませんか?前者の場合は、レコードを比較することができます(有用なハッシュ/平等を提供する)? –
あなたはfromOffset、untilOffsetとtopicを使って減らすために適切なキーを作成するのはなぜでしょうか? –
@LostInOverflow明快さが欠けて申し訳ありません。 myRDD.foreach(println)を実行すると、内容が表示されます。しかし、私がresult.foreach(println)を実行すると、内容は表示されません。エラーはありません。空の結果が返されます。 – santhosh