2017-03-22 9 views
2

後、私は生成されたUUIDを含み、列とスパークデータフレームを持って変更されます。 しかし、データフレームでアクションまたは変換を行うたびに、各ステージでUUIDが変更されます。スパークデータフレームランダムUUIDは、すべての変換/アクション

どのように私は一度だけUUIDを生成し、UUIDを持っていますが、その後、静的なまま。

私の問題を再生成するためにいくつかのサンプルコード

は以下の通りです:

def process(spark: SparkSession): Unit = { 

    import spark.implicits._ 

    val sc = spark.sparkContext 
    val sqlContext = spark.sqlContext 
    sc.setLogLevel("OFF") 

    // create dataframe 
    val df = spark.createDataset(Array(("a", "1"), ("b", "2"), ("c", "3"))).toDF("col1", "col2") 
    df.createOrReplaceTempView("df") 
    df.show(false) 

    // register an UDF that creates a random UUID 
    val generateUUID = udf(() => UUID.randomUUID().toString) 

    // generate UUID for new column 
    val dfWithUuid = df.withColumn("new_uuid", generateUUID()) 
    dfWithUuid.show(false) 
    dfWithUuid.show(false) // uuid is different 

    // new transformations also change the uuid 
    val dfWithUuidWithNewCol = dfWithUuid.withColumn("col3", df.col("col2")+1) 
    dfWithUuidWithNewCol.show(false) 
} 

出力は次のようになります。UUIDは、各ステップで異なる

+----+----+ 
|col1|col2| 
+----+----+ 
|a |1 | 
|b |2 | 
|c |3 | 
+----+----+ 

+----+----+------------------------------------+ 
|col1|col2|new_uuid       | 
+----+----+------------------------------------+ 
|a |1 |a414e73b-24b8-4f64-8d21-f0bc56d3d290| 
|b |2 |f37935e5-0bfc-4863-b6dc-897662307e0a| 
|c |3 |e3aaf655-5a48-45fb-8ab5-22f78cdeaf26| 
+----+----+------------------------------------+ 

+----+----+------------------------------------+ 
|col1|col2|new_uuid       | 
+----+----+------------------------------------+ 
|a |1 |1c6597bf-f257-4e5f-be81-34a0efa0f6be| 
|b |2 |6efe4453-29a8-4b7f-9fa1-7982d2670bd6| 
|c |3 |2f7ddc1c-3e8c-4118-8e2c-8a6f526bee7e| 
+----+----+------------------------------------+ 

+----+----+------------------------------------+----+ 
|col1|col2|new_uuid       |col3| 
+----+----+------------------------------------+----+ 
|a |1 |00b85af8-711e-4b59-82e1-8d8e59d4c512|2.0 | 
|b |2 |94c3f2c6-9234-4fb3-b1c4-273a37171131|3.0 | 
|c |3 |1059fff2-b8f9-4cec-907d-ea181d5003a2|4.0 | 
+----+----+------------------------------------+----+ 

注こと。

答えて

3

これは正常な動作です。ユーザー定義関数have to be deterministic

ユーザー定義関数は確定的でなければなりません。最適化、 重複呼び出しを省略してもよいか、機能がさえ は、それがクエリに存在しているよりも多くの時間を呼び出すことができます。

非決定的な機能を含めて出力を保存する場合は、中間データを永続ストレージに書き込んで読み戻す必要があります。チェックポイントやキャッシングは、いくつかの簡単な例で働くかもしれないが、それは、一般的で信頼性の高いではありません。

アップストリームプロセスが確定的である場合(シャッフルがある場合)、rand function with seedを使用してバイト配列に変換し、UUID.nameUUIDFromBytesに渡すことができます。

も参照してください:About how to add a new column to an existing DataFrame with random values in Scala

SPARK-20586は、特定の最適化を無効にすることができdeterministicフラグを導入し、データがpersistedで、エグゼキュータの損失が発生したときに、それがどのように動作するか明確ではありません。

関連する問題