2017-01-18 19 views
1

私はこのような文字列の形でカフカへの私のデータを保存することができています:ケースクラスのAkkaストリームをKafkaに直接保存する方法は?

val producerSettings = ProducerSettings(actorSystem, new StringSerializer, new StringSerializer) 
    .withBootstrapServers("localhost:9092") 


def kafkaSink(source: Source[ModbusMessage, NotUsed]) = source.map(a => s"Id:${a.sensorId}:Values:${a.values.mkString(",")}").map { m => 
    new ProducerRecord[String, String]("sampleData", m) 
    }. 
    runWith(Producer.plainSink(producerSettings)) 

が、カフカに直接私の場合クラスを保存する方法があります。 My caseクラスModbusMessageの形式でデータを保存したい場合と同様です。

誰かが私に素晴らしいことだ簡単な例を提供することができた場合は!

ありがとうございました。 ヘルプは高く評価されています。

+0

シリアル化について話していますか?そうであれば、テキスト表現(JSONなど)やバイナリ表現が必要ですか? –

+0

どちらがパフォーマンスに最適です! liftJsonのようなライブラリを使ってJsonに変換し、それをKafkaの中のStringとして保存する必要がありますか? –

+0

これは一般的に広いです。私は最初に簡略化されたJSONのアプローチに行きます。 –

答えて

2

メッセージキーとメッセージ値で構成されるデータメッセージのカフカモデルは、キーと値の生バイト(byte[])に基づいています。したがって、ケースクラスをbyte[]に変換するシリアライザを用意する必要があります。上記の例では、キーと値の両方にStringSerializerを使用し、シリアライザはString -> byte[]を変換しています。

しかし、私のケースクラスを直接カフカに保存する方法はありますか。 My caseクラスModbusMessageの形式でデータを保存したい場合と同様です。

ケースクラスModbusMessageをお持ちの場合は、ModbusMessage -> byte[]シリアライザを実装する必要があります。このようなシリアライザを自分で実装することもできますが、他の人があなたの質問にコメントしているので、AvroやProtobufなどのシリアライゼーションフレームワークを選択することもできます。あなたはまた見てみたいかもしれません。 https://github.com/scala/pickling

liftJsonのようなライブラリを使用してJsonに変換し、それをKafkaの文字列として保存する必要がありますか?

いいえ、あなたは必要ありません。ケースクラスModbusMessageからbyte[]に直接変換することができます(また逆方向に逆方向に変換することもできます)。 JSON表現もbyte[]にシリアル化して、カフカに送信できるようにする必要があります(ここでは、変換コストは2倍になります)。

+0

この回答が@ShivanshSrivastavaに役立った場合は、それを受け入れたものとしてマークする必要があります。 –

関連する問題