2017-02-23 4 views
0

私はSparkにとって非常に新しいことをまず皆にお知らせします。sparkでの並列メソッドの呼び出しと渡されたメソッドでのsparkセッションの使用

Iは、テーブル内のレコードの膨大な数を処理する必要があり、それが電子メールによってグループ化されたとき、それは約1 million.Iは個々メール更新データベースに対して設定されたデータに基づいて複数の論理演算を実行する必要があります論理計算

に基づいて大雑把に私のコードの構造は、

初期データロード...

輸入sparkSessioのようなものですn.implicits._ VARたtableData = sparkSession.read.jdbc(、、て、ConnectionProperties).select( "Eメール")。ここで、()

レコードと//データフレームメールにグループ化して1より大きいカウント

VAR recordsGroupedBy = tableData.groupBy( "Eメール")は。(カウント)。withColumnRenamed( "カウント"、 "RecordCountを")。( "> 1でRecordCount")をフィルタリングする.toDF()

今すぐ来てprocessDataAgainstEmail()メソッドを使用して電子メールにグループ化した後の処理

recordsGroupedBy.collect()。foreachの(X => processDataAgainstEmail(x.getAs( "Eメール")、sparkSession))ここで私はforeachのが並列に実行されません参照.I方法processDataAgainstEmailを起動する必要性を

(、)並行して。 しかし、私は

を行うことによって並列化しようとした場合こんにちは、私は

ヴァルemailList = dataFrameWithGroupedByMultipleRecords.select( "Eメール")。rdd.map(R => R(0)を呼び出すことによって、リストを取得することができます。 asInstanceOf [文字列])。)(コレクト。ToListメソッド

VARのRDD = sc.parallelize(emailList)

rdd.foreach(X => processDataAgainstEmail(x.getAs( "Eメール")、 sparkSession))

parallelizeを使用しているときにsparkSessionを渡すことができないため、これはサポートされていません。

processDataAgainstEmail(、)では、データベースの挿入と更新に関連して複数の操作が実行され、データフレームとスパークSQL操作を実行する必要があります。方法は、データベース.I午前に何かを実行することはできません、スパークセッションを通過するすべてのことができない場合には、私はsparksession

と平行processDataAgainstEmailを(、)を起動する必要がsummerizeする

私のシナリオでは、電子メールの並列処理として何が別の方法になるのか分からない。

答えて

0

forEachは、リストの各要素を順番に処理するリストなので、一度に1つずつ処理し、processDataAgainstEmailメソッドに渡します。

あなたは結果のリストを得ているたら、は、前の手順で操作しますが、作成されたレコードのリストから/データフレームの作成を並列化する上でsc.parallelizeを呼び出します。並列化は、私がpySparkで見ることができるように、データフレームの作成のプロパティであり、操作の結果を処理するものではありません。

+0

実際には、** processDataAgainstEmail()**メソッドを使用してメールリストを作成し、メールとスパークセッションをその2つのパラメータとして使用して結果を並行して作業したいと思いますか?または別の方法で? – Soumen

関連する問題