使用SSE 流式传输
为什么用 SSE
趁着 ai 的风,项目需要启动一个 ai 智能助手,了解了下,当前环境里做 ai 助手的主要请求方式就是流式请求,相较于之前做过的 WebSocket 项目而言,这次又是一个好玩儿的技术点,刚好体验下如何在搭配后端 agent 能力下做出令人满意的营销智能助手
什么是 SSE
SSE 全称 Server-Sent Events (SSE)
是一种允许服务器主动向客户端浏览器推送数据的技术。 它基于 HTTP 协议,但与传统的 HTTP 请求-响应模式不同,SSE 允许服务器在建立连接后,通过一个持久的连接不断地向客户端发送消息。 听起来跟 WebSocket 有点像是吧?
工作原理
当然,以下是经过简化和优化后的描述
建立连接
客户端通过普通 HTTP 请求订阅服务器的 SSE 端点。服务器响应并保持连接打开,以维持持久通信。
服务器推送消息
当服务器有新数据时,通过该持久连接向客户端发送事件。每个事件包含遵循特定格式的文本数据流。
客户端接收消息
客户端监听来自服务器的事件,并在接收到新数据时触发相应的处理程序。
连接管理
若连接中断,客户端会自动尝试重新连接,确保通信的连续性。
在我们项目中的运用
当前我们项目当前仅需要传输 string,这一点刚好契合 SSE 的特征,
SSE 只支持文本格式的数据流
请求头
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
数据格式与编码
事件流格式:SSE 使用特定的文本格式进行通信。 每条消息应以 data: 开头,多个字段间用换行符分隔。 data: 消息内容\n
字符编码:确保发送的消息是 UTF-8 编码,这是 SSE 的默认编码方式 每次推送,可由多个消息组成,每个消息之间以空行分隔(即最后一个字段以\n\n 结尾)。需要使用正则特殊处理一下
第三方流式请求插件推荐
useEventSource
https://github.com/vueuse/vueuse/blob/main/packages/core/useEventSource/index.md
microsoft/fetch-event-source
@microsoft/fetch-event-source,是微软开发的一个库,用于通过 Fetch API 实现 SSE 的流式数据传输。它封装了请求发送、消息接收和连接恢复的逻辑,非常适合流式数据的处理。 这里采用的是这个后期改进了下。 主要参数如下
- method: HTTP 方法,通常为 POST。
- headers: 请求头信息,通常需要指定 Content-Type 为 application/json。
- body: 请求体内容,可以根据需求传递给后端。
- onmessage: 处理流式消息的回调函数,每当服务器发送一条消息时会调用。
- onclose: 服务器关闭连接时的回调。
- onerror: 出现错误时的回调。
import React, { FC, useEffect, useState } from 'react';
import { fetchEventSource } from '@microsoft/fetch-event-source';
cosnt App =()=>{
const [data, serData] = useState<[]any>([]);
const ssePost = (msg:string)=>{
fetchEventSource('/api/sse',{
method:"POST",
headers: { 'Content-Type': 'application/json', },
body: JSON.stringify({ query: msg }),
onmessage(event){
// 收到的流式数据更新数据
setData(prevData => [...prevData, JSON.parse(res.data)]);
},
// 报错
onerror(err) {
// 报错信息
console.error('Error:', err);
},
// 关闭连接
onclose() {
console.log('关闭连接');
},
});
};
return
<div >
{data?.map(item => ( <div>{item.msg}</div> ))} </div>
};
ai 智能助手的坑
打字机效果
当然还有打字机效果,这个前端做也行,后端处理也可以,关键在于一秒吐出 2 到 5 个字符。 前端实现有个难点在于光标的闪烁问题。 打字机光标效果的实现思路两种:
- 递归查找当前 DOM 树中的最后一个节点内容不为空的文本叶子节点,然后追加一个闪烁的光标 DOM,依赖 CSS 选择器,通过:last-child:after 子代选择器和伪类选择器锁定位置,其实也是递归查最后一个非空叶子节点。
- 采用第三方库https://typeitjs.com/
处理后端返回字段
多个图表信息
因为当前存在客户信息的情况下,有性别,城市,用户画像等 tab 且每种信息需要使用不同的 Chart 图表
- 性别 使用 饼图
- 城市占比 使用 柱状图
- 用户画像 使用 折线图 这就需要在处理接口返回 data 时,让后端返回不同的 dataType 一次返回一条 tab 信息,前端采用 key:value 的形式拼接信息后在放到页面展示
在页面端使用切换不同的类型图表来展示就够了 也就是意味着,流式请求需要每次只返回一条信息包裹不同 chart 的信息,这里需要前端单独处理。 而非我们常规请求返回也给大 json,可以从里面取到对应的 key--value
// 常规response
data: {
age :[
{xvalue:'12',yvalue:'boy'}
],
city :[
{xvalue:'12',yvalue:'北京'},
{xvalue:'27',yvalue:'南京'}
],
}
因为流式请求相当于是对常规后端response的jsonData信息进行一个Object.entry
操作forEach
行为,需要设定一个 infoArr 来进行承接。
const currentDataListItem='[{xvalue:'12',yvalue:'boy'}]'
switch(type){
case 'age':
retrun infoArr.push(data)
case 'city':
retrun infoArr.push(data)
}
随后在页面 client 端,接受到当前的dataItem
,对不同 type 下的 chart 信息进行二次组合,放到对应的 chart图表中进行展示即可。
// 模拟从后端接收的数据流
const mockData = [
{ dataType: "gender", data: { male: 60, female: 40 } },
{ dataType: "city", data: { NewYork: 30, Beijing: 50, London: 20 } },
{ dataType: "userProfile", data: { Jan: 10, Feb: 20, Mar: 30, Apr: 40 } },
]
//处理返回数据
function processData(data) {
switch (data.dataType) {
case "gender":
renderPieChart("genderChart", data.data)
break
case "city":
renderBarChart("cityChart", data.data)
break
case "userProfile":
renderLineChart("userProfileChart", data.data)
break
default:
console.log("未知的数据类型")
}
}
// 模拟接口返回数据
const listData = mockData.forEach((data) => {
processData(data)
})
<template>
<div>
<h3>接收到的消息:</h3>
<ul>
<li v-for="(message, index) in messages" :key="index">{{ message }}</li>
</ul>
</div>
<button @click="sendMessage">发送消息</button>
</template>
'<script>'
import {onBeforeUnmount, onMounted, ref} from 'vue';
import myAxios from "@/plugins/myAxios.js";
export default {
setup() {
const messages = ref([]); // 存储接收到的消息
let eventSource = null; // 用于保存 EventSource 实例
onMounted(() => {
// 创建 EventSource 实例,连接到服务器端的 SSE 端点
eventSource = new EventSource('http://localhost:8080/sse/create');
// 监听特定事件名 'sse-message'
eventSource.addEventListener('sse-message', (event) => {
console.log('收到消息:', event.data);
// 解析消息并保存到 messages 中
messages.value.push(event.data);
});
// 监听连接关闭或错误
eventSource.onerror = (error) => {
console.error('SSE 连接出错:', error);
eventSource.close(); // 关闭连接
};
});
onBeforeUnmount(() => {
// 组件销毁时关闭 SSE 连接
if (eventSource) {
eventSource.close();
}
});
const sendMessage = async () => {
await myAxios.post('/sse/send');
}
return {
messages,
sendMessage
};
},
};
'</script>'
markdown 格式文本展示
需要采用当前现有的 markdown 插件展示即可, 但在项目实际操作中,在打字机效果加载中, 会出现
- 开头第一个字放大效果
- 或者 p 标签下间距突然放大的效果
需要特殊处理css的!important
等级。
接口的报错处理
在项目中使用 fetchEventSource
进行 SSE 通信时,遇到了以往使用常规拦截器,进行 get和post请求报错以外情况的报错。
常规请求中, 拦截器只需要判断
- 后端约定的success是否成功,
- 或status的状态码是非200状态 而在sse的请求中,对于拦截的效果一开始没有考虑周全,按照常规的方法进行拦截发现不太够。
- 仅仅处理了response报错拦截50x,40x可以处理
- 存在服务中断,没有status的情况
- 存在链接时间超时问题
- 返回content直接包含errorCode
- 判断content-type一旦不在只
Content-Type: text/event-stream
也是接口出错
错误处理
连接失败或中断: 当与服务器的连接由于网络问题或其他原因断开时,
fetchEventSource
提供了onerror
回调来捕获这类错误。在此回调中实现重试逻辑,或者根据需要终止重试 。import { fetchEventSource } from "@microsoft/fetch-event-source" fetchEventSource("/api/sse", { onerror(err) { console.error("Error:", err) if (err instanceof Response && err.status === 503) { // 可能是暂时性的服务不可用,可以设置重试间隔 throw new Error("Retriable error") } else { // 非重试性错误,停止重试 throw err } }, // 其他配置... })
HTTP 响应错误: 如果后端返回了一个非 2xx 状态码,可以在
onopen
回调中检查response
对象的状态码onopen(response) { if (!response.ok) { throw new Error(`Server returned ${response.status} status`); } },
消息解析错误: 在
onmessage
回调中,如果接收到的消息格式不正确,可能导致 JSON 解析失败onmessage(event) { try { const data = JSON.parse(event.data); console.log('Received message:', data); } catch (e) { console.error('Failed to parse message:', e); } },
示例代码
import React, { useEffect } from "react"
import { fetchEventSource } from "@microsoft/fetch-event-source"
const SseComponent = () => {
useEffect(() => {
const controller = new AbortController()
fetchEventSource("/api/sse", {
signal: controller.signal,
onopen(response) {
if (!response.ok) {
throw new Error(`Server returned ${response.status} status`)
}
},
onmessage(event) {
try {
const data = JSON.parse(event.data)
console.log("Received message:", data)
} catch (e) {
console.error("Failed to parse message:", e)
}
},
onerror(err) {
console.error("Error:", err)
if (err instanceof Response && err.status === 503) {
// 设置重试间隔
throw new Error("Retriable error")
} else {
// 停止重试
throw err
}
},
// 定义其他选项...
})
return () => controller.abort() // 清理函数,用于取消请求
}, [])
return <div> messages</div>
}
export default SseComponent
做了这些拦截后,对用户的体验更加有好些,当然这是在前期服务不稳定的情况下,已经全然拦截住了。还是很有趣的一次拦截体验。