私はKafkaコンシューマ実装の統合テストを行っています。 私はwurstmeister/kafkaドッカーの画像とApache Kafkaのコンシューマーを使用します。 トピックに「予期せぬ」メッセージを送信すると、私の喧嘩をするシナリオが発生します。 kafkaConsumer.poll(POLLING_TIMEOUT)
は、RUNモードで無限ループになっているようです。私がDEBUGでも、それは私が一時停止して戻ったときに動作します。Apache kafka消費者アンケート予期しないメッセージの無限ループ
予想外のメッセージを送信するときにこの問題は発生しません(逆シリアル化の例外は発生しません)。
kafka:
image: wurstmeister/kafka
links:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9092
KAFKA_CREATE_TOPICS: "ProductLocation:1:1,ProductInformation:1:1,InventoryAvailableToSell:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
Javaのジェネリック消費者:
@Override
public Collection<T> consume() {
String eventToBePublishedName = ERROR_WHILE_RESETTING_OFFSET;
boolean success = false;
try {
kafkaConsumer.resume(kafkaAssignments);
if (isPollingTypeFull) {
// dummy poll because its needed before resetting offset.
// https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition
kafkaConsumer.poll(POLLING_TIMEOUT);
resetOffset();
} else if (!offsetGotResetFirstTime) {
resetOffset();
offsetGotResetFirstTime = true;
}
eventToBePublishedName = ERROR_WHILE_POLLING;
ConsumerRecords<Object, T> records;
List<T> output = new ArrayList<>();
do {
records = kafkaConsumer.poll(POLLING_TIMEOUT);
records.forEach(cr -> {
T val = cr.value();
if (val != null) {
output.add(cr.value());
}
});
} while (records.count() > 0);
eventToBePublishedName = CONSUMING;
success = true;
kafkaConsumer.pause(kafkaAssignments);
return output;
} finally {
applicationEventPublisher.publishEvent(
new OperationResultApplicationEvent(
this, OperationType.ConsumingOfMessages, eventToBePublishedName, success));
}
}
デシリアライズ:ここ
はカフカのための私のdocker-compose
の構成である私の統合テストで
public T deserialize(String topic, byte[] data) {
try {
JsonNode jsonNode = mapper.readTree(data);
JavaType javaType = mapper.getTypeFactory().constructType(getValueClass());
JsonNode value = jsonNode.get("value");
return mapper.readValue(value.toString(), javaType);
} catch (IllegalArgumentException | IOException | SerializationException e) {
LOGGER.error("Can't deserialize data [" + Arrays.toString(data)
+ "] from topic [" + topic + "]", e);
return null;
}
}
、私は、トピックを作成しますタイムスタンプの付いたトピック名に送信することで、すべてのテストで使用できます。これにより、新しいトピックが作成され、テストはステートレスになります。
これは私がカフカの消費者を設定する方法です:あなたはこのに直面した場合
Properties properties = new Properties();
properties.put("bootstrap.servers", kafkaConfiguration.getServer());
properties.put("group.id", kafkaConfiguration.getGroupId());
properties.put("key.deserializer", kafkaConfiguration.getKeyDeserializer().getName());
properties.put("value.deserializer", kafkaConfiguration.getValueDeserializer().getName());
あなたはどんな例外について話していますか?私は投票が無限ループになると言いました。それは、その話題の以前の消費者が閉鎖されていなかったためです。どちらもカフカの消費者はスレッドセーフではありません==>消費者を閉めるだけで解決しました。私はnullを返し、コードサンプルに示されているようにポーリングされたコンシューマーレコードの結果をフィルタリングします。オフセットは自動的にコミットされます。 –
私は以前のコンシューマーアプリを修正する方法について話しています。私は予期しないメッセージを逆シリアル化できないため、以前の消費者が死亡したと言っていましたが。その場合は、SerDesの例外(またはその他の例外)をキャッチして、そのアプリケーションを修正して1で終了してオフセットを終了し、続行する必要があります。 –
消費者は死ぬことはなく(したがって例外をスローしません)、投票中は吊り下げられたままです。これは、Apache kafkaの消費者スレッドの安全性に関連しています。新しいトピックを作成する前に、同じトピックとグループIDのカフカコンシューマーを特定のトピックとグループIDで閉じる必要があります。それが問題でした。 –