2016-05-11 18 views
1

私はScalaで次のSparkコードを持っています。私はPysparkに変換したいと思います。 Pythonにはcase classがありません。私はいくつかのことを試しましたが、何も働かなかったのです。 predictionsは、Dataframeです。私が直面している問題は、vector要素を含む確率列をキャストすることです。DataFrameを別のDataFrameに変換する際のエラー

case class R(probability: Double, label: Double) 
val predictionsC = predictions.select("probability", "label") 
     .rdd 
     .map(x => R(x(0).asInstanceOf[org.apache.spark.mllib.linalg.Vector].apply(1), 
        x(1).asInstanceOf[Double])) 
     .toDF 

私は、次のコードを試してみましたが、それは

prunedPredictions = predictions.select(predictions["probability"], predictions["label"]) 
predictionsC = prunedPredictions.map(lambda r: Row(prediction = r[0],probability = r[1][1])) 

改訂動作しません:

私は--py-filesオプションを使用してnumpyのライブラリを追加し、今私は取得しています次のエラーメッセージ。

File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 232.0 failed 4 times, most recent failure: Lost task 0.3 in stage 232.0 (TID 60801, anp-r01wn02.c03.hadoop.td.com): org.apache.spark.SparkException: 
Error from python worker: 
/usr/bin/python: No module named mtrand 
PYTHONPATH was: 
/usr/lib/spark/lib/spark-assembly-1.5.0-cdh5.5.1-hadoop2.6.0-cdh5.5.1.jar:/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip:/usr/lib/spark/python/::/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip:/usr/lib/spark/python/lib/pyspark.zip:/data/10/yarn/nm/usercache/zakerh2/appcache/application_1462889699566_2857/container_e37_1462889699566_2857_01_000332/numpy.zip 
java.io.EOFException 
    at java.io.DataInputStream.readInt(DataInputStream.java:392) 
    at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:163) 
    at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:86) 
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:62) 
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:135) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:73) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

probability = r[1][1]はdouble型です。このエラーメッセージはどのようなものでしょうか?

答えて

2

Pythonは実際にはasInstanceOfの同等物を提供しません。なぜなら、単に役に立たないからです。あなたが本当に必要とするのは型変換です。 probabilityvector型である場合には、pyspark.mllib.linalg.Vectorに変換され、あなたがgetitem(括弧表記)戻り値の型を使用するときに64ビットには、Spark SQL同等物を持っていません

from pyspark.mllib.linalg import DenseVector 
v = DenseVector([1, 2, 3]) 

type(v[0]) 
## numpy.float64 

フロートnumpyのさ。

sc.parallelize([(v[0],)]).toDF() 
## TypeError         Traceback (most recent call last) 
## ... 
## TypeError: not supported type: <class 'numpy.float64'> 

それはあなたが浮いているように、結果を変換することができますどちらか動作するように。

sc.parallelize([(float(v[0]),)]).toDF() 
## DataFrame[_1: double] 

または正しい方法で使用します:側では

sc.parallelize([(v.array.item(0),)]).toDF() 
## DataFrame[_1: double] 

を、ここにUDFを使用する方がよいでしょう注意してください。

from pyspark.sql.functions import udf 
from pyspark.sql.types import DoubleType 

def getitem(i): 
    def getitem_(v): 
     return v.array.item(i) 
    return udf(getitem_, DoubleType()) 

sc.parallelize([(v,)]).toDF(["v"]).select(getitem(0)("v")) 
+0

どのように私は私のコードであること使用できますか?問題は、上記のマップ関数で '' prediction = r [0] [0] ''のようなものがあると、エラーが発生します。 –

+0

エラーは何ですか? – zero323

+1

[this](http://stackoverflow.com/q/37150596/1560062)を意味しますか?それは依存関係の問題であり、キャストすることは何もありません。 mllib型を使用するには、すべてのノードでnumpyにアクセス可能である必要があります。 – zero323

関連する問題