2017-04-07 12 views
0

Sparkストリーミングアプリケーションで。 2つのカフカのトピックから2つのDStreamを作成しています。 2つのDStreamを別々に処理する必要があるので、私はそうしています。コード例は次のとおりです。Spark StreamingのKafkaトピックから2つのDStreamを作成できない

object KafkaConsumerTest3 { 
    var sc:SparkContext = null 
    def main(args: Array[String]) { 



    Logger.getLogger("org").setLevel(Level.OFF); 
    Logger.getLogger("akka").setLevel(Level.OFF); 

    val Array(zkQuorum, group, topics1, topics2, numThreads) = Array("localhost:2181", "group3", "test_topic4", "test_topic5","5") 
    val sparkConf = new SparkConf().setAppName("SparkConsumer").setMaster("local[2]") 
    sc = new SparkContext(sparkConf) 
    val ssc = new StreamingContext(sc, Seconds(2)) 


    val topicMap1 = topics1.split(",").map((_, numThreads.toInt)).toMap 
    val topicMap2 = topics2.split(",").map((_, numThreads.toInt)).toMap 

    val lines2 = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap2).map(_._2) 
    val lines1 = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap1).map(_._2) 

    lines2.foreachRDD{rdd => 
     rdd.foreach { println }} 

    lines1.foreachRDD{rdd => 
     rdd.foreach { println }} 

    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

トピックにはデータがある場合とない場合があります。私の場合、最初のトピックは現在データを取得していませんが、2番目のトピックは取得しています。しかし、私のスパークアプリケーションはデータを印刷していません。例外もありません。 紛失しているものはありますか?またはこの問題を解決するにはどうすればよいですか?

+0

1つのストリームで機能しますか? – Natalia

+0

はい。1つのストリームで動作します。 – Alok

+0

@ Alok:foreachRDDメソッド内でrdd.countを印刷できますか? – Shankar

答えて

0

上記のコードで問題が見つかりました。問題は、私たちがローカルの[2]としてマスタを使用していて、2つのレシーバを登録しているということです。スレッドの数を増やして問題を解決します。

関連する問題