0

私のcassandra CFはパーティションキーとして日付とIDを持っています。 クエリー中に私は日付しか知りませんので、IDの範囲をループします。Cassandra Spark Connector

私の質問は、コネクタが次のコードをどのように実行するかを中心にしています。

SparkDriverコードがどのように見える -

SparkConf conf = new SparkConf().setAppName("DemoApp") 
.conf.setMaster("local[*]") 
.set("spark.cassandra.connection.host", "10.*.*.*") 
.set("spark.cassandra.connection.port", "*"); 

JavaSparkContext sc = new JavaSparkContext(conf); 
SparkContextJavaFunctions javaFunctions = CassandraJavaUtil.javaFunctions(sc); 

String date = "23012017"; 

for(String id : idlist) { 

JavaRDD<CassandraRow> cassandraRowsRDD = 

javaFunctions.cassandraTable("datakeyspace", "sample2") 
      .where("date = ?",date) 
      .where("id = ? ", id) 
      .select("data"); 

cassandraRowsRDDList.add(cassandraRowsRDD); 
} 

List<CassandraRow> collectAllRows = new ArrayList<CassandraRow>(); 
     for(JavaRDD<CassandraRow> rdd : cassandraRowsRDDList){ 
      //do transformations 

      collectAllRows.addAll(rdd.collect()); 
    } 

1)私はidlistの上にIループあればお聞きしたかったまず第一に、これが効率的になり、idlistは増え続ける可能性があります1000個の要素を持っていると言いますか?各選択クエリをクラスタ内でどのように配布するか、特にCassandra DB接続をどのように維持するのか?

2)私のドライバプログラムでループした後、私はすべての行をリストに入れて、各行に変換を適用し、重複を除外します。これはクラスターに点火することによっても配布されるのでしょうか、それともドライバー側で行われますか?

親切に助けてください!

答えて

0

spark cassandraコネクタが提供するより良い方法があります。 (date、id)のrddを作成し、dateおよびid列に対してjoinWithCassandraTable関数を呼び出すことができます。コネクターそれはスマートにすべてのデータがワーカーだけによって取り出され、シャッフルがなくても各ワーカーはそれが持つ日付とIDのデータだけをフェッチします。

関連する問題