2016-10-06 8 views
0

JavaプログラムをApache Sparkに移行する必要があります。現在のJavaは、java.util.concurrentによって提供される機能を大いに利用し、単一のマシン上で動作します。ワーカー(Callable)の初期化は高価なので、ワーカーは何度も再利用されます。つまり、ワーカーが終了してその結果を返すと、ワーカーは自分自身をプールに再挿入します。高価な移行でjava.util.concurrent.CallablesをApache Sparkに初期化する

より正確:

  • 現在の実装では、10E06エントリ/数GB単位の範囲内の小さなデータセット上で動作します。
  • データには、個別に処理できるエントリが含まれています。つまり、タスクごとに1人のワーカーを起動して、それをJavaスレッドプールに送信できます。
  • しかし、エントリを処理するためのワーカーを設定するには、より多くのデータをロードしたり、グラフを作成したりする必要があります。
  • 実際に、一部のデータは作業者間で共有することができます。いくつかのルックアップテーブルが必要ではありません。一部のデータは従業員にプライベートであるため、共有されません。作業者は、エントリを処理している間にデータを変更し、後でそれを迅速にリセットするだけでよい。現在処理中のエントリに固有のキャッシュ。したがって、作業者はプール内に自分自身を再挿入し、高価な初期化を行わずに次の作業を開始することができます。
  • ワーカー1人あたりの実行時間とエントリの範囲は秒です。
  • ワーカーは結果をExecutorCompletionService経由で返します。つまり、プログラムの中央でpool.take()。get()を呼び出して結果を取得します。

Apache Sparkについて知っているほとんどの例は、標準の変換と動作を使用しています。私はまた、add their own functions to the DAG by extending the APIの例を見つける。それでも、これらの例はすべて単純な軽量計算に固執しており、初期化コストはかかりません。

私は、何らかの種類の「重労働」を再利用するSparkアプリケーションを設計するための最良のアプローチが何であるか疑問に思っています。エグゼクティブは、そのような労働者のプールを保持できる唯一の永続的な存在であるように思われる。ただし、関数を使用してsolution(可能性)を指すanswerを発見20161007

を編集した...

私が最も可能性の高いいくつかのポイントを見逃しスパークの世界に新しいもの。そこで問題は、私は

  1. スプリット各エグゼキュータは正確に一つのパーティションを取得します
  2. エグゼキュータの数に応じて、私のパーティションは、
  3. My機能で作業することが可能である(リンクsolutionセットアップと呼ばれます)スレッドプールを作成し、労働者
  4. に別の結合機能を再利用し、後で結果
+0

の詳細はあなたの現在のJava実装が実際に何をするかについて、もう少し具体的にすることができ、あなたがメモリ不足に文句を言わないことを確実にするためにMEMORY_AND_DISKに永続化モデルを設定することができますか? Sparkの主なセールスポイントはRDDであり、操作はAPIのもので定義されています。おそらく共有メモリなどに依存するアプリケーションを複数のノードにスケールアウトするためにSparkを悪用しようとしているように私には聞こえます。アプリケーションがデータの処理を行う場合、Spark APIを使用してアプリケーションを実行しようとしましたか? – LiMuBei

+0

@LiMuBei現在のJava実装の詳細な説明が追加されました。私はまだそのデザインが「虐待」かどうか評価しています。 [Spark API](http://spark.apache.org/docs/latest/api/java/index。)から。html)すなわち.javaと.java.function私は私自身の関数を追加することができます。しかし、私はそれらがどのように実行されるかをほとんど制御できません。おそらく、それはスパークの全体のポイントです - その乱雑なものを隠している。 – Andreas

+1

あまりにも多くの拡張子を付けずにSpark APIを使って同じことをすることができるように思えます。 Scala APIを使用する場合、Javaライブラリをあまり面倒なく使用できます。 'mapPartitions'はパーティション内のエントリを反復することを可能にし、データベース接続などのような高価な設定を持つものに使用されることになっています。 – LiMuBei

答えて

0

あなたの現在のアーキテクチャをマージスレッド間で共有状態を持つモノリシックなマルチスレッドアーキテクチャです。現代のハードウェアでは、データセットのサイズが比較的小さいので、クラスタのノード内のエグゼキュータにスレッドを置き換えるSparkで、データセットを比較的簡単に並列化できます。

あなたの質問から2つの主な懸念事項は、Sparkが複雑な並列計算を処理できるか、分散環境で必要な状態を共有するかということです。

複雑なビジネスロジック:最初の部分については、現在のアーキテクチャのワーカースレッドと同等の、任意の複雑なビジネスロジックをSpark Executorで実行できます。

Clouderaのからこのブログの記事は、うまく実行モデルの他の重要な概念と一緒にコンセプトを説明します:あなたはそれに注意を払う必要があります

http://blog.cloudera.com/blog/2014/05/apache-spark-resource-management-and-yarn-app-models/

一つの側面は、しかし、あなたのスパークの設定ですエグゼクティブが完了までに時間がかかり過ぎることによるタイムアウトを避けるため、あなたのような複雑なビジネスロジックを持つアプリケーションには期待されるかもしれません。

詳細はDataBricksから、より具体的に実行動作に優れたページを参照してください:

http://spark.apache.org/docs/latest/configuration.html#execution-behavior

共有状態を:あなたはスパークでグラフやアプリケーションの設定などの複雑なデータ構造を共有することができますノード間で1つのアプローチは、ブロードキャスト変数です。ここでは、配布する状態のコピーがすべてのノードに配布されます。以下は、概念のいくつかの非常に良い説明です:

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-broadcast.html

http://g-chi.github.io/2015/10/21/Spark-why-use-broadcast-variables/

データの局所性を確保しながらこれは、アプリケーションからの待ち時間を剃るます。

データの処理が(もっとここで:https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-partitions.html)に基づいパーティション上で実行することができるドライバまたはアキュムレータ(:https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-accumulators.htmlもっとここ)を使用して集計結果を、。結果のデータが複雑な場合は、パーティションのアプローチがうまく機能し、アプリケーションの実行をより詳細に制御できます。

ハードウェアリソースの要件に関しては、アプリケーションが共有状態で数ギガバイト必要であり、メモリ内に留まる必要があり、さらにすべてのノードのデータに対して数ギガバイトが必要になると思われます。あなたは

http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence

+0

ありがとう。これは、最初の質問に答え、新しいものを引き出しました... – Andreas

+0

@アンドレアス、投稿してください:-) –

関連する問題