私はこのような文字列の形でカフカへの私のデータを保存することができています:ケースクラスのAkkaストリームをKafkaに直接保存する方法は?
val producerSettings = ProducerSettings(actorSystem, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
def kafkaSink(source: Source[ModbusMessage, NotUsed]) = source.map(a => s"Id:${a.sensorId}:Values:${a.values.mkString(",")}").map { m =>
new ProducerRecord[String, String]("sampleData", m)
}.
runWith(Producer.plainSink(producerSettings))
が、カフカに直接私の場合クラスを保存する方法があります。 My caseクラスModbusMessageの形式でデータを保存したい場合と同様です。
誰かが私に素晴らしいことだ簡単な例を提供することができた場合は!
ありがとうございました。 ヘルプは高く評価されています。
シリアル化について話していますか?そうであれば、テキスト表現(JSONなど)やバイナリ表現が必要ですか? –
どちらがパフォーマンスに最適です! liftJsonのようなライブラリを使ってJsonに変換し、それをKafkaの中のStringとして保存する必要がありますか? –
これは一般的に広いです。私は最初に簡略化されたJSONのアプローチに行きます。 –