2017-11-08 8 views
0

私はさまざまなファイルのパスを含むHDFSファイルを持っています。ここFILE1と呼ばれるファイルは、次のとおりです。Scalaスパークループはエラーなしで処理されますが、出力は生成されません

path/of/HDFS/fileA 
path/of/HDFS/fileB 
path/of/HDFS/fileC 
. 
. 
. 

上記のファイルのそれぞれの行を読んで、別の関数で、それを処理するために、次のように私はScalaのスパークにforループを使用しています:

val lines=Source.fromFile("path/to/file1.txt").getLines.toList 

for(i<-lines){ 
i.toString() 
val firstLines=sc.hadoopFile(i,classOf[TextInputFormat],classOf[LongWritable],classOf[Text]).flatMap { 
case (k, v) => if (k.get == 0) Seq(v.toString) else Seq.empty[String] 
} 
} 

私が実行したとき上記のループは、エラーを返さずに実行され、新しい行にScalaプロンプトが表示されます:scala>

しかし、私はfirstLinesに格納される出力をいくつか見てみると、仕事:

scala> firstLines 
<console>:38: error: not found: value firstLines 
      firstLine 
     ^

出力を生成していない上記のループの問題は何も問題なく実行されますか?

追加情報 hadoopFileは、その最初のパラメータとして文字列のパス名を受け付ける関数。それで、私はfile1の各行(各行はパス名)を最初のパラメータiのStringとして渡そうとしているのです。 flatMapの機能は、hadoopFileに渡されたファイルの最初の行を取得し、それを単独で格納し、他のすべての行をダンプします。したがって、目的の出力(firstLines)は、パス名(i)でhadoopFileに渡されるすべてのファイルの最初の行にする必要があります。

私はlooopせずに、単に1つのファイルに対して機能を実行してみました、それは出力を生成します。

val firstLines=sc.hadoopFile("path/of/HDFS/fileA",classOf[TextInputFormat],classOf[LongWritable],classOf[Text]).flatMap { 
case (k, v) => if (k.get == 0) Seq(v.toString) else Seq.empty[String] 
} 

scala> firstLines.take(3) 
res27: Array[String] = Array(<?xml version="1.0" encoding="utf-8"?>) 

FILEAはXMLファイルであるので、あなたは、そのファイルの結果の最初の行を見ることができます。だから私は関数が正常に動作することを知っている、それは私が把握することはできませんループの単なる問題です。助けてください。

答えて

1

変数firstLinesは、forループの本体に定義されているため、その範囲はこのループに限定されています。つまり、ループ外の変数にアクセスすることはできません。そのため、Scalaコンパイラからerror: not found: value firstLinesが通知されます。

linesに記載されているすべてのファイルの最初の行を収集したいと考えています。

ここでははすべて、Scalaで異なる構文に変換できます。あなたが書いたforループのようなものを使うことができますし、機能的アプローチを採用して、ファイルリストにmap関数を適用することもできます。以下のコードでは、mapの中に記述したコードを入れて、HadoopRDDを作成し、flatMapを関数の最初の行を取得するために適用します。

次に、RDD[String]のリストを取得します。この段階では、実際の作業を開始していないことに注意してください。 RDDの評価をトリガして結果を収集するには、リストにあるRDDごとにcollectメソッドへの追加呼び出しが必要です。

// Renamed "lines" to "files" as it is more explicit. 
val fileNames = Source.fromFile("path/to/file1.txt").getLines.toList 

val firstLinesRDDs = fileNames.map(sc.hadoopFile(_,classOf[TextInputFormat],classOf[LongWritable],classOf[Text]).flatMap { 
    case (k, v) => if (k.get == 0) Seq(v.toString) else Seq.empty[String] 
}) 

// firstLinesRDDs is a list of RDD[String]. Based on this code, each RDD 
// should consist in a single String value. We collect them using RDD#collect: 
val firstLines = firstLinesRDDs.map(_.collect) 

しかし、このアプローチには、スパークが提供する利点から利益を得ることができないという欠点があります。

mapfilenamesで操作を適用すると、RDDで作業していないため、ファイル名はドライバ(Sparkセッションをホストするプロセス)で順次処理され、並列化可能なSparkジョブには含まれません。これは、一度に1つのファイル名で、コードの2番目のブロックで書いたことを実行するのと同じです。

問題に対処するには、どうすればよいですか? Sparkを使って作業するときは、できるだけ早くRDDの宣言をコード内でプッシュしようとするのが良いことです。どうして?これにより、Sparkは私たちがやりたい作業を並列化して最適化することができます。あなたの例は、このコンセプトの教科書のイラストでもありますが、ファイルを操作するという要件によって、ここでさらに複雑になります。

この場合、hadoopFileはカンマで区切られたファイルを入力として受け入れることができます。

val firstLinesRDD = sc.hadoopFile(fileNames.mkString(","), classOf[TextInputFormat],classOf[LongWritable],classOf[Text]).flatMap { 
    case (k, v) => if (k.get == 0) Seq(v.toString) else Seq.empty[String] 
} 

をそして、我々は、単一のcollectで私たちの最初の行を取得します:そのため、代わりに順次すべてのファイルのためのRDDSを作成するのではなく、我々は彼らのすべてのための1 RDDを作成

val firstLines = firstLinesRDD.collect 
+0

非常に明確だったことと、詳細な説明!私は今なぜforループが機能していないのか理解しています。あなたのソリューションを使用して、私はその結果を達成することができます。本当にありがとう! – PreethiS

+0

私はそれが助けてうれしいです!ありがとう –

関連する問題