Scala
に既存のコードがあり、Java
に同じコードを書き込もうとしています。しかし、いくつかの問題に直面。Javaを使用してscala.collection.immutable.Streamクラスを使用する方法
Scalaのコード:
import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream
import org.apache.spark.SparkContext
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
def readFile(path: String,minPartitions: Int): RDD[String] = {
if (path.endsWith(".zip")) {
sc.binaryFiles(path, minPartitions)
.flatMap {
case (name: String, content: PortableDataStream) =>
val zis = new ZipInputStream(content.open)
val entry = zis.getNextEntry
val br = new BufferedReader(new InputStreamReader(zis))
Stream.continually(br.readLine()).takeWhile(_ != null)
}
}
}
私は、Javaコードの下に書かれている - 私は、エラーの下に取得しています
import org.apache.spark.input.PortableDataStream;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
public RDD<String> readFile(String inputDir, int minPartitions) throws Exception {
SparkSession sparkSession = null;
sparkSession = SparkSession.builder().appName("zipPoc").config("spark.master", "yarn").getOrCreate();
JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());
if (inputDir.endsWith(".zip")) {
sc.binaryFiles(inputDir, minPartitions).flatMap (
(String name , PortableDataStream content) -> {
ZipInputStream stream = new ZipInputStream(content.open());
ZipEntry entry = stream.getNextEntry();
BufferedReader br = new BufferedReader(new InputStreamReader(stream));
scala.collection.immutable.Stream.continually(br.readLine()).takeWhile(_ != null);
}
);
}
}
。
誰もがこのことについての手掛かりを持っているし、適切なコードを助けます。
Javaは、 '_!= null !'の略語として' _ - > _!= null'をサポートしていないことは確かです。どうしてあなたがJavaに戻ってほしくないのか分かりません。 –
AWSラムダでコードを実行する必要があります。しかし、AWSラムダはScalaをサポートしていません。 – Avijit
それは色あせです。それはサポートされるべきです。 –