私はカフカとカフカ - スパークストリーミングをアップグレードしましたが、いくつかの方法を変更しながらいくつかの挑戦に直面しています。 KafkaUtilsのようにエラーを投げているだけでなく、Iteratorもエラーを投げています。カフカのバージョンは0.10.1.1です。 誰かが、これらの変更をどのように修正すればいいのか考えている方は、 おかげ最新のカフカバージョン0.10.1.1でKafkaUtilsの代わりに使用されるもの
-1
A
答えて
0
KafkaUtilsは、Apacheスパークストリーミング、Apacheのカフカ
org.apache.spark.streaming.kafka.KafkaUtils
0
KafkaUtilsの以前のパッケージだった「org.apache.sparkのない部分の一部であります.streaming.kafka "。最新のパッケージは "org.apache.spark.streaming.kafka010"です。 kafkaparamsとトピックの詳細を設定し
、次のコードスニペットを確認し、さらに参考のために
import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("topicA", "topicB");
final JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
stream.mapToPair(
new PairFunction<ConsumerRecord<String, String>, String, String>() {
@Override
public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
return new Tuple2<>(record.key(), record.value());
}
})
、次のリンクhttps://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
関連する問題
- 1. 1.4.0.RELEASEで廃止されたSpringBootServletInitializerの代わりに使用するもの
- 2. .NET CFでHttpUtility.UrlEncodeの代わりに使用できるもの
- 3. アンドロイドスタジオのreadLine()の代わりに使用できるもの
- 4. Reactのテキストエリアでwindow.getLocation()の代わりに使用するもの?
- 5. Java JNAでLPTSTRの代わりに使用するものは?
- 6. 新しいRetrofit 2 APIのRetrofitError.Kind.NETWORKの代わりに使用するもの
- 7. Swift 3.1のNSEntityDescriptionの代わりに使用するもの
- 8. API 26のandroid.app.action.ENTER_CAR_MODEの代わりに使用するもの
- 9. XMLBeansの代わりに、廃止されたものを使用しますか?
- 10. com.sun.faces.enableRestoreView11Compatibility JSF 1.2の代わりに使用するもの
- 11. $ .getJSON()の代わりに使用するもの
- 12. 静的変数の代わりに使用するもの
- 13. GraphDatabaseSettings.BoltConnectorの代わりに使用するもの
- 14. respond_withの代わりに使用するもの
- 15. Google Transliterate APIの代わりに使用するもの
- 16. nodeSetの代わりに使用するもの
- 17. openWRTのhostapd新しいものの代わりに最後の設定フィールドを使用
- 18. 新しいスレッドの代わりにサーバーソケットクライアント用のExecutorを使用
- 19. 新しいバッファ( "my string"、 "binary")の代わりになるもの
- 20. Oracle SQL:テーブル名の代わりに使用される変数
- 21. なぜ使用されるビューの代わりに、SQL文
- 22. AntiXssEncoder.UrlEncodeを使用している場合、スペースの代わりに%20の代わりに+を使用する最も良い方法
- 23. TThreadの代わりになるもの
- 24. UIAccessibilityVoiceOverStatusChangedの代わりになるもの
- 25. http.request()の代わりになるもの
- 26. thread.sleepの代わりになるもの
- 27. divの代わりにdivの代わりにJSのプリペンドを使用する
- 28. Goにタイプ名の代わりにマップリテラルで使用できるものは?
- 29. ASP.NET MVC5 Webアプリケーションの代わりにMessageBox.Showの代わりになるもの
- 30. 新しいパラメータの代わりに既存のメソッドを使用
を訪問しかし、その一つは、いくつかのエラーを投げているし、我々は変更する必要がある理由ですkafkaのutilsとパッケージのパラメータも。そうすれば、トピックとkafkaparamsのkafkaUtilsとパラメータの新しいパッケージは何かを教えてください。それが良いでしょう。 – Abhishek