灏天阁

避免阻塞事件循环

· Yin灏

image.png

JavaScript 在带有事件循环(event loop)的单线程环境中运⾏,这是⼀种⾮常容易理解的体系结构。这是执⾏传⼊⼯作的连续循环,这些工作可以安排将来执行更多的工作。

while (hasWorkToDo) {
  /* Run timers, I/O callbacks,
       check for incoming connections,
       and so on... */
  doWork();
}

同步工作会立即运行;当没有要执⾏的同步⼯作(或简称为“稍后”)时,异步⼯作就会运⾏,理想情况下,你的应用程序的执行配置应该允许事件循环经常运行,以执行后台工作(例如接受新连接、运行计时器等)。

image.png 这种设计意味着同步工作是一个很大的问题:在它运行的每个连续时刻,事件循环都不能执行任何工作,一个都不能执行!

/* setImmediate registers a callback on the event loop */
setImmediate(() => {
  console.log("This will at some point in the future");
});
/* synchronous work that you will never see the end of */
findNthPrime(9999999);

image.png 在服务器环境中,一个这样的请求可以无限期地阻止所有其他请求。

/* If a request to /computePrimes
   has been sent, this route will
   never respond. It will timeout. */
app.get("/home", () => {
  return response("Welcome Home!");
});

app.get("/computePrimes", () => {
  /* synchronous work that you 
       will never see the end of. */
  return response(findNthPrime(9999999));
});

image.png

这些场景有三种解决方案。

  1. 增加更多的节点
  2. 重构 findNthPrime 以异步方式执行工作
  3. findNthPrime 卸载到另一个线程上执行

投入更多节点

“向其投入更多资源“的行业术语是水平扩展(相对于垂直扩展,后者的游戏名字是“向其投入更好的资源”)。Node.js 的一个特性让人们兴奋不已,它内置了对群集的轻松水平扩展的支持。

总的来说,就是并行运行多个服务器,这样,如果一个服务器很忙,另⼀个可以接受传⼊的请求。这种⽅法的⼀个缺陷是它可以在负载赶上之前隐藏问题。

在我们的服务器实现中,同步操作完成的速度很慢。如果只有一个节点,则需要⼀个请求才能使其停⽌运⾏。增加节点数量将使这些请求的容量增加相同的数量

这种方法很容易实现,但不能避免阻塞事件循环;它只是在组合中添加了更多的事件循环。作为⼀种策略,只要传⼊请求的速率不超过处理它们所需的时间,它就会起作⽤。

重构以异步执行工作

异步⼯作通常不受 CPU 限制。例如,如果读取⼀个⽂件需要 10 ms,那么很可能只有不到 1ms ⽤于等待 CPU,其余时间都在等待磁盘。

另一方面,计算质数完全是 CPU 绑定的:它只是基本的数学运算。

在事件循环架构上,长时间运行的算法可以通过将工作分成块来转换为异步作业。

以下 findNthPrime 实现:

const findNthPrime = (num) => {
  let i,
    primes = [2, 3],
    n = 5;
  const isPrime = (n) => {
    let i = 1,
      p = primes[i],
      limit = Math.ceil(Math.sqrt(n));
    while (p <= limit) {
      if (n % p === 0) {
        return false;
      }
      i += 1;
      p = primes[i];
    }
    return true;
  };
  for (i = 2; i <= num; i += 1) {
    while (!isPrime(n)) {
      n += 2;
    }
    primes.push(n);
    n += 2;
  }
  return primes[num - 1];
};

image.png

这种⽅法的基本⽬标是在同步执⾏块之间添加间隙,允许事件循环在您的算法执⾏时运⾏。您希望这些差距出现在哪⾥取决于您正在寻找的性能概况。如果您的算法阻塞事件循环超过⼀秒钟,那么在任何地⽅添加间隙都是值得的。

在这种情况下, isPrime() ⼤部分⼯作都是通过多次迭代完成的。它已经⽅便地隔离在⼀个函数中,这使得它成为将它推迟到事件循环中的主要候选者。

image.png

Promisify

第一步是将要移动到事件循环中的代码部分隔离到一个 Promise 中:

const isPrime = (n) =>
  new Promise((resolve) => {
    let i = 1,
      p = primes[i],
      limit = Math.ceil(Math.sqrt(n));
    while (p <= limit) {
      if (n % p === 0) {
        return resolve(false);
      }
      i += 1;
      p = primes[i];
    }
    return resolve(true);
  });
// ...
while (!(await isPrime(n))) {
  //...
}

将同步代码变成 Pr omise 不会使代码异步。对于异步代码,必须从事件循环中调⽤它。setImmediate 接受⼀个回调来精确地做到这⼀点:

const isPrime = (n) =>
  new Promise((resolve) =>
    setImmediate(() => {
      let i = 1,
        p = primes[i],
        limit = Math.ceil(Math.sqrt(n));
      while (p <= limit) {
        if (n % p === 0) {
          return resolve(false);
        }
        i += 1;
        p = primes[i];
      }
      return resolve(true);
    })
  );

完整实现

const asyncInterval = setInterval(() => {
  console.log("Event loop executed");
  exCount++;
}, 1);
const findNthPrimeAsync = async (num) => {
  let i,
    primes = [2, 3],
    n = 5;
  const isPrime = (n) =>
    new Promise((resolve) =>
      setImmediate(() => {
        let i = 1,
          p = primes[i],
          limit = Math.ceil(Math.sqrt(n));
        while (p <= limit) {
          if (n % p === 0) {
            return resolve(false);
          }
          i += 1;
          p = primes[i];
        }
        return resolve(true);
      })
    );
  for (i = 2; i <= num; i += 1) {
    while (!(await isPrime(n))) {
      n += 2;
    }
    primes.push(n);
    n += 2;
  }
  return primes[num - 1];
};

为了证明代码现在确实在事件循环中,我们可以尝试在事件循环中安排任务,看看它们是否被执行:

console.log("Calculating Sync Prime...");
let syncCount = 0;
const syncInterval = setInterval(() => {
  console.log("Event loop executed");
  exCount++;
}, 1);

const sync = findNthPrime(nth);
console.log("Sync Prime is", sync);
clearInterval(syncInterval);
console.log("Intervals on event loop:", syncCount);

console.log("Calculating Async Prime...");
let asyncCount = 0;
const asyncInterval = setInterval(() => {
  console.log("Event loop executed");
  asyncCount++;
}, 1);

findNthPrimeAsync(nth)
  .then((n) => console.log("Async Prime is", n))
  .then(() => clearInterval(asyncInterval))
  .then(() => console.log("Intervals on event loop:", asyncCount));

输出如下:

Calculating Sync Prime...
Sync Prime is 541
Intervals on event loop: 0
Calculating Async Prime...
Event loop executed
Event loop executed
Event loop executed
Event loop executed
Event loop executed
Event loop executed
Async Prime is 541
Intervals on event loop: 6

为了更直观,执行情况的图表如下所示:

image.png

卸载到另⼀个线程

在不阻塞主线程的情况下处理同步作业的最后⼀种⽅法是将其完全卸载到另⼀个线程。⼯作池进 ⼀步优化了这个策略。

前提是有⼀个主线程派发⼀个 wor ker :

const nth = 100; // play with this value

const findNthPrimeWorker = (num) =>
  new Promise((resolve) => {
    const worker = new Worker(require.resolve("./worker.js"), {
      workerData: num,
    });

    worker.on("message", (d) => resolve(d));
  });

findNthPrimeWorker(nth);

⼯作⼈员执⾏计算并发送结果:

// worker.js

const findNthPrime = (num) => {
  // ...
};

parentPort.postMessage(findNthPrime(workerData));

image.png

Workers 的限制

Workers 理想的作用是将长时间运行的 CPU 密集型任务移出主线程,但它们并不是万能解决方案。它们的主要限制是可以发送到它们的数据。这些限制在 port.postMessage() 中有记录。

Workers 不是魔法!

还需要注意的是,通过将 Workers 分配给 CPU 密集型任务,它们的运行速度受限于可用线程的数量。如果您的服务器有 8 个线程,运行超过 8 个 Workers 不会使它们运行得更快。

Workers 的好处不在于无限制的并行性,而在于通过将非常快的任务卸载到其他线程中,保证主线程始终空闲,以执行快速的任务。

- Book Lists -