2017-04-06 13 views
1

私はdynamodbにsparkでdataframeを書きたいと思っています。 だから私はrdd.saveAsHadoopDataset(JobConf)を使用しています。しかし、rddタイプはmismatch.itです。これは、タイプがhadoopRDDのrddです。私はrddにデータフレームを変換するのが好きです。df.rddはhadodopRDDではなくrddです。私はspark-scala APIを使用しています.DyanmodbにDataframeを書き込むより良い方法があれば、それは役に立つでしょう。SparkのDataFrameをHadoopRDDに変換する方法

答えて

1

RDDを変換する必要はありません。

Hadoop APIはキーと値のペアを中心に構成されているため、SparkはデータがTuple2オブジェクトに格納されているRDDの周りにPairRDDFunctions(追加の機能を追加)を自動的にラップします。したがって、データをRDD[(T,V)]に入れるだけでよい場合は、saveAsHadoopDatasetメソッドを利用できます。設定が必要なものは何でも

import org.apache.hadoop.mapred.JobConf 
val tupleRDD : RDD[(Int, Int)] = sc.parallelize(Array((1,2), (3,4), (5,6))) 
val jobConf = new JobConf() 

セット:ここ

は一例です。

tupleRDD.saveAsHadoopDataset(jobConf) 
+0

答えてくれてありがとう。上のspark-Scalaの例はありますか?実際には私はスパークの初心者ですので、それは役立つでしょう。事前にありがとうございます。 – Yogesh

+0

どのような種類のRDDが動作するかの例を追加しました。 – jamborta

0

誰かがspark-scalaからdyanmodbにデータフレームを書きたい場合は、次に、以下のことが役に立つかもしれません。

import com.amazonaws.services.dynamodbv2.document.Item 
import com.amazonaws.services.dynamodbv2.document.DynamoDB 

var json_arr=df.toJSON.collect() //Convert dataframe to json array 
val table = dynamoDB.getTable("table_name") //dynamoDB is connection to dynamodb 
for (element <- json_arr) { 
     val item = Item.fromJSON(element) 
     table.putItem(item) 
    } 
+0

dynamoDBはどのようにインスタンス化されましたか? –

関連する問題