Spring Boot SSE Development in Practice

最编程 2024-05-04 17:49:45
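Introduction to SSE

SSE (Server-Sent Events) is a technique for pushing messages from the server to the client over an HTTP connection. The front end uses the Web API EventSource to establish a connection with the back end, and the back end can then send arbitrary string data over that connection. The MIME type of an SSE response is text/event-stream.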
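
To make the text/event-stream format concrete, here is a minimal sketch in plain Java (no Spring) of how a single event is framed on the wire. The class name `SseFrame` and the method `format` are hypothetical names chosen for illustration; in the demo project, Spring's SseEmitter takes care of the equivalent framing.

```java
// Hypothetical helper for illustration; it is not part of the demo project.
public class SseFrame {

    // Builds one event in text/event-stream form.
    // eventName may be null, in which case the client receives a default "message" event.
    public static String format(String eventName, String data) {
        StringBuilder sb = new StringBuilder();
        if (eventName != null) {
            sb.append("event: ").append(eventName).append('\n');
        }
        // Every line of the payload gets its own "data:" field.
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        // A blank line terminates the event.
        sb.append('\n');
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(format(null, "order 42 shipped"));
        System.out.print(format("notice", "line one\nline two"));
    }
}
```

A payload that spans several lines is sent as several `data:` fields, which the client joins back together with newlines.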

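Differences between SSE and WebSocket

SSE                                                   WebSocket
One-way channel: the client only receives messages    Full-duplex channel: both sides can send and receive
Automatic reconnection is built in                    No built-in reconnection; it has to be coded by hand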
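
Automatic reconnection relies on two fields of the event stream: `id`, which the browser sends back in the `Last-Event-ID` header when it reconnects, and `retry`, which sets the reconnection delay in milliseconds. The sketch below is a simplified parser in plain Java that tracks both. The class `SseReconnectState` and its members are hypothetical, and the 3000 ms starting value is an assumption, since each browser picks its own default.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified parser for illustration; it is not the browser's full algorithm.
public class SseReconnectState {
    public String lastEventId = "";   // a browser sends this back as Last-Event-ID when it reconnects
    public long retryMillis = 3000;   // assumed starting value; each browser picks its own default
    public final List<String> messages = new ArrayList<>();

    private final StringBuilder data = new StringBuilder();

    // Feeds a chunk of complete lines from a text/event-stream body into the parser.
    public void consume(String stream) {
        for (String line : stream.split("\\r?\\n", -1)) {
            consumeLine(line);
        }
    }

    private void consumeLine(String line) {
        if (line.isEmpty()) {                 // a blank line dispatches the buffered event
            if (data.length() > 0) {
                messages.add(data.substring(0, data.length() - 1));
                data.setLength(0);
            }
            return;
        }
        if (line.startsWith(":")) {           // comment line, often used as a keep-alive
            return;
        }
        int colon = line.indexOf(':');
        String field = colon < 0 ? line : line.substring(0, colon);
        String value = colon < 0 ? "" : line.substring(colon + 1);
        if (value.startsWith(" ")) {
            value = value.substring(1);       // one leading space after the colon is dropped
        }
        switch (field) {
            case "data":
                data.append(value).append('\n');
                break;
            case "id":
                lastEventId = value;
                break;
            case "retry":
                if (value.matches("\\d{1,9}")) {
                    retryMillis = Long.parseLong(value);
                }
                break;
            default:
                break;                        // "event" and unknown fields are ignored in this sketch
        }
    }
}
```

With WebSocket, the equivalent bookkeeping (remembering the last message seen and deciding when to retry) has to be written by the application.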

SSE use cases

SSE is well suited to notification-style requirements, such as order notifications and update notifications.

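Project initialization

Spring Initializr: the required dependencies have already been configured there.

Project directory

[image: project directory structure]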

SSE session management

In the SseSession class, sessionMap stores the SseEmitter object of each connected user.

package com.demo.sse;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import com.demo.sse.exception.SseException;

public class SseSession {


    private static Map<String,SseEmitter> sessionMap = new ConcurrentHashMap<>();

    public static void add(String sessionKey,SseEmitter sseEmitter){
        if(sessionMap.get(sessionKey) != null){
           throw new SseException("User exists!");
        }
        sessionMap.put(sessionKey, sseEmitter);
    }
    
    public static boolean exists(String sessionKey){
        return sessionMap.get(sessionKey) != null;
    }

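    public static boolean remove(String sessionKey){
        // Take the emitter out of the map before completing it; otherwise the entry is never released.
        // Removal is by key only, so a late callback from an older emitter can still remove a newer one.
        SseEmitter sseEmitter = sessionMap.remove(sessionKey);
        if(sseEmitter != null){
            sseEmitter.complete();
            return true;
        }
        return false;
    }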

    public static void onError(String sessionKey,Throwable throwable){
        SseEmitter sseEmitter = sessionMap.get(sessionKey);
        if(sseEmitter != null){
            sseEmitter.completeWithError(throwable);
        }
    }

    public static void send(String sessionKey,String content) throws IOException{
        sessionMap.get(sessionKey).send(content);
    }

}
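
One thing to watch with key-only removal: when a user reconnects, a late completion callback from the old emitter could remove the entry that now belongs to the new emitter. A possible way around this is to remove an entry only if it still maps to the same connection object, using `ConcurrentHashMap.remove(key, value)`. The sketch below shows the idea in plain Java; `ConnectionRegistry` and its methods are hypothetical names, and the type parameter `T` stands in for SseEmitter.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical generic registry for illustration; T stands in for SseEmitter.
public class ConnectionRegistry<T> {

    private final Map<String, T> connections = new ConcurrentHashMap<>();

    // Registers a connection and returns the one it replaced, or null if there was none.
    public T replace(String key, T connection) {
        return connections.put(key, connection);
    }

    // Removes the entry only if it still maps to the given connection.
    // A callback that belongs to an older connection therefore cannot evict a newer one.
    public boolean removeIfSame(String key, T connection) {
        return connections.remove(key, connection);
    }

    public T get(String key) {
        return connections.get(key);
    }
}
```

In this arrangement each emitter's callbacks would pass their own emitter to `removeIfSame`, and the caller of `replace` would complete the returned old connection itself. The comparison uses `equals`, which falls back to object identity for classes that do not override it.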

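Service interface

  • connect: creates a new SSE connection (the method is spelled conect in the code)
  • send: pushes a message to a given user
  • close: closes the SSE connection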

public interface SseServer {

    public SseEmitter conect(String userId);
    
    public boolean send(String userId, String content);

    public boolean close(String userId);
    
}

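Interface implementation

new SseEmitter(0L) sets the timeout to unlimited. In production it is advisable to set an actual timeout; otherwise emitter objects can easily stay in sessionMap indefinitely.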
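
To illustrate why a finite timeout keeps the map from growing, here is a small sketch of expiry-based cleanup in plain Java. It is not how SseEmitter implements its timeout; `ExpiringSessions` and its methods are hypothetical, and the current time is passed in explicitly so the behavior is easy to follow.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of expiry-based cleanup; not Spring's implementation.
public class ExpiringSessions {

    private final Map<String, Long> deadlines = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    public ExpiringSessions(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Records a session together with the time at which it expires.
    public void add(String key, long nowMillis) {
        deadlines.put(key, nowMillis + timeoutMillis);
    }

    // Drops every session whose deadline has passed and returns how many were dropped.
    public int sweep(long nowMillis) {
        int before = deadlines.size();
        deadlines.values().removeIf(deadline -> deadline <= nowMillis);
        return before - deadlines.size();
    }

    public int size() {
        return deadlines.size();
    }
}
```

With a finite timeout on the emitter, the server ends the stream when the timeout fires, and the browser's EventSource will typically reconnect on its own, so live clients get a fresh emitter while abandoned ones are dropped.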

package com.demo.sse.server.impl;


import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import com.demo.sse.SseSession;
import com.demo.sse.exception.SseException;
import com.demo.sse.server.SseServer;

@Service
public class SseServerImpl implements SseServer  {

    private static final Logger log = LoggerFactory.getLogger(SseServerImpl.class);
    


    @Override
    public SseEmitter conect(String userId) {
        if(SseSession.exists(userId)){
            SseSession.remove(userId);
        }
        SseEmitter sseEmitter = new SseEmitter(0L);
        sseEmitter.onError((err)-> {
            log.error("type: SseSession Error, msg: {} session Id : {}",err.getMessage(), userId);
            SseSession.onError(userId, err);
        });

        sseEmitter.onTimeout(() -> {
            log.info("type: SseSession Timeout, session Id : {}", userId);
            SseSession.remove(userId);
        });
        
        sseEmitter.onCompletion(() -> {
            log.info("type: SseSession Completion, session Id : {}", userId);
            SseSession.remove(userId);
        });
        SseSession.add(userId, sseEmitter);
        return sseEmitter;
    }

    @Override
    public boolean send(String userId, String content) {
        if(SseSession.exists(userId)){
            try{
                SseSession.send(userId, content);
                return true;
            }catch(IOException exception){
                log.error("type: SseSession send Error:IOException, msg: {} session Id : {}",exception.getMessage(), userId);
            }
        }else{
            throw new SseException("User Id " + userId + " not Found");
        }
        return false;
    }

    @Override
    public boolean close(String userId) {
        log.info("type: SseSession Close, session Id : {}", userId);
        return SseSession.remove(userId);
    }
    
}


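Controller implementation

Since this is a demo, a few API endpoints are implemented here to make testing easier. The most important one is subscribe, which the front end uses to connect to the SSE stream.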


package com.demo.sse.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import com.demo.sse.dto.SendMegRequest;
import com.demo.sse.server.SseServer;

@Controller
@RequestMapping(value = "sse")
@ResponseBody
@CrossOrigin
public class SseController {


    @Autowired
    private SseServer sseServer;


    @GetMapping(value = "/subscribe/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter subscribe(@PathVariable("userId") String userId){
        return sseServer.conect(userId);
    }
    
    
    @PostMapping(value = "/send/{userId}")
    public String sendMessage(@PathVariable("userId") String userId ,@RequestBody SendMegRequest sendMegRequest){
        if(sseServer.send(sendMegRequest.getUserId(), sendMegRequest.getSendMsg())){
            return "Success";
        }
        return "Failed";
    }

    @GetMapping(value = "/close/{userId}")
    public void close(@PathVariable("userId") String userId){
        sseServer.close(userId);   
    }

}


Client

<!doctype html>
<html lang="en">

<head>
    <title>sse demo</title>
</head>

<body>
    <div>sse demo</div>

    your user id : <input type="text" id="connectionUserId" value="">
    <button type="button" id="connectionBtn"> connection</button>

    <div id="infoChat" class="">
        your user id : <span id="userId"></span> <br />
        user id <input type="text" id="toUserId"> <br />
        msg : <input type="text" id="message"> <br />
        <button type="button" id="sendBtn"> send </button>
        <button type="button" id="closeBtn"> close </button>
    </div>

    <div id="result"></div>


</body>

</html>


<script>
    const defaultUrl = 'http://localhost:8080/sse';

    let userId;
    // Keep the EventSource outside the click handler so the close button can reach it.
    let source = null;

    const connectionBtn = document.querySelector('#connectionBtn');
    const sendBtn = document.querySelector('#sendBtn');
    const closeBtn = document.querySelector('#closeBtn');
    const result = document.getElementById('result');

    // Append one line to the result area.
    const appendLine = (line) => {
        result.innerText += '\n' + line;
    };

    connectionBtn.onclick = () => {
        userId = document.getElementById('connectionUserId').value;
        console.log(userId);
        // Close any previous stream before opening a new one.
        if (source) {
            source.close();
        }
        source = new EventSource(`${defaultUrl}/subscribe/${encodeURIComponent(userId)}`);

        source.onmessage = (event) => {
            appendLine(event.data);
        };

        source.onopen = (event) => {
            console.log(event);
            appendLine('subscribe success');
            document.getElementById('userId').innerText = userId;
        };

        source.onerror = (err) => {
            // The error event carries no message; readyState shows whether the browser is retrying.
            console.log(err);
            const closed = err.target.readyState === EventSource.CLOSED;
            appendLine(closed ? 'connection closed' : 'connection error, reconnecting');
        };
    };

    sendBtn.onclick = async () => {
        const toUserId = document.getElementById('toUserId').value;
        const msg = document.getElementById('message').value;
        const data = {
            userId: toUserId,
            // Assumes the DTO property is named sendMsg, as getSendMsg() in the controller suggests.
            sendMsg: msg
        };

        const response = await fetch(`${defaultUrl}/send/${encodeURIComponent(userId)}`, {
            method: 'POST',
            headers: {
                'Accept': 'application/json',
                'Content-Type': 'application/json'
            },
            body: JSON.stringify(data)
        });

        const text = await response.text();
        if (text !== 'Success') {
            alert(text);
        }
    };

    closeBtn.onclick = () => {
        // Stop the client first; otherwise EventSource reconnects after the server completes the stream.
        if (source) {
            source.close();
            source = null;
        }
        fetch(`${defaultUrl}/close/${encodeURIComponent(userId)}`).then(() => {
            alert('close success');
        });
    };
</script>

GitHub code: github.com/zhufan12/Sp…


If anything here falls short, corrections are welcome.