2015-10-14 9 views
5

のオブジェクトをSparkクラスタのpysparkでプログラミングすることはできません。 データは大きく、メモリにロードされたり、メモリにロードされたり、データに容易pyspark:TypeError:IntegerTypeはタイプ<type 'unicode>>

基本的には

af.b Current%20events 1 996 
af.b Kategorie:Musiek 1 4468 
af.b Spesiaal:RecentChangesLinked/Gebruikerbespreking:Freakazoid 1 5209 
af.b Spesiaal:RecentChangesLinked/Sir_Arthur_Conan_Doyle 1 5214 

Wikipediaのデータのようになります。

私はpysparkのintepreterで次のPythonコードでスパークデータフレームを構築しようとし、その後、AWS S3からそれを読んで:

parts = data.map(lambda l: l.split()) 
wikis = parts.map(lambda p: (p[0], p[1],p[2],p[3])) 


fields = [StructField("project", StringType(), True), 
StructField("title", StringType(), True), 
StructField("count", IntegerType(), True), 
StructField("byte_size", StringType(), True)] 

schema = StructType(fields) 

df = sqlContext.createDataFrame(wikis, schema) 

すべてがうまく見て、唯一createDataFrameは、私がIntegerTypeにカウントする必要がある3番目の列を設定することはできませんなぜ私は

Traceback (most recent call last): 
File "<stdin>", line 1, in <module> 
File "/usr/lib/spark/python/pyspark/sql/context.py", line 404, in createDataFrame 
rdd, schema = self._createFromRDD(data, schema, samplingRatio) 
File "/usr/lib/spark/python/pyspark/sql/context.py", line 298, in _createFromRDD 
_verify_type(row, schema) 
File "/usr/lib/spark/python/pyspark/sql/types.py", line 1152, in _verify_type 
_verify_type(v, f.dataType) 
File "/usr/lib/spark/python/pyspark/sql/types.py", line 1136, in _verify_type 
raise TypeError("%s can not accept object in type %s" % (dataType, type(obj))) 
TypeError: IntegerType can not accept object in type <type 'unicode'> 

をエラー与えますか? どうすればこの問題を解決できますか?

+1

あなたのbyte_size StructfieldがStringType型であり、IntegerTypeである必要がありますか? – ccheneson

+0

@ccheneson thxコメント –

答えて

5

cchenesonのように間違ったタイプを渡します。

data = sc.parallelize(["af.b Current%20events 1 996"]) 

あなたがRDD[List[String]]を取得する最初のマップの後:

parts = data.map(lambda l: l.split()) 
parts.first() 
## ['af.b', 'Current%20events', '1', '996'] 

第2のマップは(String, String, String, String)をタプルに変換する:

wikis = parts.map(lambda p: (p[0], p[1], p[2],p[3])) 
wikis.first() 
## ('af.b', 'Current%20events', '1', '996') 

あなたのあなたにdataを想定し

は次のようになりますschemaは3r d列は整数です。

スキーマは、完全なテーブルスキャンで型を推論するのを避けるために最もよく使用され、型キャストを実行しません。

あなたは、最後のマップ中にデータをキャストすることができ、次のいずれか

wikis = parts.map(lambda p: (p[0], p[1], int(p[2]), p[3])) 

またはサイドノートcountStringTypeキャスト欄

fields[2] = StructField("count", StringType(), True) 
schema = StructType(fields) 

wikis.toDF(schema).withColumn("cnt", col("count").cast("integer")).drop("count") 

としてcountを定義するには、SQLおよびshouldn」内の予約語であります列名として使用されます。 Sparkでは、いくつかのコンテキストでは期待通りに動作し、別のコンテキストでは失敗します。

0

apache 2.0を使用すると、スパークがデータのスキーマを推測できるようにすることができます。全体として、上記のようにパーサー関数をキャストする必要があります:

"スキーマがNoneの場合、スキーマ(列名と型)をデータから推測しようとします。これはRowのRDDでなければなりません。 namedtuple、またはdict。 "

関連する問題