spring boot SSE 开发实践
最编程
2024-05-04 17:49:45
...
SSE简介
SEE (Server-sent events) 是一种通过http推送消息连接技术。 前端通过 web api EventSource 来跟后端建立连接。 后端可以通过这个连接发送任意的字符串数据。 SEE 的 MIME 请求类型 是text/event-stream
SSE 跟 Websocket 差异
SSE | Websocket |
---|---|
单通道连接只支持接收来自后端的消息 | 双工通道,能收发消息 |
支持自动重连 | 不支持需要手动编码 |
SSE 应用场景
SSE 比较适适合做通知类型的需求。类似订单通知。更新通知
项目初始化
Spring Initializr 这里已经设置好了所需要的依赖
.
项目目录
SEE Session 管理
在文件SseSession 中 sessionMap 用于 储存用户连接进来的SseEmitter对象。
package com.demo.sse;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import com.demo.sse.exception.SseException;
public class SseSession {
private static Map<String,SseEmitter> sessionMap = new ConcurrentHashMap<>();
public static void add(String sessionKey,SseEmitter sseEmitter){
if(sessionMap.get(sessionKey) != null){
throw new SseException("User exists!");
}
sessionMap.put(sessionKey, sseEmitter);
}
public static boolean exists(String sessionKey){
return sessionMap.get(sessionKey) != null;
}
public static boolean remove(String sessionKey){
SseEmitter sseEmitter = sessionMap.get(sessionKey);
if(sseEmitter != null){
sseEmitter.complete();
}
return false;
}
public static void onError(String sessionKey,Throwable throwable){
SseEmitter sseEmitter = sessionMap.get(sessionKey);
if(sseEmitter != null){
sseEmitter.completeWithError(throwable);
}
}
public static void send(String sessionKey,String content) throws IOException{
sessionMap.get(sessionKey).send(content);
}
}
服务接口
- connect 创建新的 sse 连接
- send 推送消息到某个用户
- close 关闭sse 连接
public interface SseServer {
public SseEmitter conect(String userId);
public boolean send(String userId, String content);
public boolean close(String userId);
}
接口实现
new SseEmitter(0L) 这个代表设置 time out 时间为无限制 。 线上建议设置上一个时间 。 不然很容易导致推送对象在 seesionMap 一直保存。
package com.demo.sse.server.impl;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import com.demo.sse.SseSession;
import com.demo.sse.exception.SseException;
import com.demo.sse.server.SseServer;
@Service
public class SseServerImpl implements SseServer {
private static final Logger log = LoggerFactory.getLogger(SseServerImpl.class);
@Override
public SseEmitter conect(String userId) {
if(SseSession.exists(userId)){
SseSession.remove(userId);
}
SseEmitter sseEmitter = new SseEmitter(0L);
sseEmitter.onError((err)-> {
log.error("type: SseSession Error, msg: {} session Id : {}",err.getMessage(), userId);
SseSession.onError(userId, err);
});
sseEmitter.onTimeout(() -> {
log.info("type: SseSession Timeout, session Id : {}", userId);
SseSession.remove(userId);
});
sseEmitter.onCompletion(() -> {
log.info("type: SseSession Completion, session Id : {}", userId);
SseSession.remove(userId);
});
SseSession.add(userId, sseEmitter);
return sseEmitter;
}
@Override
public boolean send(String userId, String content) {
if(SseSession.exists(userId)){
try{
SseSession.send(userId, content);
return true;
}catch(IOException exception){
log.error("type: SseSession send Erorr:IOException, msg: {} session Id : {}",exception.getMessage(), userId);
}
}else{
throw new SseException("User Id " + userId + " not Found");
}
return false;
}
@Override
public boolean close(String userId) {
log.info("type: SseSession Close, session Id : {}", userId);
return SseSession.remove(userId);
}
}
控制器实现
由于是demo 这里实现了几个API接口方便测试。最主要的还是subscribe 接口 这是让前端连接sse 流的。
package com.demo.sse.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import com.demo.sse.dto.SendMegRequest;
import com.demo.sse.server.SseServer;
@Controller
@RequestMapping(value = "sse")
@ResponseBody
@CrossOrigin
public class SseController {
@Autowired
private SseServer sseServer;
@GetMapping(value = "/subscribe/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter subscribe(@PathVariable("userId") String userId){
return sseServer.conect(userId);
}
@PostMapping(value = "/send/{userId}")
public String sendMessage(@PathVariable("userId") String userId ,@RequestBody SendMegRequest sendMegRequest){
if(sseServer.send(sendMegRequest.getUserId(), sendMegRequest.getSendMsg())){
return "Success";
}
return "Faild";
}
@GetMapping(value = "/close/{userId}")
public void close(@PathVariable("userId") String userId){
sseServer.close(userId);
}
}
客户端
<!doctype html>
<html lang="en">
<head>
<title>sse demo</title>
</head>
<body>
<div>sse demo</div>
your user id : <input type="text" id="connectionUserId" value=""></input>
<button type="button" id="connectionBtn"> connection</button>
<div id="infoChat" class="">
your user id : <span id="userId"></span> <br />
user id <input type="text" id="toUserId"></input> <br />
msg : <input type="text" id="message"></input> <br />
<button type="button" id="sendBtn"> send </button>
<button type="button" id="closeBtn"> close </button>
</div>
<div id="result"></div>
</body>
</html>
<script>
var defaultUrl = 'http://localhost:8080/sse';
const Http = new XMLHttpRequest();
var userId;
var connectiionBtn = document.querySelector("#connectionBtn");
var sendBtn = document.querySelector("#sendBtn");
var closeBtn = document.querySelector('#closeBtn')
connectiionBtn.onclick = () => {
userId = document.getElementById("connectionUserId").value;
console.log(userId);
let source = new EventSource(`${defaultUrl}/subscribe/${userId}`);
source.onmessage = (event) => {
text = document.getElementById('result').innerText;
text += '\n' + event.data;
document.getElementById('result').innerText = text;
};
source.onopen = (event) => {
text = document.getElementById('result').innerText;
text += '\n subscribe success';
console.log(event);
document.getElementById('result').innerText = text;
document.getElementById('userId').innerText = userId;
};
source.onerror = (err) => {
console.log(err);
document.getElementById('result').innerText = err;
}
}
sendBtn.onclick = async () => {
let toUserId = document.getElementById("toUserId").value;
let msg = document.getElementById("message").value;
const data = {
userId: toUserId,
msg: msg
};
const response = await fetch(`${defaultUrl}/send/${userId}`, {
method: 'POST',
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json'
},
body: JSON.stringify(data)
});
response.text().then((data) => {
if (data != "Success") {
alert(data)
}
});
}
closeBtn.onclick = () => {
fetch(`${defaultUrl}/close/${userId}`).then((res) => {
alert('close success');
})
}
</script>
github 代码:github.com/zhufan12/Sp…
如果有哪里不足的地方欢迎指导
推荐阅读
-
spring boot SSE 开发实践
-
Spring Boot 实践 (2) 引入模板引擎(Thymeleaf)(1)
-
Spring Boot 实践 - 前端字符串日期自动转换为后台日期类型。
-
Spring Boot 后端开发基础
-
Spring Boot Web 开发 @Controller @RestController 教程
-
spring boot3 登录开发-3(2 个 SMS 身份验证登录/注册逻辑实现)
-
Spring Boot 学习(4)--开发环境升级与项目 jdk 升级-=========================== 割===========================
-
Spring Boot 3.2 集成了最新的 Spring Security6 实践
-
全栈开发实践|电子商务平台设计与实施(Spring Boot + MyBatis + Thymeleaf)
-
Spring原理与实践】"缓存缓存开发系列 "带你深入分析Spring的缓存缓存功能开发指南。