0

RDD [PersonType] = [pid、cid、firstname、lastname、age、source、sourceType、message]の値はRDD = [1000,100、Vikash、Singh、33 、source、sourceType、message]Cassandra行の設定タイプの更新列

ここでは、[pid、cid、firstname、lastname、age、dept、mrids]としてcsaandra行を設定しています。 cassandraの値が[1000,100、vikash、singh、33、bank、{sourceold.sourceTypeold.messageold}

とすると、古い値と新しい値の両方でcassandra列のmridsを更新したいと考えています。 私は新しいキャサンドラの値を[1000,100、vikash、singh、33、bank、{sourceold.sourceTypeold.messageold、source.sourceType.message}

とする必要があります。

val rdd[personType] = rdd1 
val rdd2 = sc.cassandraTable(keyspace,tablename) 
       .select("p_id","c_id", "mrids") 

これを達成するためには次のコードを書いてください。

答えて

0

これはあなたを始めてくれるはずです。

これは、キーに基づいてrddの結合を行い、別のrddのセットにデータを追加する方法を示しています。最後に

val temp = List((1, 4, Set(1)), 
        (2, 5, Set(2)), 
        (3, 6, Set(3)) 
        ) 
val temp2 = List((1, 11, 11), 
        (2, 11, 22), 
        (3, 11, 33) 
       ) 
val temp_rdd = sc.parallelize(temp) 

val temp2_rdd = sc.parallelize(temp2) 

val test = temp_rdd.map{case(key, data, set)=>((key),(data, set))} 
         .join(temp2_rdd.map{case(key, data, set_new_value)=>((key),(data, set_new_value))}) 
         .map{case(key, ((data1, set),(data2, set_new_value)))=>(key, set.toSet + set_new_value)} 


test.collect().foreach(println) 

あなたはRDDの結果セットを保存するためにrdd.saveToCassandraを使用することができます。

関連する問題