WebSocket 的极致封装
我们在使用WebSocket
的时候,最头疼的一件事莫过于在处理onmessage
事件,通常我们会遇到这样的情况,向后端发送一个请求,后端返回一个结果;
我们如何将这个指令返回的数据,对应到这个指令上?这个时候大家都想了很多办法,比如在发送的时候,将指令和数据一起发送,后端返回的时候,将指令和数据一起返回,这样就可以通过指令来对应数据了;
可能有的小伙伴就说了这种情况为啥不直接用http
请求呢?当然是因为需求呀,不讲故事也不说废话,开始干货。
先看效果,不要太在意样式,懒得写,随便弄了点;
问题
标题是Websocket
,但是其实不只是Websocket
有这个问题,还有其他的可能都有这个问题,先看代码:
const ws = new WebSocket("ws://localhost:3000");
ws.onopen = function () {
// 发送消息
ws.send("hello");
};
// 接收消息
ws.onmessage = function (e) {
console.log(e.data);
};
这个代码是一个最简单的Websocket
的使用,不难发现我们发送消息和接收消息并不在同一个作用域里面,这样就会导致我们无法将发送的消息和接收的消息对应起来;
更绝的是,我们这个没有任何标识,例如我这个例子里面,我发送了hello
,后端会返回world
;
模拟环境
上面是一个Websocket
的示例,其实我们还会有很多领域可能会出现这个问题,例如我写这个解决方案的时候,并不是为了解决Websocket
的问题,而是为了解决串口通信的问题;
我这里就模拟一个环境,因为我没办法提供Websocket
的服务,也不能提供串口的服务,所以我就模拟一个环境,也让大家更好的理解这个问题。
class Mock {
constructor() {
let count = 0;
setInterval(() => {
this.onmessage("hello world" + count);
count++;
}, 10000);
}
send(data) {
// 随机延迟
const delay = Math.random() * 1000;
setTimeout(() => {
switch (data) {
case "hello":
this.onmessage("world");
break;
case "world":
this.onmessage("hello");
break;
case "你好":
// 随机是否返回
if (Math.random() > 0.5) {
this.onmessage("好个der");
}
break;
default:
this.onmessage(data);
break;
}
}, delay);
}
onmessage = (data) => {};
}
这个Mock
类,就是一个模拟的Websocket
,我们可以通过send
方法来发送数据,通过onmessage
方法来接收数据;
上面的mock
环境非常简单,就是你在发送消息的时候,立即会返回一个消息,但是会有延时,上面的模拟环境有下面几种情况:
- 我们发送
hello
,后端返回world
; - 我们发送
world
,后端返回hello
; - 我们发送
你好
,后端返回好个der
,但是这个返回是随机的,有可能不返回,模拟丢包的情况; - 每过 10 秒,后端返回
hello world${count}
;
解决方案
先上代码,后面讲解:
class MockHelper {
constructor() {
// 用于存储消息处理器
this.messageHandlers = [];
// 模拟消息发送(websocket)
this.mock = new Mock();
// 模拟消息接收
this.mock.onmessage = (data) => {
// 熔断器
let fusing = false;
const fused = () => {
fusing = true;
};
// 执行消息处理
for (let i = 0; i < this.messageHandlers.length; i++) {
const handler = this.messageHandlers[i];
try {
// 执行消息处理器
handler.handle(data, fused);
} catch (e) {
// 消息处理器执行失败
handler.reject(e);
this.messageHandlers.splice(i, 1);
i--;
}
// 熔断
if (fusing) {
this.messageHandlers.splice(i, 1);
break;
}
}
};
}
send(data, handler) {
// 返回一个 Promise
return new Promise((resolve, reject) => {
// 创建消息处理器
const messageHandler = new MessageHandler();
messageHandler.callback = handler;
messageHandler.resolve = resolve;
messageHandler.reject = reject;
messageHandler.data = data;
// 设置超时,3s
messageHandler.timer = setTimeout(() => {
messageHandler.reject(new Error("timeout"));
this.messageHandlers.splice(
this.messageHandlers.indexOf(messageHandler),
1
);
}, 3000);
// 添加到消息处理器列表
this.messageHandlers.push(messageHandler);
// 发送消息
this.mock.send(data);
});
}
}
// 消息处理器
class MessageHandler {
constructor() {
this.callback = null;
this.timer = null;
this.resolve = null;
this.reject = null;
this.data = null;
}
// 处理消息
handle(data, fused) {
// 消息处理结果
let result = {
sendData: this.data,
receiveData: data,
handleData: null,
};
// 熔断器
let fusing = false;
const _fused = () => {
// 收到熔断信号,清除定时器
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
// 标记为熔断
fusing = true;
// 执行上层的熔断函数
fused();
// 执行 resolve
if (this.resolve) {
// 这里需要等待回调函数执行完毕,才能执行 resolve,这样才能拿到 handleData
Promise.resolve().then(() => {
this.resolve(result);
this.resolve = null;
});
}
};
// 执行回调函数
if (typeof this.callback === "function") {
try {
result.handleData = this.callback(data, _fused);
} catch (e) {
this.reject(e);
throw e;
}
}
}
}
MockHelper
上面我们定义了一个MockHelper
类,这个类可以理解为对Websocket
的封装,其他类似的处理机制都可以用这种方式来封装;
在实例化的过程中,会对Websocket
进行创建,然后监听对应的消息响应事件,当收到消息的时候,会执行对应的消息处理器:
// 模拟消息接收
this.mock.onmessage = (data) => {
// 熔断器
let fusing = false;
const fused = () => {
fusing = true;
};
// 执行消息处理
for (let i = 0; i < this.messageHandlers.length; i++) {
const handler = this.messageHandlers[i];
try {
// 执行消息处理器
handler.handle(data, fused);
} catch (e) {
// 消息处理器执行失败
handler.reject(e);
this.messageHandlers.splice(i, 1);
i--;
}
// 熔断
if (fusing) {
this.messageHandlers.splice(i, 1);
break;
}
}
};
方法很简单,每行代码都有注释,主要是思想,我们通过一个fusing
变量来标记是否熔断;
熔断的意思就是结束后续的处理流程,执行熔断器之后,就代表这个消息已经被处理了,后续的消息处理器就不需要再处理了;
消息执行出现异常会被捕获,并且执行reject
,然后从消息处理器列表中移除,这里不会被熔断,因为这个消息处理器已经执行失败了,可能是因为消息处理器的逻辑有问题,并不代表这个消息已经被处理了;
send
send
方法是用来发送消息的,这个方法返回一个Promise
,这个Promise
的resolve
和reject
是在消息处理器中执行的;
/**
* 发送消息
* @param data 发送的消息内容
* @param handler 对消息的处理函数
* @return {Promise<unknown>} 返回一个 Promise
*/
function send(data, handler) {
// 返回一个 Promise
return new Promise((resolve, reject) => {
// 创建消息处理器
const messageHandler = new MessageHandler();
messageHandler.callback = handler;
messageHandler.resolve = resolve;
messageHandler.reject = reject;
messageHandler.data = data;
// 设置超时,3s
messageHandler.timer = setTimeout(() => {
messageHandler.reject(new Error("timeout"));
this.messageHandlers.splice(
this.messageHandlers.indexOf(messageHandler),
1
);
}, 3000);
// 添加到消息处理器列表
this.messageHandlers.push(messageHandler);
// 发送消息
this.mock.send(data);
});
}
send
方法接收两个参数,第一个参数是发送的消息内容,第二个参数是对消息的处理函数;
发送的消息内容就不多说了,对消息的处理函数就是我们自定义对返回的消息进行处理的逻辑;
send
方法的内容还是很简单的,最开始初始化一个MessageHandler
对象,并且设置一个超时定时器,如果出现超时就将这个处理器从this.messageHandlers
中删除;
然后,将这个处理器添加到this.messageHandlers
中,this.messageHandlers
最后会在onmessage
中使用,这样就就形成了一闭环;
MessageHandler
在send
中传入的处理函数,最后会被封装成一个MessageHandler
对象,这个对象就是消息处理器;
// 消息处理器
class MessageHandler {
constructor() {
this.callback = null;
this.timer = null;
this.resolve = null;
this.reject = null;
this.data = null;
}
// 处理消息
handle(data, fused) {
// 消息处理结果
let result = {
sendData: this.data,
receiveData: data,
handleData: null,
};
// 熔断器
let fusing = false;
const _fused = () => {
// 收到熔断信号,清除定时器
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
// 标记为熔断
fusing = true;
// 执行上层的熔断函数
fused();
// 执行 resolve
if (this.resolve) {
// 这里需要等待回调函数执行完毕,才能执行 resolve,这样才能拿到 handleData
Promise.resolve().then(() => {
this.resolve(result);
this.resolve = null;
});
}
};
// 执行回调函数
if (typeof this.callback === "function") {
try {
result.handleData = this.callback(data, _fused);
} catch (e) {
this.reject(e);
throw e;
}
}
}
}
MessageHandler
对象中有五个属性,callback
是对消息的处理函数,timer
是用来设置超时的定时器,resolve
和reject
是用来执行Promise
的,data
是发送的消息内容;
handle
方法是用来处理消息的,这个方法接收两个参数,第一个参数是接收到的消息内容,第二个参数是熔断器;
这里的核心还是熔断器,上面的onmessage
的熔断器很简单,就是设置一个标记,然后在onmessage
中判断这个标记,如果是熔断状态,就不再执行后面的逻辑;
这里面的熔断器会执行一系列的逻辑:
// 熔断器
let fusing = false;
const _fused = () => {
// 收到熔断信号,清除定时器
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
// 标记为熔断
fusing = true;
// 执行上层的熔断函数
fused();
// 执行 resolve
if (this.resolve) {
// 这里需要等待回调函数执行完毕,才能执行 resolve,这样才能拿到 handleData
Promise.resolve().then(() => {
this.resolve(result);
this.resolve = null;
});
}
};
首先,收到熔断信号,清除定时器,这样就不会再执行超时的逻辑;
然后,标记为熔断,并且执行上层的熔断器,这样在onmessage
中就不会再执行后面的逻辑;
最后,执行resolve
,这里将resolve
放到了Promise.resolve().then()
中,这样就可以等待回调函数执行完毕,再执行resolve
,这样才能拿到handleData
;
使用
使用很简单,直接看代码:
// 实例化
const mockHelper = new MockHelper();
// 直接发送消息,并传入消息处理函数
mockHelper
.send("hello", (data, fused) => {
if (data === "world") {
fused();
return "hello world";
}
})
.then((data) => {
// 这里可以拿到 发送的消息内容,接收的消息内容,处理后的消息内容
console.log("then", data);
})
.catch((e) => {
console.log("catch", e);
});
看到这里不知道你会不会有一堆的疑问,比如:
- 为什么要在
send
中传入消息处理函数?我直接在这个消息处理函数中处理消息不就行了吗? - 为什么要用
Promise
来封装呢?你这里都有回调了,感觉没必要啊? - 你这个最后还不是会有一堆的
if
判断吗?而且我如果要发两次相同的消息,这个代码我还要写两遍吗? - 其他的问题…
这当然是为了方便大家使用,按照现在的封装,我们只需要在send
中传入消息处理函数,然后在这个函数中处理消息,就可以了;
但是实际情况下我们还是要尽可以的对代码进行复用,光说不练假把式,我们来看看怎么复用这个代码:
// 实例化
const mockHelper = new MockHelper();
// 发送 hello 消息
const sendHello = () => {
return mockHelper.send("hello", (data, fused) => {
if (data === "world") {
fused();
return "hello world";
}
});
};
// 发送 world 消息
const sendWorld = () => {
return mockHelper.send("world", (data, fused) => {
if (data === "hello") {
fused();
return "hello world";
}
});
};
// 使用
sendHello().then((data) => {
console.log(data);
});
sendHello().then((data) => {
console.log(data);
});
sendHello().then((data) => {
console.log(data);
});
sendWorld().then((data) => {
console.log(data);
});
sendWorld().then((data) => {
console.log(data);
});
sendWorld().then((data) => {
console.log(data);
});
// ...其他
// 最后才是直接使用 send
mockHelper
.send("xxx", (data, fused) => {
if (data === "xxx") {
fused();
return "...";
}
})
.then((data) => {
console.log(data);
});
对比
在我最开始没有想到这种封装的时候,我就是用下面这种方式来封装的,屎山!!!
按照最开始没有封装的时候的逻辑,我们需要在onmessage
事件中,可能会写一堆的if
判断,然后再执行对应的回调函数,比如:
class WebsocketWrap {
constructor(options) {
this.options = options;
this.ws = null;
this.connect();
}
connect() {
this.ws = new WebSocket(this.options.url);
this.ws.onmessage = (event) => {
this.onmessage(event.data);
};
}
onmessage = (data) => {
// 逻辑处理 a
if (data === "a") {
this.options["onA"](data);
return;
}
// 逻辑处理 b
if (data.xxx === "xxx") {
this.options["onXxx"](data);
return;
}
// 逻辑处理 c
if (data.yyy === "yyy") {
this.options["onYyy"](data);
return;
}
// 各种其他自定义处理...
};
send(data) {
this.ws.send(data);
}
}
// 使用
const ws = new WebsocketWrap({
url: "ws://localhost:8080",
onA: (data) => {
console.log("onA", data);
},
onXxx: (data) => {
console.log("onXxx", data);
},
onYyy: (data) => {
console.log("onYyy", data);
},
});
这样的代码,如果逻辑处理的函数比较多,那么onmessage
函数就会很长,而且不好维护,也不好扩展;
如果现在来了一个新的需求,我们就需要在onmessage
函数中,再添加一个if
判断,然后再添加一个对应的处理函数,这样的代码就会变得越来越难维护;
而我上面的封装就是解决了这个问题,我将onmessage
事件中的if
进行了封装,在send
方法中传入的处理函数,就是对消息的处理函数,也就是这个if
判断的逻辑;
最后消息处理完毕,执行熔断器,然后将处理后的消息通过resolve
返回出去,这样就可以在then
中拿到处理后的消息了;
总结
上面只是一个很简单的封装,主要介绍的还是思想,其实里面还有很多可以优化的地方,比如:
- 如果发送的消息并没有回应应该怎么修改?
- 这里服务端主动推送的消息全都被丢弃了,应该怎么修改?
- 是否可以增加一些配置,比如超时时间,重试次数等等?
- 其他…