が、そんなことはなくamqplibもPromiseベースで利用することができた。
CallbackとPromiseのそれぞれで実装した内容を比較しながら、使い方をまとめる。
コールバックの場合
// callback.js
const amqp = require('amqplib/callback_api');
/**
 * エラー処理
 * @param err 
 * @param conn 
 */
function bail(err, conn) {
  console.error(err);
  if (conn) {
    conn.close(() => {
      process.exit(1);
    });
  }
}
/**
 * メッセージ出力
 * @param msg
 */
function logMessage(msg) {
  console.log('log message:', msg.content.toString());
}
/**
 * RabbitMQサーバに接続
 * @param err 
 * @param conn 
 */
function onConnect(err, conn) {
  if (err !== null) return bail(err);
  // チャンネル作成
  conn.createChannel((err, ch) => {
    if (err !== null) return bail(err);
    // Exchangeの定義
    const ex = 'test_exchange';
    const exOpts = {
      durable: false,
      autoDelete: true
    };
    const mq = ch.assertExchange(ex, 'topic', exOpts);
    // Queueの定義
    const qOpts = {
      durable: false,
      autoDelete: true,
      exclusive: true
    };
    ch.assertQueue('', qOpts, (err, ok) => {
      if (err !== null) return bail(err);
      const queue = ok.queue;
      const routingKey = 'test.routing';
      // Bindingの定義
      ch.bindQueue(queue, ex, routingKey);
      // メッセージ送信
      ch.consume(queue, logMessage, { noAck: true }, err => {
        if (err !== null) return bail(err);
        console.info('Waiting');
      });
    });
  });
}
// 接続
amqp.connect('amqp://*****', onConnect);Promiseの場合
// promise.js
const amqp = require('amqplib');
/**
 * メッセージ出力
 * @param msg
 */
function logMessage(msg) {
  console.log('log message:', msg.content.toString());
}
// 接続
amqp.connect('amqp://*****').then(conn => {
  // チャンネル作成
  return conn.createChannel().then(ch => {
    // Exchangeの定義
    const ex = 'test_exchange';
    const exOpts = {
      durable: false,
      autoDelete: true
    };
    const mq = ch.assertExchange(ex, 'topic', exOpts);
    // Queueの定義
    mq = mq.then(() => {
      const qOpts = {
        durable: false,
        autoDelete: true,
        exclusive: true
      };
      return ch.assertQueue('', qOpts);
    });
    // Bindingの定義
    mq = mq.then(q => {
      const queue = q.queue;
      const routingKey = 'test.routing';
      ch.bindQueue(queue, ex, routingKey);
    });
    // メッセージ送信
    mq = mq.then(queue => {
      return ch.consume(queue, logMessage, { noAck: true });
    });
    return mq.then(() => {
      console.info('Waiting');
    });
  });
}).catch(console.warn);callbackの場合は、require('amqplib/callback_api')を読み込んでいたが、Promiseを使う場合は、require('amqplib')だけで良い。
上記のようにすべての処理(connectやassertExchangeなど)でPromiseを使って非同期処理を扱うことができる。
今回は説明しやすくするために、thenの戻り値を変数に入れて使っていたが、もちろんチェーンさせて書くことができる。
また、エラー処理も大外でcatchすれば良いだけなので簡単。
参考サイト
- amqplib | Channel API reference
- GitHub - squaremo/amqp.node: AMQP 0-9-1 library and client for Node.JS
以上
written by @bc_rikko
 

 
0 件のコメント :
コメントを投稿