Published on

Socket 서버 구현 5 (Zookeeper Client, Kafka Producer)

1. Zookeeper Client 사용

  • Zookeeper 서버로부터 연동서버들의 IP주소, Port, 상태정보를 읽어오는 목적으로 사용하였다.

  • Zookeeper 서버에 현재서버의 IP주소, Port, 상태정보를 업데이트 하는 목적으로 사용하였다.

2. Zookeeper Client 구현

  • node-zookeeper-client 모듈을 사용하였다.

  • Zookeeper Client를 통해 Zookeeper에서 트리형태로 관리하고 있는 노드 데이터를 읽어오고, 업데이트한다. (zclient.exists, zclient.setData, zclient.getChildren)

var zooKeeper = require('node-zookeeper-client');

this.zookeeperClientInit = function () {
  var that = this;
  zclient = zooKeeper.createClient('' + constants.ZOOKEEPER_URL, {
    sessionTimeout: 10000,
    spinDelay: 1000,
    retries: 10000,
  });

  zclient.once('expired', function () {
    that.zookeeperClientInit();
  });

  zclient.once('connected', function () {
    isInit = true;
    var sInfo = '' + constants.MY_SERVER_IP + ':' + constants.MY_SERVER_PORT;
    var sData = '{cpu:20}';
    var sPath = '/status/MyServer/' + sInfo;
    zclient.exists(sPath, function (error, stat) {
      if (error) {
        logger.errorLog(' ZooKeeper Init - ' + error.stack);
        return;
      }
      if (stat) {
        zclient.setData(sPath, new Buffer(sData), function (error, stat) {
          if (error) {
            console.log(error.stack);
            logger.errorLog(' ZooKeeper Init - ' + error.stack);
            return;
          }
        });
        getServerInfo('/status/Was');
      } else {
        zclient.create(
          sPath,
          new Buffer(sData),
          zooKeeper.CreateMode.EPHEMERAL,
          function (error, path) {
            if (error) {
              logger.errorLog(
                ' ZooKeeper Init - Failed to create node (' + path + '), error (' + error + ')'
              );
              return;
            } else {
              logger.debugLog(
                ' ZooKeeper Init - node (' +
                  path +
                  ') is successfully created, set data (' +
                  sData +
                  ')'
              );
              getServerInfo('/status/Was');
            }
          }
        );
      }
    });
  });
  zclient.connect();
};

function getServerInfo(path) {
  zclient.getChildren(path, null, function (error, children, stat) {
    if (error) {
      return;
    }
    var serverList = children;
  });
}

3. Kafka Producer

  • Kafka Producer API를 사용해 Kafka Broker에 메시지를 보낸다.

  • node-rdkafka 모듈을 사용하였다. 메시지를 모아서 배치로 전송가능하다.

  • producer.produce(topic, partition, msg, key, timestamp, opaque) 로 메시지를 전송한다.

var kafka = require('node-rdkafka');

producer = new kafka.Producer(
  {
    'client.id': 'my_server',
    'metadata.broker.list': constants.KAFKA_BROKER_LIST,
    'broker.version.fallback': '0.8.2',
    //'compression.codec': 'gzip',
    'retry.backoff.ms': 200,
    'message.send.max.retries': 10,
    'socket.keepalive.enable': true,
    'queue.buffering.max.messages': 100000,
    'queue.buffering.max.ms': 1000,
    'batch.num.messages': 1000000,
    'statistics.interval.ms': 1000,
    dr_msg_cb: true,
    event_cb: true,
  },
  {
    'request.required.acks': 1,
    'produce.offset.report': true,
  }
);

producer.setPollInterval(100);

producer.on('event.stats', function (stats) {
  var message = JSON.parse(stats.message),
    currentTime;
  if (message) {
    currentTime = new Date().getTime();
    if (currentTime > lastLogTime + 20 * 1000) {
      lastLogTime = currentTime;
      logger.debugLog(' Kafka(qsize) : ' + message.msg_cnt);
    }
  } else {
    logger.debugLog(' Kafka(qsize) : message parse error');
  }
});

producer.on('event.log', function (log) {
  logger.debugLog(' Kafka(I) : ' + JSON.stringify(log));
});

producer.on('event.error', function (err) {
  logger.debugLog(' Kafka(ERR1) : ' + err);
});

producer.on('delivery-report', function (err, report) {
  if (err) logger.debugLog(' Kafka(ERR2) : ' + err);
});

producer.connect();

producer.on('ready', function () {
  connected = true;
  logger.debugLog(' complete to initiate Kafka ' + producer.isConnected());
});

this.sendData = function (topic, partition, data, callback) {
  if (connected == true) {
    var msg = JSON.stringify(data);
    try {
      var res = producer.produce(topic, partition, new Buffer(msg), null, Date.now());
      if (res == true) {
        logger.debugLog(' Kafka(S) : topic(' + topic + ') msg(' + msg + ')');
        callback(0);
      } else {
        logger.debugLog(' Kafka(S) Fail: ' + msg);
        callback(1);
      }
    } catch (err) {
      logger.debugLog(' Kafka(ERR3) : ' + err);
      callback(1);
    }
  } else {
    logger.debugLog(' Kafka not connected!');
    callback(1);
  }
};