
I am new to Spark, and I am currently trying to write a simple piece of Python code that runs KMeans on a set of data. How do I convert type <class 'pyspark.sql.types.Row'> into a Vector?

from pyspark import SparkContext, SparkConf 
from pyspark.sql import SQLContext 
import re 
from pyspark.mllib.clustering import KMeans, KMeansModel 
from pyspark.mllib.linalg import DenseVector 
from pyspark.mllib.linalg import SparseVector 
from numpy import array 
from pyspark.ml.feature import VectorAssembler 
from pyspark.ml.feature import MinMaxScaler 

import pandas as pd 
import numpy 
df = pd.read_csv("/<path>/Wholesale_customers_data.csv") 
sql_sc = SQLContext(sc) 
cols = ["Channel", "Region", "Fresh", "Milk", "Grocery", "Frozen", "Detergents_Paper", "Delicassen"] 
s_df = sql_sc.createDataFrame(df) 
vectorAss = VectorAssembler(inputCols=cols, outputCol="feature") 
vdf = vectorAss.transform(s_df) 
km = KMeans.train(vdf, k=2, maxIterations=10, runs=10, initializationMode="k-means||") 
model = kmeans.fit(vdf) 
cluster = model.clusterCenters() 
print(cluster) 

I typed these into the pyspark shell, and when it ran model = kmeans.fit(vdf) I got the following error:

TypeError: Cannot convert type <class 'pyspark.sql.types.Row'> into Vector

at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/02/26 23:31:58 ERROR Executor: Exception in task 6.0 in stage 23.0 (TID 113)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 77, in _convert_to_vector
    raise TypeError("Cannot convert type %s into Vector" % type(l))
TypeError: Cannot convert type <class 'pyspark.sql.types.Row'> into Vector

The data is from https://archive.ics.uci.edu/ml/machine-learning-databases/00292/Wholesale%20customers%20data.csv

Can someone tell me what I am doing wrong here, or what I have missed? I would appreciate any help.

Thank you!
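For readers tracing the first failure: the stack trace above ends in mllib's _convert_to_vector, which is raised when KMeans.train is handed DataFrame Row objects rather than vectors. A minimal, hypothetical sketch of input the mllib trainer would accept, reusing s_df and cols from the code above (not the asker's original code):

# Hypothetical sketch: pyspark.mllib's KMeans.train expects an RDD of numeric
# vectors (or plain lists), not DataFrame Rows, so map each Row to a list of
# its numeric fields before training.
from pyspark.mllib.clustering import KMeans

feature_rdd = s_df.rdd.map(lambda row: [float(row[c]) for c in cols])
mllib_model = KMeans.train(feature_rdd, k=2, maxIterations=10,
                           initializationMode="k-means||")
print(mllib_model.clusterCenters)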

UPDATE: @Garren The error I got is:

>>> kmm = kmeans.fit(s_df)
17/03/02 21:58:01 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:56193 in memory (size: 5.8 KB, free: 511.1 MB)
17/03/02 21:58:01 INFO ContextCleaner: Cleaned accumulator 5
17/03/02 21:58:01 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:56193 in memory (size: 5.8 KB, free: 511.1 MB)
17/03/02 21:58:01 INFO ContextCleaner: Cleaned accumulator 4

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/ml/pipeline.py", line 69, in fit
    return self._fit(dataset)
  File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/ml/wrapper.py", line 133, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/ml/wrapper.py", line 130, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/sql/utils.py", line 51, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot resolve 'features' given input columns: [Channel, Grocery, Fresh, Frozen, Detergents_Paper, Region, Delicassen, Milk];"
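A note on this second error: pyspark.ml's KMeans reads its input from a column named "features" by default (its featuresCol parameter), and s_df only has the raw CSV columns, so the fit has to run on a DataFrame that contains an assembled "features" column, as the answer below does. A minimal, hypothetical sketch reusing cols and s_df from above:

# Hypothetical sketch: assemble a "features" column first, then fit on it.
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=cols, outputCol="features")
kmm = KMeans(k=2, maxIter=10, seed=1).fit(assembler.transform(s_df))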


On which line do you get the error? –


Hi Vivek, the line is: model = kmeans.fit(vdf) – hpnhxxwn

Answer


Use the Spark 2.x ML package exclusively, over the [soon to be deprecated] Spark mllib package:

from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
# Let Spark infer numeric types and read the header row
df = spark.read.option("inferSchema", "true").option("header", "true").csv("whole_customers_data.csv")
cols = df.columns
# Assemble all columns into a single vector column named "features",
# the default input column for pyspark.ml's KMeans
vectorAss = VectorAssembler(inputCols=cols, outputCol="features")
vdf = vectorAss.transform(df)
kmeans = KMeans(k=2, maxIter=10, seed=1)
kmm = kmeans.fit(vdf)
kmm.clusterCenters()
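A small follow-up sketch on using the fitted model, assuming the kmm and vdf from the answer above: KMeansModel.transform adds a "prediction" column holding each row's cluster index.

# Label each row with its cluster; "prediction" is the default output column.
clustered = kmm.transform(vdf)
clustered.select("Channel", "Region", "prediction").show(5)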

Hi Garren, could you show me the code? I ran the code and got an error.. Thank you for your help! – hpnhxxwn


@hpnhxxwn I will share the code in an updated answer. Also, please post your errors so others can learn from them as well. – Garren


Thank you! This works! – hpnhxxwn
