2017-09-25 20 views
0

私はKafkaコンシューマ実装の統合テストを行っています。 私はwurstmeister/kafkaドッカーの画像とApache Kafkaのコンシューマーを使用します。 トピックに「予期せぬ」メッセージを送信すると、私の喧嘩をするシナリオが発生します。 kafkaConsumer.poll(POLLING_TIMEOUT)は、RUNモードで無限ループになっているようです。私がDEBUGでも、それは私が一時停止して戻ったときに動作します。Apache kafka消費者アンケート予期しないメッセージの無限ループ

予想外のメッセージを送信するときにこの問題は発生しません(逆シリアル化の例外は発生しません)。

kafka: 
    image: wurstmeister/kafka 
    links: 
    - zookeeper 
    ports: 
    - "9092:9092" 
    environment: 
    KAFKA_ADVERTISED_HOST_NAME: localhost 
    KAFKA_ADVERTISED_PORT: 9092 
    KAFKA_CREATE_TOPICS: "ProductLocation:1:1,ProductInformation:1:1,InventoryAvailableToSell:1:1" 
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 
    volumes: 
    - /var/run/docker.sock:/var/run/docker.sock 

Javaのジェネリック消費者:

@Override 
public Collection<T> consume() { 
    String eventToBePublishedName = ERROR_WHILE_RESETTING_OFFSET; 
    boolean success = false; 

    try { 
     kafkaConsumer.resume(kafkaAssignments); 
     if (isPollingTypeFull) { 
      // dummy poll because its needed before resetting offset. 
      // https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition 
      kafkaConsumer.poll(POLLING_TIMEOUT); 
      resetOffset(); 
     } else if (!offsetGotResetFirstTime) { 
      resetOffset(); 
      offsetGotResetFirstTime = true; 
     } 

     eventToBePublishedName = ERROR_WHILE_POLLING; 

     ConsumerRecords<Object, T> records; 

     List<T> output = new ArrayList<>(); 

     do { 
      records = kafkaConsumer.poll(POLLING_TIMEOUT); 
      records.forEach(cr -> { 
       T val = cr.value(); 
       if (val != null) { 
        output.add(cr.value()); 
       } 
      }); 
     } while (records.count() > 0); 

     eventToBePublishedName = CONSUMING; 
     success = true; 
     kafkaConsumer.pause(kafkaAssignments); 
     return output; 
    } finally { 
     applicationEventPublisher.publishEvent(
       new OperationResultApplicationEvent(
         this, OperationType.ConsumingOfMessages, eventToBePublishedName, success)); 
    } 
} 

デシリアライズ:ここ

はカフカのための私のdocker-composeの構成である私の統合テストで

public T deserialize(String topic, byte[] data) { 
    try { 
     JsonNode jsonNode = mapper.readTree(data); 
     JavaType javaType = mapper.getTypeFactory().constructType(getValueClass()); 
     JsonNode value = jsonNode.get("value"); 
     return mapper.readValue(value.toString(), javaType); 
    } catch (IllegalArgumentException | IOException | SerializationException e) { 
     LOGGER.error("Can't deserialize data [" + Arrays.toString(data) 
       + "] from topic [" + topic + "]", e); 
     return null; 
    } 
} 

、私は、トピックを作成しますタイムスタンプの付いたトピック名に送信することで、すべてのテストで使用できます。これにより、新しいトピックが作成され、テストはステートレスになります。

これは私がカフカの消費者を設定する方法です:あなたはこのに直面した場合

Properties properties = new Properties(); 
    properties.put("bootstrap.servers", kafkaConfiguration.getServer()); 
    properties.put("group.id", kafkaConfiguration.getGroupId()); 
    properties.put("key.deserializer", kafkaConfiguration.getKeyDeserializer().getName()); 
    properties.put("value.deserializer", kafkaConfiguration.getValueDeserializer().getName()); 

答えて

0

、ちょうど使用を開始する前に使用してresume後にそれらを使用した後、消費者をclose、またはpause

1

"poison pill"メッセージをスキップするには、例外をキャッチし、コミットされたオフセットを+1します。

+0

あなたはどんな例外について話していますか?私は投票が無限ループになると言いました。それは、その話題の以前の消費者が閉鎖されていなかったためです。どちらもカフカの消費者はスレッドセーフではありません==>消費者を閉めるだけで解決しました。私はnullを返し、コードサンプルに示されているようにポーリングされたコンシューマーレコードの結果をフィルタリングします。オフセットは自動的にコミットされます。 –

+0

私は以前のコンシューマーアプリを修正する方法について話しています。私は予期しないメッセージを逆シリアル化できないため、以前の消費者が死亡したと言っていましたが。その場合は、SerDesの例外(またはその他の例外)をキャッチして、そのアプリケーションを修正して1で終了してオフセットを終了し、続行する必要があります。 –

+0

消費者は死ぬことはなく(したがって例外をスローしません)、投票中は吊り下げられたままです。これは、Apache kafkaの消費者スレッドの安全性に関連しています。新しいトピックを作成する前に、同じトピックとグループIDのカフカコンシューマーを特定のトピックとグループIDで閉じる必要があります。それが問題でした。 –

関連する問題