2017-11-04 18 views
0

現在、akkaストリームで遊んでいて、次の例を試しました。akkaストリームの接続kafkaとakka http

特定のHTTPエンドポイントを要求するときに、kafkaから最初の要素を取得します。 これは私が書いたコードとその動作です。

get { 
     path("ticket"/IntNumber) { ticketNr => 

     val future = Consumer.plainSource(consumerSettings, Subscriptions.topics("tickets")) 
      .take(1) 
      .completionTimeout(5 seconds) 
      .runWith(Sink.head) 

     onComplete(future) { 
      case Success(record) => complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, record.value())) 
      case _ => complete(HttpResponse(StatusCodes.NotFound)) 
     } 
     } 
    } 

これが(akka)ストリームを使用する理想的な方法であるかどうかは疑問です。 カフカストリームをHTTPレスポンスストリームに直接接続する方法はありますか?

val kafkaTicketsSink = Flow[String] 
    .map(new ProducerRecord[Array[Byte], String]("tickets", _)) 
    .toMat(Producer.plainSink(producerSettings))(Keep.right)  

post { 
      path("ticket") { 
      (entity(as[Ticket]) & extractMaterializer) { (ticket, mat) => { 
       val f = Source.single(ticket).map(t => t.description).runWith(kafkaTicketsSink)(mat) 
       onComplete(f) { _ => 
        val locationHeader = headers.Location(s"/ticket/${ticket.id}") 
        complete(HttpResponse(StatusCodes.Created, headers = List(locationHeader))) 
       } 
       } 
      } 
      } 
     } 

多分これも改善することができます??:私はこれを行う投稿するとき例えば

答えて

1

Sink.queueを使用して、単一のバックプレッシャーされたストリームをアクティブに保つことができます。リクエストが受信されるたびに、マテリアライズド・キューから要素を取得できます。これは、利用可能であれば1つの要素を戻し、それ以外の場合はバックプレッシャーを与えます。線に沿って

何か:Sink.queue

val queue = Consumer.plainSource(consumerSettings, Subscriptions.topics("tickets")) 
    .runWith(Sink.queue()) 

    get { 
    path("ticket"/IntNumber) { ticketNr => 

     val future: Future[Option[ConsumerRecord[String, String]]] = queue.pull() 

     onComplete(future) { 
     case Success(Some(record)) => complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, record.value())) 
     case _ => complete(HttpResponse(StatusCodes.NotFound)) 
     } 
    } 
    } 

詳細情報はdocsで見つけることができます。

関連する問題