2017-08-04 18 views
0

私がPySparkコードに変換しようとしているPandasでプロトタイプしたコードがいくつかあります。これは、urlparse Pythonライブラリを使用して、Python辞書への汎用URIを解析し、そのキーを新しい列に変換し、それらの新しい列を元のデータに連結して戻します。以下に簡単な例を示します。実際のデータセットには38個の列があり、それらのすべてを保持することが大事です。Python DictをPySparkのRDDまたはDFに変換する

このようなもの(切り取られた)で結果の
# create some sample data 
df_ex = pd.DataFrame([[102,'text1',u'/some/website/page.json?ovpevu&colc=1452802104103&si=569800363b029b74&rev=v4.1.2-wp&jsl=161&ln=en&pc=men&dp=www.mysite.com&qfq=news/this-is-an-article&of=2&uf=1&pd=0&irt=0&md=0&ct=1&tct=0&abt=0&lt=792&cdn=1&lnlc=gb&tl=c=141,m=433,i=476,xm=1243,xp=1254&pi=2&&rb=0&gen=100&callback=_ate.track.hsr&mk=some,key,words,about,the,article&'], 
      [781,'text2',u'/libtrc/hearst-network/loader.js'], 
      [9001,'text3',u'/image/view/-/36996720/highRes/2/-/maxh/150/maxw/150/mypic.jpg'], 
      [121,'text4',u'/website/page2.json?ovpevu&colc=1452802104103&si=569800363b029b74&rev=v4.1.2-wp&qqd=1&pd=0&irt=0&md=0&zzct=1&tct=0&abt=0&lt=792&cdn=0&lnlc=gb&tl=c=414,m=32,i=41,xm=1000,xp=111&callback=_ate.track.hsr&mk=some,other,key,words,about,the,article&'], 
      [781,'text5',u'/libtrc/hearst-network/loader.js']],columns=['num','text','uri']) 

# parse the URI to a dict using urlparse 
df_ex['uri_dict'] = df_ex['uri'].apply(lambda x: dict(urlparse.parse_qsl(urlparse.urlsplit(x).query))) 

# convert the parsed dict to a series 
df_ex_uridict_series = df_ex['uri_dict'].apply(pd.Series) 

# concatenate the parsed dict (now columns) back with original DF 
df_final = pd.concat([df_ex, df_ex_uridict_series], axis=1).drop('uri_dict', axis=1) 

Result after parsing URI; Pandas DF (sparse matrix)

結果は非常に希薄であるが、それは大丈夫です。アプリケーションの場合、私は実際にそれがソートの疎な行列であることを好みます(ただし、良い代替方法、密なアプローチがあると確信できます)。 PySparkで再作成しようとしているのはこの結果です。

これまで(PySpark 2.1.0では)これは(同じデータを使って)です。

# urlparse library 
import urlparse 

# create the sample data as RDD 
data = sc.parallelize([[102,'text1',u'/some/website/page.json?ovpevu&colc=1452802104103&si=569800363b029b74&rev=v4.1.2-wp&jsl=161&ln=en&pc=men&dp=www.mysite.com&qfq=news/this-is-an-article&of=2&uf=1&pd=0&irt=0&md=0&ct=1&tct=0&abt=0&lt=792&cdn=1&lnlc=gb&tl=c=141,m=433,i=476,xm=1243,xp=1254&pi=2&&rb=0&gen=100&callback=_ate.track.hsr&mk=some,key,words,about,the,article&'],[781,'text2',u'/libtrc/hearst-network/loader.js'],[9001,'text3',u'/image/view/-/36996720/highRes/2/-/maxh/150/maxw/150/mypic.jpg'],[121,'text4',u'/website/page2.json?ovpevu&colc=1452802104103&si=569800363b029b74&rev=v4.1.2-wp&qqd=1&pd=0&irt=0&md=0&zzct=1&tct=0&abt=0&lt=792&cdn=0&lnlc=gb&tl=c=414,m=32,i=41,xm=1000,xp=111&callback=_ate.track.hsr&mk=some,other,key,words,about,the,article&'],[781,'text5',u'/libtrc/hearst-network/loader.js']]) 

# simple map to parse the uri 
uri_parsed = data.map(list).map(lambda x: [x[0],x[1],urlparse.parse_qs(urlparse.urlsplit(x[2]).query)]) 

これは、RDDの各「行」の中に入れ子にされたpython dictを使用してかなり近づけます。このように:

In [187]: uri_parsed.take(3) 
Out[187]: 
[[102, 
    'text1', 
    {u'abt': [u'0'], 
    u'callback': [u'_ate.track.hsr'], 
    u'cdn': [u'1'], 
    u'colc': [u'1452802104103'], 
    u'ct': [u'1'], 
    u'dp': [u'www.mysite.com'], 
    u'gen': [u'100'], 
    u'irt': [u'0'], 
    u'jsl': [u'161'], 
    u'ln': [u'en'], 
    u'lnlc': [u'gb'], 
    u'lt': [u'792'], 
    u'md': [u'0'], 
    u'mk': [u'some,key,words,about,the,article'], 
    u'of': [u'2'], 
    u'pc': [u'men'], 
    u'pd': [u'0'], 
    u'pi': [u'2'], 
    u'qfq': [u'news/this-is-an-article'], 
    u'rb': [u'0'], 
    u'rev': [u'v4.1.2-wp'], 
    u'si': [u'569800363b029b74'], 
    u'tct': [u'0'], 
    u'tl': [u'c=141,m=433,i=476,xm=1243,xp=1254'], 
    u'uf': [u'1']}], 
[781, 'text2', {}], 
[9001, 'text3', {}]] 

値はリストが含まれているが、それは大丈夫です。彼らは単にリストにとどまることができます。

ここで私がしたいのは、キーから新しい列を作成するためにdictのキー/値のペアを解析してから、これに値のリストを入れます大文字と小文字を区別します)。私が試した

いくつかの点:

  • 本格PySpark DF行くには:UDFを書き、DFの新しい列を作成するwith_columnを使用して適用しました。これは動作しますが、私はdict全体を単一の文字列として返します(キーと値は引用符で囲まれていません)。私はこれを前に進めず、引用符を追加しようとしなかった(良い方法があると思った)。
  • 元のDFを分割する:まずmonotonically_increasing_id()を使用して各DF行に一意のIDを割り当て、2つの列(新しいIDとURI)を分割し、分割をRDDに変換して解析します。これは私が(IDを使用して)戻って参加することができますが、それは私が望む "希薄な行列"を作成するのに役立ちませんでした。

これらのテクノロジ(HiveデータストアでSpark v2.1.0を使用)は、このタイプのデータを表す正しい基礎となる技術ではない可能性があります。おそらく、スキーマレスのデータストアが良いでしょう。しかし、SparkとHiveをデータストアとして使用するようになっています。

ご協力いただければ幸いです!

答えて

1

私は最近、キー値のペアが '='で区切られた文字列を解析するのと同様の問題を探していました。ここでは可能なキーはあらかじめ分かっていませんでした。

私はそれが最も効率的な解決策であるかどうかはわかりませんが、任意のタグを発見して処理するためにrddを数回実行する解決策を考え出しました。

まず、URLと行ラベルを付けNUM-テキストのペアを解析:

def urlparsefn(url): 
    return urlparse.parse_qs(urlparse.urlsplit(url).query) 

# parse the uri to a dictionary 
uri_parsed = data.map(lambda x: (x[0],x[1],urlparsefn(x[2]))) 

次にあなたが使用してそれらを一緒に集約し、各URIの辞書の一意キーを抽出することにより、すべての個別のタグを抽出することができますPython setを使用すると、重複を簡単に削除できます。

# We need to discover all the unique keys before we will know which columns our data frame will have 
combOp = (lambda x, y: x.union(y)) 
possible_keys_set = uri_parsed.map(lambda x: set(x[2].keys())).aggregate(set(), combOp, combOp) 
possible_keys = sorted(list(possible_keys_set)) # sets have no order, this will give us alphabetical order in the final dataframe 

今、私たちはすべてのユニークな可能性の鍵を持っていることを、私たちは列をラベルに異なるnumとテキストを抽出し、各辞書はその中のすべてのタグを持っていることを確認し、その要素のためのいくつかのプレースホルダーテキストを使用することができます特定のURI辞書には存在しません。次に、Pythonのキーワード引数を使用して、RddのRddを構築することができます。

def attrmap(urirow, possible_keys): 
    # Extract the 3 parts of the uri tuple 
    num = urirow[0] 
    text = urirow[1] 
    uridict = urirow[2] 

    # Assign the known fields identifying the row 
    uridict['num'] = num 
    uridict['text'] = text 

    # Run through the possible keys, add a placeholder for any keys not present in the row 
    for key in possible_keys: 
     if key not in uridict: 
      uridict[key] = 'N/A' # Some place holder for values in the list of possible keys, but not in the current uri dictionary 
     else: 
      uridict[key] = uridict[key][0] # All the lists only have 1 item, so just extract the item 

    return uridict 

# Create an rdd of Row type, using the dictionary as kwargs 
uri_allkeys = uri_parsed.map(lambda x: Row(**attrmap(x, possible_keys))) 

その後、最後のものは、新しいNUMに基づいてデータフレーム、テキスト、およびすべての抽出可能な列のスキーマを構築することです。

# Create an item in the schema for the known fields, and each possible key 
df_schema = StructType() 
for possible_key in ['num','text']+possible_keys: 
    df_schema.add(possible_key, StringType(), True) 

# Use the new schema and rdd of rows to create the dataframe 
uri_parsed_df = spark.createDataFrame(uri_allkeys, df_schema) 

これは、データフレームに任意の列を与える必要があります。お役に立てれば!

関連する問題