灏天阁

有效封装 WebSocket

· Yin灏

前言

在现代 Web 应用中,实时通信已经成为越来越重要的一部分。而 WebSocket 技术的出现,使得实时通信变得更加高效和便捷。

WebSocket 协议是一种基于 TCP 协议的双向通信协议,它能够在客户端和服务器之间建立起持久性的连接,从而实现实时通信。

在前端开发中,为了更好地利用 WebSocket 技术,我们通常会对其进行封装,以便于全局调用并根据自己的业务做不同的预处理。

本文将介绍如何有效封装一个 WebSocket 供全局使用,并根据自己的业务做不同的预处理,实现更方便的调用,减少重复代码。

具体实现

我们将基于 Web API 提供的 WebSocket 类,封装一个 Socket 类,该类将提供以下功能:

  1. 建立 WebSocket 连接,并支持发送 query 参数。
  2. 发送、接收消息,支持对 WebSocket 的事件进行监听。
  3. 断开 WebSocket 连接。
  4. 支持心跳检测。
  5. 可以根据业务需要,对发送和接收的消息进行预处理。

下面是实现代码:

// socket.js
import modal from "@/plugins/modal";
const baseURL = import.meta.env.VITE_APP_BASE_WS;
const EventTypes = ["open", "close", "message", "error", "reconnect"];
const DEFAULT_CHECK_TIME = 55 * 1000; // 心跳检测的默认时间
const DEFAULT_CHECK_COUNT = 3; // 心跳检测默认失败重连次数
const DEFAULT_CHECK_DATA = { Type: 1, Parameters: ["alive"] }; // 心跳检测的默认参数 - 跟后端协商的
const CLOSE_ABNORMAL = 1006; // WebSocket非正常关闭code码

class EventMap {
  deps = new Map();
  depend(eventType, callback) {
    this.deps.set(eventType, callback);
  }
  notify(eventType, event) {
    if (this.deps.has(eventType)) {
      this.deps.get(eventType)(event);
    }
  }
}

class Socket extends WebSocket {
  heartCheckData = DEFAULT_CHECK_DATA;
  heartCheckTimeout = DEFAULT_CHECK_TIME;
  heartCheckInterval = null;
  heartCheckCount = DEFAULT_CHECK_COUNT;
  constructor(options, dep, reconnectCount = 0) {
    let _baseURL = baseURL;
    const {
      url,
      protocols,
      query = {},
      greet = null,
      customBase = null,
    } = options;
    const _queryParams = Object.keys(query).reduce((str, key) => {
      if (typeof query[key] !== "object" && typeof query[key] !== "function") {
        return (str +=
          str.length > 0 ? `&${key}=${query[key]}` : `${key}=${query[key]}`);
      } else {
        return str;
      }
    }, "");
    if (customBase) {
      _baseURL = customBase;
    }
    super(`${_baseURL}${url}?${_queryParams}`, protocols);
    this._currentOptions = options;
    this._dep = dep;
    this._reconnectCount = reconnectCount;
    greet &&
      Object.assign(this, {
        heartCheckData: greet,
      });
    this.initSocket();
  }

  // 初始化WebSocket
  initSocket() {
    // 监听webSocket的事件
    this.onopen = function (e) {
      this._dep.notify("open", e);
      this.heartCheckStart();
    };
    this.onclose = function (e) {
      this._dep.notify("close", e);
      // 如果WebSocket是非正常关闭 则进行重连
      if (e.code === CLOSE_ABNORMAL) {
        if (this._reconnectCount < this.heartCheckCount) {
          this._reconnectCount++;
          const _socket = new Socket(
            this._currentOptions,
            this._dep,
            this._reconnectCount
          );
          this._dep.notify("reconnect", _socket);
        } else {
          return modal.msgError("WebSocket重连失败, 请联系技术客服!");
        }
      }
    };
    this.onerror = function (e) {
      this._dep.notify("error", e);
    };
    this.onmessage = function (e) {
      // 如果后端返回的是二进制数据
      if (e.data instanceof Blob) {
        const reader = new FileReader();
        reader.readAsArrayBuffer(e.data);
        reader.onload = (ev) => {
          if (ev.target.readyState === FileReader.DONE) {
            this._dep.notify("message", ev.target?.result);
          }
        };
      } else {
        // 处理普通数据
        try {
          const _parseData = JSON.parse(e.data);
          this._dep.notify("message", _parseData);
        } catch (error) {
          console.log(error);
        }
      }
    };
  }

  // 订阅事件
  subscribe(eventType, callback) {
    if (typeof callback !== "function")
      throw new Error("The second param is must be a function");
    if (!EventTypes.includes(eventType))
      throw new Error("The first param is not supported");
    this._dep.depend(eventType, callback);
  }

  // 发送消息
  sendMessage(data, options = {}) {
    const { transformJSON = true } = options;
    let result = data;
    if (transformJSON) {
      result = JSON.stringify(data);
    }
    this.send(result);
  }

  // 关闭WebSocket
  closeSocket(code, reason) {
    this.close(code, reason);
  }

  // 开始心跳检测
  heartCheckStart() {
    this.heartCheckInterval = setInterval(() => {
      if (this.readyState === this.OPEN) {
        let transformJSON = typeof this.heartCheckData === "object";
        this.sendMessage(this.heartCheckData, { transformJSON });
      } else {
        this.clearHeartCheck();
      }
    }, this.heartCheckTimeout);
  }

  // 清除心跳检测
  clearHeartCheck() {
    clearInterval(this.heartCheckInterval);
  }

  // 重置心跳检测
  resetHeartCheck() {
    clearInterval(this.heartCheckInterval);
    this.heartCheckStart();
  }
}
// 默认的配置项
const defaultOptions = {
  url: "",
  protocols: "",
  query: {},
};

export const useSocket = (options = defaultOptions) => {
  if (!window.WebSocket)
    return modal.msgWarning("您的浏览器不支持WebSocket, 请更换浏览器!");
  const dep = new EventMap();
  const reconnectCount = 0;
  return new Socket(options, dep, reconnectCount);
};

接下来我们从实际使用的角度解释一下上面的代码,首先我们暴露了一个 useSocket 函数,该函数接收一个 options 配置项参数,支持的参数有:

  • url:要连接的 WebSocket URL;
  • protocols:一个协议字符串或者一个包含协议字符串的数组;
  • query:可以通过 URL 传递给后端的查询参数;
  • greet:心跳检测的打招呼信息;
  • customBase:自定义的 baseURL ,否则默认使用环境变量中定义的 env.VITE_APP_BASE_WS

在调用该函数后,我们首先会判断当前用户的浏览器是否支持 WebSocket,如果不支持给予用户提示。

然后我们实例化了一个 EventMap 类的实例对象 dep,你可以把它当作是一个依赖收集桶,当用户订阅了某个 WebSocket 事件时,我们将收集这个事件对应的回调作为依赖,在事件触发时,再通知该依赖,然后调用该事件对应的回调函数。

接下来我们定义了一个初始的重连次数记录值 reconnectCount 为 0,每当这个 WebSocket 重连时,该值会自增。

之后我们实例化了自己封装的 Socket 类,并传入了我们上面的三个参数。 在 Socket 类的构造函数 constructor 中,我们先取出配置项,把 query 内的参数拼接在 URL 上,然后使用 super 调用父类的构造函数进行建立 WebSocket 连接。

之后我们缓存了当前 Socket 实例化时的参数,再调用 initSocket() 方法去进行 WebSocket 事件的监听:

  • onopen:触发 depopen 对应的回调函数并且打开心跳检测;
  • onclose:触发 depclose 对应的回调函数并且对关闭的 code 码进行判断,如果是非正常关闭连接,将会进行重连,如果重连次数达到阈值,则通知给用户;
  • onerror:触发 deperror 对应的回调函数;
  • onmessage:接收到服务端返回的数据,可以先根据自身业务做一些预处理,比如我就根据不同的数据类型进行了数据解析的预处理,之后再触发 depmessage 对应的回调函数并传入处理过后的数据。

我们也暴露了一些成员方法以供实例对象使用:

  • subscribe:订阅 WebSocket 事件,传入事件类型并须是 EventTypes 内的类型之一,第二个参数则是回调函数;
  • sendMessage:同样的,我们在给服务端发送数据之前也可以根据自身业务做一些预处理,比如我将需要转成 JSON 的数据,在这里统一转换后再发送给服务端;
  • closeSocket:关闭 WebSocket 连接;
  • heartCheckStart:开始心跳检测,会创建一个定时器,在一定时间之后(默认是 55s)给服务端发送信息确认连接是否正常;
  • clearHeartCheck:清除心跳检测定时器(如果当前 WebSocket 连接已经关闭,则自动清除);
  • resetHeartCheck:重置心跳检测定时器。

如何使用

让我们看下如何使用这个封装好的 useSocket 函数,以在 Vue3 中使用为例:

// xx.jsx or xx.vue
import { useSocket } from "./socket.js";
const socket = ref(null); // WebSocket实例
const initWebSocket = () => {
  const options = {
    url: "/<your url>",
    query: {
      // something params
    },
  };
  socket.value = useSocket(options);
  socket.value.subscribe("open", () => {
    console.log("WebSocket连接成功!");
    const greet = "hello";
    // 发送打招呼消息
    socket.value.sendMessage(greet);
  });
  socket.value.subscribe("close", (reason) => {
    console.log("WebSocket连接关闭!", reason);
  });
  socket.value.subscribe("message", (result) => {
    console.log("WebSocket接收到消息:", result);
  });
  socket.value.subscribe("error", (err) => {
    console.log("WebSocket捕获错误:", err);
  });
  socket.value.subscribe("reconnect", (_socket) => {
    console.log("WebSocket断开重连:", _socket);
    socket.value = _socket;
  });
};
initWebSocket();

最后,如果想 debug 我们的心跳检测是否有效,可以使用下面这段代码:

// 测试心跳检测重连 手动模拟断开的情况
if (this._reconnectCount > 0) return;
const tempTimer = setInterval(() => {
  this.close();
  if (this._reconnectCount < 3) {
    console.log("重连");
    this._reconnectCount++;
    const _socket = new Socket(
      this._currentOptions,
      this._dep,
      this._reconnectCount
    );
    this._dep.notify("reconnect", _socket);
  } else {
    return clearInterval(tempTimer);
  }
}, 3 * 1000);

initSocket() 方法中的 this.onopen 事件的回调函数内的最后添加上面这段代码即可。

总结

至此,我们实现了一个 WebSocket 类的封装,提供了连接、断开、消息发送、接收和心跳检测等功能,并可以根据业务需要对消息进行预处理。同时,我们还介绍了如何使用封装好的 useSocket 函数。

WebSocket 封装的好处在于可以让我们在全局范围内方便地使用 WebSocket,提高代码的可读性和可维护性,降低代码的复杂度和重复性。在实际开发过程中,我们可以结合自己的业务需求,对封装的 WebSocket 类进行扩展和优化,以达到更好的效果。

- Book Lists -