2016-07-06 54 views
-1

PySparkでは、RDDの各要素に対して、要素の配列を取得しようとしています。その結果をDataFrameに変換したいと考えています。何も出力がオンに存在しないようprint "in goThroughAB"フラットマップをRDDに適用できません

def simulate(jobId, house, a, b): 
    return Row(jobId=jobId, house=house, a=a, b=b, myVl=[i for i in range(10)]) 

def goThroughAB(jobId, house): 
    print "in goThroughAB" 
    results = [] 
    for a in as: 
    for b in bs: 
     results += simulate(jobId, house, a, b) 
    print type(results) 
    return results 

不思議なことには、何の効果もありません。私はこれらのヘルパーメソッドを呼び出しています、その中で

simulation = housesDF.flatMap(lambda house: goThroughAB(jobId, house)) 
    print simulation.toDF().show() 

私は、次のコードを持っていますスクリーン。

しかし、私はこのエラーを取得しています:この行で

---> 23 print simulation.toDF().show() 
    24 
    25 dfRow = sqlContext.createDataFrame(simulationResults) 

/databricks/spark/python/pyspark/sql/context.py in toDF(self, schema, sampleRatio) 
    62   [Row(name=u'Alice', age=1)] 
    63   """ 
---> 64   return sqlContext.createDataFrame(self, schema, sampleRatio) 
    65 
    66  RDD.toDF = toDF 

/databricks/spark/python/pyspark/sql/context.py in createDataFrame(self, data, schema, samplingRatio) 
    421 
    422   if isinstance(data, RDD): 
--> 423    rdd, schema = self._createFromRDD(data, schema, samplingRatio) 
    424   else: 
    425    rdd, schema = self._createFromLocal(data, schema) 

/databricks/spark/python/pyspark/sql/context.py in _createFromRDD(self, rdd, schema, samplingRatio) 
    308   """ 
    309   if schema is None or isinstance(schema, (list, tuple)): 
--> 310    struct = self._inferSchema(rdd, samplingRatio) 
    311    converter = _create_converter(struct) 
    312    rdd = rdd.map(converter) 

/databricks/spark/python/pyspark/sql/context.py in _inferSchema(self, rdd, samplingRatio) 
    261 
    262   if samplingRatio is None: 
--> 263    schema = _infer_schema(first) 
    264    if _has_nulltype(schema): 
    265     for row in rdd.take(100)[1:]: 

/databricks/spark/python/pyspark/sql/types.py in _infer_schema(row) 
    829 
    830  else: 
--> 831   raise TypeError("Can not infer schema for type: %s" % type(row)) 
    832 
    833  fields = [StructField(k, _infer_type(v), True) for k, v in items] 

TypeError: Can not infer schema for type: <type 'str'> 

を:goThroughABが実行されないように

print simulation.toDF().show() 

だから、flatMapが実行されないことを意味し、見えます。

コードの問題は何ですか?

+0

print文は、分散環境で役に立たない文句を言う、最初の要素をサンプリングし、それをstr(あなたJOBID)を発見しました。そして、この質問には、例として 'housesDF.'はありません。 – zero323

答えて

0

まず、ドライバではなくSparkエグゼキュータで印刷します。ご存知のように、エグゼキュータは、Sparkタスクを並行して実行するリモートプロセスです。彼らはその行を自分のコンソールに表示します。あるexecutorが特定のパーティションを実行しているかどうか分からず、分散環境のprint文には依存しないでください。

次に、DataFrameを作成するときにSparkがテーブルのスキーマを知る必要があるという問題があります。指定しない場合は、サンプリング比が使用され、行のタイプを判別するためにいくつかの行がチェックされます。サンプリング比を指定しないと、最初の行のみがチェックされます。これはあなたのケースで発生し、おそらくタイプが判別できないフィールドを持っています(おそらくヌルです)。

これを解決するには、スキーマをtoDF()メソッドに追加するか、またはゼロ以外のサンプリング比を指定する必要があります。スキーマは、次のように事前に作成することができます。

schema = StructType([StructField("int_field", IntegerType()), 
        StructField("string_field", StringType())]) 
0

このコードは正しくありません。 results += simulate(jobId, house, a, b)行を連結しようとし、失敗します。 TypeErrorが表示されていない場合は到達しておらず、おそらくhousesDFを作成したときにコードが別の場所で失敗します。

-1

他の人に指摘されているように、重要な問題はresults += simulate(jobId, house, a, b)です。これはsimulationがRowオブジェクトを返すと機能しません。 resultslistに変更し、list.appendを使用してみてください。しかし、どうしてですか?yield

def goThroughAB(jobId, house): 
    print "in goThroughAB" 
    results = [] 
    for a in as: 
    for b in bs: 
     yield simulate(jobId, house, a, b) 

+ 2つの行オブジェクトが発生した場合はどうなりますか?

In[9]: 
from pyspark.sql.types import Row 
Row(a='a', b=1) + Row(a='b', b=2) 

Out[9]: 
('a', 1, 'b', 2) 

はその後toDFは、したがって

TypeError: Can not infer schema for type: <type 'str'> 
関連する問題