2017-07-10 5 views
0
val sparkConf = new SparkConf().setMaster("yarn-cluster") 
           .setAppName("SparkJob") 
           .set("spark.executor.memory","2G") 
           .set("spark.dynamicAllocation.executorIdleTimeout","5") 


val streamingContext = new StreamingContext(sparkConf, Minutes(1)) 

var historyRdd: RDD[(String, ArrayList[String])] = streamingContext.sparkContext.emptyRDD 

var historyRdd_2: RDD[(String, ArrayList[String])] = streamingContext.sparkContext.emptyRDD 


val stream_1 = KafkaUtils.createDirectStream[String, GenericData.Record, StringDecoder, GenericDataRecordDecoder](streamingContext, kafkaParams , Set(inputTopic_1)) 
val dstream_2 = KafkaUtils.createDirectStream[String, GenericData.Record, StringDecoder, GenericDataRecordDecoder](streamingContext, kafkaParams , Set(inputTopic_2)) 


val dstream_2 = stream_2.map((r: Tuple2[String, GenericData.Record]) => 
{ 
    //some mapping 
} 

val historyDStream = dstream_1.transform(rdd => rdd.union(historyRdd)) 
dstream_2.foreachRDD(r => r.repartition(500)) 
val historyDStream_2 = dstream_2.transform(rdd => rdd.union(historyRdd_2)) 
val fullJoinResult = historyDStream.fullOuterJoin(historyDStream_2) 

val filtered = fullJoinResult.filter(r => r._2._1.isEmpty) 


filtered.foreachRDD{rdd => 

    val formatted = rdd.map(r => (r._1 , r._2._2.get)) 

    historyRdd_2.unpersist(false) // unpersist the 'old' history RDD 
    historyRdd_2 = formatted // assign the new history 
    historyRdd_2.persist(StorageLevel.MEMORY_AND_DISK) // cache the computation 
} 


val filteredStream = fullJoinResult.filter(r => r._2._2.isEmpty) 


filteredStream.foreachRDD{rdd => 
    val formatted = rdd.map(r => (r._1 , r._2._1.get)) 
    historyRdd.unpersist(false) // unpersist the 'old' history RDD 
    historyRdd = formatted // assign the new history 
    historyRdd.persist(StorageLevel.MEMORY_AND_DISK) // cache the computation 
} 
streamingContext.start() 
streamingContext.awaitTermination() 
} 
} 

を変更すると、128個のパーティションを持っているdstream_2が、私はパーティションを結合やっているとき、3つのパーティションに減少してきています。私が知っているように、結合はパーティションごとに行われます。つまり、パーティション1は別のRddのパーティション1に参加します。フィルタリングされたすべてのRDDには3つのパーティションがあり、そのためhistoryRDDとHistoryRDD2には3つのパーティションがあります。はなぜ2 DStreamsを接合した後、パーティションの数は私のstream_1ここ

答えて

0

スパーク内のパーティションは、実行する操作によって異なります。逆に、join()のようないくつかの操作は、RDD1とRDD2の両方からのパーティション数を合計します。 RDD1に2つのパーティションがあり、RDD2に3つのパーティションがあると仮定した場合、join()の結果は5になります。

+0

次に、パーティションが減少している理由をhistoryDStream_1(128パーティション)とhistoryDStream_2(128パーティション)あなたのロジックは256であるべきですが、それは3です。 – JSR29

関連する問題