2017-09-28 11 views
1

私はカフカからスパークダイレクト・ストリームを作成しようとしていますが、directStreamオブジェクトを作成しながら、私のようにエラーを取得しています:スパークDirectStream問題

kafkaUtilsは、HashMapのの(1には適用されませんタイプ内のメソッドcreateDirectStream私が渡しているパラメータ)。 JavaPairInputDStream directKafkaStream = KafkaUtils:この行で

createDirectStream(SSC、String.class、 String.class、StringDecoder.class、StringDecoder.class、kafkaParams、トピック)。

全コード:あなたのコードで

package kafkatest2; 



import java.util.Collections; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.Set; 

import org.apache.commons.codec.StringDecoder; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaPairInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.kafka.KafkaUtils; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.kafka010.*; 
public class SparkStream { 

    public static void main(String[] args) { 

     SparkConf conf = new SparkConf() 
       .setAppName("kafka-sandbox") 
       .setMaster("local[*]"); 
     JavaSparkContext sc = new JavaSparkContext(conf); 
     JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000)); 

     // TODO: processing pipeline 
     Map<String,String> kafkaParams = new HashMap<String,String>(); 
     kafkaParams.put("metadata.broker.list", "localhost:9092"); 

     Set<String> topics = Collections.singleton("topic5"); 

JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,String.class, 
     String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics); 


     directKafkaStream.foreachRDD(rdd -> { 
      System.out.println("--- New RDD with " + rdd.partitions().size() 
        + " partitions and " + rdd.count() + " records"); 
      rdd.foreach(record -> System.out.println(record._2)); 
     }); 
     ssc.start(); 
     ssc.awaitTermination(); 
    } 
} 
+0

あなたはスパークやカフカのどのバージョンを使用していますか? – himanshuIIITian

+0

スパークバージョン2.2.0ハハハとカフカ0.11.0.1 –

+0

..ですこれはあまりにも私と一緒に起こった... :)それコンパイラはmethodX(A、B)を言って文句methodsX(A、B)と互換性がありません。ほとんどの場合、クラス名は同じですが、パッケージは異なります。 :P –

答えて

0

、間違ったStringDecoderが使用されています。 org.apache.commons.codec.StringDecoderの代わりにkafka.serializer.StringDecoderにする必要があります。次のように

正しいコードは次のとおりです。

package kafkatest2; 



import java.util.Collections; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.Set; 

import kafka.serializer.StringDecoder; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaPairInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.kafka.KafkaUtils; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.kafka010.*; 
public class SparkStream { 

    public static void main(String[] args) { 

     SparkConf conf = new SparkConf() 
       .setAppName("kafka-sandbox") 
       .setMaster("local[*]"); 
     JavaSparkContext sc = new JavaSparkContext(conf); 
     JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000)); 

     // TODO: processing pipeline 
     Map<String,String> kafkaParams = new HashMap<String,String>(); 
     kafkaParams.put("metadata.broker.list", "localhost:9092"); 

     Set<String> topics = Collections.singleton("topic5"); 

JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,String.class, 
     String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics); 


     directKafkaStream.foreachRDD(rdd -> { 
      System.out.println("--- New RDD with " + rdd.partitions().size() 
        + " partitions and " + rdd.count() + " records"); 
      rdd.foreach(record -> System.out.println(record._2)); 
     }); 
     ssc.start(); 
     ssc.awaitTermination(); 
    } 
} 

私はそれが役に立てば幸い!

+1

ありがとう..それは働いた。 –

関連する問題