2016-05-30 17 views
1

私は2つの異なるトピックの2つの異なるファイルからの行としてkafkaから読んでいます。ラインの例:kafka mulipleトピックsparkのseggregation

例: はFile1:2015-04-15T18:44:14+01:00,192.168.11.42,%ASA-2-106007:
File2の :"04/15/2012","18:44:14",,"Start","Unknown","Unknown",,"192.168.63.128","444","2","7","192.168.63.128",,,,,,,,,,,,,,,,,

私は2つの異なるtopics.Codeから火花から読み取ることができていますが、以下のようなものです:

SparkConf sparkConfig = new SparkConf().setAppName("KafkaStreaming").setMaster("local[5]"); 
     JavaStreamingContext jsc = new JavaStreamingContext(sparkConfig,Durations.seconds(5)); 
     final HiveContext sqlContext = new HiveContext(jsc.sc()); 
     JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jsc, 
                         prop.getProperty("zookeeper.connect"), 
                         prop.getProperty("group.id"), 
                         topicMap 
                         ); 

     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { 

        private static final long serialVersionUID = 1L; 

        public String call(Tuple2<String, String> tuple2) { 
         return tuple2._2(); 
        } 
       }); 

問題I今すぐ参照してください:

rddは明白な両方の行を含んでいます。どのように私segregaどのレコードがどのトピックやファイルであるかを調べる。
これが背後にある理由は、私は火花の中に来ている別の話題に異なる論理を適用したいのです。しかし、RDDは時間

ですべての行を持っているあなたは、パラメータとしてscala.Function1<kafka.message.MessageAndMetadata<K,V>,R> messageHandlerを受け入れるcreateDirectStreamオーバーロードされたメソッドを選択する必要があります任意の提案

答えて

1

を感謝しています。それより、MessageAndMetadataオブジェクトを入力として取得する関数をmessageHandlerとして渡すだけで、実際のメッセージとトピックが返されます。

ここで私はScalaで書かれたコードを投稿します。あなたは簡単にJavaでそれを適応させることができます:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String,String)](ssc, 
     kafkaParams, 
     topicOffsetsMap, 
     (m:MessageAndMetadata[String, String])=> (m.topic,m.message()) 
     ) 
関連する問題