2016-05-14 3 views
2

私はflinkデータセットAPIを使用して反復計算を行っています。
しかし、各反復の結果は私の完全な解決の一部です。
(詳細は、必要な場合:私は、格子ノードのレベルごとの各反復で底部に向かっ上から順に計算しています、形式概念分析を参照してください)

私は私の結果を保存せずに大量の繰り返しでFLINKデータセットのAPIを使用している場合は、コードがします
Flink Datasetで一括反復の部分出力を保存する可能性はありますか?

val start = env.fromElements((0, BitSet.empty)) 
val end = start.iterateWithTermination(size) { inp => 
    val result = ObjData.mapPartition(new MyMapPartition).withBroadcastSet(inp, "concepts").groupBy(0).reduceGroup(new MyReduceGroup) 
    (result,result) 
} 
end.count() 

しかし、私は、反復内の部分的な結果(_.writeAsTextを())の書き込みしようとした場合、または任意のアクション、私はエラーを取得します:以下のようになり

org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration? 

バルク反復なしの代替案は、以下のようです:

var start = env.fromElements((0, BitSet.empty)) 
var count = 1L 
var all = count 
while (count > 0){ 
    start = ObjData.mapPartition(new MyMapPartition).withBroadcastSet(start, "concepts").groupBy(0).reduceGroup(new MyReduceGroup) 
    count = start.count() 
    all = all + count 
} 
println("total nodes: " + all) 

しかし、このアプローチは、最小の入力データに非常に遅く、反復バージョンが< 30秒かかり、ループバージョンが> 3分かかります。
私はflinkがループを実行する最適なプランを作成できないと思います。

回避するにはどうすればよいですか?ハンクスなどで部分的な結果を保存できるようにするために、フリンクの修正が可能ですか?

答えて

3

残念ながら、現在、一括反復から中間結果を出力することはできません。反復の最後には、最終結果のみを出力することができます。

また、あなたが正しく気づいたように、Flinkはwhileループまたはforループを効率的にアンロールできないため、動作しません。

中間結果がそれほど大きくない場合は、中間結果を部分的なソリューションに追加して、繰り返しの最後にすべてを出力することができます。同様のアプローチがTransitiveClosureNaive exampleに実装されています。反復で発見されたパスは、次の部分解に蓄積されます。

+0

実装方法を提案いただきありがとうございます。 :)それが動作する場合、更新されます。 私は、データセットapiが十分な成熟したフリンクではなく、多くの場合実行者の損失ではないと思います。しかし、ストリーミングは完璧に動作します。 –

関連する問題