2017-05-02 7 views
1

私は次の問題に直面しています: 私はデータフレーム(acc_name)の列の要素と比較する必要があるリストを持っています。私は、次のループ機能を使用していますが、それはbs_list要素がacc_name列のサブセットであるPysparkpysparkのループエラーのため

bs_list = 
['AC_E11','AC_E12','AC_E13','AC_E135','AC_E14','AC_E15','AC_E155','AC_E157', 
'AC_E16','AC_E163','AC_E165','AC_E17','AC_E175','AC_E180','AC_E185', 'AC_E215','AC_E22','AC_E225','AC_E23','AC_E23112','AC_E235','AC_E245','AC_E258','AC_E25','AC_E26','AC_E265','AC_E27','AC_E275','AC_E31','AC_E39','AC_E29'] 


    for i in bs_list: 
      bs_acc1 = (acc\ 
         .filter(i == acc.acc_name) 
         .select(acc.acc_name,acc.acc_description) 
        ) 

を使用して私に30 を提供しなければならないときには、私だけ1つのレコードを返します。私は次の2つの列acc_name、acc_descriptionを持つ新しいDFを作成しようとしています。リストにある要素の値の詳細のみが含まれますbs_list 私はどこが間違っているのか教えてください。

+0

リスト内の要素に対してRDDを反復しようとしています。代わりに、RDDの各要素でリストを反復しようとします。 – Dandekar

答えて

0

これはループ内で毎回iをフィルタリングするため、新しいデータフレームbs_acc1を作成するためです。だから、それはあなたのbs_list、すなわち行の最後の値に属する1行のみを示すされている必要があり、それを行うには'AC_E29'

ための一つの方法は、それ自体で繰り返し組合なので、以前の結果も同様にデータフレーム内に残る -

# create a empty dataframe, give schema which is appropriate to your data below 

bs_acc1 = sqlContext.createDataFrame(sc.emptyRDD(), schema) 

for i in bs_list: 
      bs_acc1 = bs_acc1.union(
         acc\ 
          .filter(i == acc_fil.acc_name) 
          .select(acc.acc_name,acc.acc_description) 
         ) 

より良い方法は、すべてのループをやっていません -

from pyspark.sql.functions import * 
bs_acc1 = acc.where(acc.acc_name.isin(bs_list)) 
+0

私は次のエラーが発生している2番目のソリューションを試しました:条件は文字列または列でなければなりません – Atrayee

+0

申し訳ありませんが、私の悪い、良い方法があります。上記の私の更新された回答を参照してください – Pushkr

+0

私は両方のソリューションを試しました。両方とも働いた。これは、どこが間違っているのかを明確にしています。ありがとうございました – Atrayee

0

をあなたはまた、カラムacc_nameでDATAFRAMEし、その後、ちょうどaccデータフレームに参加するんbs_listを変換することができます。

bs_rdd = spark.sparkContext.parallelize(bs_list) 
bs_df = bs_rdd.map(lambda x: Row(**{'acc_name':x})).toDF() 
bs_join_df = bs_df.join(acc, on='acc_name') 
bs_join_df.show()