2016-11-19 16 views
1

sparkのPipelinesのカスタムEstimatorを書きたいと思います。データクリーニングタスクを実行する必要があります。これは、いくつかの行が削除され、いくつかの列が削除され、いくつかの列が追加され、いくつかの値が既存の列に置き換えられることを意味します。また、数値カラムの平均値または最小値をNaN置換として格納する必要があります。Sparkカスタム前処理エスティメータ

しかし、

override def transformSchema(schema: StructType): StructType = { 
    schema.add(StructField("foo", IntegerType)) 
} 

はフィールドのみを追加サポートしていますか? 私はこれをどう扱うべきか不思議です。

答えて

2

フィールドを追加するだけで、StructField apiでサポートされているとします。しかし、それもあなたがフィールドを削除することはできませんことを意味するわけではありません!

StructTypeの値はfieldsで、Array[StructField]となります。あなたは.filter()この配列を見ることができますが、希望の列だけを保持して(namedataType、または何かもっと複雑な)適合がわかります。

あなたのフィルタリングを完了したら、次の2つのオプションがあります:

  1. がフィルタリングfields配列にそれぞれの新しい列のStructFieldを追加し、この
  2. からStructTypeを構築fieldsからStructTypeを構築配列を作成し、.add(...)を使用して新しい列を追加します。
+0

カラムが交換されたときに性能が影響するか、新しいカラムが追加されるときにパフォーマンスに影響があるかを知っていますか? –

+0

パフォーマンスのコストはどういうものかは分かりませんが、一般的には、新しいフィールドの計算にどれくらいの費用がかかります。あなたの仕事がスパークの性質に従って分散されている限り、あなたは大丈夫です。 データフレーム操作は遅延して実行され、データフレームAPIを使用する場合、Catalystオプティマイザは操作を最適化するのに最適です。データフレームにたくさんの列があることを恐れないでください。 –