文章参考
导读
【4万字保姆级教程】本文详细的从应用层面上讲解了RabbitMQ的使用以及整合Springboot;对于其概念进行讲解,提供了可以完成日常开发的接口与demo;
文章目录
- 导读
- 工作队列
- 1. work Queues
- 轮询分发消息
- 2. 消息应答
- 概念
- 自动应答
- 手动应答
- 消息自动重新入队
- 3. RabbitMQ持久化
- 设置队列持久化
- 设置消息持久化
- 不公平分发
- 预取值
- 4. 发布确认
- 单个确认发布
- 批量确认发布
- 异步确认发布
- 以上三种发布确认对比
- 交换机
- 概念
- 临时队列
- 绑定(bindings)
- 类型
- 无名交换机
- Fanout exchange(扇型交换机)
- 直接交换机
- 主题交换机
- 死信队列
- 死信队列的概念
- 死信队列的来源
- 死信队列的实验
- 消息TTL时间过期
- 队列达到最大长度,无法添加新的消息到Mq
- 消息被拒绝
- 延迟队列
- 概念
- 实现方式
- 1. 使用TTL + DLX创建延迟队列
- 存在问题
- 2. 使用插件
- 总结
- 整合SpringBoot
- 使用TTL + DLX创建延迟队列
- 使用插件
- 消息高级确认
- 配置文件
- 配置交换机
- 配置回调
- 配置生产者&消费者
- 实验结果
- 备份交换机
- 交换机等配置
- 生产者
- 消费者
- 幂等性
- 幂等性概念
- 消息重复消费
- 解决消息重复消费方案
- 消费端的幂等性
- 唯一ID+指纹码
- Redis原子性
- 优先级队列
- 惰性队列
- 概念
- 两种模式
- 内存开销
- 集群搭建
- 搭建步骤
- 镜像队列
- 使用镜像的原因
- 镜像队列的搭建
- 联邦交换机&队列
- Shovel
工作队列
1. work Queues
工作队列(又称任务队列)的主要目的是避免立即执行资源密集型任务,且等待执行完成。我们可以将任务放入队列中,后台运行的工作进程将任务取出并执行,当有多个工作线程时,这些线程将一起处理任务。
轮询分发消息
一个生产者发送到一个队列中,且由多个工作线程去处理。一个消息只能被处理一次,多个工作线程是竞争的关系。
一个生产者: 发送10条消息
public class Task {
public static final String Queue_Name="hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 生成一个队列
/*
String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
参数1:队列名
参数2:队列消息是否持久化,默认false存储在内存中
参数3:是否进行消息的共享,true可多个共同消费,false是只能单一消费者消费
参数4:是否自动删除最后一个消费者断开连接后,该队列是否自动删除,true自动删除
参数5:其他参数
*/
channel.queueDeclare(Queue_Name, false, false, false, null);
// 发消息
String message = "hello world";
/*
参数1:交换机名称,默认""
参数2:路由的Key值是哪个 本次是队列名称
参数3:其他参数
参数4:消息体
*/
for (int i = 0; i < 10; i++) {
channel.basicPublish("", Queue_Name, null, (message+"_"+i).getBytes());
}
System.out.println("消息发送完毕");
}
}
启动3个消费者
public class Worker {
public static final String Queue_Name = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
/*
参数1:消费哪个队列
参数2:消费成功后是否要自动应答 true自动应答,false手动应答
参数3:消息成功消费的回调
参数4:消息消费异常的回调
*/
channel.basicConsume(Queue_Name, true, (consumerTag, message)-> {
System.out.println(args[0]+"tag:"+consumerTag+" message:"+new String(message.getBody()));
},(consumerTag)->{
System.out.println(args[0]+"消息被消费中断:"+consumerTag);
});
}
}
执行结果(轮询非有序)
工作线程2:----->tag:amq.ctag-h_N1sFd1t5stShV0o0R95A message:hello world_0
工作线程1:----->tag:amq.ctag-m9qZB0mWDxN2G0-ZOGjqWA message:hello world_2
工作线程0:----->tag:amq.ctag-El8AyGbfTrdvlQ7C4cw3NQ message:hello world_1
工作线程1:----->tag:amq.ctag-m9qZB0mWDxN2G0-ZOGjqWA message:hello world_5
工作线程2:----->tag:amq.ctag-h_N1sFd1t5stShV0o0R95A message:hello world_3
工作线程1:----->tag:amq.ctag-m9qZB0mWDxN2G0-ZOGjqWA message:hello world_8
工作线程0:----->tag:amq.ctag-El8AyGbfTrdvlQ7C4cw3NQ message:hello world_4
工作线程2:----->tag:amq.ctag-h_N1sFd1t5stShV0o0R95A message:hello world_6
工作线程0:----->tag:amq.ctag-El8AyGbfTrdvlQ7C4cw3NQ message:hello world_7
工作线程2:----->tag:amq.ctag-h_N1sFd1t5stShV0o0R95A message:hello world_9
2. 消息应答
概念
若工作线程突发异常中断,那么我们可能将丢失正在处理的消息。因此mq引入了一种消息应答机制,保证消费者在处理消息后,告诉MQ已经处理,可以将消息删除。
自动应答
消息发送后立即被认为已经发送成功。
需要在高吞吐量和数据传输安全性方面做权衡,仅适用在消费者可以高效以某种速率处理消息的情况下使用;
该模式可能因为消费者channel关闭造成消息丢失以及未对发送消息数量进行限制导致消息发送过载等风险;
手动应答
推荐手动应答
Channel.basicAck(long tag, boolean multiple) // 肯定确认 mq确定消息成功处理,可以删除
Channel.basicNack(long tag, boolean multiple, boolean requeue) // 不确定 mq不确定消息是否处理
Channel.basicReject(long tag, boolean requeue) // 不处理消息,直接拒绝,直接丢弃,不能批量处理
tag:消息标识;message.getEnvelope().getDeliveryTag();
multiple的true和false,见下面的代码
true:批量应答Channel上未应答的消息,当channel上传tag送的消息是1,2,3,4,当tag=4被确认时,1-3也会被确认收到消息应答;
false:只会应答当前消息,其余消息不会被应答;
requeue:true,拒绝的消息重新入消费队列;
消息自动重新入队
如果消费者异常断开,导致消息未发送ACK确认,MQ将其消息重新排队,很快发送给另一个消费者,保证消息的不丢失;
// 手动确认代码
channel.basicConsume(Queue_Name, false, (consumerTag, message)-> {
System.out.println(args[0]+"tag:"+consumerTag+" message:"+new String(message.getBody()));
/*
应答信道消息
参数2:是否批量确认
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
},(consumerTag)->{
System.out.println(args[0]+"消息被消费中断:"+consumerTag);
});
3. RabbitMQ持久化
默认情况下,RabbitMQ因异常导致未成功消费的消息丢弃,若需要持久化需要将队列和消息都标记为持久化;
设置队列持久化
// 生成一个队列
/*
String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
参数1:队列名
参数2:队列消息是否持久化,默认false存储在内存中
参数3:是否进行消息的共享,true可多个共同消费,false是只能单一消费者消费
参数4:是否自动删除最后一个消费者断开连接后,该队列是否自动删除,true自动删除
参数5:其他参数
*/
channel.queueDeclare(Queue_Name, true, false, false, null);
注意:若原先声明的队列不是持久化的,启动会报错,需要将原先队列删除后重新创建持久化队列。
设置消息持久化
// 第三个参数:MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish("", Queue_Name, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+"_"+i).getBytes());
将消息持久化并不能完全保证消息不丢失,因为消息当准备存储在磁盘的时候还未完全写入,消息还在缓存时的一个间隔点,此时并没有真正写入磁盘,持久性无法完全保证,需要参考后续的确认。
不公平分发
RabbitMQ默认情况是使用轮询消费消息,但如果多个消费者的处理速度不一致,就会导致慢的消费者影响到了快的消费者执行,因此需要有一种能者多劳的分发方式。
设置参数:默认是0,表示轮询分发;作用于手动确认的消费者;
设置prefetchCount = 3。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理3个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它;
int prefetchCount = 1;// 预取值,会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
channel.basicQos(prefetchCount);
代码:默认发送10条消息,用两个(一快一慢)消费者消费;
private static void createFasterConsumer() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 默认0是轮询
channel.basicQos(1);// 预取值为1,表示预取值1个消息
// 手动应答1s
channel.basicConsume(Queue_Name, false, (consumerTag, message)-> {
System.out.println("快的tag:"+consumerTag+" message:"+new String(message.getBody()));
ThreadUtils.sleep(1);
/*
应答信道消息
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
},(consumerTag)->{
System.out.println("快的消息被消费中断:"+consumerTag);
});
}
private static void createSlowerConsumer() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 默认0是轮询
channel.basicQos(1);// 预取值为1,表示预取值1个消息
// 手动应答10s
channel.basicConsume(Queue_Name, false, (consumerTag, message)-> {
System.out.println("慢的tag:"+consumerTag+" message:"+new String(message.getBody()));
ThreadUtils.sleep(10);
/*
应答信道消息
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
},(consumerTag)->{
System.out.println("慢的消息被消费中断:"+consumerTag);
});
}
执行结果:证明快的消费者消费数量多
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_0
慢的tag:amq.ctag-pvROpH3ChDTW8gl0qnbU1A message:hello world_1
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_2
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_3
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_4
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_5
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_6
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_7
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_8
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_9
预取值
由于消息本身发送是异步的,消息的接收处理也是异步的。因此消费者就有一个未确认的消息缓冲区。避免消息堆积在缓冲区中,无法消费。我们可以通过channel.basicQos(N)
来设置未确认的消息缓冲区的大小。该值定义通道上允许的未确认消息的最大数量。一旦数量达到了配置的数量,MQ将停止继续向该channel发送消息,除非收到了消息确认。通常,增加预取值将提高向消费者传递消息的速度,虽然自动应答传输消息的速率是最佳的,但是,该情况下已传递但尚未处理的消息数量也会增加,从而增加了消费则的RAM消耗(随机存储器)。也就是消费者内存的消耗。
举例,同样上面的场景代码,默认发送10条消息,用两个(一快一慢)消费者消费;快的basicQos(1),慢的basicQos(6),可以观察到,慢的缓冲区确实有6条,快的消费了4条;
快的tag:amq.ctag-MGJ91J8ivK-fmk0E4e0mvw message:hello world_0
慢的tag:amq.ctag-pWo9dDAvlWd-osXuY8rXQg message:hello world_1
快的tag:amq.ctag-MGJ91J8ivK-fmk0E4e0mvw message:hello world_7
快的tag:amq.ctag-MGJ91J8ivK-fmk0E4e0mvw message:hello world_8
快的tag:amq.ctag-MGJ91J8ivK-fmk0E4e0mvw message:hello world_9
慢的tag:amq.ctag-pWo9dDAvlWd-osXuY8rXQg message:hello world_2
慢的tag:amq.ctag-pWo9dDAvlWd-osXuY8rXQg message:hello world_3
慢的tag:amq.ctag-pWo9dDAvlWd-osXuY8rXQg message:hello world_4
慢的tag:amq.ctag-pWo9dDAvlWd-osXuY8rXQg message:hello world_5
慢的tag:amq.ctag-pWo9dDAvlWd-osXuY8rXQg message:hello world_6
4. 发布确认
生产者将信道设置成confirm
模式,在该模式下,信道上的所有消息都会有一个唯一的UID(从1递增)。当消息投递到所有匹配的队列时,broker 就会发送包含UID的确认消息给生产者,让生产者知道消息正确到达目的队列了;
channel.confirmSelect();
若消息和队列是持久化的,那么确认消息会在写入磁盘后发出,回传确认消息的delivery-tag
域包含UID;
broker 也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理;
优势:
- confirm模式是异步的,一旦发布一条消息,生产者可以等待ack的同时发送下一条消息,当这条消息最终收到确认后,生产者可以通过回调方法来确认该消息;
- 若因为MQ的问题导致消息丢失,就会发送一条nack消息,生产者同样可以通过回调处理nack消息;
单个确认发布
是一种同步确认发布的方式,当一个消息发布后收到确认了才会继续发布后续消息。waitForConfirmsOrDie(long)
这个方法只有在消息被确认的时候才返回,如果在指定时间内没有返回则会抛异常。
缺点:发布速度慢,当一条消息没有确认发布,就会阻塞后面的消息发布;
实验:使用单个确认发布,发布1000条消息 最终耗时 3350ms;
public static void publishMessageBySync() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 开启发布确认
channel.confirmSelect();
// 生成一个队列
/*
String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
参数1:队列名
参数2:队列消息是否持久化,默认false存储在内存中
参数3:是否进行消息的共享,true可多个共同消费,false是只能单一消费者消费
参数4:是否自动删除最后一个消费者断开连接后,该队列是否自动删除,true自动删除
参数5:其他参数
*/
channel.queueDeclare(Queue_Name, false, false, false, null);
// 发消息
String message = "hello world";
long start = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
/*
参数1:交换机名称,默认""
参数2:路由的Key值是哪个 本次是队列名称
参数3:其他参数
参数4:消息体
*/
channel.basicPublish("", Queue_Name, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+"_"+i).getBytes());
boolean b = channel.waitForConfirms();
if (b){
System.out.println("发送成功");
} else {
System.out.println("-----发送失败------");
}
}
long end = System.currentTimeMillis();
System.out.println("消息发送完毕 "+ (end - start));
}
批量确认发布
与单个确认发布相比,批量确认发布可以极大的提高吞吐量;
缺点:当发生故障导致确认发布出现问题时,无法确定是哪一条消息出现了故障;
该方案仍是同步的,依旧会出现阻塞发布消息;
实验:生产者发送1000个消息,每100次确认一回,总共耗时62ms
public static void publishMessageByBatch() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 开启发布确认
channel.confirmSelect();
// 生成一个队列
/*
String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
参数1:队列名
参数2:队列消息是否持久化,默认false存储在内存中
参数3:是否进行消息的共享,true可多个共同消费,false是只能单一消费者消费
参数4:是否自动删除最后一个消费者断开连接后,该队列是否自动删除,true自动删除
参数5:其他参数
*/
channel.queueDeclare(Queue_Name, false, false, false, null);
// 发消息
String message = "hello world";
long start = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
/*
参数1:交换机名称,默认""
参数2:路由的Key值是哪个 本次是队列名称
参数3:其他参数
参数4:消息体
*/
channel.basicPublish("", Queue_Name, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+"_"+i).getBytes());
if (i % 100 == 0) {
boolean b = channel.waitForConfirms();
if (b){
System.out.println("发送成功");
} else {
System.out.println("-----发送失败------");
}
}
}
long end = System.currentTimeMillis();
System.out.println("消息发送完毕 "+ (end - start));
}
异步确认发布
利用回调函数来达到消息的可靠传递。通过中间件也是通过函数回调来保证是否投递成功;他将消息放入一个容器中,每个消息都有一个UID,每次异步确认发布,可以保证消息是否成功确认发布;
通常,我们需要通过使用并发容器ConcurrentSkipListMap
存储所有发送的消息,然后每次确认后将容器中的对应消息删除,剩下的消息就是未确认的消息;
实验:生产者发送1000条消息,异步监听确认发布,耗时36ms。通过日志发现,每次发送deliveryTag都从1开始,并且multiple同时存在true和false;
public static void publishMessageByAsync() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 开启发布确认
channel.confirmSelect();
// 生成一个队列
/*
String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
参数1:队列名
参数2:队列消息是否持久化,默认false存储在内存中
参数3:是否进行消息的共享,true可多个共同消费,false是只能单一消费者消费
参数4:是否自动删除最后一个消费者断开连接后,该队列是否自动删除,true自动删除
参数5:其他参数
*/
channel.queueDeclare(Queue_Name, false, false, false, null);
// 发消息
String message = "hello world";
// 高并发下的hashMap
ConcurrentSkipListMap<Long, String> confirmMap = new ConcurrentSkipListMap<>();
// deliveryTag 消息编号,multiple是否批量确认
// 消息确认成功回调
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
System.out.println(String.format("确认的消息 deliveryTag:%d multiple:%s", deliveryTag, multiple));
// 如果是批量删除需要使用headMap
if (multiple) {
ConcurrentNavigableMap<Long, String> headMap = confirmMap.headMap(deliveryTag);
headMap.clear();
} else {
confirmMap.remove(deliveryTag);
}
};
// 消息确认失败回调
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
System.out.println(String.format("未确认的消息 deliveryTag:%d multiple:%s", deliveryTag, multiple));
};
// 添加异步监听器
channel.addConfirmListener(ackCallback, nackCallback);
long start = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
String msg = (message+"_"+i);
confirmMap.put(channel.getNextPublishSeqNo(), msg);
/*
参数1:交换机名称,默认""
参数2:路由的Key值是哪个 本次是队列名称
参数3:其他参数
参数4:消息体
*/
channel.basicPublish("", Queue_Name, null, msg.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("消息发送完毕 "+ (end - start));
}
以上三种发布确认对比
发布确认方案 | 对比 |
---|---|
单独发布确认消息 | 同步等待确认,简单,但吞吐量非常有限 |
批量发布确认消息 | 批量同步等待,简单,合理的吞吐量,出现问题无法确认是哪条消息异常 |
异步发布确认消息 | 最佳性能和资源使用,可以异步处理成功和失败的消息 |
交换机
消息(message)被发布者(publisher)发送给交换机(exchange),交换机常常被比喻成邮局或者邮箱。然后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取;
队列,交换机和绑定统称为AMQP实体(AMQP entities);
概念
交换机是用来发送消息的AMQP实体。交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和被称作绑定(bindings)的规则所决定的。
生产者只能将消息发送到交换机上,交换机一方面接收生产者的消息,另一方面将其入队;交换机必须要知道如何处理收到的消息,消息的分发与丢弃都由交换机类型来确定
核心思想:生产者生产的消息从不会直接发送到队列上;生产者无需关注消息传递的队列;
临时队列
我们可以创建一个临时名称的队列,一旦断开消费者的连接,队列将被自动删除;
// 临时队列的创建
String tempQueueName = channel.queueDeclare().getQueue();
绑定(bindings)
绑定是交换机和队列之间的桥梁,是指定交换机与队列连接的方式;
如果要让交换机“testE”将消息路由给队列“test”,那么“test”就需要与“testE”进行绑定。绑定操作需要定义一个可选的路由键(routing key)属性给某些类型的交换机。路由键的意义在于从发送给交换机的众多消息中选择出某些消息,将其路由给绑定的队列。
类型
Direct exchange(直连交换机),Fanout exchange(扇型交换机),Topic exchange(主题交换机),Headers exchange(头交换机);
无名交换机
通过空字符串表示无名交换机;若交换机是非无名交换机,则消息能根据路由发送到队列都是由routingKey(bindingkey)绑定的key指定的;
Fanout exchange(扇型交换机)
他将接收到的所有消息广播到他的所有队列中,绑定的路由键不影响消息的发送与接收。 扇型用来交换机处理消息的广播路由
实验:使用扇形交换机,一个生产者的routeKey与两个消费者的routeKey均不一样,不影响消息的消费;
// 生产者
public static void producer() {
try {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(Exchange_Name, "fanout");
String content = "扇形交换机发送_";
String routeKey = "123";
for (int i = 0; i <5; i++) {
String msg = content+i;
System.out.println("发送:"+msg);
channel.basicPublish(Exchange_Name, routeKey, null, msg.getBytes());
}
System.out.println("发送完成");
} catch (Exception ignored){}
}
// 消费者 两个消费者除了routeKey不一样外其余都一样
public static void consumer1() {
try {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(Exchange_Name, "fanout");
String queueName = channel.queueDeclare().getQueue();
String routeKey = "789";
channel.queueBind(queueName, Exchange_Name, routeKey);
System.out.println("消费者1消费消息------>");
channel.basicConsume(queueName, true, (consumerTag, message)-> {
System.out.println("消费者1 tag:"+consumerTag+" message:"+new String(message.getBody()));
},(consumerTag)->{
System.out.println("消费者1 消息被消费中断:"+consumerTag);
});
} catch (Exception ignore){};
}
直接交换机
是根据消息携带的路由键(routing key)将消息投递给对应队列的 。可支持单播和多播;
注意:消息的负载均衡是发生在消费者之间的,不是队列之间;
实验:消费者A创建队列1绑定key=789,消费者B创建队列2绑定key=123和456,生产者给123, 456, 789均发送消息,消费者A收到一条,消费者2收到两条;
// 生产者
public static void producer() {
try {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.DIRECT);
channel.basicPublish(Exchange_Name, "123", null, "发给routeKey=123".getBytes());
channel.basicPublish(Exchange_Name, "456", null, "发给routeKey=456".getBytes());
channel.basicPublish(Exchange_Name, "789", null, "发给routeKey=789".getBytes());
System.out.println("发送完成");
} catch (Exception ignored){}
}
// 消费者A
public static void consumerA() {
try {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.DIRECT);
/*
String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
参数1:队列名
参数2:队列消息是否持久化,默认false存储在内存中
参数3:是否进行消息的共享,true可多个共同消费,false是只能单一消费者消费
参数4:是否自动删除最后一个消费者断开连接后,该队列是否自动删除,true自动删除
参数5:其他参数
*/
channel.queueDeclare(Queue_name_1, false, false, true, null);
String routeKey = "789";
channel.queueBind(Queue_name_1, Exchange_Name, routeKey);
System.out.println("消费者1消费消息------>"+"queue="+Queue_name_1+" routeKey="+routeKey);
channel.basicConsume(Queue_name_1, true, (consumerTag, message)-> {
System.out.println("消费者1"+"queue="+Queue_name_1+" routeKey="+routeKey+" tag:"+consumerTag+" message:"+new String(message.getBody()));
},(consumerTag)->{
System.out.println("消费者1 消息被消费中断:"+consumerTag);
});
} catch (Exception ignore){
};
}
// 消费者B
public static void consumer2() {
try {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.DIRECT);
/*
String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
参数1:队列名
参数2:队列消息是否持久化,默认false存储在内存中
参数3:是否进行消息的共享,true可多个共同消费,false是只能单一消费者消费
参数4:是否自动删除最后一个消费者断开连接后,该队列是否自动删除,true自动删除
参数5:其他参数
*/
channel.queueDeclare(Queue_name_2, false, false, true, null);
String routeKey = "456";
channel.queueBind(Queue_name_2, Exchange_Name, routeKey);
channel.queueBind(Queue_name_2, Exchange_Name, "123");
System.out.println("消费者2消费消息------>"+"queue="+Queue_name_2+" routeKey=123|"+routeKey);
channel.basicConsume(Queue_name_2, true, (consumerTag, message)-> {
System.out.println("消费者2"+"queue="+Queue_name_2+" routeKey=123|"+routeKey+" tag:"+consumerTag+" message:"+new String(message.getBody()));
},(consumerTag)->{
System.out.println("消费者2 消息被消费中断:"+consumerTag);
});
} catch (Exception ignored){}
}
主题交换机
主题交换机是通过匹配路由键进行消息的转发,对于key匹配上的,转发相应队列,全都不匹配的直接丢弃;
规则:
- 所有的单词均是由.分隔;
- *可以匹配任意一个单词;
- #可以匹配0个或多个单词;
注意:
- 当一个队列的routeKey是#,那么这个队列将接收所有消息,类似于
FANOUT
; - 当一个队列的routeKey没有包含#和*,那么这个队列类似于
DIRECT
;
举例:
routeKey | 匹配 |
---|---|
*.hello.* | 匹配三个单词,且中间的必须是hello |
welcome.* | 匹配两个单词,第一个必须是welcome |
hello.world | 匹配两个单词,第一个是hello,第二个是world |
#.happy | 匹配至少一个单词,且最后一个单词必须是happy |
happy.*.# | 匹配至少两个单词,第一个必须是happy |
routeKey | 匹配 |
---|---|
ok.hello.hello | 匹配【1,2,3】,转发给队列A&队列B |
a.hello.hello | 匹配【1,3】,转发给队列A&队列B |
hello.world | 都不匹配 |
happy.hello.ok | 匹配【1】,转发给队列A |
ok.end.happy | 匹配【1】,转发给队列A |
ok.hello | 匹配【3】,转发给队列B |
hello | 匹配【3】,转发给队列B |
hello.ok.hello.end | 都不匹配 |
实验:模拟图上队列,验证表格结果
// 生产者
public static void producer() {
try {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.TOPIC);
while(true){
Scanner scanner = new Scanner(System.in);
String next = scanner.next();
String msg = String.format("发送数据: routeKey=%s", next);
channel.basicPublish(Exchange_Name, next, null, msg.getBytes());
System.out.println("发送成功: "+msg);
}
} catch (Exception ignored){}
}
// 队列A 对应消费者1
public static void consumer1() {
try {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.TOPIC);
/*
String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
参数1:队列名
参数2:队列消息是否持久化,默认false存储在内存中
参数3:是否进行消息的共享,true可多个共同消费,false是只能单一消费者消费
参数4:是否自动删除最后一个消费者断开连接后,该队列是否自动删除,true自动删除
参数5:其他参数
*/
channel.queueDeclare(Queue_name_A, false, false, true, null);
String routeKey1 = "*.hello.*";
channel.queueBind(Queue_name_A, Exchange_Name, routeKey1);
String routeKey2 = "ok.*.*";
channel.queueBind(Queue_name_A, Exchange_Name, routeKey2);
System.out.println("消费者1消费消息------>"+"queue="+ Queue_name_A);
channel.basicConsume(Queue_name_A, true, (consumerTag, message)-> {
System.out.println("消费者1"+"queue="+ Queue_name_A +" routeKey="+message.getEnvelope().getRoutingKey()+" tag:"+consumerTag+" message:"+new String(message.getBody()));
},(consumerTag)->{
System.out.println("消费者1 消息被消费中断:"+consumerTag);
});
} catch (Exception ignore){
};
}
// 队列B 对应消费者2
public static void consumer2() {
try {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.TOPIC);
/*
String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
参数1:队列名
参数2:队列消息是否持久化,默认false存储在内存中
参数3:是否进行消息的共享,true可多个共同消费,false是只能单一消费者消费
参数4:是否自动删除最后一个消费者断开连接后,该队列是否自动删除,true自动删除
参数5:其他参数
*/
channel.queueDeclare(Queue_name_B, false, false, true, null);
String routeKey = "#.hello";
channel.queueBind(Queue_name_B, Exchange_Name, routeKey);
System.out.println("消费者2消费消息------>"+"queue="+ Queue_name_B);
channel.basicConsume(Queue_name_B, true, (consumerTag, message)-> {
System.out.println("消费者2"+"queue="+ Queue_name_B +" routeKey="+message.getEnvelope().getRoutingKey()+" tag:"+consumerTag+" message:"+new String(message.getBody()));
},(consumerTag)->{
System.out.println("消费者2 消息被消费中断:"+consumerTag);
});
} catch (Exception ignored){}
}
死信队列
死信队列的概念
死信就是无法被消费的消息;当生产者把消息投递到broker或者直接到队列中,消费者从队列中取出消息进行消费,但由于特殊情况,队列中的个别消息无法被消费,这样的消息如果没有后续的处理,就会变成死信,存放在死信队列中;
通常进入死信队列的消息要么是异常消息,要么是超时消息;
死信队列的来源
- 消息TTL时间过期;
- 队列达到最大长度,无法添加新的消息到mq中;
- 消息被拒绝(basic.reject或者nack)并且requeue=false;
死信队列的实验
消息TTL时间过期
场景模拟:声明两个直接交换机和队列,逻辑交换机绑定逻辑队列,死信交换机绑定死信队列,然后生产者发送消息到逻辑交换机,由于逻辑交换机接收超时,导致消息进入死信队列,被消费者2处理;
代码:
声明TTL时间过期
- 在Consumer中可以queueDeclare队列声明处增加参数map,使用key
x-message-ttl
设置;(不推荐)- 在生产者发送消息的时候,通过BasicProperties设置发送消息的TTL,推荐使用,不同消息可以设置不同的消息TTL;
static String logicExchange = "logicExchange";
static String deadExchange = "deadExchange";
static String logicQueue = "logicQueue";
static String deadQueue = "deadQueue";
static String logicRouteKey = "logicKey";
static String deadRouteKey = "deadKey";
// 消费者1 断点打在channel.basicConsume
public static void Consumer1() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(logicExchange, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(deadExchange, BuiltinExchangeType.DIRECT);
Map<String, Object> argMap = new HashMap<>();
// 声明死信交换机
argMap.put("x-dead-letter-exchange", deadExchange);
// 声明死信routeKey
argMap.put("x-dead-letter-routing-key", deadRouteKey);
// 声明过去时间 这里不建议使用,因为是固定的,推荐在发送时指定
// argMap.put("x-message-ttl", 1000*10); // ms
/*
String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
参数1:队列名
参数2:队列消息是否持久化,默认false存储在内存中
参数3:是否进行消息的共享,true可多个共同消费,false是只能单一消费者消费
参数4:是否自动删除最后一个消费者断开连接后,该队列是否自动删除,true自动删除
参数5:其他参数
*/
channel.queueDeclare(logicQueue, false, false, false, argMap);
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(logicQueue, logicExchange, logicRouteKey);
channel.queueBind(deadQueue, deadExchange, deadRouteKey);
System.out.println("消费者1 [正常] 消费消息------>"+"queue="+ logicQueue);
channel.basicConsume(logicQueue, true, (consumerTag, message)-> {
System.out.println("消费者1"+"queue="+ logicQueue +" routeKey="+message.getEnvelope().getRoutingKey()+" tag:"+consumerTag+" message:"+new String(message.getBody()));
},(consumerTag)->{
System.out.println("消费者1 消息被消费中断:"+consumerTag);
});
}
// 生产者
public static void producer {
Channel channel = RabbitMqUtils.getChannel();
// 发消息
String message = "hello world";
// 声明消息的发送时间为5000ms=5s
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder()
.expiration("5000")
.build();
for (int i = 0; i < 10; i++) {
channel.basicPublish(logicExchange, logicRouteKey, properties, (message+"_"+i).getBytes());
System.out.println("消息发送 "+i);
}
System.out.println("消息发送完毕");
}
// 消费者2 死信队列
public static void Consumer2() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.queueBind(deadQueue, deadExchange, deadRouteKey);
System.out.println("消费者2 [死信] 消费消息------>"+"queue="+ logicQueue);
channel.basicConsume(deadQueue, true, (consumerTag, message)-> {
System.out.println("消费者2"+"queue="+ logicQueue +" routeKey="+message.getEnvelope().getRoutingKey()+" tag:"+consumerTag+" message:"+new String(message.getBody()));
},(consumerTag)->{
System.out.println("消费者2 消息被消费中断:"+consumerTag);
});
}
队列达到最大长度,无法添加新的消息到Mq
操作:在声明logic逻辑队列的参数中,增加key值为x-max-length
;标识队列最大消息长度;
实验:我们把生产者的超时限制去掉,然后启动生产者和死信消费者,生产者发送0-99共100条消息,死信消费者消费了0-93的消息,再打开消费者1.消费了94-99共6条消息;
// 在消费者1的代码中,参数map增加设置超时时间的key即可
// 声明队列最大长度6条消息
argMap.put("x-max-length", deadRouteKey);
消息被拒绝
操作:消费者1需要手动确认消息,把确认消息设置成reject;则该消息进入死信队列;
实验:我们把消费者1的消息包含3的都设置成手动拒绝,则死信队列消费者2则收到生产者发送包含3的消息;
// 修改消费者1手动接收消息,且将含有3的消息拒绝,看死信队列是否收到拒绝的消息;
channel.basicConsume(logicQueue, false, (consumerTag, message)-> {
System.out.println("消费者1"+"queue="+ logicQueue +" routeKey="+message.getEnvelope().getRoutingKey()+" tag:"+consumerTag+" message:"+new String(message.getBody()));
// 消息拒绝 参数1:DeliveryTag 参数2:true表示拒绝后把消息重新塞回队列
if (new String(message.getBody()).contains("3"))
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
else
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
},(consumerTag)->{
System.out.println("消费者1 消息被消费中断:"+consumerTag);
});
延迟队列
概念
用来存放需要在指定时间被处理的元素的队列,其内部有序;死信队列的一种;
适用于轮询效率低下,短期数据量大的情况;例如可以用于10分钟未处理则自动关闭的订单上,定时任务等等;
实现方式
1. 使用TTL + DLX创建延迟队列
其内容见上述[消息TTL时间过期]进入死信队列;其TTL时间是设置在队列的;
存在问题
由于消息有序,因此若第一条消息延迟20s,第二条消息延迟2s,则当第一条消息执行完后,第二条消息才会得到执行;
2. 使用插件
参考博客, 其TTL时间是设置在交换机的;
安装步骤:
-
官网下载对应版本插件
rabbitmq_delayed_message_exchange-3.9.0.ez
-
解压后放入rabbitMq的plugins目录中
-
返回上层进入sbin目录的cmd,输入命令
# 启动 rabbitmq-plugins enable rabbitmq_delayed_message_exchange # 关闭 rabbitmq-plugins disable rabbitmq_delayed_message_exchange
-
进入管理后台即可
实验:
// 生产者
public static void producer() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 发消息
String message = "hello world";
Map<String, Object> argMap = new HashMap<>();
// 必须声明该key值
argMap.put("x-delayed-type", "direct");
// 声明交换机 String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
channel.exchangeDeclare(EXCHANGE, "x-delayed-message", false, false, argMap);
channel.queueDeclare(QUEUE, false, false, false, null);
channel.queueBind(QUEUE, EXCHANGE, ROUTE_KEY);
// 声明消息的发送时间为10000ms=10s
Map<String, Object> headers = new HashMap<>();
//延迟10s后发送
headers.put("x-delay", 10000);
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder()
.headers(headers)
.build();
for (int i = 0; i < 5; i++) {
channel.basicPublish(EXCHANGE, ROUTE_KEY, properties, (message+"_"+i).getBytes());
System.out.println("消息发送 "+i);
}
System.out.println("消息发送完毕");
}
// 消费者
public static void Consumer() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("消费者 [延迟] 消费消息------>"+"queue="+ QUEUE);
channel.basicConsume(QUEUE, true, (consumerTag, message)-> {
System.out.println("消费者"+"queue="+ QUEUE +" routeKey="+message.getEnvelope().getRoutingKey()+" tag:"+consumerTag+" message:"+new String(message.getBody()));
},(consumerTag)->{
System.out.println("消费者 消息被消费中断:"+consumerTag);
});
}
总结
延迟队列在需要延迟处理的场景下是非常有用的,使用RabbitMQ来实现延迟队列可以展现RabbitMQ的特性,包括消息可靠发送,可靠投递,死信队列来保障消息至少被消费一次以及未被正常处理的消息不会被丢弃;
通过RabbitMQ集群的特性,可以很好解决单点故障的问题,不会因为单个节点挂掉导致延时队列不可用或消息丢失;
延时队列的其他实现方式有Java的DelayQueue,利用Redis的zset,利用Quartz或者利用kafka的时间轮询,各有优缺点,需要根据实际情况选择;
整合SpringBoot
使用TTL + DLX创建延迟队列
实验:声明2个直接交换机X,Y;X是普通交换机,Y是死信交换机。声明4个队列QA,QB,QC,QD;QA的ttl=10s,QB的ttl=40s,QC不设置TTL,QD是死信交换机队列;
步骤与结论:
- 请求 send接口,我们发现不同的队列根据ttl时间,交由死信队列消费;
- 请求test接口,我们发现同一个队列,根据先发原则交由死信队列处理,不是按照ttl处理;
注意:队列C的消息ttl声明是在生产者调用时进行声明的;
-
引入pom
<!--引入rabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>${springboot.version}</version> </dependency>
-
配置application.yml
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest
-
配置队列,交换机,路由,以及绑定
import org.springframework.amqp.core.*; @Configuration public class TtlQueueConfig { // 普通交换机X public static final String X_EXCHANGE = "X_EXCHANGE"; // 死信交换机Y public static final String Y_EXCHANGE = "Y_EXCHANGE"; // 队列A public static final String QA_QUEUE = "QA_QUEUE"; // 队列B public static final String QB_QUEUE = "QB_QUEUE"; // 队列C public static final String QC_QUEUE = "QC_QUEUE"; // 死信队列D public static final String QD_QUEUE = "QD_QUEUE"; // 队列A路由 public static final String QA_ROUTE_KEY = "QA_ROUTE_KEY"; // 队列B路由 public static final String QB_ROUTE_KEY = "QB_ROUTE_KEY"; // 队列C路由 public static final String QC_ROUTE_KEY = "QC_ROUTE_KEY"; // 死信队列路由 public static final String QD_ROUTE_KEY = "QD_ROUTE_KEY"; // 声明X交换机 @Bean public DirectExchange getXExchange() { return new DirectExchange(X_EXCHANGE); } // 声明死信交换机Y @Bean public DirectExchange getYExchange() { return new DirectExchange(Y_EXCHANGE); } // 声明队列A @Bean public Queue getQueueA() { Map<String, Object> argsMap = new HashMap<>(); argsMap.put("x-dead-letter-exchange", Y_EXCHANGE); // 指定死信队列 argsMap.put("x-dead-letter-routing-key", QD_ROUTE_KEY); // 指定死信路由 argsMap.put("x-message-ttl", 10000);// 指定ttl=10s return QueueBuilder.durable(QA_QUEUE) .withArguments(argsMap) .build(); } // 声明队列B @Bean public Queue getQueueB() { Map<String, Object> argsMap = new HashMap<>(); argsMap.put("x-dead-letter-exchange", Y_EXCHANGE); // 指定死信队列 argsMap.put("x-dead-letter-routing-key", QD_ROUTE_KEY); // 指定死信路由 argsMap.put("x-message-ttl", 20000);// 指定ttl=20s return QueueBuilder.durable(QB_QUEUE) .withArguments(argsMap) .build(); } // 声明队列C @Bean public Queue getQueueC() { Map<String, Object> argsMap = new HashMap<>(); argsMap.put("x-dead-letter-exchange", Y_EXCHANGE); // 指定死信队列 argsMap.put("x-dead-letter-routing-key", QD_ROUTE_KEY); // 指定死信路由 return QueueBuilder.durable(QC_QUEUE) .withArguments(argsMap) .build(); } // 声明死信队列D @Bean public Queue getQueueD() { return QueueBuilder.durable(QD_QUEUE) .build(); } // 队列A绑定 @Bean public Binding queueABindingX() { return BindingBuilder.bind(getQueueA()) .to(getXExchange()) .with(QA_ROUTE_KEY); } // 队列B绑定 @Bean public Binding queueBBindingX() { return BindingBuilder.bind(getQueueB()) .to(getXExchange()) .with(QB_ROUTE_KEY); } // 队列c绑定 @Bean public Binding queueCBindingX() { return BindingBuilder.bind(getQueueC()) .to(getXExchange()) .with(QC_ROUTE_KEY); } // 死信队列D绑定 @Bean public Binding queueDBindingX() { return BindingBuilder.bind(getQueueD()) .to(getYExchange()) .with(QD_ROUTE_KEY); } }
-
配置生产者Controller
@RestController @RequestMapping("ttl") @Slf4j public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send/{message}/{ttl}") public Object sendMessage(@PathVariable(name = "message")String message, @PathVariable(name = "ttl")String ttl) { log.info("发送消息:{} 自定义TTL:{} 时间:{}", message, ttl, new Date().toString()); // 发送给队列A rabbitTemplate.convertAndSend(TtlQueueConfig.X_EXCHANGE, TtlQueueConfig.QA_ROUTE_KEY, "10s队列消息="+message); // 发送给队列B rabbitTemplate.convertAndSend(TtlQueueConfig.X_EXCHANGE, TtlQueueConfig.QB_ROUTE_KEY, "20s队列消息="+message); // 发送给队列C rabbitTemplate.convertAndSend(TtlQueueConfig.X_EXCHANGE, TtlQueueConfig.QC_ROUTE_KEY, "自定义ttl队列消息="+message, (msg) -> { msg.getMessageProperties().setExpiration(ttl); return msg; }); return "发送成功"; } // 测试对同一个队列发送两条不同ttl的消息,则按照先后发送顺序,不是按照ttl时间消费 @GetMapping("test") public Object testSendMessage() { String message = "今天天气不错 时间:"+new Date().toString(); log.info("测试接口 消息:{}", message); // 发送给队列C rabbitTemplate.convertAndSend(TtlQueueConfig.X_EXCHANGE, TtlQueueConfig.QC_ROUTE_KEY, "自定义ttl=5s队列消息="+message, (msg) -> { msg.getMessageProperties().setExpiration("5000"); return msg; }); rabbitTemplate.convertAndSend(TtlQueueConfig.X_EXCHANGE, TtlQueueConfig.QC_ROUTE_KEY, "自定义ttl=3队列消息="+message, (msg) -> { msg.getMessageProperties().setExpiration("3000"); return msg; }); return "测试发送成功"; } }
-
配置消费者Listener
@Component @Slf4j public class DeadLetterQueueConsumer { @RabbitListener(queues = TtlQueueConfig.QD_QUEUE) public void receiverD(Message message, Channel channel) { String msg = new String(message.getBody()); log.info("当前时间:{} 死信队列接收消息:{}", new Date().toString(), msg); } }
使用插件
实验:声明1个延迟直接交换机,一个延迟队列
步骤与结论:
请求 send接口,会发送给延迟队列两条消息,第一条是传入的ttl,第二条是固定5s;实验发现,同一个队列不同ttl的消息,可以按照ttl进行处理,不在是按插入队列顺序处理;
注意:ttl声明是在生产者调用时进行声明的,并且调用的是setDelay(ms);
-
配置队列,交换机,路由
@Configuration public class DelayQueueConfig { // 延迟交换机 public static final String DELAYED_EXCHANGE = "DELAYED_EXCHANGE"; // 延迟队列 public static final String DELAY_QUEUE = "DELAY_QUEUE"; // 延迟路由 public static final String DELAY_ROUTE_KEY = "DELAY_ROUTE_KEY"; // 声明延迟交换机 @Bean public CustomExchange getDelayExchange() { Map<String, Object> argsMap = new HashMap<>(); argsMap.put("x-delayed-type", "direct"); // 交换机名,交换机类型,是否持久化,是否自动删除,其余参数 return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, argsMap); } // 声明队列 @Bean public Queue getDelayQueue() { return new Queue(DELAY_QUEUE); } // 绑定队列&交换机 @Bean public Binding delayQueueBindingExchange() { return BindingBuilder.bind(getDelayQueue()) .to(getDelayExchange()) .with(DELAY_ROUTE_KEY) .noargs(); } }
-
设置生产者Controller
@Slf4j @RestController @RequestMapping("delay") public class SendDelayMessageController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("send/{message}/{ttl}") public Object sendMessage(@PathVariable(name = "message")String message, @PathVariable(name = "ttl")int ttl) { log.info("发送消息:{} ttl(ms):{} 时间:{}", message, ttl, new Date().toString()); rabbitTemplate.convertAndSend(DelayQueueConfig.DELAYED_EXCHANGE, DelayQueueConfig.DELAY_ROUTE_KEY, "延迟队列ttl队列消息="+message, (msg) -> { msg.getMessageProperties().setDelay(ttl); return msg; }); rabbitTemplate.convertAndSend(DelayQueueConfig.DELAYED_EXCHANGE, DelayQueueConfig.DELAY_ROUTE_KEY, "延迟队列ttl=5s队列消息="+message, (msg) -> { msg.getMessageProperties().setDelay(5000); return msg; }); return "发送成功"; } }
-
设置消费者监听器
@Component @Slf4j public class DelayQueueConsumer { @RabbitListener(queues = DelayQueueConfig.DELAY_QUEUE) public void receiverD(Message message, Channel channel) { String msg = new String(message.getBody()); log.info("当前时间:{} 延迟队列接收消息:{}", new Date().toString(), msg); } }
消息高级确认
由于RabbitMQ服务或集群的问题导致的生产者消息投递失败,导致消息丢失,需要手动恢复和处理,因此我们需要一种可靠的机制保证消息的可靠传递;
那么从图中可以得出,交换机接收消息失败,或路由异常导致的队列未接收到消息,都应该通知生产者消息发送失败;
本次实验的架构如图所示:
配置文件
publisher-confirm-type
NONE:禁止发布确认模式,是默认值
CORRELATED:发布消息成功到交换机会触发回调方法
SIMPLE:两种效果,效果1:与CORRELATED一样触发回调;效果2:在发布消息成功后使用rabbitTemplate调用waitForConfirms()或waitForConfirmsOrDie()等带broker节点返回发送结果,根据返回结果确定下一步逻辑,注意的是waitForConfirmsOrDie()返回若为false,则会关闭channel,则接下来无法发送消息到broker;(即关闭信道&单个同步确认)
publisher-returns
开启回退消息
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
publisher-confirm-type: correlated
publisher-returns: true
配置交换机
@Configuration
public class ConfirmConfig {
// 交换机名称
public static final String EXCHANGE = "confirm.exchange";
// 队列名称
public static final String QUEUE = "confirm.queue";
// 路由键
public static final String ROUTE_KEY = "confirmKey";
// 声明确认交换机
@Bean
public DirectExchange getConfirmExchange() {
return new DirectExchange(EXCHANGE);
}
// 声明队列
@Bean
public Queue getConfirmQueue() {
return new Queue(QUEUE);
}
// 绑定队列和交换机
@Bean
public Binding confirmQueueBindingExchange() {
return BindingBuilder
.bind(getConfirmQueue())
.to(getConfirmExchange())
.with(ROUTE_KEY);
}
}
配置回调
@Component
@Slf4j
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
// 将回调接口注册进rabbitTemplate
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
/**
* 交换机确认回调方法
* 参数含义
* 1.当发消息交换机成功接收 回调后
* correlationData:保存回调消息的Id及相关信息
* isAck: 是否收到消息 true
* cause: 异常信息 null
* 2. 当交换机接收失败 回调
* correlationData:保存回调消息的Id及相关信息
* isAck: 是否收到消息 false
* cause: 异常信息 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean isAck, String cause) {
if (correlationData!=null) {
log.info("查看correlationData id:{} data:{}",correlationData, new String(correlationData.getReturnedMessage().getBody()));
}
if (isAck) {
log.info("交换机成功接收消息 correlationData:{} isAck:{}, cause:{}", correlationData, isAck, cause);
} else {
log.info("交换机【失败】接收消息 correlationData:{} isAck:{}, cause:{}", correlationData, isAck, cause);
}
}
/**
* 消息传递中不可达目的地(队列)将异常信息返回给生产者
* @param message 消息
* @param replyCode 返回code
* @param replyText 退回原因
* @param exchange 交换机
* @param routeKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routeKey) {
log.error("消息:{} 错误码:{} 错误原因:{} 交换机名:{} 路由:{}",
new String(message.getBody()), replyCode, replyText, exchange, routeKey);
}
}
配置生产者&消费者
@RestController
@RequestMapping("confirm")
@Slf4j
public class SendMessageWithConfirmController {
@Autowired
private RabbitTemplate rabbitTemplate;
// 模拟发送给错误的交换机验证回调
@GetMapping("exchange")
public Object testConfirmMessageByExchange() {
String message = "发送一条消息";
log.info("发送消息给确认交换机 message:{}",message);
// 声明正确消息数据,这块是回调可以接收到的data,可以用该data确定消息;
CorrelationData correlationDataOk = new CorrelationData("1_ok");
correlationDataOk.setReturnedMessage(new Message(message.getBytes(), null));
// 声明失败消息数据,这块是回调可以接收到的data,可以用该data确定消息;
CorrelationData correlationDataError = new CorrelationData("2_error");
correlationDataError.setReturnedMessage(new Message(message.getBytes(), null));
// 发送给错误的交换机(exchangeNameError)
rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE+"ddd", ConfirmConfig.ROUTE_KEY, message, correlationDataError);
// 发送给正确的交换机
rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE, ConfirmConfig.ROUTE_KEY, message, correlationDataOk);
return "执行成功";
}
// 模拟发送给正确的交换机&错误的路由验证回调
@GetMapping("routeKey")
public Object testConfirmMessageByRouteKey() {
String message = "发送一条消息";
log.info("发送消息给确认交换机错误路由 message:{}",message);
rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE, ConfirmConfig.ROUTE_KEY, message);
rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE, ConfirmConfig.ROUTE_KEY+"ddd", message);
return "执行成功";
}
}
@Component
@Slf4j
public class ConfirmQueueConsumer {
@RabbitListener(queues = ConfirmConfig.QUEUE)
public void receiver(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("[收到消息] 当前时间:{} 确认队列接收消息:{}", new Date().toString(), msg);
}
}
实验结果
调用confirm/exchange
模拟发送给错误的交换机验证回调,连续发送两条,第一条错误,第二条正确
发送消息给确认交换机 message:发送一条消息
Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404
[收到消息] 当前时间:Sun Aug 22 23:07:42 CST 2021 确认队列接收消息:发送一条消息
查看correlationData id:CorrelationData [id=1_ok] data:发送一条消息
查看correlationData id:CorrelationData [id=2_error] data:发送一条消息
交换机成功接收消息 correlationData:CorrelationData [id=1_ok] isAck:true, cause:null
交换机【失败】接收消息 correlationData:CorrelationData [id=2_error] isAck:false, cause:channel error;
调用confirm/route
模拟发送给正确的交换机&错误的路由验证回调,第一条正确,第二条错误
发送消息给确认交换机错误路由 message:发送一条消息
[收到消息] 当前时间:Sun Aug 22 23:11:20 CST 2021 确认队列接收消息:发送一条消息
消息:发送一条消息 错误码:312 错误原因:NO_ROUTE 交换机名:confirm.exchange 路由:confirmKeyddd
交换机成功接收消息 correlationData:null isAck:true, cause:null
交换机成功接收消息 correlationData:null isAck:true, cause:null
备份交换机
当消息发送到队列不可达时,交换机会将不可达消息转发到备份交换机中;
声明交换机时应指定参数:
"alternate-exchange": 备份交换机名称
实验:
架构图如图所示,实验一共发送3条消息给交换机,验证其结果
实验 | 结果 |
---|---|
发送给正确的交换机&路由 | 消费者正常消费 |
发送给正确的交换机&错误路由 | 传给备份交换机,不再由消息回退处理 |
发送给错误的交换机 | 消息确认失败捕获 |
交换机等配置
@Configuration
public class BackupQueueConfig {
// 正常交换机名
public static final String NORMAL_EXCHANGE = "NORMAL.EXCHANGE";
// 备份交换机名
public static final String EXCHANGE = "BACKUP.EXCHANGE";
// 正常队列
public static final String NORMAL_QUEUE = "NORMAL.QUEUE";
// 警告队列
public static final String WARN_QUEUE = "WARN.QUEUE";
// 错误队列
public static final String ERROR_QUEUE = "ERROR.QUEUE";
// 正常路由
public static final String NORMAL_ROUTE_KEY = "NORMAL.ROUTE.KEY";
// 声明普通交换机
@Bean
public DirectExchange getNormalExchange() {
return ExchangeBuilder.directExchange(NORMAL_EXCHANGE)
.withArgument("alternate-exchange", EXCHANGE)
.build();
}
// 声明普通队列
@Bean
public Queue getNormalQueue() {
return new Queue(NORMAL_QUEUE);
}
// 绑定交换机
@Bean
public Binding getBindingExchange() {
return BindingBuilder
.bind(getNormalQueue())
.to(getNormalExchange())
.with(NORMAL_ROUTE_KEY);
}
// 声明备份交换机
@Bean
public FanoutExchange getBackupExchange() {
return new FanoutExchange(EXCHANGE);
}
// 声明警告队列
@Bean
public Queue getWarnQueue() {
return new Queue(WARN_QUEUE);
}
// 声明错误队列
@Bean
public Queue getErrorQueue() {
return new Queue(ERROR_QUEUE);
}
// 绑定警告队列
@Bean
public Binding bindingWarnQueue() {
return BindingBuilder
.bind(getWarnQueue())
.to(getBackupExchange());
}
// 绑定错误队列
@Bean
public Binding bindingErrorQueue() {
return BindingBuilder
.bind(getErrorQueue())
.to(getBackupExchange());
}
}
生产者
@RestController
@RequestMapping("backup")
@Slf4j
public class SendMessageToBackupController {
@Autowired
private RabbitTemplate rabbitTemplate;
// 测试发送消息给正确的交换机 错误的路由 会转到备份交换机
@GetMapping("test")
public Object sendMessage() {
String message = "测试发送消息给正确的交换机 错误的路由 会转到备份交换机";
// 发送给正确的交换机&路由=》消费者正常消费
rabbitTemplate.convertAndSend(BackupQueueConfig.NORMAL_EXCHANGE, BackupQueueConfig.NORMAL_ROUTE_KEY, message+"_发送给正确的交换机&路由=》正确消费者");
// 发送给正确的交换机&错误路由=>传给备份交换机
rabbitTemplate.convertAndSend(BackupQueueConfig.NORMAL_EXCHANGE, BackupQueueConfig.NORMAL_ROUTE_KEY+"error", message+"_发送给正确的交换机&错误路由=>传给备份交换机");
// 发送给错误的交换机=》消息确认捕获
rabbitTemplate.convertAndSend(BackupQueueConfig.NORMAL_EXCHANGE+"error", BackupQueueConfig.NORMAL_ROUTE_KEY+"error", message+"_发送给错误的交换机=》消息确认捕获");
log.info("消息发送完成 msg:{}", message);
return "发送成功";
}
}
消费者
@Component
@Slf4j
public class BackupQueueConsumer {
@RabbitListener(queues = BackupQueueConfig.NORMAL_QUEUE)
public void receiverRight(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("[消费者队列收到消息] 当前时间:{} 确认队列接收消息:{}", new Date().toString(), msg);
}
@RabbitListener(queues = BackupQueueConfig.WARN_QUEUE)
public void receiverWarn(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("[备份-警告队列收到消息] 当前时间:{} 确认队列接收消息:{}", new Date().toString(), msg);
}
@RabbitListener(queues = BackupQueueConfig.ERROR_QUEUE)
public void receiverError(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("[备份-错误队列收到消息] 当前时间:{} 确认队列接收消息:{}", new Date().toString(), msg);
}
}
幂等性
幂等性概念
同一操作发起的一次请求或多次请求的结果是一致的,不会因为多次点击而产生了副作用;
消息重复消费
消费者在消费MQ中的消息时,MQ已把消息发送给了消费者,消费者在给MQ返回消息确认时网络中断,因此MQ未收到该消息的确认会进而转发给其他消费者,或者在网络恢复后再次发给消费者,但实际上消费者已经成功消费了,从而造成了消费者的重复消费消息;
解决消息重复消费方案
对于MQ消费者的幂等性解决,通常会使用一个全局ID,或者UUID等唯一标识,每次消费消息时,利用唯一ID判断是否已消费;
消费端的幂等性
唯一ID+指纹码
指纹码:通过一些规则或者时间戳与特殊标识组成的唯一ID,基于业务生成的唯一键;
我们可以通过查询语句判断这个ID是否存在数据库中,通过指纹码判断是否重复;单个数据库在高并发下会有写入性能瓶颈,通过分库分表解决了性能瓶颈,但不推荐该方案实现幂等性;
Redis原子性
基于setnx命令,具有天然的幂等性;
redis> SETNX key value 返回1 表示key不存在写入成功 返回0 表示key已存在写入失败
优先级队列
优先级0-255,越大越先消费;队列、消息都需设置优先级;
惰性队列
概念
惰性队列会将消息保存在磁盘中,当消费者需要消费相应的消息时,才将其加载到内存中。他的设计目标是能够支持更长的队列,支持更多的消息存储。惰性队列适用于当消费者由于各种原因无法工作时(下线,宕机,关闭)导致消息大量积压的情况;
默认下,生产者发送的消息,队列会尽可能存储在内存中,方便消息发送给消费者;当RabbitMQ需要释放内存的时候,就会将内存的消息放入磁盘中,此操作会耗费较长时间,也会阻塞队列的操作,进而无法接收新的消息;
两种模式
队列具备两种模式:default和lazy;默认为default模式,lazy模式就是惰性队列模式。可以通过调用channel.queueDeclare()参数中设置队列模式;也可以通过Policy策略的方式设置,如果同时使用上述两种方式设置的话,Policy的优先级更高;如果需要修改已有队列的模式话,需要删除原先队列,重新再次定义;
Map<String, Object> argsMap = new HashMap<>();
argsMap.put("x-queue-mode", "lazy");
channel.queueChannel("queueName", false, false, false, argsMap);
内存开销
消息保存在磁盘中;消费速度慢;消费者关闭时,可以先将消息放入惰性队列中;
集群搭建
搭建步骤
-
修改机子的主机名称
vim /etc/hostname
-
配置各个节点的hosts文件,让各个节点都能互相识别对方
vim /etc/hosts
22.22.22.22 node
22.22.22.23 node2
22.22.22.24 node3
-
以确保各个节点的cookie文件使用的是同一个值
在node1上执行远程操作命令
scp /var/lib/rabbitmq/.erlang/cookie root@node2:/var/lib/rabbitmq/.erlang.cookie scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie
-
启动RabbitMQ服务,顺带启动Erlang虚拟机和RabbitMQ应用服务(在三台节点上分别执行以下命令)
rabbitmq-server -detached
-
在节点2执行
rabbitmqctl stop_app
(rabbitmqctl stop会将Erlang虚拟机关闭,rabbitmqctl stop_app 只关闭 RabbitMQ服务)
rabbitmqctl reset rabbitmqctl join_cluster rabbit@node1 rabbitmqctl start_app(只启动应用服务)
-
在节点3执行
rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster rabbit@node2 rabbitmqctl start_app
-
集群状态
rabbitmqctl cluster_status
-
需要重新设置用户
# 创建账户 rabbitmqctl add_user admin 123 # 设置用户角色 rabbitmqctl set_user_tags admin administrator # 设置用户权限 rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
-
解决集群节点(node2和node3机器分别执行)
rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl start_app rabbitmqctl cluster_status rabbitmqctl forget_cluster_node rabbit@node2(node1机器上执行)
镜像队列
使用镜像的原因
如果RabbitMQ集群中只有一个Broker节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失。可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true,但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写入磁盘并执行刷盘动作之间存在一个短暂却会产生问题的时间窗。通过publisherconfirm
机制能够确保客户端知道哪些消息已经存入磁盘;
镜像队列的搭建
参考文章
联邦交换机&队列
概念
搭建
Shovel
资料