2017-07-05 7 views
1

私はwholeTextFilesを使用してフォルダ内のすべてのファイル名を読み取り、それらを1つずつ別々に処理しようとしています(たとえば、各データセットのSVDベクトルを取得しようとしています。合計)。データはスペースで区切られ、異なる行(行列など)に配置された.txtファイルに保存されます。SparkContext.wholeTextFilesの後に複数のファイルを個別に処理する方法

私は "WholeTextFiles("すべてのテキストファイル "のパス") "を使用した後、問題を読んでデータを解析するのが難しく、私のような方法を使用できません1つのファイルのみを読み取るときに使用されます。この方法は、1つのファイルを読み込んだときに正常に動作し、正しい出力が得られます。誰かが私にここでそれを修正する方法を教えてもらえますか?ありがとう!

public static void main (String[] args) { 
    SparkConf sparkConf = new SparkConf().setAppName("whole text files").setMaster("local[2]").set("spark.executor.memory","1g");; 
    JavaSparkContext jsc = new JavaSparkContext(sparkConf); 
    JavaPairRDD<String, String> fileNameContentsRDD = jsc.wholeTextFiles("/Users/peng/FMRITest/regionOutput/"); 

    JavaRDD<String[]> lineCounts = fileNameContentsRDD.map(new Function<Tuple2<String, String>, String[]>() { 
     @Override 
     public String[] call(Tuple2<String, String> fileNameContent) throws Exception { 
       String content = fileNameContent._2(); 
       String[] sarray = content .split(" "); 
       double[] values = new double[sarray.length]; 
       for (int i = 0; i< sarray.length; i++){ 
        values[i] = Double.parseDouble(sarray[i]); 
       } 


      pd.cache(); 
      RowMatrix mat = new RowMatrix(pd.rdd()); 

      SingularValueDecomposition<RowMatrix, Matrix> svd = mat.computeSVD(84, true, 1.0E-9d); 
      Vector s = svd.s(); 
    }}); 

答えて

0

引用SparkContext.wholeTextFilesのscaladoc:

wholeTextFiles(パス:文字列、minPartitionsます。int = defaultMinPartitions):RDD [(文字列、文字列)] HDFSからのテキストファイルのディレクトリを読みます、ローカルファイルシステム(すべてのノードで利用可能)、またはHadoopでサポートされているファイルシステムURI。各ファイルは単一のレコードとして読み取られ、キーと値のペアで戻されます。ここで、キーは各ファイルのパスであり、値は各ファイルの内容です。

つまり、wholeTextFilesは、単にあなたが望むものではない場合があります。デザインによって以来

は「小さなファイルが好まれる」(scaladocを参照してください)、あなたは( filter付き) mapPartitionsまたは collectに解析を適用するファイルのサブセットをつかむために可能性があります。あなたがあなたの手の中に、パーティションごとのファイルを持っていたら

、あなたが使用することができScalaのParallel Collection APIschedule Spark jobs to execute in parallel:与えられたスパークアプリケーション(SparkContextインスタンス)彼らはから提出された場合には、複数の並列ジョブを同時に実行することができインサイド

別のスレッド。 「ジョブ」とは、このセクションでは、スパークアクション(保存、収集など)とそのアクションを評価するために実行する必要のあるタスクを意味します。 Sparkのスケジューラは完全にスレッドセーフであり、このユースケースをサポートして、複数のリクエスト(複数のユーザに対するクエリなど)を処理するアプリケーションを有効にします。

デフォルトでは、SparkのスケジューラはFIFO形式でジョブを実行します。各ジョブは「ステージ」に分かれており(例:マップとフェーズを減らす)、最初のジョブは使用可能なすべてのリソースに優先順位を付け、ステージには起動するタスクがあり、次に第2のジョブが優先されます。キューはクラスタ全体を使用する必要はなく、後でジョブをすぐに実行できるようになっていますが、キューの先頭にあるジョブが大きい場合、後のジョブが大幅に遅れることがあります。

+0

元々私はwholeTextFilesを使用してこの大きなセットの各ファイルを解析しようとしていましたが、かなり難しいと感じました。ループを使用して各ファイルを読み込み、結果をまとめてマージしようとするのではなく、並行して行う方法を見つけようとします。地図区画またはフィルタヘルプが表示されますか? –

+0

アップデートを参照してください。私はあなたがそれを好きになると思います。指が交差した。 –

+0

ありがとう、私はそれをチェックしています。ただ興味がありますが、私のコードでは、別の仕事区画で想定されるベクターが生成されています。ベクトルを「収集」する方法はありますか?私はrowMatrixを収集することができることを知っています。それは、それを直接rddに変換するメソッド ".rdd()"を持っていますが、Vectorはそうではありません。今私は100のデータセットすべてにすべての "U"(SVDの1つの出力ですが、私が望むものではない)を簡単に保存できますが、保存したい出力ベクトルである "S"ベクトルに対してはできません、私が常に印刷できるのは最後の出力結果ですが、すべてがコレクションではありません。 –

関連する問題