2016-12-09 14 views
0

私はSpark 1.6の新機能です。寄木張りのファイルを読んで処理したいと思います。 のためにこのような構成の寄木細工を持っていると仮定簡素化:spark:寄木細工ファイルを読んで処理する

id, amount, label 

と私は3のルールがあります:スパークとScalaでそれを行うことができますどのように

amount < 10000 => label=LOW 
10000 < amount < 100000 => label=MEDIUM 
amount > 1000000 => label = HIGH 

を?それは正しいアプローチ

case class SampleModels(
    id: String, 
    amount: Double, 
    label: String, 
) 

val sc = SparkContext.getOrCreate() 
val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._ 

val df = sqlContext.read.parquet("/path/file/") 
val ds = df.as[SampleModels].map(row=> 
    MY LOGIC 
    WRITE OUTPUT IN PARQUET 
) 

です:

は、私はそのような何かをしようと?それは効率的ですか? 「MYLOGIC」はより複雑になる可能性があります。

ありがとうございます。

答えて

1

はい、これは正しい方法です。あなたのロジックがシンプルであれば、組み込み関数を使ってデータフレームを直接操作することもできます(例えばwhenのように)、行をcaseクラスにマッピングし、jvmでコードを実行するより少し速くなります。結果を簡単に寄木細工に保存することができます。

+0

に書いたように

Error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._ Support for serializing other types will be added in future releases. df.as[AmountModel].map(row => { ^ 

私はケースクラスとインポートsqlContext.implicits._を使用しています。一般に、スパーク・データフレーム/データセット操作を使用することをお勧めします。この操作はSpark SQLの[Catalyst Optimizer](https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html)に渡されます。この最適化エンジンは、あなたの質問をよりうまく実行できるようになりました。 –

0

はい、正しいアプローチです。 必要な余分な列を作成するために、完全なデータを1回パスします。

あなたは、SQL方法をしたい場合は、これが移動するための方法である、

val df = sqlContext.read.parquet("/path/file/") 
df.registerTempTable("MY_TABLE") 
val df2 = sqlContext.sql("select *, case when amount < 10000 then LOW else HIGH end as label from MY_TABLE") 

はしかし代わりにsparkContextのhiveContextを使用することを忘れないでください。

0

お返事ありがとうございます。 は、私が最初のバージョンを実装しようとしましたが、私はこのエラーが表示されます。私はこれが正しい答えであるポスト

関連する問題