私はKafkaからストリームするSparkコンシューマを持っています。 私は、正確に一度のセマンティクスのためのオフセットを管理しようとしています。しかしRDDからKafkaOffsetにアクセス中の例外
、それは次の例外スローオフセットへのアクセス中:
「とjava.lang.ClassCastException:org.apache.spark.rdd.MapPartitionsRDDを がorg.apache.spark.streamingにキャストすることはできません.kafka.HasOffsetRanges」
これを実行するコードの一部は以下の通りである:ここでは
var offsetRanges = Array[OffsetRange]()
dataStream
.transform {
rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}
.foreachRDD(rdd => { })
dataStrea誰かが私が私がここで間違ってやっているかを理解することができた場合
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t)).map(_._2)
:mはダイレクト・ストリームである(DSTREAM [文字列])のようなKafkaUtilsのAPIのものを使用して作成しました。同様
おかげで公式ドキュメントで述べたように 変換データストリーム上で実行する方法のチェーンの最初の方法があります。
アーッ:
transform
後のあなたはmap
に必要!愚かな私。ありがとうございます。 乾杯! :) – taransaini43
@ user1521672あなたは大歓迎です。 –
また、オフセットを使用して直接ストリームを作成しようとすると、エラーが発生します。
ヴァルのfromOffsets:(TopicAndPartition、ロング)= TopicAndPartition(metrics_rs.getString(1)、metrics_rs.getInt(2)) - > metrics_rs.getLong(3)
KafkaUtils.createDirectStream [文字列、文字列、StringDecoder、StringDecoder、(文字列、文字列)](SSC、kafkaParams、fromOffsets、MessageHandlerの)
、 ヴァルMessageHandlerの= (MMD:MessageAndMetadata [文字列、文字列])=> mmd.message.length とmetrics_rsそこから結果セットであります私はオフセットマップを取得しています。それはあまりにも多くの型引数のエラー – taransaini43