私は異なるデータを持つ2つのファイルを持っています。私は2 diff RDDs &でそれらを読み取ろうとしているとデータフレーム&ハイブに挿入するに変換します。私はこの通常のコードを実行することができました。だから、2番目の人はクラスターに十分なリソースがあるのに、1番目が乗り越えるのを待っていました。私は、RDD計算がAsyncメソッドを使って並列化できることを学びました。だからforeachPartitionAsyncを試しています。しかし、私はそれ以上デバッグすることはできませんエラーをスローします。サンプルコード:foreachPartitionAsync throwsは停止したSparkContextのメソッドを呼び出せません
object asynccode {
def main(args: Array[String]) = {
val conf = new SparkConf()
.setAppName("Parser")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)
import hiveContext.implicits._
val ercs = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file1.txt")
val test = ercs.map { k =>
var rc = method1(k._2, k._1).toSeq
rc
}
.flatMap(identity)
.foreachPartitionAsync { f =>
f.toSeq.toDF()
.write.insertInto("dbname.tablename1")
}
val ercs2 = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file2.txt")
val test2 = ercs2.map { k =>
var rs = method2(k._2, k._1)
rs
}
.flatMap(identity)
.foreachPartitionAsync(f => f.toSeq.toDF()
.write.insertInto("dbname.tablename2")
)
sc.stop()
}
def method1 = ???
def method2 = ???
}
ただし、以下のエラーメッセージが表示されます。 foreachPartitionAsyncをコードから削除すると、正常に動作します。 foreachPartitionAsyncに関して私が間違っていることを確かめないでください。
タスクのシリアル化に失敗しました:java.lang.IllegalStateException:停止したSparkContextのメソッドを呼び出すことができません。
更新日: ご意見ありがとうございます。私はそれを以下のように更新しました。しかし、今は何もしていない。スパークWeb UI、私はステージがトリガーされていない(その空)を見ることができます。私のテーブルは更新されません。しかし、仕事は間違いなく完了しました。
val ercs = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file1.txt")
val test = ercs.map { k =>
var rc = method1(k._2, k._1).toSeq
rc
}
.flatMap(identity)
toDF()
val f1 = Future(test.write.insertInto("dbname.tablename1"))
}
val ercs2 = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file2.txt")
val test2 = ercs2.map { k =>
var rs = method2(k._2, k._1)
rs
}
.flatMap(identity)
toSeq.toDF()
val f2 = Future(test2.write.insertInto("dbname.tablename2"))
)
Future.sequence(Seq(f1,f2)).onComplete(_ => sc.stop)
私が見逃したことはありますか?