2017-11-02 6 views
0

Scalaに既存のコードがあり、Javaに同じコードを書き込もうとしています。しかし、いくつかの問題に直面。Javaを使用してscala.collection.immutable.Streamクラスを使用する方法

Scalaのコード

import java.io.{BufferedReader, InputStreamReader} 
import java.util.zip.ZipInputStream 
import org.apache.spark.SparkContext 
import org.apache.spark.input.PortableDataStream 
import org.apache.spark.rdd.RDD 

def readFile(path: String,minPartitions: Int): RDD[String] = { 

     if (path.endsWith(".zip")) { 
     sc.binaryFiles(path, minPartitions) 
      .flatMap { 
       case (name: String, content: PortableDataStream) => 
      val zis = new ZipInputStream(content.open) 
      val entry = zis.getNextEntry 
      val br = new BufferedReader(new InputStreamReader(zis)) 
      Stream.continually(br.readLine()).takeWhile(_ != null) 
      } 
     } 
    } 

私は、Javaコードの下に書かれている - 私は、エラーの下に取得しています

import org.apache.spark.input.PortableDataStream; 
import org.apache.spark.sql.SparkSession; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.rdd.RDD; 

import java.io.BufferedReader; 
import java.io.InputStreamReader; 
import java.util.zip.ZipEntry; 
import java.util.zip.ZipInputStream; 

     public RDD<String> readFile(String inputDir, int minPartitions) throws Exception { 
    SparkSession sparkSession = null; 
    sparkSession = SparkSession.builder().appName("zipPoc").config("spark.master", "yarn").getOrCreate(); 

    JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext()); 
    if (inputDir.endsWith(".zip")) { 
     sc.binaryFiles(inputDir, minPartitions).flatMap (
      (String name , PortableDataStream content) -> { 
       ZipInputStream stream = new ZipInputStream(content.open()); 
       ZipEntry entry = stream.getNextEntry(); 
       BufferedReader br = new BufferedReader(new InputStreamReader(stream)); 
       scala.collection.immutable.Stream.continually(br.readLine()).takeWhile(_ != null); 
      } 
     ); 
    } 

} 

enter image description here

誰もがこのことについての手掛かりを持っているし、適切なコードを助けます。

+2

Javaは、 '_!= null !'の略語として' _ - > _!= null'をサポートしていないことは確かです。どうしてあなたがJavaに戻ってほしくないのか分かりません。 –

+0

AWSラムダでコードを実行する必要があります。しかし、AWSラムダはScalaをサポートしていません。 – Avijit

+0

それは色あせです。それはサポートされるべきです。 –

答えて

4

continuallyパラメータと戻り値なしのlambdaが必要です。 のJava等価では次のようになります。また、Javaには_ありません

() -> br.readLine()

は、明示的なパラメータを使用する必要があります。

(line) -> line != null

だから、これは動作するはずです:

Stream.continually(() -> {  
    try { 
     return br.readLine(); 
    } catch (IOException e) { 
     throw new RuntimeException(e); 
    } 
}).takeWhile((line) -> line != null) 

====

あなたはreadLineがチェック例外をスローに気づいたよう。最も速い修正は、try/catchの呼び出しをラップすることです。

+0

br.readLine()で「未処理例外:java.io.IOException」例外が発生する – Avijit

関連する問題