2016-09-18 15 views
1

私はspring-cloud-starter-stream-kafkaをテストしています。以下のエラーが発生しました。Springクラウドストリームアプリ、Dispatcherにチャンネル 'unknown.channel.name'のチャンネル登録者がありません

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers 
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81) 
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) 
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) 
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231) 
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154) 
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102) 
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105) 
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:69) 
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:63) 
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105) 
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:43) 
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$AutoAcknowledgingChannelForwardingMessageListener.doOnMessage(KafkaMessageDrivenChannelAdapter.java:171) 
at org.springframework.integration.kafka.listener.AbstractDecodingMessageListener.onMessage(AbstractDecodingMessageListener.java:50) 
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4$1.doWithRetry(KafkaMessageChannelBinder.java:607) 
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:263) 
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:154) 
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.onMessage(KafkaMessageChannelBinder.java:604) 
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:221) 
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:209) 
at reactor.core.processor.util.RingBufferSubscriberUtils.route(RingBufferSubscriberUtils.java:67) 
at reactor.core.processor.RingBufferProcessor$BatchSignalProcessor.run(RingBufferProcessor.java:789) 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers 
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:153) 
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) 
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) 
... 32 common frames omitted 

マイStreamApplication.java

package de.codecentric; 

import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.cloud.stream.annotation.EnableBinding; 
import org.springframework.cloud.stream.annotation.StreamListener; 
import org.springframework.context.annotation.Bean; 
import org.springframework.integration.annotation.InboundChannelAdapter; 
import org.springframework.integration.annotation.Poller; 
import org.springframework.integration.core.MessageSource; 
import org.springframework.integration.support.MessageBuilder; 

@SpringBootApplication 
@EnableBinding({PersonProcessor.class, LogProcessor.class}) 
public class StreamApplication { 

    public static void main(String[] args) { 
     SpringApplication.run(StreamApplication.class, args); 
    } 

    @StreamListener(LogProcessor.CHANNEL) 
    public void logEvent(EventLog el) { 
     System.out.println("Received event log: " + el.id); 
    } 

    @StreamListener(PersonProcessor.CHANNEL) 
    public void logPerson(Person p) { 
     System.out.println("Received person: " + p.name); 
    } 

    @Bean 
    @InboundChannelAdapter(value = PersonProcessor.CHANNEL, poller = @Poller(fixedDelay = "3000", maxMessagesPerPoll = "1")) 
    public MessageSource<Person> timerMessageSource() { 
     return() -> MessageBuilder.withPayload(new Person()).build(); 
    } 

    @Bean 
    @InboundChannelAdapter(value = LogProcessor.CHANNEL, poller = @Poller(fixedDelay = "3000", maxMessagesPerPoll = "1")) 
    public MessageSource<EventLog> logMessageSource() { 
     return() -> MessageBuilder.withPayload(new EventLog()).build(); 
    } 

    public static class EventLog { 
     private static int seq = 0; 
     public String id = seq++ + ""; 
    } 

    public static class Person { 
     private static int seq = 0; 
     public String name = "hi " + seq++; 
    } 
} 

LogProcessor.java

package de.codecentric; 

import org.springframework.cloud.stream.annotation.Input; 
import org.springframework.cloud.stream.annotation.Output; 
import org.springframework.messaging.MessageChannel; 
import org.springframework.messaging.SubscribableChannel; 


public interface LogProcessor { 
    String CHANNEL = "logs"; 

    @Output(LogProcessor.CHANNEL) 
    MessageChannel output(); 

    @Input(LogProcessor.CHANNEL) 
    SubscribableChannel input(); 
} 

PersonProcessor.java

package de.codecentric; 

import org.springframework.cloud.stream.annotation.Input; 
import org.springframework.cloud.stream.annotation.Output; 
import org.springframework.messaging.MessageChannel; 
import org.springframework.messaging.SubscribableChannel; 


public interface PersonProcessor { 
    String CHANNEL = "person"; 

    @Output(PersonProcessor.CHANNEL) 
    MessageChannel output(); 

    @Input(PersonProcessor.CHANNEL) 
    SubscribableChannel input(); 
} 

私はまた、出力を見ることができます:

受けた者:HI 0 受信したイベントログ:0 受信したイベントログ:4 受けた者:HI 4 受信したイベントログ:9 受けた者:HI 9

感謝。

+0

これが助けになるかどうかわかりませんが、私は非常に同じ例外がありました。問題はコード内の@RefreshScopeアノテーションであることが分かりました。投稿されたスニペットから削除して関連性がないと思った場合):https://github.com/spring-cloud/spring-cloud-stream/issues/461 – chrx

答えて

4

これが問題なのかどうかはよく分かりませんが、入力チャネルと出力チャネルには異なる宛先名が必要です。

CHANNELIN = personIn,CHANNELOUT = personOut

プロセッサは、自分自身にメッセージを送信するためのものではありません。メッセージを受信して​​処理し、結果を別の宛先に送信することを意図しています。

プロセッサはメッセージ自体を生成しません。これが送信元の目的です。

+1

Garyが上に述べたことに加えて、各チャンネルは別のBeanとBeanの名前は '@ Input/@ Output'アノテーションの引数になります(そしてメソッドの名前にデフォルト設定されています)ので、設定によって効果的に2つのBeanが互いに衝突します。別々のチャンネルには別々の名前を付ける必要があります(バインドに失敗するのではなく、エラー状態であることを確認するような設定が必要です)。彼らは同じ 'destination 'に設定することができます。 –

0

私は同じ問題を抱えていました。私はCamden SR7からDalston SR4にアップグレードしました。

関連する問題