**AMQP:**Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang 中的实现有 RabbitMQ 等。
**Spring AMQP:**基于AMQP协议定义的一套API规范,提供了模板来发送和接受消息。有两部分,spring-amqp 是基础抽象,spring-rabbit 是底层的默认实现。
基本使用流程 1.引入依赖 1 2 3 4 5 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency >
2.在publisher服务和consumer服务中编写application.yml文件,添加mq连接信息: 1 2 3 4 5 6 7 spring: rabbitmq: host: 127.0 .0 .1 port: 5672 username: ezio password: 123456 virtual-host: /
3.publisher发送消息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Slf4j @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage2SimpleQueue () { String queueName = "simple.queue" ; String message = "hello, spring amqp!" ; rabbitTemplate.convertAndSend(queueName, message); System.out.println("发送消息完成!" ); } }
4.consumer消费消息 1 2 3 4 5 6 7 8 9 @Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueue (String msg) { System.out.println("消费者接收到simple.queue的消息:【" + msg + "】" ); } }
5.@RabbitListener 1 2 3 4 5 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), //绑定的队列 exchange = @Exchange(name = "test.direct"), //声明交换机名称 key = {"red", "blue"} //路由key,可配置通配符,red.# or red.* ))
消息预取 消费者会预先从队列中获取消息来等待处理,会忽略消费者的处理能力,可以通过设置预取上限(prefetch)来打到竞争模式的效果,能者多劳。
1 2 3 4 5 6 7 8 9 10 spring: rabbitmq: host: 127.0 .0 .1 port: 5672 username: ezio password: 123456 virtual-host: / listener: simple: prefetch: 1
消息转换器 Spring AMQP的对消息对象的处理是有org.springframework.amqp.support.converter.MessageConverter来处理的。默认实现是SimpleMessageConverter,基于JDK的 ObjectOutputStream 完成序列化。想要发送对象消息,需要自定义消息转换器。
1.引入依赖 1 2 3 4 <dependency > <groupId > com.fasterxml.jackson.core</groupId > <artifactId > jackson-databind</artifactId > </dependency >
2.在springboot启动类中,注入bean 1 2 3 4 5 6 7 8 9 10 11 @SpringBootApplication public class PublisherApplication { public static void main (String[] args) { SpringApplication.run(PublisherApplication.class); } @Bean public MessageConverter messageConverter () { return new Jackson2JsonMessageConverter (); } }