2017/03/08

【Node.js】amqplibをPromiseベースで利用する方法

RabbitMQのチュートリアルをやっていたとき、非同期処理はすべてcallbackで書かれていてコードがゴチャゴチャしてるな、Promiseとか使えないのかな、node-amqpのほうが良いかな、と感じていた。
が、そんなことはなくamqplibもPromiseベースで利用することができた。

CallbackとPromiseのそれぞれで実装した内容を比較しながら、使い方をまとめる。


コールバックの場合


// callback.js
const amqp = require('amqplib/callback_api');

/**
 * エラー処理
 * @param err 
 * @param conn 
 */
function bail(err, conn) {
  console.error(err);
  if (conn) {
    conn.close(() => {
      process.exit(1);
    });
  }
}

/**
 * メッセージ出力
 * @param msg
 */
function logMessage(msg) {
  console.log('log message:', msg.content.toString());
}

/**
 * RabbitMQサーバに接続
 * @param err 
 * @param conn 
 */
function onConnect(err, conn) {
  if (err !== null) return bail(err);

  // チャンネル作成
  conn.createChannel((err, ch) => {
    if (err !== null) return bail(err);

    // Exchangeの定義
    const ex = 'test_exchange';
    const exOpts = {
      durable: false,
      autoDelete: true
    };
    const mq = ch.assertExchange(ex, 'topic', exOpts);

    // Queueの定義
    const qOpts = {
      durable: false,
      autoDelete: true,
      exclusive: true
    };
    ch.assertQueue('', qOpts, (err, ok) => {
      if (err !== null) return bail(err);

      const queue = ok.queue;
      const routingKey = 'test.routing';

      // Bindingの定義
      ch.bindQueue(queue, ex, routingKey);

      // メッセージ送信
      ch.consume(queue, logMessage, { noAck: true }, err => {
        if (err !== null) return bail(err);

        console.info('Waiting');
      });
    });
  });
}

// 接続
amqp.connect('amqp://*****', onConnect);



Promiseの場合


// promise.js
const amqp = require('amqplib');

/**
 * メッセージ出力
 * @param msg
 */
function logMessage(msg) {
  console.log('log message:', msg.content.toString());
}

// 接続
amqp.connect('amqp://*****').then(conn => {
  // チャンネル作成
  return conn.createChannel().then(ch => {
    // Exchangeの定義
    const ex = 'test_exchange';
    const exOpts = {
      durable: false,
      autoDelete: true
    };
    const mq = ch.assertExchange(ex, 'topic', exOpts);

    // Queueの定義
    mq = mq.then(() => {
      const qOpts = {
        durable: false,
        autoDelete: true,
        exclusive: true
      };
      return ch.assertQueue('', qOpts);
    });

    // Bindingの定義
    mq = mq.then(q => {
      const queue = q.queue;
      const routingKey = 'test.routing';

      ch.bindQueue(queue, ex, routingKey);
    });

    // メッセージ送信
    mq = mq.then(queue => {
      return ch.consume(queue, logMessage, { noAck: true });
    });

    return mq.then(() => {
      console.info('Waiting');
    });
  });
}).catch(console.warn);

callbackの場合は、require('amqplib/callback_api')を読み込んでいたが、Promiseを使う場合は、require('amqplib')だけで良い。

上記のようにすべての処理(connectやassertExchangeなど)でPromiseを使って非同期処理を扱うことができる。
今回は説明しやすくするために、thenの戻り値を変数に入れて使っていたが、もちろんチェーンさせて書くことができる。

また、エラー処理も大外でcatchすれば良いだけなので簡単。



参考サイト




以上

written by @bc_rikko

0 件のコメント :

コメントを投稿