2016-12-21 14 views
0

を移行するとき、私は次のようにインパラでの組み込み関数を使用しています。今、私は(Jupyterノートでpysparkを使用して)SparkSQLに移行していますインパラ組み込み関数は利用できませんインパラからSparkSQL

select id, parse_url(my_table.url, "QUERY", "extensionId") from my_table 

また、以下試してみました

NameError: name 'parse_url' is not defined 

my_table.select(my_table.id.cast('string'), parse_url(my_table.url.cast('string'), "QUERY", "extensionId")).show() 

しかし、私は次のエラーを得た

my_table.registerTempTable("my_table") 

sqlContext.sql("select id, url, parse_url(url, 'QUERY', 'extensionId') as new_url from my_table").show(100) 

しかし、すべてnew_urlnullになります。

私がここで逃したものは何ですか?また、どのように人々はそのような問題を扱うだろうか?ありがとう!

答えて

1

一部欠けている部分:

  • あなたはスパークとインパラの機能を実行することはできません。
  • Sparkで使用できる同じ名前と構文のHive UDFがありますが、ネイティブ実装と関数ラッパーはありません。このため、HiveサポートのあるHiveContext/SparkSessionを使用してSQLを呼び出すことができます。

    spark.sql("""SELECT parse_url(
        'http://example.com?extensionId=foo', 'QUERY', 'extensionId' 
    )""").show() 
    
    +-----------------------------------------------------------------+ 
    |parse_url(http://example.com?extensionId=foo, QUERY, extensionId)| 
    +-----------------------------------------------------------------+ 
    |                foo| 
    +-----------------------------------------------------------------+ 
    

    NULL出力所与の部分が一致させることができないことを意味します:一般的に、それはうまく動作するはず

​​
+---------------------------------------------------------+ 
|parse_url(http://example.com?bar=foo, QUERY, extensionId)| 
+---------------------------------------------------------+ 
|              null| 
+---------------------------------------------------------+ 

あなたは同様のを達成できます結果はUDFを使用して発生しますが、大幅に遅くなります。定義されたデータと

from typing import Dict 
from urllib.parse import parse_qsl, urlsplit 
from pyspark.sql.functions import udf 
from pyspark.sql.types import StringType, MapType 

def parse_args(col: str) -> Dict[str, str]: 
    """ 
    http://stackoverflow.com/a/21584580/6910411 
    """ 
    try: 
     return dict(parse_qsl(urlsplit(col).query)) 
    except: 
     pass 

parse_args_ = udf(parse_args, MapType(StringType(), StringType())) 

df = sc.parallelize([ 
    ("http://example.com?bar=foo",), 
    ("http://example.com?extensionId=foo",), 
]).toDF(["url"]) 

以下のように使用することができる。

+----------------------------+ 
|parse_args(url)[extensionId]| 
+----------------------------+ 
|      null| 
|       foo| 
+----------------------------+ 

df.select(parse_args_("url")["extensionId"]).show() 

結果た状態で

関連する問題