2017-12-11 25 views
0

ファイルからデータを読み込み(括弧で区切った項目)、このデータをPySparkを使用してFPGrowthアルゴリズムに渡そうとしています。ファイルからデータを読み込んでSpark/PySparkのFPGrowthアルゴリズムに渡す方法

私のコード今のところ以下の通りです:

import pyspark 
from pyspark import SparkContext 

sc = SparkContext("local", "Assoc Rules", pyFiles=[]) 

txt = sc.textFile("step3.basket") 
data = txt.map(lambda line: line.split(",")).collect() 
rdd = sc.parallelize(data, 2) 

from pyspark.ml.fpm import FPGrowth 

fpg = FPGrowth(minSupport=0.02, minConfidence=0.6) 
model = fpg.fit(rdd) 

しかし、私は次のエラーを取得するコードを実行しよう:

--------------------------------------------------------------------------- 
AttributeError       Traceback (most recent call last) 
<ipython-input-3-d34039dccad5> in <module>() 
     2 
     3 fpg = FPGrowth(minSupport=0.02, minConfidence=0.6) 
----> 4 model = fpg.fit(rdd) 

~/local/spark/python/pyspark/ml/base.py in fit(self, dataset, params) 
    62     return self.copy(params)._fit(dataset) 
    63    else: 
---> 64     return self._fit(dataset) 
    65   else: 
    66    raise ValueError("Params must be either a param map or a list/tuple of param maps, " 

~/local/spark/python/pyspark/ml/wrapper.py in _fit(self, dataset) 
    263 
    264  def _fit(self, dataset): 
--> 265   java_model = self._fit_java(dataset) 
    266   return self._create_model(java_model) 
    267 

~/local/spark/python/pyspark/ml/wrapper.py in _fit_java(self, dataset) 
    260   """ 
    261   self._transfer_params_to_java() 
--> 262   return self._java_obj.fit(dataset._jdf) 
    263 
    264  def _fit(self, dataset): 

はAttributeError:「RDD」オブジェクトが「何の属性を持っていませんが_jdf '

私は間違っていますが、どうすれば修正できますか?

答えて

1

pyspark.ml.fpmからのFPGrowthは、rddではなくpysparkデータフレームを取ります。 rddをdataframeに変換して渡す。 あなたが進むことができ、2つの方法がありますmllib

from pyspark.mllib.fpm import FPGrowth 

EDITからhttp://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.fpm.FPGrowth.fit

またはインポートfpgrowthを確認してください。ドキュメントから直接取る

1.Using RDD方式

from pyspark.mllib.fpm import FPGrowth 
txt = sc.textFile("step3.basket").map(lambda line: line.split(","))  
          #your txt is already a rdd 
          #No need to collect it and parallelize again 

model = FPGrowth.train(txt, minSupport=0.2, numPartitions=10) #change parameters according to need 
                   #model is ready 

2.Usingのデータフレーム私は新しいです

from pyspark.ml.fpm import FPGrowth 
df = sc.textFile("step3.basket").map(lambda line: (line.split(","),)) 
     .toDF('items') 

fp = FPGrowth(minSupport=0.2, minConfidence=0.7) 
model = fp.fit(df) #model is ready! 
+0

(私はお勧めし、より良い方法です) PySpark、ファイルからデータフレームにデータを読み込む方法を説明できますか? – Jeff

+0

編集を確認してください。 –