2016-04-06 17 views
0

私はユーザー情報が文字列であるデータを扱っています。これらの文字列に固有の整数値を割り当てたいと思います。タプルのRDDでpysparkデータフレーム列を更新する

私はこのスタックオーバーフローポストhereにいくらか従っていました。私は最終的にALSモデルを実行されて何をしたいのか

data = data.map(lambda x: Rating(int(user.lookup(x[0])), int(x[1]), float(x[2]))) 

をした、その後

user = data.map(lambda x:x[0]).distinct().zipWithUniqueId() 

、これまでのところ、私:私はタプルのRDDを持ってするには、以下の式を使用していますこのエラーメッセージが表示されています。

例外:アクションまたはトランスフォーメーションからRDDをブロードキャストするか、RDDを参照しようとしているようです。

データ型が何とか間違っていると思いますが、これを修正する方法がわかりません。

+1

ここ2つの問題があります。最初のものは、DataFrameの値を更新したい、それは不可能です! DataFrameは不変なので、既存のものから新しいものを更新変換で作成する必要があります。次に、RDDを別のRDD変換の中に入れ子にすることはできません。あなたのRDDが小さい場合、あなたは放送変数を考慮するかもしれません。 – eliasah

+0

@eliasahご連絡いただきありがとうございます。 (x [1])、float(x [2]))))))、または作業を行う必要がありますかnewData = data.map(lambda x:Rating(int(user.lookup df = sqlContext.createDataFrame(?、[cols])のようなものですが、私はどこに物を置くかについてはあまりよく分かりません。 2番目の部分については、別のRDD変換の中でRDDをどこに入れ子にしていますか?私のデータは実際にはかなり大きいです。 – user2857014

+1

これはうまくいくかもしれませんが、試してみる必要があります!私はコメントのコードを読むことができません。 2番目の部分については、ユーザーの値はRDDです。ここでRDDをネストしようとしています。 – eliasah

答えて

1

lookupリンクされた回答で示唆されたアプローチは単に無効です。 Sparkはネストされたアクションや変換をサポートしていないので、をmapの中に呼び出すことはできません。データが大きいのであればあなたは、単にjoinと再構築することができます検索に標準のPython dictを使用して処理する:

from operator import itemgetter 
from pyspark.mllib.recommendation import Rating 

data = sc.parallelize([("foo", 1, 2.0), ("bar", 2, 3.0)]) 

user = itemgetter(0) 

def to_rating(record): 
    """ 
    >>> to_rating((("foobar", 99, 5.0), 1000)) 
    Rating(user=1000, product=99, rating=5.0) 
    """ 
    (_, item, rating), user = record 
    return Rating(user, item, rating) 

user_lookup = data.map(user).distinct().zipWithIndex() 

ratings = (data 
    .keyBy(user) # Add user string as a key 
    .join(user_lookup) # Join with lookup 
    .values() # Drop keys 
    .map(to_rating)) # Create Ratings 

ratings.first() 
## Rating(user=1, product=1, rating=2.0) 
関連する問題