2017-08-14 5 views
1

私は特定のPCollectionを特定のPTableに変換しているutilクラスにこれらのメソッドを持っています。Apache CrunchのPCollectionをPTableに変換する一般的な方法はありますか?

public static PTable<IdDetails, CASegmentsForModification> getPTableForCASegments(PCollection<CASegmentsForModification> aggregatedPCollectionForCASegments) { 
    return aggregatedPCollectionForCASegments.parallelDo(new CASegmentsPTableConverter(), 
      Avros.tableOf(Avros.records(IdDetails.class), Avros.records(CASegmentsForModification.class))); 
} 

public static PTable<IdDetails, UserPrimaryIdMapping> getPTableForPrimaryIdMapping(PCollection<UserPrimaryIdMapping> pCollectionOfUserPrimaryIdMapping) { 
    return pCollectionOfUserPrimaryIdMapping.parallelDo(new UserPrimaryIdMappingPTableConverter(), 
      Avros.tableOf(Avros.records(IdDetails.class), Avros.records(UserPrimaryIdMapping.class))); 
} 

public static PTable<IdDetails, UserGroupSegments> getPTableForUserGroupSegments(PCollection<UserGroupSegments> pCollectionOfUserGroupSegments) { 
    return pCollectionOfUserGroupSegments.parallelDo(new UserGroupSegmentsPTableConverter(), 
      Avros.tableOf(Avros.records(IdDetails.class), Avros.records(UserGroupSegments.class))); 
} 

上記の方法の1つの一般的な方法を実装するにはどうすればよいですか?

答えて

0

PTablesユーティリティクラスの静的asPtableメソッドを使用するより良い方法があります。あなたのPCollectionはタイプペアのなければならないとPTable結果は、タイプのものであろうPTable

public static <K,V> PTable<K,V> asPTable(PCollection<Pair<K,V>> pcollect) 

をあなたの例に基づいて、あなたは自分のDoFn(または拡張クラス)を作成する必要がありますよリターンAvros.pairs(Avros .records(yourClass.class)、Avros.records(yourOtherClass.class))。

もう1つの方法は、ExtractKEyFnというあらかじめ定義されたMapFnを使用してコレクションに適用することです。キーを抽出し、キー、値出力を生成するためにmapメソッドを実装する必要があります。 PCollection>をPTableに変換することは基本的に同じ考えです。

多くの定型文を保存するはずです。

FilterFnのような便利な機能がありますが、ユニットテストにMemPipelineを使用するといくつかのバグが見つかりました。私が提案した最初のアプローチは最も安全でなければなりません。

EDIT:

いくつかのコードを保存するための良好なバランスが、あなたのキーはフィールド名を使用して、フィールド名に基づいて、各PCollectionのために、このMapFnを呼び出しますされるだろう。

//we are assuming the key will be in the first level of your record 
public class GenericRecordToPair <V extends GenericRecord, K extends GenericRecord> extends MapFn<V, Pair<K, V>> { 
    String key; 

    public GenericRecordToPair(String key){ 
     this.key = key; 
    } 

    @Override 
    public Pair<T, TupleN> map(S input) { 
     return new Pair<K,V> (input.get(key), input); 
    } 

} 

あなたの例から、あなたは

PCollection<UserGroupSegments> pCollectionOfUserGroupSegments = ...//comming from somewhere 
PCollection<UserPrimaryIdMapping> pCollectionOfUserPrimaryIdMapping = ...//comming from somewhere 
PTable<IdDetails, UserGroupSegments> pTable1 = PTables.asPTable(pCollectionOfUserGroupSegments.parallelDo(new GenericRecordToPair("idDetails"), Avros.pairs(Avros.records(IdDetails.class), Avros.records(UserGroupSegments)))); 
PTable<IdDetails, UserPrimaryIdMapping> pTable2 = PTables.asPTable(pCollectionOfUserPrimaryIdMapping.parallelDo(new GenericRecordToPair("idDetails"), Avros.pairs(Avros.records(IdDetails.class), Avros.records(UserPrimaryIdMapping)))); 
+0

しかし、私のPCollectionはその唯一のVのペア、ではない「asPTable」、私が最初に私を変換する必要がありますPCollection ~PCollection も冗長コードになります。 例えば、私はいくつかのアブロのPCollectionを読み取って取得するには、この一般的な方法を使用することができる: - パブリック静的PCollection getPCollection(パイプラインのパイプライン、文字列パス、クラス clazz){ 戻りpipeline.read (より.avroFile(path、clazz)); } –

+0

あなたのPCollectionにExtractKeyFnを使用するのはどうですか?? genericRecordsを使用するgenerate関数を作成し、フィールド名に基づいてキーを抽出する場合でも、クラスからキーを抽出する方法を常に提供する必要があります。キーを生成する方法はありません。キーを抽出するロジック – hlagos

+0

同じように、上記のユースケースに対して1つの汎用メソッドを記述することは可能ですか? –

0

ような何かを行うことができますこれはまさに鍵を生成するためにMapFnを受け入れ、をキーと各レコードにPTableを返すPCollection.by方法、の目的でありますそのMapFnの結果。

ですから、のような何かができる:メソッドを使用するように.......

PTable<IdDetails, CASegmentsForModification> pTableForCASegments = aggregatedPCollectionForCASegments.by(
    new CASegmentsKeyMapFn(), 
    Avros.records(IdDetails.class) 
) 
関連する問題