当前位置:首页 » 《关注互联网》 » 正文

[C++][第三方库][RabbitMq]详细讲解

12 人参与  2024年10月15日 12:40  分类 : 《关注互联网》  评论

点击全文阅读


目录

1.介绍2.安装1.RabbitMq2.客户端库 3.AMQP-CPP 简单使用1.介绍2.使用 4.类与接口1.Channel2.ev 5.使用1.publish.cc2.consume.cc3.makefile


1.介绍

RabbitMQ消息队列组件,实现两个客户端主机之间消息传输的功能(发布&订阅)核心概念:交换机、队列、绑定、消息交换机类型广播交换:当交换机收到消息,则将消息发布到所有绑定的队列中直接交换:根据消息中的bkey与绑定的rkey对比,一致则放入队列主题交换:使用bkey与绑定的rkey进行规则匹配,成功则放入队列

2.安装

1.RabbitMq

安装sudo apt install rabbitmq-server简单使用
# 安装完成的时候默认有个用户guest,但是权限不够,要创建一个administrator用户,才可以做为远程登录和发表订阅消息#添加用户 sudo rabbitmqctl add_user root <PASSWORD>#设置用户tag sudo rabbitmqctl set_user_tags root administrator #设置用户权限 sudo rabbitmqctl set_permissions -p / root "." "." ".*" # RabbitMQ自带了web管理界面, 执行下面命令开启, 默认端口15672sudo rabbitmq-plugins enable rabbitmq_management 

2.客户端库

C语言库C++库
sudo apt install libev-dev #libev 网络库组件git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.gitcd AMQP-CPP/makemake install
如果安装时出现以下报错,则表示ssl版本出现问题
/usr/include/openssl/macros.h:147:4: error: #error "OPENSSL_API_COMPAT expresses an impossible API compatibility level"   147 | #  error "OPENSSL_API_COMPAT expresses an impossible API compatibility level"       |    ^~~~~ In file included from /usr/include/openssl/ssl.h:18,                  from linux_tcp/openssl.h:20,                  from linux_tcp/openssl.cpp:12: /usr/include/openssl/bio.h:687:1: error: expected constructor, destructor, or type conversion before ‘DEPRECATEDIN_1_1_0’   687 | DEPRECATEDIN_1_1_0(int BIO_get_port(const char *str, unsigned short *port_ptr))
解决方案:卸载当前的ssl库,重新进行修复安装
dpkg -l | grep sslsudo dpkg -P --force-all libevent-openssl-2.1-7sudo dpkg -P --force-all opensslsudo dpkg -P --force-all libssl-devsudo apt --fix-broken install

3.AMQP-CPP 简单使用

1.介绍

AMQP-CPP是用于与RabbitMq消息中间件通信的C++库 它能解析从RabbitMq服务发送来的数据,也可以生成发向RabbitMq的数据包AMQP-CPP库不会向RabbitMq建立网络连接,所有的网络IO由用户完成 AMQP-CPP提供了可选的网络层接口,它预定义了TCP模块,用户就不用自己实现网络IO, 也可以选择libevent、libev、libuv、asio等异步通信组件, 需要手动安装对应的组件 AMQP-CPP完全异步,没有阻塞式的系统调用,不使用线程就能够应用在高性能应用中注意:它需要C++17的支持

2.使用

AMQP-CPP的使用有两种模式: 使用默认的TCP模块进行网络通信使用扩展的libevent、libev、libuv、asio异步通信组件进行通信 此处以libev为例,不需要自己实现monitor函数,可以直接使用AMQP::LibEvHandler

4.类与接口

1.Channel

channel是一个虚拟连接,一个连接上可以建立多个通道 并且所有的RabbitMq指令都是通过channel传输 所以连接建立后的第一步,就是建立channel 因为所有操作是异步的,所以在channel上执行指令的返回值并不能作为操作执行结果 实际上它返回的是Deferred类,可以使用它安装处理函数
namespace AMQP {     /**      *  Generic callbacks that are used by many deferred objects      */     using SuccessCallback = std::function<void()>;     using ErrorCallback = std::function<void(const char *message)>;     using FinalizeCallback = std::function<void()>;        /**      *  Declaring and deleting a queue      */     using QueueCallback = std::function<void(const std::string &name,                                                    uint32_t messagecount,                                                    uint32_t consumercount)>;    using DeleteCallback = std::function<void(uint32_t deletedmessages)>;     using MessageCallback = std::function<void(const Message &message,                                                    uint64_t deliveryTag,                                                    bool redelivered)>;     // 当使用发布者确认时,当服务器确认消息已被接收和处理时,将调用AckCallback     using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>;    // 使用确认包裹通道时,当消息被ack/nacked时,会调用这些回调     using PublishAckCallback = std::function<void()>;     using PublishNackCallback = std::function<void()>;     using PublishLostCallback = std::function<void()>; // 信道类    class Channel     {         Channel(Connection *connection);         bool connected();        /**             *声明交换机             *如果提供了一个空名称,则服务器将分配一个名称。             *以下flags可用于交换机:             *             *-durable     持久化,重启后交换机依然有效             *-autodelete  删除所有连接的队列后,自动删除交换             *-passive     仅被动检查交换机是否存在             *-internal    创建内部交换             *             *@param name    交换机的名称             *@param-type    交换类型                 enum ExchangeType                 {                     fanout,  广播交换,绑定的队列都能拿到消息                     direct,  直接交换,只将消息交给routingkey一致的队列                     topic,   主题交换,将消息交给符合bindingkey规则的队列                     headers,                     consistent_hash,                     message_deduplication                 };             *@param flags    交换机标志             *@param arguments其他参数             *             *此函数返回一个延迟处理程序。可以安装回调             using onSuccess(), onError() and onFinalize() methods.         */         Deferred &declareExchange(const std::string_view &name,                                  ExchangeType type,                                  int flags,                                  const Table &arguments);        /**             *声明队列             *如果不提供名称,服务器将分配一个名称。             *flags可以是以下值的组合:             *             *-durable 持久队列在代理重新启动后仍然有效             *-autodelete 当所有连接的使用者都离开时,自动删除队列             *-passive 仅被动检查队列是否存在             *-exclusive 队列仅存在于此连接,并且在连接断开时自动删除             *             *@param name        队列的名称             *@param flags       标志组合             *@param arguments  可选参数             *             *此函数返回一个延迟处理程序。可以安装回调             *使用onSuccess()、onError()和onFinalize()方法。             *             Deferred &onError(const char *message)             *             *可以安装的onSuccess()回调应该具有以下签名:             void myCallback(const std::string &name,                  uint32_t messageCount,                  uint32_t consumerCount);             例如:             channel.declareQueue("myqueue").onSuccess(                 [](const std::string &name,                      uint32_t messageCount,                     uint32_t consumerCount) {                        std::cout << "Queue '" << name << "' ";                        std::cout << "has been declared with ";                        std::cout << messageCount;                        std::cout << " messages and ";                        std::cout << consumerCount;                        std::cout << " consumers" << std::endl;          *  });         */         DeferredQueue &declareQueue(const std::string_view &name,                                    int flags,                                    const Table &arguments);        /**             *将队列绑定到交换机             *             *@param exchange     源交换机             *@param queue        目标队列             *@param routingkey   路由密钥             *@param arguments    其他绑定参数             *             *此函数返回一个延迟处理程序。可以安装回调             *使用onSuccess()、onError()和onFinalize()方法。         */         Deferred &bindQueue(const std::string_view &exchange,                            const std::string_view &queue,                            const std::string_view &routingkey,                            const Table &arguments);        /**             *将消息发布到exchange            *您必须提供交换机的名称和路由密钥。             然后,RabbitMQ将尝试将消息发送到一个或多个队列。             使用可选的flags参数,可以指定如果消息无法路由到队列时应该发生的情况。            默认情况下,不可更改的消息将被静默地丢弃。             *              *如果设置了'mandatory'或'immediate'标志,             则无法处理的消息将返回到应用程序。             在开始发布之前,请确保您已经调用了recall()-方法,             并设置了所有适当的处理程序来处理这些返回的消息。             *              *可以提供以下flags:             *              *-mandatory 如果设置,服务器将返回未发送到队列的消息             *-immediate 如果设置,服务器将返回无法立即转发给使用者的消息。             *@param exchange要发布到的交易所             *@param routingkey路由密钥             *@param envelope要发送的完整信封             *@param message要发送的消息             *@param size消息的大小             *@param flags可选标志         */         bool publish(const std::string_view &exchange,                     const std::string_view &routingKey,                     const std::string &message,                     int flags = 0);        /**             *告诉RabbitMQ服务器已准备好使用消息-也就是 订阅队列消息             *             *调用此方法后,RabbitMQ开始向客户端应用程序传递消息。             consumer tag是一个字符串标识符,             如果您以后想通过channel::cancel()调用停止它,             可以使用它来标识使用者。             *如果您没有指定使用者tag,服务器将为您分配一个。             *             *支持以下flags:             *             *-nolocal    如果设置了,则不会同时消耗在此通道上发布的消息             *-noack      如果设置了,则不必对已消费的消息进行确认             *-exclusive  请求独占访问,只有此使用者可以访问队列             *             *@param queue    您要使用的队列             *@param tag      将与此消费操作关联的消费者标记             *@param flags    其他标记             *@param arguments其他参数             *             *此函数返回一个延迟处理程序。             可以使用onSuccess()、onError()和onFinalize()方法安装回调            可以安装的onSuccess()回调应该具有以下格式:                 void myCallback(const std::string_view&tag);             样例:             channel.consume("myqueue").onSuccess(                 [](const std::string_view& tag) {                     std::cout << "Started consuming under tag ";                     std::cout << tag << std::endl;             });         */         DeferredConsumer &consume(const std::string_view &queue,                                  const std::string_view &tag,                                  int flags,                                  const Table &arguments);        /**             *确认接收到的消息             *            *消费者客户端对收到的消息进行确认应答            *            *当在DeferredConsumer::onReceived()方法中接收到消息时,             必须确认该消息,             以便RabbitMQ将其从队列中删除(除非使用noack选项消费)            *             *支持以下标志:             *             *-多条确认多条消息:之前传递的所有未确认消息也会得到确认             *             *@param deliveryTag    消息的唯一delivery标签             *@param flags          可选标志             *@return bool         */         bool ack(uint64_t deliveryTag, int flags=0);    };    class DeferredConsumer     {         /*             注册一个回调函数,该函数在消费者启动时被调用            void onSuccess(const std::string &consumertag)         */         DeferredConsumer &onSuccess(const ConsumeCallback& callback);        /*             注册回调函数,用于接收到一个完整消息的时候被调用             void MessageCallback(const AMQP::Message &message,                  uint64_t deliveryTag, bool redelivered)         */         DeferredConsumer &onReceived(const MessageCallback& callback);        /* Alias for onReceived() */         DeferredConsumer &onMessage(const MessageCallback& callback);        /*             注册要在服务器取消消费者时调用的函数             void CancelCallback(const std::string &tag)         */         DeferredConsumer &onCancelled(const CancelCallback& callback);    };    class Message : public Envelope    {         const std::string &exchange();        const std::string &routingkey();    };        class Envelope : public MetaData    {         const char *body();  // 获取消息正文        uint64_t bodySize(); // 获取消息正文大小    };}

2.ev

typedef struct ev_async {     EV_WATCHER (ev_async);    EV_ATOMIC_T sent; /* private */ }ev_async;  //break type enum {     EVBREAK_CANCEL = 0, /* undo unloop */     EVBREAK_ONE    = 1, /* unloop once */     EVBREAK_ALL    = 2  /* unloop all loops */ }; // 实例化并获取IO事件监控接口句柄struct ev_loop *ev_default_loop (unsigned int flags EV_CPP (= 0));# define EV_DEFAULT  ev_default_loop (0)  // 开始运行IO事件监控, 这是一个阻塞接口int  ev_run (struct ev_loop *loop);/* break out of the loop */// 结束IO监控// 如果在主线程进行ev_run(), 则可以直接调用,// 如果在其他线程中进行ev_run(), 需要通过异步通知进行void ev_break (struct ev_loop *loop, int32_t break_type) ;   void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents); // 初始化异步事件结构, 并设置回调函数void ev_async_init(ev_async *w, callback cb);// 启动事件监控循环中的异步任务处理void ev_async_start(struct ev_loop *loop, ev_async *w); // 发送当前异步事件到异步线程中执行void ev_async_send(struct ev_loop *loop, ev_async *w);

5.使用

1.publish.cc

#include <ev.h>#include <amqpcpp.h>#include <amqpcpp/libev.h>#include <openssl/ssl.h>#include <openssl/opensslv.h>int main(){    // 1.实例化底层网络通信框架的IO事件监控句柄    auto *loop = EV_DEFAULT;    // 2.实例化libEvHandler句柄 -> 将AMQP框架与事件监控关联起来    AMQP::LibEvHandler handler(loop);    // 3.实例化连接对象    AMQP::Address address("amqp://root:SnowK8989@127.0.0.1:5672/");    AMQP::TcpConnection connection(&handler, address);    // 4.实例化信道对象    AMQP::TcpChannel channel(&connection);    // 5.声明交换机    channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)        .onError([](const char *message)                { std::cout << "声明交换机失败: " << message << std::endl; })        .onSuccess([]()                { std::cout << "test-exchange 交换机创建成功" << std::endl; });        // 6.声明队列    channel.declareQueue("test-queue")        .onError([](const char *message)                 { std::cout << "声明队列失败: " << message << std::endl; })        .onSuccess([]()                 { std::cout << "test-queue 队列创建成功" << std::endl; });    // 7.针对交换机和队列进行绑定    channel.bindQueue("test-exchange", "test-queue", "test-queue-key")        .onError([](const char *message)                 { std::cout << "test-exchange - test-queue 绑定失败: " \                 << message << std::endl; })        .onSuccess([]()                 { std::cout << "test-exchange - test-queue 绑定成功"                  << std::endl; });    // 8.向交换机发布消息    for (int i = 0; i < 5; ++i)    {        std::string msg = "Hello SnowK-" + std::to_string(i);        if(channel.publish("test-exchange", "test-queue-key", msg) == false)        {            std::cout << "publish 失败" << std::endl;        }    }    // 9.启动底层网络通信框架 -> 开启IO    ev_run(loop, 0);    return 0;}

2.consume.cc

#include <ev.h>#include <amqpcpp.h>#include <amqpcpp/libev.h>#include <openssl/ssl.h>#include <openssl/opensslv.h>void MessageCB(AMQP::TcpChannel* channel, const AMQP::Message& message,                uint64_t deliveryTag, bool redelivered){    std::string msg;    msg.assign(message.body(), message.bodySize());        // 不能这样使用, AMQP::Message后面没有存'\0'    // std::cout << message << std::endl         std::cout << msg << std::endl;    channel->ack(deliveryTag);}int main(){    // 1.实例化底层网络通信框架的IO事件监控句柄    auto *loop = EV_DEFAULT;    // 2.实例化libEvHandler句柄 -> 将AMQP框架与事件监控关联起来    AMQP::LibEvHandler handler(loop);    // 3.实例化连接对象    AMQP::Address address("amqp://root:SnowK8989@127.0.0.1:5672/");    AMQP::TcpConnection connection(&handler, address);    // 4.实例化信道对象    AMQP::TcpChannel channel(&connection);    // 5.声明交换机    channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)        .onError([](const char *message)                 { std::cout << "声明交换机失败: " << message << std::endl; })        .onSuccess([]()                 { std::cout << "test-exchange 交换机创建成功" << std::endl; });    // 6.声明队列    channel.declareQueue("test-queue")        .onError([](const char *message)                 { std::cout << "声明队列失败: " << message << std::endl; })        .onSuccess([]()                 { std::cout << "test-queue 队列创建成功" << std::endl; });    // 7.针对交换机和队列进行绑定    channel.bindQueue("test-exchange", "test-queue", "test-queue-key")        .onError([](const char *message)                 { std::cout << "test-exchange - test-queue 绑定失败: " \ << message << std::endl; })        .onSuccess([]()                 { std::cout << "test-exchange - test-queue 绑定成功"; });    // 8.订阅消息对垒 -> 设置消息处理回调函数    auto callback = std::bind(MessageCB, &channel, std::placeholders::_1,                               std::placeholders::_2, std::placeholders::_3);    channel.consume("test-queue", "consume-tag")        .onReceived(callback)        .onError([](const char *message)        {             std::cout << "订阅 test-queue 队列消息失败: " << message << std::endl;            exit(0);         });    // 9.启动底层网络通信框架 -> 开启IO    ev_run(loop, 0);    return 0;}

3.makefile

all: publish consumepublish: publish.ccg++ -o $@ $^ -lamqpcpp -lev -std=c++17consume: consume.ccg++ -o $@ $^ -lamqpcpp -lev -std=c++17.PHONY:cleanclean:rm publish consume


点击全文阅读


本文链接:http://zhangshiyu.com/post/172090.html

<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

关于我们 | 我要投稿 | 免责申明

Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1