SQLを更新することについて推論するのではなく、SQLの場合と同様に考えることをお勧めします。次の操作を行うことができます:ここで
import org.spark.sql.functions.when
val spark: SparkSession = ??? // your spark session
val df: DataFrame = ??? // your dataframe
import spark.implicits._
df.select(
$"UniqueRowIdentifier", $"_c0", $"_c1", $"_c2", $"_c3", $"_c4",
$"_c5", when($"_c5".contains("Incorrect"), 1).otherwise(0) as "isBadRecord")
は、ローカルで結果を見るためにあなたのスパークシェル上でコピーして貼り付けることができます自己完結型のスクリプトです:
誰の関連出力以下れる
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
sc.setLogLevel("ERROR")
val schema =
StructType(Seq(
StructField("UniqueRowIdentifier", IntegerType),
StructField("_c0", DoubleType),
StructField("_c1", DoubleType),
StructField("_c2", StringType),
StructField("_c3", DoubleType),
StructField("_c4", StringType),
StructField("_c5", StringType),
StructField("isBadRecord", IntegerType)))
val contents =
Seq(
Row(1, 0.0 , 0.0 , "Name", 0.0, "Desc", "", 0),
Row(2, 2.11 , 10000.0 , "Juice", 0.0, "XYZ", "2016/12/31 : Incorrect", 0),
Row(3, -0.5 , -24.12, "Fruit", -255.0, "ABC", "1994-11-21 00:00:00", 0),
Row(4, 0.087, 1222.0 , "Bread", -22.06, "", "2017-02-14 00:00:00", 0),
Row(5, 0.087, 1222.0 , "Bread", -22.06, "", "", 0)
)
val df = spark.createDataFrame(sc.parallelize(contents), schema)
df.show()
val withBadRecords =
df.select(
$"UniqueRowIdentifier", $"_c0", $"_c1", $"_c2", $"_c3", $"_c4",
$"_c5", when($"_c5".contains("Incorrect"), 1).otherwise(0) as "isBadRecord")
withBadRecords.show()
:
+-------------------+-----+-------+-----+------+----+--------------------+-----------+
|UniqueRowIdentifier| _c0| _c1| _c2| _c3| _c4| _c5|isBadRecord|
+-------------------+-----+-------+-----+------+----+--------------------+-----------+
| 1| 0.0| 0.0| Name| 0.0|Desc| | 0|
| 2| 2.11|10000.0|Juice| 0.0| XYZ|2016/12/31 : Inco...| 0|
| 3| -0.5| -24.12|Fruit|-255.0| ABC| 1994-11-21 00:00:00| 0|
| 4|0.087| 1222.0|Bread|-22.06| | 2017-02-14 00:00:00| 0|
| 5|0.087| 1222.0|Bread|-22.06| | | 0|
+-------------------+-----+-------+-----+------+----+--------------------+-----------+
+-------------------+-----+-------+-----+------+----+--------------------+-----------+
|UniqueRowIdentifier| _c0| _c1| _c2| _c3| _c4| _c5|isBadRecord|
+-------------------+-----+-------+-----+------+----+--------------------+-----------+
| 1| 0.0| 0.0| Name| 0.0|Desc| | 0|
| 2| 2.11|10000.0|Juice| 0.0| XYZ|2016/12/31 : Inco...| 1|
| 3| -0.5| -24.12|Fruit|-255.0| ABC| 1994-11-21 00:00:00| 0|
| 4|0.087| 1222.0|Bread|-22.06| | 2017-02-14 00:00:00| 0|
| 5|0.087| 1222.0|Bread|-22.06| | | 0|
+-------------------+-----+-------+-----+------+----+--------------------+-----------+
withColumn APIを使用して、ある列の値をチェックし、それに基づいて他の列を更新するにはどうすればよいですか? –
私の答えを更新してください。 –
ありがとう、私はちょうどこれを試してみましたが、それはうまく動作します –