2017-09-11 9 views
2

以下は、データフレームにシーケンス番号の列を追加するロジックです。区切られたファイルからデータを読み込んでいるときに期待通りに動作します。今日私はoracleテーブルからデータを読み込み、シーケンス番号とプロセスを追加する新しいタスクを持っています。私はoracleテーブルからデータフレームを読むときに、シーケンス番号をデータフレームに追加するために、以下のロジックに問題があることに直面しています。dataframe usnig scalaにシーケンス番号の列を追加

oracleTableDF is my dataframe

//creating Sequence no. logic for SeqNum 
    val rowRDD = oracleTableDF.rdd.zipWithIndex().map(indexedRow => Row.fromSeq((((indexedRow._2.toLong+1)).toLong) +: indexedRow._1.toSeq)) 

    //creating StructType to add Seqnum in schema 
     val newstructure = StructType(Array(StructField("SeqNum",LongType)).++(oracleTableDF.schema.fields)) 

    //creating new Data Frame with seqnum 
    oracleTableDF = spark.createDataFrame(rowRDD, newstructure) 

私は、実際の問題を見つけることができないのです。なぜなら、ファイルをファイルから読み取るときに、ロジックがクラスタ内で期待どおりに動作しているからです。しかし、私はoracleテーブルからそれを読むときにいくつかの問題に直面しています。ローカルモードでも期待通りに動作します。あなたが使用することができます

below is the error :

"ERROR scheduler.TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, xxxx, executor 1): java.lang.NoClassDefFoundError: Could not initialize class oracleDataProcess$"

+0

この「SeqNum」列は行番号と同じですか? – philantrovert

+0

はい、各レコードに行番号を追加しています。 –

+0

はい、各レコードに行番号を追加する必要があります。分割されたファイルにはうまくいきますが、オラクルテーブルに問題が発生したときには必ず確認してください。 –

答えて

0

1つのオプションは、あなたが必要とするすべての自動インクリメントの整数値を使用してデータフレームに列を追加する場合は、インクリメンタルIDで

val dataFrame = oracleTableDF.withColumn("incremental_id", monotonically_increasing_id()) 
3

を新しい列を作成することmonotonically_increasing_id()あり、あなたはLongTypeであるmonotonicallyIncreasingIdを使用することができます。

val oracleTableDF2 = oracleTableDF.withColumn("SeqNum", monotonicallyIncreasingId) 

[UPDATE]

monotonicallyIncreasingIdは推奨されていません。代わりにmonotonically_increasing_id()を使用してください。

+0

"https://stackoverflow.com/questions/35705038/how-do-i-add-an-persistent-column-of-row-ids-to-spark-dataframe"の提案によると、明確な機能ではありません。それはどんなパラメータもとらない。私はこれにいくつかのパラメータを渡す必要があります。 –

+0

私は 'Spark 2.1'以降、[SPARK-14393](https://issues.apache.org/jira/browse/SPARK-14393)の問題は既に解決されていると思います。あなたは 'monotonicallyIncreasingId'はパラメータを取らないというのは正しいです。 –

+0

@LeoC、1からの値を取得する方法はありますか?今では0 – user4342532

関連する問題