I am preprocessing data and trying to run K-means clustering on a dataset of latitudes and longitudes. I imported the .csv data into a Spark DataFrame (~1M rows) and tried to feed the DataFrame into a k-means model, but I am getting errors. I looked at the kmeans example in the spark/examples directory and am using PySpark.
My Spark DataFrame looks like this:
ID col1 col2 Latitude Longitude
ford ... ... 22.2 13.5
landrover ... ... 21.4 13.8
mercedes ... ... 21.8 14.1
bmw ... ... 28.9 18.0
... ... ... .... ....
Here is my code:
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
df = spark.read.csv('file.csv')
spark_rdd = df.rdd.sortByKey()
parsedData = spark_rdd.map(lambda x: Vectors.dense(x[3],x[4])).sortByKey()
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(parsedData)
sum_of_squared_errors = model.computeCost(parsedData)
print str(sum_of_squared_errors)
centers = model.clusterCenters()
for center in centers:
    print(center)
The error I get is as follows:
Py4JJavaError Traceback (most recent call last)
<ipython-input-32-76d5a466dc4c> in <module>()
3
4 spark_rdd = df.rdd.sortByKey()
----> 5 parsedData = spark_rdd.map(lambda x: Vectors.dense(x[3],x[4])).sortByKey()
6
~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.pyc in sortByKey(self, ascending, numPartitions, keyfunc)
660 # the key-space into bins such that the bins have roughly the same
661 # number of (key, value) pairs falling into them
--> 662 rddSize = self.count()
663 if not rddSize:
664 return self # empty RDD
~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.pyc in count(self)
1039 3
1040 """
-> 1041 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
1042
1043 def stats(self):
~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.pyc in sum(self)
1030 6.0
1031 """
-> 1032 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
1033
1034 def count(self):
~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.pyc in fold(self, zeroValue, op)
904 # zeroValue provided to each partition is unique from the one provided
905 # to the final reduce call
--> 906 vals = self.mapPartitions(func).collect()
907 return reduce(op, vals, zeroValue)
908
~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.pyc in collect(self)
807 """
808 with SCCallSiteSync(self.context) as css:
--> 809 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
810 return list(_load_from_socket(port, self._jrdd_deserializer))
811
/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 26.0 failed 4 times, most recent failure: Lost task 0.3 in stage 26.0 (TID 139, 10.3.1.31, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 174, in main
process()
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 169, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/serializers.py", line 138, in dump_stream
for obj in iterator:
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.py", line 1752, in add_shuffle_key
for k, v in iterator:
ValueError: too many values to unpack
...
Any help would be greatly appreciated. Thanks.
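For reference, the KMeans estimator in pyspark.ml is fit on a DataFrame with a vector column (named "features" by default) rather than on an RDD of vectors, which is why passing parsedData to kmeans.fit() fails. Below is a minimal sketch of that route, assuming file.csv has a header row; the header=True/inferSchema=True options and the VectorAssembler step are not in the original code and are assumptions about the data:

from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Assumption: the CSV has a header row; inferSchema parses Latitude/Longitude as doubles.
df = spark.read.csv('file.csv', header=True, inferSchema=True)

# Combine the two coordinate columns into a single vector column named "features".
assembler = VectorAssembler(inputCols=["Latitude", "Longitude"], outputCol="features")
feature_df = assembler.transform(df)

kmeans = KMeans(k=2, seed=1)              # same k and seed as above
model = kmeans.fit(feature_df)            # fit on the DataFrame, not an RDD

print(model.computeCost(feature_df))      # within-set sum of squared errors
for center in model.clusterCenters():
    print(center)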
EDIT: Thank you for your reply @Duf59. Note that the DataFrame has multiple data points per ID (e.g. 50 data points for 'landrover', 70 for 'bmw', 80 for 'mercedes', etc.).
Using your method, I get the following error:
Py4JJavaError Traceback (most recent call last)
<ipython-input-53-37fce322868d> in <module>()
5
6 spark_rdd = df.rdd.map(lambda row: (row["ID"], Vectors.dense(row["Latitude"],row["Longitude"])))
----> 7 feature_df = spark_rdd.toDF(["ID", "features"])
8 feature_df.show()
9
~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/session.pyc in toDF(self, schema, sampleRatio)
55 [Row(name=u'Alice', age=1)]
56 """
---> 57 return sparkSession.createDataFrame(self, schema, sampleRatio)
58
59 RDD.toDF = toDF
~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/session.pyc in createDataFrame(self, data, schema, samplingRatio, verifySchema)
518
519 if isinstance(data, RDD):
--> 520 rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
521 else:
522 rdd, schema = self._createFromLocal(map(prepare, data), schema)
~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/session.pyc in _createFromRDD(self, rdd, schema, samplingRatio)
358 """
359 if schema is None or isinstance(schema, (list, tuple)):
--> 360 struct = self._inferSchema(rdd, samplingRatio)
361 converter = _create_converter(struct)
362 rdd = rdd.map(converter)
~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/session.pyc in _inferSchema(self, rdd, samplingRatio)
329 :return: :class:`pyspark.sql.types.StructType`
330 """
--> 331 first = rdd.first()
332 if not first:
333 raise ValueError("The first row in RDD is empty, "
~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.pyc in first(self)
1359 ValueError: RDD is empty
1360 """
-> 1361 rs = self.take(1)
1362 if rs:
1363 return rs[0]
~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.pyc in take(self, num)
1341
1342 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1343 res = self.context.runJob(self, takeUpToNumLeft, p)
1344
1345 items += res
~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
963 # SparkContext#runJob.
964 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 965 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
966 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
967
/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 134.0 failed 4 times, most recent failure: Lost task 0.3 in stage 134.0 (TID 557, 10.3.1.31, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 174, in main
process()
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 169, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/serializers.py", line 268, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.py", line 1339, in takeUpToNumLeft
yield next(iterator)
File "<ipython-input-53-37fce322868d>", line 6, in <lambda>
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 790, in dense
return DenseVector(elements)
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 275, in __init__
ar = np.array(ar, dtype=np.float64)
ValueError: could not convert string to float: Latitude
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:441)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 174, in main
process()
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 169, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/serializers.py", line 268, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.py", line 1339, in takeUpToNumLeft
yield next(iterator)
File "<ipython-input-53-37fce322868d>", line 6, in <lambda>
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 790, in dense
return DenseVector(elements)
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 275, in __init__
ar = np.array(ar, dtype=np.float64)
ValueError: could not convert string to float: Latitude
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
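The "could not convert string to float: Latitude" failure above suggests the CSV was read without a header option, so the header line survives as a data row and every column stays a string. A minimal sketch of the read plus the (ID, features) mapping attempted in this edit, with header=True/inferSchema=True as assumptions about the file:

from pyspark.ml.linalg import Vectors

# Assumption: file.csv has a header row; header=True drops it from the data and
# inferSchema=True parses Latitude/Longitude as doubles instead of strings.
df = spark.read.csv('file.csv', header=True, inferSchema=True)

# One (ID, DenseVector) pair per row, as in the mapping above.
spark_rdd = df.rdd.map(lambda row: (row["ID"],
                                    Vectors.dense(float(row["Latitude"]),
                                                  float(row["Longitude"]))))
feature_df = spark_rdd.toDF(["ID", "features"])
feature_df.show()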
I can also collect my latitudes and longitudes into Python lists: 'lat = df.rdd.map(lambda y: y.latitude).collect()', which gives [lat1, lat2, lat3 ...], or 'lat_lon = df.rdd.map(lambda x: [x.latitude, x.longitude]).collect()', which gives [[lat1, lon1], [lat2, lon2], [lat3, lon3] ...]. This might be useful as input to my k-means model, but I am not sure how to proceed. – msharky
The error A) occurs when you load the data into k-means because you apply sortByKey() to an RDD that is not a key/value pair RDD, and B) if you load raw latitude and longitude, k-means will produce poor results because it cannot use the Haversine distance. –
Why do you call 'sortByKey'? –