|
@@ -0,0 +1,194 @@
|
|
|
+package com.macro.mall.tiny.modules.business.controller;
|
|
|
+
|
|
|
+import cn.hutool.json.JSONArray;
|
|
|
+import cn.hutool.json.JSONObject;
|
|
|
+import cn.hutool.json.JSONUtil;
|
|
|
+import com.macro.mall.tiny.common.util.EprdmsHttpUtils;
|
|
|
+import com.macro.mall.tiny.common.util.SpringUtils;
|
|
|
+import com.macro.mall.tiny.modules.business.dto.SensorListenParam;
|
|
|
+import com.macro.mall.tiny.modules.business.service.RedisSubscribeListener;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.data.redis.listener.ChannelTopic;
|
|
|
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import javax.websocket.*;
|
|
|
+import javax.websocket.server.ServerEndpoint;
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+
|
|
|
+@Component
|
|
|
+@ServerEndpoint("/sensor/websocket")
|
|
|
+@Slf4j
|
|
|
+public class WebSocketServer {
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 因为@ServerEndpoint不支持注入,所以使用SpringUtils获取IOC实例
|
|
|
+ */
|
|
|
+ private final RedisMessageListenerContainer redisMessageListenerContainer = SpringUtils.getBean(RedisMessageListenerContainer.class);
|
|
|
+
|
|
|
+ //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
|
|
|
+ private static final AtomicInteger onlineCount = new AtomicInteger(0);
|
|
|
+
|
|
|
+ //concurrent包的线程安全Set,用来存放每个客户端对应的webSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
|
|
|
+// private static final CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<>();
|
|
|
+
|
|
|
+ //与某个客户端的连接会话,需要通过它来给客户端发送数据
|
|
|
+ private Session session;
|
|
|
+
|
|
|
+ // 监听的设备列表
|
|
|
+ private HashSet<String> sensorMacSet;
|
|
|
+
|
|
|
+ private static Map<String, Integer> sensorMacCountMap;
|
|
|
+
|
|
|
+ private RedisSubscribeListener subscribeListener;
|
|
|
+
|
|
|
+ @Value("${keystar.eprdms.url:localhost:18060}")
|
|
|
+ private String eprdmsUrl;
|
|
|
+ private static final String OPEN_LISTEN_URL = "/eprdms/meteorology/sensor/start/listen";
|
|
|
+ private static final String CLOSE_LISTEN_URL = "/eprdms/meteorology/sensor/start/listen";
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 连接建立成功调用的方法
|
|
|
+ *
|
|
|
+ * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
|
|
|
+ */
|
|
|
+
|
|
|
+ @OnOpen
|
|
|
+ public void onOpen(Session session) {
|
|
|
+ this.session = session;
|
|
|
+ addOnlineCount(); //在线数加1
|
|
|
+ log.info("websocket有新连接加入!当前在线人数为" + getOnlineCount());
|
|
|
+ subscribeListener = new RedisSubscribeListener();
|
|
|
+ subscribeListener.setSession(session);
|
|
|
+ sensorMacSet = new HashSet<>();
|
|
|
+ subscribeListener.setSensorMacSet(sensorMacSet);
|
|
|
+ //设置订阅topic
|
|
|
+ redisMessageListenerContainer.addMessageListener(subscribeListener, new ChannelTopic("COLLECT_DATA"));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 连接关闭调用的方法
|
|
|
+ */
|
|
|
+ @OnClose
|
|
|
+ public void onClose() {
|
|
|
+ removeSensorMac();
|
|
|
+ subOnlineCount(); //在线数减1
|
|
|
+ redisMessageListenerContainer.removeMessageListener(subscribeListener);
|
|
|
+ log.info("websocket有一连接关闭!当前在线人数为" + getOnlineCount());
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 收到客户端消息后调用的方法
|
|
|
+ *
|
|
|
+ * @param message 客户端发送过来的消息
|
|
|
+ * @param session 可选的参数
|
|
|
+ */
|
|
|
+ @OnMessage
|
|
|
+ public void onMessage(String message, Session session) {
|
|
|
+ log.debug("websocket来自客户端的消息:" + message);
|
|
|
+ if (!sensorMacSet.isEmpty()) {
|
|
|
+ removeSensorMac();
|
|
|
+ }
|
|
|
+ sensorMacSet.addAll(Arrays.asList(message.split(",")));
|
|
|
+ addSensorMac();
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发生错误时调用
|
|
|
+ *
|
|
|
+ * @param session
|
|
|
+ * @param error
|
|
|
+ */
|
|
|
+ @OnError
|
|
|
+ public void onError(Session session, Throwable error) {
|
|
|
+ log.error("websocket发生错误:", error);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。
|
|
|
+ *
|
|
|
+ * @param message
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public void sendMessage(String message) throws IOException {
|
|
|
+ this.session.getBasicRemote().sendText(message);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public int getOnlineCount() {
|
|
|
+ return onlineCount.get();
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public void addOnlineCount() {
|
|
|
+ WebSocketServer.onlineCount.getAndIncrement();
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public void subOnlineCount() {
|
|
|
+ WebSocketServer.onlineCount.getAndDecrement();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void addSensorMac() {
|
|
|
+ List<String> openMacList = new ArrayList<>();
|
|
|
+ sensorMacSet.forEach(sensorMac -> {
|
|
|
+ Integer count = sensorMacCountMap.getOrDefault(sensorMac, 0);
|
|
|
+ count++;
|
|
|
+ if (count == 1) {
|
|
|
+ openMacList.add(sensorMac);
|
|
|
+ }
|
|
|
+ sensorMacCountMap.put(sensorMac, count);
|
|
|
+ });
|
|
|
+ if (openMacList.isEmpty()) return;
|
|
|
+ SensorListenParam sensorListenParam = new SensorListenParam(openMacList);
|
|
|
+ JSONObject paramJsonObject = JSONUtil.parseObj(sensorListenParam);
|
|
|
+ JSONArray resultArray = EprdmsHttpUtils.PostForEprdms(eprdmsUrl + OPEN_LISTEN_URL, paramJsonObject, JSONArray.class);
|
|
|
+ if (resultArray == null) return;
|
|
|
+ StringBuilder logSb = new StringBuilder("开启监听:").append("\r\n");
|
|
|
+ resultArray.jsonIter().forEach(jsonObject -> {
|
|
|
+ logSb.append("deviceMac:")
|
|
|
+ .append(jsonObject.getStr("deviceMac"))
|
|
|
+ .append(",result:")
|
|
|
+ .append(jsonObject.getStr("result"))
|
|
|
+ .append("\r\n");
|
|
|
+ });
|
|
|
+ log.info(logSb.toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void removeSensorMac() {
|
|
|
+ List<String> closeMacList = new ArrayList<>();
|
|
|
+ sensorMacSet.forEach(sensorMac -> {
|
|
|
+ Integer count = sensorMacCountMap.get(sensorMac);
|
|
|
+ if (count == null) return;
|
|
|
+ count--;
|
|
|
+ if (count < 1) {
|
|
|
+ sensorMacCountMap.remove(sensorMac);
|
|
|
+ closeMacList.add(sensorMac);
|
|
|
+ } else {
|
|
|
+ sensorMacCountMap.put(sensorMac, count);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ if (closeMacList.isEmpty()) return;
|
|
|
+ SensorListenParam sensorListenParam = new SensorListenParam(closeMacList);
|
|
|
+ JSONObject paramJsonObject = JSONUtil.parseObj(sensorListenParam);
|
|
|
+ JSONArray resultArray = EprdmsHttpUtils.PostForEprdms(eprdmsUrl + CLOSE_LISTEN_URL, paramJsonObject, JSONArray.class);
|
|
|
+ if (resultArray == null) return;
|
|
|
+ StringBuilder logSb = new StringBuilder("关闭监听:").append("\r\n");
|
|
|
+ resultArray.jsonIter().forEach(jsonObject -> {
|
|
|
+ logSb.append("deviceMac:")
|
|
|
+ .append(jsonObject.getStr("deviceMac"))
|
|
|
+ .append(",result:")
|
|
|
+ .append(jsonObject.getStr("result"))
|
|
|
+ .append("\r\n");
|
|
|
+ });
|
|
|
+ log.info(logSb.toString());
|
|
|
+ sensorMacSet.clear();
|
|
|
+ }
|
|
|
+
|
|
|
+}
|