2017-11-20 9 views
-2

私はHolden Karauによって "Spark With Fast Process"という本から次の例を見つけました。私は、次のコード行は、プログラムの中で何をするか分かりませんでした:マップ関数の操作の理解

val splitLines = inFile.map(line => { 
val reader = new CSVReader(new StringReader(line)) 
reader.readNext() 
}) 
val numericData = splitLines.map(line => line.map(_.toDouble)) 
val summedData = numericData.map(row => row.sum) 

プログラムは次のとおりです。

package pandaspark.examples 
import spark.SparkContext 
import spark.SparkContext._ 
import spark.SparkFiles; 
import au.com.bytecode.opencsv.CSVReader 
import java.io.StringReader 
object LoadCsvExample { 
    def main(args: Array[String]) { 
    if (args.length != 2) { 
    System.err.println("Usage: LoadCsvExample <master> 
     <inputfile>") 
    System.exit(1) 
    } 

val master = args(0) 
val inputFile = args(1) 
val sc = new SparkContext(master, "Load CSV Example", 
System.getenv("SPARK_HOME"), 
Seq(System.getenv("JARS"))) 
sc.addFile(inputFile) 
val inFile = sc.textFile(inputFile) 
val splitLines = inFile.map(line => { 
val reader = new CSVReader(new StringReader(line)) 
reader.readNext() 
}) 
val numericData = splitLines.map(line => line.map(_.toDouble)) 
val summedData = numericData.map(row => row.sum) 
println(summedData.collect().mkString(",")) 
} 
} 

私は、上記に簡単にプログラムの機能を知っています。それは入力 CSVを解析し、すべての行を合計します。しかし、これら3行のコードがどの程度正確に達成できるかは、私が理解できないものです。

また、これらの行がflatMapに置き換えられた場合、どのように出力が変化するか説明できますか? Like:

答えて

1

このコードでは、基本的にCSVファイルのデータを読み込んで値を追加しています。 CSVファイルのようなものであると仮定 - 私たちはCSVからデータをフェッチしているINFILEのようなファイルので、ここで

10,12,13 
1,2,3,4 
1,2 

- ので、ここで

val inFile = sc.textFile("your CSV file path") 

INFILEは、テキスト形式のデータを持っているRDDです。 とあなたがそれを収集するには、該当する場合、それは次のようになります -

Array[String] = Array(10,12,13 , 1,2,3,4 , 1,2) 

を、あなたはそれの上にマップを適用するとき、あなたが見つける -

line = 10,12,13 
line = 1,2,3,4 
line = 1,2 

をCSV形式でこのデータを読み込むためにそれ使用している -

- そう

val reader = new CSVReader(new StringReader(line)) 
reader.readNext() 

をCSV形式でデータを読み込んだ後、分割線は次のようになり10分割線の

Array(
Array(10,12,13), 
Array(1,2,3,4), 
Array(1,2) 
) 

、それはあなたがアレイ(10,12,13)を取得するラインにここ

splitLines.map(line => line.map(_.toDouble)) 

を適用していますし、それの後に、それはそう、それはすべての要素の種類を変更することだ

line.map(_.toDouble) 

を使用しています文字列からDoubleへ そうnumericDataにあなたは同じでしょう

Array(Array(10.0, 12.0, 13.0), Array(1.0, 2.0, 3.0, 4.0), Array(1.0, 2.0)) 

が、現在はダブル

、個々の行または列の合計はそうのような何か答える適用していますのフォーム内のすべての要素 - アレイ(35.0、10.0、 3.0)

をあなたはsusummedData.collect()

+0

あなたはどのようにでしょう出力変化を教えてもらえ同様:? valの分割線= inFile.flatMap(ライン=> {valのリーダー=新しいCSVReaderを(新しいStringReader(行))reader.readNext()}) 数値numericData = splitLines.flatMap(行=> line.map(_。toDouble)) val summedData = numericData.map(行=> row.sum) – Annapurna

+0

flatMapは常に内部ラッパーを削除します。ここでは の配列(配列(10.0、12.0、13.0)、配列(1.0,2.0,3.0,4.0)、配列(1.0,2.0)) のような応答を得ていますが、代わりにflatMapを適用すると、ように - 配列(10.0,12.0,13.0,1.0,2.0,3.0,4.0,1.0,2.0) ここで見ることができるflatMapのbeacuseすべての内部ラッパーが削除されました –

1

まず、コードサンプルにflatMapオペレーションはありません。タイトルは誤解を招きます。しかし、一般的には、コレクションで呼び出されるmapは、コレクションの各要素に関数が適用された新しいコレクションを返します。あなたのコードスニペットを通じて行ずつ行く

:​​の

val splitLines = inFile.map(line => { 
val reader = new CSVReader(new StringReader(line)) 
reader.readNext() 
}) 

タイプRDD[String]です。そのような文字列をすべて取得し、csvリーダーを作成してreadNext(文字列の配列を返す)を呼び出します。だから最後にRDD[String[]]が得られます。

val numericData = splitLines.map(line => line.map(_.toDouble)) 

2つのマップ操作がネストされたもう少しトリッキーな行です。繰り返しますが、RDDコレクションの各要素(今はString[])を取得して、のすべての要素に_.toDouble関数を適用します。最後にRDD[Double[]]が得られます。

val summedData = numericData.map(row => row.sum) 

RDDの要素を取り、sum関数を適用します。すべての要素がDouble[]であるため、合計でDoubleという値が生成されます。最後にRDD[Double]が得られます。

+0

を適用するとき、あなたはそれを得るだろう、私が代わりにflatMapを使用した場合、あなたはどのように出力変化だろう教えてもらえ同様:? valの分割線= inFil数値データ= splitLines.flatMap(line => line.map(_。toDouble))この数値データを使用して、私が代わりにflatMapを使用した場合 ヴァルsummedData = numericData.map(行=> row.sum) – Annapurna