2016-10-04 3 views
1

idフィールドに自動インクリメント値を生成する必要があります。私のアプローチは、ウィンドウ関数と最大関数を使用することでした。spark dataframe window関数の結果をmaxのような通常の関数に追加します。自動インクリメント

私は純粋なデータフレームソリューション(rddなし)を探しています。

私はright-outer joinをした後だから私はこのデータフレームを取得する:

df2 = sqlContext.createDataFrame([(1,2), (3, None), (5, None)], ['someattr', 'id']) 

# notice null values? it's a new records that don't have id just yet. 
# The task is to generate them. Preferably with one query. 

df2.show() 

+--------+----+ 
|someattr| id| 
+--------+----+ 
|  1| 2| 
|  3|null| 
|  5|null| 
+--------+----+ 

私はidフィールドの自動インクリメント値を生成する必要があります。

AnalysisException       Traceback (most recent call last) 
<ipython-input-102-b3221098e895> in <module>() 
    10 
    11 
---> 12 df2.withColumn('hello', when(df2.id.isNull(), row_number().over(Window.partitionBy('id').orderBy('id')) + max('id')).otherwise(df2.id)).show() 

/Users/ipolynets/workspace/spark-2.0.0/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col) 
    1371   """ 
    1372   assert isinstance(col, Column), "col should be Column" 
-> 1373   return DataFrame(self._jdf.withColumn(colName, col._jc), self.sql_ctx) 
    1374 
    1375  @ignore_unicode_prefix 

/Users/ipolynets/workspace/spark-2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    931   answer = self.gateway_client.send_command(command) 
    932   return_value = get_return_value(
--> 933    answer, self.gateway_client, self.target_id, self.name) 
    934 
    935   for temp_arg in temp_args: 

/Users/ipolynets/workspace/spark-2.0.0/python/pyspark/sql/utils.pyc in deco(*a, **kw) 
    67            e.java_exception.getStackTrace())) 
    68    if s.startswith('org.apache.spark.sql.AnalysisException: '): 
---> 69     raise AnalysisException(s.split(': ', 1)[1], stackTrace) 
    70    if s.startswith('org.apache.spark.sql.catalyst.analysis'): 
    71     raise AnalysisException(s.split(': ', 1)[1], stackTrace) 

AnalysisException: u"expression '`someattr`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;" 

ないこの例外は正直に言うと文句を言うのかわから:私のアプローチは、私は例外以下、このことの昇給を行うと

df2.withColumn('id', when(df2.id.isNull(), row_number().over(Window.partitionBy('id').orderBy('id')) + max('id')).otherwise(df2.id)) 

ウィンドウ機能を使用することでした。

window機能を通常のmax()機能に追加する方法に注意してください。

row_number().over(Window.partitionBy('id').orderBy('id')) + max('id')

私はそれをしても許されていますかはわかりません。

Ohh ..これは望ましいクエリの出力です。あなたがすでに考えているように。

+--------+----+ 
|someattr| id| 
+--------+----+ 
|  1| 2| 
|  3| 3| 
|  5| 4| 
+--------+----+ 

答えて

1

結果DATAFRAMEにもsomeattr列が存在しますので、あなたは、列を追加しています。

group bysomeattrを含めるか、一部の集計機能で使用する必要があります。

しかし、この方法でそれを行うには簡単です:

もちろん
df2.registerTempTable("test") 
df3 = sqlContext.sql(""" 
    select t.someattr, nvl (t.id, row_number(partition by id) over() + maxId.maxId) as id 
    from test t 
    cross join (select max(id) as maxId from test) as maxId 
""") 

あなたはDSLにそれを翻訳することができ、しかし、SQLは、このタスクのために私のために、より容易になるように思わ

関連する問題