2017-03-23 9 views
0

集計関数(max)を列に適用していますが、これを結合で参照しています。結合条件の列に適用されたエイリアス集約関数を参照する方法

データフレームの列はmax(column_name)になります。だから、それが簡単にPythonのドット表記を使用して参照するために作るために、私が列をエイリアスが、私はまだエラーを取得しています:

tmp = hiveContext.sql("SELECT * FROM s3_data.nate_glossary WHERE profile_id_guid='ffaff64b-e87c-4a43-b593-b0e4bccc2731'" 
       ) 

max_processed = tmp.groupby('profile_id_guid','profile_version_id','record_type','scope','item_id','attribute_key') \ 
.agg(max("processed_date").alias("max_processed_date")) 

df = max_processed.join(tmp, [max_processed.profile_id_guid == tmp.profile_id_guid, 
          max_processed.profile_version_id == tmp.profile_version_id, 
          max_processed.record_type == tmp.record_type, 
          max_processed.scope == tmp.scope, 
          max_processed.item_id == tmp.item_id, 
          max_processed.attribute_key == tmp.attribute_key, 
          max_processed.max_processed_date == tmp.processed_date]) 

エラー:

File "", line 7, in File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/sql/dataframe.py", line 650, in join jdf = self._jdf.join(other._jdf, on._jc, "inner") File "/usr/hdp/2.5.0.0-1245/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in call File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/sql/utils.py", line 51, in deco raise AnalysisException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.AnalysisException: u'resolved attribute(s) processed_date#10 missing from record_type#41,scope#4,item_id#5,profile_id_guid#1,data_type#44,attribute_value#47,logical_id#45,profile_version_id#40,profile_version_id#2,attribute_key#8,max_processed_date#37,attribute_key#46,processed_date#48,scope#42,record_type#3,item_id#43,profile_id_guid#39,ems_system_id#38 in operator !Join Inner, Some((((((((profile_id_guid#1 = profile_id_guid#1) && (profile_version_id#2 = profile_version_id#2)) && (record_type#3 = record_type#3)) && (scope#4 = scope#4)) && (item_id#5 = item_id#5)) && (attribute_key#8 = attribute_key#8)) && (max_processed_date#37 = processed_date#10)));'

注エラーメッセージ: "処理済日付#10がありません "。私は属性のリストでprocessed_date#48とprocessed_date#10を見る。

+0

は 'processed_date#10 'は、属性のリストにない正直に言うと、これはスパーク系統の欠陥だと思います。 'tmp.printSchema'を実行するとどうなりますか? –

答えて

0

参照:

# DataFrame transformation 
tmp -> max_processed -> df 

上記の3つのデータフレームは、同じ系統を共有するので、あなたが複数回同じカラムを使用したい場合、あなたはaliasを使用する必要があります。例えば

は:

tmp = spark.createDataFrame([(1, 3, 1), (1, 3, 0), (2, 3, 1)], ['key1', 'key2', 'val']) 

max_processed = tmp.groupBy(['key1', 'key2']).agg(f.max(tmp['val']).alias('max_val'))\ 
    .withColumnRenamed('key1', 'max_key1').withColumnRenamed('key2', 'max_key2')\ 

df = max_processed.join(tmp, on=[max_processed['max_key1'] == tmp['key1'], 
           max_processed['max_key2'] == tmp['key2'], 
           max_processed['max_val'] == tmp['val']]) 
df.show() 

+--------+--------+-------+----+----+---+          
|max_key1|max_key2|max_val|key1|key2|val| 
+--------+--------+-------+----+----+---+ 
|  1|  3|  1| 1| 3| 1| 
|  2|  3|  1| 2| 3| 1| 
+--------+--------+-------+----+----+---+ 

私はまだ