2016-03-21 9 views
1

Apache Sparkの実装の1つのワークフローで助けが必要です。私の仕事は次です:個別にSPARKでいくつかのファイルを処理する

  1. ソースデータとしていくつかのCSVファイルがあります。注意:結果は、いくつかの追加の列を持つソースファイルである:これらのファイルが別のレイアウトに

  2. を持っている可能性があり、私は、各ファイル(これは問題ではありません)

  3. 主な目標を解析する必要があるかの情報とメタデータを持っています。 1つの出力範囲に入らずに各ソースファイルを更新する必要があります。たとえば、ソース10個のファイル→10個の結果ファイル、および各結果ファイルには対応するソースファイルのデータのみがあります。

私はスパークがマスクで多くのファイルを開くことができます知っているように:

var source = sc.textFile("/source/data*.gz"); 

しかし、この場合には、私は、ファイルのどの行を認識することはできません。ソースファイルのリストを取得し、以下のシナリオで処理しようとすると:

JavaSparkContext sc = new JavaSparkContext(...); 
List<String> files = new ArrayList() //list of source files full name's 
for(String f : files) 
{ 
    JavaRDD<String> data = sc.textFile(f); 
    //process this file with Spark 
    outRdd.coalesce(1, true).saveAsTextFile(f + "_out"); 
} 

しかし、この場合、私はすべてのファイルを順次モードで処理します。

私の質問は次にあります:どのように多くのファイルを並列モードで処理できるのですか?たとえば、1つのファイル - 1つのエグゼキュータ?

私は、ソース・データとの単純なコードでこれを実装しようとした:

//JSON file with paths to 4 source files, saved in inData variable 
{ 
"files": [ 
    { 
     "name": "/mnt/files/DigilantDaily_1.gz", 
     "layout": "layout_1" 
    }, 
    { 
     "name": "/mnt/files/DigilantDaily_2.gz", 
     "layout": "layout_2" 
    }, 
    { 
     "name": "/mnt/files/DigilantDaily_3.gz", 
     "layout": "layout_3" 
    }, 
    { 
     "name": "/mnt/files/DigilantDaily_4.gz", 
     "layout": "layout_4" 
    } 
    ] 
} 

sourceFiles= new ArrayList<>(); 
    JSONObject jsFiles = (JSONObject) new JSONParser().parse(new FileReader(new File(inData))); 
    Iterator<JSONObject> iterator = ((JSONArray)jsFiles.get("files")).iterator(); 
    while (iterator.hasNext()){ 
     SourceFile sf = new SourceFile(); 
     JSONObject js = iterator.next(); 
     sf.FilePath = (String) js.get("name"); 
     sf.MetaPath = (String) js.get("layout"); 
     sourceFiles.add(sf); 
    } 

    SparkConf sparkConf = new SparkConf() 
      .setMaster("local[*]") 
      .setAppName("spark-app"); 
    final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); 

    try { 

     final Validator validator = new Validator(); 

     ExecutorService pool = Executors.newFixedThreadPool(4); 

     for(final SourceFile f : sourceFiles) 
     { 
      pool.execute(new Runnable() { 

       @Override 
       public void run() { 

        final Path inFile = Paths.get(f.FilePath); 

        JavaRDD<String> d1 = sparkContext 
          .textFile(f.FilePath) 
          .filter(new Function<String, Boolean>() { 
           @Override 
           public Boolean call(String s) throws Exception { 
            return validator.parseRow(s); 
           } 
          }); 

        JavaPairRDD<String, Integer> d2 = d1.mapToPair(new PairFunction<String, String, Integer>() { 
         @Override 
         public Tuple2<String, Integer> call(String s) throws Exception { 
          String userAgent = validator.getUserAgent(s); 
          return new Tuple2<>(DeviceType.deviceType(userAgent), 1); 
         } 
        }); 

        JavaPairRDD<String, Integer> d3 = d2.reduceByKey(new Function2<Integer, Integer, Integer>() { 
         @Override 
         public Integer call(Integer val1, Integer val2) throws Exception { 
          return val1 + val2; 
         } 
        }); 

        d3.coalesce(1, true) 
          .saveAsTextFile(outFolder + "/" + inFile.getFileName().toString());//, org.apache.hadoop.io.compress.GzipCodec.class); 
       } 
      }); 
     } 
     pool.shutdown(); 
     pool.awaitTermination(60, TimeUnit.MINUTES); 
    } catch (Exception e) { 
     throw e; 
    } finally { 
     if (sparkContext != null) { 
      sparkContext.stop(); 
     } 
    } 

しかし、このコードは例外で失敗しました:

Exception in thread "pool-13-thread-2" Exception in thread "pool-13-thread-3" Exception in thread "pool-13-thread-1" Exception in thread "pool-13-thread-4" java.lang.Error: org.apache.spark.SparkException: Task not serializable 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 
    Caused by: org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:2032) 
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:335) 
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:334) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:310) 
at org.apache.spark.rdd.RDD.filter(RDD.scala:334) 
at org.apache.spark.api.java.JavaRDD.filter(JavaRDD.scala:78) 
at append.dev.App$1.run(App.java:87) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
... 2 more 

私はミスを持っている場所を知りたいのですが?

助けてくれてありがとう!

答えて

0

あなたは(ファイル名、コンテンツ)ペアのRDDを取得し、私は良い結果と同様のマルチスレッドのアプローチを使用している

+0

とは何ですか?どのようにディスクから各ファイルを読み込みますか?コンテキストのシリアライズを試みるべきではないので、 'fileRdd.flatMap(file => sc.textFile(file))'を実行することはできません。 –

+0

true、thx :)それを更新しました –

+0

'wholeTextFiles'も危険です - ファイルのいずれかが2GBより大きい場合は、例外が発生します(各ファイルを1つのレコードとして読み込み、2GBの制限があるためSparkのパーティションのサイズ)。例外が発生しなくても、パフォーマンスが低下します。 –

0

こと以上にマップするsc.wholeTextFiles(dirnameの)を使用することができます。私はあなたが定義する内部クラスに問題があることを信じています。

別のクラスで実行可能ファイル/呼び出し可能ファイルを作成し、提出されたjarファイルとともにSparkにアクセスできることを確認してください。また、関数に状態を暗黙的に渡すので、シリアライズ可能に実装します(f.FilePath)。

+0

2 Ioannis Deligiannis **こんにちは!私は匿名のRunnableクラスをmainメソッドから外部のjarクラスに分離する必要があると思いますか?あなたは私がこの作品の短い例を教えてくれますか? – Yustas

+0

はい。これは、あなたが基本的にあなたに "状態"を渡すことを明確にします。インナークラスとラムダは、スパークで使用するのが少し難しいです。これを最初に試してみて、シリアライゼーションの例外がなくなるはずです。あなたのコードをSparkに送った方法によっては、余分な瓶が必要ないかもしれません。 –

+0

申し訳ありませんが、実際には匿名関数<>を最初に移動する必要があります。バリデーターは直列化可能ですか? –

関連する問題