2016-07-01 7 views
0

Spark StreamingでJavaを使用してKafkaからJSONデータを取り出し、JSONをカスタムクラス(Transaction)に解析して挿入しますデータをCassandraテーブルに追加することはできませんが、mapToRow()機能を動作させることができません。スパークストリーミング - Java - KafkaからCassandraにJSONを挿入

私はあなたがしなければならないすべてはこの線に沿って何かあると言う例のトンを見てきました:

JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
     streamingContext, 
     String.class, 
     String.class, 
     StringDecoder.class, 
     StringDecoder.class, 
     kafkaParams, 
     topicsSet 
); 

JavaDStream<String> lines = stream.map(
    new Function<Tuple2<String,String>, String>(){ 
     @Override 
     public String call(Tuple2<String,String> tuple2) { 
      return tuple2._2(); 
     } 
    } 
); 

javaFunctions(lines).writerBuilder("myKeyspace", "myTableName", mapToRow(Transaction.class)).saveToCassandra(); 

しかし、私はこれを行うとき、私はエラーを取得:

The method mapToRow(Class<Transaction>) is undefined for the type SaveTransactions 

私は不足しているのは私の授業で何らかの装飾があると思っていますが、私はどれを見つけ出すことに成功していません。私はまた、公共のget/setメソッドを確立しようとした

@Table(keyspace = "myKeyspace", name = "myTableName", 
     readConsistency = "QUORUM", 
     writeConsistency = "QUORUM", 
     caseSensitiveKeyspace = false, 
     caseSensitiveTable = false) 
public class Transaction implements java.io.Serializable{ 

    @PartitionKey(0) 
    @Column(name="transaction_id") 
    public int TransactionId; 
    ... 

    public Transaction(){} 
} 

:私はDataStaxマッピング注釈のすべてを試した

public class Transaction implements java.io.Serializable{ 

    public int TransactionId; 
    ... 

    public Transaction(){} 
} 

:私は基本的にクラスにプロパティバッグを作り、裸の骨を行く試みました各プロパティのために、民間へのプロパティの設定:

public class Transaction implements java.io.Serializable{ 

    private int transactionId; 
    ... 

    public Transaction(){} 

    public int getTransactionId() { 
     return transactionId; 
    } 

    public void setTransactionId(int transactionId) { 
     this.transactionId = transactionId; 
    } 
} 

私はのRDDDStreamを解析することができました以下のクラスを使用して210:

JavaDStream<Transaction> events = lines.mapPartitions(new Transaction.ParseJSON()); 

しかし、私はこの投稿でそれをしたら、それはまだ動作しません:上からlinesオブジェクトに作用する次のコードと併せて

public class Transaction implements java.io.Serializable{ 

    ... 

    public static class ParseJSON implements FlatMapFunction<Iterator<String>, Transaction> { 
     public Iterable<Transaction> call(Iterator<String> lines) throws Exception { 
      ArrayList<Transaction> transactions = new ArrayList<Transaction>(); 
       ObjectMapper mapper = new ObjectMapper(); 
       while (lines.hasNext()) { 
        String line = lines.next(); 
        try { 
         transactions.add(mapper.readValue(line, Transaction.class)); 
        } catch (Exception e) { 
         System.out.println("Skipped:" + e); 
        } 
       } 

       return transactions; 
     } 
    } 
} 

、 writeBuilder()。saveToCassandra()チェーンを使用します。

ここのお手伝いをよろしくお願いいたします。

答えて

0

この問題は単なるインポートの問題であることが判明しました。私はcom.datastax.spark.connector.japi.CassandraStreamingJavaUtil.*をインポートしていましたが、必要なものすべてを私に渡すと思っていましたが、.mapToRow()関数のためにcom.datastax.spark.connector.japi.CassandraJavaUtil.*も必要でした。

私はこれを解決したら、私は次のエラーを取得し始めた:

スパーク-SQLプロジェクトに引っ張ることで解決された
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/package$ScalaReflectionLock$ 
    at org.apache.spark.sql.catalyst.ReflectionLock$.<init>(ReflectionLock.scala:5) 
    at org.apache.spark.sql.catalyst.ReflectionLock$.<clinit>(ReflectionLock.scala) 
    at com.datastax.spark.connector.mapper.ReflectionColumnMapper.<init>(ReflectionColumnMapper.scala:38) 
    at com.datastax.spark.connector.mapper.JavaBeanColumnMapper.<init>(JavaBeanColumnMapper.scala:10) 
    at com.datastax.spark.connector.util.JavaApiHelper$.javaBeanColumnMapper(JavaApiHelper.scala:93) 
    at com.datastax.spark.connector.util.JavaApiHelper.javaBeanColumnMapper(JavaApiHelper.scala) 
    at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1204) 
    at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1222) 
    at globalTransactions.Process.main(Process.java:77) 
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.package$ScalaReflectionLock$ 
    at java.net.URLClassLoader.findClass(Unknown Source) 
    at java.lang.ClassLoader.loadClass(Unknown Source) 
    at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source) 
    at java.lang.ClassLoader.loadClass(Unknown Source) 
    ... 9 more 

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-sql_2.10</artifactId> 
    <version>1.6.2</version> 
</dependency> 

が、これは隣の男/ギャルのお役に立てば幸いです。

関連する問題