2017-11-28 6 views
-1

私はGraphFramesを使い始めていますが、ドキュメントに従っていますが、aggregateMessages関数から結果を得ることができません(空のデータフレームを返します)。ここに私の問題の簡単な例である:私は私のvertexRDDが無い頂点属性を持つ唯一の頂点Yで構成されており、私のedgeRDDはこのような二つのレコードで構成されていることtestGraphは、呼び出されたオブジェクトGraphFrames:GraphFramesのaggregateMessagesからの出力がありません

| src | dst | min_ts1 | min_ts2 | 
| X | Y | 20 | null | 
| Y | X | null | -10 | 

、私はmin_ts1の値をdstに送信し、min_ts2srcに送信する単純なアルゴリズムを実装したいとします。私はこのアルゴリズムを実装するために使用していたコードは次のとおりです。最初のレコードを見て、送信:私はそこにいくつかのnull値がここにありますが、関係なく、私は次の操作を実行するためのアルゴリズムを渡すメッセージを期待実現

import org.graphframes.lib.AggregateMessages 
import org.apache.spark.sql.functions._ 
val AM = AggregateMessages 

val msgToSrc = AM.edge("min_ts2) 
val msgToDst = AM.edge("min_ts1") 

val delay = testGraph 
.aggregateMessages 
    .sendToSrc(msgToSrc) 
    .sendToDst(msgToDst) 
    .agg(sum(AM.msg).as("avg_time_delay")) 

20Yのメッセージと、nullXのメッセージを含む。次に、2番目のレコードを見て、nullというメッセージをXに、-10Yというメッセージを送信します。最後に、Yのメッセージの合計が10であり、結果にXというレコードが存在しないことが結果から予想されます。なぜなら、それは頂点RDDに含まれていなかったからです。 XがvertexRDDに含まれていた場合、両方のメッセージがnullであったため、結果は単純にnullになると思います。

しかし、私は空のRDDを取得しています。誰かが私が空の結果を得ている理由を理解するのを助けてくれますか?

答えて

0

これは私のVertexRDDにXがないことが原因です。私は、edgeRDD内のその頂点に出入りする辺があっても、私の集約メッセージが辺の属性のみに依存していても、アルゴリズムはこれらのメッセージを送信できません。

関連する問題