今回はチュートリアル5「Topic」を学ぶ。
関連記事
- CentOS7.3にRabbitMQをインストールしてGUIで管理できるようにする
- チュートリアル1「Hello World」
- チュートリアル2「Work queues」
- チュートリアル3「Publish/Subscribe」
- チュートリアル4「Routing」
- チュートリアル5「Topics」 ←いまここ
- チュートリアル6「RPC」
今回学ぶこと
- 高度なメッセージの振り分け
- Topic Exchange
Topics
前回のチュートリアルでは、ロギングシステムにdirect Exchangeを使った重要度によるフィルタリングを追加した。しかし、direct Exchangeには制限があり、バインディングキーとルーティングキーの完全一致でしかルーティングできない。たとえば、複数条件や正規表現的な複雑なルーティングができないのだ。
今回は、複雑なルーティングができるように改良する。
具体的には、ログの重要度だけでなく、送信されるログの中身(ルーティングキーの内容)をみてフィルタリングを行う、syslogのような考え方だ。そのため、direct Exchangeの代わりにtopic Exchangeを使う。
Topic exchange
topic Exchangeはdirect Exchangeに似ているが、任意のルーティングキーではなく、ドット区切りの単語リストを使ってより高度なルーティングができる。たとえば「stock.usd.nyse」や「nyse.vmw」、「quick.orange.rabbit」などだ。このルーティングキーには最大255バイトまで指定できる。もちろんバインディングキーにも同様な形式が必要だ。
topic Exchangeでは、バインディングキーに2つの特殊文字が指定できる。
- *(アスタリスク): ドットで区切られた1単語に一致する
- #(ハッシュ): ゼロ、もしくは複数の単語に一致する(正規表現の?みたいな感じ)
図をつかって詳しく説明する。
上図では、動物をあらわすメッセージを送信している。メッセージには3単語からなるルーティングキーが使われており、{スピード}.{色}.{種類} のように指定する。たとえば「quick.orange.fox」や「lazy.pink.rabbit」など。
- key: *.orange.* → Q1に配信される
- すべてのオレンジ色の動物
- quick.orange.foxやlazy.orange.catなど
- key: *.*.rabbit → Q2に配信される
- すべてのウサギ
- quick.pink.rabbitやlazy.brown.rabbitなど
- key: lazy.# → Q2に配信される
- すべてののろまな動物
- lazy.orange.foxやlazy.brown.rabbit.maleなど
- #の場合は、4単語あってもlazyな動物だったら配信される
ルーティングキーが一致しないメッセージは、direct Exchange同様に破棄される。
topic exchangeについて補足
topic Exchangeの機能は充実しているので、他のfanoutやdirectタイプのExchangeと同じように扱える。キューとバインディングキーが「#」でバインドされていれば、fanout Exchangeのようにすべてのメッセージを受信する。特殊文字の「*」や「#」を使わなければ、direct Exchangeのように完全一致のメッセージを受信する。万能なので、とりあえずtopic Exchange選んどけば大丈夫的なところはある。(私感)
Putting it all together
チュートリアル5をすべて終えると、最終的に以下のようなコードになる。
Producer(emit_log_topic.js)
// emit_log_topic.js
const amqp = require('amqplib/callback_api');
amqp.connect('amqp://admin:admin@localhost:5672', (err, conn) => {
conn.createChannel((err, ch) => {
// Exchange名
const ex = 'topic_logs';
// 引数からRoutingKeyを定義
const args = process.argv.slice(2);
const key = (args.length > 0) ? args[0] : 'anonymous.info';
const msg = args.slice(1).join('') || 'Hello World';
// topicタイプのExchangeを定義
ch.assertExchange(ex, 'topic', { durable: false });
// topic_logs Exchangeにルーティングキーを指定してパブリッシュする
ch.publish(ex, key, Buffer.from(msg));
console.log(` [x] Sent ${key}: '${msg}`);
});
setTimeout(() => {
conn.close();
process.exit(0);
}, 500);
});
Consumer(receive_logs_topic.js)
// receive_log_topic.js
const amqp = require('amqplib/callback_api');
const args = process.argv.slice(2);
if (args.length === 0) {
console.log('Usage: receive_logs_topic.js <facility>.<severity>');
process.exit(1);
}
amqp.connect('amqp://admin:admin@localhost:5672', (err, conn) => {
conn.createChannel((err, ch) => {
// Exchange名
const ex = 'topic_logs';
// topicタイプのExchangeを定義
ch.assertExchange(ex, 'topic', { durable: false });
ch.assertQueue('', { exclusive: true }, (err, q) => {
console.log(' [*] Waiting for logs. To exit press CTRL+C');
// 引数で指定されたルーティングキー分のキューをバインドする
args.forEach(key => {
// QueueとExchangeをRoutingkeyで関連付ける
ch.bindQueue(q.queue, ex, key);
});
ch.consume(q.queue, msg => {
console.log(` [x] ${msg.fields.routingKey}: '${msg.content.toString()}`);
}, { noAck: true });
});
});
});
実行する
# すべてのログを受信する
shell1$ node receive_logs_topic.js "#"
[*] Waiting for logs. To exit press CTRL+C
# shell2$ node emit_log_topic.js "kern.critical" "A critical kernel error"
[x] Sent kern.critical: 'A critical kernel error
shell1:
[x] kern.critical: 'A critical kernel error
# "kern.*" のログを受信する
shell1$ node receive_logs_topic.js "kern.*"
[*] Waiting for logs. To exit press CTRL+C
# shell2$ node emit_log_topic.js "kern.critical" "A critical kernel error"
[x] Sent kern.critical: 'A critical kernel error
shell1:
[x] kern.critical: 'A critical kernel error
# "*.critical"のログを受信する
shell1$ node receive_logs_topic.js "*.critical"
[*] Waiting for logs. To exit press CTRL+C
# shell2$ node emit_log_topic.js "kern.critical" "A critical kernel error"
[x] Sent kern.critical: 'A critical kernel error
shell1:
[x] kern.critical: 'A critical kernel error
# "kern.*", "*.critical"のログを受信する
shell1$ node receive_logs_topic.js "kern.*" "*.critical"
[*] Waiting for logs. To exit press CTRL+C
# shell2$ node emit_log_topic.js "kern.critical" "A critical kernel error"
[x] Sent kern.critical: 'A critical kernel error
shell1:
[x] kern.critical: 'A critical kernel error
# "kern..critical"のログを受信する
shell1$ node receive_logs_topic.js "kern..critical"
[*] Waiting for logs. To exit press CTRL+C
# shell2$ node emit_log_topic.js "kern.critical" "A critical kernel error"
[x] Sent kern.critical: 'A critical kernel error
shell1:
[x] kern.critical: 'A critical kernel error
ここまででチュートリアル5「Topic」は終了。
次回はチュートリアル6「RPC(Remote produre call)」をやっていく。
以上
written by @bc_rikko
0 件のコメント :
コメントを投稿