【WEB系列】SSE服务器发送事件详解

文章目录
  1. I. SSE扫盲
    1. 1. 概念介绍
    2. 2. 特点分析
    3. 3. 应用场景
  2. II. 手动实现sse功能
    1. 1. 项目创建
    2. 2. 功能实现
  3. III. SseEmitter
    1. 1. sse规范
    2. 2. 实现
    3. 3. 小结
  4. IV. 其他
    1. 0. 项目
    2. 1. 一灰灰Blog

SSE全称Server Sent Event,直译一下就是服务器发送事件,一般的项目开发中,用到的机会不多,可能很多小伙伴不太清楚这个东西,到底是干啥的,有啥用

本文主要知识点如下:

  • SSE扫盲,应用场景分析
  • 借助异步请求实现sse功能,加深概念理解
  • 使用SseEmitter实现一个简单的推送示例

I. SSE扫盲

对于sse基础概念比较清楚的可以跳过本节

1. 概念介绍

sse(Server Sent Event),直译为服务器发送事件,顾名思义,也就是客户端可以获取到服务器发送的事件

我们常见的http交互方式是客户端发起请求,服务端响应,然后一次请求完毕;但是在sse的场景下,客户端发起请求,连接一直保持,服务端有数据就可以返回数据给客户端,这个返回可以是多次间隔的方式

2. 特点分析

SSE最大的特点,可以简单规划为两个

  • 长连接
  • 服务端可以向客户端推送信息

了解websocket的小伙伴,可能也知道它也是长连接,可以推送信息,但是它们有一个明显的区别

sse是单通道,只能服务端向客户端发消息;而webscoket是双通道

那么为什么有了webscoket还要搞出一个sse呢?既然存在,必然有着它的优越之处

sse websocket
http协议 独立的websocket协议
轻量,使用简单 相对复杂
默认支持断线重连 需要自己实现断线重连
文本传输 二进制传输
支持自定义发送的消息类型 -

3. 应用场景

从sse的特点出发,我们可以大致的判断出它的应用场景,需要轮询获取服务端最新数据的case下,多半是可以用它的

比如显示当前网站在线的实时人数,法币汇率显示当前实时汇率,电商大促的实时成交额等等…

II. 手动实现sse功能

sse本身是有自己的一套玩法的,后面会进行说明,这一小节,则主要针对sse的两个特点长连接 + 后端推送数据,如果让我们自己来实现这样的一个接口,可以怎么做?

1. 项目创建

借助SpringBoot 2.2.1.RELEASE来创建一个用于演示的工程项目,核心的xml依赖如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>

<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</pluginManagement>
</build>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/libs-release-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>

2. 功能实现

在Http1.1支持了长连接,请求头添加一个Connection: keep-alive即可

在这里我们借助异步请求来实现sse功能,至于什么是异步请求,推荐查看博文: 【WEB系列】异步请求知识点与使用姿势小结

因为后端可以不定时返回数据,所以我们需要注意的就是需要保持连接,不要返回一次数据之后就断开了;其次就是需要设置请求头Content-Type: text/event-stream;charset=UTF-8 (如果不是流的话会怎样?)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// 新建一个容器,保存连接,用于输出返回
private Map<String, PrintWriter> responseMap = new ConcurrentHashMap<>();

// 发送数据给客户端
private void writeData(String id, String msg, boolean over) throws IOException {
PrintWriter writer = responseMap.get(id);
if (writer == null) {
return;
}

writer.println(msg);
writer.flush();
if (over) {
responseMap.remove(id);
}
}

// 推送
@ResponseBody
@GetMapping(path = "subscribe")
public WebAsyncTask<Void> subscribe(String id, HttpServletResponse response) {

Callable<Void> callable = () -> {
response.setHeader("Content-Type", "text/event-stream;charset=UTF-8");
responseMap.put(id, response.getWriter());
writeData(id, "订阅成功", false);
while (true) {
Thread.sleep(1000);
if (!responseMap.containsKey(id)) {
break;
}
}
return null;
};

// 采用WebAsyncTask 返回 这样可以处理超时和错误 同时也可以指定使用的Excutor名称
WebAsyncTask<Void> webAsyncTask = new WebAsyncTask<>(30000, callable);
// 注意:onCompletion表示完成,不管你是否超时、是否抛出异常,这个函数都会执行的
webAsyncTask.onCompletion(() -> System.out.println("程序[正常执行]完成的回调"));

// 这两个返回的内容,最终都会放进response里面去===========
webAsyncTask.onTimeout(() -> {
responseMap.remove(id);
System.out.println("超时了!!!");
return null;
});
// 备注:这个是Spring5新增的
webAsyncTask.onError(() -> {
System.out.println("出现异常!!!");
return null;
});


return webAsyncTask;
}

看一下上面的实现,基本上还是异步请求的那一套逻辑,请仔细看一下callable中的逻辑,有一个while循环,来保证长连接不中断

接下来我们新增两个接口,用来模拟后端给客户端发送消息,关闭连接的场景

1
2
3
4
5
6
7
8
9
10
11
12
13
@ResponseBody
@GetMapping(path = "push")
public String pushData(String id, String content) throws IOException {
writeData(id, content, false);
return "over!";
}

@ResponseBody
@GetMapping(path = "over")
public String over(String id) throws IOException {
writeData(id, "over", true);
return "over!";
}

我们简单的来演示下操作过程

III. SseEmitter

上面只是简单实现了sse的长连接 + 后端推送消息,但是与标准的SSE还是有区别的,sse有自己的规范,而我们上面的实现,实际上并没有管这个,导致的问题是前端按照sse的玩法来请求数据,可能并不能正常工作

1. sse规范

在html5的定义中,服务端sse,一般需要遵循以下要求

请求头

开启长连接 + 流方式传递

1
2
3
Content-Type: text/event-stream;charset=UTF-8
Cache-Control: no-cache
Connection: keep-alive

数据格式

服务端发送的消息,由message组成,其格式如下:

1
field:value\n\n

其中field有五种可能

  • 空: 即以:开头,表示注释,可以理解为服务端向客户端发送的心跳,确保连接不中断
  • data:数据
  • event: 事件,默认值
  • id: 数据标识符用id字段表示,相当于每一条数据的编号
  • retry: 重连时间

2. 实现

SpringBoot利用SseEmitter来支持sse,可以说非常简单了,直接返回SseEmitter对象即可;重写一下上面的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@RestController
@RequestMapping(path = "sse")
public class SseRest {
private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();
@GetMapping(path = "subscribe")
public SseEmitter push(String id) {
// 超时时间设置为1小时
SseEmitter sseEmitter = new SseEmitter(3600_000L);
sseCache.put(id, sseEmitter);
sseEmitter.onTimeout(() -> sseCache.remove(id));
sseEmitter.onCompletion(() -> System.out.println("完成!!!"));
return sseEmitter;
}

@GetMapping(path = "push")
public String push(String id, String content) throws IOException {
SseEmitter sseEmitter = sseCache.get(id);
if (sseEmitter != null) {
sseEmitter.send(content);
}
return "over";
}

@GetMapping(path = "over")
public String over(String id) {
SseEmitter sseEmitter = sseCache.get(id);
if (sseEmitter != null) {
sseEmitter.complete();
sseCache.remove(id);
}
return "over";
}
}

上面的实现,用到了SseEmitter的几个方法,解释如下

  • send(): 发送数据,如果传入的是一个非SseEventBuilder对象,那么传递参数会被封装到data中
  • complete(): 表示执行完毕,会断开连接
  • onTimeout(): 超时回调触发
  • onCompletion(): 结束之后的回调触发

同样演示一下访问请求

上图总的效果和前面的效果差不多,而且输出还待上了前缀,接下来我们写一个简单的html消费端,用来演示一下完整的sse的更多特性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<!doctype html>
<html lang="en">
<head>
<title>Sse测试文档</title>
</head>
<body>
<div>sse测试</div>
<div id="result"></div>
</body>
</html>
<script>
var source = new EventSource('http://localhost:8080/sse/subscribe?id=yihuihui');
source.onmessage = function (event) {
text = document.getElementById('result').innerText;
text += '\n' + event.data;
document.getElementById('result').innerText = text;
};
<!-- 添加一个开启回调 -->
source.onopen = function (event) {
text = document.getElementById('result').innerText;
text += '\n 开启: ';
console.log(event);
document.getElementById('result').innerText = text;
};
</script>

将上面的html文件放在项目的resources/static目录下;然后修改一下前面的SseRest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Controller
@RequestMapping(path = "sse")
public class SseRest {
@GetMapping(path = "")
public String index() {
return "index.html";
}

@ResponseBody
@GetMapping(path = "subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter push(String id) {
// 超时时间设置为3s,用于演示客户端自动重连
SseEmitter sseEmitter = new SseEmitter(1_000L);
// 设置前端的重试时间为1s
sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("连接成功"));
sseCache.put(id, sseEmitter);
System.out.println("add " + id);
sseEmitter.onTimeout(() -> {
System.out.println(id + "超时");
sseCache.remove(id);
});
sseEmitter.onCompletion(() -> System.out.println("完成!!!"));
return sseEmitter;
}
}

我们上面超时时间设置的比较短,用来测试下客户端的自动重连,如下,开启的日志不断增加

其次将SseEmitter的超时时间设长一点,再试一下数据推送功能

请注意上面的演示,当后端结束了长连接之后,客户端会自动重新再次连接,不用写外的重试逻辑了,就这么神奇

3. 小结

本篇文章介绍了SSE的相关知识点,并对比websocket给出了sse的优点(至于啥优点请往上翻)

请注意,本文虽然介绍了两种sse的方式,第一种借助异步请求来实现,如果需要完成sse的规范要求,需要自己做一些适配,如果需要了解sse底层实现原理的话,可以参考一下;在实际的业务开发中,推荐使用SseEmitter

IV. 其他

0. 项目

系列博文

源码

1. 一灰灰Blog

尽信书则不如,以上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激

下面一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛

一灰灰blog


打赏 如果觉得我的文章对您有帮助,请随意打赏。
分享到