返回

⚛ Atomic 原子操作与多线程

何谓原子操作?

所谓原子操作,就是多线程程序中“最小的且不可并行化的”操作。对于在多个线程间共享的一个资源而言,这意味着同一时刻,多个线程中有且仅有一个线程在对这个资源进行操作,即互斥访问。

JS 中的 Atomic

Chrome 67+实现了SharedArrayBuffer和Atomics。

共享阵列缓冲区的概念是,您将消息发布给工作线程,但不是复制数组的内容,而是发送对它的引用,以便多个工作线程可以共享内存块。

原子的概念是它们提供可以“一次”发生的操作。

这是为什么原子如此重要的一个示例:执行加法时,您将发出多个操作。

a = a + 3;
// ↑ is equivalent to the following:
let reg = a;
reg += 3;
a = reg;

在普通的JS中这不是问题,因为您被授予在每个给定时间仅执行一个函数。

引入Workers和共享内存会打破这种假设,因此a在您递增时reg,有人可能会更改的值,最终您会覆盖其值。要解决此问题,可以使用Atomic.add。

Atomic 的 wait() 和 notify() 方法采用的是 Linux 上的 futexes 模型(“快速用户空间互斥量”),可以让进程一直等待直到某个特定的条件为真,主要用于实现阻塞。

sleep 问题

如果我们想让一段程序休眠一段时间该怎么做? 也许会想到 setTimeout 但是他只会暂停回调函数,不会阻止事件循环的进行,所以其他异步函数还是会执行的。

可以利用类似自旋锁的方式,用一段空转函数占用CPU运行,来实现暂停:

自旋锁不会让出CPU,是一种忙等待状态,自旋锁其实就是:死循环等待锁被释放

function sleep(ms) {
  const target = Date.now() + Number(ms);
  while (target > Date.now()) {}
}

Atomics.wait 方法,该方法会监听一个Int32Array 对象的给定下标下的值,若值未发生改变,则一直等待(阻塞event loop),直到发生超时(由ms参数决定定):

const nil = new Int32Array(new SharedArrayBuffer(4))

function sleep (ms) {

  // 参数校验相关代码略去

  Atomics.wait(nil, 0, 0, Number(ms))
}

另外,模块还对低版本的 js 运行时做了兼容,如果不支持 Atomics,则改用一个占用CPU运行的方式:

function sleep(ms) {

  // 参数校验相关代码略去

  const target = Date.now() + Number(ms);
  while (target > Date.now()) {}
}

尽管Atomic有很强,但它们从来都不容易使用。此外,原子是非常低级的API,并且要表达高级行为,通常需要使多个Atomic操作相互交互。

线程同步

互斥体

互斥体是允许只有一个线程可以同时进入关键部分的原语。关键部分是 lock 和 unlock调用之间的代码。

详细说明:

  • 互斥锁有2种状态:解锁和锁定。
  • 调用lock()互斥锁应将其转换为锁定状态。
  • lock()如果该互斥锁已被其他人锁定,则调用该互斥锁应该会阻塞并等待。
  • 调用unlock()互斥锁应将其转换为解锁状态。
  • 调用unlock()未锁定的互斥锁不是有效操作。

为此,我们需要以下原子方法:

  • Atomics.compareExchange
  • Atomics.wait
  • Atomics.notify
const locked = 1;
const unlocked = 0;

class Mutex {
  /**
   * Instantiate Mutex.
   * If opt_sab is provided, the mutex will use it as a backing array.
   * @param {SharedArrayBuffer} opt_sab Optional SharedArrayBuffer.
   */
  constructor(opt_sab) {
    this._sab = opt_sab || new SharedArrayBuffer(4);
    this._mu = new Int32Array(this._sab);
  }

  /**
   * Instantiate a Mutex connected to the given one.
   * @param {Mutex} mu the other Mutex.
   */
  static connect(mu) {
    return new Mutex(mu._sab);
  }

  lock() {
    for(;;) {
      if (Atomics.compareExchange(this._mu, 0, unlocked, locked) == unlocked) {
        return;
      }
      Atomics.wait(this._mu, 0, locked);
    }
  }

  unlock() {
    if (Atomics.compareExchange(this._mu, 0, locked, unlocked) != locked) {
      throw new Error("Mutex is in inconsistent state: unlock on unlocked Mutex.");
    }
    Atomics.notify(this._mu, 0, 1);
  }
}

当lock被调用时,它将尝试将互斥锁转换为locked状态并检查是否成功。这是通过来完成的compareExchange,这意味着:“加载值并locked在且仅当为时才与之交换unlocked”。compareExchange返回变量的旧值,可用来检查是否成功获取了互斥量。

WaitGroup

WaitGroup通常用于等待所有Worker完成其工作,然后读取累积输出。

  • 调用add(n)WaitGroup应该通过添加给定值来更改计数器
  • 调用doneWaitGroup等效于调用add(-1)
  • 调用waitWaitGroup应该暂停执行,直到计数器等于0
  • 当心在工作开始之前add应该始终调用它,这就是为什么我在构造函数中添加了初始值。召集呼叫add的工作人员done会导致比赛条件,并可能导致意外的早退wait。

为了实现它,我使用了以下原语:

  • Atomics.add
  • Atomics.load
  • Atomics.wait
  • Atomics.notify

这是代码:

class WaitGroup {
  constructor(initial, opt_sab) {
    this._sab = opt_sab || new SharedArrayBuffer(4);
    this._wg = new Int32Array(this._sab);
    this.add(initial);
  }

  static connect(wg) {
    return new WaitGroup(0, wg._sab);
  }

  add(n) {
    let current = n + Atomics.add(this._wg, 0, n);
    if (current < 0) {
      throw new Error('WaitGroup is in inconsistent state: negative count.');
    }
    if (current > 0){
      return;
    }
    Atomics.notify(this._wg, 0);
  }

  done() {
    this.add(-1);
  }

  wait() {
    for (;;) {
      let count = Atomics.load(this._wg, 0);
      if (count == 0){
        return;
      }
      if (Atomics.wait(this._wg, 0, count) == 'ok') {
        return;
      }
    }
  }
}

首先,add实现将计数器增加给定值。由于add返回了旧值,因此我们将其递增n,然后对其进行推理,就好像它被及时冻结了一样。我们只加载一次计数器,这使该调用保持一致性。

在notify被称为与服务员的默认量,这是无穷大。

我们不检查计数器是否0处于等待唤醒状态的原因是,仅在添加期间计数器达到0的情况下才调用通知。

参考文章:

https://zhuanlan.zhihu.com/p/112126898 https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/Atomics https://blogtitle.github.io/using-javascript-sharedarraybuffers-and-atomics/ https://www.cnblogs.com/accordion/p/12533305.html