2016-10-07 2 views
1

reduceByKeyが機能しないようなコードスニペットがあります。reduceByKeyがスパークストリーミングで動作しない

val myKafkaMessageStream = KafkaUtils.createDirectStream[String, String](
    ssc, 
    PreferConsistent, 
    Subscribe[String, String](topicsSet, kafkaParams) 
) 

myKafkaMessageStream 
    .foreachRDD { rdd => 
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
    val myIter = rdd.mapPartitionsWithIndex { (i, iter) => 
     val offset = offsetRanges(i) 
     iter.map(item => { 
     (offset.fromOffset, offset.untilOffset, offset.topic, offset.partition, item) 
     }) 
    } 

    val myRDD = myIter.filter((<filter_condition>)).map(row => { 
     //Process row 

     ((field1, field2, field3) , (field4, field5)) 
    }) 

    val result = myRDD.reduceByKey((a,b) => (a._1+b._1, a._2+b._2)) 

    result.foreachPartition { partitionOfRecords => 
     //I don't get the reduced result here 
     val connection = createNewConnection() 
     partitionOfRecords.foreach(record => connection.send(record)) 
     connection.close() 
    }   
    } 

私に何かが不足していますか?

+1

ここには詳細がほとんどありません。これをコアストリームの例(定数ストリームまたはキューストリーム)に減らすことはできますか?それはどういう意味ですか?うまくいかないのですか?例外をスローしますか?レコードをグループ化しませんか?前者の場合は、レコードを比較することができます(有用なハッシュ/平等を提供する)? –

+0

あなたはfromOffset、untilOffsetとtopicを使って減らすために適切なキーを作成するのはなぜでしょうか? –

+1

@LostInOverflow明快さが欠けて申し訳ありません。 myRDD.foreach(println)を実行すると、内容が表示されます。しかし、私がresult.foreach(println)を実行すると、内容は表示されません。エラーはありません。空の結果が返されます。 – santhosh

答えて

2

ストリーミングの状況では、あなたが探しているものを、特定の時間枠に渡って使用するのは、reduceByKeyAndWindowを使用する方が理にかなっています。 (K、V)対のDSTREAMに呼び出された場合

// Reduce last 30 seconds of data, every 10 seconds 
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10)) 

」は、各キーの値はでバッチにわたって所与減らす関数funcを使用して集計され(K、V)対の新しいDSTREAMを返します注意:デフォルトでは、Sparkのデフォルトの並列タスク数(ローカルモードの場合は2、クラスタモードの場合は設定プロパティspark.default.parallelismによって数値が決定されます)を使用して、グループ化を行います。異なる数のタスクを設定するnumTasks引数。

http://spark.apache.org/docs/latest/streaming-programming-guide.html