当前位置:首页 » 《随便一记》 » 正文

【RabbitMQ】| 狮子带你(超详细)原生Java操作兔子队列

12 人参与  2023年05月03日 14:49  分类 : 《随便一记》  评论

点击全文阅读


目录

一. ? 前言二. ? 原生Java操作RabbitMQⅠ. 简单模式1. 添加依赖2. 编写生产者3. 编写消费者 Ⅱ. 工作队列模式1. 编写生产者2. 编写消费者3. 实现 Ⅲ. 发布订阅模式1. 编写生产者2. 编写消费者 Ⅳ. 路由模式1. 编写生产者2. 编写消费者 Ⅴ. 通配符模式1. 编写生产者2. 编写消费者 三. ? 总结

一. ? 前言

RabbitMQ 是一种快速、灵活、可靠的消息传递方式,可用于构建分布式应用程序、异步处理任务、实现消息队列等。下面是 Java 原生操作 RabbitMQ 的一些好处和用途:

简单易用:RabbitMQ 提供了丰富的 Java 客户端库,开发者可以轻松地使用 Java 代码进行消息发送和接收,无需学习复杂的消息传递协议和 API。

可扩展性强:RabbitMQ 支持集群和分布式部署,可以轻松地实现横向和纵向扩展,以适应不同规模和负载的应用需求。

可靠性高:RabbitMQ 提供了多种消息传递模式,包括持久化消息、确认机制、事务机制等,确保消息传递的可靠性和一致性。

异步处理能力:RabbitMQ 可以异步处理任务,提高应用程序的响应速度和吞吐量,实现任务削峰、应对高并发等需求。

可用于多种场景:RabbitMQ 可以用于构建分布式应用程序、实现消息队列、异步处理任务、实现实时数据同步等场景,具有广泛的应用场景和发展前景。

二. ? 原生Java操作RabbitMQ

Ⅰ. 简单模式

1. 添加依赖

    <!--rabbitmq依赖-->    <dependency>        <groupId>com.rabbitmq</groupId>        <artifactId>amqp-client</artifactId>        <version>5.14.0</version>    </dependency>

2. 编写生产者

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;//生产者public class Producer {    public static void main(String[] args) throws IOException, TimeoutException {//        1.创建连接工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setHost("120.79.50.65");        connectionFactory.setPort(5672);        connectionFactory.setUsername("lion");        connectionFactory.setPassword("lion");        connectionFactory.setVirtualHost("/");//        2.创建连接        Connection connection = connectionFactory.newConnection();//        3.建立信道        Channel channel = connection.createChannel();//        4.创建队列,若队列已存在则使用该队列        /**         * 参数1:队列名         * 参数2:是否持久化,true表示MQ重启后队列还存在         * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次访问她的消费者才能访问         * 参数4:是否自动删除,true表示不再使用队列时,自动删除         * 参数5:其他额外参数         */        channel.queueDeclare("simple_queue",false,false,false,null);//        5.发送消息        String message = "hello rabbitmq";        /**         * 参数1:交换机名,""表示默认交换机         * 参数2:路由键,简单模式就是队列名         * 参数3:其他额外参数         * 参数4:要传递的消息字节数组         */        channel.basicPublish("","simple_queue",null,message.getBytes());//        6.关闭信道和连接        channel.close();        connection.close();        System.out.println("=====发送成功====");    }}

image-20230411151940196

3. 编写消费者

因为消费者不知道生产者什么时候发送消息过来,所以消费者需要一直监听生产者

import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 消费者 */public class Consumer {    public static void main(String[] args) throws IOException, TimeoutException {//        1.创建连接工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setHost("120.79.50.65");        connectionFactory.setPort(5672);        connectionFactory.setUsername("lion");        connectionFactory.setPassword("lion");        connectionFactory.setVirtualHost("/");//        2.创建连接        Connection connection = connectionFactory.newConnection();//        3.创建信道        Channel channel = connection.createChannel();//        4.监听队列        /**         * 参数1:监听的队列名         * 参数2:是否自动签收,如果为false,则需要手动确认消息已收到,否则MQ会一直发送消息。         * 参数3:Consumer的实现类,重写该类方法表示接收到这个消息之后该如何消费消息         */        channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println("接收消息,消息为:"+message);            }        });    }}

image-20230411152552799

Ⅱ. 工作队列模式

image-20230411154500986

与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该模式也使用direct交换机,应用于处理消息较多的情况。特点如下:

一个队列对应多个消费者。一条消息只会被一个消费者消费。消息队列默认采用轮询的方式将消息平均发送给消费者。

其实就是 简单模式plus版本。

1. 编写生产者

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;import java.io.IOException;import java.nio.charset.StandardCharsets;import java.util.concurrent.TimeoutException;public class Producer {    public static void main(String[] args) throws IOException, TimeoutException {//        1.创建连接工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setHost("120.79.50.65");        connectionFactory.setPort(5672);        connectionFactory.setUsername("lion");        connectionFactory.setPassword("lion");        connectionFactory.setVirtualHost("/");//        2.创建连接        Connection connection = connectionFactory.newConnection();//        3.创建信道        Channel channel = connection.createChannel();//        4.创建队列,持久化队列        channel.queueDeclare("work_queue",true,false,false,null);//        5.发送大量消息,参数3表示该消息为持久化消息,即除了保存到内存还保存到磁盘        for (int i = 0; i < 100; i++) {            channel.basicPublish("","work_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,                    ("您好,这是今天的第"+i+"条消息").getBytes(StandardCharsets.UTF_8));        }//        6.关闭资源        channel.close();        connection.close();    }}

image-20230411155449300

2. 编写消费者

这里使用创建了三个消费者,来接收生产者的消息

import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Consumer {    public static void main(String[] args) throws IOException, TimeoutException {        //        1.创建连接工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setHost("120.79.50.65");        connectionFactory.setPort(5672);        connectionFactory.setUsername("lion");        connectionFactory.setPassword("lion");        connectionFactory.setVirtualHost("/");//        2.创建连接        Connection connection = connectionFactory.newConnection();//        3.创建信道        Channel channel = connection.createChannel();//        监听队列,处理消息        channel.basicConsume("work_queue",true,new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println("消费者1消费消息,消息为:"+message);            }        });    }}

3. 实现

先把三个消费者运行起来,再运行生产者,得到的消息就会轮询均分

image-20230411160256568

Ⅲ. 发布订阅模式

在开发过程中,有一些消息需要不同消费者进行不同的处理,如电商网站的同一条促销信息需要短信发送、邮件发送、站内信发送等。此时可以使用发布订阅模式(Publish/Subscribe)

特点:

生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中。工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。发布订阅模式使用fanout交换机。

1. 编写生产者

这里创建了三条队列,一条是发送短信,一条是站内信,一条是邮件队列、

import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import javax.swing.plaf.TreeUI;import java.io.IOException;import java.nio.charset.StandardCharsets;import java.util.concurrent.TimeoutException;/** * 发布订阅者模式跟简单和工作模式不一样,不是使用默认的交换机,而是自己创建fanout交换机,生产者把消息发到交换机,由交换机转发到与之绑定的队列 */public class Producer {    public static void main(String[] args) throws IOException, TimeoutException {        //        1.创建连接工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setHost("120.79.50.65");        connectionFactory.setPort(5672);        connectionFactory.setUsername("lion");        connectionFactory.setPassword("lion");        connectionFactory.setVirtualHost("/");//        2.创建连接        Connection connection = connectionFactory.newConnection();//        3.建立信道        Channel channel = connection.createChannel();//        4.创建交换机        /**         * 参数1:交换机名         * 参数2:交换机类型         * 参数3:交换机是否持久化         */        channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT,true);//        5.创建队列        channel.queueDeclare("SEND_MAIL",true,false,false,null);        channel.queueDeclare("SEND_MESSAGE",true,false,false,null);        channel.queueDeclare("SEND_STATION",true,false,false,null);//        6.交换机绑定队列        /**         * 参数1:队列名         * 参数2:交换机名         * 参数3:路由关键字,发布订阅模式只需要写""即可         */        channel.queueBind("SEND_MAIL","exchange_fanout","");        channel.queueBind("SEND_MESSAGE","exchange_fanout","");        channel.queueBind("SEND_STATION","exchange_fanout","");//        7.发送消息        for (int i = 0; i < 10; i++) {            channel.basicPublish("exchange_fanout","",null,                    ("您好,尊敬的用户,秒杀商品活动开始啦:"+i).getBytes(StandardCharsets.UTF_8));        }//        8.关闭资源        channel.close();        connection.close();    }}

2. 编写消费者

import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;//短信消费者public class ConsumerMessage {    public static void main(String[] args) throws IOException, TimeoutException {        //        1.创建连接工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setHost("120.79.50.65");        connectionFactory.setPort(5672);        connectionFactory.setUsername("lion");        connectionFactory.setPassword("lion");        connectionFactory.setVirtualHost("/");//        2.创建连接        Connection connection = connectionFactory.newConnection();//        3.创建信道        Channel channel = connection.createChannel();//        4.监听队列        /**         * 参数1:监听的队列名         * 参数2:是否自动签收,如果为false,则需要手动确认消息已收到,否则MQ会一直发送消息。         * 参数3:Consumer的实现类,重写该类方法表示接收到这个消息之后该如何消费消息         */        channel.basicConsume("SEND_MESSAGE",true,new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println("发送短信,消息为:"+message);            }        });    }}
import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;//邮件消费者public class ConsumerMail {    public static void main(String[] args) throws IOException, TimeoutException {        //        1.创建连接工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setHost("120.79.50.65");        connectionFactory.setPort(5672);        connectionFactory.setUsername("lion");        connectionFactory.setPassword("lion");        connectionFactory.setVirtualHost("/");//        2.创建连接        Connection connection = connectionFactory.newConnection();//        3.创建信道        Channel channel = connection.createChannel();//        4.监听队列        /**         * 参数1:监听的队列名         * 参数2:是否自动签收,如果为false,则需要手动确认消息已收到,否则MQ会一直发送消息。         * 参数3:Consumer的实现类,重写该类方法表示接收到这个消息之后该如何消费消息         */        channel.basicConsume("SEND_MAIL",true,new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println("发送邮件,消息为:"+message);            }        });    }}
import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;//站内信消费者public class ConsumerStation {    public static void main(String[] args) throws IOException, TimeoutException {        //        1.创建连接工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setHost("120.79.50.65");        connectionFactory.setPort(5672);        connectionFactory.setUsername("lion");        connectionFactory.setPassword("lion");        connectionFactory.setVirtualHost("/");//        2.创建连接        Connection connection = connectionFactory.newConnection();//        3.创建信道        Channel channel = connection.createChannel();//        4.监听队列        /**         * 参数1:监听的队列名         * 参数2:是否自动签收,如果为false,则需要手动确认消息已收到,否则MQ会一直发送消息。         * 参数3:Consumer的实现类,重写该类方法表示接收到这个消息之后该如何消费消息         */        channel.basicConsume("SEND_STATION",true,new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println("发送站内信,消息为:"+message);            }        });    }}

发布订阅模式也允许多个消费者监听同一个队列(工作模式),例如 两个发送短信消费者监听同一个短信生产者,这样短信生产者的消息将会被轮询平分。

Ⅳ. 路由模式

image-20230411173014108

使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多时候,不是所有消息都无差别的发布到所有队列中。比如电商网站

的促销活动,双十一大促可能会发布到所有队列;而一些小的促销活动为了节约成本,只发布到站内信队列。此时需要使用路由模式

(Routing)完成这一需求。意思就是只发给与绑定相同路由关键字的队列

特点:

每个队列绑定路由关键字RoutingKey。

生产者将带有RoutingKey的消息发送给交换机,交换机根据RoutingKey转发到指定队列。路由模式使用direct交换机。

1. 编写生产者

import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.nio.charset.StandardCharsets;import java.util.concurrent.TimeoutException;/** * 发布订阅者模式跟简单和工作模式不一样,不是使用默认的交换机,而是自己创建fanout交换机,生产者把消息发到交换机,由交换机转发到与之绑定的队列 */// 生产者public class Producer {    public static void main(String[] args) throws IOException, TimeoutException {        // 1.创建连接工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setHost("120.79.50.65");        connectionFactory.setPort(5672);        connectionFactory.setUsername("lion");        connectionFactory.setPassword("lion");        connectionFactory.setVirtualHost("/");        // 2.创建连接        Connection connection = connectionFactory.newConnection();        // 3.建立信道        Channel channel = connection.createChannel();        // 4.创建交换机        channel.exchangeDeclare("exchange_routing", BuiltinExchangeType.DIRECT,true);        // 5.创建队列        channel.queueDeclare("SEND_MAIL2",true,false,false,null);        channel.queueDeclare("SEND_MESSAGE2",true,false,false,null);        channel.queueDeclare("SEND_STATION2",true,false,false,null);        // 6.交换机绑定队列        channel.queueBind("SEND_MAIL2","exchange_routing","import");        channel.queueBind("SEND_MESSAGE2","exchange_routing","import");        channel.queueBind("SEND_STATION2","exchange_routing","import");        channel.queueBind("SEND_STATION2","exchange_routing","normal");        // 7.发送消息        channel.basicPublish("exchange_routing","import",null,                "双十一大促活动".getBytes());        channel.basicPublish("exchange_routing","normal",null,                "小心促销活动".getBytes());        // 8.关闭资源        channel.close();        connection.close();    }}

2. 编写消费者

import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;//发送邮件消费者public class ConsumerMail {    public static void main(String[] args) throws IOException, TimeoutException {        //        1.创建连接工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setHost("120.79.50.65");        connectionFactory.setPort(5672);        connectionFactory.setUsername("lion");        connectionFactory.setPassword("lion");        connectionFactory.setVirtualHost("/");//        2.创建连接        Connection connection = connectionFactory.newConnection();//        3.创建信道        Channel channel = connection.createChannel();//        4.监听队列        /**         * 参数1:监听的队列名         * 参数2:是否自动签收,如果为false,则需要手动确认消息已收到,否则MQ会一直发送消息。         * 参数3:Consumer的实现类,重写该类方法表示接收到这个消息之后该如何消费消息         */        channel.basicConsume("SEND_MAIL2",true,new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope,                                       AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println("发送邮件,消息为:"+message);            }        });    }}
//发送信息消费者public class ConsumerMessage {    public static void main(String[] args) throws IOException, TimeoutException {        //        1.创建连接工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setHost("120.79.50.65");        connectionFactory.setPort(5672);        connectionFactory.setUsername("lion");        connectionFactory.setPassword("lion");        connectionFactory.setVirtualHost("/");//        2.创建连接        Connection connection = connectionFactory.newConnection();//        3.创建信道        Channel channel = connection.createChannel();//        4.监听队列        /**         * 参数1:监听的队列名         * 参数2:是否自动签收,如果为false,则需要手动确认消息已收到,否则MQ会一直发送消息。         * 参数3:Consumer的实现类,重写该类方法表示接收到这个消息之后该如何消费消息         */        channel.basicConsume("SEND_MESSAGE2",true,new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope,                                       AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println("发送短信,消息为:"+message);            }        });    }}
import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;//站内信消费者public class ConsumerStation {    public static void main(String[] args) throws IOException, TimeoutException {        //        1.创建连接工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setHost("120.79.50.65");        connectionFactory.setPort(5672);        connectionFactory.setUsername("lion");        connectionFactory.setPassword("lion");        connectionFactory.setVirtualHost("/");//        2.创建连接        Connection connection = connectionFactory.newConnection();//        3.创建信道        Channel channel = connection.createChannel();//        4.监听队列        /**         * 参数1:监听的队列名         * 参数2:是否自动签收,如果为false,则需要手动确认消息已收到,否则MQ会一直发送消息。         * 参数3:Consumer的实现类,重写该类方法表示接收到这个消息之后该如何消费消息         */        channel.basicConsume("SEND_STATION2",true,new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope,                                       AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println("发送站内信,消息为:"+message);            }        });    }}

Ⅴ. 通配符模式

image-20230411204903551

通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消

息转发到该队列。通配符模式比路由模式更灵活,使用topic交换机。

通配符规则:

1 消息设置RoutingKey时,RoutingKey由多个单词构成,中间以 . 分割。

2 队列设置RoutingKey时, # 可以匹配任意多个单词, * 可以匹配任意一个单词。

1. 编写生产者

image-20230411211252594

package com.itbz.mq.topic;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 发布订阅者模式跟简单和工作模式不一样,不是使用默认的交换机,而是自己创建fanout交换机,生产者把消息发到交换机,由交换机转发到与之绑定的队列 */// 生产者public class Producer {    public static void main(String[] args) throws IOException, TimeoutException {        // 1.创建连接工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setHost("120.79.50.65");        connectionFactory.setPort(5672);        connectionFactory.setUsername("lion");        connectionFactory.setPassword("lion");        connectionFactory.setVirtualHost("/");        // 2.创建连接        Connection connection = connectionFactory.newConnection();        // 3.建立信道        Channel channel = connection.createChannel();        // 4.创建交换机        channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC,true);        // 5.创建队列        channel.queueDeclare("SEND_MAIL3",true,false,false,null);        channel.queueDeclare("SEND_MESSAGE3",true,false,false,null);        channel.queueDeclare("SEND_STATION3",true,false,false,null);        // 6.交换机绑定队列        channel.queueBind("SEND_MAIL3","exchange_topic","#.mail.#");        channel.queueBind("SEND_MESSAGE3","exchange_topic","#.message.#");        channel.queueBind("SEND_STATION3","exchange_topic","#.station.#");        // 7.发送消息            // 三个队列都匹配上了        channel.basicPublish("exchange_topic","mail.message.station",null,                "双十一大促活动".getBytes());            // 只发给station        channel.basicPublish("exchange_topic","station",null,                "小心促销活动".getBytes());        // 8.关闭资源        channel.close();        connection.close();    }}

2. 编写消费者

跟前面差不多。。。

三. ? 总结

这篇万字长文总结了原生Java操作RabbitMQ的各种过程,希望对您有帮助哦!!!咱们下期见!?


点击全文阅读


本文链接:http://zhangshiyu.com/post/60471.html

<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

关于我们 | 我要投稿 | 免责申明

Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1