Graph.pregel
アルゴリズムをGraphFrame.aggregateMessages
に移植しています。私は少し煩わしいGraphFrame
APIを見つけています。Sparkで複雑なカラム構造を作成するためのショートカット
Graph
APIでは、メッセージタイプとしてcase class
を送信できます。しかし、GraphFrame
APIでは、aggregateMessages.sendToSrc
と.sendToDst
は、SQL式String
またはColumn
のいずれかで動作します。私はこれが尻の痛みであるほど強力であることを見出しています。
Column
を構築する必要が
GraphFrames
で
Iterator((1L, Send(Vote(yay = true), from = 2L)))
:
case class Vote(yay: Boolean, voters: Long = 1L)
case class Send(vote: Vote, from: Long)
が、私はのようなものかもしれないIterator[(VertexId,Send)]
返しsendMsg
、構築することができGraphX
とpregel
機能を使います上記のサンプルよりも複雑な私の定義済みのcase classes
を完全に破棄することなく、理想的にはIterator[(VertexId,Send)]
と同じ目的を果たします。
これにはどのようなショートカットがありますか?
対応する構造体へcase class
のインスタンスを変換するにはかなり簡単だった:私は今のところ得たもの
。これは主にそこに私を取得します。
def ccToStruct(cc: Product) : Column = {
val values = cc.productIterator
var seq = Seq[Column]()
while (values.hasNext) {
val field = values.next() match {
case p: Some[Product @unchecked] if (p.get.productArity > 0) => ccToStruct(p.get)
case p: Product if (p.productArity > 0) => ccToStruct(p)
case x => lit(x)
}
seq = seq :+ field
}
struct(seq:_*)
}
これは私が行うことができます:
ccToStruct(Send(Vote(true, 1L), 123L))
// res4: org.apache.spark.sql.Column = struct(struct(true,1),123)
私はそれを正しく動作させるために少しスキーマをアップパッチを適用する必要があるだろうが、私はそれを行うために開始する前にこれはまったく役に立たないアプローチだと気付きました。 case class
の値をstruct
に変換することは決してありません。ccToStruct(Send(Vote(true, 1L), 123L))
は、かなり役に立たないメッセージを作成します。 lit()
がケースクラスをサポートしていないことを除いて、lit(Send(..))
値を送信するのと同じです。あなたの代わりに何をしたいのか
はミックスと一致3210値をAM.dst("*")
とAM.src("*")
列ではなく、case class
のスキーマに対応するそうすることです。 (私はケースクラスを完全に放棄することを考えましたが、UDAF
〜sum
メッセージがあり、そのロジックはケースクラスを使用している限り、移植が非常に簡単でした)。
私は答えが可能であると信じています。このような構造を作成:
import org.graphframes.lib.AggregateMessages
val AM = AggregateMessages
val msg = Seq[Any](Seq[Any](true, 1L), AM.src("id"))
をそしてstruct()
と私の場合クラスのスキーマを使用してColumn
にそれを変換します。
誰もこれを行う良い方法がない場合(そしておそらく誰かが行っても)、私は自分の質問に答えて後で解決します。
ハズレなしバグ - あなただけを指定する必要がありますする必要があり 'のSeq [任意]'配列(1.0、4L、123) 'ので、 'は' Seq [Any](1.0、4L、123) 'と同じではなく、2番目のものだけがあなたの値を互換性のある型にスカッシュしません。 –