欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

SpringBoot 与 WebFlux 集成以实现 SSE 事件

最编程 2024-07-09 22:05:29
...

前言

在前台页面需要不停获取服务器端的数据时,无非有两种操作,一种是通过前台页面使用轮询的方式,定时向服务器后台发送请求,以获取最新的数据;另一种就是在前台页面和后台服务之间建立长连接,服务器端一有数据产生就向前端页面推送。

这里的SSE是服务器发送事件(Server-Sent Events) 的缩写,在WebFlux框架里,服务器端是如何向前端(或调用端)实现服务器发送事件的呢?在有前端页面的情况下,又是如何实现的呢?

带着上面的这些疑问,来了解WebFlux框架,WebFlux框架是一款响应式编程web框架,什么是响应式编程呢,根据wikipedia上的定义:

响应式编程是就是对于数据流和传播改变的一种声明式的编程规范。这意味着可以通过编程语言轻松地表达静态(例如数组)或动态(例如事件发射器)数据流,并且存在相关执行模型内的推断依赖性,这有助于自动传播数据流涉及的变化。

围绕着WebFlux框架的,有这么几个关键字,异步的、非阻塞的、响应式的,那么是不是能够实现数据一有变化,就通知到对应的调用端呢,这些还有待证实。

基于WebFlux框架的SSE应用

首先,在pom文件中,引入webflux框架;

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>

第二,html代码,共有四个页面;
sse.html页面代码:

<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8"/>
    <title>服务器推送事件</title>
</head>
<body>
<div>    
    <div id="data"></div>    
    <div id="result"></div><br/>
</div>
<script th:inline="javascript" >
//服务器推送事件
if (typeof (EventSource) !== "undefined") { 
    //第一种写法
    //接收服务器倒计时时间推送,使用HTML5 服务器发送事件(Server-Sent Events),参考资料:https://www.runoob.com/html/html5-serversentevents.html
    var source = new EventSource("/sse/countDown");
    console.log(source);
    
    source.addEventListener("countDown", function(e) {
        document.getElementById("result").innerHTML = e.data;
    }, false);//使用false表示在冒泡阶段处理事件,而不是捕获阶段。
    
    //第二种写法
    //随机数获取
    var source1 = new EventSource("/sse/retrieve");
    //当抓取到消息时
    source1.onmessage = function (evt) {
        document.getElementById("data").innerHTML = "股票行情:" + evt.data;
    };
} else {
    //注意:ie浏览器不支持
    document.getElementById("result").innerHTML = "抱歉,你的浏览器不支持 server-sent 事件...";  
    var xhr;
    var xhr2;
    if (window.XMLHttpRequest){
        //IE7+, Firefox, Chrome, Opera, Safari浏览器支持该方法
        xhr=new XMLHttpRequest();
        xhr2=new XMLHttpRequest();
    }else{
        //IE6, IE5 浏览器不支持,使用ActiveXObject方法代替
        xhr=new ActiveXObject("Microsoft.XMLHTTP");
        xhr2=new ActiveXObject("Microsoft.XMLHTTP");
    }
    console.log(xhr);
    console.log(xhr2);
    xhr.open('GET', '/sse/countDown');
    xhr.send(null);//发送请求
    xhr.onreadystatechange = function() {
        console.log("s响应状态:" + xhr.readyState);
        //2是空响应,3是响应一部分,4是响应完成
        if (xhr.readyState > 2) {
            //这儿可以使用response(对应json)与responseText(对应text)
            var newData = xhr.response.substr(xhr.seenBytes);
            newData = newData.replace(/\n/g, "#");
            newData = newData.substring(0, newData.length - 1);
            var data = newData.split("#");
            console.log("获取到的数据:" + data);
            document.getElementById("result").innerHTML = data;
            //长度重新赋值,下次截取时需要使用
            xhr.seenBytes = xhr.response.length;
        }
    }
        
    xhr2.open('GET', '/sse/retrieve');
    xhr2.send(null);//发送请求
    xhr2.onreadystatechange = function() {
        console.log("s响应状态:" + xhr2.readyState);
        //0: 请求未初始化,2 请求已接收,3 请求处理中,4  请求已完成,且响应已就绪
        if (xhr2.readyState > 2) {
            //这儿可以使用response(对应json)与responseText(对应text)
            var newData1 = xhr2.response.substr(xhr2.seenBytes);
            newData1 = newData1.replace(/\n/g, "#");
            newData1 = newData1.substring(0, newData1.length - 1);
            var data1 = newData1.split("#");
            console.log("获取到的数据:" + data1);
            document.getElementById("data").innerHTML = data1;
            //长度重新赋值,下次截取时需要使用
            xhr2.seenBytes = xhr2.response.length;
        }
    }
}
</script>
</body>
</html>

sse2.html页面代码:

<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8"/>
    <title>服务器推送</title>
</head>
<body>
<div>
    <div id="dataModule"></div><br/>
    <div id="note" style="width: 100%;" ></div>
</div>
<script src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js" ></script>
<script th:inline="javascript" >
$(function() {
    var time=1;
    var xhr;
    if (window.XMLHttpRequest){
        //IE7+, Firefox, Chrome, Opera, Safari浏览器支持该方法
        xhr=new XMLHttpRequest();
    }else{
        //IE6, IE5 浏览器不支持,使用ActiveXObject方法代替
        xhr=new ActiveXObject("Microsoft.XMLHTTP");
    }
    console.log(xhr);
    xhr.open('GET', '/quotes');
    xhr.send(null);//发送请求
    xhr.onreadystatechange = function() {
        console.log("s响应状态:" + xhr.readyState);
        //2是空响应,3是响应一部分,4是响应完成
        if (xhr.readyState > 2) {
            //这儿可以使用response(对应json)与responseText(对应text)
            var newData = xhr.response.substr(xhr.seenBytes);
            newData = newData.replace(/\n/g, "#");
            newData = newData.substring(0, newData.length - 1);
            var data = newData.split("#");
            //显示加载次数,和大小
            $("#dataModule").append("第"+time+"次数据响应"+data.length+"条<br/>");
            
            $("#note").append("<div style='clear: both;'>第"+time+"次数据响应"+data.length+"条</div><div id='note"+time+"' style='width: 100%;'></div>");
            var html="";
            console.log("数据:" + data);          
            for(var i=0;i<data.length;i++) {
                 var obj = JSON.parse(data[i]);
                 html=html + "<div style='margin-left: 10px;margin-top: 10px; width: 80px;height: 80px;background-color: gray;float: left;'>"+obj.ticker+"</div>";
            }           
            $("#note"+time).html(html);
            time++;
            //长度重新赋值,下次截取时需要使用
            xhr.seenBytes = xhr.response.length;
        }
    }
})
</script>
</body>
</html>

sse3.html页面代码:

<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8"/>
    <title>服务器推送</title>
</head>
<body>
<div>
    <div id="dataModule"></div><br/>
    <div id="note" style="width: 100%;" ></div>
</div>
<script src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js" ></script>
<script th:inline="javascript" >
$(function() {
    var time=1;
    var xhr;
    if (window.XMLHttpRequest){
        //IE7+, Firefox, Chrome, Opera, Safari浏览器支持该方法
        xhr=new XMLHttpRequest();
    }else{
        //IE6, IE5 浏览器不支持,使用ActiveXObject方法代替
        xhr=new ActiveXObject("Microsoft.XMLHTTP");
    }
    console.log(xhr);
    xhr.open('POST', '/quotes');
    xhr.send(null);//发送请求
    xhr.onreadystatechange = function() {
        console.log("s响应状态:" + xhr.readyState);
        //2是空响应,3是响应一部分,4是响应完成
        if (xhr.readyState > 2) {
            //这儿可以使用response(对应json)与responseText(对应text)
            var newData = xhr.response.substr(xhr.seenBytes);
            newData = newData.replace(/\n/g, "#");
            newData = newData.substring(0, newData.length);
            console.log("数据:" + newData);
            if(newData){
                //将字符串类型的json转成json对象
                var data = JSON.parse(newData.split("#"));
                //显示加载次数,和大小
                $("#dataModule").append("第"+time+"次数据响应"+data.length+"条<br/>");
                
                $("#note").append("<div style='clear: both;'>第"+time+"次数据响应"+data.length+"条</div><div id='note"+time+"' style='width: 100%;'></div>");
                var html="";
                console.log("数据:" + data);
                for(var i=0;i<data.length;i++) {
                     var obj = data[i];
                     html=html + "<div style='margin-left: 10px;margin-top: 10px; width: 80px;height: 80px;background-color: gray;float: left;'>"+obj.ticker+"</div>";
                }           
                $("#note"+time).html(html);
                time++;
                //长度重新赋值,下次截取时需要使用
                xhr.seenBytes = xhr.response.length;
            }else{
                $("#dataModule").append("响应完成!!!");
            }
        }
    }
})
</script>
</body>
</html>

sse4.html页面代码:

<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8"/>
    <title>服务器推送</title>
</head>
<body>
<div>
    <div id="dataModule"></div><br/>
    <div id="note" style="width: 100%;" ></div>
</div>
<script src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js" ></script>
<script th:inline="javascript" >
$(function() {
    var time=1;
    var xhr;
    if (window.XMLHttpRequest){
        //IE7+, Firefox, Chrome, Opera, Safari浏览器支持该方法
        xhr=new XMLHttpRequest();
    }else{
        //IE6, IE5 浏览器不支持,使用ActiveXObject方法代替
        xhr=new ActiveXObject("Microsoft.XMLHTTP");
    }
    console.log(xhr);
    xhr.open('POST', '/echo1');
    xhr.send(null);//发送请求
    xhr.onreadystatechange = function() {
        console.log("s响应状态:" + xhr.readyState);
        //2是空响应,3是响应一部分,4是响应完成
        if (xhr.readyState > 2) {
            //这儿可以使用response(对应json)与responseText(对应text)
            var newData = xhr.response.substr(xhr.seenBytes);
            newData = newData.replace(/\n/g, "#");
            newData = newData.substring(0, newData.length - 1);
            var data = newData.split("#");
            console.log("获取到的数据:" + data);
            document.getElementById("dataModule").innerHTML = data;
        }
    }
})
</script>
</body>
</html>

注意:在前端页面,接收服务器的推送请求,需要html5的SSE支持,除了IE外,其他的浏览器都支持;

第三,后台代码;

import java.math.BigDecimal;
import java.math.MathContext;
import java.time.Instant;

/**
 * 需要推送的实体类
 * @author 程就人生
 * @Date
 */
public class Quote { 

    private static final MathContext MATH_CONTEXT = new MathContext(2); 

    private String ticker; 

    private BigDecimal price; 

    private Instant instant; 

    public Quote() {

    } 

    public Quote(String ticker, BigDecimal price) {

        this.ticker = ticker;

        this.price = price;

    } 

    public Quote(String ticker, Double price) {

        this(ticker, new BigDecimal(price, MATH_CONTEXT));

    }

    @Override
    public String toString() {

        return "Quote{" +

                "ticker='" + ticker + '\'' +

                ", price=" + price +

                ", instant=" + instant +

                '}';

    }

    public final String getTicker() {
        return ticker;
    }

    public final void setTicker(String ticker) {
        this.ticker = ticker;
    }

    public final BigDecimal getPrice() {
        return price;
    }

    public final void setPrice(BigDecimal price) {
        this.price = price;
    }

    public final Instant getInstant() {
        return instant;
    }
    
    public final void setInstant(Instant instant) {
        this.instant = instant;
    }
}

import java.math.BigDecimal;
import java.math.MathContext;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

import org.springframework.stereotype.Component;

import com.example.entity.Quote;

import reactor.core.publisher.Flux;

/**
 * 推送数据,模拟生成
 * @author 程就人生
 * @Date
 */
@Component
public class QuoteGenerator { 

    private final MathContext mathContext = new MathContext(2); 

    private final Random random = new Random(); 

    private final List<Quote> prices = new ArrayList<>(); 

    /**
     * 生成行情数据
     */
    public QuoteGenerator() {

        this.prices.add(new Quote("CTXS", 82.26));

        this.prices.add(new Quote("DELL", 63.74));

        this.prices.add(new Quote("GOOG", 847.24));

        this.prices.add(new Quote("MSFT", 65.11));

        this.prices.add(new Quote("ORCL", 45.71));

        this.prices.add(new Quote("RHT", 84.29));

        this.prices.add(new Quote("VMW", 92.21));

    }

    public Flux<Quote> fetchQuoteStream(Duration period) { 

        // 需要周期生成值并返回,使用 Flux.interval
        return Flux.interval(period)

                // In case of back-pressure, drop events
                .onBackpressureDrop()

                // For each tick, generate a list of quotes
                .map(this::generateQuotes)

                // "flatten" that List<Quote> into a Flux<Quote>
                .flatMapIterable(quotes -> quotes)

                .log("io.spring.workshop.stockquotes");//以日志的形式输出

    }

    /**

     * Create quotes for all tickers at a single instant.

     */
    private List<Quote> generateQuotes(long interval) {

        final Instant instant = Instant.now();

        return prices.stream()

                .map(baseQuote -> {

                    BigDecimal priceChange = baseQuote.getPrice()

                            .multiply(new BigDecimal(0.05 * this.random.nextDouble()), this.mathContext);

                    Quote result = new Quote(baseQuote.getTicker(), baseQuote.getPrice().add(priceChange));

                    result.setInstant(instant);

                    return result;

                })

                .collect(Collectors.toList());

    }

}

import java.time.Duration;

import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import com.example.entity.Quote;
import com.example.generator.QuoteGenerator;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * 数据处理handler,相当于service层
 * @author 程就人生
 * @Date
 */
@Component
public class QuoteHandler { 

    private final Flux<Quote> quoteStream; 

    public QuoteHandler(QuoteGenerator quoteGenerator) {

        this.quoteStream = quoteGenerator.fetchQuoteStream(Duration.ofMillis(1000 * 10)).share();

    } 

    public Mono<ServerResponse> hello(ServerRequest request) {
        Long start = System.currentTimeMillis();
        return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN)

                .body(BodyInserters.fromObject("Hello Spring!" + start));

    } 

    public Mono<ServerResponse> echo(ServerRequest request) {

        return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN)

                .body(request.bodyToMono(String.class), String.class);

    } 

    public Mono<ServerResponse> streamQuotes(ServerRequest request) {
        
        Long start = System.currentTimeMillis();
        
        System.out.println("--------------" + start + "--------------");

        return ServerResponse.ok()

                .contentType(MediaType.APPLICATION_STREAM_JSON) //返回多次

                .body(this.quoteStream, Quote.class);

    } 

    public Mono<ServerResponse> fetchQuotes(ServerRequest request) {

        int size = Integer.parseInt(request.queryParam("size").orElse("10"));

        return ServerResponse.ok()

                .contentType(MediaType.APPLICATION_JSON)         //返回一次

                .body(this.quoteStream.take(size), Quote.class);

    }
}

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

import com.example.handler.QuoteHandler;

/**
 * 路由,相当于Controller层
 * @author 程就人生
 * @Date
 */
@Configuration
public class QuoteRouter { 

   @Bean
   public RouterFunction<ServerResponse> route(QuoteHandler quoteHandler) {

      return RouterFunctions

            .route(RequestPredicates.GET("/hello1").and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), quoteHandler::hello)
            
            .andRoute(RequestPredicates.POST("/echo1").and(RequestPredicates.accept(MediaType.TEXT_PLAIN).and(RequestPredicates.contentType(MediaType.TEXT_PLAIN))), quoteHandler::echo)
            //响应一次
            .andRoute(RequestPredicates.POST("/quotes").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), quoteHandler::fetchQuotes)
            //响应多次
            .andRoute(RequestPredicates.GET("/quotes").and(RequestPredicates.accept(MediaType.APPLICATION_STREAM_JSON)), quoteHandler::streamQuotes);

   }
}

import java.time.Duration;

import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;

/**
 * 服务器发送事件SSE(Server-Sent Events)
 * 页面渲染及请求
 * @author 程就人生
 * @Date
 */
@Controller
@RequestMapping("/sse")
public class SseController {

    //三分钟倒计时
    private int count_down_sec=3*60*60;
    
    /**
     * 推送页面1
     * @return
     */
    @GetMapping
    public String sse(){
        return "sse";
    }
    
    /**
     * 推送页面2
     * @return
     */
    @GetMapping("/two")
    public String sse2(){
        return "sse2";
    }
    
    /**
     * 推送页面3
     * @return
     */
    @GetMapping("/three")
    public String sse3(){
        return "sse3";
    }
    
    /**
     * 推送页面4
     * @return
     */
    @GetMapping("/four")
    public String sse4(){
        return "sse4";
    }

    //报头设置为 "text/event-stream",以便于发送事件流
    @GetMapping(value="/countDown",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    @ResponseBody
    public Flux<ServerSentEvent<Object>> countDown() {
        //每一秒钟推送一次
        return Flux.interval(Duration.ofSeconds(1))
            .map(seq -> Tuples.of(seq, getCountDownSec()))
            .map(data -> ServerSentEvent.<Object>builder()
                    .event("countDown") //和前端addEventListener监听的事件一一对应
                    .id(Long.toString(data.getT1()))  //为每次发送设置一个id
                    .data(data.getT2().toString())
                    .build());
    }
    
    private String getCountDownSec() {
        if (count_down_sec>0) {
            int h = count_down_sec/(60*60);
            int m = (count_down_sec%(60*60))/60;
            int s = (count_down_sec%(60*60))%60;
            count_down_sec--;
            return "活动倒计时:"+h+" 小时 "+m+" 分钟 "+s+" 秒";
        }
        return "活动倒计时:0 小时 0 分钟 0 秒";
    }
    
    //报头设置为 "text/event-stream",以便于发送事件流,这种写法等同于MediaType.TEXT_EVENT_STREAM_VALUE "text/event-stream;charset=UTF-8"
    @GetMapping(value = "/retrieve",produces = MediaType.TEXT_EVENT_STREAM_VALUE)   
    @ResponseBody
    public double retrieve() {
        try {   
            //每0.5秒刷新数据 
            Thread.sleep(500);  
        } catch (InterruptedException e) {  
            e.printStackTrace();    
        }   
        //模拟股票实时变动数据    
        return Math.ceil(Math.random() * 10000);    
    }
}

最后,测试运行结果;

总结
虽然参考了很多资料,对于响应式编程还是很陌生,写个demo后,依旧没有感受到它的精华,基于WebFlux框架实现SSE事件,不难看出来还是基于长连接的,在实际场景中,基于长连接的推送事件是否适用,还值得再思考。

参考资料:
https://docs.spring.io/spring-framework/docs/5.0.3.RELEASE/spring-framework-reference/web-reactive.html#webflux-dispatcher-handler
https://blog.****.net/wshl1234567/article/details/80320116
https://blog.****.net/Message_lx/article/details/81075766
https://www.cnblogs.com/Alandre/category/957422.html
https://segmentfault.com/a/1190000020686218?utm_source=tag-newest
https://my.oschina.net/bianxin/blog/3063713
html5服务器发送事件
https://www.runoob.com/html/html5-serversentevents.html
https://www.xttblog.com/spring-webflux.html

推荐阅读