2017-03-22 4 views
0

私は 'csv'ファイルをcassandraのテーブルの以下のロジックに従って生成します。スパークCanssandraデータのためのcsvファイルを生成するためのスクリプト

val df = sc.parallelize(Seq(("a",1,"[email protected]"), ("b",2,"[email protected]"),("a",1,"[email protected]"),("a",2,"[email protected]"))).toDF("col1","col2","emailId")

私はロジック下記のとおり 'CSV' ファイルを生成したいです。 3つの異なる 'emailid'があるので、3つの異なる 'csv'ファイルを生成する必要があります。

3つの異なるクエリのための3つのcsvファイル。

select * from table where emailId='[email protected]' select * from table where emailId='[email protected]' select * from table where emailId='[email protected]' どうすればいいですか?誰でもこのことを助けてくれますか?

バージョン: スパーク1.6.2 Scalaの2.10

+0

分析:私がデータフレームを持っていれば、私はそれを以下のようなCSVフォーマットに保存できます。 'df.write .format(" com.databricks.spark.csv ")save(path)'しかし、私はループを繰り返し、各emailIdのcsvファイルを生成したいと思います。 – Ramesh

答えて

1

は、それらを反復処理する電子メールの明確なリストを作成します。反復処理を行う場合は、データフレームと一致する電子メールのみをフィルタリングして、Cassandraに保存します。

import sql.implicits._ 
val emailData = sc.parallelize(Seq(("a",1,"[email protected]"), ("b",2,"[email protected]"),("a",1,"[email protected]"),("a",2,"[email protected]"))).toDF("col1","col2","emailId") 
val distinctEmails = emailData.select("emailId").distinct().as[String].collect 
for (email <- distinctEmails){ 
    val subsetEmailsDF = emailData.filter($"emailId" === email).coalesce(1) 
    //... Save the subset dataframe to cassandra 
} 

注意:coalesce(1)はすべてのデータを1つのノードに送信します。データフレームが大きすぎると、メモリの問題が発生することがあります。

+0

@ Jeremy、あなたの返事に感謝します...あなたの上記は確かに私を助けているはずです。でも、私はemailidのリストを集め、 'df.select(" emailId ")。distinct.collectAsList()'のように繰り返しました。 – Ramesh

+0

実際には、各繰り返しでデータフレームをcsvファイルに保存する必要があります.Cassandraに保存する必要はありません。コードが役立つはずです。ありがとうございました – Ramesh

+0

私は助けてくれてうれしいです。答えがあなたの質問を満たしている場合は、答えの隣にあるチェックマークをクリックして回答を受け入れることを検討してください。回答を受け入れることを灰色にしてから埋めてください。 – Jeremy

関連する問題