Browse Source

feat(redis监听): 增加监听设备列表初始化

gjs 3 years ago
parent
commit
4f42a14656

+ 26 - 5
src/main/java/com/macro/mall/tiny/modules/business/controller/WebSocketServer.java

@@ -48,7 +48,8 @@ public class WebSocketServer {
     @Value("${keystar.eprdms.url:localhost:18060}")
     private String eprdmsUrl;
     private static final String OPEN_LISTEN_URL = "/eprdms/meteorology/sensor/start/listen";
-    private static final String CLOSE_LISTEN_URL = "/eprdms/meteorology/sensor/start/listen";
+    private static final String CLOSE_LISTEN_URL = "/eprdms/meteorology/sensor/stop/listen";
+    private static final String GET_LISTEN_URL = "/eprdms/meteorology/sensor/current/listen";
 
     /**
      * 连接建立成功调用的方法
@@ -66,7 +67,11 @@ public class WebSocketServer {
         sensorMacSet = new HashSet<>();
         subscribeListener.setSensorMacSet(sensorMacSet);
         //设置订阅topic
-        redisMessageListenerContainer.addMessageListener(subscribeListener, new ChannelTopic("COLLECT_DATA"));
+        redisMessageListenerContainer.addMessageListener(subscribeListener, new ChannelTopic("collect:sensor"));
+        if (sensorMacCountMap == null) {
+            initSensorMac();
+            sensorMacCountMap = new HashMap<>();
+        }
     }
 
     /**
@@ -174,8 +179,25 @@ public class WebSocketServer {
                 sensorMacCountMap.put(sensorMac, count);
             }
         });
-        if (closeMacList.isEmpty()) return;
-        SensorListenParam sensorListenParam = new SensorListenParam(closeMacList);
+        sensorMacSet.clear();
+        if (!closeMacList.isEmpty()) {
+            closeListen(closeMacList);
+        }
+    }
+
+    private void initSensorMac() {
+        HashMap<String, String> paramMap = new HashMap<>();
+        paramMap.put("monitorClient", "COLLECT_DATA");
+        JSONObject result = EprdmsHttpUtils.getForEprdms(eprdmsUrl + GET_LISTEN_URL + "?monitorClient={monitorClient}", paramMap, JSONObject.class);
+        if (result == null) return;
+        JSONArray listingMacList = result.getJSONArray("sensorMacs");
+        if (!(listingMacList == null || listingMacList.isEmpty())) {
+            closeListen(listingMacList.toList(String.class));
+        }
+    }
+
+    private void closeListen(List<String> macList) {
+        SensorListenParam sensorListenParam = new SensorListenParam(macList);
         JSONObject paramJsonObject = JSONUtil.parseObj(sensorListenParam);
         JSONArray resultArray = EprdmsHttpUtils.PostForEprdms(eprdmsUrl + CLOSE_LISTEN_URL, paramJsonObject, JSONArray.class);
         if (resultArray == null) return;
@@ -188,7 +210,6 @@ public class WebSocketServer {
                     .append("\r\n");
         });
         log.info(logSb.toString());
-        sensorMacSet.clear();
     }
 
 }

+ 12 - 11
src/main/java/com/macro/mall/tiny/modules/business/service/RedisSubscribeListener.java

@@ -33,17 +33,18 @@ public class RedisSubscribeListener implements MessageListener {
     public void onMessage(Message message, byte[] bytes) {
         String msg = new String(message.getBody());
         log.debug("[{}]主题发布:{}", new String(bytes), msg);
-        if (session != null && session.isOpen()) {
-            String deviceMac = new JSONObject(msg).getStr("deviceMac");
-            if (!sensorMacSet.contains(deviceMac)) {
-                return;
-            }
-            try {
-                session.getBasicRemote().sendText(msg);
-            } catch (IOException e) {
-                log.error("[redis监听器]发布消息异常:", e);
-            }
-        }
+        String deviceMac = new JSONObject(msg).getStr("deviceMac");
+        System.out.println(deviceMac);
+//        if (session != null && session.isOpen()) {
+//            if (!sensorMacSet.contains(deviceMac)) {
+//                return;
+//            }
+//            try {
+//                session.getBasicRemote().sendText(msg);
+//            } catch (IOException e) {
+//                log.error("[redis监听器]发布消息异常:", e);
+//            }
+//        }
     }
 
     public Session getSession() {