返回

Node 连接 Codis

Codis

const Zookeeper = require('zookeeper-cluster-client')
const Redis = require('ioredis')
const EventEmitter = require('events')

class Codis extends EventEmitter {
  constructor(options) {
    super(options)
    const {serverUrl, path, password ,db =0 } = options
    this._zkAddr = serverUrl
    this._proxyPath = path
    this._password = password
    this._db = db
    this._zk = Zookeeper.createClient(this._zkAddr)
    this._connPoolIndex = -1
    this._connPool = []
    this._initFromZK()
  }

  _getConnectInfo(addr) {
    if (typeof addr !== 'string' || addr.indexOf(':') < 0) return
    const [host, port] = addr.split(':')
    return {host, port}
  }

  _initFromZK() {
    this._connPool = []
    this._zk.connect()

    this._zk.once('connected', async () => {
      console.log('zk connected')
      const proxyNameList = await this._zk.getChildren(this._proxyPath, this._watcher)

      for (let proxy of proxyNameList) {
        const proxyInfo = await this._zk.getData(`${this._proxyPath}/${proxy}`, this._watcher)
        console.log(proxyInfo.toString())
        const data = JSON.parse(proxyInfo.toString())
        if (data['state'] === 'online') {
          const {port, host} = this._getConnectInfo(data['addr'])
          this._connPool.push(new Redis(port, host, {password: this._password, db: this._db}))
        }
      }
      this.emit('ready', this._getProxy())
    })
  }

  _getProxy() {
    this._connPoolIndex += 1
    const len = this._connPool.length
    if (this._connPoolIndex >= len) this._connPoolIndex = 0
    if (len !== 0) return this._connPool[this._connPoolIndex]
  }

  _watcher(event) {
    const {type, state, path} = event
    console.log(`type: ${type} || state: ${state}`)
    if (type === 'SESSION' && state === 'CONNECTING') {

    } else if (type === 'SESSION' && state === 'EXPIRED_SESSION') {
      this._zk.close()
    } else if (['CREATED', 'DELETED', 'CHANGED', 'CHILD'].includes(type) && state === 'CONNECTED') {
      this._initFromZK()
    } else {
      console.error("zookeeper connection state changed but not implemented: event:%s state:%s path:%s" % (type, state, path))
    }
  }

  getResource() {
    return this._getProxy()
  }

  close() {
    try {
      this._zk.close()
    } catch (err) {
      console.error(err)
    }
  }
}

module.exports = Codis
Licensed under CC BY-NC-SA 4.0