
Below is my code; it fails with com.datastax.spark.connector.writer.NullKeyColumnException: Invalid null value for key column year.

directKafkaStream.foreachRDD(rdd -> {
    rdd.foreach(record -> {
        messages1.add(record._2);
    });

    JavaRDD<String> lines = sc.parallelize(messages1);
    JavaPairRDD<Integer, String> data = lines.mapToPair(new PairFunction<String, Integer, String>() {
        @Override
        public Tuple2<Integer, String> call(String a) {
            String[] tokens = StringUtil.split(a, '%');
            return new Tuple2<Integer, String>(Integer.getInteger(tokens[3]), tokens[2]);
        }
    }); // map to get the year and the name of the movie

    Function2<String, String, String> reduceSumFunc = (accum, n) -> (accum.concat(n)); // function for reduce
    JavaPairRDD<Integer, String> yearCount = data.reduceByKey(reduceSumFunc); // reduceByKey to count

    javaFunctions(yearCount)
        .writerBuilder("movie_keyspace", "movie_count", mapTupleToRow(Integer.class, String.class))
        .withColumnSelector(someColumns("year", "list_of_movies"))
        .saveToCassandra(); // this is the error line
});

This is the error I am getting:

com.datastax.spark.connector.writer.NullKeyColumnException: Invalid null value for key column year 
    at com.datastax.spark.connector.writer.RoutingKeyGenerator$$anonfun$fillRoutingKey$1.apply$mcVI$sp(RoutingKeyGenerator.scala:49) 
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) 
    at com.datastax.spark.connector.writer.RoutingKeyGenerator.fillRoutingKey(RoutingKeyGenerator.scala:47) 
    at com.datastax.spark.connector.writer.RoutingKeyGenerator.apply(RoutingKeyGenerator.scala:56) 
    at com.datastax.spark.connector.writer.TableWriter.batchRoutingKey(TableWriter.scala:126) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$19.apply(TableWriter.scala:151) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$19.apply(TableWriter.scala:151) 
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:107) 
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:158) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110) 
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140) 
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110) 
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135) 
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37) 
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745)   

Explanation:

1) Spark lets me save a JavaRDD to Cassandra, but I am not able to save a JavaPairRDD.

2) I am trying to connect Kafka and Cassandra using Spark.

3) I have marked the line where the database error occurs with a comment.

Answer:


One of your year values is null, and that is not allowed for a key column. Check your data and look for whatever is producing a null Integer.
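
For what it's worth, a common way to end up with a null Integer key in code like the above is Integer.getInteger(tokens[3]): that method looks up a JVM system property named by the string and returns null when no such property exists, whereas Integer.parseInt parses the string itself. Below is a minimal sketch (not the poster's code) of building the (year, movie) pairs so that no null partition key ever reaches saveToCassandra; the '%' delimiter and the token positions are taken from the question, and the helper name is made up for illustration:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// Sketch: parse the year defensively and drop records whose key could not be
// parsed, so the Cassandra partition key ("year") is never null.
static JavaPairRDD<Integer, String> toYearMoviePairs(JavaRDD<String> lines) {
    return lines
        .mapToPair(line -> {
            String[] tokens = line.split("%");                 // same '%' delimiter as in the question
            Integer year = null;
            String movie = null;
            if (tokens.length > 3) {
                movie = tokens[2];
                try {
                    year = Integer.parseInt(tokens[3].trim()); // parses the value itself, unlike Integer.getInteger
                } catch (NumberFormatException e) {
                    // malformed year; leave it null so the record is dropped below
                }
            }
            return new Tuple2<Integer, String>(year, movie);
        })
        .filter(pair -> pair._1() != null);                    // no null keys reach saveToCassandra
}

The same guard could also be written as a separate .filter(t -> t._1() != null) on the existing data RDD right before the saveToCassandra call, if the mapping itself is left untouched.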