今回はチュートリアル4「Routing」を学ぶ。
関連記事
- CentOS7.3にRabbitMQをインストールしてGUIで管理できるようにする
- チュートリアル1「Hello World」
- チュートリアル2「Work queues」
- チュートリアル3「Publish/Subscribe」
- チュートリアル4「Routing」 ←いまここ
- チュートリアル5「Topics」
- チュートリアル6「RPC」
今回学ぶこと
- RoutingKeyによるメッセージの振り分け
- Direct Exchange
Routing
前回のチュートリアルでは、すべてのConsumerにログメッセージを送信するシンプルなロギングシステムをつくった。
今回は、そのロギングシステムにログレベル(重要度)によって受信するかどうかフィルタリングする機能を追加する。具体的には、ProducerはすべてのログをExchangeにPublishするけど、Consumerは自分が欲しいログ(重要度)のものしか受信しないようにする感じ。
Bindings
前回のチュートリアルでは、以下のようなBindingを実装し、ExchangeとQueueの関連付けを行った。
// ch.bindQueue(queue_name, exchange_name, '');
ch.bindQueue(q.queue, ex, '');
今回は、このBindingにルーティングキーを指定する。(Publishするときと、Subscribeするときの双方にルーティングキーを指定するので、混乱を避けるために、以降PublishするときにbindQueueするパラメータを「バインディングキー」と呼ぶ)
バインディングキーによって、受信するメッセージをフィルタリングできる。
// ch.bindQueue(queue_name, exchange_name, 'routingKey aka.bindingKey');
ch.bindQueue(q.queue, ex, 'black');
このBindingで、メッセージに「black」というバインディングキーを指定した。詳細は後述するが、Consumerはこのバインディングキーとルーティングキーが一致したときだけ受信できるようになる。
ちなみに、バインディングキーの意味付けはExchangerタイプに依存するので、前回のようなすべてのConsumerに向けてメッセージを発信するようなfanout Exchangeはバインディングキーが指定できず、無視される。
Direct exchange
前回のロギングシステムでは、fanoutタイプのExchangeを使うことですべてのConsumerにすべてのメッセージを配信していた。今回は重要度(severity)によってメッセージをフィルタリングできるように機能拡張する。
しかし、fanout Exchangeにはそのような柔軟性がないので、代わりにdirect Exchangeを使用する。direct Exchangeのルーティングアルゴリズムは、メッセージのバインディングキーとルーティングキーが一致したキューに送信される。
direct Exchangeについて、下図を使って説明する。
この設定では、direct Exchangeに2つのキューをバインドしている。1つはバインディングキーが「orange」、他方は「black」と「green」でバインドしている。
そのため、ProducerがExchangeにPublishすると、バインディングキーが「orange」だったら、ルーティングキーが「orange」のキューQ1に配信される。バインディングキーが「black」または「green」だったら、ルーティングキーが「black」と「green」のキューQ2に配信される。これ以外のすべてのメッセージは、ルーティングキーにマッチしないので破棄される。
Multiple bindings
同一のバインディングキーで複数のキューにバインドすることもできる。たとえばバインディングキーが「black」のメッセージをpublishしたら、ルーティングキーが「black」になっているすべてのキューに配信される。(fanout Exchangeみたいなイメージ)
Emmiting logs
前述のとおり、今回はfanoutではなくdirect Exchangeにメッセージをpublishする。その際、ログの重要度をバインディングキーに指定することで、受信側のプログラムは受信したい重要度を選択できるようになる。
// Exchange名
const ex = 'direct_logs';
// Exchangeを定義する
ch.assertExchange(ex, 'direct', { durable: false } );
// メッセージをPublishする(severityはログの重要度: info, warning, error がはいる)
ch.publish(ex, severity, Buffer.from(msg));
Subscribing
前回とほぼ同じだが、実行時の引数で受信したいログの重要度ごとにBindingsを作成する。
// argsには['warning', 'error']のようにログの重要度を指定する
args.forEach(severity => {
ch.bindQueue(q.queue, ex, severity);
});
Putting it all together
チュートリアル4をすべて終えると、最終的に以下のようなコードになる。
Producer(emit_log_direct.js)
// emit_log_direct.js
const amqp = require('amqplib/callback_api');
amqp.connect('amqp://admin:admin@localhost:5672', (err, conn) => {
conn.createChannel((err, ch) => {
// Exchange名
const ex = 'direct_logs';
// Message(ex. node emit_log_direct.js info message)
const args = process.argv.slice(2);
const msg = args.slice(1).join('') || 'Hello World';
// Routing key
const severity = (args.length > 0) ? args[0] : 'info';
// directタイプのExchangeの定義
ch.assertExchange(ex, 'direct', { durable: false });
// direct_logs Exchangeにバインディングキーを指定してパブリッシュする
// ex: Exchange名、 severity: binding key、 msg: メッセージ
ch.publish(ex, severity, Buffer.from(msg));
console.log(` [x] Sent ${severity}: ${msg}`);
});
setTimeout(() => {
conn.close();
process.exit(0);
}, 500);
});
Consumer(receive_logs_direct.js)
// receive_logs_direct.js
const amqp = require('amqplib/callback_api');
const args = process.argv.slice(2);
if (args.length === 0) {
console.log('Usage: receive_logs_direct.js [info] [warning] [error]');
process.exit(1);
}
amqp.connect('amqp://admin:admin@localhost:5672', (err, conn) => {
conn.createChannel((err, ch) => {
// Exchange名
const ex = 'direct_logs';
// directタイプのExchangeを定義
ch.assertExchange(ex, 'direct', { durable: false });
// 無名キュー(Temporary Queue)を定義
ch.assertQueue('', { exclusive: true }, (err, q) => {
console.log(' [x] Waiting for logs. To exit press CTRL+C');
// 引数で指定されたルーティングキー分のキューをバインドする
args.forEach(severity => {
// QueueとExchangeをRoutingkeyで関連付ける
ch.bindQueue(q.queue, ex, severity);
});
ch.consume(q.queue, msg => {
console.log(` [x] ${msg.fields.routingKey}: '${msg.content.toString()}'`);
}, { noAck: true });
});
});
});
実行する
# 重要度がwarningとerrorだけ受信する
shell1$ node receive_logs_direct.js error warning
[x] Waiting for logs. To exit press CTRL+C
# 重要度がerrorのメッセージを送信する
shell2$ node emit_logs_direct.js error 'test'
[x] Sent error: test
shell1
[x] error: 'test'
# 重要度がwarningのメッセージを送信する
shell2$ node emit_logs_direct.js warning 'test'
[x] Sent error: test
shell1
[x] warning: 'test'
# 重要度がinfoのメッセージを送信する
shell2$ node emit_logs_direct.js info 'test'
[x] Sent info: test
shell1
(受信しない)
ここまででチュートリアル4「Routing」は終了。
次回はチュートリアル5「Topics」をやっていく。
以上
written by @bc_rikko
0 件のコメント :
コメントを投稿