2017-01-06 4 views
1

私は異なるデータを持つ2つのファイルを持っています。私は2 diff RDDs &でそれらを読み取ろうとしているとデータフレーム&ハイブに挿入するに変換します。私はこの通常のコードを実行することができました。だから、2番目の人はクラスターに十分なリソースがあるのに、1番目が乗り越えるのを待っていました。私は、RDD計算がAsyncメソッドを使って並列化できることを学びました。だからforeachPartitionAsyncを試しています。しかし、私はそれ以上デバッグすることはできませんエラーをスローします。サンプルコード:foreachPartitionAsync throwsは停止したSparkContextのメソッドを呼び出せません

object asynccode { 
    def main(args: Array[String]) = { 
    val conf = new SparkConf() 
     .setAppName("Parser") 
    val sc = new SparkContext(conf) 
    val hiveContext = new HiveContext(sc) 
    import hiveContext.implicits._ 

    val ercs = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file1.txt") 
    val test = ercs.map { k => 
     var rc = method1(k._2, k._1).toSeq 
     rc 
    } 
     .flatMap(identity) 
     .foreachPartitionAsync { f => 
     f.toSeq.toDF() 
      .write.insertInto("dbname.tablename1") 
     } 

    val ercs2 = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file2.txt") 
    val test2 = ercs2.map { k => 
     var rs = method2(k._2, k._1) 
     rs 
    } 
     .flatMap(identity) 
     .foreachPartitionAsync(f => f.toSeq.toDF() 
     .write.insertInto("dbname.tablename2") 

    ) 
    sc.stop() 
    } 

    def method1 = ??? 
    def method2 = ??? 
} 

ただし、以下のエラーメッセージが表示されます。 foreachPartitionAsyncをコードから削除すると、正常に動作します。 foreachPartitionAsyncに関して私が間違っていることを確かめないでください。

タスクのシリアル化に失敗しました:java.lang.IllegalStateException:停止したSparkContextのメソッドを呼び出すことができません。

更新日: ご意見ありがとうございます。私はそれを以下のように更新しました。しかし、今は何もしていない。スパークWeb UI、私はステージがトリガーされていない(その空)を見ることができます。私のテーブルは更新されません。しかし、仕事は間違いなく完了しました。

val ercs = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file1.txt") 
    val test = ercs.map { k => 
     var rc = method1(k._2, k._1).toSeq 
     rc 
    } 
     .flatMap(identity) 
    toDF() 
    val f1 = Future(test.write.insertInto("dbname.tablename1")) 
     } 

    val ercs2 = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file2.txt") 
    val test2 = ercs2.map { k => 
     var rs = method2(k._2, k._1) 
     rs 
    } 
     .flatMap(identity) 
     toSeq.toDF() 

val f2 = Future(test2.write.insertInto("dbname.tablename2")) 

    ) 
     Future.sequence(Seq(f1,f2)).onComplete(_ => sc.stop) 

私が見逃したことはありますか?

答えて

1

FutureActionsが完了するのを待たずにSparkContextを停止します。あなたは応答して、コンテキストを完了し、停止する措置を待つ必要があります。

import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.Future 
import scala.util.{Success, Failure} 

val f1: Future[Unit] = sc.range(1, 200).foreachAsync(_ => Thread.sleep(10)) 
val f2: Future[Unit] = sc.range(1, 200).foreachAsync(_ => Thread.sleep(10)) 

Future.sequence(Seq(f1, f2)).onComplete { 
    case Success(_) => sc.stop 
    case Failure(e) => 
    e.printStackTrace // or some other appropriate actions 
    sc.stop 
} 

あなたのコードは、我々は非同期アクションを無視しても無効で言われていること。あなたは非同期書き込みアクションが直接Futuresを使用したい場合は

.foreachPartitionAsync(
    f => f.toSeq.toDF().write.insertInto("dbname.tablename2") 
) 

:あなたアクションまたは変換内の分散データ構造を使用することはできません

val df1: Dataframe = ??? 
val df2: Dataframe = ??? 

val f1: Future[Unit] = Future(df1.write.insertInto("dbname.tablename1")) 
val f2: Future[Unit] = Future(df2.write.insertInto("dbname.tablename2")) 

とアクションが完了するのを待つ上記のように。

関連する問題