2017-02-03 8 views
1

私は、文字列がタイムスタンプとして評価される原因となるsparkデータフレームに本当に奇妙なエラーがあります。pyspark createdataframe:タイムスタンプとして解釈される文字列、スキーマが列を混合する

from datetime import datetime 
from pyspark.sql import Row 
from pyspark.sql.types import StructType, StructField, StringType, TimestampType 

new_schema = StructType([StructField("item_id", StringType(), True), 
         StructField("date", TimestampType(), True), 
         StructField("description", StringType(), True) 
         ]) 

df = sqlContext.createDataFrame([Row(description='description', date=datetime.utcnow(), item_id='id_string')], new_schema) 

これは私に次のエラーを与える:ここで

は私のセットアップコードである

AttributeError Traceback (most recent call last) in() ----> 1 df = sqlContext.createDataFrame([Row(description='hey', date=datetime.utcnow(), item_id='id_string')], new_schema)

/home/florian/spark/python/pyspark/sql/context.pyc in createDataFrame(self, data, schema, samplingRatio, verifySchema) 307 Py4JJavaError: ... 308 """ --> 309 return self.sparkSession.createDataFrame(data, schema, samplingRatio, verifySchema) 310 311 @since(1.3)

/home/florian/spark/python/pyspark/sql/session.pyc in createDataFrame(self, data, schema, samplingRatio, verifySchema) 522 rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio) 523 else: --> 524 rdd, schema = self._createFromLocal(map(prepare, data), schema) 525 jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd()) 526 jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())

/home/florian/spark/python/pyspark/sql/session.pyc in _createFromLocal(self, data, schema) 397 398 # convert python objects to sql data --> 399 data = [schema.toInternal(row) for row in data] 400 return self._sc.parallelize(data), schema 401

/home/florian/spark/python/pyspark/sql/types.pyc in toInternal(self, obj) 574 return tuple(f.toInternal(obj.get(n)) for n, f in zip(self.names, self.fields)) 575 elif isinstance(obj, (tuple, list)): --> 576 return tuple(f.toInternal(v) for f, v in zip(self.fields, obj)) 577 elif hasattr(obj, "dict"): 578 d = obj.dict

/home/florian/spark/python/pyspark/sql/types.pyc in ((f, v)) 574 return tuple(f.toInternal(obj.get(n)) for n, f in zip(self.names, self.fields)) 575 elif isinstance(obj, (tuple, list)): --> 576 return tuple(f.toInternal(v) for f, v in zip(self.fields, obj)) 577 elif hasattr(obj, "dict"): 578 d = obj.dict

/home/florian/spark/python/pyspark/sql/types.pyc in toInternal(self, obj) 434 435 def toInternal(self, obj): --> 436 return self.dataType.toInternal(obj) 437 438 def fromInternal(self, obj):

/home/florian/spark/python/pyspark/sql/types.pyc in toInternal(self, dt) 188 def toInternal(self, dt): 189 if dt is not None: --> 190 seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo 191 else time.mktime(dt.timetuple())) 192 return int(seconds * 1e6 + dt.microsecond)

AttributeError: 'str' object has no attribute 'tzinfo'

文字列がTimestampType.toInternal()に

渡されたかのように、これが見えます本当に奇妙なことは、このデータフレームで同じエラーが発生するということです。

df = sqlContext.createDataFrame([Row(description='hey', date=None, item_id='id_string')], new_schema) 

この1つが機能している間:

df = sqlContext.createDataFrame([Row(description=None, date=datetime.now(), item_id='id_string')], new_schema) 

をし、この1つは、同様に動作します。私にとって

df = sqlContext.createDataFrame([Row(description=None, date=datetime.now(), item_id=None)], new_schema) 

を、これは今、「pysparkは何とか欄に「ITEM_ID」から値を置くことを意味しこのエラーが発生します。 私は何か間違っていましたか?これはデータフレーム内のバグですか?

情報: 私はpysparkを使用しています2.0.1

編集:あなたは、Rowオブジェクトを作成すると

df = sqlContext.createDataFrame([Row(description=None, date=datetime.now(), item_id=None)], new_schema) 
df.first() 

Row(item_id=u'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2017,MONTH=1,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=3,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=1,HOUR=3,HOUR_OF_DAY=15,MINUTE=19,SECOND=30,MILLISECOND=85,ZONE_OFFSET=?,DST_OFFSET=?]', date=None, description=None)

答えて

1

、フィールドは(http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.Row)アルファベット順にソートされているので、あなたがいるときRow(description, date, item_id)オブジェクトを作成すると、(date, description, item_id)という名前になります。

この行とスキーマとデータフレームを作成するときに、あなたのスキーマがStringType, TimestampType, StringType、と注文すると、スパークがStringTypedateにあるものにマップされますが、StringTypeからTimestampTypeitem_iddescriptionにするものです。

それはエラーが述べたように、文字列オブジェクトがない、tzinfo属性を要求するので、StringTypeに(datetime形式で)タイムスタンプを渡すとエラーが発生しないが、TimestampTypeに文字列を渡すことはありませんそれを持っています。

また、NoneがスキーマのTimestampTypeに渡されているため、実際に働いたデータフレームが実際に働いた理由は受け入れられる値です。

0

@ rafael-zanettiからの上記の回答を参考にしてください。列をソートするには、次の手順を実行します。

new_schema = [StructField("item_id", StringType(), True), 
        StructField("date", TimestampType(), True), 
        StructField("description", StringType(), True)] 
new_schema = StructType(sorted(new_schema, key=lambda f: f.name)) 
関連する問題