2011-11-08 4 views
1

アカウントidsがあり、それぞれtimestampusernameでグループ化されています。これらのユーザグループのforeachはすべてのペア(最古のアカウント、他のアカウント)のとしたいです。豚のパズル:単純な豚のスクリプトとして関与する減速器を書き直しますか?

私はそれを行うJavaレデューサーを持っています。単純なブタスクリプトとして書き直すことはできますか?

スキーマ:

{group:(username),A: {(id , create_dt)}

入力:

(batman,{(id1,100), (id2,200), (id3,50)}) 
(lulu ,{(id7,100), (id9,50)}) 

所望の出力:

(batman,{(id3,id1), (id3,id2)}) 
(lulu ,{(id9,id7)}) 

答えて

1

ない、誰もが気にするようだが、ここに行くこと。あなたは、UDFを作成する必要があります。

desired = foreach my_input generate group as n, FIND_PAIRS(A) as pairs_bag; 

をそしてUDF:

public class FindPairs extends EvalFunc<DataBag> { 
@Override 
    public DataBag exec(Tuple input) throws IOException { 
     Long pivotCreatedDate = Long.MAX_VALUE; 
     Long pivot = null; 

     DataBag accountsBag = (DataBag) input.get(0); 
     for (Tuple account : accountsBag){ 
      Long accountId = Long.parseLong(account.get(0).toString()); 
      Long creationDate = Long.parseLong(account.get(4).toString()); 
      if (creationDate < pivotCreatedDate) { 
       // pivot is the one with the minimal creation_dt 
       pivot = accountId; 
       pivotCreatedDate = creationDate; 
      } 
     } 

     DataBag allPairs = BagFactory.getInstance().newDefaultBag(); 
     if (pivot != null){ 
      for (Tuple account : accountsBag){ 
       Long accountId = Long.parseLong(account.get(0).toString()); 
       Long creationDate = Long.parseLong(account.get(4).toString()); 
       if (!accountId.equals(pivot)) { 
        // we don't want any self-pairs 
        Tuple output = TupleFactory.getInstance().newTuple(2); 
        if (pivot < accountId){ 
          output.set(0, pivot.toString()); 
          output.set(1, accountId.toString()); 
        } 
        else { 
        output.set(0, accountId.toString()); 
        output.set(1, pivot.toString()); 
        } 
       allPairs.add(output); 
      } 
     }    
     return allPairs; 
} 

を、あなたは本当のうまくプレーしたい場合は、この追加:

/** 
* Letting pig know that we emit a bag with tuples, each representing a pair of accounts 
*/ 
@Override 
public Schema outputSchema(Schema input) { 
    try{ 
     Schema pairSchema = new Schema(); 
     pairSchema.add(new FieldSchema(null, DataType.BYTEARRAY)); 
     pairSchema.add(new FieldSchema(null, DataType.BYTEARRAY)); 
     return new Schema(
       new FieldSchema(null, 
       new Schema(pairSchema), DataType.BAG));   
    }catch (Exception e){ 
      return null; 
    } 
} 

を}