2016-06-24 4 views
0

ご存知のように、あなたがRDD[String, Int]saveAsTextFileを使用する場合、出力は次のようになります。
SaveAsTextFileを使用して保存したSparkで生成されたファイルの読み取り/解析のベストプラクティスは何ですか?

(T0000036162,1747) 
(T0000066859,1704) 
(T0000043861,1650) 
(T0000075501,1641) 
(T0000071951,1638) 
(T0000075623,1638) 
(T0000070102,1635) 
(T0000043868,1627) 
(T0000094043,1626) 

あなたは再びスパークでこのファイルを使用することもできますし、何がそれを読み取り、解析するためのベストプラクティスすべきですか?それはそれのようなものでなければならないのですか、それに対して何かエレガントな方法がありますか?

val lines = sc.textFile("result/hebe") 

case class Foo(id: String, count: Long) 

val parsed = lines 
     .map(l => l.stripPrefix("(").stripSuffix(")").split(",")) 
     .map(l => new Foo(id=l(0),count = l(1).toLong)) 

答えて

1

あなたが探しているものによって異なります。 あなたはかなり何かをしたい場合は、私はおそらくあなたが

lines.map(new Foo) 

のようなものを持っている可能性があり、あなたが持っている場合はFooが

case class Foo(id: String, count: Long) { 
    def apply(l: String): Foo = { 
     val split = l.stripPrefix("(").stripSuffix(")").split(",") 
     new Foo(l(0), l(1)) 
    } 
} 

のようになりますように、単一の文字列を取るfooへの代替コンストラクタを追加することを検討したいですそのようなデータを出力する必要はありません。シーケンスファイルとして保存することを検討します。

パフォーマンスが問題ではない場合は、問題ありません。私は、最も重要なことは、後であなたが単体テストをテストし、後でそれを簡単に編集できるように、テキストの解析を分離することだけだと言っています。

1

ケースクラスをスキーマとして使用するDataframeとして保存するか(簡単に解析してSparkに戻すことができます)、RDDの個々のコンポーネントをマップする必要があります(ブラケットそれだけで、ファイルサイズが大きくなりので)保存する前に:あなたはDFに読み込むとき

yourRDD.toDF("id","count").saveAsParquetFile(path) 

、あなたは

RDDInput = input.map(x=>(x.getAs[Long]("id"),x.getAs[Int]("count"))) 

をしたい場合はRDDにそれを取り戻すためにスキーマ定義を通してそれを渡すことができますテキストファイルとして保存したい場合は、マッピングを検討することができます

yourRDD.map(x => s"${x._1}, ${x._2}") 
1

RDDの代わりにデータフレームをファイルとして直接書き込むことをお勧めします。ファイル読み込みこと

val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._ 
val df = rdd.toDF() 
df.write.parquet("dir”) 

コード - - ファイルを書き込むこと

コード。saveAsTextFile使用マップ(X => x.mkStringを( "")行う前に

val rdd = sqlContext.read.parquet(“dir”).rdd.map(row => (row.getString(0),row.getLong(1))) 
1

rdd.map(x => x.mkString( "、")。saveAsTextFile(path)。出力には括弧は含まれません。

出力は次のようになります。

T0000036162,1747

T0000066859,1704

関連する問題