私は2つの異なるトピックの2つの異なるファイルからの行としてkafkaから読んでいます。ラインの例:kafka mulipleトピックsparkのseggregation
例: はFile1:2015-04-15T18:44:14+01:00,192.168.11.42,%ASA-2-106007:
File2の :"04/15/2012","18:44:14",,"Start","Unknown","Unknown",,"192.168.63.128","444","2","7","192.168.63.128",,,,,,,,,,,,,,,,,
私は2つの異なるtopics.Codeから火花から読み取ることができていますが、以下のようなものです:
SparkConf sparkConfig = new SparkConf().setAppName("KafkaStreaming").setMaster("local[5]");
JavaStreamingContext jsc = new JavaStreamingContext(sparkConfig,Durations.seconds(5));
final HiveContext sqlContext = new HiveContext(jsc.sc());
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jsc,
prop.getProperty("zookeeper.connect"),
prop.getProperty("group.id"),
topicMap
);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
private static final long serialVersionUID = 1L;
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
問題I今すぐ参照してください:
行 rddは明白な両方の行を含んでいます。どのように私segregaどのレコードがどのトピックやファイルであるかを調べる。
これが背後にある理由は、私は火花の中に来ている別の話題に異なる論理を適用したいのです。しかし、RDDは時間
ですべての行を持っているあなたは、パラメータとしてscala.Function1<kafka.message.MessageAndMetadata<K,V>,R> messageHandler
を受け入れるcreateDirectStream
オーバーロードされたメソッドを選択する必要があります任意の提案