2017-08-25 13 views
0

を発生させながら文字列値を取得するにはどうすれば二つのファイルスパーク:出力ファイル

--------Student.csv--------- 
     StudentId,City 
     101,NDLS 
     102,Mumbai 

-------StudentDetails.csv--- 
     StudentId,StudentName,Course 
     101,ABC,C001 
     102,XYZ,C002 

要件第二のファイルにStudentNameとコースに置き換える必要があります最初に

  1. StudentIdを持っています。

  2. はかつて私は、コードが

    val studentRDD = sc.textFile(file path); 
    val studentdetailsRDD = sc.textFile(file path); 
    val studentB = sc.broadcast(studentdetailsRDD.collect) 
    
    //Generating CSV 
    studentRDD.map{student => 
        val name = getName(student.StudentId) 
        val course = getCourse(student.StudentId) 
        Array(name, course, student.City) 
    }.mapPartitions{data => 
        val stringWriter = new StringWriter(); 
        val csvWriter =new CSVWriter(stringWriter); 
        csvWriter.writeAll(data.toList) 
        Iterator(stringWriter.toString()) 
    }.saveAsTextFile(outputPath) 
    
    
    
    //Functions defined to get details 
    def getName(studentId : String) { 
        studentB.value.map{stud =>if(studentId == stud.StudentId) stud.StudentName} 
    } 
    
    def getCourse(studentId : String) { 
        studentB.value.map{stud =>if(studentId == stud.StudentId) stud.Course} 
    }   
    

    問題

    ファイルを取得する使用

    ABC,C001,NDLS 
    XYZ,C002,Mumbai 
    

のような完全な詳細で新しいCSVを生成する必要が置き換え値はString値ではなくオブジェクト表現です。

オブジェクトではなく文字列値を取得するにはどうすればよいですか?

答えて

1

SparkのDataFrame APIは、2つのDataFramesの結合とCSVファイルの書き込みを容易にサポートするため、これに特に適しています。

あなたはRDDのAPIと一緒に暮らす上で主張すれば、あなたのコードの主な問題は、ルックアップ機能するようにしかし、ルックス:getNamegetCourseは基本的に何もしない、その戻り値の型はUnitであるからです。 ifelseなしで使用すると、入力によっては戻り値がないため、関数全体がUnitを返すことになります。この問題を解決するには

、それはそれらを取り除くとMapをブロードキャストすることによって、ルックアップを簡素化を取得する方が簡単です:

// better to broadcast a Map instead of an Array, would make lookups more efficient 
val studentB = sc.broadcast(studentdetailsRDD.keyBy(_.StudentId).collectAsMap()) 

// convert to RDD[String] with the wanted formatting 
val resultStrings = studentRDD.map { student => 
    val details = studentB.value(student.StudentId) 
    Array(details.StudentName, details.Course, student.City) 
} 
    .map(_.mkString(",")) // naive CSV writing with no escaping etc., you can also use CSVWriter like you did 

// save as text file 
resultStrings.saveAsTextFile(outputPath) 
+0

をよくお読みいただきありがとうございます。あなたの答えは私の疑問を解決した – Rahul

1

スパークは、ファイルにjoinwriteを大きくサポートしています。 Joinは1行のコードしか取らず、writeも1つしかかかりません。

手書きこれらのコードは、エラーが実証済みで、読みにくく、おそらく超低速である可能性があります。

val df1 = Seq((101,"NDLS"), 
       (102,"Mumbai") 
     ).toDF("id", "city") 
val df2 = Seq((101,"ABC","C001"), 
       (102,"XYZ","C002") 
     ).toDF("id", "name", "course") 

val dfResult = df1.join(df2, "id").select("id", "city", "name") 

dfResult.repartition(1).write.csv("hello.csv") 

ディレクトリが作成されます。ディレクトリには最終的に1つのファイルしかありません。

+0

おかげRockie。私はこのアプローチを試みました、そして、それは本当にとても単純で、まっすぐでした。 – Rahul

+0

私はちょうど1つの質問があります。どのように私はそれを表示する前に、いくつかのロジックを実装する必要があるそれらの列を処理することはできますか?データフレームにはSQLと同様の機能がありますか? – Rahul

+0

Spark DataFrameには、カラムを処理するための多くの組み込み関数があり、UDF(ユーザ定義関数)で拡張する機能を提供しています。プログラミングガイドは、https://spark.apache.org/docs/2.2.0/sql-programming-guide.html –

関連する問題