2016-11-28 13 views
1

私はKafkaからデータを読み取る必要があるスパークアプリケーションに取り組んでいます。私はプロデューサーがメッセージを投稿していたKafkaトピックを作成しました。コンソールのコンシューマから、メッセージが正常に送信されたことを確認しました。Spark Streamingアプリケーションでカフカのレコード数がカウントされないのはなぜですか?

私はKafkaからデータを読み込むための短いスパークアプリケーションを作成しましたが、データを取得していません。この問題を解決する方法について

def main(args: Array[String]): Unit = { 
    val Array(zkQuorum, group, topics, numThreads) = args 
    val sparkConf = new SparkConf().setAppName("SparkConsumer").setMaster("local[2]") 
    val ssc = new StreamingContext(sparkConf, Seconds(2)) 

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap 
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) 

    process(lines) // prints the number of records in Kafka topic 

    ssc.start() 
    ssc.awaitTermination() 
} 

private def process(lines: DStream[String]) { 
    val z = lines.count() 
    println("count of lines is "+z) 
    //edit 
    lines.foreachRDD(rdd => rdd.map(println) 
    // <-- Why does this **not** print? 
) 

任意の提案: 後、私が使用したコードはありますか?

****** EDITは****

私も実際のコードで

lines.foreachRDD(rdd => rdd.map(println) 

を使用していたが、それも機能していません。私はポスト:Kafka spark directStream can not get dataで述べたように保存期間を設定しました。しかし、まだ問題が存在します。あなたのprocess

答えて

1

すべてのバッチ間隔を実行パイプラインを取得していない出力オペレータとDStreamパイプラインの続きです。

各RDDはそれぞれをカウントすることによって生成された単一の要素を持っている新しいDSTREAMを返します:count's scaladocは引用

count(): DStream[Long] 

あなたはcount operatorの署名を読むことによって、それを「見る」ことができますこのDStreamのRDD。

したがって、単一の値のdstream(countの結果)に変換するカフカレコードのdstreamがあります。コンソールや他のシンクに出力することはあまりありません。あなたは公式ドキュメントOutput Operations on DStreamsで説明したように、出力演算子を使用してパイプラインを終了する必要が

出力操作は、DSTREAMのデータがデータベースやファイル・システムなどの外部システムに押し出されることを可能にします。出力操作では実際に変換されたデータが外部システムによって消費されるため、(RDDのアクションと同様に)すべてのDStream変換の実際の実行がトリガーされます。

(低レベル)の実行を開始することができるように出力オペレーターが出力dstreamsとして入力dstreamsを登録します。 Spark StreamingのDStreamには、出力dstreamという概念はありません。入力と出力のdstreamを知り、区別することができるのはDStreamGraphです。

+0

実際のコードでOutput演算子も使用しました。元の質問を編集して、まだ問題が存在することを示しました。今はどんな提案? – Alok

+0

Hehe、出力なしの演算子を別のものに「交換」しています:)まず 'lines.count()'の代わりに 'lines.count()。print'を使用してください。私はコンソールに10レコードをプリントアウトすることを確信しています。 RDDの場合、 'rdd.foreach(println)'(変換である 'rdd.map(println) 'ではなく)を使用してください。楽しむ! :) –

関連する問題