2017-10-06 7 views
0

私はこのようないくつかのコードでScalaのを使用してデータフレームをストリーミングスパークから値を抽出しようとしています:TimestampType/java.sql.TimestampのgetLongに相当しますか?

var txs = spark.readStream 
    .format("kafka") .option("kafka.bootstrap.servers",KAFKABS) 
    .option("subscribe", "txs") 
    .load() 
txs = txs.selectExpr("CAST(value AS STRING)") 

val schema = StructType(Seq(
     StructField("from",StringType,true), 
     StructField("to", StringType, true), 
     StructField("timestamp", TimestampType, true), 
     StructField("hash", StringType, true), 
     StructField("value", StringType, true) 
)) 

txs = txs.selectExpr("cast (value as string) as json") 
      .select(from_json($"json", schema).as("data")) 
      .select("data.*") 
      .selectExpr("from","to","cast(timestamp as timestamp) as timestamp","hash","value") 
val newDataFrame = txs 
    .flatMap(row => { 
    val to = row.getString(0) 
    val from = row.getString(1) 
    // val timestamp = row.getTimestamp?? 

    //do stuff 
    }) 

タイムスタンプの等価入力されたgetメソッドがある場合、私は疑問に思って?私の混乱に加えて、構造化されたストリームのために定義しているSQL型と、変数flatMap funcitonを使ってアクセスするときの実際の型の間に、隠されたマッピング(少なくとも私には隠されていた)があったようです。私はドキュメントを見て、これは事実でした。ドキュメントに従って:

位置iの値を返します。値がNULLの場合、NULLは が返されます。 > java.lang.BooleanのByteType - - >がjava.lang.Byte
はShortType - > java.lang.ShortのIntegerType - > java.langの

BooleanType:次は、Spark SQLタイプと 戻り値の型の間のマッピングであります.Integer
FloatType - > java.lang.FloatのDoubleType - > java.lang.Doubleの
StringType - >文字列DecimalType - > java.math.BigDecimalの

DATETYPE - > java.sql.Date TimestampType - > javaの.sql.Timestamp

BinaryType - >バイト配列ArrayType - > scala.colle ction.Seq(java.util.Listのため GETLISTを使用)のMapType - > scala.collection.Map(java.util.Mapのため てgetJavaMapを使用)StructType - > org.apache.spark.sql.Row

私は、このマッピングがクラスに、それが実装するインタフェースとして正式にベーキングされていると思いますが、明らかにそうではありません。(TimestampType/javaの場合はそうです。 sql.Timestamp、私は何かのために私のタイムスタンプ型を放棄する必要がありますか?誰かが私が間違っている理由を説明してください!私はスカラを使用し、3〜4ヶ月間スパークしています。

-Paul

答えて

1

あなたは正しくTimestampType列のScalaの型がjava.sql.Timestampであると推定しています。

V1.5.0org.apache.spark.sql.RowhasgetTimestamp(i: Int)方法、あなたはそれを呼び出すことができますし、java.sql.Timestamp得るよう:

:あなたは、以前のバージョンを使用する場合でも

val timestamp = row.getTimestamp(1) 

は、このタイプを放棄する必要はありません、あなたは、単にgetAs[T](i: Int)java.sql.Timestampとを使用することができます

val timestamp = row.getAs[java.sql.Timestamp](2) 
// OR: 
val timestamp = row.getAs[java.sql.Timestamp]("timestamp") 
関連する問題