2017-10-04 5 views
1

私は任意のデータセットでDataframeマップ関数を使用しようとしています。しかし、私は行 - >行からマップする方法を理解していません。何の例は、スパーク、SQLドキュメントの任意のデータのために与えられていない:エンコーダのいくつかの並べ替えが存在する必要があるので、Spark SQLの任意の行を持つデータセットのマップを使用

Dataset<Row> original_data = ... 
Dataset<Row> changed_data = original_data.map(new MapFunction<Row,Row>{ 
      @Override 
      public Row call(Row row) throws Exception { 
       Row newRow = RowFactory.create(obj1,obj2); 
       return newRow; 
      } 
}, Encoders.bean(Row.class)); 

しかし、これは動作しませんか? 一般的な行にどのようにマップできますか?

+0

obj1とobj2の種類は何ですか?それは基本的なデータ型ですか? –

+0

MapはRDD関数で、Spark SQLでは "select"と "with column"を使用するようにしてください...もしあなたがチャンスを持っていれば、Scalaでこれを簡単にコーディングしようとするべきです... – Victor

答えて

1

obj1およびobj2がプリミティブ型ではない場合は、そのスキーマをStructTypeに表してRowエンコーダを作成します。 Row型を使用する代わりに、obj1とobj2の両方を格納するカスタムBeanを作成し、map変換でカスタムBeanエンコーダを使用することをお勧めします。

行タイプ:

StructType customStructType = new StructType(); 
     customStructType = customStructType.add("obj1", DataTypes.< type>, false); 
     customStructType = customStructType.add("obj2", DataTypes.<type>, false); 
     ExpressionEncoder<Row> customTypeEncoder = null; 

     Dataset<Row> changed_data = original_data.map(row->{ 
      return RowFactory.create(obj1,obj2);; 
    }, RowEncoder.apply(customStructType)); 

カスタム豆の種類:

class CustomBean implements ....{ 
    Object obj1; 
    Object obj2; 
.... 
} 

Dataset<CustomBean> changed_data = original_data.map(row->{ 
       return new CustomBean(obj1,obj2); 
     }, Encoders.bean(CustomBean)); 
関連する問題