2016-12-19 9 views
0

Kafkaのトピックから暗号化されたメッセージを読む必要があります。トピックから文字列を読み込み、私の現在のコードは次のようになります。Kafkaトピックからバイナリデータを読み込みます。

JavaInputDStream<ConsumerRecord<String, String>> istream1 = KafkaUtils.createDirectStream(
        jssc, 
        LocationStrategies.PreferConsistent(), 
        ConsumerStrategies.<String, String>Subscribe(Arrays.asList(topics), kafkaParams) 
       ); 

は、私が確認するために、カフカキューからバイト配列を読み取るために、このコードにどのような変更を行う必要があり、暗号化されたデータは、プロセス中に破損されません。タイプ変換の。 私はKafkaUtilsでこのAPIを見つけることができませんイム、スパークプログラミングガイドから上記のコードを取っている間: http://spark.apache.org/docs/2.0.1/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html

答えて

0

あなたはカフカ接続hereの良い例を見ることができます。

しかし、あなたが望むのは、あなたのカフカトピックからByteArrayを取ることです。

この接続は次のようなJavaPairInputDStreamを作成する必要があります。応答のための

import org.apache.kafka.common.serialization.ByteArrayDeserializer 
JavaPairInputDStream<String, Array[Byte]> messages = KafkaUtils.createDirectStream(
    jssc, 
    String.class, 
    String.class, 
    StringDecoder.class, 
    ByteArrayDeserializer.class, 
    kafkaParams, 
    topicsSet 
); 
+0

感謝を。私はこれを試してみる。しかし、私たちがなぜこのために別のAPIを使用する必要があるのか​​、JavaPairInputStreamを作成する必要がある理由を教えてください。ちょうど私は何をしているのか理解しています。 –

+0

上記のスニペットでコードをコンパイルできません.Eclipseは、あなたがリストしたcreateDirectStreamのバージョンを認識しません。私が使ったバージョンだけが表示されます。私はkafka010をSpark 2.0.1で使用しています。私はmavenリポジトリからspark kafkaストリーミング・ジャーをダウンロードしました。 –

+0

おそらく、これは違うかもしれないと思います。しかし、あなたはバイトの逆シリアル化を設定する必要があります –

関連する問題