1

keyに指定されたatのパスに値に格納された値のパーケットファイルを作成したいとします。パス/ユーザー/ dir_aでそうRDDのキーで指定されたパスにrddの値を書き込む

#key is path where the value is to written 
rdd = sc.parallelize([('/user/dir_a','tableA'),('/user/dir_b','tableB'),('/user/dir_c','tableC')]) 

、Aは私が何をしたか

を書かれている:

def writeToHdfs(x): 
    path = x[0] 
    outputpath = OUT_DIR + path 
    log.info('Creating dataframe') 
    s = SparkSession(sc) 
    df = s.createDataFrame(x[1], schema)) 
    df.write.parquet(outputpath) 

rdd.foreach(writeToHdfs) 

感謝。

答えて

1

私は、このシナリオのためのボックスソリューションはないと信じています。コードはScalaにありますが、ロジックはPythonでも同じです。

val baseRDD = sc.parallelize(Seq(("/user/dir_a", "tableA"), ("/user/dir_b", "tableB"), ("/user/dir_c", "tableC"))).cache() 

    val groupedRDD = baseRDD.groupByKey() 

    //Bring the keys to driver.Its little expensive operation 
    // but we need keys(paths) after all. 
    val keys = groupedRDD.keys.collect() 

    //Create RDDs specific to ur paths 
    val rddList = keys.map { key => 

     val rdd = baseRDD.filter(f => f._1.==(key)) 

     (key, rdd) 
    } 

    //Now you have list of RDDs specific to paths. iterate each RDD and save them to file 
    rddList.foreach(f => { 

     val path = f._1 
     f._2.values.saveAsTextFile(path) 
    }) 

注:パフォーマンスを向上させる必要があると思われる場合は、RDDをキャッシュしてください。 saveAsTextFile(...)をそれぞれの方法に置き換えてください。

+0

rddListの作成中に次のエラーが発生しました。スパークのように見えるマップ内のフィルタ機能が好きではない。オブジェクトをシリアル化できませんでした:例外:RDDをブロードキャストしようとしているか、アクションまたはトランスフォーメーションからRDDを参照しようとしています。 RDD変換とアクションは、ドライバによってのみ呼び出され、他の変換の内部では呼び出されません。たとえば、rdd1.map(lambda x:rdd2.values.count()* x)は、rdd1.map変換内で値の変換およびカウント動作を実行できないため無効です。詳細は、SPARK-5063を参照してください。 – Patel

+1

keys.map {}はスパークマップではなく、Scalaコレクションマップです。私は答えを投稿する前に、問題なく私のために働いた... – BDR

+0

ああ。テキストファイルでも動作しますが、dataframeを使用しているので、createDataframeは例外を発生させます。オリジナルのrdd.take(1)は、[path、[[row1]、[row2]]]を返します。元のrddの元の1つの値が子のrddに複数の値として存在するようにします。rddListの新しいrddには、テーブルに4つの行があるので4つのキー、値のペアが必要です。 – Patel

関連する問題