2016-03-19 6 views
2

ファイルから大きなデータセットを読み込み、それをSparkマトリックスに変換し、マトリックス上で機械学習アルゴリズムを実行する必要があります。私は機械学習アルゴリズムの速度をベンチマークしたいと思う。スパークRDDは常に遅延評価されるため、機械学習アルゴリズムをベンチマークすることは困難です。私はランタイムを測定するときに、入力ファイルを解析するためのランタイムも含みます。Sparkで明示的にRDDを実現する方法

SparkにいくつかのRDDを強制的に適用させる方法はありますか?私は機械学習アルゴリズムを実行する前に事前に入力ファイルを解析することができますか?

おかげで、 ダ

+0

データフレームとsaveAsTableへの変換はどうですか? –

答えて

3

私は通常、このような何かを:

val persisted = rdd.persist(...); 

ここでは、それ以外の場合は、それがメモリに収まる場合のみ、メモリを提供する、あなたのRDDのサイズに依存 - メモリとディスクレベル。その後、

そして:

persisted.count(); 
// now you can use 'persisted', it's materialized 

、その後、他のすべてのパイプライン変換(あなたのケースではミリリットル)

ので、カウントはアクションです - ので、あなたが前にそれを持続してきたので、それはRDDを具体化し、 - 次のステージはファイルからではなく永続ストレージからrddを取るでしょう

+0

メモリに永続化している場合は、val persisted = rdd.cache()を使用できます。それはメモリ内の永続性とまったく同じ効果を持っています – PinoSan

+0

私はこれがあなたが意味するものだと思います。私はRDDをメモリに格納されたデータで永続化し、count()を実行します。これは、入力ファイルを解析するためにSparkをトリガーします。しかし、永続的なデータに対してcorr()を実行すると、スピードアップは見られません。私は正しいことをしていますか? ライン= sc.textFile(sys.argvの[1]) データ= lines.map(parseVector) DATA1 = data.persist(storageLevel = StorageLevel.MEMORY_ONLY) data1.count() START_TIME = time.time() CORR = Statistics.corr(DATA1、メソッド= corrType) END_TIME = time.time() プリント( "%sの秒" %(END_TIME - START_TIME)) –

+0

あなたはあなたのことを検証していファイルはメモリに保存できますか?ボトルネックはmlであり、解析中ではないかもしれません。 –

関連する問題