2017-12-14 8 views
0

my java itemObjectに対して一連のルールを実行しています。各項目について、ルールのリストを処理しています。apache sparkでフラットマップ操作を回避してパフォーマンスを向上させる方法

通常、私は100万アイテムと100のルールを持っています。

現在、このプログラムをスパークで実行すると、15分かかります。

私は、faltMaptopairが時間がかかることを観察しました。私はこのプログラムのパフォーマンスを改善したい。リファクタリングの

Get the rules 
map each item against the list of rules and produce result set 
return JavaPairRDD of itemId and List<RuleResult> 

任意の提案このコードは、私は次のコードを書かれている

さらにパフォーマンスを向上させます。

public JavaPairRDD<String, List<RuleResult>> validate() {  
     List<ExecutableRule<T>> rules = ruleWrapper.getRulesList().collect(); 
     JavaPairRDD<String, List<RuleResult>> resultsPairRDD = itemsForValidation 
       .map(x -> getRulesResult(rules, x)) 
       .flatMapToPair(this::mapToRuleResultById) 
       .aggregateByKey(
         MapperUtil.<RuleResult>newList(), 
         MapperUtil::addToList, 
         MapperUtil::combineLists 
       );  
     return resultsPairRDD; 
    } 

    private List<Tuple2<String, RuleResult>> mapToRuleResultById(List<RuleResult> ruleResults) { 
     return ruleResults.stream() 
       .map(ruleResult -> new Tuple2<>(ruleResult.getItemId(), ruleResult)) 
       .collect(toList()); 
    } 

    private List<RuleResult> getRulesResult(List<ExecutableRule<T>> rules, T x) { 
     return rules.stream() 
       .map(rule -> rule.execute(x)).collect(toList()); 
    } 

    public RuleResult execute(T t){ 
    //get the rule result 
    } 

    public class RuleResult{ 
     private String itemId; 
    } 

答えて

0

たぶん私は何かを誤解していますが、私はflatMapまたはaggregateByKeyどちらの必要性が表示されません。

public JavaPairRDD<String, List<RuleResult>> validate() {  
    List<ExecutableRule<T>> rules = ruleWrapper.getRulesList().collect(); 
    JavaPairRDD<String, List<RuleResult>> resultsPairRDD = itemsForValidation 
      .map(x -> new Tuple2<>(x, getRulesResult(rules, x))); 
    return resultsPairRDD; 
} 

これは機能しませんか?

+0

ありがとうございました。後で私はRuleresultのフィールドとペアを作成するので、フラットマップが必要です。各ルールの結果にはitemidがあり、アイテムIDには複数のルールの結果が含まれます。 – Patan

関連する問題