灏天阁

为什么说前端监控系统离不开 RabbitMQ?

· Yin灏

为什么说前端监控系统离不开 RabbitMQ?

前端监控系统是采集用户端的异常、性能、业务埋点等数据上报,在服务端做存储,并支持可视化分析的平台。

用户量可能很大,采集的数据可能比较多,这时候服务端的并发压力会比较大,要是直接存入数据库,那数据库服务很可能会崩掉。

那就用现在的数据库,如何保证面对大量并发请求的时候,服务不崩呢?

答案就是消息队列,比如常用的 RabbitMQ:

第一个 web 服务接收请求,把消息存入 RabbitMQ,然后另一个 web 服务从 MQ 中取出消息存入数据库。

有同学说,这不是一样么?

不一样,MQ 的并发量比数据库高很多。之前 web 服务要等数据库存储完成才能响应,而现在只存入 MQ 就可以响应了。那可以支持的并发量就更多。

而数据库的并发比较低,我们可以通过 MQ 把消费的上限调低,就能保证数据库服务不崩。

比如 10w 的消息进来,每次只从中取出 1000 来消费:

并发量被控制住了,自然就崩不了了,从 MQ 中取出慢慢处理就好了。

这就是 MQ 的流量削峰的功能。

而且完全可以加几个 web 服务来同时消费 MQ 中的消息:

知道了 RabbitMQ 能干啥,那我们就来用一下试试吧!

我们通过 docker 来跑 RabbitMQ,从官网下一个 docker desktop:

搜索 rabbitmq 的镜像,选择 3.11-management 的版本:

这个版本是有 web 管理界面的。

点击 run:

映射容器内的 5672、15672 这俩端口到本地的端口。

15672 是管理界面的,5672 是 mq 服务的端口。

等 rabbitmq 跑起来之后:

就可以在浏览器访问 http://localhost:15672 了:

这就是它的 web 管理界面。

输入 guest、guest 进入管理页面:

可以看到 connection、channel、exchange、queue 的分别的管理页面。

这些都是什么呢?

写个 demo 就理解了:

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertQueue("aaa");
await channel.sendToQueue("aaa", Buffer.from("hello"));

安装 amqplib 的包,这个是 rabbitmq 的 node 客户端(amqp 是 rabbitmq 的协议)。

上面的代码连接了 rabbitmq 服务,创建了一个名字为 aaa 的队列,并向队列中发送了一个消息。

然后 node 跑一下:

(这里要用 es module 语法并且支持顶层 await 需要在 packege.json 里设置 type 为 module)

之后就可以在管理界面看到这个队列了:

然后我们再写一个消费端:

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

const { queue } = await channel.assertQueue("aaa");
channel.consume(
  queue,
  (msg) => {
    console.log(msg.content.toString());
  },
  { noAck: true }
);

assertQueue 是如果没有就创建队列,有的话就直接返回。

这里取到那个队列,就可以从中消费消息了:

这样,我们就完成了第一次 RabbitMQ 的通信,两个服务之间也是这样通信的。

是不是还挺简单的?

rabbitmq 使用确实挺简单。

那怎么控制并发数呢?

我们改一下代码:

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertQueue("aaa", { durable: false });

let i = 1;
setInterval(async () => {
  const msg = "hello" + i;
  console.log("发送消息:", msg);
  await channel.sendToQueue("aaa", Buffer.from(msg));
  i++;
}, 500);

生产者每 0.5s 发送一次消息。

消费者每 1s 处理一条消息:

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

const { queue } = await channel.assertQueue("aaa");
channel.prefetch(3);

const currentTask = [];
channel.consume(
  queue,
  (msg) => {
    currentTask.push(msg);
    console.log("收到消息:", msg.content.toString());
  },
  { noAck: false }
);

setInterval(() => {
  const curMsg = currentTask.pop();
  channel.ack(curMsg);
}, 1000);

每条消费者收到的消息要确认之后才会在 MQ 里删除。可以收到消息自动确认,也可以手动确认。

这里我把 noAck 设置为 false 了,也就是不自动确认。

把收到的消息放入一个数组中,每 1s 确认一次。

然后我设置了 prefetch 为 3,也就是每次最多取回 3 条消息来处理。

跑一下试试:

消息生产端:

消息消费端:

可以看到生产者是每 0.5s 往队列里放一条消息。

消费者一开始取出 3 条,然后每处理完一条取一条,保证最多并发处理 3 条。

这就是流量削峰的功能。

不同服务之间的速度差异可以通过 MQ 来缓冲。

大概了解了 rabbitmq 之后,我们来看看它的整体架构图:

Producer 和 Consumer 分别是生产者和消费者。

Connection 是连接,但我们不会每用一次 rabbitmq 就创建一个单独的 Connection,而是在一个 Connection 里做一下划分,叫做 Channel,每个 Channel 做自己的事情。

而 Queue 就是两端存取消息的地方了。

整个接收消息和转发消息的服务就叫做 Broker。

至于 Exchange,我们前面的例子没有用到,这个是把消息放到不同的队列里用的,叫做交换机。

我们前面生产者和消费者都是直接指定了从哪个队列存取消息,那如果是一对多的场景呢?

总不能一个个的调用 sendQueue 发消息吧?

这时候就要找一个 Exchange(交换机) 来帮我们完成把消息按照规则放入不同的 Queue 的工作了。

Exchange 主要有 4 种:

  • fanout:把消息放到这个交换机的所有 Queue
  • direct:把消息放到交换机的指定 key 的队列
  • topic:把消息放到交换机的指定 key 的队列,支持模糊匹配
  • headers:把消息放到交换机的满足某些 header 的队列

一个个来试下:

首先是 direct,生产者端:

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange("direct-test-exchange", "direct");

channel.publish("direct-test-exchange", "aaa", Buffer.from("hello1"));
channel.publish("direct-test-exchange", "bbb", Buffer.from("hello2"));
channel.publish("direct-test-exchange", "ccc", Buffer.from("hello3"));

不再是直接 sendToQueue 了,而是创建一个 exchange,然后调用 publish 往这个 exchange 发消息。

其中第二个参数是 routing key,也就是消息路由到哪个队列。

然后创建两个消费者:

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

const { queue } = await channel.assertQueue("queue1");
await channel.bindQueue(queue, "direct-test-exchange", "aaa");

channel.consume(
  queue,
  (msg) => {
    console.log(msg.content.toString());
  },
  { noAck: true }
);
import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

const { queue } = await channel.assertQueue("queue2");
await channel.bindQueue(queue, "direct-test-exchange", "bbb");

channel.consume(
  queue,
  (msg) => {
    console.log(msg.content.toString());
  },
  { noAck: true }
);

分别创建 queue1 和 queue2 两个队列,绑定到前面创建的 direct-test-exchange 这个交换机上,指定了路由 key 分别是 aaa 和 bbb。

然后把生产者和两个消费者跑起来。

就可以看到队列 queue1 和 queue2 分别接收到了对应的消息:

这就是通过 direct 交换机发送消息的过程。

在管理页面上也可以看到这个交换机的信息:

包括 exchange 下的两个 queue 以及各自的 routing key。

再来试下 topic 类型的 Exchange。

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange("direct-test-exchange2", "topic");

channel.publish("direct-test-exchange2", "aaa.1", Buffer.from("hello1"));
channel.publish("direct-test-exchange2", "aaa.2", Buffer.from("hello2"));
channel.publish("direct-test-exchange2", "bbb.1", Buffer.from("hello3"));

生产者端创建叫 direct-test-exchange2 的 topic 类型的 Exchange,然后发三条消息。

创建两个消费端:

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange("direct-test-exchange2", "topic");

const { queue } = await channel.assertQueue("queue1");
await channel.bindQueue(queue, "direct-test-exchange2", "aaa.*");

channel.consume(
  queue,
  (msg) => {
    console.log(msg.content.toString());
  },
  { noAck: true }
);
import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange("direct-test-exchange2", "topic");

const { queue } = await channel.assertQueue("queue2");
await channel.bindQueue(queue, "direct-test-exchange2", "bbb.*");

channel.consume(
  queue,
  (msg) => {
    console.log(msg.content.toString());
  },
  { noAck: true }
);

两个消费者端分别创建 queue1 和 queue2 两个队列,绑定到 direct-test-exchange2 的交换机下。

指定路由 key 分别为 aaa.* 和 bbb.*,这里的 * 是模糊匹配的意思。

消费者端也 assertExchange 了,如果不存在就创建,保证 exchange 一定存在。

然后跑一下:

可以看到,两个消费者分别收到了不同 routing key 对应的消息。

当然,在管理界面这里也是可以发消息的:

消费者端同样可以收到:

这就是 topic 类型的交换机,可以根据模糊匹配 routing key 来发消息到不同队列。

再来试下 fanout 类型的 exchange:

生产者:

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange("direct-test-exchange3", "fanout");

channel.publish("direct-test-exchange3", "", Buffer.from("hello1"));
channel.publish("direct-test-exchange3", "", Buffer.from("hello2"));
channel.publish("direct-test-exchange3", "", Buffer.from("hello3"));

消费者:

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange("direct-test-exchange3", "fanout");

const { queue } = await channel.assertQueue("queue1");
await channel.bindQueue(queue, "direct-test-exchange3", "aaa");

channel.consume(
  queue,
  (msg) => {
    console.log(msg.content.toString());
  },
  { noAck: true }
);
import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange("direct-test-exchange3", "fanout");

const { queue } = await channel.assertQueue("queue2");
await channel.bindQueue(queue, "direct-test-exchange3", "bbb");

channel.consume(
  queue,
  (msg) => {
    console.log(msg.content.toString());
  },
  { noAck: true }
);

fanout 是广播消息到 Exchange 下的所有队列,不需要指定 routing key,计算指定了也会忽略。

跑起来可以看到,两个消费者都收到了消息:

这就是 fanout 类型交换机的特点,广播消息到所有绑定到它的 queue。

最后再来看下 headers 类型的 Exchange,这个不是根据 routing key 来匹配了,而是根据 headers:

生产者端:

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange("direct-test-exchange4", "headers");

channel.publish("direct-test-exchange4", "", Buffer.from("hello1"), {
  headers: {
    name: "guang",
  },
});
channel.publish("direct-test-exchange4", "", Buffer.from("hello2"), {
  headers: {
    name: "guang",
  },
});
channel.publish("direct-test-exchange4", "", Buffer.from("hello3"), {
  headers: {
    name: "dong",
  },
});

消费者端:

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange("direct-test-exchange4", "headers");

const { queue } = await channel.assertQueue("queue1");
await channel.bindQueue(queue, "direct-test-exchange4", "", {
  name: "guang",
});

channel.consume(
  queue,
  (msg) => {
    console.log(msg.content.toString());
  },
  { noAck: true }
);
import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange("direct-test-exchange4", "headers");

const { queue } = await channel.assertQueue("queue2");
await channel.bindQueue(queue, "direct-test-exchange4", "", {
  name: "dong",
});

channel.consume(
  queue,
  (msg) => {
    console.log(msg.content.toString());
  },
  { noAck: true }
);

跑起来是这样的:

很容易理解,只是从匹配 routing key 变成了匹配 header。

这就是 Exchange,当你需要一对多发消息的时候,就可以选择这些类型的交换机。

回过头来,我们来总结下 rabbitmq 解决了什么问题:

  • 流量削峰:可以把很大的流量放到 mq 种按照一定的流量上限来慢慢消费,这样虽然慢一点,但不至于崩溃。
  • 应用解耦:应用之间不再直接依赖,就算某个应用挂掉了,也可以再恢复后继续从 mq 中消费消息。并不会一个应用挂掉了,它关联的应用也挂掉。

比如前端监控系统的后端服务,就很适合使用 mq 来做流量削峰。

总结

前端监控系统会收到很多来自用户端的请求,如果直接存入数据库很容易把数据库服务搞挂掉,所以一般会加一个 RabbitMQ 来缓冲。

它是生产者往 queue 里放入消息,消费者从里面读消息,之后确认消息收到的流程。

当一对多的时候,还要加一个 Exchange 交换机来根据不同的规则转发消息:

  • direct 交换机:根据 routing key 转发消息到队列
  • topic 交换机:根据 routing key 转发消息到队列,支持模糊匹配
  • headers 交换机:根据 headers 转发消息到队列
  • fanout 交换机:广播消息到交换机下的所有队列

而且消费者可以设置一个消费的并发上限,这样就可以保证服务不会因并发过高而崩溃。

这就是流量削峰的功能。

RabbitMQ 在后端系统中经常能见到,是很常用的中间件。

- Book Lists -