2016-07-25 10 views
0

1つの縮小タスクが非常に長く実行されている操作でグループを実行しています。下図の減速を4時間実行されている一つのキーのためのデータのスキューすなわち、あまりにも多くの値が存在するので、サンプル・コード・スニペットと問題の説明、豚のグループ化中に歪んだデータを扱う方法

inp =load 'input' using PigStorage('|') AS(f1,f2,f3,f4,f5); 

grp_inp = GROUP inp BY (f1,f2) parallel 300; 

あります。すべての削減タスクは1分ほどで完了します。

この問題を解決するにはどうすればよいですか?どんな助けでも大歓迎です。ありがとう!

答えて

0

あなたはいくつかのことをチェックする必要があります: -

1>(もしあれば)NULLとして両方f1とf2の値を持つレコードをフィルタ

2>は、代数インタフェースを実装することにより、Hadoopのコンバイナを使用してみてください可能な場合: -

https://www.safaribooksonline.com/library/view/programming-pig/9781449317881/ch10s02.html

3>減速機を介してデータを配布するための別のキーを使用するには、カスタムパーティショナを使用します。

ここで私は後に参加する私の歪んだデータを分割するために使用されるサンプル・コード(同じでも、グループの後に使用することができます)です: -

import java.util.logging.Level; 
import java.util.logging.Logger; 
import org.apache.hadoop.io.Writable; 
import org.apache.hadoop.mapreduce.Partitioner; 
import org.apache.pig.backend.executionengine.ExecException; 
import org.apache.pig.data.Tuple; 
import org.apache.pig.impl.io.NullableTuple; 
import org.apache.pig.impl.io.PigNullableWritable; 

public class KeyPartitioner extends Partitioner<PigNullableWritable, Writable> { 

/** 
* Here key contains value of current key used for partitioning and Writable 
* value conatins all fields from your tuple. I used my 5th field from tuple to do partitioning as I knew it has evenly distributed value. 
**/ 
@Override 
public int getPartition(PigNullableWritable key, Writable value, int numPartitions) { 
    Tuple valueTuple = (Tuple) ((NullableTuple) value).getValueAsPigType(); 
    try { 
     if (valueTuple.size() > 5) { 
      Object hashObj = valueTuple.get(5); 
      Integer keyHash = Integer.parseInt(hashObj.toString()); 
      int partitionNo = Math.abs(keyHash) % numPartitions; 
      return partitionNo; 
     } else { 
      if (valueTuple.size() > 0) { 
       return (Math.abs(valueTuple.get(1).hashCode())) % numPartitions; 
      } 
     } 
    } catch (NumberFormatException | ExecException ex) { 
     Logger.getLogger(KeyPartitioner.class.getName()).log(Level.SEVERE, null, ex); 
    } 
    return (Math.abs(key.hashCode())) % numPartitions; 
} 
} 
+0

おかげSorabhは、F1とF2のデータにはヌルはありません。私はコンバイナが実行されるようにコードを修正しましたが、パフォーマンスは向上しませんでした。他の解決策はありますか? –

+0

私はよく分かりませんが、pig.exec.reducers.bytes.per.reducerを非常に低い値に設定してみてください。豚はこの価値を無視するかもしれないが、それを試す価値がある。あなたがそれぞれの縮小ファイルサイズを制御しているときは、並列を使用する方が良いでしょう。 – sorabh

+0

実際にもう一つ試してみることができます: - パーティショナーを使用して、異なるキーを使用してデータを分割するレデューサーを許可する。 PIGクエリは次のようになります。 - GROUP inp BY(f1、f2)PARTITION BY com.util.KeyPartitioner PARALLEL 300; keyparterを実装するためにhadoop partioner docを確認してください。アイデアはあなたのキーの分散値を与えることができると思う別の列(または列の組み合わせ)を選択することです – sorabh

関連する問題