有效封装 WebSocket
前言
在现代 Web 应用中,实时通信已经成为越来越重要的一部分。而 WebSocket 技术的出现,使得实时通信变得更加高效和便捷。
WebSocket 协议是一种基于 TCP 协议的双向通信协议,它能够在客户端和服务器之间建立起持久性的连接,从而实现实时通信。
在前端开发中,为了更好地利用 WebSocket 技术,我们通常会对其进行封装,以便于全局调用并根据自己的业务做不同的预处理。
本文将介绍如何有效封装一个 WebSocket 供全局使用,并根据自己的业务做不同的预处理,实现更方便的调用,减少重复代码。
具体实现
我们将基于 Web API 提供的 WebSocket
类,封装一个 Socket
类,该类将提供以下功能:
- 建立 WebSocket 连接,并支持发送
query
参数。 - 发送、接收消息,支持对
WebSocket
的事件进行监听。 - 断开 WebSocket 连接。
- 支持心跳检测。
- 可以根据业务需要,对发送和接收的消息进行预处理。
下面是实现代码:
// socket.js
import modal from "@/plugins/modal";
const baseURL = import.meta.env.VITE_APP_BASE_WS;
const EventTypes = ["open", "close", "message", "error", "reconnect"];
const DEFAULT_CHECK_TIME = 55 * 1000; // 心跳检测的默认时间
const DEFAULT_CHECK_COUNT = 3; // 心跳检测默认失败重连次数
const DEFAULT_CHECK_DATA = { Type: 1, Parameters: ["alive"] }; // 心跳检测的默认参数 - 跟后端协商的
const CLOSE_ABNORMAL = 1006; // WebSocket非正常关闭code码
class EventMap {
deps = new Map();
depend(eventType, callback) {
this.deps.set(eventType, callback);
}
notify(eventType, event) {
if (this.deps.has(eventType)) {
this.deps.get(eventType)(event);
}
}
}
class Socket extends WebSocket {
heartCheckData = DEFAULT_CHECK_DATA;
heartCheckTimeout = DEFAULT_CHECK_TIME;
heartCheckInterval = null;
heartCheckCount = DEFAULT_CHECK_COUNT;
constructor(options, dep, reconnectCount = 0) {
let _baseURL = baseURL;
const {
url,
protocols,
query = {},
greet = null,
customBase = null,
} = options;
const _queryParams = Object.keys(query).reduce((str, key) => {
if (typeof query[key] !== "object" && typeof query[key] !== "function") {
return (str +=
str.length > 0 ? `&${key}=${query[key]}` : `${key}=${query[key]}`);
} else {
return str;
}
}, "");
if (customBase) {
_baseURL = customBase;
}
super(`${_baseURL}${url}?${_queryParams}`, protocols);
this._currentOptions = options;
this._dep = dep;
this._reconnectCount = reconnectCount;
greet &&
Object.assign(this, {
heartCheckData: greet,
});
this.initSocket();
}
// 初始化WebSocket
initSocket() {
// 监听webSocket的事件
this.onopen = function (e) {
this._dep.notify("open", e);
this.heartCheckStart();
};
this.onclose = function (e) {
this._dep.notify("close", e);
// 如果WebSocket是非正常关闭 则进行重连
if (e.code === CLOSE_ABNORMAL) {
if (this._reconnectCount < this.heartCheckCount) {
this._reconnectCount++;
const _socket = new Socket(
this._currentOptions,
this._dep,
this._reconnectCount
);
this._dep.notify("reconnect", _socket);
} else {
return modal.msgError("WebSocket重连失败, 请联系技术客服!");
}
}
};
this.onerror = function (e) {
this._dep.notify("error", e);
};
this.onmessage = function (e) {
// 如果后端返回的是二进制数据
if (e.data instanceof Blob) {
const reader = new FileReader();
reader.readAsArrayBuffer(e.data);
reader.onload = (ev) => {
if (ev.target.readyState === FileReader.DONE) {
this._dep.notify("message", ev.target?.result);
}
};
} else {
// 处理普通数据
try {
const _parseData = JSON.parse(e.data);
this._dep.notify("message", _parseData);
} catch (error) {
console.log(error);
}
}
};
}
// 订阅事件
subscribe(eventType, callback) {
if (typeof callback !== "function")
throw new Error("The second param is must be a function");
if (!EventTypes.includes(eventType))
throw new Error("The first param is not supported");
this._dep.depend(eventType, callback);
}
// 发送消息
sendMessage(data, options = {}) {
const { transformJSON = true } = options;
let result = data;
if (transformJSON) {
result = JSON.stringify(data);
}
this.send(result);
}
// 关闭WebSocket
closeSocket(code, reason) {
this.close(code, reason);
}
// 开始心跳检测
heartCheckStart() {
this.heartCheckInterval = setInterval(() => {
if (this.readyState === this.OPEN) {
let transformJSON = typeof this.heartCheckData === "object";
this.sendMessage(this.heartCheckData, { transformJSON });
} else {
this.clearHeartCheck();
}
}, this.heartCheckTimeout);
}
// 清除心跳检测
clearHeartCheck() {
clearInterval(this.heartCheckInterval);
}
// 重置心跳检测
resetHeartCheck() {
clearInterval(this.heartCheckInterval);
this.heartCheckStart();
}
}
// 默认的配置项
const defaultOptions = {
url: "",
protocols: "",
query: {},
};
export const useSocket = (options = defaultOptions) => {
if (!window.WebSocket)
return modal.msgWarning("您的浏览器不支持WebSocket, 请更换浏览器!");
const dep = new EventMap();
const reconnectCount = 0;
return new Socket(options, dep, reconnectCount);
};
接下来我们从实际使用的角度解释一下上面的代码,首先我们暴露了一个 useSocket
函数,该函数接收一个 options
配置项参数,支持的参数有:
url
:要连接的 WebSocket URL;protocols
:一个协议字符串或者一个包含协议字符串的数组;query
:可以通过 URL 传递给后端的查询参数;greet
:心跳检测的打招呼信息;customBase
:自定义的baseURL
,否则默认使用环境变量中定义的env.VITE_APP_BASE_WS
。
在调用该函数后,我们首先会判断当前用户的浏览器是否支持 WebSocket
,如果不支持给予用户提示。
然后我们实例化了一个 EventMap
类的实例对象 dep
,你可以把它当作是一个依赖收集桶,当用户订阅了某个 WebSocket
事件时,我们将收集这个事件对应的回调作为依赖,在事件触发时,再通知该依赖,然后调用该事件对应的回调函数。
接下来我们定义了一个初始的重连次数记录值 reconnectCount
为 0,每当这个 WebSocket
重连时,该值会自增。
之后我们实例化了自己封装的 Socket
类,并传入了我们上面的三个参数。 在 Socket
类的构造函数 constructor
中,我们先取出配置项,把 query
内的参数拼接在 URL 上,然后使用 super
调用父类的构造函数进行建立 WebSocket
连接。
之后我们缓存了当前 Socket
实例化时的参数,再调用 initSocket()
方法去进行 WebSocket
事件的监听:
onopen
:触发dep
内open
对应的回调函数并且打开心跳检测;onclose
:触发dep
内close
对应的回调函数并且对关闭的code
码进行判断,如果是非正常关闭连接,将会进行重连,如果重连次数达到阈值,则通知给用户;onerror
:触发dep
内error
对应的回调函数;onmessage
:接收到服务端返回的数据,可以先根据自身业务做一些预处理,比如我就根据不同的数据类型进行了数据解析的预处理,之后再触发dep
内message
对应的回调函数并传入处理过后的数据。
我们也暴露了一些成员方法以供实例对象使用:
subscribe
:订阅 WebSocket 事件,传入事件类型并须是EventTypes
内的类型之一,第二个参数则是回调函数;sendMessage
:同样的,我们在给服务端发送数据之前也可以根据自身业务做一些预处理,比如我将需要转成 JSON 的数据,在这里统一转换后再发送给服务端;closeSocket
:关闭 WebSocket 连接;heartCheckStart
:开始心跳检测,会创建一个定时器,在一定时间之后(默认是 55s)给服务端发送信息确认连接是否正常;clearHeartCheck
:清除心跳检测定时器(如果当前 WebSocket 连接已经关闭,则自动清除);resetHeartCheck
:重置心跳检测定时器。
如何使用
让我们看下如何使用这个封装好的 useSocket
函数,以在 Vue3 中使用为例:
// xx.jsx or xx.vue
import { useSocket } from "./socket.js";
const socket = ref(null); // WebSocket实例
const initWebSocket = () => {
const options = {
url: "/<your url>",
query: {
// something params
},
};
socket.value = useSocket(options);
socket.value.subscribe("open", () => {
console.log("WebSocket连接成功!");
const greet = "hello";
// 发送打招呼消息
socket.value.sendMessage(greet);
});
socket.value.subscribe("close", (reason) => {
console.log("WebSocket连接关闭!", reason);
});
socket.value.subscribe("message", (result) => {
console.log("WebSocket接收到消息:", result);
});
socket.value.subscribe("error", (err) => {
console.log("WebSocket捕获错误:", err);
});
socket.value.subscribe("reconnect", (_socket) => {
console.log("WebSocket断开重连:", _socket);
socket.value = _socket;
});
};
initWebSocket();
最后,如果想 debug 我们的心跳检测是否有效,可以使用下面这段代码:
// 测试心跳检测重连 手动模拟断开的情况
if (this._reconnectCount > 0) return;
const tempTimer = setInterval(() => {
this.close();
if (this._reconnectCount < 3) {
console.log("重连");
this._reconnectCount++;
const _socket = new Socket(
this._currentOptions,
this._dep,
this._reconnectCount
);
this._dep.notify("reconnect", _socket);
} else {
return clearInterval(tempTimer);
}
}, 3 * 1000);
在 initSocket()
方法中的 this.onopen
事件的回调函数内的最后添加上面这段代码即可。
总结
至此,我们实现了一个 WebSocket 类的封装,提供了连接、断开、消息发送、接收和心跳检测等功能,并可以根据业务需要对消息进行预处理。同时,我们还介绍了如何使用封装好的 useSocket
函数。
WebSocket 封装的好处在于可以让我们在全局范围内方便地使用 WebSocket,提高代码的可读性和可维护性,降低代码的复杂度和重复性。在实际开发过程中,我们可以结合自己的业务需求,对封装的 WebSocket 类进行扩展和优化,以达到更好的效果。