常见的交换机类型:
- Direct Exchange:直连交换机。消息会被路由到所有
路由键
与绑定队列
。
- Fanout Exchange:广播交换机。消息会被路由到所有
绑定队列
,忽略路由键。
- Topic Exchange:主题交换机。消息会被路由到所有
绑定队列
,支持通配符。
最简单的消息队列只需要生产者类
和消费者类
,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| @SpringBootTest public class RabbitProducer {
@Test public void testSimpleQueue() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.20.80"); connectionFactory.setUsername("leke"); connectionFactory.setPassword("leke@@@"); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.convertAndSend("e.test", "q.test", "hello, spring amqp!");
} }
@Component public class RabbitConsumer { @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "q.test", durable = "true"), exchange = @Exchange(value = "e.test"))) public void listenSimpleQueueMessage(String msg) { System.out.println("spring 消费者接收到消息:【" + msg + "】"); }
}
|
但从项目设计角度出发,我们需要这样使用消息队列,如下:
RabbitMqConstant
常量类:存放交换机名称和队列名称
1 2 3 4 5 6
| public class RabbitMqConstant {
public static final String TEST_EXCHANGE = "e.test"; public static final String TEST_QUEUE = "q.test";
}
|
RabbitConfig
配置类:注册交换机类和队列类,绑定交换机和队列(如果采用注解绑定,可以不写)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| public class RabbitConfig { @Bean public DirectExchange testExchange() { return new DirectExchange(RabbitMqConstant.TEST_EXCHANGE); }
@Bean public Queue testQueue() { return new Queue(RabbitMqConstant.TEST_QUEUE); }
@Bean public Binding testBinding() { return BindingBuilder.bind(testQueue()) .to(testExchange()) .with(RabbitMqConstant.TEST_QUEUE); } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); @SuppressWarnings("unchecked") FatalExceptionStrategy strategy = LekeFatalExceptionStrategy.retryOn(DataAccessException.class, RemoteAccessException.class, Exception.class); factory.setErrorHandler(new ConditionalRejectingErrorHandler(strategy)); return factory; } } class FatalExceptionStrategy extends DefaultExceptionStrategy { @SuppressWarnings("unchecked") public static FatalExceptionStrategy retryOn(Class<? extends Throwable>... exClasses) { Set<Class<? extends Throwable>> set = new HashSet<>(); for (Class<? extends Throwable> clazz : exClasses) { set.add(clazz); } return new FatalExceptionStrategy(set); }
final Set<Class<? extends Throwable>> retryables;
public FatalExceptionStrategy(Set<Class<? extends Throwable>> exceptions) { this.retryables = exceptions == null ? Collections.emptySet() : Collections.unmodifiableSet(exceptions); }
@Override protected boolean isUserCauseFatal(Throwable e) { boolean needRetry = retryables.stream().anyMatch(clazz -> clazz.isInstance(e)); return !needRetry; } }
|
RabbitConsumer
消费者类
1 2 3 4 5 6 7 8 9 10 11
| @Component public class RabbitConsumer {
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = RabbitMqConstant.TEST_QUEUE, durable = "true"), exchange = @Exchange(value = RabbitMqConstant.TEST_EXCHANGE))) public void listenSimpleQueueMessage(String msg) { System.out.println("spring 消费者接收到消息:【" + msg + "】"); }
}
|
RabbitProducer
生产者类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @SpringBootTest public class RabbitProducer {
@Test public void testSimpleQueue() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.20.80"); connectionFactory.setUsername("leke"); connectionFactory.setPassword("leke@@@"); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.convertAndSend(RabbitMqConstant.TEST_EXCHANGE, RabbitMqConstant.TEST_QUEUE, "hello, spring amqp!");
} }
|