2017-10-16 5 views
1

キー(インデックス:Int)でグループ化されたRDD [(Int、Iterable [Coordinates])]があります。座標はメンバを持つクラスである:グループ化されたSpark RDDのコンテンツを個々の行としてフラット化してファイルに保存する方法

latitude: Double, longitude: Double

Iプリントを作成するか、または次のフォーム(各データポイントの行)であろうCSVファイルを作成したい:

index,latitude,longitude 

非グループ化されたRDD [(INT、座標)]それは、このように働いた:私はこのケースでそれを行うために管理するにはどうすればよい

val textOutputRDD = initialRDD.map(
    f => f._1.toString() + "," + f._2.latitude.toString() + "," + f._2.longitude.toString()) 
textOutputRDD.saveAsTextFile("TextOutput") 

+0

を試してみてください? – stefanobaghino

+0

@stefanobaghino 2.1.0 – ilvo

答えて

1

単純なネストループが行います。ここでダブルスの単純な対でI近似座標:

val rdd = 
    sc.parallelize(
    Seq(
     1 -> Seq((4.1, 3.4), (5.6, 6.7), (3.4, 9.0)), 
     2 -> Seq((0.4, -4.1), (-3.4, 6.7), (7.0, 8.9)) 
    ) 
) 

val csvLike = 
    for ((key, coords) <- rdd; (lat, lon) <- coords) yield s"$key,$lat,$lon" 

for (row <- csvLike) println(row) 

このコードは次のような出力になります:

2,0.4,-4.1 
2,-3.4,6.7 
2,7.0,8.9 
1,4.1,3.4 
1,5.6,6.7 
1,3.4,9.0 

編集

別の可能なアプローチは/実際のflatMapに交換することですmapシーケンスの場合、コンパイラはforを次のように解釈します。

rdd.flatMap { 
    case (key, coords) => 
    coords.map { 
     case (lat, lon) => s"$key,$lat,$lon" 
    } 
} 
+0

これは、自分のrddを使って試してみると、同じエラーが発生します:value withFilterはorg.apache.spark.rdd.RDDのメンバーではありません[(Int、Seq [(ダブル、ダブル)])]。修正を見つけようとしたが運がない。 – ilvo

+1

私はこれを 'spark-shell'でうまく走りました。私の推測では、'暗黙的に 'いくつかの行方が欠けているということです。あなたはループの代わりにこの行を入れ替えてみて、何が起こるか見てみることができますか?それは意味的には同等ですが、コンパイラは実際にループを次のように変換します: 'rdd.flatMap {case(key、c​​oords)=> coords.map {case(lat、lon)=> s" $ key、$ lat、$ lon "}}' – stefanobaghino

+0

'for'変形を使用してもエラーは表示されませんが、エラーは表示されません。あなたは '-Xfatal-warnings'コンパイラフラグを有効にして偶然コンパイルしていますか?いずれにしても、 'flatMap' /' map'変種は問題を解決するはずです。 – stefanobaghino

1

あなたがApacheのスパークのどのバージョンを使用しているflatmap-

val output = rdd.flatMap(s=>{ 
     val list=List[String]() 
     for (latlon <- s._2) { 
     list.addString(s._1.toString() + "," + latlon.latitude.toString() + "," + latlon.longitude.toString()) 
     } 
     return list 
    }) 
output.save(....) 
+1

'println(s._1.toString()+"、 "+ latlon.latitude +"、 "+ latlon.longitude)'は意図したとおりに動作しますが、これを使って作業するリストを取得できませんでした。さらに、addStringはStringBuilderを最初のパラメータとして必要とし、出力はおそらく 'output.saveAsTextFile(....) 'で保存する必要があります。あなたの入力をありがとう、フラットマップは動作します! – ilvo

関連する問題