Spark StreamingからレコードをKafkaにプッシュするサンプルコードを提供できますか?Spark StreamingからKafkaにデータをプッシュ
-1
A
答えて
0
スパークストリーミングを使用すると、カフカのトピックからデータを消費することができます。
あなたはカフカのトピックにレコードを公開したい場合は、カフカProducerを使用することができます[https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example]
それとも、複数のソースのコネクタを使用してカフカトピックにデータを公開するカフカConnectを使用することができます。[http://www.confluent.io/product/connectors/]
ご覧ください。 SparkストリーミングとKafkaの統合の詳細については、以下のリンクを参照してください。
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
0
私はJavaを使用してそれを行っています。この関数をJavaDStream<String>
に渡して、.foreachRDD()
の引数として使用できます。各RDDに対してKafkaProducer
を作成するための最良の方法ではありません。socket example in Spark documentationのようなKafkaProducers
の「プール」を使用してこれを行うことができます。ここで
は私のコードです:
public static class KafkaPublisher implements VoidFunction<JavaRDD<String>> {
private static final long serialVersionUID = 1L;
public void call(JavaRDD<String> rdd) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "loca192.168.0.155lhost:9092");
props.put("acks", "1");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1000);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
private static final long serialVersionUID = 1L;
public void call(Iterator<String> partitionOfRecords) throws Exception {
Producer<String, String> producer = new KafkaProducer<>(props);
while(partitionOfRecords.hasNext()) {
producer.send(new ProducerRecord<String, String>("topic", partitionOfRecords.next()));
}
producer.close();
}
});
}
}
関連する問題
- 1. Spark Streaming Kafka backpressure
- 2. Spark Streaming Kafka Consumer
- 3. Kafka Streaming with apache spark
- 4. Kafka - Spark Streaming - 1つのパーティションからのデータの読み出し
- 5. Spark Streaming Kafka Receivers API-numPartitions
- 6. java.lang.NoClassDefFoundError:org/apache/spark/streaming/kafka/KafkaUtils
- 7. CSVをKafkaからSpark Streamingに送信する
- 8. Spark Streaming with Kafka:空のコレクション例外
- 9. Flafka(Http - > Flume - > Kafka - > Spark Streaming)
- 10. Apache KafkaのStreams APIとSpark Streaming
- 11. kafkaで他のVMからのSpark Streamingの使用方法
- 12. Spark Streamingの最初からKafkaトピックからレコードを読み取る方法は?
- 13. pysparkはspark-streaming-kafka-0-10 libをサポートしていますか?
- 14. Akkaをapache spark streaming&Kafkaで使用しますか?
- 15. kafka connect 0.10とSpark Structured Streamingでfrom_jsonを使用するには?
- 16. Spark Streaming Kafka Consumerの "java.io.NotSerializableException:org.apache.kafka.clients.consumer.ConsumerRecord"を修正するには?
- 17. Spark Streamingを使用してKafkaからbinlogデータを読み取るときに "ClassNotFoundException"が発生しました
- 18. Spark Streaming + KafkaでforeachRDDが遅いのはなぜですか?
- 19. Spark StreamingのKafkaトピックから2つのDStreamを作成できない
- 20. kafka-Spark-Streamingからデータを読み取っているときにEmptyが取得される
- 21. Kafka 0.9.0とSpark Streaming 2.1.0:kafka.cluster.BrokerEndPointはkafka.cluster.Brokerにキャストできません
- 22. Spark StreamingでStringからStructTypeを作成
- 23. Spark Structured Streamingでの書き込み時のKafkaオフセットのキャプチャ
- 24. Spark Structured StreamingエグゼキュータとKafkaパーティション間のマッピングの説明
- 25. Spark StreamingでHbaseデータを読み取る
- 26. スカラマニフェストが見つからないSpark Streaming
- 27. Spark Streaming with wholeTextFiles
- 28. Spark Cassandra Streaming
- 29. KafkaUtils java.lang.NoClassDefFoundError Spark Streaming
- 30. spark streaming fileStream