2016-06-30 8 views
0

を用いてストリーミングでIは、JSONのDATAFRAMEにRDDをスパークストリーミングのtransform()関数内ドキュメントを変換するtoJSON()方法を用います。to.JSON()スパークpyspark

私は次のようにコーディングするためpysparkを使用しています:

def process(rdd): 
    rddDataframe = sqlContext.createDataFrame(rdd) 
    rddback = rddDataFrame.toJSON() 
return rdd 

dstream_test = dstream_in.transform(lambda rdd: process(rdd)) 

をしかし、私は次のエラーを得た:

UnpicklingError: invalid load key, '{' 

この問題を解決する方法を、私を助けてください。

+0

あなたのコードは意味をなさない送信、行ごとに、あなたの変換を定義します。また、データサンプルを提供してもらえますか? – ShuaiYuan

答えて

1

rddを関数に渡さないで、関数を自分のrddに渡します。

は、それが

def transform(row): 
    .... 

your_rdd = your_rdd.map(transform) 
関連する問題