2017-08-14 11 views
0

私はakkaストリームのカフカ(とakkaストリーム全般)を使って新しいです。私は別のトピックにメッセージを公開するためにグラフを作成しようとしています。 処理されたメッセージをコミットするためにプロデューサをフローとしてどのように接続できますか?私はProducer.flowを使用してみましたが、私はあなたがGraphDSLを使用しているため、コンパイラは、前の段階からPassThrough種類を推測することはできませんcommitScaladslプロデューサーフローをグラフに接続

object TestFoo { 
    import akka.kafka.ProducerMessage.Message 
    implicit val system = ActorSystem("test-kafka") 
    implicit val materializer = ActorMaterializer() 
    val evenNumbersTopic = "even_numbers" 
    val allNumbersTopic = "all_numbers" 
    lazy val consumerSettings = ConsumerSettings(system, new StringDeserializer(), new JsonDeserializer[Int]) 
    .withBootstrapServers("localhost:9092") 
    .withGroupId("group1") 
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 
    lazy val source = Consumer.committableSource(consumerSettings, Subscriptions.topics(Set(evenNumbersTopic, allNumbersTopic))) 
    val producerSettings = ProducerSettings(system, new StringSerializer(), new StringSerializer()) 
    .withBootstrapServers("localhost:9092") 
    val flow: RunnableGraph[NotUsed] = RunnableGraph.fromGraph(GraphDSL.create() { implicit b => 
    import akka.stream.scaladsl.GraphDSL.Implicits._ 
    type TypedMessage = Message[String, Int,CommittableOffset] 
    val bcast = b.add(Broadcast[TypedMessage](2)) 
    val merge = b.add(Merge[TypedMessage](2)) 

    val evenFilter = Flow[TypedMessage].filter ( c => c.record.value() % 2 == 0) 
    val justEven = Flow[TypedMessage].map{ 
     case Message(pr, offset) => 
     val r = new ProducerRecord[String, Int]("general", pr.value()) 
     Message(r, offset) 
    } 
    val allNumbers = Flow[TypedMessage].map{ 
     case Message(pr, offset) => 
     val r = new ProducerRecord[String, Int](allNumbersTopic, pr.value()) 
     Message(r, offset) 
    } 

    val toMsg = Flow[ConsumerMessage.CommittableMessage[String, Int]].map{ msg => 
     val r = new ProducerRecord[String, Int]("general", msg.record.value()) 
     Message(r, msg.committableOffset) 
    } 
    source ~> toMsg ~> bcast 

    bcast ~> evenFilter ~> justEven ~> merge 
    bcast ~> allNumbers ~> merge 
    merge ~> Producer.flow(producerSettings).mapAsync(producerSettings.parallelism) { result => 
     result.message.passThrough.commitScaladsl() //this doesn't compile, cannot get the .commitScaladsl() 
    } 
    ClosedShape 
    })} 
+0

この例では、他の多くのコンパイルエラーが発生しています。コンパイルエラーを簡単に再現できるように修正できますか? –

+0

@StefanoBonettiはい、コンパイルエラーが少ないコードを更新しました。 – igx

答えて

0

を取得することはできません。 タイプパラメータをProducer.flow関数に明示的に渡します。

merge ~> Producer.flow[K, V, CommittableOffset](producerSettings).mapAsync(producerSettings.parallelism) { result => 
    result.message.passThrough.commitScaladsl() 
} 

私が結合していないのparamとしてKVを残している、あなたのプロデューサーを生成するためにバインドされているものは何でも、キー/値の型が適合してください。上記のコードを正しく配線するには、producerSettingsタイプをマージステージのものと一致させる必要があります。

val producerSettings = ProducerSettings(system, new StringSerializer(), new JsonSerializer[Int]) 
    .withBootstrapServers("localhost:9092") 
+0

ありがとうございますが、実際には同じ結果が得られます。 '' 'val sink = Producer.flow [String、String、CommittableOffset](producerSettings).mapAsync .comitScaladsl() } '' '' – igx

+0

私に 'producerSettings'のようなものがあります。 '[String、String]'を返しますが、実際には '[String、Int]'というレコードを与えています。私はいくつかの変更を提案する答えを修正した –

関連する問題