2016-08-04 14 views
0

固定長のファイル(サンプルは以下に示します)を使用し、このファイルをSparkのDataFrames API(SCALAまたはPythonではなく)を使用して読み込みます。 。 DataFrames APIを使用すると、textFile、jsonファイルなどを読み取る方法はありますが、固定長ファイルを読み取る方法があるかどうかはわかりません。私はこれをインターネットで検索していました。linkが見つかりましたが、私はこの目的のためにspark-fixedwidth-assembly-1.0.jarをダウンロードしなければなりませんでしたが、どこでもその瓶を見つけることができませんでした。私はここで完全に失われており、あなたの提案や助けが必要です。 Stackoverflowにはいくつかの投稿がありますが、ScalaとDataFrame APIには関係ありません。DataFrame APIとSCALAを使用してSparkで固定長ファイルを読み取る方法

は、ここでファイル

56 apple  TRUE 0.56 
45 pear  FALSE1.34 
34 raspberry TRUE 2.43 
34 plum  TRUE 1.31 
53 cherry TRUE 1.4 
23 orange FALSE2.34 
56 persimmon FALSE23.2 

各列の固定幅である3、10、5、4

あなたの意見を提案してください。

答えて

2

まあ...行を分割するには部分文字列を使用してください。その後、縁取りを除去するために整えます。そして、あなたがしたいことを何でもしてください。

case class DataUnit(s1: Int, s2: String, s3:Boolean, s4:Double) 

sc.textFile('your_file_path') 
    .map(l => (l.substring(0, 3).trim(), l.substring(3, 13).trim(), l.substring(13,18).trim(), l.substring(18,22).trim())) 
    .map({ case (e1, e2, e3, e4) => DataUnit(e1.toInt, e2, e3.toBoolean, e4.toDouble) }) 
    .toDF 
+0

私はREPLでそれを試してみましたが、私はエラーを得ました。あなたはREPLで運動するものを教えていただけますか? –

+0

':32:エラー:パラメータの数が正しくありません。期待値= 1 val mapRDD = file.map(l =>(l.substring(0、4).trim()、l.substring(4,14).trim()、l.substring(14,19)。 map((e1、e2、e3、e4)=> DataUnit(e1.toInt、e2、e3.toBoolean、e4.toDouble))。 toDF ^ ' –

+0

今すぐ修正する必要があります。 REPLでステップごとに各マップを実行してみてください。 –

1

固定長のフォーマットは非常に古いので、このフォーマットのための良いScalaライブラリを見つけることができませんでした...私は自分自身を作成し​​ました。

あなたはここでそれをチェックアウトすることができます:スパークとhttps://github.com/atais/Fixed-Length

使い方は非常に簡単です、あなたは、オブジェクトのDataSetになるだろう!

あなたのオブジェクトの記述を作成することができ最初の必要性、FE

case class Employee(name: String, number: Option[Int], manager: Boolean) 

object Employee { 

    import com.github.atais.util.Read._ 
    import cats.implicits._ 
    import com.github.atais.util.Write._ 
    import Codec._ 

    implicit val employeeCodec: Codec[Employee] = { 
     fixed[String](0, 10) <<: 
     fixed[Option[Int]](10, 13, Alignment.Right) <<: 
     fixed[Boolean](13, 18) 
    }.as[Employee] 
} 

以降だけでパーサーを使用:

val input = sql.sparkContext.textFile(file) 
       .filter(_.trim.nonEmpty) 
       .map(Parser.decode[Employee]) 
       .flatMap { 
        case Right(x) => Some(x) 
        case Left(e) => 
         System.err.println(s"Failed to process file $file, error: $e") 
         None 
       } 
sql.createDataset(input) 
関連する問題