2016-10-15 10 views
1

私はをタイプrddをrddに変換してからデータフレームを作成しようとしています。コードを実行すると、は例外をスローします。型JavaRDDを行JavaRDDに変換する

コード:

JavaRDD<Counter> rdd = sc.parallelize(counters); 
JavaRDD<Row> rowRDD = rdd.map((Function<Counter, Row>) RowFactory::create); 

//I am using some schema here based on the class Counter 
DataFrame df = sqlContext.createDataFrame(rowRDD, getSchema()); 
marineDF.show(); //throws Exception 

型付けRDDからの変換が行の工場での順序を保持RDD行にしていますか?そうでない場合はどうすればそれを確認できますか?

クラスコード:

class Counter { 
    long vid; 
    byet[] bytes; 
    List<B> blist; 
} 
class B { 
    String id; 
    long count; 
} 

スキーマ:

private StructType getSchema() { 
List<StructField> fields = new ArrayList<>(); 
fields.add(DataTypes.createStructField("vid", DataTypes.LongType, false)); 
fields.add(DataTypes.createStructField("bytes",DataTypes.createArrayType(DataTypes.ByteType), false)); 

List<StructField> bFields = new ArrayList<>(); 
bFields.add(DataTypes.createStructField("id", DataTypes.StringType, false)); 
bFields.add(DataTypes.createStructField("count", DataTypes.LongType, false)); 

StructType bclasSchema = DataTypes.createStructType(bFields); 

fields.add(DataTypes.createStructField("blist", DataTypes.createArrayType(bclasSchema, false), false)); 
StructType schema = DataTypes.createStructType(fields); 
return schema; 
} 

は例外で失敗します

java.lang.ClassCastException: test.spark.SampleTest$A cannot be cast to java.lang.Long 

    at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110) 
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getLong(rows.scala:42) 
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:221) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$LongConverter$.toScalaImpl(CatalystTypeConverters.scala:367) 

答えて

2

事はここには変換はありませんです。 を作成するときは、任意のObjectを受け入れることができます。それはそのまま置かれます。だから、DataFrame創造と同等ではありません。

spark.createDataFrame(rdd, Counter.class); 

またはDataset<Counter>作成:

Encoder<Counter> encoder = Encoders.bean(Counter.class); 
spark.createDataset(rdd, encoder); 

Beanクラスでの作業します。

だからRowFactory::createはここではあまり適用されません。 RDD<Row>を渡す場合は、すべての値をDataFramerequired type mappingで直接使用できる形式で表す必要があります。

Row(vid, bytes, List(Row(id1, count1), ..., Row(idN, countN)) 

とあなたのコードは以下と等価でなければなりません:それはあなたが明示的に次のような形状のに各Counterをマッピングしなければならないことを意味

JavaRDD<Row> rows = counters.map((Function<Counter, Row>) cnt -> { 
    return RowFactory.create(
    cnt.vid, cnt.bytes, 
    cnt.blist.stream().map(b -> RowFactory.create(b.id, b.count)).toArray() 
); 
}); 

Dataset<Row> df = sqlContext.createDataFrame(rows, getSchema()); 
関連する問題