私は現在、redis pub/subシステムを設計するためにjavaを使用しており、問題が発生しています。私はあなたの詳細が表示されます:ここredisサブスクライバはredisパブリッシャでは使用できません
出版社:
public class RedisMessagePublisher implements MessagePublisher {
public RedisMessagePublisher(StringRedisTemplate redisTemplate,ChannelTopic topic)
{
this.redisTemplate = redisTemplate;
this.topic = topic;
}
private StringRedisTemplate redisTemplate;
private ChannelTopic topic;
@Override
public void publish(String message) {
redisTemplate.convertAndSend(topic.getTopic(), message);
}
}
パブリッシャが正しいか、正しく動作することができます。
その後のは、加入者クラスに移動してみましょう:私ははるかに簡単に使用できるように、加入者クラスで
public class RedisMessageSubscriber implements MessageListener {
//action inspect here
private Action2<Message, byte[]> action;
public void setAction(Action2<Message, byte[]> action) {
logger.info("action set");
this.action = action;
}
private static Logger logger = LogManager.getLogger(RedisMessageSubscriber.class);
@Override
public void onMessage(Message message, byte[] bytes) {
logger.info("===> redis subscribe message in <===");
if (action != null)
action.call(message, bytes);
else
logger.info("===> action is null <===");
}
}
は、私がアクションを注入するRxJavaを使用。
しかし、私は出版社からのメッセージを公開した後の質問は、ここにある、私のメッセージは、onMessageメソッドに転送することができることをC、ログの印刷は、私が期待したものではなかったことができます:私が期待したもの
===> redis subscribe message in <===
===> action is null <===
新しいメッセージを公開すると、加入者はそれを取得し、作成したActionを実行しました。
私は以下のパブリッシャーとサブスクライバをトリガするために使用されるサービス:
redisMessageSubscriber.setAction(new Action2<Message, byte[]>() {
@Override
public void call(Message message, byte[] bytes) {
ObjectMapper objectMapper = new ObjectMapper();
try {
String result = objectMapper.readValue(message.getBody(), String.class);
logger.info("receive:"+result);
} catch (IOException e) {
e.printStackTrace();
}
}
});
:
@RestController("redispubsubcontroller")
@RequestMapping(value = "/redis")
public class redispubsubcontroller {
@Autowired
private RedisMessagePublisher redisMessagePublisher;
@Autowired
private RedisMessageSubscriber redisMessageSubscriber;
private static Logger logger = LogManager.getLogger(redispubsubcontroller.class);
@RequestMapping(value = "/publisher", method = {RequestMethod.GET})
public ApiResponse getConfig(String message,HttpServletRequest request,
HttpServletResponse response) {
redisMessageSubscriber.setAction(new Action2<Message, byte[]>() {
@Override
public void call(Message message, byte[] bytes) {
ObjectMapper objectMapper = new ObjectMapper();
try {
String result = objectMapper.readValue(message.getBody(), String.class);
logger.info("receive:"+result);
} catch (IOException e) {
e.printStackTrace();
}
}
});
redisMessagePublisher.publish(message);
return new ApiResponse("success","message sent");
}
}
コード以上のことから、私は、トピックをサブスクライブし、加入者に新しいアクションを設定し、Cすることができます
しかし、なぜ出版社を起動させた後でも、サブスクライバはメッセージを受け取ることができますが、NULLのアクションはまだ実行されませんでした。
誰でも手助けできますか?このメカニズムに問題はありますか?
==== EDIT =====以下
RedisMessageConfigコード:
@Configuration
public class RedisMessageConfig {
@Bean
ChannelTopic topic() {
return new ChannelTopic("useraddresspubsub:queue");
}
@Bean
MessageListenerAdapter messageListener() {
return new MessageListenerAdapter(new RedisMessageSubscriber());
}
@Autowired
private RedisConnectionFactory JedisConnectionFactory;
@Bean
RedisMessageListenerContainer redisContainer() {
final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(JedisConnectionFactory);
container.addMessageListener(messageListener(), topic());
return container;
}
}
==== ====解決
最後に、私はこれを得ました1つのmpのアイデアで解決され、わずかにmyredismessagescriberからredredessageconfigへのredismessageconfigにフローがあるので、myredismessageconfigに変更されたので、redismessageconfigでは、まずアクションtそれでは、redismessageconfigは新しいredismessagesubscriberを作成し、作成された新しいアクションを保持します。以下のコード:以下
@Component
public class MyRedisMessageConfig extends RedisMessageConfig {
private static Logger logger =LogManager.getLogger(MyRedisMessageConfig.class);
public MyRedisMessageConfig() {
super.action = new Action2<Message, byte[]>() {
@Override
public void call(Message message, byte[] bytes) {
String result = new String(message.getBody());
logger.info("received:" + result);
}
};
}
}
この問題の原因となったマルチスレッドを意味しますか? – CharlieShi
いいえ。複数のスレッドがあなたに見えるだけです。この問題は、共有可能な可変状態によって引き起こされます。 – mp911de
あなたはこのシナリオを打ち消すためにいくつかの実行可能なアイデアを共有していますか? – CharlieShi