2017-03-06 4 views
0

私はマスターと2つのエグゼキュータを持つスパークスタンドアロンクラスタを持っています。私はRDD[LevelOneOutput]を持っており、以下のグループにspark groupBy操作が199/200でハングアップ

class LevelOneOutput extends Serializable { 

    @BeanProperty 
    var userId: String = _ 

    @BeanProperty 
    var tenantId: String = _ 

    @BeanProperty 
    var rowCreatedMonth: Int = _ 

    @BeanProperty 
    var rowCreatedYear: Int = _ 

    @BeanProperty 
    var listType1: ArrayBuffer[TypeOne] = _ 

    @BeanProperty 
    var listType2: ArrayBuffer[TypeTwo] = _ 

    @BeanProperty 
    var listType3: ArrayBuffer[TypeThree] = _ 

    ... 
    ... 

    @BeanProperty 
    var listType18: ArrayBuffer[TypeEighteen] = _ 

    @BeanProperty 
    var groupbyKey: String = _ 
} 

は、今私が欲しいLevelOneOutputクラスのuserId、tenantId、rowCreatedMonth、rowCreatedYearに基づいて、このRDDです。そのために私はこの

val levelOneRDD = inputRDD.map(row => { 
    row.setGroupbyKey(s"${row.getTenantId}_${row.getRowCreatedYear}_${row.getRowCreatedMonth}_${row.getUserId}") 
    row 
}) 

val groupedRDD = levelOneRDD.groupBy(row => row.getGroupbyKey) 

これはIterable[LevelOneOutput]

としてStringと値として私のキーでデータを与えなかった今、私はそのグループキーのLevelOneOutputの1つのオブジェクトを生成します。これは、入力のサイズが小さいために正常に機能しているが、私は、入力データの大きなセット上で実行しようとすると、操作によってグループが/ 199でハングなっている

val rdd = groupedRDD.map(row => { 
    val levelOneOutput = new LevelOneOutput 
    val groupKey = row._1.split("_") 

    levelOneOutput.setTenantId(groupKey(0)) 
    levelOneOutput.setRowCreatedYear(groupKey(1).toInt) 
    levelOneOutput.setRowCreatedMonth(groupKey(2).toInt) 
    levelOneOutput.setUserId(groupKey(3)) 

    var listType1 = new ArrayBuffer[TypeOne] 
    var listType2 = new ArrayBuffer[TypeTwo] 
    var listType3 = new ArrayBuffer[TypeThree] 
    ... 
    ... 
    var listType18 = new ArrayBuffer[TypeEighteen] 

    row._2.foreach(data => { 
    if (data.getListType1 != null) listType1 = listType1 ++ data.getListType1 
    if (data.getListType2 != null) listType2 = listType2 ++ data.getListType2 
    if (data.getListType3 != null) listType3 = listType3 ++ data.getListType3 
    ... 
    ... 
    if (data.getListType18 != null) listType18 = listType18 ++ data.getListType18 
    }) 

    if (listType1.isEmpty) levelOneOutput.setListType1(null) else levelOneOutput.setListType1(listType1) 
    if (listType2.isEmpty) levelOneOutput.setListType2(null) else levelOneOutput.setListType2(listType2) 
    if (listType3.isEmpty) levelOneOutput.setListType3(null) else levelOneOutput.setListType3(listType3) 
    ... 
    ... 
    if (listType18.isEmpty) levelOneOutput.setListType18(null) else levelOneOutput.setListType18(listType18) 

    levelOneOutput 
}) 

:そのために私は以下のようなものをやっていました200及びI標準出力/標準エラー出力に設定された特定のエラーや警告が表示されない

仕事は私がペアRDDなどを作成している...さらに

答えて

0

代わりのgroupBy操作を使用して進行していない理由をいくつかのいずれかが私を指すことができます以下

val levelOnePairedRDD = inputRDD.map(row => { 
    row.setGroupbyKey(s"${row.getTenantId}_${row.getRowCreatedYear}_${row.getRowCreatedMonth}_${row.getUserId}") 
    (row.getGroupByKey, row) 
}) 

と私の問題を解決した処理ロジックを更新しました。

関連する問題