私はFlinkを使用してMap-Reduceを学習しており、DataSetの要素を効率的に数える方法について質問があります。私はこれまで持っていることはこれです:Flink:DataSet.count()がボトルネックになっています - 並列数をカウントする方法は?
DataSet<MyClass> ds = ...;
long num = ds.count();
これを実行すると、私のFLINKにそれが言うログ
2016年12月3日午前19時47分二十七秒DataSink(数())(1/1)実行に切り替え
CPUが1つしか使用されていません(私は4つのコマンドを使用して、すべてのコマンドを減らしています)。
私はcount()内部的にすべての4つのCPUからDataSetを収集し、各CPUがその部分を数えるのではなく、順番にそれらをカウントし、次にそれを合計すると思います。本当?
「はい」の場合、すべてのCPUをどのように活用できますか?データセットを、最初の項目として元の値を、2番目の項目としてlong値1を含む2タプルにマッピングしてから、SUM関数を使用して集計するとよいでしょうか?
たとえば、データセットは、Longが常に1になるDataSetにマップされます。したがって、すべてのアイテムを合計すると、タプルの2番目の値の合計が正しいカウント値になります。
データセット内のアイテムを数えるのに最適な方法は何ですか?
よろしくサイモン
ご回答ありがとうございます。残念ながら私はカウント・バイ・キーをどのように行うことができるのか分かりません。これはフリンク操作ですか?私はこの情報を見つけることができなかった –
解決策を見つけました。あなたはこれを意味しましたか? \t \t \tデータセット> X = hasNum.map(新しいMapFunction >(){ \t \t \t \t @Override \t \t \t \t公共Tuple1 マップ(MyClassのtの)スロー例外{ \t \t \t \t \t戻り新しいTuple1 (1L); \t \t \t \t} \t \t \t \t \t \t \t})。groupBy(0)。和(0); –
ほとんど;)私はあなた自身の答えにお答えします –