2016-05-02 9 views
0

csvファイルをRDD形式にインポートしようとしています。私が.first()コマンドを使用してrddの最初の行を取得すると、以下のようにエラーが発生します。first()は既存のRDDで動作しません

.map機能は、.first().count()のようなコマンドが動作しないパイプライン型RDDにRDDを機能させるようです。これに取り組む他の方法はありますか?

import csv 
import StringIO 

def loadRecord(line): 
    input = StringIO.StringIO(line) 
    reader = csv.DictReader(input, fieldnames=["PassengerId","Survived","Pclass","Name","Sex","Age","SibSp","Parch","Ticket","Fare","Cabin","Embarked"]) 
    return reader.next() 
input = sc.textFile("C:\Users\rohit.guglani\Documents/train.csv",4).map(loadRecord) 

type(input) 

pyspark.rdd.PipelinedRDD 



input.first() 

は、このエラーを与える:Windows上で作業するとき

Py4JJavaError        Traceback (most recent call last) 
<ipython-input-9-d93d15081c08> in <module>() 
----> 1 input.first() 

C:\spark-1.6.1\python\pyspark\rdd.pyc in first(self) 
    1313   ValueError: RDD is empty 
    1314   """ 
-> 1315   rs = self.take(1) 
    1316   if rs: 
    1317    return rs[0] 

C:\spark-1.6.1\python\pyspark\rdd.pyc in take(self, num) 
    1265   """ 
    1266   items = [] 
-> 1267   totalParts = self.getNumPartitions() 
    1268   partsScanned = 0 
    1269 

C:\spark-1.6.1\python\pyspark\rdd.pyc in getNumPartitions(self) 
    2361 
    2362  def getNumPartitions(self): 
-> 2363   return self._prev_jrdd.partitions().size() 
    2364 
    2365  @property 

C:\spark-1.6.1\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py in __call__(self, *args) 
    811   answer = self.gateway_client.send_command(command) 
    812   return_value = get_return_value(
--> 813    answer, self.gateway_client, self.target_id, self.name) 
    814 
    815   for temp_arg in temp_args: 

C:\spark-1.6.1\python\pyspark\sql\utils.pyc in deco(*a, **kw) 
    43  def deco(*a, **kw): 
    44   try: 
---> 45    return f(*a, **kw) 
    46   except py4j.protocol.Py4JJavaError as e: 
    47    s = e.java_exception.toString() 

C:\spark-1.6.1\python\lib\py4j-0.9-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    306     raise Py4JJavaError(
    307      "An error occurred while calling {0}{1}{2}.\n". 
--> 308      format(target_id, ".", name), value) 
    309    else: 
    310     raise Py4JError(

Py4JJavaError: An error occurred while calling o50.partitions. 
ohit.guglani/Documents/train.csv 
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285) 
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228) 
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313) 
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:64) 
    at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:46) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
    at java.lang.reflect.Method.invoke(Unknown Source) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Unknown Source) 
+0

を使用しているときには注意しなければなりません。 textFileに渡すパス文字列をチェックして、バックスラッシュ文字を含まないか、文字列の先頭に 'r'を付けて、Pythonがバックスラッシュ文字を特殊文字と解釈しないようにします。 –

+0

ありがとうフィリップ。バックスラッシュに問題がありました。パイプライン化されたRDDでは.first()を実行できませんが、.top(n)はパイプライン化されたRDDで動作します。 –

答えて

0

コードは私のために動作しますが、Linuxプラットフォーム上で/代わりの\

input = sc.textFile("C:\Users\rohit.guglani\Documents\train.csv",4).map(loadRecord) 
関連する問題