現在、私はSpark Streamingと協力しており、Kafkaからのメッセージを読むことができます。カフカプロデューサーでは、トピックにメッセージを送り、Spark Streamingの助けを借りてこのトピックを読んでみたいと思います。スパークストリーミング:Print JavaInputDStream
私はメッセージを照会するために、次のJavaコードを使用します。
package apache_spark_streaming;
import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public final class Spark_Kafka_Example {
private static final String BOOTSTRAP_SERVERS_CONNECTION = "XXXXX";
private static final String SPARK_CONNECTION = "spark://XXXXX:7077";
private static final String TOPIC_NAME = "KafkaTesting1";
private static final Set<String> TOPIC_1 = new HashSet<>(Arrays.asList(TOPIC_NAME.split(",")));
public static Map<String, Object> getProperties() {
try {
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", BOOTSTRAP_SERVERS_CONNECTION);
kafkaParams.put("key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class.getName());
kafkaParams.put("value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class.getName());
kafkaParams.put("group.id", "Stream Testing");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);
return kafkaParams;
}
catch (Exception e) {
e.printStackTrace();
}
return null;
}
public static void main(String[] args) throws Exception {
// Create context with a 2 seconds batch interval
SparkConf sparkConf = new SparkConf().setAppName("Kafka Example").setMaster(SPARK_CONNECTION);
JavaStreamingContext sc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
sc
, LocationStrategies.PreferConsistent()
, ConsumerStrategies.Subscribe(TOPIC_1, getProperties())
);
stream.print();
sc.start();
sc.awaitTermination();
}
}
私の問題は、私がどのようにコマンドラインに出力メッセージをするのか分からないということです。おそらく、JavaInputDStreamを正しく使用する方法がわかっていないのかもしれません。
は現在、私は、印刷()関数でのみ出力として得る:
17/07/10午後04時59分20秒INFO jobschedulerの:時間のための追加されたジョブ1499698760000のMS
Iあなたはこの "問題"で私を助けてくれることを願っています。 を更新しました
あなたは、コンテンツを抽出し、それを処理するためのフィルタやマップを使用するストリームを作成した後、私は
stream.foreachRDD(consumerRecordJavaRDD -> {
consumerRecordJavaRDD.foreach(stringStringConsumerRecord -> {
//.to get topic name: stringStringConsumerRecord.topic()
//To get value : stringStringConsumerRecord.value()
} }
どのようなメッセージを出力しますか?ロガーを設定する必要があります。 –
お返事ありがとうございます。私はOracle DBからデータを読み込み、次の文字列をKafkaに送信します(例:Bambi Duesseldorf、Duesseldorf、Nordrhein-Westfalen、Perlmuttknopf、Der、2016-09-29 08:41:58.538,10) Loggerを設定できますか? – hbenner
@hbenner答えとして追加するのではなく、あなたの答えを更新する – Ajay2707