2016-04-15 5 views
1

私にできることは次のように2スパークDStreamsに結合します。本質的には、stream1.anti-join(stream2)のようなものです。これは何とか可能ですか?スパーク:私が参加していなかったすべてのレコードをフィルタリングする必要がある場合はどのような</p>今 <pre><code>val joinStream = stream1.join(stream2) </code></pre> <p>、抗参加2 DStreams

ありがとう、ありがとうございます!

+0

私は何を抗参加 – eliasah

+0

によって意味していますが、共通のキー権を持つレコードの間で起こるJOINを理解していませんか? JOINに共通のKeyを持たない両方のストリームのすべてのレコードが必要です。 – void

+1

http://2.bp.blogspot.com/-9xB6dMw3mcY/UIGn0glldYI/AAAAAAAAAEo/H8AkcRYvUHk/s1600/sql-left-outer-join-where-table-is-null-or-table-is-nullのようなものです。 PNG? – eliasah

答えて

2

は、あなたがこれらを持っていたと仮定すると:

val rdd1 = sc.parallelize(Array(
    (1, "one"), 
    (2, "twow"), 
    (3, "three"), 
    (4, "four"), 
    (5, "five") 
)) 
val rdd2 = sc.parallelize(Array(
    (1, "otherOne"), 
    (4, "otherFour"), 
    (5,"otherFive"), 
    (6,"six"), 
    (7,"seven") 
)) 

val antiJoined = rdd1.fullOuterJoin(rdd2).filter(r => r._2._1.isEmpty || r._2._2.isEmpty) 

antiJoined.collect foreach println 
(6,(None,Some(six))) 
(2,(Some(twow),None)) 
(3,(Some(three),None)) 
(7,(None,Some(seven))) 
関連する問題

 関連する問題