2017-02-27 30 views
0

私はsparkから始まっていますが、私はまだいくつかの概念を理解していませんでした。spark - 値とキーを比較

私はこのような名前のペアを持つファイルがあります:

foo bar 
bar foo 

をしかし、fooとbarの間で同じ関係です。第一の出力を作成するために

step1 = joined.reduceByKey(lambda x,y: x+';'+y).map(lambda x: (x[0], x[1].split(';'))).sortByKey(True).mapValues(lambda x: sorted(x)).collect() 

、と私はのために既存の値を削除するには、別のreduceByKeyが必要だと思う:私は、私はこのコードを作成するだけで1つのリレーション

foo bar 

とRDDを作成しようとしています以前の反復が、私はそれを行う方法がわかりません。

私は正しく考えていますか?

+1

あなたは、その値がすでにキーとして存在しているので、あなたのファイルの2番目のレコードが除去されなければならない意味ですか最初のレコード? –

+0

@ rogue-one、はい。ありがとう@サントン! –

答えて

1

どのような単純なものについて:データフレームを使用して

>>> sc.parallelize(("foo bar", "bar foo")).map(lambda x: " ".join(sorted(x.split(" ")))).distinct().collect() 
['bar foo'] 
+0

!ちょうどあなたが提案する行を解析する関数を作成します。 #parse行のファイル def parseLine(行): #問題を回避するためにリンクを無効にします n1、n2 = '' .join(sorted(line.split( '\ t')))。split( '' ) return(n1、n2) –

1
from pyspark.sql import functions as f  

rdd = spark.sparkContext.parallelize([('foo', 'bar'), ('bar', 'foo'), ]) 
df = spark.createDataFrame(rdd, schema=['c1', 'c2']) 
df = df.withColumn('c3', f.sort_array(f.array(df['c1'], df['c2']))) 
df.show() 

# output: 
+---+---+----------+ 
| c1| c2|  c3| 
+---+---+----------+ 
|foo|bar|[bar, foo]| 
|bar|foo|[bar, foo]| 
+---+---+----------+ 

がはるかに簡単です

+0

@ zhangtongに感謝します。あなたの提案が好きです。同じことをする他の方法を見ることは良いことです。再度、感謝します! –

関連する問題