2016-09-09 10 views
2

私はKafkaからストリームするSparkコンシューマを持っています。 私は、正確に一度のセマンティクスのためのオフセットを管理しようとしています。しかしRDDからKafkaOffsetにアクセス中の例外

、それは次の例外スローオフセットへのアクセス中:

「とjava.lang.ClassCastException:org.apache.spark.rdd.MapPartitionsRDDを がorg.apache.spark.streamingにキャストすることはできません.kafka.HasOffsetRanges」

これを実行するコードの一部は以下の通りである:ここでは

var offsetRanges = Array[OffsetRange]() 
dataStream 
    .transform { 
    rdd => 
     offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
     rdd 
    } 
    .foreachRDD(rdd => { }) 

dataStrea誰かが私が私がここで間違ってやっているかを理解することができた場合

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t)).map(_._2) 

:mはダイレクト・ストリームである(DSTREAM [文字列])のようなKafkaUtilsのAPIのものを使用して作成しました。同様

おかげで公式ドキュメントで述べたように 変換データストリーム上で実行する方法のチェーンの最初の方法があります。

答えて

6

あなたの問題がある:代わりにKafkaUtils.createKafkaStreamによって作成されたDirectKafkaInputDStreamMapPartitionedDStreamを作成

.map(._2) 

val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+""+t)) 

kafkaStream 
    .transform { 
    rdd => 
     offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
     rdd 
    } 
    .map(_._2) 
    .foreachRDD(rdd => // stuff) 
+0

アーッ:transform後のあなたはmapに必要

!愚かな私。ありがとうございます。 乾杯! :) – taransaini43

+0

@ user1521672あなたは大歓迎です。 –

+0

また、オフセットを使用して直接ストリームを作成しようとすると、エラーが発生します。
ヴァルのfromOffsets:(TopicAndPartition、ロング)= TopicAndPartition(metrics_rs.getString(1)、metrics_rs.getInt(2)) - > metrics_rs.getLong(3)
KafkaUtils.createDirectStream [文字列、文字列、StringDecoder、StringDecoder、(文字列、文字列)](SSC、kafkaParams、fromOffsets、MessageHandlerの)
、 ヴァルMessageHandlerの= (MMD:MessageAndMetadata [文字列、文字列])=> mmd.message.length とmetrics_rsそこから結果セットであります私はオフセットマップを取得しています。それはあまりにも多くの型引数のエラー – taransaini43

関連する問題