2017-07-10 4 views
0

現在、私はSpark Streamingと協力しており、Kafkaからのメッセージを読むことができます。カフカプロデューサーでは、トピックにメッセージを送り、Spark Streamingの助けを借りてこのトピックを読んでみたいと思います。スパークストリーミング:Print JavaInputDStream

私はメッセージを照会するために、次のJavaコードを使用します。

package apache_spark_streaming; 

import java.util.*; 
import org.apache.spark.SparkConf; 
import org.apache.spark.streaming.Durations; 
import org.apache.spark.streaming.api.java.JavaInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.kafka010.*; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 

public final class Spark_Kafka_Example { 

private static final String BOOTSTRAP_SERVERS_CONNECTION = "XXXXX"; 
private static final String SPARK_CONNECTION = "spark://XXXXX:7077"; 
private static final String TOPIC_NAME = "KafkaTesting1"; 
private static final Set<String> TOPIC_1 = new HashSet<>(Arrays.asList(TOPIC_NAME.split(","))); 

public static Map<String, Object> getProperties() { 

    try { 
     Map<String, Object> kafkaParams = new HashMap<>(); 
     kafkaParams.put("bootstrap.servers", BOOTSTRAP_SERVERS_CONNECTION); 
     kafkaParams.put("key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class.getName()); 
     kafkaParams.put("value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class.getName()); 
     kafkaParams.put("group.id", "Stream Testing"); 
     kafkaParams.put("auto.offset.reset", "earliest"); 
     kafkaParams.put("enable.auto.commit", false); 
     return kafkaParams; 
    } 
    catch (Exception e) { 
     e.printStackTrace(); 
    } 
    return null; 
} 
public static void main(String[] args) throws Exception { 

    // Create context with a 2 seconds batch interval 
    SparkConf sparkConf = new SparkConf().setAppName("Kafka Example").setMaster(SPARK_CONNECTION); 
    JavaStreamingContext sc = new JavaStreamingContext(sparkConf, Durations.seconds(2)); 

    JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
      sc 
      , LocationStrategies.PreferConsistent() 
      , ConsumerStrategies.Subscribe(TOPIC_1, getProperties()) 
    );  

    stream.print(); 
    sc.start(); 
    sc.awaitTermination(); 
    } 
} 

私の問題は、私がどのようにコマンドラインに出力メッセージをするのか分からないということです。おそらく、JavaInputDStreamを正しく使用する方法がわかっていないのかもしれません。

は現在、私は、印刷()関数でのみ出力として得る:

17/07/10午後04時59分20秒INFO jobschedulerの:時間のための追加されたジョブ1499698760000のMS

Iあなたはこの "問題"で私を助けてくれることを願っています。 を更新しました

あなたは、コンテンツを抽出し、それを処理するためのフィルタやマップを使用するストリームを作成した後、私は

stream.foreachRDD(consumerRecordJavaRDD -> { 
    consumerRecordJavaRDD.foreach(stringStringConsumerRecord -> { 
    //.to get topic name: stringStringConsumerRecord.topic() 
    //To get value : stringStringConsumerRecord.value() 
    } } 
+0

どのようなメッセージを出力しますか?ロガーを設定する必要があります。 –

+0

お返事ありがとうございます。私はOracle DBからデータを読み込み、次の文字列をKafkaに送信します(例:Bambi Duesseldorf、Duesseldorf、Nordrhein-Westfalen、Perlmuttknopf、Der、2016-09-29 08:41:58.538,10) Loggerを設定できますか? – hbenner

+0

@hbenner答えとして追加するのではなく、あなたの答えを更新する – Ajay2707

答えて

関連する問題