使用SSE 流式传输

umwelt2024-07-24frontendtips

为什么用 SSE

趁着 ai 的风,项目需要启动一个 ai 智能助手,了解了下,当前环境里做 ai 助手的主要请求方式就是流式请求,相较于之前做过的 WebSocket 项目而言,这次又是一个好玩儿的技术点,刚好体验下如何在搭配后端 agent 能力下做出令人满意的营销智能助手

什么是 SSE

SSE 全称 Server-Sent Events (SSE)open in new window

是一种允许服务器主动向客户端浏览器推送数据的技术。 它基于 HTTP 协议,但与传统的 HTTP 请求-响应模式不同,SSE 允许服务器在建立连接后,通过一个持久的连接不断地向客户端发送消息。 听起来跟 WebSocket 有点像是吧?

工作原理

当然,以下是经过简化和优化后的描述

建立连接

客户端通过普通 HTTP 请求订阅服务器的 SSE 端点。服务器响应并保持连接打开,以维持持久通信。

服务器推送消息

当服务器有新数据时,通过该持久连接向客户端发送事件。每个事件包含遵循特定格式的文本数据流。

客户端接收消息

客户端监听来自服务器的事件,并在接收到新数据时触发相应的处理程序。

连接管理

若连接中断,客户端会自动尝试重新连接,确保通信的连续性。

在我们项目中的运用

当前我们项目当前仅需要传输 string,这一点刚好契合 SSE 的特征,

SSE 只支持文本格式的数据流

请求头

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

数据格式与编码

事件流格式:SSE 使用特定的文本格式进行通信。 每条消息应以 data: 开头,多个字段间用换行符分隔。 data: 消息内容\n

字符编码:确保发送的消息是 UTF-8 编码,这是 SSE 的默认编码方式 ​ 每次推送,可由多个消息组成,每个消息之间以空行分隔(即最后一个字段以\n\n 结尾)。需要使用正则特殊处理一下

第三方流式请求插件推荐

useEventSource

https://github.com/vueuse/vueuse/blob/main/packages/core/useEventSource/index.md

microsoft/fetch-event-source

@microsoft/fetch-event-source,是微软开发的一个库,用于通过 Fetch API 实现 SSE 的流式数据传输。它封装了请求发送、消息接收和连接恢复的逻辑,非常适合流式数据的处理。 这里采用的是这个后期改进了下。 主要参数如下

  • method: HTTP 方法,通常为  POST。
  • headers: 请求头信息,通常需要指定  Content-Type  为  application/json。
  • body: 请求体内容,可以根据需求传递给后端。
  • onmessage: 处理流式消息的回调函数,每当服务器发送一条消息时会调用。
  • onclose: 服务器关闭连接时的回调。
  • onerror: 出现错误时的回调。
import React, { FC, useEffect, useState } from 'react';
import { fetchEventSource } from '@microsoft/fetch-event-source';

cosnt App =()=>{
    const [data, serData] = useState<[]any>([]);

   const ssePost = (msg:string)=>{
       fetchEventSource('/api/sse',{
           method:"POST",
           headers: { 'Content-Type': 'application/json', },
           body: JSON.stringify({ query: msg }),

           onmessage(event){
               // 收到的流式数据更新数据
               setData(prevData => [...prevData, JSON.parse(res.data)]);
           },

           //  报错
           onerror(err) {
               // 报错信息
                console.error('Error:', err);
            },

           // 关闭连接
           onclose() {
                console.log('关闭连接');
           },
       });
   };

    return
            <div >
            {data?.map(item => ( <div>{item.msg}</div> ))} </div>

};


ai 智能助手的坑

打字机效果

当然还有打字机效果,这个前端做也行,后端处理也可以,关键在于一秒吐出 2 到 5 个字符。 前端实现有个难点在于光标的闪烁问题。 打字机光标效果的实现思路两种:

  • 递归查找当前 DOM 树中的最后一个节点内容不为空的文本叶子节点,然后追加一个闪烁的光标 DOM,依赖 CSS 选择器,通过:last-child:after 子代选择器和伪类选择器锁定位置,其实也是递归查最后一个非空叶子节点。
  • 采用第三方库https://typeitjs.com/

处理后端返回字段

多个图表信息

因为当前存在客户信息的情况下,有性别,城市,用户画像等 tab 且每种信息需要使用不同的 Chart 图表

  • 性别 使用 饼图
  • 城市占比 使用 柱状图
  • 用户画像 使用 折线图 这就需要在处理接口返回 data 时,让后端返回不同的 dataType 一次返回一条 tab 信息,前端采用 key:value 的形式拼接信息后在放到页面展示

在页面端使用切换不同的类型图表来展示就够了 也就是意味着,流式请求需要每次只返回一条信息包裹不同 chart 的信息,这里需要前端单独处理。 而非我们常规请求返回也给大 json,可以从里面取到对应的 key--value

// 常规response

data: {
  age :[
    {xvalue:'12',yvalue:'boy'}
  ],
    city :[
    {xvalue:'12',yvalue:'北京'},
    {xvalue:'27',yvalue:'南京'}
  ],
}

因为流式请求相当于是对常规后端response的jsonData信息进行一个Object.entry操作forEach行为,需要设定一个 infoArr 来进行承接。

const currentDataListItem='[{xvalue:'12',yvalue:'boy'}]'
switch(type){
    case 'age':
    retrun    infoArr.push(data)
        case 'city':
    retrun    infoArr.push(data)
}

随后在页面 client 端,接受到当前的dataItem,对不同 type 下的 chart 信息进行二次组合,放到对应的 chart图表中进行展示即可。

// 模拟从后端接收的数据流
const mockData = [
    { dataType: "gender", data: { male: 60, female: 40 } },
    { dataType: "city", data: { NewYork: 30, Beijing: 50, London: 20 } },
    { dataType: "userProfile", data: { Jan: 10, Feb: 20, Mar: 30, Apr: 40 } },
]

//处理返回数据

function processData(data) {
    switch (data.dataType) {
        case "gender":
            renderPieChart("genderChart", data.data)
            break
        case "city":
            renderBarChart("cityChart", data.data)
            break
        case "userProfile":
            renderLineChart("userProfileChart", data.data)
            break
        default:
            console.log("未知的数据类型")
    }
}

// 模拟接口返回数据
const listData = mockData.forEach((data) => {
    processData(data)
})
<template>
  <div>
    <h3>接收到的消息:</h3>
    <ul>
      <li v-for="(message, index) in messages" :key="index">{{ message }}</li>
    </ul>
  </div>
  
  <button @click="sendMessage">发送消息</button>
</template>

'<script>'

import {onBeforeUnmount, onMounted, ref} from 'vue';
import myAxios from "@/plugins/myAxios.js";

export default {
  setup() {
    const messages = ref([]); // 存储接收到的消息
    let eventSource = null; // 用于保存 EventSource 实例onMounted(() => {
      // 创建 EventSource 实例,连接到服务器端的 SSE 端点
      eventSource = new EventSource('http://localhost:8080/sse/create');// 监听特定事件名 'sse-message'
      eventSource.addEventListener('sse-message', (event) => {
        console.log('收到消息:', event.data);
        // 解析消息并保存到 messages 中
        messages.value.push(event.data);
      });// 监听连接关闭或错误
      eventSource.onerror = (error) => {
        console.error('SSE 连接出错:', error);
        eventSource.close(); // 关闭连接
      };
    });onBeforeUnmount(() => {
      // 组件销毁时关闭 SSE 连接
      if (eventSource) {
        eventSource.close();
      }
    });const sendMessage = async () => {
      await myAxios.post('/sse/send');
    }return {
      messages,
      sendMessage
    };
  },
};
'</script>'

markdown 格式文本展示

需要采用当前现有的 markdown 插件展示即可, 但在项目实际操作中,在打字机效果加载中, 会出现

  • 开头第一个字放大效果
  • 或者 p 标签下间距突然放大的效果

需要特殊处理css的!important等级。

接口的报错处理

在项目中使用 fetchEventSource进行 SSE 通信时,遇到了以往使用常规拦截器,进行 get和post请求报错以外情况的报错。

常规请求中, 拦截器只需要判断

  • 后端约定的success是否成功,
  • 或status的状态码是非200状态 而在sse的请求中,对于拦截的效果一开始没有考虑周全,按照常规的方法进行拦截发现不太够。
    1. 仅仅处理了response报错拦截50x,40x可以处理
    2. 存在服务中断,没有status的情况
    3. 存在链接时间超时问题
    4. 返回content直接包含errorCode
    5. 判断content-type一旦不在只Content-Type: text/event-stream 也是接口出错

错误处理

  1. 连接失败或中断: 当与服务器的连接由于网络问题或其他原因断开时,fetchEventSource 提供了 onerror 回调来捕获这类错误。在此回调中实现重试逻辑,或者根据需要终止重试 。

    import { fetchEventSource } from "@microsoft/fetch-event-source"
    
    fetchEventSource("/api/sse", {
        onerror(err) {
            console.error("Error:", err)
            if (err instanceof Response && err.status === 503) {
                // 可能是暂时性的服务不可用,可以设置重试间隔
                throw new Error("Retriable error")
            } else {
                // 非重试性错误,停止重试
                throw err
            }
        },
        // 其他配置...
    })
    
  2. HTTP 响应错误: 如果后端返回了一个非 2xx 状态码,可以在 onopen 回调中检查 response 对象的状态码

    onopen(response) {
      if (!response.ok) {
        throw new Error(`Server returned ${response.status} status`);
      }
    },
    
  3. 消息解析错误: 在 onmessage 回调中,如果接收到的消息格式不正确,可能导致 JSON 解析失败

    onmessage(event) {
      try {
        const data = JSON.parse(event.data);
        console.log('Received message:', data);
      } catch (e) {
        console.error('Failed to parse message:', e);
      }
    },
    

示例代码

import React, { useEffect } from "react"
import { fetchEventSource } from "@microsoft/fetch-event-source"

const SseComponent = () => {
    useEffect(() => {
        const controller = new AbortController()

        fetchEventSource("/api/sse", {
            signal: controller.signal,
            onopen(response) {
                if (!response.ok) {
                    throw new Error(`Server returned ${response.status} status`)
                }
            },
            onmessage(event) {
                try {
                    const data = JSON.parse(event.data)
                    console.log("Received message:", data)
                } catch (e) {
                    console.error("Failed to parse message:", e)
                }
            },
            onerror(err) {
                console.error("Error:", err)
                if (err instanceof Response && err.status === 503) {
                    // 设置重试间隔
                    throw new Error("Retriable error")
                } else {
                    // 停止重试
                    throw err
                }
            },
            // 定义其他选项...
        })

        return () => controller.abort() // 清理函数,用于取消请求
    }, [])

    return <div> messages</div>
}

export default SseComponent

做了这些拦截后,对用户的体验更加有好些,当然这是在前期服务不稳定的情况下,已经全然拦截住了。还是很有趣的一次拦截体验。

Last Updated 2/21/2025, 12:31:21 AM