引入最新版本
<!--websocket-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
启动类加入
//netty 协议服务端口启动
NettyTcpHandler.start();
package com.cqcloud.platform.handler;
import com.cqcloud.platform.service.IotMqttService;
import com.cqcloud.platform.service.impl.IotMqttServiceImpl;
import org.springframework.stereotype.Component;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @author weimeilayer@gmail.com ✨
* @date 💓💕 2022年9月23日 🐬🐇 💓💕
*/
@Component
public class NettyTcpHandler {
/**
* IoT设备协议端口
*/
private static int PORT = 1883;
/**
* 使用方法在启动类
* 加上 NettyTcpHandler.start();
* @throws Exception
*/
public static void start() throws Exception {
final NioEventLoopGroup bossGroup = new NioEventLoopGroup();
final NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
// 创建 IotPushService 实例
IotMqttService iotPushService = new IotMqttServiceImpl();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 添加处理器,处理所有连接的业务逻辑
pipeline.addLast(new TcpMqttServerHandler(iotPushService));
}
});
// 绑定端口并启动
ChannelFuture future = bootstrap.bind(PORT).sync();
// 等待服务器关闭
future.channel().closeFuture().sync();
} finally {
// 优雅地关闭线程池
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
package com.cqcloud.platform.handler;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import cn.hutool.json.JSONObject;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
/**
* @author weimeilayer@gmail.com ✨
* @date 💓💕 2022年10月06日 🐬🐇 💓💕
*/
@Slf4j
@Component
public class TcpEventHandler {
// 使用 BiConsumer 来处理两个参数
private final static Map<String, BiConsumer<String, String>> eventActions = new HashMap<>();
public void registerEventAction(String eventCode, BiConsumer<String, String> action) {
// 动态注册事件处理逻辑
eventActions.put(eventCode, action);
}
public static void handleEvent(String evt, String imei, String reportContent) {
// 根据事件类型找到对应的处理逻辑,并执行
eventActions.getOrDefault(evt, TcpEventHandler::handleUnknownEvent).accept(imei, reportContent);
}
private static void handleUnknownEvent(String imei, String reportContent) {
// 处理未知事件的逻辑
System.out.println("imei: " + imei + ", 报告内容: " + reportContent);
}
public static void handleAlarm(String imei, String reportContent) {
log.info("内容: {}", reportContent);
// 获取目标用户列表(包括固定的用户 ID)
List<String> targetUsers = buildTargetUsersList();
// 构建消息并发送
sendAlarmMessage(targetUsers, imei, reportContent);
}
private static List<String> buildTargetUsersList() {
List<String> targetUsers = new ArrayList<>();
targetUsers.add("1");
targetUsers.add("2");
//实际根据业务查询数据
targetUsers.add("26967563820859392");
return targetUsers;
}
private static void sendAlarmMessage(List<String> targetUsers, String imei, String reportContent) {
// 将目标用户列表转换为逗号分隔的字符串
String users = String.join(",", targetUsers);
// 构建 JSON 消息
JSONObject obj = new JSONObject();
obj.set("imei", imei);
obj.set("message", reportContent);
obj.set("userId", users);
// 发送消息
WebSocketHandler.sendMessageToUser(users, obj.toString());
}
}
package com.cqcloud.platform.handler;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import com.cqcloud.platform.service.IotMqttService;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* 物联网云平台设备协议
* @author weimeilayer@gmail.com ✨
* @date 💓💕 2022年9月23日 🐬🐇 💓💕
*/
public class TcpMqttServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
// 接口注入
private final IotMqttService iotPushService;
public TcpMqttServerHandler(IotMqttService iotPushService) {
this.iotPushService = iotPushService;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
byte[] byteArray;
if (in.readableBytes() <= 0) {
in.release();
return;
}
byteArray = new byte[in.readableBytes()];
in.readBytes(byteArray);
if (byteArray.length <= 0) {
in.release();
return;
}
// 将消息传递给 iotPushService
iotPushService.pushMessageArrived(byteArray);
// 下发指令,假设返回的是多个指令
List<String> externalValues = extractExternalValue("deviceId");
// 转换为十六进制字符串
String hexString = bytesToHex(byteArray);
System.out.println("来自于物联网云平台设备协议的数据: " + hexString);
// 使用 Optional 判断外部值
// 使用 Optional 判断外部值,如果不为空则逐一处理每条指令
Optional.ofNullable(externalValues).ifPresent(values -> {
values.forEach(value -> {
// 提取外部数据值并发送
System.out.println("来自于物联网云平台设备协议的1883端口的提取外部数据值: " + value);
sendResponse(ctx, value);
});
});
}
// 辅助方法:将字节数组转换为十六进制字符串
private static String bytesToHex(byte[] bytes) {
StringBuilder hexString = new StringBuilder();
for (byte b : bytes) {
String hex = Integer.toHexString(0xFF & b);
if (hex.length() == 1) {
hexString.append('0'); // 确保每个字节都为两位
}
hexString.append(hex);
}
return hexString.toString().toUpperCase(); // 返回大写格式
}
// 发送响应的统一辅助方法
private void sendResponse(ChannelHandlerContext ctx, String hexResponse) {
byte[] responseBytes = hexStringToByteArray(hexResponse);
ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes);
ctx.writeAndFlush(responseBuffer);
}
// 将响应消息转换为字节数组
public static byte[] hexStringToByteArray(String s) {
int len = s.length();
byte[] data = new byte[len / 2];
for (int i = 0; i < len; i += 2) {
data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4) + Character.digit(s.charAt(i + 1), 16));
}
return data;
}
// 查询数据库当前设备号下需要下发的命令
private List<String> extractExternalValue(String deviceId) {
//这里自行查询数据库数据库,这里只模拟一个list集合
ArrayList<Object> list = new ArrayList<>();
// 如果记录不为空,获取最新记录的 externalValue
list.stream().findFirst() // 获取最新的一条记录
.map(latestRecord -> {
// 处理最新记录的逻辑
return ""; // 需要返回的值是 externalValue
});// 获取最新的一条记录
return null; // 如果没有找到,返回 null
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 打印异常堆栈跟踪,便于调试和错误排查
cause.printStackTrace();
// 关闭当前的通道,释放相关资源
ctx.close();
}
}
package com.cqcloud.platform.handler;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import org.springframework.stereotype.Component;
import cn.hutool.json.JSONUtil;
import io.micrometer.common.util.StringUtils;
import jakarta.websocket.OnClose;
import jakarta.websocket.OnError;
import jakarta.websocket.OnMessage;
import jakarta.websocket.OnOpen;
import jakarta.websocket.Session;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;
/**
* @author weimeilayer@gmail.com ✨
* @date 💓💕 2022年4月12日 🐬🐇 💓💕
*/
@Component
@ServerEndpoint("/websocket/{username}")
public class WebSocketHandler {
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketHandler.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketHandler.onlineCount--;
}
// 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static int onlineCount = 0;
// 根据名字存储websocket对象CopyOnWriteArraySet线程安全set,ConcurrentHashMap线程安全map
public static Map<String, CopyOnWriteArraySet<WebSocketHandler>> webSocketMap = new ConcurrentHashMap<>();
// 与某个客户端的连接会话,需要通过它来给客户端发送数据
public Session session;
// 心跳时间,长时间没心跳踢掉连接
public long heartBeatTime;
// 初次连接时间,用于控制连接时间过长,踢掉连接
public long beginTime;
/**
* 用户名称
*/
public String username;
/**
* 发送消息
* @param username
* @param message
*/
public static void sendMessageToUser(String username, String message) {
// 检查用户名是否在 map 中存在
if (webSocketMap.containsKey(username)) {
// 获取该用户的 WebSocketHandler 集合
CopyOnWriteArraySet<WebSocketHandler> userHandlers = webSocketMap.get(username);
// 遍历该用户的所有连接(每个用户可能有多个 WebSocket 连接)
for (WebSocketHandler handler : userHandlers) {
// 通过 WebSocketHandler 实例发送消息
handler.sendMessageOne(message, username);
}
} else {
System.out.println("并无在线用户: " + username);
}
}
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(@PathParam("username") String username, Session session) {
this.username = username;
this.session = session;
this.heartBeatTime = System.currentTimeMillis();
this.beginTime = System.currentTimeMillis();
// 登陆用户必须按照用户id 格式登陆
if (!"server".equals(username) && username.split(",").length < 3) {
return;
}
// 将用户添加到websocket,支持单用户多出链接
if (webSocketMap.containsKey(username)) {
webSocketMap.get(username).add(this);
} else {
CopyOnWriteArraySet websocketSet = new CopyOnWriteArraySet();
websocketSet.add(this);
webSocketMap.put(username, websocketSet);
addOnlineCount(); // 在线数加1
}
//注释掉 会退出
Map<String, Object> messageMap = new ConcurrentHashMap<>();
messageMap.put("type", "0");
messageMap.put("message", username + "加入8000端口的的当前在线人数为" + getOnlineCount());
messageMap.put("to", "all");
messageMap.put("users", webSocketMap.keySet());
messageMap.put("username", "server");
sendMessageAll(JSONUtil.toJsonStr(messageMap));
}
/**
* 发送消息给所有用户
*
* @param message
* @throws IOException
*/
public void sendMessageAll(String message) {
for (String key : webSocketMap.keySet()) {
for (WebSocketHandler websocket : webSocketMap.get(key)) {
websocket.session.getAsyncRemote().sendText(message);
}
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
if (StringUtils.isNotEmpty(this.username)) {
try {
if (this.session.isOpen()) {
this.session.close();// 强制关闭
}
webSocketMap.get(username).remove(this);// 删除链接
if (webSocketMap.get(username).isEmpty()) {
webSocketMap.remove(username);
subOnlineCount(); // 在线数减1
// 刷新用户列表
Map<String, Object> messageMap = new ConcurrentHashMap<>();
messageMap.put("type", 0);
messageMap.put("message", username + "退出!当前在线人数为" + getOnlineCount());
messageMap.put("users", webSocketMap.keySet());
sendMessageAll(JSONUtil.toJsonStr(messageMap));
}
} catch (Exception e) {
System.err.println("关闭连接出错 : " + e.getLocalizedMessage());
}
}
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message) {
// 刷新心跳时间
this.heartBeatTime = System.currentTimeMillis();
// 群发消息
cn.hutool.json.JSONObject messageJson = JSONUtil.parseObj(message);
Object type = messageJson.get("type");// 消息类型
Object toUser = messageJson.get("to");// 接收对象
// 心跳检测
if ("999".equals(type)) {
Map<String, Object> messageMap = new ConcurrentHashMap<>();
messageMap.put("type", "1");
messageMap.put("message", "pong");
messageMap.put("username", "服务器");
messageMap.put("to", this.username);
sendMessageOne(JSONUtil.toJsonStr(messageMap), this.username);
return;
}
// 发送消息
if ("All".equalsIgnoreCase(type + "")) {
sendMessageAll(message);
}else {
sendMessageOne(message, toUser + "");
}
}
/**
* 发生错误时调用
*/
@OnError
public void onError(Throwable error) {
error.printStackTrace();
}
/**
* 发送消息
*
* @param message
* @throws IOException
*/
public void sendMessage(String message){
//this.session.getBasicRemote().sendText(message);//同步
this.session.getAsyncRemote().sendText(message);// 异步
}
/**
* 发送消息给指定用户
*
* @param message
* @param toUserName
*/
public void sendMessageOne(String message, String toUserName) {
webSocketMap.keySet().forEach(e -> {
if (e.equals(toUserName)) {
webSocketMap.get(e).forEach(f -> {
try {
f.session.getAsyncRemote().sendText(message);
} catch (Exception e2) {
f.session.getAsyncRemote().sendText(message);
}
});
}
});
}
}
package com.cqcloud.platform.service;
/**
* @author weimeilayer@gmail.com
* @date 💓💕2022年9月8日🐬🐇💓💕
*/
public interface IotMqttService {
/**
* 扩展传输原文
* @param message
*/
void pushMessageArrived(byte[] message);
}
package com.cqcloud.platform.service.impl;
import org.springframework.stereotype.Service;
import com.cqcloud.platform.service.IotMqttService;
import lombok.AllArgsConstructor;
/**
* @author weimeilayer@gmail.com
* @date 💓💕2022年9月8日🐬🐇💓💕
*/
@Service
@AllArgsConstructor
public class IotMqttServiceImpl implements IotMqttService {
/**
* 获取拓展接口原文值
* @param message
*/
@Override
public void pushMessageArrived(byte[] message) {
// 拓展方法
TcpEventHandler.handleAlarm("设备号","告警信息");
}
}
package com.cqcloud.platform.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @author weimeilayer@gmail.com ✨
* @date 💓💕 2022年4月12日 🐬🐇 💓💕
*/
@Configuration
public class WebSocketConfig {
/**
* 注入ServerEndpointExporter, 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
进行批量用户的id进行下发webscoket信息