2016-05-08 4 views
0

"Subject:"を含む行を抽出してarraylistに保存しようとしました。私はエラーに直面していないが、配列のリストは空です。あなたは私が間違っているところで私を導くことができますか?これを行う最善の方法は?Spark - 文字列一致後の行を抽出してArrayListに保存します

import java.util.*; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.VoidFunction; 
public final class extractSubject { 

public static void main(String[] args) { 



    SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName("JavaBookExample"); 
    JavaSparkContext sc = new JavaSparkContext(sparkConf); 

    JavaRDD<String> sample = sc.textFile("/Users/Desktop/sample.txt"); 
    final ArrayList<String> list = new ArrayList<>(); 
    sample.foreach(new VoidFunction<String>(){ 

        public void call(String line) { 

         if (line.contains("Subject:")) { 
          System.out.println(line); 
          list.add(line); 
         } 
        }} 
    ); 

    System.out.println(list); 
    sc.stop(); 
} 
} 

答えて

1

Sparkアプリケーションは分散して並行して実行されることにご注意ください。したがって、Sparkによって実行される関数外の変数は変更できません。

代わりに、これらの関数から結果を返す必要があります。あなたの場合、結果の返されるコレクションを連結する(結果がないforeachではなく)が必要です。

一致する行が含まれているリストが一致した場合はそれを返し、それ以外の場合は空のリストを返します。

mainファンクションのデータを印刷するには、まずcollect()を呼び出して、可能性のある分散データをマスターノードに収集する必要があります。ここで

例:

import java.util.*; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.FlatMapFunction; 

public final class extractSubject { 

    public static void main(String[] args) { 
     SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName("JavaBookExample"); 
     JavaSparkContext sc = new JavaSparkContext(sparkConf); 

     //JavaRDD<String> sample = sc.textFile("/Users/Desktop/sample.txt"); 
     JavaRDD<String> sample = sc.parallelize(Arrays.asList("Subject: first", 
                   "nothing here", 
                   "Subject: second", 
                   "dummy")); 

     JavaRDD<String> subjectLinesRdd = sample.flatMap(new FlatMapFunction<String, String>() { 
      public Iterable<String> call(String line) { 
       if (line.contains("Subject:")) { 
        return Collections.singletonList(line); // line matches → return list with the line as its only element 
       } else { 
        return Collections.emptyList(); // ignore line → return empty list 
       } 
      } 
     }); 

     List<String> subjectLines = subjectLinesRdd.collect(); // collect values from Spark workers 
     System.out.println(subjectLines); // → "[Subject: first, Subject: second]" 

     sc.stop(); 
    } 
} 
関連する問題