0
現在、akkaストリームで遊んでいて、次の例を試しました。akkaストリームの接続kafkaとakka http
特定のHTTPエンドポイントを要求するときに、kafkaから最初の要素を取得します。 これは私が書いたコードとその動作です。
get {
path("ticket"/IntNumber) { ticketNr =>
val future = Consumer.plainSource(consumerSettings, Subscriptions.topics("tickets"))
.take(1)
.completionTimeout(5 seconds)
.runWith(Sink.head)
onComplete(future) {
case Success(record) => complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, record.value()))
case _ => complete(HttpResponse(StatusCodes.NotFound))
}
}
}
これが(akka)ストリームを使用する理想的な方法であるかどうかは疑問です。 カフカストリームをHTTPレスポンスストリームに直接接続する方法はありますか?
val kafkaTicketsSink = Flow[String]
.map(new ProducerRecord[Array[Byte], String]("tickets", _))
.toMat(Producer.plainSink(producerSettings))(Keep.right)
post {
path("ticket") {
(entity(as[Ticket]) & extractMaterializer) { (ticket, mat) => {
val f = Source.single(ticket).map(t => t.description).runWith(kafkaTicketsSink)(mat)
onComplete(f) { _ =>
val locationHeader = headers.Location(s"/ticket/${ticket.id}")
complete(HttpResponse(StatusCodes.Created, headers = List(locationHeader)))
}
}
}
}
}
多分これも改善することができます??:私はこれを行う投稿するとき例えば
、