✨ 在线生成 Nginx 配置

配置高性能、安全、稳定的NGINX服务器的最简单方法。
支持一键复制到服务器~

Node 连接 🔗 Codis

Codis

const Zookeeper = require('zookeeper-cluster-client')
const Redis = require('ioredis')
const EventEmitter = require('events')

class Codis extends EventEmitter {
  constructor(options) {
    super(options)
    const {serverUrl, path, password ,db =0 } = options
    this._zkAddr = serverUrl
    this._proxyPath = path
    this._password = password
    this._db = db
    this._zk = Zookeeper.createClient(this._zkAddr)
    this._connPoolIndex = -1
    this._connPool = []
    this._initFromZK()
  }

  _getConnectInfo(addr) {
    if (typeof addr !== 'string' || addr.indexOf(':') < 0) return
    const [host, port] = addr.split(':')
    return {host, port}
  }

  _initFromZK() {
    this._connPool = []
    this._zk.connect()

    this._zk.once('connected', async () => {
      console.log('zk connected')
      const proxyNameList = await this._zk.getChildren(this._proxyPath, this._watcher)

      for (let proxy of proxyNameList) {
        const proxyInfo = await this._zk.getData(`${this._proxyPath}/${proxy}`, this._watcher)
        console.log(proxyInfo.toString())
        const data = JSON.parse(proxyInfo.toString())
        if (data['state'] === 'online') {
          const {port, host} = this._getConnectInfo(data['addr'])
          this._connPool.push(new Redis(port, host, {password: this._password, db: this._db}))
        }
      }
      this.emit('ready', this._getProxy())
    })
  }

  _getProxy() {
    this._connPoolIndex += 1
    const len = this._connPool.length
    if (this._connPoolIndex >= len) this._connPoolIndex = 0
    if (len !== 0) return this._connPool[this._connPoolIndex]
  }

  _watcher(event) {
    const {type, state, path} = event
    console.log(`type: ${type} || state: ${state}`)
    if (type === 'SESSION' && state === 'CONNECTING') {

    } else if (type === 'SESSION' && state === 'EXPIRED_SESSION') {
      this._zk.close()
    } else if (['CREATED', 'DELETED', 'CHANGED', 'CHILD'].includes(type) && state === 'CONNECTED') {
      this._initFromZK()
    } else {
      console.error("zookeeper connection state changed but not implemented: event:%s state:%s path:%s" % (type, state, path))
    }
  }

  getResource() {
    return this._getProxy()
  }

  close() {
    try {
      this._zk.close()
    } catch (err) {
      console.error(err)
    }
  }
}

module.exports = Codis

MongoDB 中的 JOIN 等效操作

703023-20161116113917935-437841031.png
MongoDB 3.2 介绍了一个新的$lookup操作,这个操作提供了一个类似于LEFT JOIN的操作。

$lookup 聚合操作

{
   $lookup:
     {
       from: <collection to join>,
       localField: <field from the input documents>,
       foreignField: <field from the documents of the "from" collection>,
       as: <output array field>
     }
}

语法类似SQL

SELECT *, <output array field>
FROM collection
WHERE <output array field> IN (SELECT *
                               FROM <collection to join>
                               WHERE <foreignField>= <collection.localField>);

$lookup实际上是对引用集合的$in查询,其中$in的值是要查找的管道中localField值的集合。
如果索引了foreignField,则该查询的复杂性为O(log(n))。如果foreignField没有索引,则查询的复杂性为O(n)

优化

$lookup $unwind 合并

$unwind紧跟在另一个$lookup之后,$unwind$lookupas字段上运行时,优化器可以将$unwind合并到$lookup阶段。 这避免了创建大型中间文档。

$project 投影优化

聚合管道可以确定是否仅需要文档中字段的子集即可获得结果。如果是这样,管道将仅使用那些必填字段,从而减少了通过管道的数据量。

用小结果集驱动大结果集

类似SQL,将筛选结果小的表首先连接,再去连接结果集比较大的表

⚛ Atomic 原子操作与多线程

何谓原子操作?

所谓原子操作,就是多线程程序中“最小的且不可并行化的”操作。对于在多个线程间共享的一个资源而言,这意味着同一时刻,多个线程中有且仅有一个线程在对这个资源进行操作,即互斥访问。

JS 中的 Atomic

Chrome 67+实现了SharedArrayBuffer和Atomics。

共享阵列缓冲区的概念是,您将消息发布给工作线程,但不是复制数组的内容,而是发送对它的引用,以便多个工作线程可以共享内存块。

原子的概念是它们提供可以“一次”发生的操作。

这是为什么原子如此重要的一个示例:执行加法时,您将发出多个操作。

a = a + 3;
// ↑ is equivalent to the following:
let reg = a;
reg += 3;
a = reg;

在普通的JS中这不是问题,因为您被授予在每个给定时间仅执行一个函数。

引入Workers和共享内存会打破这种假设,因此a在您递增时reg,有人可能会更改的值,最终您会覆盖其值。要解决此问题,可以使用Atomic.add。

Atomic 的 wait() 和 notify() 方法采用的是 Linux 上的 futexes 模型(“快速用户空间互斥量”),可以让进程一直等待直到某个特定的条件为真,主要用于实现阻塞。

sleep 问题

如果我们想让一段程序休眠一段时间该怎么做?
也许会想到 setTimeout 但是他只会暂停回调函数,不会阻止事件循环的进行,所以其他异步函数还是会执行的。

可以利用类似自旋锁的方式,用一段空转函数占用CPU运行,来实现暂停:

自旋锁不会让出CPU,是一种忙等待状态,自旋锁其实就是:死循环等待锁被释放

function sleep(ms) {
  const target = Date.now() + Number(ms);
  while (target > Date.now()) {}
}

Atomics.wait 方法,该方法会监听一个Int32Array 对象的给定下标下的值,若值未发生改变,则一直等待(阻塞event loop),直到发生超时(由ms参数决定定):

const nil = new Int32Array(new SharedArrayBuffer(4))

function sleep (ms) {

  // 参数校验相关代码略去

  Atomics.wait(nil, 0, 0, Number(ms))
}

另外,模块还对低版本的 js 运行时做了兼容,如果不支持 Atomics,则改用一个占用CPU运行的方式:

function sleep(ms) {

  // 参数校验相关代码略去

  const target = Date.now() + Number(ms);
  while (target > Date.now()) {}
}

尽管Atomic有很强,但它们从来都不容易使用。此外,原子是非常低级的API,并且要表达高级行为,通常需要使多个Atomic操作相互交互。

线程同步

互斥体

互斥体是允许只有一个线程可以同时进入关键部分的原语。关键部分是 lock 和 unlock调用之间的代码。

详细说明:

  • 互斥锁有2种状态:解锁和锁定。
  • 调用lock()互斥锁应将其转换为锁定状态。
  • lock()如果该互斥锁已被其他人锁定,则调用该互斥锁应该会阻塞并等待。
  • 调用unlock()互斥锁应将其转换为解锁状态。
  • 调用unlock()未锁定的互斥锁不是有效操作。

为此,我们需要以下原子方法:

  • Atomics.compareExchange
  • Atomics.wait
  • Atomics.notify
const locked = 1;
const unlocked = 0;

class Mutex {
  /**
   * Instantiate Mutex.
   * If opt_sab is provided, the mutex will use it as a backing array.
   * @param {SharedArrayBuffer} opt_sab Optional SharedArrayBuffer.
   */
  constructor(opt_sab) {
    this._sab = opt_sab || new SharedArrayBuffer(4);
    this._mu = new Int32Array(this._sab);
  }

  /**
   * Instantiate a Mutex connected to the given one.
   * @param {Mutex} mu the other Mutex.
   */
  static connect(mu) {
    return new Mutex(mu._sab);
  }

  lock() {
    for(;;) {
      if (Atomics.compareExchange(this._mu, 0, unlocked, locked) == unlocked) {
        return;
      }
      Atomics.wait(this._mu, 0, locked);
    }
  }

  unlock() {
    if (Atomics.compareExchange(this._mu, 0, locked, unlocked) != locked) {
      throw new Error("Mutex is in inconsistent state: unlock on unlocked Mutex.");
    }
    Atomics.notify(this._mu, 0, 1);
  }
}

当lock被调用时,它将尝试将互斥锁转换为locked状态并检查是否成功。这是通过来完成的compareExchange,这意味着:“加载值并locked在且仅当为时才与之交换unlocked”。compareExchange返回变量的旧值,可用来检查是否成功获取了互斥量。

WaitGroup

WaitGroup通常用于等待所有Worker完成其工作,然后读取累积输出。

  • 调用add(n)WaitGroup应该通过添加给定值来更改计数器
  • 调用doneWaitGroup等效于调用add(-1)
  • 调用waitWaitGroup应该暂停执行,直到计数器等于0
  • 当心在工作开始之前add应该始终调用它,这就是为什么我在构造函数中添加了初始值。召集呼叫add的工作人员done会导致比赛条件,并可能导致意外的早退wait。

为了实现它,我使用了以下原语:

  • Atomics.add
  • Atomics.load
  • Atomics.wait
  • Atomics.notify

这是代码:

class WaitGroup {
  constructor(initial, opt_sab) {
    this._sab = opt_sab || new SharedArrayBuffer(4);
    this._wg = new Int32Array(this._sab);
    this.add(initial);
  }

  static connect(wg) {
    return new WaitGroup(0, wg._sab);
  }

  add(n) {
    let current = n + Atomics.add(this._wg, 0, n);
    if (current < 0) {
      throw new Error('WaitGroup is in inconsistent state: negative count.');
    }
    if (current > 0){
      return;
    }
    Atomics.notify(this._wg, 0);
  }

  done() {
    this.add(-1);
  }

  wait() {
    for (;;) {
      let count = Atomics.load(this._wg, 0);
      if (count == 0){
        return;
      }
      if (Atomics.wait(this._wg, 0, count) == 'ok') {
        return;
      }
    }
  }
}

首先,add实现将计数器增加给定值。由于add返回了旧值,因此我们将其递增n,然后对其进行推理,就好像它被及时冻结了一样。我们只加载一次计数器,这使该调用保持一致性。

在notify被称为与服务员的默认量,这是无穷大。

我们不检查计数器是否0处于等待唤醒状态的原因是,仅在添加期间计数器达到0的情况下才调用通知。

参考文章:

https://zhuanlan.zhihu.com/p/112126898
https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/Atomics
https://blogtitle.github.io/using-javascript-sharedarraybuffers-and-atomics/
https://www.cnblogs.com/accordion/p/12533305.html

nest.js 使用 node-http-proxy 导致请求阻塞

问题的原因是代理中间件与body-parser中间件发生了冲突。请求体的解析如果在转发之前,那么后端服务将会收不到请求结束的消息,导致请求一直 pending。

nest默认是启用body-parser的,需要在初始化时将其关闭。
const app = await NestFactory.create(AppModule, { bodyParser: false });

然后需要为不需要代理的请求添加body-parser中间件。body-parser包括了json,urlencoded和text的解析,根据情况按需引用。

const unless = (path, middleware) => (req, res, next) => {
  if (req.path.indexOf(path) !== -1) {
    return next();
  } else {
    return middleware(req, res, next);
  }
};


app.use(unless('/db', json()));
app.use(unless('/db', urlencoded({ extended: false })));