2016-12-03 4 views
0

私はFlinkを使用してMap-Reduceを学習しており、DataSetの要素を効率的に数える方法について質問があります。私はこれまで持っていることはこれです:Flink:DataSet.count()がボトルネックになっています - 並列数をカウントする方法は?

DataSet<MyClass> ds = ...; 
long num = ds.count(); 

これを実行すると、私のFLINKにそれが言うログ

2016年12月3日午前19時47分二十七秒DataSink(数())(1/1)実行に切り替え

CPUが1つしか使用されていません(私は4つのコマンドを使用して、すべてのコマンドを減らしています)。

私はcount()内部的にすべての4つのCPUからDataSetを収集し、各CPUがその部分を数えるのではなく、順番にそれらをカウントし、次にそれを合計すると思います。本当?

「はい」の場合、すべてのCPUをどのように活用できますか?データセットを、最初の項目として元の値を、2番目の項目としてlong値1を含む2タプルにマッピングしてから、SUM関数を使用して集計するとよいでしょうか?

たとえば、データセットは、Longが常に1になるDataSetにマップされます。したがって、すべてのアイテムを合計すると、タプルの2番目の値の合計が正しいカウント値になります。

データセット内のアイテムを数えるのに最適な方法は何ですか?

よろしくサイモン

答えて

0

DataSet#count()が非並列動作であるので、単一のスレッドを使用することができます。

並列化を取得するためにはカウントバイキーを行い、キーカウントには最終的な合計を適用して全体的なカウントに達し、計算時間を短縮します。

+0

ご回答ありがとうございます。残念ながら私はカウント・バイ・キーをどのように行うことができるのか分かりません。これはフリンク操作ですか?私はこの情報を見つけることができなかった –

+0

解決策を見つけました。あなたはこれを意味しましたか? \t \t \tデータセット> X = hasNum.map(新しいMapFunction >(){ \t \t \t \t @Override \t \t \t \t公共Tuple1 マップ(MyClassのtの)スロー例外{ \t \t \t \t \t戻り新しいTuple1 (1L); \t \t \t \t} \t \t \t \t \t \t \t})。groupBy(0)。和(0); –

+0

ほとんど;)私はあなた自身の答えにお答えします –

0

これは良い解決策ですか?

DataSet<Tuple1<Long>> x = ds.map(new MapFunction<MyClass, Tuple1<Long>>() { 
    @Override public Tuple1<Long> map(MyClass t) throws Exception { 
     return new Tuple1<Long>(1L); 
    } 
}).groupBy(0).sum(0); 

Long c = x.collect().iterator().next().f0; 
+1

一般的な考え方は正しいですが、Tuple1にマップしてすべてのタプルに同じ値を使用する - したがって、すべてのタプルは同じスレッドで終わります。 - 複数のスレッドが開始された場合でも、1つだけがデータを受信します。したがって、 'new Tuple1 (...)'にランダムな値を生成する必要があります。 'sum(0)'の代わりに 'count()'を使います。これは、あなたが再び合計する必要がある複数のカウントを与えるでしょう。 –

+0

count()が内部でこれをしない理由はありますか? –

+0

技術的な理由はありません。 Flinkは理論的にこの最適化を自動的に適用することができます。ちょうどそれは決して実現しませんでした - あなたはメーリングリストで調査する必要があります理由を把握する。 –

関連する問題