博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring-Boot快速集成WebSocket服务端 客户端(支持客户端消息同步回调)
阅读量:3924 次
发布时间:2019-05-23

本文共 11650 字,大约阅读时间需要 38 分钟。

Spring-Boot快速集成WebSocket服务端 客户端(客户端消息同步回调)

WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。

WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
在 WebSocket API 中,浏览器和服务器只需要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道。两者之间就直接可以数据互相传送。

在这里插入图片描述

所以WebSocket一旦客户端和服务端建立连接就可以一直保持双向通信。现在调用的很多的第三方的一些服务都是WebSocket的连接方式。是我最近项目里的刚需。
废话不多说上代码

WebSocket服务端实现

使用Spring-Boot实现webSocket倒是简单,基于注解开发真的太爽了。

WebSocket服务端 pom.xml 依赖

org.springframework.boot
spring-boot-starter-websocket

如果不是Sping-Boot框架可能换成其他的socket.api的依赖

我这里定义两个类,一个配置类配置socket的参数,一个WebSocketServer的实现类,使用时会注入IOC容器。

WebSocket 服务端配置类

/** * WebSocket 服务端配置类 */@Configurationpublic class WebSocketServerConfig {
/** * ServerEndpointExporter bean 注入 * @return */ @Bean public ServerEndpointExporter serverEndpointExporter() {
ServerEndpointExporter serverEndpointExporter = new ServerEndpointExporter(); return serverEndpointExporter; }}

这里其他配置我暂时未配置,但是实际使用时,要配置一些参数优化服务。ServerEndpointExporter是必须要注入的,后续服务端类的实装需要用到。

WebSocket服务端

/** * * WebSocket服务端 * @author DavidLei * */@Component@ServerEndpoint("/webSocket/{clientId}")public class CustomizedWebSocketServer {
/** * 日志 */ private Logger logger = LoggerFactory.getLogger(CustomizedWebSocketServer.class); /** * 在线数 */ private static int onlineCount = 0; /** * 线程安全的存储连接session的Map */ private static Map
clients = new ConcurrentHashMap
(); /** * session */ private Session session; /** * 客户端端标识 */ private String clientId; /** * 客户端连接时方法 * @param clientId * @param session * @throws IOException */ @OnOpen public void onOpen(@PathParam("clientId") String clientId, Session session) throws IOException {
logger.info("onOpen: has new client connect -"+clientId); // this.clientId = clientId; this.session = session; addOnlineCount(); clients.put(clientId, this); logger.info("onOpen: now has "+onlineCount+" client online"); } /** * 客户端断开连接时方法 * @throws IOException */ @OnClose public void onClose() throws IOException {
logger.info("onClose: has new client close connection -"+clientId); clients.remove(clientId); subOnlineCount(); logger.info("onClose: now has "+onlineCount+" client online"); } /** * 收到消息时 * @param message * @throws IOException */ @OnMessage public void onMessage(String message) throws IOException {
logger.info("onMessage: [clientId: " + clientId + " ,message:" + message + "]"); } /** * 发生error时 * @param session * @param error */ @OnError public void onError(Session session, Throwable error) {
logger.info("onError: [clientId: " + clientId + " ,error:" + error.getCause() + "]"); } /** * 指定端末发送消息 * @param message * @param clientId * @throws IOException */ public void sendMessageByClientId(String message, String clientId) throws IOException {
for (CustomizedWebSocketServer item : clients.values()) {
if (item.clientId.equals(clientId) ) {
item.session.getAsyncRemote().sendText(message); } } } /** * 所有端末发送消息 * @param message * @throws IOException */ public void sendMessageAll(String message) throws IOException {
for (CustomizedWebSocketServer item : clients.values()) {
item.session.getAsyncRemote().sendText(message); } } public static synchronized int getOnlineCount() {
return onlineCount; } public static synchronized void addOnlineCount() {
CustomizedWebSocketServer.onlineCount++; } public static synchronized void subOnlineCount() {
CustomizedWebSocketServer.onlineCount--; } public static synchronized Map
getClients() {
return clients; }}

注意@ServerEndpoint("/webSocket/{clientId}")注解,这里标识的时Socket的地址,以我的配置为例

我的tomcat端口是8082,socket的链接地址就是 ws://localhost:8082/webSocket/12345678

我因为业务需要我这里实现了两个方法 sendMessageByClientId sendMessageAll

一个是向连接到服务端的指定客户端发送信息,一个是向所有在线客户端发送信息。

WebSocket客户端实现

WebSocket客户端 pom.xml 依赖

org.java-websocket
Java-WebSocket
1.3.8

定义WebSocket客户端

/** * 自定义WebSocket客户端 */public class CustomizedWebSocketClient extends WebSocketClient {
/** * 日志 */ private Logger logger = LoggerFactory.getLogger(CustomizedWebSocketClient.class); /** * 线程安全的Boolean -是否受到消息 */ public AtomicBoolean hasMessage = new AtomicBoolean(false); /** * 线程安全的Boolean -是否已经连接 */ private AtomicBoolean hasConnection = new AtomicBoolean(false); /** * 构造方法 * * @param serverUri */ public CustomizedWebSocketClient(URI serverUri) {
super(serverUri); logger.info("CustomizeWebSocketClient init:" + serverUri.toString()); } /** * 打开连接是方法 * * @param serverHandshake */ @Override public void onOpen(ServerHandshake serverHandshake) {
logger.info("CustomizeWebSocketClient onOpen"); } /** * 收到消息时 * * @param s */ @Override public void onMessage(String s) {
hasMessage.set(true); logger.info("CustomizeWebSocketClient onMessage:" + s); } /** * 当连接关闭时 * * @param i * @param s * @param b */ @Override public void onClose(int i, String s, boolean b) {
this.hasConnection.set(false); this.hasMessage.set(false); logger.info("CustomizeWebSocketClient onClose:" + s); } /** * 发生error时 * * @param e */ @Override public void onError(Exception e) {
logger.info("CustomizeWebSocketClient onError:" + e); } @Override public void connect() {
if(!this.hasConnection.get()){
super.connect(); hasConnection.set(true); } }}

WebSocket客户端配置类

/** * WebSocket客户端配置类 */@Configurationpublic class WebSocketClientConfig {
/** * socket连接地址 */ @Value("${com.dl.socket.url}") private String webSocketUri; /** * 注入Socket客户端 * @return */ @Bean public CustomizedWebSocketClient initWebSocketClient(){
URI uri = null; try {
uri = new URI(webSocketUri); } catch (URISyntaxException e) {
e.printStackTrace(); } CustomizedWebSocketClient webSocketClient = new CustomizedWebSocketClient(uri); //启动时创建客户端连接 webSocketClient.connect(); return webSocketClient; }}

webSocketUri 的yml配置

#socket客户端连接地址com.dl.socket.url: ws://localhost:8082/webSocket//12345678

到目前为止正常的需求实装完了

实装测试api

@Controller@RequestMapping(value = "/socket")public class WebSocketController {
@Autowired private CustomizedWebSocketServer websocketServerCustomized; @Autowired private CustomizedWebSocketClient socketClient; @ResponseBody @PostMapping(value = "/message") public void getSocketMessage(HttpServletRequest request) throws IOException {
JSONObject json = new JSONObject(); json.put("to", request.getSession().getId()); json.put("msg", "欢迎连接WebSocket!!!!"); websocketServerCustomized.sendMessageAll(json.toJSONString()); } }

客户端后台log

在这里插入图片描述
客户端后台log
在这里插入图片描述
正常的收发是完全ok的

客户端消息同步回调

但是这远远不够,我还有其他的业务需求,现在需要调用第三方的socket接口来实时反馈数据,但是第三方的接口后台有着很多运算,往往反馈数据很慢。但是前台调用我的api时有需要返回这些反馈数据。所以需要接口同步回调数据。

加强版的客户端

/** * 自定义WebSocket客户端 * @author machenike */public class CustomizedWebSocketClient extends WebSocketClient {
/** * 日志 */ private Logger logger = LoggerFactory.getLogger(CustomizedWebSocketClient.class); /** * 消息回调接口 */ private WebSocketClientSyncCallback callback = null; /** * 线程安全的Boolean -是否受到消息 */ public AtomicBoolean hasMessage = new AtomicBoolean(false); /** * 线程安全的Boolean -是否已经连接 */ private AtomicBoolean hasConnection = new AtomicBoolean(false); /** * 构造方法 * * @param serverUri */ public CustomizedWebSocketClient(URI serverUri) {
super(serverUri); logger.info("CustomizeWebSocketClient init:" + serverUri.toString()); } /** * 打开连接时 * * @param serverHandshake */ @Override public void onOpen(ServerHandshake serverHandshake) {
logger.info("CustomizeWebSocketClient onOpen"); hasConnection.set(true); } /** * 收到消息时 * * @param s */ @Override public void onMessage(String s) {
hasMessage.set(true); if(callback !=null) {
callback.callback(s); } logger.info("CustomizeWebSocketClient onMessage:" + s); } /** * 当连接关闭时 * * @param i * @param s * @param b */ @Override public void onClose(int i, String s, boolean b) {
this.hasConnection.set(false); this.hasMessage.set(false); logger.info("CustomizeWebSocketClient onClose:" + s); } /** * 发生error时 * * @param e */ @Override public void onError(Exception e) {
logger.info("CustomizeWebSocketClient onError:" + e); } /** * 带有回调的消息发送接口 * @param text * @param callback * @throws NotYetConnectedException */ public void send(String text, WebSocketClientSyncCallback callback) throws NotYetConnectedException {
logger.info("CustomizeWebSocketClient send:" + text); hasMessage.set(false); //设定回调接口 this.callback = callback; super.send(text); //计算等待;10s返回消息 超过10s直接退出 for (int count = 0; ; ) {
logger.debug("socketClient wait:"+count+" second, hasMessage:"+hasMessage); //判断是否收到消息||socket返回数据超时 if (hasMessage.get()||count>10) {
break; } else if (count <=10) {
try {
TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {
e.printStackTrace(); } count++; } } } @Override public void connect() {
logger.info("CustomizeWebSocketClient connect"); super.connect(); } @Override public void reconnect() {
logger.info("CustomizeWebSocketClient reconnect"); super.reconnect(); } /** * 定时判断连接状态:尝试重连 1分钟 */ @Scheduled(cron = "0 0/1 * * * ? ") public void autoConnect(){
logger.info("CustomizeWebSocketClient autoConnect: [hasConnection:"+hasConnection+"]"); if(!hasConnection.get()){
this.reconnect(); } }}

这里我重载了send方法,可以传入回调接口。同时还加上了线程等待,注意这里的判断参数,要换成线程安全的AtomicBoolean

当触发onMessage方法时置成true,默认为false,客户端send时置成false。

socket客户端消息同步回调接口

/** * socket客户端消息同步回调接口 */public interface WebSocketClientSyncCallback {
/** * socket客户端消息回调 * @param message */ void callback(String message);}

实装测试api

@ResponseBody    @PostMapping(value = "/clientCallback")    public String testClientCallback(HttpServletRequest request)  {
try {
socketClient.send("test message",new WebSocketClientSyncCallback(){
@Override public void callback(String message) {
callbackMessage=message; } }); } catch (Exception e) {
e.printStackTrace(); } return callbackMessage; }

socket服务端后台log

在这里插入图片描述

客户端后台log

在这里插入图片描述
可见后台socket客户端等待了3s后收到服务端反馈的数据,实现了同步回调。

转载地址:http://zhugn.baihongyu.com/

你可能感兴趣的文章
数据结构与算法--栈、队列(队列)
查看>>
动态规划
查看>>
增强学习(一)——马尔科夫决策过程(MDP)
查看>>
增强学习(二)——策略迭代与值迭代
查看>>
IPv6地址表示方法详解
查看>>
数据库三级模式
查看>>
微信小程序wxss设置样式
查看>>
Linux C代码获取天气情况
查看>>
python+opencv礼帽黑帽
查看>>
python链表反转
查看>>
c/c++查询M个数在N数组中出现的次数
查看>>
uva 147 - Dollars(动态规划--完全背包)
查看>>
uva 357 - Let Me Count The Ways(动态规划-注意dp初始化的问题)
查看>>
uva 562 - Dividing coins(注意判断条件,可以转换成01背包做)
查看>>
uva 10404 - Bachet's Game(DP)
查看>>
最优二叉搜索树
查看>>
hdu 1008 Elevator
查看>>
hdu 1005 Number Sequence(数学题目,好好看)
查看>>
zoj 2106 Tick and Tick(比较好的数学题目,代码特麻烦,注意精度)
查看>>
zoj 2107 Quoit Design(最近点对问题,好好思考,分治)
查看>>