2016-07-29 10 views
1

Servicetestと組み込みカフカでは、奇妙な動作が観察されています。Spring Kafka、組み込みカフカを使用したテスト

テストスポックテストで、我々はJUnitのルールKafkaEmbeddedを使用すると、次のようにbrokersAsStringを伝播:

@ClassRule 
@Shared 
KafkaEmbedded embeddedKafka = new KafkaEmbedded(1) 

@Autowired 
KafkaListenerEndpointRegistry endpointRegistry 

def setupSpec() { 
    System.setProperty("kafka.bootstrapServers", embeddedKafka.getBrokersAsString()) 
} 

をKafkaEmbeddedのコードを検査することから、​​でインスタンスを作成すると、2つのパーティションを持つ1台のカフカサーバーにつながりますトピックごとに。

テストでパーティションの割り当てとサーバーとクライアントの同期の問題に取り組むために、spring-kafkaのContainerTestUtilsクラスに見られるような戦略に従います。

public static void waitForAssignment(KafkaMessageListenerContainer<String, String> container, int partitions) 
     throws Exception { 

     log.info(
      "Waiting for " + container.getContainerProperties().getTopics() + " to connect to " + partitions + " " + 
       "partitions.") 

     int n = 0; 
     int count = 0; 
     while (n++ < 600 && count < partitions) { 
      count = 0; 
      container.getAssignedPartitions().each { 
       TopicPartition it -> 
        log.info(it.topic() + ":" + it.partition() + "; ") 
      } 

      if (container.getAssignedPartitions() != null) { 
       count = container.getAssignedPartitions().size(); 
      } 
      if (count < partitions) { 
       Thread.sleep(100); 
      } 
     } 
    } 

、我々は次のパターンに気づくのログを観察する場合:

2016-07-29 11:24:02.600 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 1 : {deliveryZipCode_v1=LEADER_NOT_AVAILABLE} 
2016-07-29 11:24:02.600 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 1 : {staggering=LEADER_NOT_AVAILABLE} 
2016-07-29 11:24:02.600 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 1 : {moa=LEADER_NOT_AVAILABLE} 
2016-07-29 11:24:02.696 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 3 : {staggering=LEADER_NOT_AVAILABLE} 
2016-07-29 11:24:02.699 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 3 : {moa=LEADER_NOT_AVAILABLE} 
2016-07-29 11:24:02.699 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 3 : {deliveryZipCode_v1=LEADER_NOT_AVAILABLE} 
2016-07-29 11:24:02.807 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 5 : {deliveryZipCode_v1=LEADER_NOT_AVAILABLE} 
2016-07-29 11:24:02.811 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 5 : {staggering=LEADER_NOT_AVAILABLE} 
2016-07-29 11:24:02.812 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 5 : {moa=LEADER_NOT_AVAILABLE} 
2016-07-29 11:24:03.544 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[] 
2016-07-29 11:24:03.544 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[] 
2016-07-29 11:24:03.544 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[] 
2016-07-29 11:24:03.602 INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : SyncGroup for group timeslot-service-group-06x failed due to coordinator rebalance, rejoining the group 
2016-07-29 11:24:03.637 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[] 
2016-07-29 11:24:03.637 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[] 
2016-07-29 11:24:04.065 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[staggering-0] 
2016-07-29 11:24:04.066 INFO 1160 --- [   main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 50810 (http) 
2016-07-29 11:24:04.073 INFO 1160 --- [   main] .t.s.AllocationsDeliveryZonesServiceSpec : Started AllocationsDeliveryZonesServiceSpec in 20.616 seconds (JVM running for 25.456) 
2016-07-29 11:24:04.237 INFO 1160 --- [   main] org.eclipse.jetty.server.Server   : jetty-9.2.17.v20160517 
2016-07-29 11:24:04.265 INFO 1160 --- [   main] o.e.jetty.server.handler.ContextHandler : Started [email protected]{/__admin,null,AVAILABLE} 
2016-07-29 11:24:04.270 INFO 1160 --- [   main] o.e.jetty.server.handler.ContextHandler : Started [email protected]{/,null,AVAILABLE} 
2016-07-29 11:24:04.279 INFO 1160 --- [   main] o.eclipse.jetty.server.ServerConnector : Started [email protected]{HTTP/1.1}{0.0.0.0:50811} 
2016-07-29 11:24:04.430 INFO 1160 --- [   main] o.eclipse.jetty.server.ServerConnector : Started [email protected]{SSL-http/1.1}{0.0.0.0:50812} 
2016-07-29 11:24:04.430 INFO 1160 --- [   main] org.eclipse.jetty.server.Server   : Started @25813ms 
2016-07-29 11:24:04.632 INFO 1160 --- [   main] .t.s.AllocationsDeliveryZonesServiceSpec : waiting... 
2016-07-29 11:24:04.662 INFO 1160 --- [   main] .t.s.AllocationsDeliveryZonesServiceSpec : Waiting for [moa] to connect to 2 partitions.^ 
2016-07-29 11:24:13.644 INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : Attempt to heart beat failed since the group is rebalancing, try to re-join group. 
2016-07-29 11:24:13.644 INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : Attempt to heart beat failed since the group is rebalancing, try to re-join group. 
2016-07-29 11:24:13.644 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[] 
2016-07-29 11:24:13.644 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[] 
2016-07-29 11:24:13.655 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[staggering-0] 
2016-07-29 11:24:13.655 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[moa-0] 
2016-07-29 11:24:13.655 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[deliveryZipCode_v1-0] 
2016-07-29 11:24:13.740 INFO 1160 --- [   main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0; 
[...] 
2016-07-29 11:24:16.644 INFO 1160 --- [   main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0; 
2016-07-29 11:24:16.666 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[staggering-0] 
2016-07-29 11:24:16.750 INFO 1160 --- [   main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0; 
[...] 
2016-07-29 11:24:23.559 INFO 1160 --- [   main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0; 
2016-07-29 11:24:23.660 INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : Attempt to heart beat failed since the group is rebalancing, try to re-join group. 
2016-07-29 11:24:23.660 INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : Attempt to heart beat failed since the group is rebalancing, try to re-join group. 
2016-07-29 11:24:23.662 INFO 1160 --- [   main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0; 
2016-07-29 11:24:23.686 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[moa-0] 
2016-07-29 11:24:23.686 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[deliveryZipCode_v1-0] 
2016-07-29 11:24:23.695 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[moa-0] 
2016-07-29 11:24:23.695 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[staggering-0] 
2016-07-29 11:24:23.695 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[deliveryZipCode_v1-0] 

を[..]表示を省略ラインに

に注意してください私たちは、その結果それとして3000ミリ へmetadata.max.age.msを設定頻繁にメタデータ情報の更新を試みます。

今私たちが困惑しているのは、2つのパーティションが接続するのを待つと、待機がタイムアウトするということです。 1つのパーティションが接続するのを待ってから、しばらくするとすべてが正常に実行されます。

私たちは間違ったコードを理解しましたが、埋め込まれたカフカにはトピックごとに2つのパーティションがありますか?リスナーに割り当てられているのは普通ですか?

答えて

0

あなたが見ているフレークさを説明できません。はい、各トピックはデフォルトで2つのパーティションを取得します。私はちょうどフレームワークコンテナテストの1つを実行し、これを参照してください...

09:24:06.139 INFO [testSlow3-kafka-consumer-1][org.springframework.kafka.listener.KafkaMessageListenerContainer] partitions revoked:[] 
09:24:06.611 INFO [testSlow3-kafka-consumer-1][org.springframework.kafka.listener.KafkaMessageListenerContainer] partitions assigned:[testTopic3-1, testTopic3-0] 
+0

私たちがもともと見たフレークさは、消費者にトピックのタイミングと割り当てについてでした。基本的には(https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/ContainerTestUtils.java)アドレス。 私が見つけたパーティションについて、私は何を混乱させましたか。 コンストラクタから設定されたパーティション数は、トピックが事前に作成されたbefore()メソッドで使用されます。トピックを暗黙的に作成する場合は、パーティション数のデフォルトとして1が使用されているように見えます。 –

関連する問題