2016-04-10 17 views
3

この例では、createDataFrame呼び出しの中からStackOverflowErrorを取得しています。これは、無限ループで自身を呼び出すjava型の推論を伴うスカラーコードから始まります。avro型のcreateDataFrameの無限再帰

final EventParser parser = new EventParser(); 
JavaRDD<Event> eventRDD = sc.textFile(path) 
     .map(new Function<String, Event>() 
{ 
    public Event call(String line) throws Exception 
    { 
     Event event = parser.parse(line); 
     log.info("event: "+event.toString()); 
     return event; 
    } 
}); 
log.info("eventRDD:" + eventRDD.toDebugString()); 

DataFrame df = sqlContext.createDataFrame(eventRDD, Event.class); 
df.show(); 

スタックトレースの下部には、次のようになります。

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) 
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) 
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:102) 
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:104) 
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:102) 
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) 
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) 

これはhttp://apache-spark-developers-list.1001551.n3.nabble.com/Stackoverflow-in-createDataFrame-td11791.htmlで報告されたバグに似ていますが、私はこのバグだったときより後のスパーク1.4.1を使用しています修理された。

イベントクラスは、このavscからavroによって生成されます。それは問題を引き起こすと報告されているdoubleとlongフィールドを含んでいますが、doubleをstringに置き換えても症状は変わりません。

{ 
    "namespace": "mynamespace", 
    "type": "record", 
    "name": "Event", 
    "fields": [ 
     { "name": "ts", "type": "double", "doc": "Timestamp"}, 
     { "name": "uid", "type": "string", "doc": "Unique ID of Connection"}, 
     { "name": "idorigh", "type": "string", "doc": "Originating endpoint’s IP address (AKA ORIG)"}, 
     { "name": "idorigp", "type": "int", "doc": "Originating endpoint’s TCP/UDP port (or ICMP code)"}, 
     { "name": "idresph", "type": "string", "doc": "Responding endpoint’s IP address (AKA RESP)"}, 
     { "name": "idrespp", "type": "int", "doc": "Responding endpoint’s TCP/UDP port (or ICMP code)"}, 
     { "name": "proto", "type": "string", "doc": "Transport layer protocol of connection"}, 
     { "name": "service", "type": "string", "doc": "Dynamically detected application protocol, if any"}, 
     { "name": "duration", "type": "double", "doc": "Time of last packet seen – time of first packet seen"}, 
     { "name": "origbytes", "type": "int", "doc": "Originator payload bytes; from sequence numbers if TCP"}, 
     { "name": "respbytes", "type": "int", "doc": "Responder payload bytes; from sequence numbers if TCP"}, 
     { "name": "connstate", "type": "string", "doc": "Connection state (see conn.log:conn_state table)"}, 
     { "name": "localorig", "type": "boolean", "doc": "If conn originated locally T; if remotely F."}, 
     { "name": "localresp", "type": "boolean", "doc": "empty, always unset"}, 
     { "name": "missedbytes", "type": "int", "doc": "Number of missing bytes in content gaps"}, 
     { "name": "history", "type": "string", "doc": "Connection state history (see conn.log:history table)"}, 
     { "name": "origpkts", "type": [ "int", "null"], "doc": "Number of ORIG packets"}, 
     { "name": "origipbytes", "type": [ "int", "null"], "doc": "Number of RESP IP bytes (via IP total_length header field)"}, 
     { "name": "resppkts", "type": [ "int", "null"], "doc": "Number of RESP packets"}, 
     { "name": "respipbytes", "type": [ "int", "null"], "doc": "Number of RESP IP bytes (via IP total_length header field)"}, 
     { "name": "tunnelparents", "type": [ "string", "null"], "doc": "If tunneled, connection UID of encapsulating parent (s)"}, 
     { "name": "origcc", "type": ["string", "null"], "doc": "ORIG GeoIP Country Code"}, 
     { "name": "respcc", "type": ["string", "null"], "doc": "RESP GeoIP Country Code"} 
    ] 
} 

誰かに助言してもらえますか?ありがとう!

+0

無限再帰はどこですか?再現可能な例を最低限提供してください! – eliasah

+0

再帰はscala.collection.mutable.ArrayOps $ ofRef.foreach(ArrayOps.scala:108)から開始され、スタックオーバーフローエラーが発生するまで続きます。最小の例には、複雑なpom、avscファイル、パーサーなどがあります。だから私はそれらをすべてtgzで詰め込んだ。私はあなたにそれをどうやって渡すべきですか? – Bradjcox

+0

それは私が思う少し圧倒されるでしょう。何が壊れているのかを把握するためにコードの一部を分離してみましょう。 e。 RDDをDataFrameに変換する前に、カウントアクションおよび/またはRDD収集を実行する。 – eliasah

答えて

0

参照この問題に対処するために火花アブロプロジェクトで行われている作業があります:https://github.com/databricks/spark-avro/pull/217https://github.com/databricks/spark-avro/pull/216

これがマージされると、DataSetにアブロオブジェクトのRDDを変換する機能があるはずです(生成されたクラスのgetSchema()関数の循環参照の問題なしで、DataSetのDataSetはDataFrameに相当します)。