I tried to write data to my Cassandra database and got an error (Java, Spark and Cassandra): java.lang.ClassCastException: com.datastax.driver.core.DefaultResultSetFuture cannot be cast to shade.com.datastax.spark.connector.google.common.util.concurrent.ListenableFuture.

Here is what I have:

1) Dictionary.java:

package com.chatSparkConnactionTest; 

import java.io.Serializable; 

public class Dictionary implements Serializable{ 
private String value_id; 
private String d_name; 
private String d_value; 

public Dictionary(){} 

public Dictionary (String value_id, String d_name, String d_value) { 
    this.setValue_id(value_id); 
    this.setD_name(d_name); 
    this.setD_value(d_value); 
} 

public String getValue_id() { 
    return value_id; 
} 
public void setValue_id(String value_id) { 
    this.value_id = value_id; 
} 
public String getD_name() { 
    return d_name; 
} 
public void setD_name(String d_name) { 
    this.d_name = d_name; 
} 
public String getD_value() { 
    return d_value; 
} 
public void setD_value(String d_value) { 
    this.d_value = d_value; 
}
} 

2) My main class:

package com.chatSparkConnactionTest; 

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions; 
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow; 

import java.io.Serializable; 
import java.util.Arrays; 
import java.util.List; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 

public class JavaDemoRDDWrite implements Serializable { 
    private static final long serialVersionUID = 1L; 

    public static void main(String[] args) { 
        SparkConf conf = new SparkConf() 
            .setAppName("chat") 
            .setMaster("local") 
            .set("spark.cassandra.connection.host", "127.0.0.1"); 
        JavaSparkContext sc = new JavaSparkContext(conf); 

        List<Dictionary> dictionary = Arrays.asList( 
            new Dictionary("7", "n1", "v1"), 
            new Dictionary("8", "n2", "v2"), 
            new Dictionary("9", "n3", "v3") 
        ); 
        for (Dictionary dictionaryRow : dictionary) { 
            System.out.println("id: " + dictionaryRow.getValue_id()); 
            System.out.println("name: " + dictionaryRow.getD_name()); 
            System.out.println("value: " + dictionaryRow.getD_value()); 
        } 

        JavaRDD<Dictionary> rdd = sc.parallelize(dictionary); 
        System.out.println("Total rdd rows: " + rdd.collect().size()); 
        javaFunctions(rdd) 
            .writerBuilder("chat", "dictionary", mapToRow(Dictionary.class)) 
            .saveToCassandra(); 
    } 
}

3) pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 
    <groupId>chat_connaction_test</groupId> 
    <artifactId>ChatSparkConnectionTest</artifactId> 
    <version>0.0.1-SNAPSHOT</version> 
<dependencies> 
    <dependency> 
     <groupId>com.datastax.cassandra</groupId> 
     <artifactId>cassandra-driver-core</artifactId> 
     <version>3.1.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.11</artifactId> 
     <version>2.0.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-sql_2.11</artifactId> 
     <version>2.0.0</version> 
    </dependency> 

    <dependency> 
     <groupId>com.datastax.spark</groupId> 
     <artifactId>spark-cassandra-connector_2.11</artifactId> 
     <version>2.0.0-M3</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.10</artifactId> 
     <version>2.0.0</version> 
    </dependency> 

</dependencies> 
</project> 

And here is the error text:

java.lang.ClassCastException: com.datastax.driver.core.DefaultResultSetFuture cannot be cast to shade.com.datastax.spark.connector.google.common.util.concurrent.ListenableFuture 
    at com.datastax.spark.connector.writer.AsyncExecutor.executeAsync(AsyncExecutor.scala:31) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:159) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:158) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:158) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110) 
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140) 
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110) 
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135) 
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37) 
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 
16/10/11 17:43:03 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.ClassCastException: com.datastax.driver.core.DefaultResultSetFuture cannot be cast to shade.com.datastax.spark.connector.google.common.util.concurrent.ListenableFuture 

16/10/11 17:43:03 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job 
16/10/11 17:43:03 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/10/11 17:43:03 INFO TaskSchedulerImpl: Cancelling stage 1 
16/10/11 17:43:03 INFO DAGScheduler: ResultStage 1 (runJob at RDDFunctions.scala:37) failed in 0.274 s 
16/10/11 17:43:03 INFO DAGScheduler: Job 1 failed: runJob at RDDFunctions.scala:37, took 0.291592 s 
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.ClassCastException: com.datastax.driver.core.DefaultResultSetFuture cannot be cast to shade.com.datastax.spark.connector.google.common.util.concurrent.ListenableFuture 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1904) 
    at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:37) 
    at com.datastax.spark.connector.japi.RDDJavaFunctions.saveToCassandra(RDDJavaFunctions.java:61) 
    at com.datastax.spark.connector.japi.RDDAndDStreamCommonJavaFunctions$WriterBuilder.saveToCassandra(RDDAndDStreamCommonJavaFunctions.java:486) 
    at com.chatSparkConnactionTest.JavaDemoRDDWrite.main(JavaDemoRDDWrite.java:69) 
Caused by: java.lang.ClassCastException: com.datastax.driver.core.DefaultResultSetFuture cannot be cast to shade.com.datastax.spark.connector.google.common.util.concurrent.ListenableFuture 
    at com.datastax.spark.connector.writer.AsyncExecutor.executeAsync(AsyncExecutor.scala:31) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:159) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:158) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:158) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110) 
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140) 
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110) 
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135) 
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37) 
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 

Even though the error occurs, the first value from the RDD is still inserted into the database table; the other two rows are ignored.

And just in case, here is my Cassandra table:

CREATE TABLE dictionary (
    value_id text, 
    d_value  text, 
    d_name text, 
    PRIMARY KEY (value_id, d_name) 
) WITH comment = 'dictionary values' 
AND CLUSTERING ORDER BY (d_name ASC); 
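
For reference, one quick way to double-check what actually landed in the table is to read it back through the same connector. This is only a sketch, assuming the same sc JavaSparkContext from the main class above, plus imports for CassandraRow and CassandraJavaRDD from the com.datastax.spark.connector.japi packages:

// Verification sketch: read chat.dictionary back and print every row.
CassandraJavaRDD<CassandraRow> rows = javaFunctions(sc)
    .cassandraTable("chat", "dictionary");
System.out.println("Rows in chat.dictionary: " + rows.count());
for (CassandraRow row : rows.collect()) {
    // The three columns written by the Dictionary bean:
    System.out.println(row.getString("value_id") + " | "
        + row.getString("d_name") + " | "
        + row.getString("d_value"));
}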

Updated pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 
    <groupId>chat_connaction_test</groupId> 
    <artifactId>ChatSparkConnectionTest</artifactId> 
    <version>0.0.1-SNAPSHOT</version> 
<dependencies> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.11</artifactId> 
     <version>2.0.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-sql_2.11</artifactId> 
     <version>2.0.0</version> 
    </dependency> 

    <dependency> 
     <groupId>com.datastax.spark</groupId> 
     <artifactId>spark-cassandra-connector_2.11</artifactId> 
     <version>2.0.0-M3</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.10</artifactId> 
     <version>2.0.0</version> 
    </dependency> 

</dependencies> 
</project> 
I wonder whether a solution to this problem was ever found. I am hitting exactly the same issue you describe, with Spark 2.1.0 and spark-cassandra-connector 2.0.1.

Answers

Remove the "cassandra-driver-core" dependency shown below from your pom.xml; it is what causes the problem. Besides the Spark dependencies, the "spark-cassandra-connector" dependency is all you need to interact with a Cassandra DB. The connector is built against its own shaded copy of Guava (note the shade.com.datastax.spark.connector.google.common prefix in the error), while the standalone driver's DefaultResultSetFuture implements the stock Guava ListenableFuture, and that mismatch is exactly the cast that fails in your stack trace.

<dependency> 
    <groupId>com.datastax.cassandra</groupId> 
    <artifactId>cassandra-driver-core</artifactId> 
    <version>3.1.0</version> 
</dependency> 
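
As a quick sanity check (plain Maven tooling, nothing connector-specific), you can confirm that no copy of the standalone driver still reaches the classpath through a transitive dependency:

mvn dependency:tree -Dincludes=com.datastax.cassandra:cassandra-driver-core

If that prints no matches for your project, the only driver classes left are the ones bundled with the connector.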
Tried it, same result: java.lang.ClassCastException: com.datastax.driver.core.DefaultResultSetFuture cannot be cast to shade.com.datastax.spark.connector.google.common.util.concurrent.ListenableFuture

Here is the updated pom.xml (added above).

Anyway, thank you!
