私はkafkaキューからデータを読み込んでいるスタンドアロンのスパーククラスタを持っています。 kafkaキューには5つのパーティションがあり、スパークはパーティションの1つのデータのみを処理しています。私は、次を使用しています:ここでKafka - Spark Streaming - 1つのパーティションからのデータの読み出し
は私のMavenの依存関係です:
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>kafka-custom</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.1</version>
</dependency>
マイカフカのプロデューサーは単にキューに100件のメッセージを入れている単純なプロデューサである:ここでは
public void generateMessages() {
// Define the properties for the Kafka Connection
Properties props = new Properties();
props.put("bootstrap.servers", kafkaBrokerServer); // kafka server
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
// Create a KafkaProducer using the Kafka Connection properties
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(
props);
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(kafkaTopic, "value-" + i);
producer.send(record);
}
producer.close();
}
です私のスパークストリーミングジョブのメインコード:
public void processKafka() throws InterruptedException {
LOG.info("************ SparkStreamingKafka.processKafka start");
// Create the spark application
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.executor.cores", "5");
//To express any Spark Streaming computation, a StreamingContext object needs to be created.
//This object serves as the main entry point for all Spark Streaming functionality.
//This creates the spark streaming context with a 'numSeconds' second batch size
jssc = new JavaStreamingContext(sparkConf, Durations.seconds(sparkBatchInterval));
//List of parameters
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", this.getBrokerList());
kafkaParams.put("client.id", "SpliceSpark");
kafkaParams.put("group.id", "mynewgroup");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
List<TopicPartition> topicPartitions= new ArrayList<TopicPartition>();
for(int i=0; i<5; i++) {
topicPartitions.add(new TopicPartition("mytopic", i));
}
//List of kafka topics to process
Collection<String> topics = Arrays.asList(this.getTopicList().split(","));
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
//Another version of an attempt
/*
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Assign(topicPartitions, kafkaParams)
);
*/
messages.foreachRDD(new PrintRDDDetails());
// Start running the job to receive and transform the data
jssc.start();
//Allows the current thread to wait for the termination of the context by stop() or by an exception
jssc.awaitTermination();
}
PrintRDDDetailsの呼び出し方法には次のようになります。
public void call(JavaRDD<ConsumerRecord<String, String>> rdd)
throws Exception {
LOG.error("--- New RDD with " + rdd.partitions().size()
+ " partitions and " + rdd.count() + " records");
}
1つのパーティションからデータが取得されるだけです。私はカフカで5つのパーティションがあることを確認しました。コールメソッドが実行されると、適切な数のパーティションが出力されますが、1つのパーティションにあるレコードだけが出力されます。この単純なコードから取り出した処理は、1つのパーティションのみを処理していることを示します。
ありがとうございます - 私はそれを試してみましょう。 – Erin
Param - うまくいった!どうもありがとうございました。 – Erin
恐ろしい!私は嬉しいです:) – Param