2016-08-11 3 views
0

私はKafkaクラスタからデータを取得するためにSpark Kafkaコネクタを使用しています。それから私はJavaDStream<String>としてデータを取得しています。データをJavaDStream<EventLog>として取得するには、EventLogはJava Beanですか?Spark Kafka ConnectorでオブジェクトのJavaDStreamを取得するには?

public static JavaDStream<EventLog> fetchAndValidateData(String zkQuorum, String group, Map<String, Integer> topicMap) { 
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount"); 
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); 
    JavaPairReceiverInputDStream<String, String> messages = 
      KafkaUtils.createStream(jssc, zkQuorum, group, topicMap); 
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { 
     @Override 
     public String call(Tuple2<String, String> tuple2) { 
      return tuple2._2(); 
     } 
    }); 
    jssc.start(); 
    jssc.awaitTermination(); 
    return lines; 
} 

私の目標は、EventLogと同じ仕様のテーブルカサンドラにこのデータを保存することです。 Spark Cassandraコネクタは、javaFunctions(rdd).writerBuilder("ks", "event", mapToRow(EventLog.class)).saveToCassandra();のようにinsert文にJavaRDD<EventLog>を受け入れます。私はKafkaからJavaRDD<EventLog>を手に入れたいです。

+0

メッセージを文字列のペアとして使用して変換したいと考えていますか?または、JavaReceiverInputDStream を使用しますか? EventLog型を正確にどこに持ち込みたいのですか?あなたはEventLog型を受け取り、そこからJavaDStreamを構築する受信機を定義しようとしましたか? – Sunny

+0

@サニー私の目的は、カサンドラにデータを書き込むことです。スパークカサンドラコネクタ 'このようなINSERT文でJavaRDD '受け付けます。 'javaFunctions(RDD).writerBuilder( "KS"、 "イベント"、mapToRow(EventLog.class))saveToCassandraを();'。私はこれらのJavaRDD をKafkaから入手したいと思います。 – khateeb

+0

これらのEventLogをkafkaに書き込むコードにもアクセスできますか?カスタムシリアライザが実装されていて、EventLogがシリアル化され、KafkaにEventLogとして書き込まれていますか? – Sunny

答えて

0

は、キー/値の型とデコーダクラスを渡すことができ、オーバーロードcreateStreamメソッドを使用します。

例:

createStream(jssc, String.class, EventLog.class, StringDecoder.class, EventLogDecoder.class, 
     kafkaParams, topicsMap, StorageLevel.MEMORY_AND_DISK_SER_2()); 

以上があなたJavaPairDStream<String, EventLog>

JavaDStream<EventLog> lines = messages.map(new Function<Tuple2<String, EventLog>, EventLog>() { 
    @Override 
    public EventLog call(Tuple2<String, EventLog> tuple2) { 
    return tuple2._2(); 
    } 
}); 

にkafka.serializer.Decoderを実装する必要がありEventLogDecoderを与える必要があります。以下はjsonデコーダの例です。

public class EventLogDecoder implements Decoder<EventLog> { 

public EventLogDecoder(VerifiableProperties verifiableProperties) { 
} 

@Override 
public EventLog fromBytes(byte[] bytes) { 
    ObjectMapper objectMapper = new ObjectMapper(); 
    try { 
    return objectMapper.readValue(bytes, EventLog.class); 
    } catch (IOException e) { 
    //do something 
    } 
    return null; 
} 
} 
+0

'StringDecoder'のパッケージ全体を教えてください。 'EventLogDecoder'にはどのような機能が含まれていますか? – khateeb

+0

'Function'の3番目の引数は' Function 、EventLog>() 'のような' EventLog'でなければなりません。 – khateeb

+0

はい更新された第3引数。また、StringDecoderのパッケージはkafka.serializer.StringDecoderです。私はサンプルのデコーダを含めるように答えを更新しました。 '火花submit'を実行している間 –

関連する問題