2016-09-29 3 views
3

私はJavaのApacheのビームクラスclassby.sortbytimestampを持っている気づいたpythonは、その機能はまだ実装されていますか?そうでない場合は、ウィンドウ内の要素を並べ替える方法は何ですか? DoFnでウィンドウ全体を並べ替えることができるとわかりましたが、より良い方法があるかどうかを知りたいと思います。どのように私はPythonのApacheのビームでウィンドウ内の要素を注文できますか?

+0

?私はそれがもう存在しないと思う:https://github.com/apache/beam/search?utf8=%E2%9C%93&q=sortbytimestamp&type= – skeller88

答えて

6

現在、Beam(PythonまたはJavaのいずれか)に値のソートが組み込まれていません。今のところ、最良の選択肢は、あなたが言及したようなDoFnで自分で値をソートすることです。

1

ここには、CombineFnを使用したソリューションがあります。これには、TreeSetを使用してデータを重複除外するという追加のボーナスがあります。また、ウィンドウのデータが1人の作業者のメモリに収まるように十分小さいことを確認する必要があります。あなたはそのクラスを見つけている

public static class DedupAndSortByTime extends Combine.CombineFn<MarketData, TreeSet<MarketData>, List<MarketData>> { 
@Override 
public TreeSet<MarketData> createAccumulator() { 
    return new TreeSet<>(Comparator 
      .comparingLong(MarketData::getEventTime) 
      .thenComparing(MarketData::getOrderbookType)); 
} 

@Override 
public TreeSet<MarketData> addInput(TreeSet<MarketData> accum, MarketData input) { 
    accum.add(input); 
    return accum; 
} 

@Override 
public TreeSet<MarketData> mergeAccumulators(Iterable<TreeSet<MarketData>> accums) { 

    TreeSet<MarketData> merged = createAccumulator(); 
    for (TreeSet<MarketData> accum : accums) { 
     merged.addAll(accum); 
    } 
    return merged; 
} 

@Override 
public List<MarketData> extractOutput(TreeSet<MarketData> accum) { 
    return Lists.newArrayList(accum.iterator()); 
} 

}

関連する問題