1
keyに指定されたatのパスに値に格納された値のパーケットファイルを作成したいとします。パス/ユーザー/ dir_aでそうRDDのキーで指定されたパスにrddの値を書き込む
#key is path where the value is to written
rdd = sc.parallelize([('/user/dir_a','tableA'),('/user/dir_b','tableB'),('/user/dir_c','tableC')])
、Aは私が何をしたか
を書かれている:
def writeToHdfs(x):
path = x[0]
outputpath = OUT_DIR + path
log.info('Creating dataframe')
s = SparkSession(sc)
df = s.createDataFrame(x[1], schema))
df.write.parquet(outputpath)
rdd.foreach(writeToHdfs)
感謝。
rddListの作成中に次のエラーが発生しました。スパークのように見えるマップ内のフィルタ機能が好きではない。オブジェクトをシリアル化できませんでした:例外:RDDをブロードキャストしようとしているか、アクションまたはトランスフォーメーションからRDDを参照しようとしています。 RDD変換とアクションは、ドライバによってのみ呼び出され、他の変換の内部では呼び出されません。たとえば、rdd1.map(lambda x:rdd2.values.count()* x)は、rdd1.map変換内で値の変換およびカウント動作を実行できないため無効です。詳細は、SPARK-5063を参照してください。 – Patel
keys.map {}はスパークマップではなく、Scalaコレクションマップです。私は答えを投稿する前に、問題なく私のために働いた... – BDR
ああ。テキストファイルでも動作しますが、dataframeを使用しているので、createDataframeは例外を発生させます。オリジナルのrdd.take(1)は、[path、[[row1]、[row2]]]を返します。元のrddの元の1つの値が子のrddに複数の値として存在するようにします。rddListの新しいrddには、テーブルに4つの行があるので4つのキー、値のペアが必要です。 – Patel