当前位置:首页 » 《休闲阅读》 » 正文

使用 EMQX 开源版的 Webhook 机制处理消息并存储数据

1 人参与  2024年11月18日 12:41  分类 : 《休闲阅读》  评论

点击全文阅读


1、前言

EMQX 是一款强大的开源 MQTT 消息代理,它支持大量的连接和高吞吐量,适用于各种物联网应用。Webhook 是 EMQX 提供的扩展功能之一,用于将消息推送到外部的 HTTP 服务。在本文中,我们将介绍如何使用 EMQX 开源版的 Webhook 机制,并展示如何处理收到的 Webhook 请求,将其中的数据存储到数据库中。

2、Webhook 简介

Webhook 是一种常见的 HTTP 回调机制,用于将事件或数据推送到外部服务器。当 MQTT 客户端发布消息时,EMQX 可以通过 Webhook 将该消息发送给指定的 HTTP 端点,方便我们在接收到消息后进一步处理数据。

3、搭建 Webhook 服务

接下来,我们编写一个简单的 SpringBoot 2.7服务,用于接收 EMQX 的 Webhook 请求并将其中的数据存储到数据库中。

3.1、项目依赖

pom.xml 中添加以下依赖:

    <dependencies>       <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>        <dependency>            <groupId>org.projectlombok</groupId>            <artifactId>lombok</artifactId>            <optional>true</optional>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-test</artifactId>            <scope>test</scope>        </dependency>        <dependency>            <groupId>com.fasterxml.jackson.core</groupId>            <artifactId>jackson-core</artifactId>            <version>2.13.5</version>        </dependency>        <!-- Jackson Databind -->        <dependency>            <groupId>com.fasterxml.jackson.core</groupId>            <artifactId>jackson-databind</artifactId>            <version>2.13.5}</version>        </dependency>        <!-- Jackson Annotations -->        <dependency>            <groupId>com.fasterxml.jackson.core</groupId>            <artifactId>jackson-annotations</artifactId>            <version>2.13.5</version>        </dependency>        <dependency>            <groupId>mysql</groupId>            <artifactId>mysql-connector-java</artifactId>            <version>8.0.33</version>        </dependency>        <dependency>            <groupId>com.baomidou</groupId>            <artifactId>mybatis-plus-boot-starter</artifactId>            <version>3.5.6</version>        </dependency>    </dependencies>  

3.2、实现 Webhook 控制器

3.2.1、Controller
package ....这里填写你自己的import com.fasterxml.jackson.core.type.TypeReference;import com.fasterxml.jackson.databind.ObjectMapper;import lombok.AllArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.springframework.web.bind.annotation.*;import java.io.IOException;import java.util.Map;@RestController@RequestMapping("/emqx/test")@AllArgsConstructor@Slf4jpublic class WebhookController {    private final EmqxTestService emqxTestService;    private final ObjectMapper objectMapper = new ObjectMapper();    @PostMapping("/webhook")    public String webhook(@RequestBody String payload) {        try {            // 解析主 JSON 字符串为 Map            Map<String, Object> payloadMap = objectMapper.readValue(payload, new TypeReference<Map<String, Object>>() {});            // 从主 Map 中提取 clientid 和 topic            String clientId = (String) payloadMap.get("clientid");            String topic = (String) payloadMap.get("topic");            log.info("Received clientid: {}", clientId);            log.info("Received topic: {}", topic);            // 提取 payload 字段的 JSON 字符串            String payloadString = (String) payloadMap.get("payload");            // 解析 payload 字段的 JSON 字符串为 Map            Map<String, Object> payloadDataMap = objectMapper.readValue(payloadString, new TypeReference<Map<String, Object>>() {});            // 从 payload 数据中提取 msg 参数            String msg = (String) payloadDataMap.get("msg");            log.info("Received msg: {}", msg);            // 创建 EmqxTest 实例并设置字段            EmqxTest testData = new EmqxTest();            testData.setData(payload);            testData.setClientId(clientId);            testData.setTopic(topic);            // 保存数据            emqxTestService.insertData(testData);        } catch (IOException e) {            log.error("解析JSON有效负载失败", e);            return "Error parsing payload";        }        return "Received";    }}
 3.2.2、Service
package ....这里填写你自己的import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;import com.ldb.tool.entity.EmqxTest;import com.ldb.tool.mapper.EmqxTestMapper;import lombok.AllArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;import java.util.Date;import java.util.List;@Service@AllArgsConstructor@Slf4jpublic class EmqxTestService {    private final EmqxTestMapper emqxTestMapper;    public EmqxTest insertData(EmqxTest testData) {        EmqxTest emqxTest = new EmqxTest();        // 你可以手动设置其他需要的字段,如 clientId, topic, data 等        emqxTest.setClientId(testData.getClientId());        emqxTest.setTopic(testData.getTopic());        emqxTest.setData(testData.getData());        emqxTest.setCreateTime(new Date()); // 如果你有自动填充策略,可以忽略这行        this.emqxTestMapper.insert(emqxTest);        return emqxTest;    }}
3.2.3、Mapper
package ...这里填写你自己的;import com.baomidou.mybatisplus.core.mapper.BaseMapper;import com.ldb.tool.entity.EmqxTest;public interface EmqxTestMapper extends BaseMapper<EmqxTest> {}
 3.2.4、Entity
package ...这里填写你自己的;import com.baomidou.mybatisplus.annotation.IdType;import com.baomidou.mybatisplus.annotation.TableId;import com.baomidou.mybatisplus.annotation.TableName;import lombok.Data;import java.io.Serializable;import java.util.Date;@TableName("emqx_test")@Datapublic class EmqxTest implements Serializable {    private static final long serialVersionUID = 1L;    @TableId(value = "id", type = IdType.ASSIGN_ID)    private Long id;    private String clientId;    private String topic;    private String data;    private Date createTime;    private Date updateTime;}

4、配置 EMQX Webhook

4.1、运行

我们这里使用docker来运行EMQX。

通过 Docker 运行 EMQX | EMQX文档

4.1.1、获取镜像
docker pull emqx/emqx:5.8.0
4.1.2、启动容器
docker run -d --name emqx \  -p 1883:1883 -p 8083:8083 \  -p 8084:8084 -p 8883:8883 \  -p 18083:18083 \  -v $PWD/data:/opt/emqx/data \  -v $PWD/log:/opt/emqx/log \  emqx/emqx:5.8.0

4.2、配置EMQX-Webhook

4.2.1、创建Webhook

访问EMQX可视化后台(http://localhost:18083/)=>集成=>Webhook=>创建Webhook

在填写设置的时候,需要注意的是我们本地docke访问宿主机,在容器内部URL:127.0.0.1,指向的是容器本身,你可以获取宿主机IP作为URL,比如192.168.30.44。

我们通过URL选项的测试按钮可以点击测试是否正常请求。

5、测试 Webhook

在保证我们的Java-Webhook、EMQX服务运行的情况下,我们可以通过MQTTX(简介 - MQTTX 文档)软件去模拟一台直连的MQTT设备发起一个主题,因为我们在创建Webhook的时候触发者是消息发布。

5.1、MQTTX发送主题

首先我们需要新建一个MQTT连接,配置如下所以,未设置认证的话不需要用户名密码。

右下角,我们填写主题(Topic)的消息路由为listen/me,消息内容为{"msg": "send messgae","status":1},点击小飞机按钮发送。

5.2、查看Webhook触发情况

在EMQX后台,集成=>Webhook,查看送达情况。

在查看我们的Java服务的日志打印,也收到了。

查看sql表,也已经正常保存。

6、结论

Webhook 是一种强大的机制,MQTT 消息发布事件触发后,通过 HTTP 推送到 Spring Boot 服务,对接收到的数据进行解析和存储。这种机制能够让我们轻松地将消息从 EMQX 转发到其他服务,从而实现复杂的业务逻辑处理。


7、参考资料

EMQX官方文档

点击全文阅读


本文链接:http://zhangshiyu.com/post/188137.html

<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

最新文章

  • 林晚夏江肆年(进错房,嫁给八零最牛特种兵在线阅读)全文免费阅读无弹窗大结局_(林晚夏江肆年)进错房,嫁给八零最牛特种兵在线阅读免费阅读全文最新章节列表_笔趣阁(林晚夏江肆年) -
  • 进错房,嫁给八零最牛特种兵完整版阅读小说(林晚夏江肆年)全文免费阅读无弹窗大结局_(进错房,嫁给八零最牛特种兵完整版阅读)林晚夏江肆年免费阅读全文最新章节列表_笔趣阁(进错房,嫁给八零最牛特种兵完整版阅读) -
  • 新雪藏旧事全文全文(商云萝周砚京)全文免费阅读无弹窗大结局_(新雪藏旧事全文小说免费阅读)最新章节列表_笔趣阁(新雪藏旧事全文) -
  • 在线免费小说重生七零替嫁:不嫁教授,嫁军官_乔珊珊乔婉月新热门小说_热门小说乔珊珊乔婉月
  • 免费小说《冯云漪厉晋泽》已完结(冯云漪厉晋泽)热门小说大结局全文阅读笔趣阁
  • 祁兰湘邵黎晖小说_祁兰湘邵黎晖完整版大结局小说免费阅读
  • 完整免费小说老公心疼青梅将她留宿新房,却将怀孕的我赶出家门(乔玥傅慎行姜禾)_老公心疼青梅将她留宿新房,却将怀孕的我赶出家门(乔玥傅慎行姜禾)完本小说免费阅读(乔玥傅慎行姜禾)
  • 新雪藏旧事:结局+番外+完结免费小说在线阅读_小说完结推荐新雪藏旧事:结局+番外+完结商云萝周砚京热门小说
  • 初逢青山梦长安(顾怀瑾沈书妤)阅读 -
  • 无删减版《绝对权力:从天崩开局走上官途巅峰》在线免费阅读
  • 《绝对权力:从天崩开局走上官途巅峰》小说在线试读,《绝对权力:从天崩开局走上官途巅峰》最新章节目录
  • 裴泽苏星辰何娇(满目星辰不及你小说)精彩章节在线阅读

    关于我们 | 我要投稿 | 免责申明

    Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1