Spark StreamingでJavaを使用してKafkaからJSONデータを取り出し、JSONをカスタムクラス(Transaction
)に解析して挿入しますデータをCassandraテーブルに追加することはできませんが、mapToRow()
機能を動作させることができません。スパークストリーミング - Java - KafkaからCassandraにJSONを挿入
私はあなたがしなければならないすべてはこの線に沿って何かあると言う例のトンを見てきました:
JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
streamingContext,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
JavaDStream<String> lines = stream.map(
new Function<Tuple2<String,String>, String>(){
@Override
public String call(Tuple2<String,String> tuple2) {
return tuple2._2();
}
}
);
javaFunctions(lines).writerBuilder("myKeyspace", "myTableName", mapToRow(Transaction.class)).saveToCassandra();
しかし、私はこれを行うとき、私はエラーを取得:
The method mapToRow(Class<Transaction>) is undefined for the type SaveTransactions
私は不足しているのは私の授業で何らかの装飾があると思っていますが、私はどれを見つけ出すことに成功していません。私はまた、公共のget/setメソッドを確立しようとした
@Table(keyspace = "myKeyspace", name = "myTableName",
readConsistency = "QUORUM",
writeConsistency = "QUORUM",
caseSensitiveKeyspace = false,
caseSensitiveTable = false)
public class Transaction implements java.io.Serializable{
@PartitionKey(0)
@Column(name="transaction_id")
public int TransactionId;
...
public Transaction(){}
}
:私はDataStaxマッピング注釈のすべてを試した
public class Transaction implements java.io.Serializable{
public int TransactionId;
...
public Transaction(){}
}
:私は基本的にクラスにプロパティバッグを作り、裸の骨を行く試みました各プロパティのために、民間へのプロパティの設定:
public class Transaction implements java.io.Serializable{
private int transactionId;
...
public Transaction(){}
public int getTransactionId() {
return transactionId;
}
public void setTransactionId(int transactionId) {
this.transactionId = transactionId;
}
}
私はのRDD
にDStream
を解析することができました以下のクラスを使用して210:
JavaDStream<Transaction> events = lines.mapPartitions(new Transaction.ParseJSON());
しかし、私はこの投稿でそれをしたら、それはまだ動作しません:上からlines
オブジェクトに作用する次のコードと併せて
public class Transaction implements java.io.Serializable{
...
public static class ParseJSON implements FlatMapFunction<Iterator<String>, Transaction> {
public Iterable<Transaction> call(Iterator<String> lines) throws Exception {
ArrayList<Transaction> transactions = new ArrayList<Transaction>();
ObjectMapper mapper = new ObjectMapper();
while (lines.hasNext()) {
String line = lines.next();
try {
transactions.add(mapper.readValue(line, Transaction.class));
} catch (Exception e) {
System.out.println("Skipped:" + e);
}
}
return transactions;
}
}
}
、 writeBuilder()。saveToCassandra()チェーンを使用します。
ここのお手伝いをよろしくお願いいたします。