が、そんなことはなくamqplibもPromiseベースで利用することができた。
CallbackとPromiseのそれぞれで実装した内容を比較しながら、使い方をまとめる。
コールバックの場合
// callback.js
const amqp = require('amqplib/callback_api');
/**
* エラー処理
* @param err
* @param conn
*/
function bail(err, conn) {
console.error(err);
if (conn) {
conn.close(() => {
process.exit(1);
});
}
}
/**
* メッセージ出力
* @param msg
*/
function logMessage(msg) {
console.log('log message:', msg.content.toString());
}
/**
* RabbitMQサーバに接続
* @param err
* @param conn
*/
function onConnect(err, conn) {
if (err !== null) return bail(err);
// チャンネル作成
conn.createChannel((err, ch) => {
if (err !== null) return bail(err);
// Exchangeの定義
const ex = 'test_exchange';
const exOpts = {
durable: false,
autoDelete: true
};
const mq = ch.assertExchange(ex, 'topic', exOpts);
// Queueの定義
const qOpts = {
durable: false,
autoDelete: true,
exclusive: true
};
ch.assertQueue('', qOpts, (err, ok) => {
if (err !== null) return bail(err);
const queue = ok.queue;
const routingKey = 'test.routing';
// Bindingの定義
ch.bindQueue(queue, ex, routingKey);
// メッセージ送信
ch.consume(queue, logMessage, { noAck: true }, err => {
if (err !== null) return bail(err);
console.info('Waiting');
});
});
});
}
// 接続
amqp.connect('amqp://*****', onConnect);
Promiseの場合
// promise.js
const amqp = require('amqplib');
/**
* メッセージ出力
* @param msg
*/
function logMessage(msg) {
console.log('log message:', msg.content.toString());
}
// 接続
amqp.connect('amqp://*****').then(conn => {
// チャンネル作成
return conn.createChannel().then(ch => {
// Exchangeの定義
const ex = 'test_exchange';
const exOpts = {
durable: false,
autoDelete: true
};
const mq = ch.assertExchange(ex, 'topic', exOpts);
// Queueの定義
mq = mq.then(() => {
const qOpts = {
durable: false,
autoDelete: true,
exclusive: true
};
return ch.assertQueue('', qOpts);
});
// Bindingの定義
mq = mq.then(q => {
const queue = q.queue;
const routingKey = 'test.routing';
ch.bindQueue(queue, ex, routingKey);
});
// メッセージ送信
mq = mq.then(queue => {
return ch.consume(queue, logMessage, { noAck: true });
});
return mq.then(() => {
console.info('Waiting');
});
});
}).catch(console.warn);
callbackの場合は、require('amqplib/callback_api')を読み込んでいたが、Promiseを使う場合は、require('amqplib')だけで良い。
上記のようにすべての処理(connectやassertExchangeなど)でPromiseを使って非同期処理を扱うことができる。
今回は説明しやすくするために、thenの戻り値を変数に入れて使っていたが、もちろんチェーンさせて書くことができる。
また、エラー処理も大外でcatchすれば良いだけなので簡単。
参考サイト
- amqplib | Channel API reference
- GitHub - squaremo/amqp.node: AMQP 0-9-1 library and client for Node.JS
以上
written by @bc_rikko
0 件のコメント :
コメントを投稿