Node 连接 🔗 Codis

Codis

const Zookeeper = require('zookeeper-cluster-client')
const Redis = require('ioredis')
const EventEmitter = require('events')

class Codis extends EventEmitter {
  constructor(options) {
    super(options)
    const {serverUrl, path, password ,db =0 } = options
    this._zkAddr = serverUrl
    this._proxyPath = path
    this._password = password
    this._db = db
    this._zk = Zookeeper.createClient(this._zkAddr)
    this._connPoolIndex = -1
    this._connPool = []
    this._initFromZK()
  }

  _getConnectInfo(addr) {
    if (typeof addr !== 'string' || addr.indexOf(':') < 0) return
    const [host, port] = addr.split(':')
    return {host, port}
  }

  _initFromZK() {
    this._connPool = []
    this._zk.connect()

    this._zk.once('connected', async () => {
      console.log('zk connected')
      const proxyNameList = await this._zk.getChildren(this._proxyPath, this._watcher)

      for (let proxy of proxyNameList) {
        const proxyInfo = await this._zk.getData(`${this._proxyPath}/${proxy}`, this._watcher)
        console.log(proxyInfo.toString())
        const data = JSON.parse(proxyInfo.toString())
        if (data['state'] === 'online') {
          const {port, host} = this._getConnectInfo(data['addr'])
          this._connPool.push(new Redis(port, host, {password: this._password, db: this._db}))
        }
      }
      this.emit('ready', this._getProxy())
    })
  }

  _getProxy() {
    this._connPoolIndex += 1
    const len = this._connPool.length
    if (this._connPoolIndex >= len) this._connPoolIndex = 0
    if (len !== 0) return this._connPool[this._connPoolIndex]
  }

  _watcher(event) {
    const {type, state, path} = event
    console.log(`type: ${type} || state: ${state}`)
    if (type === 'SESSION' && state === 'CONNECTING') {

    } else if (type === 'SESSION' && state === 'EXPIRED_SESSION') {
      this._zk.close()
    } else if (['CREATED', 'DELETED', 'CHANGED', 'CHILD'].includes(type) && state === 'CONNECTED') {
      this._initFromZK()
    } else {
      console.error("zookeeper connection state changed but not implemented: event:%s state:%s path:%s" % (type, state, path))
    }
  }

  getResource() {
    return this._getProxy()
  }

  close() {
    try {
      this._zk.close()
    } catch (err) {
      console.error(err)
    }
  }
}

module.exports = Codis

⚛ Atomic 原子操作与多线程

何谓原子操作?

所谓原子操作,就是多线程程序中“最小的且不可并行化的”操作。对于在多个线程间共享的一个资源而言,这意味着同一时刻,多个线程中有且仅有一个线程在对这个资源进行操作,即互斥访问。

JS 中的 Atomic

Chrome 67+实现了SharedArrayBuffer和Atomics。

共享阵列缓冲区的概念是,您将消息发布给工作线程,但不是复制数组的内容,而是发送对它的引用,以便多个工作线程可以共享内存块。

原子的概念是它们提供可以“一次”发生的操作。

这是为什么原子如此重要的一个示例:执行加法时,您将发出多个操作。

a = a + 3;
// ↑ is equivalent to the following:
let reg = a;
reg += 3;
a = reg;

在普通的JS中这不是问题,因为您被授予在每个给定时间仅执行一个函数。

引入Workers和共享内存会打破这种假设,因此a在您递增时reg,有人可能会更改的值,最终您会覆盖其值。要解决此问题,可以使用Atomic.add。

Atomic 的 wait() 和 notify() 方法采用的是 Linux 上的 futexes 模型(“快速用户空间互斥量”),可以让进程一直等待直到某个特定的条件为真,主要用于实现阻塞。

sleep 问题

如果我们想让一段程序休眠一段时间该怎么做?
也许会想到 setTimeout 但是他只会暂停回调函数,不会阻止事件循环的进行,所以其他异步函数还是会执行的。

可以利用类似自旋锁的方式,用一段空转函数占用CPU运行,来实现暂停:

自旋锁不会让出CPU,是一种忙等待状态,自旋锁其实就是:死循环等待锁被释放

function sleep(ms) {
  const target = Date.now() + Number(ms);
  while (target > Date.now()) {}
}

Atomics.wait 方法,该方法会监听一个Int32Array 对象的给定下标下的值,若值未发生改变,则一直等待(阻塞event loop),直到发生超时(由ms参数决定定):

const nil = new Int32Array(new SharedArrayBuffer(4))

function sleep (ms) {

  // 参数校验相关代码略去

  Atomics.wait(nil, 0, 0, Number(ms))
}

另外,模块还对低版本的 js 运行时做了兼容,如果不支持 Atomics,则改用一个占用CPU运行的方式:

function sleep(ms) {

  // 参数校验相关代码略去

  const target = Date.now() + Number(ms);
  while (target > Date.now()) {}
}

尽管Atomic有很强,但它们从来都不容易使用。此外,原子是非常低级的API,并且要表达高级行为,通常需要使多个Atomic操作相互交互。

线程同步

互斥体

互斥体是允许只有一个线程可以同时进入关键部分的原语。关键部分是 lock 和 unlock调用之间的代码。

详细说明:

  • 互斥锁有2种状态:解锁和锁定。
  • 调用lock()互斥锁应将其转换为锁定状态。
  • lock()如果该互斥锁已被其他人锁定,则调用该互斥锁应该会阻塞并等待。
  • 调用unlock()互斥锁应将其转换为解锁状态。
  • 调用unlock()未锁定的互斥锁不是有效操作。

为此,我们需要以下原子方法:

  • Atomics.compareExchange
  • Atomics.wait
  • Atomics.notify
const locked = 1;
const unlocked = 0;

class Mutex {
  /**
   * Instantiate Mutex.
   * If opt_sab is provided, the mutex will use it as a backing array.
   * @param {SharedArrayBuffer} opt_sab Optional SharedArrayBuffer.
   */
  constructor(opt_sab) {
    this._sab = opt_sab || new SharedArrayBuffer(4);
    this._mu = new Int32Array(this._sab);
  }

  /**
   * Instantiate a Mutex connected to the given one.
   * @param {Mutex} mu the other Mutex.
   */
  static connect(mu) {
    return new Mutex(mu._sab);
  }

  lock() {
    for(;;) {
      if (Atomics.compareExchange(this._mu, 0, unlocked, locked) == unlocked) {
        return;
      }
      Atomics.wait(this._mu, 0, locked);
    }
  }

  unlock() {
    if (Atomics.compareExchange(this._mu, 0, locked, unlocked) != locked) {
      throw new Error("Mutex is in inconsistent state: unlock on unlocked Mutex.");
    }
    Atomics.notify(this._mu, 0, 1);
  }
}

当lock被调用时,它将尝试将互斥锁转换为locked状态并检查是否成功。这是通过来完成的compareExchange,这意味着:“加载值并locked在且仅当为时才与之交换unlocked”。compareExchange返回变量的旧值,可用来检查是否成功获取了互斥量。

WaitGroup

WaitGroup通常用于等待所有Worker完成其工作,然后读取累积输出。

  • 调用add(n)WaitGroup应该通过添加给定值来更改计数器
  • 调用doneWaitGroup等效于调用add(-1)
  • 调用waitWaitGroup应该暂停执行,直到计数器等于0
  • 当心在工作开始之前add应该始终调用它,这就是为什么我在构造函数中添加了初始值。召集呼叫add的工作人员done会导致比赛条件,并可能导致意外的早退wait。

为了实现它,我使用了以下原语:

  • Atomics.add
  • Atomics.load
  • Atomics.wait
  • Atomics.notify

这是代码:

class WaitGroup {
  constructor(initial, opt_sab) {
    this._sab = opt_sab || new SharedArrayBuffer(4);
    this._wg = new Int32Array(this._sab);
    this.add(initial);
  }

  static connect(wg) {
    return new WaitGroup(0, wg._sab);
  }

  add(n) {
    let current = n + Atomics.add(this._wg, 0, n);
    if (current < 0) {
      throw new Error('WaitGroup is in inconsistent state: negative count.');
    }
    if (current > 0){
      return;
    }
    Atomics.notify(this._wg, 0);
  }

  done() {
    this.add(-1);
  }

  wait() {
    for (;;) {
      let count = Atomics.load(this._wg, 0);
      if (count == 0){
        return;
      }
      if (Atomics.wait(this._wg, 0, count) == 'ok') {
        return;
      }
    }
  }
}

首先,add实现将计数器增加给定值。由于add返回了旧值,因此我们将其递增n,然后对其进行推理,就好像它被及时冻结了一样。我们只加载一次计数器,这使该调用保持一致性。

在notify被称为与服务员的默认量,这是无穷大。

我们不检查计数器是否0处于等待唤醒状态的原因是,仅在添加期间计数器达到0的情况下才调用通知。

参考文章:

https://zhuanlan.zhihu.com/p/112126898
https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/Atomics
https://blogtitle.github.io/using-javascript-sharedarraybuffers-and-atomics/
https://www.cnblogs.com/accordion/p/12533305.html

nest.js 使用 node-http-proxy 导致请求阻塞

问题的原因是代理中间件与body-parser中间件发生了冲突。请求体的解析如果在转发之前,那么后端服务将会收不到请求结束的消息,导致请求一直 pending。

nest默认是启用body-parser的,需要在初始化时将其关闭。
const app = await NestFactory.create(AppModule, { bodyParser: false });

然后需要为不需要代理的请求添加body-parser中间件。body-parser包括了json,urlencoded和text的解析,根据情况按需引用。

const unless = (path, middleware) => (req, res, next) => {
  if (req.path.indexOf(path) !== -1) {
    return next();
  } else {
    return middleware(req, res, next);
  }
};


app.use(unless('/db', json()));
app.use(unless('/db', urlencoded({ extended: false })));

Node.js 中的安全随机值

并不是所有的随机值都是相等的 - 对于与安全相关的代码,您需要一种特定的随机值。

这篇文章的摘要,如果你不想阅读整个事情:

  • 不要使用Math.random()Math.random()是正确答案的情况极少。不要使用它,除非您已阅读整篇文章,并确定您的案件是必要的。
  • 不要直接使用crypto.getRandomBytes。虽然这是一个CSPRNG,但是在“转换”它时会很容易偏离结果,从而使输出变得更加可预测。
  • 如果要生成随机令牌或API密钥:请使用uuid,特别是uuid.v4()方法。 注意node-uuid它不是一样的包,并且不会产生可靠的安全随机值。
  • 如果要在一个范围内生成随机数:使用random-number-csprng

你应该认真考虑阅读整篇文章,虽然不是那么久:)

“随机”的种类

大概有三种类型的“随机”:

  • 真随机:顾名思义。无随机性,无图案或算法适用。这是否真的存在是有争议的。
  • 不可预测的:不是真正随机的,但是攻击者不可能预测。这就是您所需的安全相关代码 - 只要数据不会被猜到,它怎么生成并不重要。
  • 不规则:这是大多数人认为“随机”的想法。一个例子是一个具有星型场景背景的游戏,其中每个星星被画在屏幕上的“随机”位置。这不是真正随机的,它甚至不是不可预知的 - 它看起来并不像是有视觉上的模式。

不规则的数据是快速生成的,但在安全上而言毫无价值 - 即使看起来似乎没有一个模式,攻击者几乎总是可以预测这些值将会是什么。不规则数据的唯一用处是在视觉上表示的东西,如游戏元素或者在笑话网站上随机产生的短语。

不可预测的数据生成速度有点慢,但对于大多数情况来说仍然足够快,并且很难猜到它将具有抗攻击性。不可预测的数据由所谓的CSPRNG提供。

RNG类型(随机数生成器)

  • CSPRNG: 密码安全伪随机数发生器。这是为了安全目的而产生不可预测的数据。
    PRNG:伪随机数发生器。这是一个更广泛的类别,包括刚刚返回不规则值的CSPRNG和生成器 - 换句话说,您不能依靠PRNG为您提供不可预测的值。

RNG: 随机数生成器。这个术语的含义取决于上下文。大多数人将其用作更广泛的类别,包括PRNG和真正的随机数字生成器。

应使用CSPRNG生成与安全相关的目的(即,存在“攻击者”可能性的任何事物)所需的每个随机值。这包括验证令牌,重置令牌,彩票号码,API密钥,生成的密码,加密密钥等等。

Bias

在Node.js中,最广泛可用的CSPRNG是crypto.randomBytes函数,但是您不应该直接使用它,因为很容易弄乱和“偏离”随机值 - 也就是说,使特定的选择值或值集合。

这个错误的一个常见的例子是当你有少于256种可能性(因为单个字节有256个可能的值)时使用%模运算符。这样做实际上使较低的值比较高的值更可能被挑选。

例如,假设您有36个可能的随机值 - 0-9加上a-z中的每个小写字母。一个天真的实现可能看起来像这样:

let randomCharacter = randomByte % 36;

该代码是破碎和不安全的。通过上面的代码,您基本上创建了以下范围(包括):

0-35保持0-35。
36-71变为0-35。
72-107变成0-35。
108-143成为0-35。
144-179成为0-35。
180-215变成0-35。
216-251变为0-35。
252-255变成0-3。

如果您查看上述范围列表,您会注意到,对于4到35(含)之间的每个randomCharacter有7个可能的值,每个随机字符在0和3(含)之间有8个可能的值。这意味着虽然有一个2.64%的机会获得4到35(含)之间的值,但有一个3.02%的机会获得0到3(含)之间的值。

这种差异可能看起来很小,但是攻击者可以轻松有效地减少暴力事件时所需要的猜测量。而这只是一种方法,您可以使您的随机值不安全,尽管它们最初来自安全的随机源。

那么,如何安全地获取随机值?

在Node.js中:

  • 如果您需要在一定范围内的个别随机数:使用random-number-csprng。
  • 如果您需要API键或令牌:使用uuid(而不是node-uuid!),尤其是uuid.v4()方法。

这两者都使用CSPRNG,并以无偏差(即安全)方式“转换”字节。

如何将 JSON 文件导入到 Node.js 中?

你需要使用 fs 模块进行一些操作。

异步版本

var fs = require('fs');

fs.readFile('/path/to/file.json', 'utf8', function (err, data) {
    if (err) throw err; // we'll not consider error handling for now
    var obj = JSON.parse(data);
});

同步版本

var fs = require('fs');
var json = JSON.parse(fs.readFileSync('/path/to/file.json', 'utf8'));

你想用require导入?再考虑一下!

var obj = require('path/to/file.json');

但是,因为以下几点我并不推荐这种方式:

  1. require 只会读取一次文件,后续调用需要同一个文件将返回缓存副本。如果你想读取一个不断更新的.json文件,这不是一个好主意。你可以使用 hack 技巧,但是在这一点上,使用fs更简单。
  2. 如果你的文件没有.json扩展名,require不会将该文件的内容视为JSON。
  3. require 是同步的。如果你有一个非常大的JSON文件,它会阻塞事件循环。你真的需要使用JSON.parsefs.readFile

真的!请用 JSON.parse

错误处理/安全

如果您不确定传递给JSON.parse()的JSON是否是有效的JSON,请确保在try/catch块中调用JSON.parse()。用户提供的JSON字符串可能会使应用程序崩溃,甚至可能导致安全漏洞。如果解析外部提供的JSON,请做好错误处理。

参考文章:
How to parse JSON using Node.js?