2017-09-07 3 views
0

私は基本的なスカラのakka http CRUDアプリケーションを持っています。関連するクラスについては以下を参照してください。akkaストリームkafka(reactive-kafka)をakka httpアプリケーションに統合する方法は?

エンティティが作成/更新されるたびに、エンティティIDといくつかのデータ(jsonとして)をKafkaトピックに書き込むだけです。

私はhttp://doc.akka.io/docs/akka-stream-kafka/current/producer.htmlを見ていますが、scalaとakkaには新しく、アプリケーションに統合する方法が不明ですか?

たとえば、上のドキュメントから、これはkafkaに書き込むプロデューサの例です。したがって、私は何か類似している必要があると思いますが、私のアプリケーションのどこに行かなければならないのでしょうか?ユーザーを作成した後、私のサービスのcreateメソッドで別のマップ呼び出しを追加することはできますか?

多くの感謝!

val done = Source(1 to 100) 
    .map(_.toString) 
    .map { elem => 
    new ProducerRecord[Array[Byte], String]("topic1", elem) 
    } 
    .runWith(Producer.plainSink(producerSettings)) 

または私はここに私のhttps://github.com/hseeberger/accessus Server.scalaでbindAndHandle()メソッドの例のような何かをする必要がありますか?

WebServer.scala

object System { 

    implicit val system = ActorSystem() 
    implicit val dispatcher = system.dispatcher 
    implicit val actorMaterializer = ActorMaterializer() 

} 

object WebServer extends App { 

    import System._ 

    val config = new ApplicationConfig() with ConfigLoader 
    ConfigurationFactory.setConfigurationFactory(new LoggingConfFileConfigurationFactory(config.loggingConfig)) 
    val injector = Guice.createInjector(new MyAppModule(config)) 
    val routes = injector.getInstance(classOf[Router]).routes 
    Http().bindAndHandle(routes, config.httpConfig.interface, config.httpConfig.port) 

} 

Router.scala

def routes(): Route = { 
    post { 
     entity(as[User]) { user => 
     val createUser = userService.create(user) 
     onSuccess(createUser) { 
      case Invalid(y: NonEmptyList[Err]) => { 
      throw new ValidationException(y) 
      } 
      case Valid(u: User) => { 
       complete(ToResponseMarshallable((StatusCodes.Created, u))) 
      } 
     } 
     } 
    } ~ 
    // More routes here, left out for example 
} 

Service.scala

def create(user: User): Future[MaybeValid[User]] = { 
    for { 
     validating <- userValidation.validateCreate(user) 
     result <- validating match { 
     case Valid(x: User) => 
      userRepo.create(x) 
      .map(dbUser => Valid(UserConverters.fromUserRow(x))) 
     case y: DefInvalid => 
      Future{y} 
     } 
    } yield result 
    } 

Repo.scala

def create(user: User): Future[User] = { 
    mutateDbProvider.db.run(
     userTable returning userTable.map(_.userId) 
     into ((user, id) => user.copy(userId = id)) += 
     user.copy(createdDate = Some(Timestamp.valueOf(LocalDateTime.now()))) 
    ) 
    } 
+0

:だから私は、無効なユーザーのためのロジックを変更しますが、有効なユーザーを返すcreate'カフカ毎回 'に' user.userId'を送信するために探していますか? –

+0

ええ、おそらく "エンティティ"のような他の情報: "ユーザー" – Rory

答えて

1

Routeをアンマーシャリングすると、Entityから1 Userだけが書かれ​​ているので、私はあなたが必要と思わないProducer.plainSink。むしろ、私はProducer.sendも同様に動作すると思います。また、例外として、例外をスローすることは "慣用的"なスケーラではありません。ちょうどあなたが望むものを明確にすること

val producer : KafkaProducer = new KafkaProducer(producerSettings) 

val routes : Route = 
    post { 
    entity(as[User]) { user => 
     val createUser = userService.create(user) 
     onSuccess(createUser) { 
     case Invalid(y: NonEmptyList[Err]) => 
      complete(BadRequest -> "invalid user") 
     case Valid(u: User) => { 
      val producerRecord = 
      new ProducerRecord[Array[Byte], String]("topic1",s"""{"userId" : ${u.userId}, "entity" : "User"}""") 

      onComplete(producer send producerRecord) { _ => 
      complete(ToResponseMarshallable((StatusCodes.Created, u))) 
      } 
     } 
     } 
    } 
    } 
+0

@Roryあなたは大歓迎です。私は、あなたが提供した例に似たテストキットに慣れていません。 –

+0

oncompleteを使用して 'producer send producerRecord'を呼び出すのに問題がないかどうかを確認します(HTTP APIレスポンスを送信する前にkafkaへの送信が完了するのを待っていません)。基本的にこれを行うだけです:producer send producerRecord; (ToResponseMarshallable((StatusCodes.Created、u))) – Rory

+0

@Rory 'onComplete'を使用しないと、クライアントは' Created'レスポンスコードを受け取ることができますが、メッセージは送信されません。この分岐した状態がokの場合、問題はありませんが、通常のパターンは、レコードが送信された後にのみ有効な応答を返すことです。これは設計上の選択です:指定されたコードが間違っている場合や、検証済みのコードを待っていたい場合に、速い応答をしたいですか? –

関連する問題