2017-06-19 3 views
-1

私はカフカとカフカ - スパークストリーミングをアップグレードしましたが、いくつかの方法を変更しながらいくつかの挑戦に直面しています。 KafkaUtilsのようにエラーを投げているだけでなく、Iteratorもエラーを投げています。カフカのバージョンは0.10.1.1です。 誰かが、これらの変更をどのように修正すればい​​いのか考えている方は、 おかげ最新のカフカバージョン0.10.1.1でKafkaUtilsの代わりに使用されるもの

答えて

0

KafkaUtilsは、Apacheスパークストリーミング、Apacheのカフカ

org.apache.spark.streaming.kafka.KafkaUtils

+0

を訪問しかし、その一つは、いくつかのエラーを投げているし、我々は変更する必要がある理由ですkafkaのutilsとパッケージのパラメータも。そうすれば、トピックとkafkaparamsのkafkaUtilsとパラメータの新しいパッケージは何かを教えてください。それが良いでしょう。 – Abhishek

0

KafkaUtilsの以前のパッケージだった「org.apache.sparkのない部分の一部であります.streaming.kafka "。最新のパッケージは "org.apache.spark.streaming.kafka010"です。 kafkaparamsとトピックの詳細を設定し

、次のコードスニペットを確認し、さらに参考のために

import java.util.*; 
import org.apache.spark.SparkConf; 
import org.apache.spark.TaskContext; 
import org.apache.spark.api.java.*; 
import org.apache.spark.api.java.function.*; 
import org.apache.spark.streaming.api.java.*; 
import org.apache.spark.streaming.kafka010.*; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.common.TopicPartition; 
import org.apache.kafka.common.serialization.StringDeserializer; 
import scala.Tuple2; 

Map<String, Object> kafkaParams = new HashMap<>(); 
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092"); 
kafkaParams.put("key.deserializer", StringDeserializer.class); 
kafkaParams.put("value.deserializer", StringDeserializer.class); 
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream"); 
kafkaParams.put("auto.offset.reset", "latest"); 
kafkaParams.put("enable.auto.commit", false); 

Collection<String> topics = Arrays.asList("topicA", "topicB"); 

final JavaInputDStream<ConsumerRecord<String, String>> stream = 
    KafkaUtils.createDirectStream(
    streamingContext, 
    LocationStrategies.PreferConsistent(), 
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams) 
); 

stream.mapToPair(
    new PairFunction<ConsumerRecord<String, String>, String, String>() { 
    @Override 
    public Tuple2<String, String> call(ConsumerRecord<String, String> record) { 
     return new Tuple2<>(record.key(), record.value()); 
    } 
    }) 

、次のリンクhttps://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

関連する問題