灏天阁

WebSocket技术分享

· Yin灏

简介

WebSocket 协议是基于 TCP 的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信,即允许服务器主动发送信息给客户端。因此,在 WebSocket 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输,客户端和服务器之间的数据交换变得更加简单。

主代码

const webSocketInit = () => {
  // 防止页面跳转造成 ws 重连,限制能进行 ws 连接的页面
  if (
    verityFailed.current ||
    !window.location.pathname.includes("/whiteboard")
  ) {
    return;
  }
  if (lockReconnect) return;
  lockReconnect = true;
  // 重连之前如果已经存在 ws 连接则关闭
  if (wss && (wss.readyState === 0 || wss.readyState === 1)) {
    wss.close(1000, "Switching to a new connection");
  }
  wss = null;
  try {
    /**
     * connectionAttempts 定义用于计数的变量,这里表示重连的次数
     * wsUrlRef.current [Array] 返回的是可用 ws 地址列表
     * 目的:每次重连都要循环更换地址
     */
    const wsAddress =
      wsUrlRef.current[connectionAttempts % wsUrlRef.current.length];
    /**
     * WebSocketSingleton.getInstance 使用单例模式和闭包保证连接唯一性
     */
    wss = WebSocketSingleton.getInstance(wsAddress);
    connectionAttempts++;
    setWs(() => wss);
    const onopen = () => {
      if (wss && wss.readyState === 1) {
        // 发送认证
        wss.send(
          pack(OpAuth, {
            uid: getStorage("userinfo").uid,
            token: getStorage("userinfo").token,
            platform: "web",
            wid: nowWid,
          })
        );
      }
    };
    // 服务端返回数据接收
    const onmessage = (e) => {
      if (verityFailed.current) return;
      unpack(e.data).then((msg) => {
        // ...
        // 队列:保持白板数据的同步性
        /**
         * 目的:防止客户端发送失败、服务端接收失败、服务端反馈但客户端未接收到等情况
         * 实现:客户端每次发送消息都携带一个 flag 唯一标识,然后服务端将 flag 返回,客户端接收的数据中若包含对应的 flag,则此次请求完成
         */
        if (msg?.type === 22) {
          // flag 32 位字符串,做唯一标识处理
          const flag = msg?.data?.custom?.flag;
          if (flag) {
            // 收集每次接收到的 flag 信息
            instance.dequeue(flag);
          }
          // ...
        }

        // ...

        if (
          (msg?.data?.array && Array.isArray(msg.data.array)) ||
          ["CHANGE", "CREATE", "DELETE"].includes(msg?.data?.op || "")
        ) {
          // ...
        } else if (msg.type === OpAuthReply) {
          // 接收认证消息
          const datas = msg?.data;
          if (datas && datas.code !== 200) {
            // 认证失败
            // 做一些数据清空和状态重置
            if (wss) {
              wss.onopen = null;
              wss.onmessage = null;
              wss.onclose = null;
              wss.onerror = null;
            }
            verityFailed.current = true;
            getWsVerityStatus.current = false;
            setWsStatus(false);
            if (window?.wbObj) window.wbObj.getWsVerityStatus = false;
            window.clearInterval(intervalid);
            intervalid = null;
          } else {
            // 认证成功
            // 成功状态数据设置
            setWsStatus(true);
            setWsUrl(wsAddress);
            getWsVerityStatus.current = true;
            verityFailed.current = false;
            // 心跳服务
            intervalid = window.setInterval(SendHeartbeat, 30 * 1000);
          }
        }
      });
    };
    const onerror = () => {
      // 提交错误日志
      const copyLog = JSON.parse(
        JSON.stringify({
          user_id: data?.uid,
          ws_url: wsAddress,
        })
      );
      sendWsErrorLog(2, copyLog);
      reconnectHandle();
    };
    const reconnectHandle = () => {
      if (verityFailed.current) return;
      lockReconnect = false;
      getWsVerityStatus.current = false;
      setWsStatus(false);
      window.clearInterval(intervalid);
      intervalid = null;
      if (window.location.pathname.includes("/whiteboard")) {
        reconnect();
      }
    };
    wss.onopen = onopen;
    wss.onmessage = onmessage;
    wss.onclose = reconnectHandle;
    wss.onerror = onerror;
  } catch (error) {
    lockReconnect = false;
    reconnect();
  }
};

心跳机制

function SendHeartbeat() {
  try {
    wsSendFun({
      wsconn: wss,
      data: pack(OpHeartbeat, {}),
    });
    console.log("10s heart");
  } catch (err) {
    console.log("心跳报错:", err.message);
    window.clearInterval(intervalid);
    intervalid = null;
  }
}

单例模式

  • instance 不存在或 readyState 为 3 的时候才会重新连接,否则始终保持 instance 的唯一性
// 用闭包实现单例模式
const WebSocketSingleton = (() => {
  let instance;
  const createInstance = (url) => {
    const socket = new WebSocket(url);
    return socket;
  };
  return {
    getInstance(url) {
      // readyState === 3 ws 连接关闭
      if (!instance || instance?.readyState === 3) {
        instance = createInstance(url);
      }
      return instance;
    },
  };
})();

export default WebSocketSingleton;

客户端每次发送 websocket 则记录发送的数据,放于队列中

  • 将每次发送的 ws 数据,存于队列中,用于进行下一步处理
let finallData = {
  timestamp,
  custom: {
    flag: randomString(),
  },
  array: data,
};
wb?.parentData?.instance?.enqueue?.(finallData, ws);

队列处理逻辑

// eslint-disable-next-line import/no-mutable-exports
let instance = null;

class MsgQueue {
  msgList = []; // 发送消息列表
  pending = false; // 是否在队列检查中
  period = 5000; // 超时时间
  // 静态方法,实例化类,单例模式
  static getInstance() {
    if (!instance) {
      instance = new MsgQueue();
    }
    return instance;
  }

  enqueue(msg, wscoon, type = "") {
    if (type === "before") {
      this.msgList.unshift(msg);
    } else {
      this.msgList.push(msg);
    }
    // 每添加一个数据,则执行一次监听
    if (!this.pending) {
      this.listen(wscoon);
    }
  }

  dequeue(flag) {
    // 若已经接收到服务端返回携带相应 flag 的数据,则从队列中清除数据
    this.msgList = this.msgList.filter((item) => item.custom.flag !== flag);
  }

  listen(wscoon) {
    // 若队列中无数据则不进行后续处理
    if (!this.msgList.length) {
      this.pending = false;
      return;
    }
    // 获取当前时间
    const now = new Date().getTime();
    // 获取队列中第一个元素
    const msg = this.msgList[0];
    // 客户端每次发送 ws 消息,都会携带当前时间戳
    // 计算时间差
    const distance = now - msg.timestamp;
    // 修改执行状态
    this.pending = true;
    // 超过 5s 没有接收到服务端返回的鞋带 flag 的数据,则认为客户端发送失败,需要重发
    if (distance >= this.period) {
      const flag = msg?.custom?.flag;
      // 删除此数据
      this.dequeue(flag);
      // 重新记录时间戳
      msg.timestamp = new Date().getTime();
      if (wscoon?.readyState === 1) {
        // 客户端再次发送 ws 效果
        wscoon.send(dataToArrayBuffer(21, msg));
      }
      // 把当前消息放在队列的最前面,
      this.enqueue(msg, wscoon, "before");
      // 递归监听
      this.listen(wscoon);
    } else {
      // 如果没有达到规定时间,则设置一个定时器,时间设置为剩余时间
      // 等到剩余时间过完再次执行监听
      setTimeout(() => {
        this.listen(wscoon);
      }, this.period - distance);
    }
  }
}
instance = MsgQueue.getInstance();
export default instance;

- Book Lists -