Apache Sparkの実装の1つのワークフローで助けが必要です。私の仕事は次です:個別にSPARKでいくつかのファイルを処理する
ソースデータとしていくつかのCSVファイルがあります。注意:結果は、いくつかの追加の列を持つソースファイルである:これらのファイルが別のレイアウトに
を持っている可能性があり、私は、各ファイル(これは問題ではありません)
主な目標を解析する必要があるかの情報とメタデータを持っています。 1つの出力範囲に入らずに各ソースファイルを更新する必要があります。たとえば、ソース10個のファイル→10個の結果ファイル、および各結果ファイルには対応するソースファイルのデータのみがあります。
私はスパークがマスクで多くのファイルを開くことができます知っているように:
var source = sc.textFile("/source/data*.gz");
しかし、この場合には、私は、ファイルのどの行を認識することはできません。ソースファイルのリストを取得し、以下のシナリオで処理しようとすると:
JavaSparkContext sc = new JavaSparkContext(...);
List<String> files = new ArrayList() //list of source files full name's
for(String f : files)
{
JavaRDD<String> data = sc.textFile(f);
//process this file with Spark
outRdd.coalesce(1, true).saveAsTextFile(f + "_out");
}
しかし、この場合、私はすべてのファイルを順次モードで処理します。
私の質問は次にあります:どのように多くのファイルを並列モードで処理できるのですか?たとえば、1つのファイル - 1つのエグゼキュータ?
私は、ソース・データとの単純なコードでこれを実装しようとした:
//JSON file with paths to 4 source files, saved in inData variable
{
"files": [
{
"name": "/mnt/files/DigilantDaily_1.gz",
"layout": "layout_1"
},
{
"name": "/mnt/files/DigilantDaily_2.gz",
"layout": "layout_2"
},
{
"name": "/mnt/files/DigilantDaily_3.gz",
"layout": "layout_3"
},
{
"name": "/mnt/files/DigilantDaily_4.gz",
"layout": "layout_4"
}
]
}
sourceFiles= new ArrayList<>();
JSONObject jsFiles = (JSONObject) new JSONParser().parse(new FileReader(new File(inData)));
Iterator<JSONObject> iterator = ((JSONArray)jsFiles.get("files")).iterator();
while (iterator.hasNext()){
SourceFile sf = new SourceFile();
JSONObject js = iterator.next();
sf.FilePath = (String) js.get("name");
sf.MetaPath = (String) js.get("layout");
sourceFiles.add(sf);
}
SparkConf sparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("spark-app");
final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
try {
final Validator validator = new Validator();
ExecutorService pool = Executors.newFixedThreadPool(4);
for(final SourceFile f : sourceFiles)
{
pool.execute(new Runnable() {
@Override
public void run() {
final Path inFile = Paths.get(f.FilePath);
JavaRDD<String> d1 = sparkContext
.textFile(f.FilePath)
.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String s) throws Exception {
return validator.parseRow(s);
}
});
JavaPairRDD<String, Integer> d2 = d1.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
String userAgent = validator.getUserAgent(s);
return new Tuple2<>(DeviceType.deviceType(userAgent), 1);
}
});
JavaPairRDD<String, Integer> d3 = d2.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer val1, Integer val2) throws Exception {
return val1 + val2;
}
});
d3.coalesce(1, true)
.saveAsTextFile(outFolder + "/" + inFile.getFileName().toString());//, org.apache.hadoop.io.compress.GzipCodec.class);
}
});
}
pool.shutdown();
pool.awaitTermination(60, TimeUnit.MINUTES);
} catch (Exception e) {
throw e;
} finally {
if (sparkContext != null) {
sparkContext.stop();
}
}
しかし、このコードは例外で失敗しました:
Exception in thread "pool-13-thread-2" Exception in thread "pool-13-thread-3" Exception in thread "pool-13-thread-1" Exception in thread "pool-13-thread-4" java.lang.Error: org.apache.spark.SparkException: Task not serializable
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:335)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:334)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
at org.apache.spark.rdd.RDD.filter(RDD.scala:334)
at org.apache.spark.api.java.JavaRDD.filter(JavaRDD.scala:78)
at append.dev.App$1.run(App.java:87)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
... 2 more
私はミスを持っている場所を知りたいのですが?
助けてくれてありがとう!
とは何ですか?どのようにディスクから各ファイルを読み込みますか?コンテキストのシリアライズを試みるべきではないので、 'fileRdd.flatMap(file => sc.textFile(file))'を実行することはできません。 –
true、thx :)それを更新しました –
'wholeTextFiles'も危険です - ファイルのいずれかが2GBより大きい場合は、例外が発生します(各ファイルを1つのレコードとして読み込み、2GBの制限があるためSparkのパーティションのサイズ)。例外が発生しなくても、パフォーマンスが低下します。 –