上文着重介绍RabbitMQ 七种工作模式介绍RabbitMQ 七种工作模式介绍_rabbitmq 工作模式-CSDN博客
本篇讲解如何在Spring环境下进⾏RabbitMQ的开发.(只演⽰部分常⽤的⼯作模式)
目录
引⼊依赖
一.工作队列模式
二.Publish/Subscribe(发布订阅模式)
三.Routing(路由模式)
四.Topics(通配符模式)
引⼊依赖
在pom.xml 可以导入依赖
<!--Spring MVC相关依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--RabbitMQ相关依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
或者创建项目时候勾选相应的选项
进入项目第一步先进行分类 三层架构
进行配置相关rabbitmq属性
一.工作队列模式
生产者:
@RestController@RequestMapping("/produce")public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/work") public String work() { rabbitTemplate.convertAndSend("", Constans.WORK_QUEUE,"hello spring amqp:work..."); return "发送成功"; }}
convertAndSend
是RabbitTemplate
类提供的一个重要方法,用于将消息发送到 RabbitMQ 的指定队列中 。
""
:在这里通常表示交换机(Exchange)的名称为空字符串。第二个参数Constans.WORK_QUEUE
第三个参数"hello spring amqp:work..."
:这就是要发送的实际消息内容 通过网页进行测试是否发送成功
从rabbitmq上可以看出已经发送成功到队列,等待消费者进行消费
消费者:
@Componentpublic class WorkListener { @RabbitListener(queues = Constants.WORK_QUEUE) public void queueListener1(Message message) { System.out.println("listener 1 ["+Constants.WORK_QUEUE+"] 接收到消息:" +message); } @RabbitListener(queues = Constants.WORK_QUEUE) public void queueListener2(Message message) { System.out.println("listener 2 ["+Constants.WORK_QUEUE+"] 接收到消息:" +message); }}
@RabbitListener 是Spring框架中⽤于监听RabbitMQ队列的注解,通过使⽤这个注解,可以定义⼀个⽅法,以便从RabbitMQ队列中接收消息.该注解⽀持多种参数类型,这些参数类型代表了从RabbitMQ接收到的消息和相关信息.
@Componentpublic class WorkListener { @RabbitListener(queues = Constants.WORK_QUEUE) public void queueListener1(String message) { System.out.println("listener 1 ["+Constants.WORK_QUEUE+"] 接收到消息:" +message); } @RabbitListener(queues = Constants.WORK_QUEUE) public void queueListener2(String message) { System.out.println("listener 2 ["+Constants.WORK_QUEUE+"] 接收到消息:" +message); }}
1. String :返回消息的内容
2. Message ( org.springframework.amqp.core.Message ):SpringAMQP的
Message 类,返回原始的消息体以及消息的属性,如消息ID,内容,队列信息等.
二.Publish/Subscribe(发布订阅模式)
声明队列,交换机,绑定队列和交换机
//发布订阅模式 public static final String FANOUT_QUEUE1 = "fanout.queue1"; public static final String FANOUT_QUEUE2 = "fanout.queue2"; public static final String FANOUT_EXCHANGE = "fanout.exchange";
//发布订阅模式 @Bean("fanoutQueue1") public Queue fanoutQueue1() { return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build(); } @Bean("fanoutQueue2") public Queue fanoutQueue2() { return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build(); } @Bean("fanoutExchange") public FanoutExchange fanoutExchange() { return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build(); } @Bean("fanoutQueueBinding1") public Binding fanoutQueueBinding1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue queue){ return BindingBuilder.bind(queue).to(fanoutExchange); } @Bean("fanoutQueueBinding2") public Binding fanoutQueueBinding2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue2") Queue queue){ return BindingBuilder.bind(queue).to(fanoutExchange); }
生产者:
@RequestMapping("/fanout") public String fanout(){ rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE,"", "hello spring amqp:fanout..."); return "发送成功"; }
消费者:
@Componentpublic class FanoutListener { @RabbitListener(queues = Constants.FANOUT_QUEUE1) public void queueListener1(String message) { System.out.println("listener 1 ["+Constants.FANOUT_QUEUE1+"] 接收到消息:" +message); } @RabbitListener(queues = Constants.FANOUT_QUEUE2) public void queueListener2(String message) { System.out.println("listener 2 ["+Constants.FANOUT_QUEUE2+"] 接收到消息:" +message); }}
三.Routing(路由模式)
声明队列,交换机,绑定队列和交换机
//路由模式 public static final String DIRECT_QUEUE1 = "direct.queue1"; public static final String DIRECT_QUEUE2 = "direct.queue2"; public static final String DIRECT_EXCHANGE = "direct.exchange";
//路由模式 @Bean("directQueue1") public Queue directQueue1() { return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build(); } @Bean("directQueue2") public Queue directQueue2() { return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build(); } @Bean("directExchange") public DirectExchange directExchange() { return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build(); } @Bean("directQueueBinding1") public Binding directQueueBinding1(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue1") Queue queue){ return BindingBuilder.bind(queue).to(directExchange).with("orange"); } @Bean("directQueueBinding2") public Binding directQueueBinding2(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue){ return BindingBuilder.bind(queue).to(directExchange).with("black"); } @Bean("directQueueBinding3") public Binding directQueueBinding3(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue){ return BindingBuilder.bind(queue).to(directExchange).with("orange"); }
生产者:
@RequestMapping("/direct/{rountingKey}") public String direct(@PathVariable("routingKey") String rountingKey){ rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE,"", "hello spring amqp:direct, my routing key is "+rountingKey); return "发送成功"; }
@PathVariable :用于从请求的 URL 路径中提取参数值。
当有一个请求访问/direct/
后面跟着某个具体的值(例如/direct/key1
)时,@PathVariable("routingKey") String rountingKey
会将key1
提取出来,并赋值给rountingKey
变量。
消费者:
@Componentpublic class DirectListener { @RabbitListener(queues = Constants.DIRECT_QUEUE1) public void queueListener1(String message) { System.out.println("listener 1 ["+Constants.DIRECT_QUEUE1+"] 接收到消息:" +message); } @RabbitListener(queues = Constants.DIRECT_QUEUE2) public void queueListener2(String message) { System.out.println("listener 2 ["+Constants.DIRECT_QUEUE2+"] 接收到消息:" +message); }}
四.Topics(通配符模式)
. 代表一个单词
# 代码多个单词
//通配符模式 public static final String TOPIC_QUEUE1 = "topics_queue1"; public static final String TOPIC_QUEUE2 = "topics_queue2"; public static final String TOPIC_EXCHANGE = "topics_exchange";
//通配符模式 @Bean("topicQueue1") public Queue topicQueue1(){ return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build(); } @Bean("topicQueue2") public Queue topicQueue2(){ return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build(); } @Bean("topicExchange") public TopicExchange topicExchange(){ return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build(); } @Bean("topicQueueBinding1") public Binding topicQueueBinding1(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue1") Queue queue){ return BindingBuilder.bind(queue).to(topicExchange).with("*.orange.*"); } @Bean("topicQueueBinding2") public Binding topicQueueBinding2(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){ return BindingBuilder.bind(queue).to(topicExchange).with("*.*.rabbit"); } @Bean("topicQueueBinding3") public Binding topicQueueBinding3(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){ return BindingBuilder.bind(queue).to(topicExchange).with("lazy.#"); }
生产者
@RequestMapping("/topic/{routingKey}") public String topic(@PathVariable("routingKey") String routingKey){ rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE,routingKey, "hello spring amqp:topic, my routing key is "+routingKey); return "发送成功"; }
消费者
@Componentpublic class TopicListener { @RabbitListener(queues = Constants.TOPIC_QUEUE1) public void queueListener1(String message) { System.out.println("listener 1 ["+Constants.TOPIC_QUEUE1+"] 接收到消息:" +message); } @RabbitListener(queues = Constants.TOPIC_QUEUE2) public void queueListener2(String message) { System.out.println("listener 2 ["+Constants.TOPIC_QUEUE2+"] 接收到消息:" +message); }}
结语: 写博客不仅仅是为了分享学习经历,同时这也有利于我巩固知识点,总结该知识点,由于作者水平有限,对文章有任何问题的还请指出,接受大家的批评,让我改进。同时也希望读者们不吝啬你们的点赞+收藏+关注,你们的鼓励是我创作的最大动力!