2016-09-29 4 views
0

受信機受信機においてストリーミングスパーク - カスタム受信機とデータフレーム推論スキーマ

val incomingMessage = subscriberSocket.recv(0) 
val stringMessages = new String(incomingMessage).stripLineEnd.split(',') 
store(Row.fromSeq(Array(stringMessages(0)) ++ stringMessages.drop(2))) 

に以下のコードを検討し、私は列タイプの各々(stringMessages(0)によって示されている)テーブルを変換したいことはありません実際のテーブルタイプに変換します。コードのメインセクションで

、私は

val df = sqlContext.createDataFrame(eachGDNRdd,getSchemaAsStructField) 
println(df.collect().length) 

を行うときに、私は以下のエラー今

java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double 
     at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:119) 
     at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getDouble(rows.scala:44) 

を取得し、スキーマは、文字列とのIntフィールドの両方で構成され。私はそのフィールドがタイプによって一致していることを十字で確認しました。しかし、spark dataframeのようなものは、そのタイプを推論していません。


質問
1.(矛盾がない限り)実行時に、スキーマの型を推論スパークべきではないでしょうか。
2.テーブルは動的なので、スキーマは各行の最初の要素(テーブル名を含む)に基づいて変化します。スキーマをオンザフライで変更する簡単な提案方法はありますか?

または明らかに何か不足していますか?

答えて

0

私はSparkを初めて使用していますが、実行しているバージョンについては言いませんでしたが、v2.1.0では、あなたが言及した特定の理由によりスキーマの推論がデフォルトで無効になっています。レコード構造が矛盾する場合、Sparkはスキーマを確実に推論することができません。 spark.sql.streaming.schemaInferenceをtrueに設定してスキーマの推論を有効にすることはできますが、自分でスキーマを指定する方が良いと思います。