私はGraphFramesを使い始めていますが、ドキュメントに従っていますが、aggregateMessages関数から結果を得ることができません(空のデータフレームを返します)。ここに私の問題の簡単な例である:私は私のvertexRDDが無い頂点属性を持つ唯一の頂点Y
で構成されており、私のedgeRDDはこのような二つのレコードで構成されていることtestGraph
は、呼び出されたオブジェクトGraphFrames:GraphFramesのaggregateMessagesからの出力がありません
| src | dst | min_ts1 | min_ts2 |
| X | Y | 20 | null |
| Y | X | null | -10 |
、私はmin_ts1
の値をdst
に送信し、min_ts2
をsrc
に送信する単純なアルゴリズムを実装したいとします。私はこのアルゴリズムを実装するために使用していたコードは次のとおりです。最初のレコードを見て、送信:私はそこにいくつかのnull値がここにありますが、関係なく、私は次の操作を実行するためのアルゴリズムを渡すメッセージを期待実現
import org.graphframes.lib.AggregateMessages
import org.apache.spark.sql.functions._
val AM = AggregateMessages
val msgToSrc = AM.edge("min_ts2)
val msgToDst = AM.edge("min_ts1")
val delay = testGraph
.aggregateMessages
.sendToSrc(msgToSrc)
.sendToDst(msgToDst)
.agg(sum(AM.msg).as("avg_time_delay"))
20
〜Y
のメッセージと、null
〜X
のメッセージを含む。次に、2番目のレコードを見て、null
というメッセージをXに、-10
〜Y
というメッセージを送信します。最後に、Y
のメッセージの合計が10
であり、結果にX
というレコードが存在しないことが結果から予想されます。なぜなら、それは頂点RDDに含まれていなかったからです。 X
がvertexRDDに含まれていた場合、両方のメッセージがnull
であったため、結果は単純にnull
になると思います。
しかし、私は空のRDDを取得しています。誰かが私が空の結果を得ている理由を理解するのを助けてくれますか?