0
こんにちは私は、あるテーブルに挿入した後にカフカに情報を送る小さいカサンドラトリガーを書いています。ここに私のトリガーのコードは次のとおりです。cassandraトリガー作成時のjava.lang.NoClassDefFoundError
public class InsertDataTrigger implements ITrigger {
public Collection<Mutation> augment(Partition update) {
//checking if trigger works and some debug info;
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
System.out.println("Hello " + dateFormat.format(new Date()));
System.out.println("This Insert Data Trigger");
System.out.println("default charset " + Charset.defaultCharset()); //IMPORTANT check if it's important
//here we're gonna build the message to kafka based on inserted data
try {
UnfilteredRowIterator it = update.unfilteredIterator();
CFMetaData cfMetaData = update.metadata();
System.out.println("PartitionKey " + new String(update.partitionKey().getKey().array()));
System.out.println("update.metadata().clusteringColumns().toString() " + update.metadata().clusteringColumns().toString());
while (it.hasNext()) {
JSONObject message = new JSONObject();
Unfiltered un = it.next();
Clustering clt = (Clustering) un.clustering();
message.put("partitionkey", new String(update.partitionKey().getKey().array()));
System.out.println("clt.toString(cfMetaData) " + clt.toString(cfMetaData));
System.out.println("clt.getRawValues() " + new String(clt.getRawValues()[0].array()));
System.out.println("partition.columns().toString() " + update.columns().toString());
message.put("datetime", new String(clt.getRawValues()[0].array()));
Iterator<Cell> cells = update.getRow(clt).cells().iterator();
while (cells.hasNext()) {
Cell cell = cells.next();
System.out.println("cell.column().name.toString() " + cell.column().name.toString());
System.out.println("cell.toString()" + cell.toString());
Double x = cell.value().getDouble();
System.out.println("cell.value().getDouble() " + x);
//if(cell.column().name.toString() == "value")
System.out.println(x);
message.put(cell.column().name.toString(), x);
//else
// message.put(cell.column().name.toString(),cell.value().toString());
}
System.out.println("un.toString()" + un.toString(cfMetaData));
if (!message.isEmpty()) {
System.out.println(message.toString());
//Sending data to kafka
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("test", message.toString()));//move topic name to some properties
producer.close();
}
}
} catch (Exception e) {
e.printStackTrace();
}
return Collections.emptyList();
} }
そして、ここでは私のポンポンファイルは次のとおりです。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
<modelVersion>4.0.0</modelVersion>
<groupId>io.github.carldata</groupId>
<artifactId>InsertDataTrigger</artifactId>
<version>1.0</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.cassandra/cassandra-all -->
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>3.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
</dependencies>
</project>
プロジェクトは罰金構築し、jarファイルを作成しますが、私はカサンドラにトリガーを作成しようとすると、それが上記の例外で失敗します。
非常に私はuber jarを作成して、問題を解決しました。 – CodeDog