通过Redis监听机制集成WebSocket实现主动数据推送(附代码)
需求
后台实时获取Redis里写入的数据,前端实时展示。我这里应用场景是终端向mqtt推送消息,mqtt将消息存入Redis。后端将消息实时推送前端页面进行展示。
前端获取数据的方式
主动获取:
这种方式有很多,axios,jq,dwr,等等。这种方式有一个特点,都是前端主动去请求后端接口,后端进行响应,平时情况很好使,但在需要实时获取数据的场景则不好用。ajax虽然也可以实时获取数据,但也是需要不断轮询向后端发送请求来获取数据,对后台来说是一种很大的开销,尤其是数据量大,并发访问时。
被动获取:
这篇文章讲的就是前端如何被动获取数据,主要通过WebSocket来实现,特别适合数据推送,极大的提升性能。跟普通的http、https协议有很大的不同,它采用ws、wss的协议;有兴趣可以自己了解,废话少说,上代码。
1、定义配置类
package com.alinket.aidms.device.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* 开启WebScoket支持
*/
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
2、添加WebSocketServer类,暴漏端口给前端
package com.alinket.aidms.device.websocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
@ServerEndpoint(value = "/websocket")
@Component
@Slf4j
public class WebSocketServer {
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
/**
* 收到客户端消息后调用的方法
* 在此处已预留可以添加需要推送的硬件信息 目前来看项目不需要选择推送温湿度,浓度,压强等数据
*
* @param message 客户端发送过来的消息
* @functionname onMassage
* @author zhp
*/
@OnMessage
public void onMessage(String message, Session session) {
//群发消息
for (WebSocketServer item : webSocketSet) {
try {
item.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
synchronized (session) {
this.session.getBasicRemote().sendText(message);
}
}
/**
* 群发自定义消息
*/
public void sendInfo(String message) {
log.info("推送消息到窗口");
for (WebSocketServer item : webSocketSet) {
try {
//这里可以设定全部推送
item.sendMessage(message);
} catch (IOException e) {
continue;
}
}
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("websocket发生错误" + error);
error.printStackTrace();
}
@OnOpen
public void onOpen(Session session) {
this.session = session;
webSocketSet.add(this); //加入set中
log.info("websocket:有新连接开始监听!");
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this); //从set中删除
log.info("连接关闭!");
}
}
前端通过ws://IP:端口号/websocket(这里我自己路径是websocket,自行修改)来进行连接
3、Redis配置
package com.alinket.aidms.device.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
@EnableCaching
public class RedisConfigWebSocket extends CachingConfigurerSupport {
/**
* retemplate相关配置
* @param factory
* @return
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
// 配置连接工厂
template.setConnectionFactory(factory);
//使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式)
Jackson2JsonRedisSerializer jacksonSeial = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
// 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jacksonSeial.setObjectMapper(om);
// 值采用json序列化
template.setValueSerializer(jacksonSeial);
//使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
// 设置hash key 和value序列化模式
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(jacksonSeial);
template.afterPropertiesSet();
return template;
}
/**
* 对hash类型的数据操作
*
* @param redisTemplate
* @return
*/
@Bean
public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForHash();
}
/**
* 对redis字符串类型数据操作
*
* @param redisTemplate
* @return
*/
@Bean
public ValueOperations<String, Object> valueOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForValue();
}
/**
* 对链表类型的数据操作
*
* @param redisTemplate
* @return
*/
@Bean
public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForList();
}
/**
* 对无序集合类型的数据操作
*
* @param redisTemplate
* @return
*/
@Bean
public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForSet();
}
/**
* 对有序集合类型的数据操作
*
* @param redisTemplate
* @return
*/
@Bean
public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForZSet();
}
}
4、Redis配置文件(yml格式)
redis:
host: 127.0.0.1
port: 6379
database: 1
timeout: 60s
lettuce:
pool:
max-active: 8
max-wait: 10000
max-idle: 30
min-idle: 10
5、通过监听从Redis消息队列中读取消息
package com.alinket.aidms.device.listener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
@Configuration
public class RedisMessageListener{
//不同的频道名
private static final String channel = "aidms*";
/**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean//相当于xml中的bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter,MessageListenerAdapter listenerAdapterTimeOut) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//listenerAdapter的通道
container.addMessageListener(listenerAdapter, new PatternTopic(RedisMessageListener.channel));
return container;
}
/**
* 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
* @param receiver
* @return
*/
@Bean
MessageListenerAdapter listenerAdapter(MessageReceiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
/**redis 读取内容的template */
@Bean
StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}
/**
* 监听redis超时
* @param receiver
* @return
*/
@Bean
MessageListenerAdapter listenerAdapterTimeOut(MessageReceiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessageTimeOut");
}
}
6、消息监听器配置完成,写发送消息的类
package com.alinket.aidms.device.listener;
import com.alibaba.fastjson.JSONObject;
import com.alinket.aidms.device.websocket.WebSocketServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@Component
@Slf4j
public class MessageReceiver {
WebSocketServer webSocketServer = new WebSocketServer();
/**Battery*/
public void receiveMessage(String message){
System.out.println("data:{}"+message);
try {
webSocketServer.sendInfo(message);
} catch (Exception e) {
log.info("消息发送失败!IO异常"+e);
e.printStackTrace();
}
//return message;
}
/**CH4*/
public void receiveMessage2(String message){
System.out.println("CH4:"+message);
//CH4.add(message);
List list = new ArrayList();
list.add("CH4");
list.add(message.replaceAll("\\\\",""));
String json = JSONObject.toJSONString(list);
log.info("CH4 Data is loading out");
try {
webSocketServer.sendInfo(json);
} catch (Exception e) {
log.info("消息发送失败!IO异常"+e);
e.printStackTrace();
}
//return message;
}
/**Humiture*/
public void receiveMessage3(String message){
System.out.println("Humiture:"+message);
//Humiture.add(message);
List list = new ArrayList();
list.add("Humiture");
list.add(message.replaceAll("\\\\",""));
String json = JSONObject.toJSONString(list);
log.info("Humiture Data is loading out");
try {
webSocketServer.sendInfo(json);
} catch (Exception e) {
log.info("消息发送失败!IO异常"+e);
e.printStackTrace();
}
//return message;
}
/**Location*/
public void receiveMessage4(String message){
System.out.println("Location:"+message);
//Location.add(message);
List list = new ArrayList();
list.add("Location");
list.add(message.replaceAll("\\\\",""));
String json = JSONObject.toJSONString(list);
log.info("Location Data is loading out");
try {
webSocketServer.sendInfo(json);
} catch (Exception e) {
log.info("消息发送失败!IO异常"+e);
e.printStackTrace();
}
//return message;
}
/**Pressure*/
public void receiveMessage5(String message){
System.out.println("Pressure:"+message);
//Pressure.add(message);
List list = new ArrayList();
list.add("Pressure");
list.add(message.replaceAll("\\\\",""));
String json = JSONObject.toJSONString(list);
log.info("Pressure Data is loading out");
try {
webSocketServer.sendInfo(json);
} catch (Exception e) {
log.info("消息发送失败!IO异常"+e);
e.printStackTrace();
}
//return message;
}
/**SignalIntensity*/
public void receiveMessage6(String message){
System.out.println("SignalIntensity:"+message);
//SignalIntensity.add(message);
List list = new ArrayList();
list.add("SignalIntensity");
list.add(message.replaceAll("\\\\",""));
String json = JSONObject.toJSONString(list);
log.info("SignalIntensity Data is loading out");
try {
webSocketServer.sendInfo(json);
} catch (Exception e) {
log.info("消息发送失败!IO异常"+e);
e.printStackTrace();
}
//return message;
}
/**Warn*/
public void receiveMessage7(String message){
System.out.println("Warn:"+message);
//Warn.add(message);
List list = new ArrayList();
list.add("Warn");
list.add(message.replaceAll("\\\\",""));
String json = JSONObject.toJSONString(list);
log.info("Warn Data is loading out");
try {
webSocketServer.sendInfo(json);
} catch (Exception e) {
log.info("消息发送失败!IO异常"+e);
e.printStackTrace();
}
//return message;
}
public void receiveMessageTimeOut(String message){
if(message.equals("accessToken")){
// 1小时超时
// 调用刷新accessToken接口
log.info("accessToken超时");
try {
// appAuthorization.getRefreshToken();
} catch (Exception e) {
log.info("accessToken超时:刷新accessToken失败!");
e.printStackTrace();
}
}
if( message.equals("refreshToken")){
// 1天超时
// 调用刷新鉴权接口获取 accessToken 和 refreshToken
log.info("refreshToken超时");
try {
// appAuthorization.getAppAuthorizationInfo();
} catch (Exception e) {
log.info("refreshToken超时:重新获取accessToken失败!");
e.printStackTrace();
}
}
}
}
至此所有配置完成,最后一步测试需要连接WebCocket,使用EMQ X主动发送数据
1、连接WebSocket,使用网上任意一款WebSocket在线测试工具即可
输入地址端口(上边提到过),点击WebSocket连接,这是通过输入框可以实现客户端向服务器发送
2、使用EMQ X工具通过Mqtt向Redis里推送数据,实现客户端实时接收
接受成功
至此,全部完成