2017-08-25 13 views
0

こんにちは私は、あるテーブルに挿入した後にカフカに情報を送る小さいカサンドラトリガーを書いています。ここに私のトリガーのコードは次のとおりです。cassandraトリガー作成時のjava.lang.NoClassDefFoundError

public class InsertDataTrigger implements ITrigger { 

    public Collection<Mutation> augment(Partition update) { 

     //checking if trigger works and some debug info; 
     SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); 
     System.out.println("Hello " + dateFormat.format(new Date())); 
     System.out.println("This Insert Data Trigger"); 
     System.out.println("default charset " + Charset.defaultCharset());  //IMPORTANT check if it's important 

     //here we're gonna build the message to kafka based on inserted data 
     try { 
      UnfilteredRowIterator it = update.unfilteredIterator(); 
      CFMetaData cfMetaData = update.metadata(); 

      System.out.println("PartitionKey " + new String(update.partitionKey().getKey().array())); 
      System.out.println("update.metadata().clusteringColumns().toString() " + update.metadata().clusteringColumns().toString()); 

      while (it.hasNext()) { 
       JSONObject message = new JSONObject(); 

       Unfiltered un = it.next(); 
       Clustering clt = (Clustering) un.clustering(); 

       message.put("partitionkey", new String(update.partitionKey().getKey().array())); 

       System.out.println("clt.toString(cfMetaData) " + clt.toString(cfMetaData)); 
       System.out.println("clt.getRawValues() " + new String(clt.getRawValues()[0].array())); 
       System.out.println("partition.columns().toString() " + update.columns().toString()); 

       message.put("datetime", new String(clt.getRawValues()[0].array())); 

       Iterator<Cell> cells = update.getRow(clt).cells().iterator(); 

       while (cells.hasNext()) { 
        Cell cell = cells.next(); 
        System.out.println("cell.column().name.toString() " + cell.column().name.toString()); 
        System.out.println("cell.toString()" + cell.toString()); 
        Double x = cell.value().getDouble(); 
        System.out.println("cell.value().getDouble() " + x); 
        //if(cell.column().name.toString() == "value") 
        System.out.println(x); 
        message.put(cell.column().name.toString(), x); 
        //else 
        // message.put(cell.column().name.toString(),cell.value().toString()); 
       } 
       System.out.println("un.toString()" + un.toString(cfMetaData)); 

       if (!message.isEmpty()) { 
        System.out.println(message.toString()); 

        //Sending data to kafka 
        Properties props = new Properties(); 
        props.put("bootstrap.servers", "localhost:9092"); 
        props.put("acks", "all"); 
        props.put("retries", 0); 
        props.put("batch.size", 16384); 
        props.put("linger.ms", 1); 
        props.put("buffer.memory", 33554432); 
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

        Producer<String, String> producer = new KafkaProducer<>(props); 
        producer.send(new ProducerRecord<>("test", message.toString()));//move topic name to some properties 
        producer.close(); 
       } 


      } 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 

     return Collections.emptyList(); 
    } } 

そして、ここでは私のポンポンファイルは次のとおりです。

<?xml version="1.0" encoding="UTF-8"?> 
    <project xmlns="http://maven.apache.org/POM/4.0.0" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 

    <build> 
     <plugins> 
      <plugin> 
       <groupId>org.apache.maven.plugins</groupId> 
       <artifactId>maven-compiler-plugin</artifactId> 
       <version>3.1</version> 
       <configuration> 
        <source>1.8</source> 
        <target>1.8</target> 
       </configuration> 
      </plugin> 
     </plugins> 
    </build> 

    <modelVersion>4.0.0</modelVersion> 

    <groupId>io.github.carldata</groupId> 
    <artifactId>InsertDataTrigger</artifactId> 
    <version>1.0</version> 

    <dependencies> 
     <!-- https://mvnrepository.com/artifact/org.apache.cassandra/cassandra-all --> 
     <dependency> 
      <groupId>org.apache.cassandra</groupId> 
      <artifactId>cassandra-all</artifactId> 
      <version>3.11.0</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka-clients</artifactId> 
      <version>0.11.0.0</version> 
     </dependency> 
    </dependencies> 

</project> 

プロジェクトは罰金構築し、jarファイルを作成しますが、私はカサンドラにトリガーを作成しようとすると、それが上記の例外で失敗します。

答えて

2

ほとんどの場合、kafka-clients jarはCassandra libディレクトリにありません。あなたのプロジェクトがその依存関係を含んでいない限り(すなわちfat/uber jarを構築する)。

は、kafka-clients jarとCassandra依存関係の依存関係に問題があります。特にorg.xerial.snappy snappy-javaには異なるバージョンがあります。それはうまくいくかもしれませんが、それを見て何かを探してください。問題があれば、Kafkaクライアントの瓶を自分のものにすることができます。

+0

非常に私はuber jarを作成して、問題を解決しました。 – CodeDog

関連する問題