2017-07-05

I am preprocessing data and trying to run K-means clustering on a dataset of latitudes and longitudes. I imported the .csv data into a Spark DataFrame (~1M rows) and tried to feed the DataFrame into the k-means model, but I am getting an error. I based my code on the pyspark k-means example in the spark/example directory.

My Spark DataFrame looks like this:

ID    col1   col2  Latitude   Longitude 
ford   ...   ...   22.2    13.5 
landrover  ...   ...   21.4    13.8 
mercedes  ...   ...   21.8    14.1 
bmw    ...   ...   28.9    18.0 
...    ...   ...   ....    .... 

And here is my code:

from pyspark.ml.clustering import KMeans 
from pyspark.ml.linalg import Vectors 

df = spark.read.csv('file.csv') 

spark_rdd = df.rdd.sortByKey() 
parsedData = spark_rdd.map(lambda x: Vectors.dense(x[3],x[4])).sortByKey() 

kmeans = KMeans().setK(2).setSeed(1) 
model = kmeans.fit(parsedData) 

sum_of_squared_errors = model.computeCost(parsedData) 
print str(sum_of_squared_errors) 

centers = model.clusterCenters() 

for center in centers: 
    print(center) 

The error I get is the following:


Py4JJavaError        Traceback (most recent call last) 
<ipython-input-32-76d5a466dc4c> in <module>() 
     3 
     4 spark_rdd = df.rdd.sortByKey() 
----> 5 parsedData = spark_rdd.map(lambda x: Vectors.dense(x[3],x[4])).sortByKey() 
     6 

~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.pyc in sortByKey(self, ascending, numPartitions, keyfunc) 
    660   # the key-space into bins such that the bins have roughly the same 
    661   # number of (key, value) pairs falling into them 
--> 662   rddSize = self.count() 
    663   if not rddSize: 
    664    return self # empty RDD 

~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.pyc in count(self) 
    1039   3 
    1040   """ 
-> 1041   return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
    1042 
    1043  def stats(self): 

~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.pyc in sum(self) 
    1030   6.0 
    1031   """ 
-> 1032   return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add) 
    1033 
    1034  def count(self): 

~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.pyc in fold(self, zeroValue, op) 
    904   # zeroValue provided to each partition is unique from the one provided 
    905   # to the final reduce call 
--> 906   vals = self.mapPartitions(func).collect() 
    907   return reduce(op, vals, zeroValue) 
    908 

~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.pyc in collect(self) 
    807   """ 
    808   with SCCallSiteSync(self.context) as css: 
--> 809    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 
    810   return list(_load_from_socket(port, self._jrdd_deserializer)) 
    811 

/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args) 
    1131   answer = self.gateway_client.send_command(command) 
    1132   return_value = get_return_value(
-> 1133    answer, self.gateway_client, self.target_id, self.name) 
    1134 
    1135   for temp_arg in temp_args: 

~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw) 
    61  def deco(*a, **kw): 
    62   try: 
---> 63    return f(*a, **kw) 
    64   except py4j.protocol.Py4JJavaError as e: 
    65    s = e.java_exception.toString() 

/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name) 
    317     raise Py4JJavaError(
    318      "An error occurred while calling {0}{1}{2}.\n". 
--> 319      format(target_id, ".", name), value) 
    320    else: 
    321     raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 26.0 failed 4 times, most recent failure: Lost task 0.3 in stage 26.0 (TID 139, 10.3.1.31, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 174, in main 
    process() 
    File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 169, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/serializers.py", line 138, in dump_stream 
    for obj in iterator: 
    File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.py", line 1752, in add_shuffle_key 
    for k, v in iterator: 
ValueError: too many values to unpack 
... 

Any help would be greatly appreciated. Thanks.


EDIT: Thank you for your reply @Duf59. Note that the DataFrame has multiple data points per ID (e.g. 50 data points for 'landrover', 70 for 'bmw', 80 for 'mercedes', and so on).

When I use your method, I get the following error:

Py4JJavaError        Traceback (most recent call last) 
    <ipython-input-53-37fce322868d> in <module>() 
      5 
      6 spark_rdd = df.rdd.map(lambda row: (row["ID"], Vectors.dense(row["Latitude"],row["Longitude"]))) 
    ----> 7 feature_df = spark_rdd.toDF(["ID", "features"]) 
      8 feature_df.show() 
      9 

~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/session.pyc in toDF(self, schema, sampleRatio) 
    55   [Row(name=u'Alice', age=1)] 
    56   """ 
---> 57   return sparkSession.createDataFrame(self, schema, sampleRatio) 
    58 
    59  RDD.toDF = toDF 

~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/session.pyc in createDataFrame(self, data, schema, samplingRatio, verifySchema) 
    518 
    519   if isinstance(data, RDD): 
--> 520    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio) 
    521   else: 
    522    rdd, schema = self._createFromLocal(map(prepare, data), schema) 

~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/session.pyc in _createFromRDD(self, rdd, schema, samplingRatio) 
    358   """ 
    359   if schema is None or isinstance(schema, (list, tuple)): 
--> 360    struct = self._inferSchema(rdd, samplingRatio) 
    361    converter = _create_converter(struct) 
    362    rdd = rdd.map(converter) 

~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/session.pyc in _inferSchema(self, rdd, samplingRatio) 
    329   :return: :class:`pyspark.sql.types.StructType` 
    330   """ 
--> 331   first = rdd.first() 
    332   if not first: 
    333    raise ValueError("The first row in RDD is empty, " 

~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.pyc in first(self) 
    1359   ValueError: RDD is empty 
    1360   """ 
-> 1361   rs = self.take(1) 
    1362   if rs: 
    1363    return rs[0] 

~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.pyc in take(self, num) 
    1341 
    1342    p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts)) 
-> 1343    res = self.context.runJob(self, takeUpToNumLeft, p) 
    1344 
    1345    items += res 

~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal) 
    963   # SparkContext#runJob. 
    964   mappedRDD = rdd.mapPartitions(partitionFunc) 
--> 965   port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) 
    966   return list(_load_from_socket(port, mappedRDD._jrdd_deserializer)) 
    967 

/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args) 
    1131   answer = self.gateway_client.send_command(command) 
    1132   return_value = get_return_value(
-> 1133    answer, self.gateway_client, self.target_id, self.name) 
    1134 
    1135   for temp_arg in temp_args: 

~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw) 
    61  def deco(*a, **kw): 
    62   try: 
---> 63    return f(*a, **kw) 
    64   except py4j.protocol.Py4JJavaError as e: 
    65    s = e.java_exception.toString() 

/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name) 
    317     raise Py4JJavaError(
    318      "An error occurred while calling {0}{1}{2}.\n". 
--> 319      format(target_id, ".", name), value) 
    320    else: 
    321     raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 134.0 failed 4 times, most recent failure: Lost task 0.3 in stage 134.0 (TID 557, 10.3.1.31, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 174, in main 
    process() 
    File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 169, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/serializers.py", line 268, in dump_stream 
    vs = list(itertools.islice(iterator, batch)) 
    File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.py", line 1339, in takeUpToNumLeft 
    yield next(iterator) 
    File "<ipython-input-53-37fce322868d>", line 6, in <lambda> 
    File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 790, in dense 
    return DenseVector(elements) 
    File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 275, in __init__ 
    ar = np.array(ar, dtype=np.float64) 
ValueError: could not convert string to float: Latitude 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:748) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944) 
    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:441) 
    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
    at py4j.Gateway.invoke(Gateway.java:280) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:214) 
    at java.lang.Thread.run(Thread.java:748) 
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 174, in main 
    process() 
    File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 169, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/serializers.py", line 268, in dump_stream 
    vs = list(itertools.islice(iterator, batch)) 
    File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.py", line 1339, in takeUpToNumLeft 
    yield next(iterator) 
    File "<ipython-input-53-37fce322868d>", line 6, in <lambda> 
    File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 790, in dense 
    return DenseVector(elements) 
    File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 275, in __init__ 
    ar = np.array(ar, dtype=np.float64) 
ValueError: could not convert string to float: Latitude 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    ... 1 more 

I can also group my latitudes and longitudes into Python lists: 'lat = df.rdd.map(lambda y: y.latitude).collect()', which gives [lat1, lat2, lat3, ...], or 'lat_lon = df.rdd.map(lambda x: [x.latitude, x.longitude]).collect()', which gives [[lat1, lon1], [lat2, lon2], [lat3, lon3], ...]. This might be a useful input for my k-means model, but I am not sure how to proceed. – msharky


The problems are A) the error you get when loading the data into k-means, and B) that running k-means on latitude and longitude will produce poor results, because k-means cannot use the Haversine distance. –
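One common workaround for the Haversine limitation (a general approach, not something used elsewhere in this thread, and it assumes the Latitude/Longitude columns have already been cast to numbers) is to project the coordinates onto a unit sphere and cluster the resulting x/y/z values, where Euclidean distance approximates great-circle distance for nearby points:

import math 
from pyspark.sql import functions as F 

# Project lat/lon (degrees) onto a unit sphere; k-means on x/y/z then uses 
# a Euclidean distance that roughly tracks great-circle distance. 
deg2rad = math.pi / 180.0 
df_xyz = (df 
    .withColumn("lat_rad", F.col("Latitude").cast("double") * deg2rad) 
    .withColumn("lon_rad", F.col("Longitude").cast("double") * deg2rad) 
    .withColumn("x", F.cos("lat_rad") * F.cos("lon_rad")) 
    .withColumn("y", F.cos("lat_rad") * F.sin("lon_rad")) 
    .withColumn("z", F.sin("lat_rad"))) 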


Why are you calling 'sortByKey'? –

Answer

Your error occurs because you apply sortByKey() to an RDD that is not a PairwiseRDD (df.rdd gives you an RDD of Rows, and in your case each Row contains 5 values). The *byKey methods only work on PairwiseRDDs, i.e. RDDs whose elements are tuples of length 2, or other structures that can be unpacked as k, v = pair.
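To make the distinction concrete, here is a minimal sketch (the column names are assumed from the question):

row_rdd = df.rdd        # RDD of Row objects with 5 fields each, so not a pair RDD 
# row_rdd.sortByKey()   # fails: a 5-field Row cannot be unpacked into k, v 

pair_rdd = df.rdd.map(lambda row: (row["ID"], (row["Latitude"], row["Longitude"]))) 
pair_rdd.sortByKey()    # works: every element is a (key, value) 2-tuple 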

Apart from that, you are trying to use an ml algorithm on an RDD, whereas you need to pass a DataFrame to the kmeans model (by default, kmeans.fit expects a DataFrame with a column named features). You can refer to the documentation here.

What you can do is:

spark_rdd = df.rdd.map(lambda row: (row["ID"], Vectors.dense(row["Latitude"],row["Longitude"]))) 
feature_df = spark_rdd.toDF(["ID", "features"]) 

kmeans = KMeans().setK(2).setSeed(1) 
model = kmeans.fit(feature_df) 
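Once the model is fitted on the DataFrame, the evaluation steps from the original post should carry over; a sketch (not part of the original answer, and it assumes feature_df contains valid numeric features):

# Within-set sum of squared errors (computeCost is available on KMeansModel in Spark 2.1) 
print(model.computeCost(feature_df)) 

# Cluster centers 
for center in model.clusterCenters(): 
    print(center) 

# Cluster assignment per row, added as a 'prediction' column 
model.transform(feature_df).show(5) 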

Thanks for the explanation @Duf59. Please check the 'EDIT' in my original post to see what happened when I implemented your code; it returns a different error. Cheers. – msharky


Perhaps it has something to do with your data? What does printSchema() show? If the latitude/longitude columns are strings, convert them to floats first. – Duf59


When I do df.printSchema() it gives me: root |-- ID: string (nullable = true) |-- col1: string (nullable = true) |-- col2: string (nullable = true) |-- Latitude: string (nullable = true) |-- Longitude: string (nullable = true). Do you mean converting all the values in each column to floats, or converting the column names to floats? How do I do that? Thanks – msharky
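For reference, the 'could not convert string to float: Latitude' error suggests that a header line ends up in the data (the value being converted is literally the string 'Latitude'), and the printSchema output above shows every column as a string. A minimal sketch of the conversion Duf59 describes (assuming file.csv has a header row):

from pyspark.ml.linalg import Vectors 

# Read with a header row and let Spark infer numeric types ... 
df = spark.read.csv('file.csv', header=True, inferSchema=True) 

# ... or cast the coordinate columns explicitly if they are still strings. 
df = df.withColumn("Latitude", df["Latitude"].cast("double")) 
df = df.withColumn("Longitude", df["Longitude"].cast("double")) 

spark_rdd = df.rdd.map(lambda row: (row["ID"], Vectors.dense(row["Latitude"], row["Longitude"]))) 
feature_df = spark_rdd.toDF(["ID", "features"]) 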
