2

私はthisをspring integrationを使用してファイルを読み込むための例として使用していますが、それはうまく動作しますが、kafkaプロデューサにファイルを送信しようとすると動作しません。インターネットでこの問題を調べようとしましたが、ヘルプが見つかりませんでした。ここ は私のコードです:bean kafkaListenerContainerを起動できませんでした:java.lang.IllegalArgumentException

ファイル:MessageProcessingIntegrationFlow.java:

@Bean 
public IntegrationFlow writeToFile() { 
return IntegrationFlows.from(ApplicationConfiguration.INBOUND_CHANNEL) 
     .transform(m -> new StringBuilder((String)m).toString().toUpperCase()) 
//    .handle(fileWritingMessageHandler) 
    .handle(loggingHandler()) 
    .handle(kafkaProducerMessageHandler()) 
    .get(); 
} 



//producing channel 
@Bean(name="kafkaChannel") 
public DirectChannel kafkaChannel() { 
    return new DirectChannel(); 
} 

@Bean 
public DirectChannel consumingChannel() { 
    return new DirectChannel(); 
} 


    @Bean 
@ServiceActivator(inputChannel = "kafkaChannel") 
public MessageHandler kafkaProducerMessageHandler() { 
    KafkaProducerMessageHandler<String, String> handler = 
      new KafkaProducerMessageHandler<>(kafkaTemplate()); 
    handler.setTopicExpression(new LiteralExpression(kafkaTopic)); 
    handler.setMessageKeyExpression(new LiteralExpression("kafka-integration")); 
    return handler; 
} 

@Bean 
public KafkaTemplate<String, String> kafkaTemplate() { 
    return new KafkaTemplate<>(producerFactory()); 
} 

@Bean 
public ProducerFactory<String, String> producerFactory() { 
    return new DefaultKafkaProducerFactory<>(producerConfigs()); 
} 

@Bean 
public Map<String, Object> producerConfigs() { 
    Map<String, Object> properties = new HashMap<>(); 
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
    // introduce a delay on the send to allow more messages to accumulate 
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 1); 

    return properties; 
} 

//consumer configuration.... 
@Bean 
public KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter() { 
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter = 
     new KafkaMessageDrivenChannelAdapter<>(kafkaListenerContainer()); 
    kafkaMessageDrivenChannelAdapter.setOutputChannel(consumingChannel()); 
    return kafkaMessageDrivenChannelAdapter; 
} 

@SuppressWarnings("unchecked") 
@Bean 
public ConcurrentMessageListenerContainer<String, String> kafkaListenerContainer() { 
    ContainerProperties containerProps = new ContainerProperties(kafkaTopic); //set topic name 
    return (ConcurrentMessageListenerContainer<String, String>) new ConcurrentMessageListenerContainer<>(
     consumerFactory(), containerProps); 
} 

@Bean 
public ConsumerFactory<?, ?> consumerFactory() { 
    return new DefaultKafkaConsumerFactory<>(consumerConfigs()); 
} 

@Bean 
public Map<String, Object> consumerConfigs() { 
    Map<String, Object> properties = new HashMap<>(); 
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld"); 
    // automatically reset the offset to the earliest offset 
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 

    return properties; 
} 

はここでスタックトレースです:

org.springframework.context.ApplicationContextException: Failed to start bean 'kafkaListenerContainer'; nested exception is java.lang.IllegalArgumentException: A org.springframework.kafka.listener.KafkaDataListener implementation must be provided 
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE] 
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:50) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE] 
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:348) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE] 
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:151) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE] 
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:114) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE] 
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:880) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE] 
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE] 
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE] 
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE] 
at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE] 
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE] 
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE] 
at com.porterhead.Application.main(Application.java:25) [classes/:na] 
Caused by: java.lang.IllegalArgumentException: A org.springframework.kafka.listener.KafkaDataListener implementation must be provided 
at org.springframework.util.Assert.isTrue(Assert.java:92) ~[spring-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] 
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:199) ~[spring-kafka-1.2.2.RELEASE.jar:na] 
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:175) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE] 
... 12 common frames omitted 

私は私が間違っているのかわかりません。この件についてさらに詳しい情報が必要な場合はお知らせください。おかげさまで

答えて

2

KafkaProducerMessageHandlerは一方通行のコンポーネントであり、応答を生成しません。それはただカフカの話題に公開し、それ以上のことはしません。したがって、handle(loggingHandler())のようにしてからフローを続けることはできません。 KafkaProducerMessageHandlerは、フローの最後のエンドポイントでなければなりません。 AbstractReplyProducingMessageHandlerであるFileWritingMessageHandlerとは異なり、フローを続行します。

それでも、問題が適切に記述されていること、つまり何が期待されているのか、何が間違っているのかを検討してください。答えは、私がそれらのすべてのコンポーネントのコードを知っているので、私の最高の推測でした。

+0

私はファイルを読んで、それをkafkaプロデューサーに送ったが、私はカフカにデータを送ることができる他の方法はありますか? – mansoor67890

+1

私たちはそれを手に入れましたが、問題は何ですか。特に問題がない場合は、ここでは推測できません。また、あなたのためにコードを書く人はいません。それは私たちの責任外です。 –

+0

質問を編集しました。 – mansoor67890

関連する問題