以下は、データフレームにシーケンス番号の列を追加するロジックです。区切られたファイルからデータを読み込んでいるときに期待通りに動作します。今日私はoracleテーブルからデータを読み込み、シーケンス番号とプロセスを追加する新しいタスクを持っています。私はoracleテーブルからデータフレームを読むときに、シーケンス番号をデータフレームに追加するために、以下のロジックに問題があることに直面しています。dataframe usnig scalaにシーケンス番号の列を追加
oracleTableDF is my dataframe
//creating Sequence no. logic for SeqNum
val rowRDD = oracleTableDF.rdd.zipWithIndex().map(indexedRow => Row.fromSeq((((indexedRow._2.toLong+1)).toLong) +: indexedRow._1.toSeq))
//creating StructType to add Seqnum in schema
val newstructure = StructType(Array(StructField("SeqNum",LongType)).++(oracleTableDF.schema.fields))
//creating new Data Frame with seqnum
oracleTableDF = spark.createDataFrame(rowRDD, newstructure)
私は、実際の問題を見つけることができないのです。なぜなら、ファイルをファイルから読み取るときに、ロジックがクラスタ内で期待どおりに動作しているからです。しかし、私はoracleテーブルからそれを読むときにいくつかの問題に直面しています。ローカルモードでも期待通りに動作します。あなたが使用することができます
below is the error :
"ERROR scheduler.TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, xxxx, executor 1): java.lang.NoClassDefFoundError: Could not initialize class oracleDataProcess$"
この「SeqNum」列は行番号と同じですか? – philantrovert
はい、各レコードに行番号を追加しています。 –
はい、各レコードに行番号を追加する必要があります。分割されたファイルにはうまくいきますが、オラクルテーブルに問題が発生したときには必ず確認してください。 –