2017-07-20 3 views
0

私は、結果にタイプがDataset[Point]またはArray[Point]であることを望む次の関数を持っています。しかし、それはDataset[Array[Point]]を返します。データセットと配列をマップして新しい行を生成する方法(データセット[配列] [ポイント]]ではなくデータセット[ポイント])?

また、結果をPoint.Point >= 8でフィルタリングしたいとします。フィルタ関数を呼び出す最良の場所はどこですか?

def compare2(dbo: Dataset[Cols], ods: Array[Cols]) = { 
    import dbo.sparkSession.implicits._ 
    dbo.mapPartitions(p => p.map(l => ods.map(r => 
     Point(l.Id, r.Id, getPoint(l, r)))) 
     //.filter(p => p.Point >= 8) // p is Array[Point] 
    ) 
    } 

case class Cols (Id: Int, F1: String, F2: String, F3: String) 
case class Point (Id1: Int, Id2: Int, Point: Int) 

答えて

0

あなたはflatMapを使用したいかもしれません:

dbo.mapPartitions(p => p.flatMap(l => ods.map(r => 
    Point(l.Id, r.Id, getPoint(l, r)))) 
    .filter(p => p.Point >= 8) // p is Array[Point] 

最後の部分は、アレイからのマップであるため、あなたは基本的にポイントの配列にそれぞれCOLSを変換する[Colsは] =>配列[ポイント] 。フラットマップすると、これらの配列を要素にフラット化します。一度これを行うと、フィルタは正常に動作するはずです。

+1

これは正しい考えですが、おそらく 'dbo.mapPartitions(p => p.flatMap(l => ods.map(r =>'? –

2

dbo.mapPartitionsには、func: (Iterator[T]) ⇒ Iterator[U]が必要です(明確にするため暗黙的にEncoderは削除されています)。

はmapPartitions [U](FUNC:(イテレータ[T])⇒イテレータ[U]):各パーティションにFUNCを適用した結果を含む新しいデータセットを返しデータセット[U]。 mapPartitions内部pタイプIterator[Cols]であることを与える

p.map(lは、タイプColslとタイプIterator[T]の結果を示します。

あなたが Iterator[Iterator[T]]を生産しているが、それは十分ではありません:(

ods: Array[Cols]あなたはods.map(rArray[Point]を与えます。

がすべてを取ると、あなたは何が起こっているか理解するための巨大な精神的なタスクを持っていない一部がそれと

ここで、次のコードに書き換えることができます(。ESPあなたのコードの将来の読者のために)、私はanotを行うことをお勧めしたい物事を簡単にするために

dbo.mapPartitions { p: Iterator[Cols] => 
    p.map { l: Cols => 
    ods.map { r: Cols => 
     Point(l.Id, r.Id, getPoint(l, r) } } } 

Scalaを使った書き直しFor Comprehension

Scalaは、シーケンスの解説を表現するための軽量表記法を提供しています。それと

は、私は次のことをお勧めしたい:

dbo.mapPartitions { p: Iterator[Cols] => 
    for { 
    l <- p 
    r <- ods 
    } yield Point(l.Id, r.Id, getPoint(l, r)) 
} 

フィルタリングは、その後非常に簡単で、理解のための一環として、単一ifが必要となります。

関連する問題