私はこのようないくつかのコードでScalaのを使用してデータフレームをストリーミングスパークから値を抽出しようとしています:TimestampType/java.sql.TimestampのgetLongに相当しますか?
var txs = spark.readStream
.format("kafka") .option("kafka.bootstrap.servers",KAFKABS)
.option("subscribe", "txs")
.load()
txs = txs.selectExpr("CAST(value AS STRING)")
val schema = StructType(Seq(
StructField("from",StringType,true),
StructField("to", StringType, true),
StructField("timestamp", TimestampType, true),
StructField("hash", StringType, true),
StructField("value", StringType, true)
))
txs = txs.selectExpr("cast (value as string) as json")
.select(from_json($"json", schema).as("data"))
.select("data.*")
.selectExpr("from","to","cast(timestamp as timestamp) as timestamp","hash","value")
val newDataFrame = txs
.flatMap(row => {
val to = row.getString(0)
val from = row.getString(1)
// val timestamp = row.getTimestamp??
//do stuff
})
タイムスタンプの等価入力されたgetメソッドがある場合、私は疑問に思って?私の混乱に加えて、構造化されたストリームのために定義しているSQL型と、変数flatMap
funcitonを使ってアクセスするときの実際の型の間に、隠されたマッピング(少なくとも私には隠されていた)があったようです。私はドキュメントを見て、これは事実でした。ドキュメントに従って:
位置iの値を返します。値がNULLの場合、NULLは が返されます。 > java.lang.BooleanのByteType - - >がjava.lang.Byte
はShortType - > java.lang.ShortのIntegerType - > java.langのBooleanType:次は、Spark SQLタイプと 戻り値の型の間のマッピングであります.Integer
FloatType - > java.lang.FloatのDoubleType - > java.lang.Doubleの
StringType - >文字列DecimalType - > java.math.BigDecimalのDATETYPE - > java.sql.Date TimestampType - > javaの.sql.Timestamp
BinaryType - >バイト配列ArrayType - > scala.colle ction.Seq(java.util.Listのため GETLISTを使用)のMapType - > scala.collection.Map(java.util.Mapのため てgetJavaMapを使用)StructType - > org.apache.spark.sql.Row
私は、このマッピングがクラスに、それが実装するインタフェースとして正式にベーキングされていると思いますが、明らかにそうではありません。(TimestampType/javaの場合はそうです。 sql.Timestamp、私は何かのために私のタイムスタンプ型を放棄する必要がありますか?誰かが私が間違っている理由を説明してください!私はスカラを使用し、3〜4ヶ月間スパークしています。
-Paul