|
@@ -3,6 +3,7 @@ package com.macro.mall.tiny.modules.business.controller;
|
|
|
import cn.hutool.json.JSONArray;
|
|
|
import cn.hutool.json.JSONObject;
|
|
|
import cn.hutool.json.JSONUtil;
|
|
|
+import com.google.common.collect.Maps;
|
|
|
import com.macro.mall.tiny.common.util.EprdmsHttpUtils;
|
|
|
import com.macro.mall.tiny.common.util.SpringUtils;
|
|
|
import com.macro.mall.tiny.config.CustomSpringConfigurator;
|
|
@@ -31,21 +32,14 @@ public class WebSocketServer {
|
|
|
*/
|
|
|
private final RedisMessageListenerContainer redisMessageListenerContainer = SpringUtils.getBean(RedisMessageListenerContainer.class);
|
|
|
|
|
|
+ private static final String topic = "collect:sensor";
|
|
|
+
|
|
|
//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
|
|
|
private static final AtomicInteger onlineCount = new AtomicInteger(0);
|
|
|
|
|
|
- //concurrent包的线程安全Set,用来存放每个客户端对应的webSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
|
|
|
-// private static final CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<>();
|
|
|
-
|
|
|
- //与某个客户端的连接会话,需要通过它来给客户端发送数据
|
|
|
- private Session session;
|
|
|
-
|
|
|
- // 监听的设备列表
|
|
|
- private HashSet<String> sensorMacSet;
|
|
|
-
|
|
|
- private static Map<String, Integer> sensorMacCountMap;
|
|
|
+ private static final Map<String, Integer> sensorMacCountMap = new HashMap<>();
|
|
|
|
|
|
- private RedisSubscribeListener subscribeListener;
|
|
|
+ private static final Map<Session, RedisSubscribeListener> sessionListenerMap = new HashMap<>();
|
|
|
|
|
|
@Value("${keystar.eprdms.url}")
|
|
|
private String eprdmsUrl;
|
|
@@ -62,30 +56,22 @@ public class WebSocketServer {
|
|
|
|
|
|
@OnOpen
|
|
|
public void onOpen(Session session) {
|
|
|
- this.session = session;
|
|
|
addOnlineCount(); //在线数加1
|
|
|
log.info("websocket有新连接加入!当前在线人数为" + getOnlineCount());
|
|
|
- subscribeListener = new RedisSubscribeListener();
|
|
|
- subscribeListener.setSession(session);
|
|
|
- sensorMacSet = new HashSet<>();
|
|
|
- subscribeListener.setSensorMacSet(sensorMacSet);
|
|
|
- //设置订阅topic
|
|
|
- redisMessageListenerContainer.addMessageListener(subscribeListener, new ChannelTopic("collect:sensor"));
|
|
|
- if (sensorMacCountMap == null) {
|
|
|
- initSensorMac();
|
|
|
- sensorMacCountMap = new HashMap<>();
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 连接关闭调用的方法
|
|
|
*/
|
|
|
@OnClose
|
|
|
- public void onClose() {
|
|
|
- removeSensorMac();
|
|
|
+ public void onClose(Session session) {
|
|
|
subOnlineCount(); //在线数减1
|
|
|
- redisMessageListenerContainer.removeMessageListener(subscribeListener);
|
|
|
- subscribeListener = null;
|
|
|
+ RedisSubscribeListener subscribeListener = sessionListenerMap.get(session);
|
|
|
+ if (subscribeListener != null) {
|
|
|
+ redisMessageListenerContainer.removeMessageListener(subscribeListener);
|
|
|
+ Set<String> sensorMacSet = subscribeListener.getSensorMacSet();
|
|
|
+ removeSensorMac(sensorMacSet);
|
|
|
+ }
|
|
|
if (WebSocketServer.onlineCount.get() == 0) {
|
|
|
initSensorMac();
|
|
|
}
|
|
@@ -102,11 +88,18 @@ public class WebSocketServer {
|
|
|
@OnMessage
|
|
|
public void onMessage(String message, Session session) {
|
|
|
log.debug("websocket来自客户端的消息:" + message);
|
|
|
- if (!sensorMacSet.isEmpty()) {
|
|
|
- removeSensorMac();
|
|
|
+ RedisSubscribeListener subscribeListener = sessionListenerMap.get(session);
|
|
|
+ if (subscribeListener == null) {
|
|
|
+ subscribeListener = new RedisSubscribeListener();
|
|
|
+ subscribeListener.setSession(session);
|
|
|
+ //设置订阅topic
|
|
|
+ redisMessageListenerContainer.addMessageListener(subscribeListener, new ChannelTopic(topic));
|
|
|
+ sessionListenerMap.put(session, subscribeListener);
|
|
|
}
|
|
|
+ Set<String> sensorMacSet = subscribeListener.getSensorMacSet();
|
|
|
+ removeSensorMac(sensorMacSet);
|
|
|
sensorMacSet.addAll(Arrays.asList(message.split(",")));
|
|
|
- addSensorMac();
|
|
|
+ addSensorMac(sensorMacSet);
|
|
|
}
|
|
|
|
|
|
|
|
@@ -122,16 +115,6 @@ public class WebSocketServer {
|
|
|
}
|
|
|
|
|
|
|
|
|
- /**
|
|
|
- * 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。
|
|
|
- *
|
|
|
- * @param message
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- public void sendMessage(String message) throws IOException {
|
|
|
- this.session.getBasicRemote().sendText(message);
|
|
|
- }
|
|
|
-
|
|
|
|
|
|
public int getOnlineCount() {
|
|
|
return onlineCount.get();
|
|
@@ -147,7 +130,7 @@ public class WebSocketServer {
|
|
|
WebSocketServer.onlineCount.getAndDecrement();
|
|
|
}
|
|
|
|
|
|
- private void addSensorMac() {
|
|
|
+ private void addSensorMac(Set<String> sensorMacSet) {
|
|
|
List<String> openMacList = new ArrayList<>();
|
|
|
sensorMacSet.forEach(sensorMac -> {
|
|
|
Integer count = sensorMacCountMap.getOrDefault(sensorMac, 0);
|
|
@@ -173,7 +156,10 @@ public class WebSocketServer {
|
|
|
log.info(logSb.toString());
|
|
|
}
|
|
|
|
|
|
- private void removeSensorMac() {
|
|
|
+ private void removeSensorMac(Set<String> sensorMacSet) {
|
|
|
+ if (sensorMacSet.isEmpty()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
List<String> closeMacList = new ArrayList<>();
|
|
|
sensorMacSet.forEach(sensorMac -> {
|
|
|
Integer count = sensorMacCountMap.get(sensorMac);
|
|
@@ -201,6 +187,8 @@ public class WebSocketServer {
|
|
|
if (!(listingMacList == null || listingMacList.isEmpty())) {
|
|
|
closeListen(listingMacList.toList(String.class));
|
|
|
}
|
|
|
+ sensorMacCountMap.clear();
|
|
|
+ redisMessageListenerContainer.setMessageListeners(Maps.newHashMap());
|
|
|
}
|
|
|
|
|
|
private void closeListen(List<String> macList) {
|