2017-07-05 18 views
0

今、私は下のコードでhdfsからデータを読み込もうとしましたが、fucntion 'jsonParse'を使用して通常のcsvに値を連結しようとしました。この方法はうまくいかないようですが、レコードの一部を印刷しようとすると、 'data'変数がPipelinedRDDでないことがわかります。どのようにして 'data'(通常RDD)したい、感謝:PipelinedRDDと通常のRDDの違いは何ですか?

def jsonParse(x): 
    s=json.loads(x) 
    print "ssssssssssss"+s['age']+","+s['sex']+","+s['xueya']+","+s['danguchun']+","+s['na']+","+s['k']+","+s['yaowu'] 
    return s['age']+","+s['sex']+","+s['xueya']+","+s['danguchun']+","+s['na']+","+s['k']+","+s['yaowu'] 

conf = SparkConf() 
sc = SparkContext(conf=conf) 
hc = HiveContext(sc) 
#json=sc.textFile('hdfs://hacluster/new') 
json=hc.sql("select * from default.test_yj_200 limit 1000").toJSON() 

data=json.map(jsonParse) 

答えて

1

PipelinedRDD

PipelinedRDD操作がパイプライン化し、作業者に送信されます。コードは上から下に実行されます。これはRDDのサブクラスです。

RDD

平行に上に加工することができる要素の定数、分割されたコレクションを表します。

関連する問題