灏天阁

Websocket 异常处理

· Yin灏

背景

花两天时间重构 Websocket 模块,关于链接异常处理的细节比较多,而且错误处理中容易出现冲突。本文主要是整理下链接异常 && 消息异常的处理方案。
建议可以先看一下我之前关于 tcp 消息丢失的文章

链接异常 && 异常处理

一、关于 websocket 长链接的异常,主要分几种异常情况:

1、创建连接异常;

测试在 chrome 下网络丢包频率高的话,大概 40s-50s 会报错,如果监听了 onerror 事件可以捕获到报错(WebSocket connection to 'wss://\*\*\*' failed: Error in connection establishment: net::ERR_CONNECTION_TIMED_OUT);
如果需要统一/固定时长,可以在创建时通过定时器监听(通过 socket.readyState 判断)超过多长时间没有创建成功则重新连接。

但当自己来处理链接时长后,需要注意在重连的时候关闭连接且将前一个链接的事件绑定解绑。否则还可能会收到原链接的相关事件推送(e.g. 设置了 30s 重连,但 40 秒后链接超时触发 onerror);

// 我这里用了单例模式来实现
let instance;
let onmessage, onopen, onerror, onclose;
class Ws {
  static getInstance() {
    if (!instance) {
      instance = new Ws();
    }
    return instance;
  }
  createSocket(url) {
    this.socketInstance = new WebSocket(url);
    this.socketInstance.binaryType = "arraybuffer";
    // 因为事件执行的this指向的是socket实例,需要修改this指向
    onmessage = this.onmessage.bind(this);
    onopen = this.onopen.bind(this);
    onerror = this.onerror.bind(this);
    onclose = this.onclose.bind(this);
    // socket 事件绑定
    this.socketInstance.addEventListener("message", onmessage);
    this.socketInstance.addEventListener("open", onopen);
    this.socketInstance.addEventListener("error", onerror);
    this.socketInstance.addEventListener("close", onclose);
    // 设置30s重连
    this.timeoutEvent(30000, () => {
      // 创建连接超时重连
      if (this.socketInstance && this.socketInstance.readyState !== 1) {
        this.closeSocket();
        this.reconnection();
      }
    });
  }
  onmessage() {}
  onopen() {}
  onerror() {}
  onclose() {}
  closeSocket() {
    // 解除事件绑定后,关闭连接
    this.socketInstance.removeEventListener("open", onopen);
    this.socketInstance.removeEventListener("message", onmessage);
    this.socketInstance.removeEventListener("error", onerror);
    this.socketInstance.removeEventListener("close", onclose);
    this.socketInstance.close();
  }
  reconnection() {
    this.createSocket("ws://127.0.0.1:8891/ws");
  }
}
instance = Ws.getInstance();

export default instance;

2、创建成功后的异常;

首先,正常情况下由服务端/客户端主动触发的断线,都会通知到对方链接已断开,如:

  1. 服务端/客户端 关闭进程/crash 等;
  2. 服务端/客户端 断网;
  3. 服务端/客户端 主动断开连接;

例如浏览器的 webScoket onclose 事件可以监听到服务端通知链接断开(断开的信息CloseEvent code一般为 1000, 服务端可以自行设置 code),如果需要发送消息则需要重新建立链接;

有一种情况是例外的,如果服务端和客户端之间的网络不稳定,丢包概率比较大,两端是无法感知的,所以需要通过心跳进行健康检查:

// 连接监听到 onopen 后,进行心跳检查, 5s 一次
onopen() {
    clearInterval(this.heartbeatTimmer)
    this.heartbeatTimmer = setInterval(() => {
        this.sendMessage('ping') // 自定义心跳消息
    }, 5000)
}

但因为 tcp 是无状态的,我们无法监听到心跳是否发送成功,所以需要通过另外一种方式来监听链接状态是否正常,可以通过定时器监听服务端返回的心跳包(设置超时时间,如果超过 60s 没收到心跳回应则重新连接):

onmessage(event) {
    if(isPong(event)) {
        this.listenPong()
    }
}
listenPong() {
    clearTimeout(this.pongTimmer)
    // 超过60s没收到服务端心跳则重连
    this.pongTimmer = setTimeout(() => {
        this.closeSocket()
        this.reconnection()
    }, 60000)
}

3、关闭连接异常;

客户端和服务端都有自行关闭链接的机制(即使安排一样的超时机制也可能会不一致),两边的状态可能会不一致, e.g.

  1. 服务端仍正常连接,客户端已断开,并会发起重连;
  2. 服务端已断开,客户端未断开(服务端断开的消息未收到,或已收到断开消息,但客户端未处理);
  3. 客户端自动关闭连接(测试在 chrome 超过 60s 没收到消息的情况下会自动断开连接,CloseEvent.code 是 1006);

对 1 来说,客户端断开连接发起重连后,按照最新的连接收发消息,不会出现异常。

对 3 来说,不同的 client 有不一样的处理逻辑,我们需要在关闭后进行重连处理。

onclose(event) {
    // 1006 客户端主动关闭
    if(event && event.code === 1006 && this.socketInstance.readyState !== 1) {
        this.closeSocket()
        this.reconnection()
    }
}

但 2 的情况,因为客户端记录仍是正常的,发送消息的时候会抛出异常:WebSocket is already in CLOSING or CLOSED state.我们需要做两个处理:

// 当收到服务端断开消息时,需要重连
onclose(event) {
    // 1006 客户端主动关闭
    // 1000 服务端关闭
    if(event && (event.code === 1006 || event.code === 1000) && this.socketInstance.readyState !== 1) {
        this.closeSocket()
        this.reconnection()
    }
}
// 当发送消息时需要检查链接状态,如果状态异常则需要重连
sendMessage(message) {
    if (!this.socketInstance || this.socketInstance.readyState !== 1) {
        this.closeSocket()
        this.reconnection()
        return
    }
}

消息异常 && 异常捕获

同样因为 tcp 无状态,所以我们无法直接判断消息是否发送成功,需要在业务层面实现 AcKnowledgements (发送消息给服务端后,服务端需要返回一条确认消息,收到确认消息才认为消息已发送成功)。

大部分业务只需要知道消息是否发送成功,所以通过 AcKnowledgements 已完成需求。但在一些稳定性要求较高的项目上,仍需要:

  1. 确认消息发送失败(实际上,AcKnowledgements 也可能丢失);
  2. 丢失的消息需要自动重发;
  3. 基于 2 的情况,收到的消息可能是重复的(因为存在可能发送成功的消息,丢失了 也就是实际发送成功了,但客户端未收到 AcKnowledgements),需要自行处理去重;

为了更好的数据管理,我设置了队列来进行消息管理,每次发送的消息时,将消息放入发送队列:

sendMessage(message) {
    if (!this.socketInstance || this.socketInstance.readyState !== 1) {
        this.closeSocket()
        this.reconnection()
        return
    }
    msgQueue.enqueue(message)
    this.socketInstance.send(message)
}

当收到系统回执时,需要将消息从发送队列中删除:

onmessage(message) {
    msgQueue.dequeue(message)
}

同时,队列管理需要进行定时的消息重发:

let instance;

// 关键消息结构
// localMsgId, 本地消息id
// createTimestamp, 创建的时间戳(这里最好用服务器时间)
class MsgQueue {
  msgList = []; // 发送消息列表
  pending = false; // 是否在队列检查中
  period = 5000; // 超时时间
  static getInstance() {
    if (!instance) {
      instance = new MsgQueue();
    }
    return instance;
  }
  enqueue(msg) {
    this.msgList.push(msg); // 放入消息队列
    if (!this.pending) {
      // 如果消息队列没有执行任务则开启执行
      this.listen();
    }
  }
  dequeue(msg) {
    this.msgList = this.msgList.filter(
      (item) => item.localMsgId !== msg.localMsgId
    );
  }
  listen() {
    if (!this.msgList.length) {
      this.pending = false;
      return;
    }
    let now = new Date().getTime(); // 同理,最好用服务器时间
    let msg = this.msgList[0];
    let distance = now - msg.createTimestamp;

    this.pending = true;
    if (distance >= this.period) {
      socket.sendMessage(msg); // 重发
      this.dequeue(msg);
      this.listen();
    } else {
      setTimeout(() => {
        this.listen();
      }, this.period - distance);
    }
  }
}
instance = MsgQueue.getInstance();

export default instance;

关于去重,我这里只需要在 onmessage 中进行检查,已有的消息直接忽略:

onmessage(event) {
    if(isDuplicated(event.data)) {
        return
    }
}

Socket readyState

  • 0 (WebSocket.CONNECTING) 正在链接中
  • 1 (WebSocket.OPEN) 已经链接并且可以通讯
  • 2 (WebSocket.CLOSING) 连接正在关闭
  • 3 (WebSocket.CLOSED) 连接已关闭或者没有链接成功

涉及到的测试工具

  1. 断网,使用 chrome network 工具;
  2. 客户端丢包, 使用 macOS 的 NLC 工具设置丢包率;
  3. 服务端丢包, ubuntu 服务器,使用 linux tc 命令模拟丢包;
// 设置丢包率 10%
tc qdisc add dev eth0 root netem loss 10%
// 查看配置
tc qdisc show // dev eth0 root refcnt 2 limit 1000 loss 10%
// 删除配置
tc qdisc del dev eth0 root

备注:丢包率会影响 tcp 发送时间,因为 tcp 丢包后会重传,不同系统重传及超时机制不一样(e.g. linux 最小重传时间是 200ms, 最大重传时间是 120s, 重传次数为 15)

问题记录

  1. 为什么已经关闭的连接 onclose 仍会执行;
    socketInstance 设置了 null,事件绑定未取消,对象仍在内存中。事件仍会执行,需要手动解除事件绑定。

  2. 为什么 new Socket() 的错误(WebSocket connection to 'wss://\*\*\*' failed: Error in connection establishment: net::ERR_CONNECTION_TIMED_OUT)不能阻止错误冒泡?

根据 javascript-doesnt-catch-error-in-websocket-instantiation,这个答案描述的是因为这个错误是异步抛出的,所以无法捕获。
尝试像 settimeout 一样用 promise 包裹也无法捕获到,但这个错误不会影响代码的执行,而且也传播到 onerror 上可以被捕获到。

  1. 如何通过命令行查看 websocket 状态?
    通过 top 命令查看进程 pid,通过 losf -p pid 查看对应的进程 TCP 链接。

参考文档

1、WebSockets: developer.mozilla.org/zh-CN/docs/…

- Book Lists -