实现功能
服务端维护已注册客户端的列表;服务端向所有客户端广播消息;服务端向指定客户端发送消息;服务端向多个客户端发送消息;客户端向服务端发送消息。
效果:
环境
jdk:1.8
SpringBoot:2.4.17
服务端
1.引入依赖:
<!-- Spring Boot starter for WebSocket support. NOTE(review): no <version> is declared here, so it is presumably managed elsewhere (e.g. a Spring Boot parent/BOM) - confirm. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2.在启动类上加上开启WebSocket的注解
@EnableWebSocket
3.配置类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * Spring configuration for WebSocket endpoints.
 *
 * <p>Per the original author's note, its purpose is to solve the problem of
 * {@code @ServerEndpoint}-annotated WebSocket endpoints not being picked up by Spring Boot.
 *
 * <p>Created 2024/3/18 16:57.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Exposes a {@link ServerEndpointExporter} bean. According to the original note, this
     * bean automatically registers the WebSocket endpoints declared with
     * {@code @ServerEndpoint}.
     *
     * @return a freshly created exporter
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
4.服务端实现
/**
* date created : Created in 2024/3/18 16:31
* description : 服务端实现,方法的封装
* class name : WebSocketServer
*/
@Component
@Slf4j
@ServerEndpoint("/websocket/{applicationName}")
public class WebSocketServer {
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
// 应用名称
private String applicationName;
//虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
private static final CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>();
// 用来存在线连接用户信息
private static final ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();
/**
* 链接成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam(value = "applicationName") String applicationName) {
try {
this.session = session;
this.applicationName = applicationName;
webSockets.add(this);
sessionPool.put(applicationName, session);
log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());
log.info("【当前客户端列表】:"+ sessionPool.keySet());
} catch (Exception e) {
}
}
/**
* description : 有连接断开之后的处理方法
* method name : onClose
* param : []
* return : void
*/
@OnClose
public void onClose() {
try {
webSockets.remove(this);
sessionPool.remove(this.applicationName);
log.info("【websocket消息】连接断开,总数为:" + webSockets.size());
log.info("【当前客户端列表】:"+ sessionPool.keySet());
} catch (Exception e) {
}
}
/**
* description : 收到客户端消息的处理方法
* method name : onMessage
* param : [message]
* return : void
*/
@OnMessage
public void onMessage(String message) {
log.info("【websocket消息】收到客户端消息:" + message);
}
/**
* description : 错误处理
* method name : onError
* param : [session, error]
* return : void
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误,原因:" + error.getMessage());
error.printStackTrace();
}
/**
* description : 广播消息 给所有注册的客户端发送消息
* method name : sendBroadcastMessage
* param : [message]
* return : void
*/
public void sendBroadcastMessage(String message) {
log.info("【websocket消息】广播消息:" + message);
for (WebSocketServer webSocket : webSockets) {
try {
if (webSocket.session.isOpen()) {
webSocket.session.getAsyncRemote().sendText(message);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* description : 给指定的客户端发送消息
* method name : sendApplicationMessage
* param : [applicationName 客户端的应用名称, message 要发送的消息]
* return : void
*/
public void sendApplicationMessage(String applicationName, String message) {
Session session = sessionPool.get(applicationName);
if (session != null && session.isOpen()) {
try {
log.info("【websocket消息】 单点消息:" + message);
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* description : 给多个客户端发送消息
* method name : sendMassApplicationMessage
* param : [applicationNames 注册的客户端的应用名称, message 要发送的消息]
* return : void
*/
public void sendMassApplicationMessage(String[] applicationNames, String message) {
for (String userId : applicationNames) {
Session session = sessionPool.get(userId);
if (session != null && session.isOpen()) {
try {
log.info("【websocket消息】 单点消息:" + message);
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
客户端
1.客户端配置
yaml文件的末尾添加
# WebSocket settings: host and port of the server, and the endpoint path prefix
websocket:
  host: localhost
  port: 19022
  prefix: websocket
2.客户端配置类
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
/**
 * Holds the {@code websocket.*} settings from the configuration file and builds the URL of
 * the corresponding server endpoint.
 *
 * <p>The class is registered through {@code @Component} only; it declares no {@code @Bean}
 * methods, so an additional {@code @Configuration} stereotype is not needed.
 *
 * <p>Created 2024/3/19 14:36.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Component
@ConfigurationProperties(prefix = "websocket")
public class WebSocketProperties {

    /** Value of {@code spring.application.name}; used as the last path segment of the URL. */
    @Value("${spring.application.name}")
    private String appName;

    /** Server host name ({@code websocket.host}). */
    private String host;

    /** Server port ({@code websocket.port}). */
    private String port;

    /** First path segment of the endpoint ({@code websocket.prefix}). */
    private String prefix;

    /**
     * Builds the endpoint URL in the form {@code ws://host:port/prefix/appName}.
     *
     * @return the WebSocket URL assembled from the current property values
     */
    public String getUrl() {
        return String.format("ws://%s:%s/%s/%s", host, port, prefix, appName);
    }
}
3.客户端实现
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.stereotype.Component;
import javax.websocket.ClientEndpoint;
import javax.websocket.*;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
 * WebSocket client endpoint: connects to the server when constructed, prints the messages it
 * receives, and offers helpers that send the text frames {@code "register"} and
 * {@code "unregister"}.
 *
 * <p>NOTE(review): {@code @Component} and {@code @Configuration} are both present, and
 * {@code @AutoConfigureBefore} is normally meant for auto-configuration classes; the
 * annotations are left untouched here, but confirm whether all of them are needed.
 *
 * <p>Created 2024/3/18 16:36.
 */
@ClientEndpoint
@AutoConfigureBefore(WebSocketProperties.class)
@Component
@Import(WebSocketProperties.class)
@Configuration
public class WebSocketClient {

    /** Session to the server; stays {@code null} until {@link #onOpen(Session)} has run. */
    private Session session;

    /**
     * Looks up the connection settings through {@code SpringUtils} and connects to the server.
     * A failed connection attempt is reported on the error stream and leaves the client
     * without a session.
     */
    public WebSocketClient() {
        try {
            WebSocketProperties webSocketProperties = SpringUtils.getBean(WebSocketProperties.class);
            WebSocketContainer container = ContainerProvider.getWebSocketContainer();
            container.connectToServer(this, new URI(webSocketProperties.getUrl()));
        } catch (DeploymentException | URISyntaxException | IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Stores the session once the connection is open.
     *
     * @param session the opened session
     */
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        System.out.println("Connected to server");
    }

    /**
     * Prints a message received from the server.
     *
     * <p>NOTE(review): under the javax.websocket contract a non-void {@code @OnMessage} return
     * value is presumably sent back to the peer, which would echo every message to the
     * server - confirm this is intended.
     *
     * @param message the received text
     * @return the same text
     */
    @OnMessage
    public String onMessage(String message) {
        System.out.println("来自WebSocket的消息: " + message);
        return message;
    }

    /** Reports that the connection has been closed. */
    @OnClose
    public void onClose() {
        System.out.println("Disconnected from server");
    }

    /** Sends the text {@code "register"} to the server, if a session is open. */
    public void register() {
        sendControlMessage("register", "Registered with server");
    }

    /** Sends the text {@code "unregister"} to the server, if a session is open. */
    public void unregister() {
        sendControlMessage("unregister", "Unregistered from server");
    }

    /** Sends one text frame and prints {@code confirmation} on success; skips when not connected. */
    private void sendControlMessage(String payload, String confirmation) {
        Session current = session;
        // The session is null when the connect attempt in the constructor failed; without
        // this guard the call below would fail with a NullPointerException.
        if (current == null || !current.isOpen()) {
            System.err.println("WebSocket session is not open; cannot send '" + payload + "'");
            return;
        }
        try {
            current.getBasicRemote().sendText(payload);
            System.out.println(confirmation);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
在构造方法中无法通过@Autowired注入配置类(构造方法执行时字段尚未完成注入),因此改用工具类从Spring容器中获取,工具类如下:
/*
* Copyright (c) 2020 pig4cloud Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
 * Static access to beans of the Spring {@link ApplicationContext}.
 *
 * <p>The context is captured in {@link #setApplicationContext(ApplicationContext)}. Until that
 * method has been called the stored reference is {@code null}, so every {@code getBean}
 * variant throws a {@link NullPointerException}.
 */
@Component
public class SpringUtils implements ApplicationContextAware {

    /** Context stored by {@link #setApplicationContext(ApplicationContext)}. */
    private static ApplicationContext springContext;

    /**
     * Stores the given context in a static field so the static lookups can use it.
     *
     * @param applicationContext the context to remember
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringUtils.springContext = applicationContext;
    }

    /**
     * Looks up a bean by name.
     *
     * @param name the bean name
     * @return the bean returned by the context
     */
    public static Object getBean(String name) {
        final Object bean = springContext.getBean(name);
        return bean;
    }

    /**
     * Looks up a bean by type.
     *
     * @param clazz the requested bean type
     * @param <T>   the bean type
     * @return the bean returned by the context
     */
    public static <T> T getBean(Class<T> clazz) {
        final T bean = springContext.getBean(clazz);
        return bean;
    }

    /**
     * Looks up a bean by name and type.
     *
     * @param name  the bean name
     * @param clazz the requested bean type
     * @param <T>   the bean type
     * @return the bean returned by the context
     */
    public static <T> T getBean(String name, Class<T> clazz) {
        final T bean = springContext.getBean(name, clazz);
        return bean;
    }
}