2017-03-26 4 views
0

Cassandraの一部のデータに対してスパークジョブを実行しようとしています。 私は手持ちのキー(パーティションとクラスタリング列)のRDDを持っています。私はそれらのキーでのみ自分の仕事をしたいと思います。 19Spark Cassandraコネクタがクラスタリングキーで結合する

java.lang.IllegalArgumentException: ckey1 is not a column defined in this metadata 

私のテーブルのスキーマは以下の通りである:私はBoundStatementBuilderで次のエラーが表示されるよりも、

type CassandraKey = (String, String, String, String) 
val columns = SomeColumns(ColumnName("pkey1"),ColumnName("pkey2"),ColumnName("pkey3"),ColumnName("ckey1")) 
val repartitionedKeys: CassandraPartitionedRDD[CassandraKey] = keys.repartitionByCassandraReplica("keyspace", "table", partitionKeyMapper = columns) 
val selectedRows: CassandraJoinRDD[CassandraKey, CassandraRow] = 
    repartitionedKeys.joinWithCassandraTable[CassandraRow](keyspace, table).on(joinColumns = columns) 
selectedRows.collect() 

コードを見てみると
CREATE TABLE "keyspace".table (
pkey1 text, 
pkey2 text, 
pkey3 text, 
ckey1 text, 
ckey2 text, 
ckey3 timestamp, 
data text, 
PRIMARY KEY ((pkey1, pkey2, pkey3), ckey1, ckey2, ckey3) 
) 

私はBoundStatementBuilderのcolumntypesのでそれを見ることができるがされていますReplicaLocator.keyByReplicasで開始されたダミークエリから解決されました。この問合せは、表からパーティション・トークンを検索するために使用され、パーティション・キーでのみ作成されたwhere句を検索します。

さらに、RDDFunction.repartitionByCassandraReplica:183では、指定されたpartitionKeyMapperは無視されますが、問題が発生しないようです。

私はコネクタのバージョン1.5.1

答えて

1

「再分割」の部分を使用していますすることだけなので、そこの列を指定していないか、あなただけのパーティション・キー列を選択しない場合、パーティションキーにすることができます。 joinWithCassandraTable呼び出しですべての結合列のみを指定してください。

+0

RDDアクションでのみ発生する致命的なエラーメッセージ....メソッド呼び出しでスローされる可能性があります。 –

+0

Sparkが遅延しています。スキーマは実行時まで認識されません。つまり、アクションが – RussS

+0

と呼ばれていますが、使用されたダミークエリはメソッド呼び出しで実行できます。 –

関連する問題