私はSparkにとって非常に新しいことをまず皆にお知らせします。sparkでの並列メソッドの呼び出しと渡されたメソッドでのsparkセッションの使用
Iは、テーブル内のレコードの膨大な数を処理する必要があり、それが電子メールによってグループ化されたとき、それは約1 million.Iは個々メールと更新データベースに対して設定されたデータに基づいて複数の論理演算を実行する必要があります論理計算
に基づいて大雑把に私のコードの構造は、
初期データロード...
輸入sparkSessioのようなものですn.implicits._ VARたtableData = sparkSession.read.jdbc(、、て、ConnectionProperties).select( "Eメール")。ここで、()
レコードと//データフレームメールにグループ化して1より大きいカウント
VAR recordsGroupedBy = tableData.groupBy( "Eメール")は。(カウント)。withColumnRenamed( "カウント"、 "RecordCountを")。( "> 1でRecordCount")をフィルタリングする.toDF()
今すぐ来てprocessDataAgainstEmail()メソッドを使用して電子メールにグループ化した後の処理
recordsGroupedBy.collect()。foreachの(X => processDataAgainstEmail(x.getAs( "Eメール")、sparkSession))ここで私はforeachのが並列に実行されません参照.I方法processDataAgainstEmailを起動する必要性を
(、)並行して。 しかし、私は
を行うことによって並列化しようとした場合こんにちは、私は
ヴァルemailList = dataFrameWithGroupedByMultipleRecords.select( "Eメール")。rdd.map(R => R(0)を呼び出すことによって、リストを取得することができます。 asInstanceOf [文字列])。)(コレクト。ToListメソッド
VARのRDD = sc.parallelize(emailList)
rdd.foreach(X => processDataAgainstEmail(x.getAs( "Eメール")、 sparkSession))
parallelizeを使用しているときにsparkSessionを渡すことができないため、これはサポートされていません。
processDataAgainstEmail(、)では、データベースの挿入と更新に関連して複数の操作が実行され、データフレームとスパークSQL操作を実行する必要があります。方法は、データベース.I午前に何かを実行することはできません、スパークセッションを通過するすべてのことができない場合には、私はsparksession
と平行processDataAgainstEmailを(、)を起動する必要がsummerizeする
私のシナリオでは、電子メールの並列処理として何が別の方法になるのか分からない。
実際には、** processDataAgainstEmail()**メソッドを使用してメールリストを作成し、メールとスパークセッションをその2つのパラメータとして使用して結果を並行して作業したいと思いますか?または別の方法で? – Soumen