灏天阁

WebSocket 的极致封装

· Yin灏

我们在使用WebSocket的时候,最头疼的一件事莫过于在处理onmessage事件,通常我们会遇到这样的情况,向后端发送一个请求,后端返回一个结果;

我们如何将这个指令返回的数据,对应到这个指令上?这个时候大家都想了很多办法,比如在发送的时候,将指令和数据一起发送,后端返回的时候,将指令和数据一起返回,这样就可以通过指令来对应数据了;

可能有的小伙伴就说了这种情况为啥不直接用http请求呢?当然是因为需求呀,不讲故事也不说废话,开始干货。

先看效果,不要太在意样式,懒得写,随便弄了点;

问题

标题是Websocket,但是其实不只是Websocket有这个问题,还有其他的可能都有这个问题,先看代码:

const ws = new WebSocket("ws://localhost:3000");

ws.onopen = function () {
  // 发送消息
  ws.send("hello");
};

// 接收消息
ws.onmessage = function (e) {
  console.log(e.data);
};

这个代码是一个最简单的Websocket的使用,不难发现我们发送消息和接收消息并不在同一个作用域里面,这样就会导致我们无法将发送的消息和接收的消息对应起来;

更绝的是,我们这个没有任何标识,例如我这个例子里面,我发送了hello,后端会返回world

模拟环境

上面是一个Websocket的示例,其实我们还会有很多领域可能会出现这个问题,例如我写这个解决方案的时候,并不是为了解决Websocket的问题,而是为了解决串口通信的问题;

我这里就模拟一个环境,因为我没办法提供Websocket的服务,也不能提供串口的服务,所以我就模拟一个环境,也让大家更好的理解这个问题。

class Mock {
  constructor() {
    let count = 0;
    setInterval(() => {
      this.onmessage("hello world" + count);
      count++;
    }, 10000);
  }

  send(data) {
    // 随机延迟
    const delay = Math.random() * 1000;
    setTimeout(() => {
      switch (data) {
        case "hello":
          this.onmessage("world");
          break;

        case "world":
          this.onmessage("hello");
          break;

        case "你好":
          // 随机是否返回
          if (Math.random() > 0.5) {
            this.onmessage("好个der");
          }
          break;

        default:
          this.onmessage(data);
          break;
      }
    }, delay);
  }

  onmessage = (data) => {};
}

这个Mock类,就是一个模拟的Websocket,我们可以通过send方法来发送数据,通过onmessage方法来接收数据;

上面的mock环境非常简单,就是你在发送消息的时候,立即会返回一个消息,但是会有延时,上面的模拟环境有下面几种情况:

  1. 我们发送hello,后端返回world
  2. 我们发送world,后端返回hello
  3. 我们发送你好,后端返回好个der,但是这个返回是随机的,有可能不返回,模拟丢包的情况;
  4. 每过 10 秒,后端返回hello world${count}

解决方案

先上代码,后面讲解:

class MockHelper {
  constructor() {
    // 用于存储消息处理器
    this.messageHandlers = [];

    // 模拟消息发送(websocket)
    this.mock = new Mock();

    // 模拟消息接收
    this.mock.onmessage = (data) => {
      // 熔断器
      let fusing = false;
      const fused = () => {
        fusing = true;
      };

      // 执行消息处理
      for (let i = 0; i < this.messageHandlers.length; i++) {
        const handler = this.messageHandlers[i];
        try {
          // 执行消息处理器
          handler.handle(data, fused);
        } catch (e) {
          // 消息处理器执行失败
          handler.reject(e);
          this.messageHandlers.splice(i, 1);
          i--;
        }

        // 熔断
        if (fusing) {
          this.messageHandlers.splice(i, 1);
          break;
        }
      }
    };
  }

  send(data, handler) {
    // 返回一个 Promise
    return new Promise((resolve, reject) => {
      // 创建消息处理器
      const messageHandler = new MessageHandler();
      messageHandler.callback = handler;
      messageHandler.resolve = resolve;
      messageHandler.reject = reject;
      messageHandler.data = data;

      // 设置超时,3s
      messageHandler.timer = setTimeout(() => {
        messageHandler.reject(new Error("timeout"));
        this.messageHandlers.splice(
          this.messageHandlers.indexOf(messageHandler),
          1
        );
      }, 3000);

      // 添加到消息处理器列表
      this.messageHandlers.push(messageHandler);

      // 发送消息
      this.mock.send(data);
    });
  }
}

// 消息处理器
class MessageHandler {
  constructor() {
    this.callback = null;
    this.timer = null;
    this.resolve = null;
    this.reject = null;
    this.data = null;
  }

  // 处理消息
  handle(data, fused) {
    // 消息处理结果
    let result = {
      sendData: this.data,
      receiveData: data,
      handleData: null,
    };

    // 熔断器
    let fusing = false;
    const _fused = () => {
      // 收到熔断信号,清除定时器
      if (this.timer) {
        clearTimeout(this.timer);
        this.timer = null;
      }

      // 标记为熔断
      fusing = true;

      // 执行上层的熔断函数
      fused();

      // 执行 resolve
      if (this.resolve) {
        // 这里需要等待回调函数执行完毕,才能执行 resolve,这样才能拿到 handleData
        Promise.resolve().then(() => {
          this.resolve(result);
          this.resolve = null;
        });
      }
    };

    // 执行回调函数
    if (typeof this.callback === "function") {
      try {
        result.handleData = this.callback(data, _fused);
      } catch (e) {
        this.reject(e);
        throw e;
      }
    }
  }
}

MockHelper

上面我们定义了一个MockHelper类,这个类可以理解为对Websocket的封装,其他类似的处理机制都可以用这种方式来封装;

在实例化的过程中,会对Websocket进行创建,然后监听对应的消息响应事件,当收到消息的时候,会执行对应的消息处理器:

// 模拟消息接收
this.mock.onmessage = (data) => {
  // 熔断器
  let fusing = false;
  const fused = () => {
    fusing = true;
  };

  // 执行消息处理
  for (let i = 0; i < this.messageHandlers.length; i++) {
    const handler = this.messageHandlers[i];
    try {
      // 执行消息处理器
      handler.handle(data, fused);
    } catch (e) {
      // 消息处理器执行失败
      handler.reject(e);
      this.messageHandlers.splice(i, 1);
      i--;
    }

    // 熔断
    if (fusing) {
      this.messageHandlers.splice(i, 1);
      break;
    }
  }
};

方法很简单,每行代码都有注释,主要是思想,我们通过一个fusing变量来标记是否熔断;

熔断的意思就是结束后续的处理流程,执行熔断器之后,就代表这个消息已经被处理了,后续的消息处理器就不需要再处理了;

消息执行出现异常会被捕获,并且执行reject,然后从消息处理器列表中移除,这里不会被熔断,因为这个消息处理器已经执行失败了,可能是因为消息处理器的逻辑有问题,并不代表这个消息已经被处理了;

send

send方法是用来发送消息的,这个方法返回一个Promise,这个Promiseresolvereject是在消息处理器中执行的;

/**
 * 发送消息
 * @param data                  发送的消息内容
 * @param handler               对消息的处理函数
 * @return {Promise<unknown>}   返回一个 Promise
 */
function send(data, handler) {
  // 返回一个 Promise
  return new Promise((resolve, reject) => {
    // 创建消息处理器
    const messageHandler = new MessageHandler();
    messageHandler.callback = handler;
    messageHandler.resolve = resolve;
    messageHandler.reject = reject;
    messageHandler.data = data;

    // 设置超时,3s
    messageHandler.timer = setTimeout(() => {
      messageHandler.reject(new Error("timeout"));
      this.messageHandlers.splice(
        this.messageHandlers.indexOf(messageHandler),
        1
      );
    }, 3000);

    // 添加到消息处理器列表
    this.messageHandlers.push(messageHandler);

    // 发送消息
    this.mock.send(data);
  });
}

send方法接收两个参数,第一个参数是发送的消息内容,第二个参数是对消息的处理函数;

发送的消息内容就不多说了,对消息的处理函数就是我们自定义对返回的消息进行处理的逻辑;

send方法的内容还是很简单的,最开始初始化一个MessageHandler对象,并且设置一个超时定时器,如果出现超时就将这个处理器从this.messageHandlers中删除;

然后,将这个处理器添加到this.messageHandlers中,this.messageHandlers最后会在onmessage中使用,这样就就形成了一闭环;

MessageHandler

send中传入的处理函数,最后会被封装成一个MessageHandler对象,这个对象就是消息处理器;

// 消息处理器
class MessageHandler {
  constructor() {
    this.callback = null;
    this.timer = null;
    this.resolve = null;
    this.reject = null;
    this.data = null;
  }

  // 处理消息
  handle(data, fused) {
    // 消息处理结果
    let result = {
      sendData: this.data,
      receiveData: data,
      handleData: null,
    };

    // 熔断器
    let fusing = false;
    const _fused = () => {
      // 收到熔断信号,清除定时器
      if (this.timer) {
        clearTimeout(this.timer);
        this.timer = null;
      }

      // 标记为熔断
      fusing = true;

      // 执行上层的熔断函数
      fused();

      // 执行 resolve
      if (this.resolve) {
        // 这里需要等待回调函数执行完毕,才能执行 resolve,这样才能拿到 handleData
        Promise.resolve().then(() => {
          this.resolve(result);
          this.resolve = null;
        });
      }
    };

    // 执行回调函数
    if (typeof this.callback === "function") {
      try {
        result.handleData = this.callback(data, _fused);
      } catch (e) {
        this.reject(e);
        throw e;
      }
    }
  }
}

MessageHandler对象中有五个属性,callback是对消息的处理函数,timer是用来设置超时的定时器,resolvereject是用来执行Promise的,data是发送的消息内容;

handle方法是用来处理消息的,这个方法接收两个参数,第一个参数是接收到的消息内容,第二个参数是熔断器;

这里的核心还是熔断器,上面的onmessage的熔断器很简单,就是设置一个标记,然后在onmessage中判断这个标记,如果是熔断状态,就不再执行后面的逻辑;

这里面的熔断器会执行一系列的逻辑:

// 熔断器
let fusing = false;
const _fused = () => {
  // 收到熔断信号,清除定时器
  if (this.timer) {
    clearTimeout(this.timer);
    this.timer = null;
  }

  // 标记为熔断
  fusing = true;

  // 执行上层的熔断函数
  fused();

  // 执行 resolve
  if (this.resolve) {
    // 这里需要等待回调函数执行完毕,才能执行 resolve,这样才能拿到 handleData
    Promise.resolve().then(() => {
      this.resolve(result);
      this.resolve = null;
    });
  }
};

首先,收到熔断信号,清除定时器,这样就不会再执行超时的逻辑;

然后,标记为熔断,并且执行上层的熔断器,这样在onmessage中就不会再执行后面的逻辑;

最后,执行resolve,这里将resolve放到了Promise.resolve().then()中,这样就可以等待回调函数执行完毕,再执行resolve,这样才能拿到handleData

使用

使用很简单,直接看代码:

// 实例化
const mockHelper = new MockHelper();

// 直接发送消息,并传入消息处理函数
mockHelper
  .send("hello", (data, fused) => {
    if (data === "world") {
      fused();
      return "hello world";
    }
  })
  .then((data) => {
    // 这里可以拿到 发送的消息内容,接收的消息内容,处理后的消息内容
    console.log("then", data);
  })
  .catch((e) => {
    console.log("catch", e);
  });

看到这里不知道你会不会有一堆的疑问,比如:

  • 为什么要在send中传入消息处理函数?我直接在这个消息处理函数中处理消息不就行了吗?
  • 为什么要用Promise来封装呢?你这里都有回调了,感觉没必要啊?
  • 你这个最后还不是会有一堆的if判断吗?而且我如果要发两次相同的消息,这个代码我还要写两遍吗?
  • 其他的问题…

这当然是为了方便大家使用,按照现在的封装,我们只需要在send中传入消息处理函数,然后在这个函数中处理消息,就可以了;

但是实际情况下我们还是要尽可以的对代码进行复用,光说不练假把式,我们来看看怎么复用这个代码:

// 实例化
const mockHelper = new MockHelper();

// 发送 hello 消息
const sendHello = () => {
  return mockHelper.send("hello", (data, fused) => {
    if (data === "world") {
      fused();
      return "hello world";
    }
  });
};

// 发送 world 消息
const sendWorld = () => {
  return mockHelper.send("world", (data, fused) => {
    if (data === "hello") {
      fused();
      return "hello world";
    }
  });
};

// 使用
sendHello().then((data) => {
  console.log(data);
});
sendHello().then((data) => {
  console.log(data);
});
sendHello().then((data) => {
  console.log(data);
});

sendWorld().then((data) => {
  console.log(data);
});
sendWorld().then((data) => {
  console.log(data);
});
sendWorld().then((data) => {
  console.log(data);
});

// ...其他

// 最后才是直接使用 send
mockHelper
  .send("xxx", (data, fused) => {
    if (data === "xxx") {
      fused();
      return "...";
    }
  })
  .then((data) => {
    console.log(data);
  });

对比

在我最开始没有想到这种封装的时候,我就是用下面这种方式来封装的,屎山!!!

按照最开始没有封装的时候的逻辑,我们需要在onmessage事件中,可能会写一堆的if判断,然后再执行对应的回调函数,比如:

class WebsocketWrap {
  constructor(options) {
    this.options = options;
    this.ws = null;
    this.connect();
  }

  connect() {
    this.ws = new WebSocket(this.options.url);
    this.ws.onmessage = (event) => {
      this.onmessage(event.data);
    };
  }

  onmessage = (data) => {
    // 逻辑处理 a
    if (data === "a") {
      this.options["onA"](data);
      return;
    }

    // 逻辑处理 b
    if (data.xxx === "xxx") {
      this.options["onXxx"](data);
      return;
    }

    // 逻辑处理 c
    if (data.yyy === "yyy") {
      this.options["onYyy"](data);
      return;
    }

    // 各种其他自定义处理...
  };

  send(data) {
    this.ws.send(data);
  }
}

// 使用
const ws = new WebsocketWrap({
  url: "ws://localhost:8080",
  onA: (data) => {
    console.log("onA", data);
  },
  onXxx: (data) => {
    console.log("onXxx", data);
  },
  onYyy: (data) => {
    console.log("onYyy", data);
  },
});

这样的代码,如果逻辑处理的函数比较多,那么onmessage函数就会很长,而且不好维护,也不好扩展;

如果现在来了一个新的需求,我们就需要在onmessage函数中,再添加一个if判断,然后再添加一个对应的处理函数,这样的代码就会变得越来越难维护;

而我上面的封装就是解决了这个问题,我将onmessage事件中的if进行了封装,在send方法中传入的处理函数,就是对消息的处理函数,也就是这个if判断的逻辑;

最后消息处理完毕,执行熔断器,然后将处理后的消息通过resolve返回出去,这样就可以在then中拿到处理后的消息了;

总结

上面只是一个很简单的封装,主要介绍的还是思想,其实里面还有很多可以优化的地方,比如:

  1. 如果发送的消息并没有回应应该怎么修改?
  2. 这里服务端主动推送的消息全都被丢弃了,应该怎么修改?
  3. 是否可以增加一些配置,比如超时时间,重试次数等等?
  4. 其他…

- Book Lists -