今回はチュートリアル3「Publish/Subscribe」を学ぶ。
関連記事
- CentOS7.3にRabbitMQをインストールしてGUIで管理できるようにする
- チュートリアル1「Hello World」
- チュートリアル2「Work queues」
- チュートリアル3「Publish/Subscribe」 ←いまここ
- チュートリアル4「Routing」
- チュートリアル5「Topics」
- チュートリアル6「RPC」
今回学ぶこと
- Publish/Subscribeメッセージモデル
- Exchange
- Temporary Queues
- Binding
Publish/Subscrive
前回のチュートリアルでは、タスクを1つのワーカーに確実に配信するためにWork Queuesをつくった。
今回はWork Queuesと違い、複数のConsumerにメッセージを配信する仕組みについて学ぶ。このパターンは「Publish/Subscribe」と呼ばれている。
このパターンを説明するために、シンプルなロギングシステムを作成する。
ロギングシステムは以下の2つのプログラムで構成されている。
- emit_log.js: ログメッセージを発信する
- receive_log.js: ログメッセージを受信と表示をする
さらに起動しているすべての受信プログラムにメッセージを配信できるようにする。
Exchanges
前回までは、直接キューを指定しメッセージの送受信していた。今回はRabbitMQの古メッセージングモデル(Pub/Subメッセージングモデル)を導入する。
復習
あたらしいメッセージングモデルを学ぶ前に、前回までの復習をする。- Producer: メッセージを送るユーザアプリケーション
- Queue: メッセージを格納するバッファ(RabbitMQ内にある)
- Consumer: メッセージを受け取るユーザアプリケーション
Pub/Subメッセージングモデル
Pub/Subメッセージングモデルのアイデアは、Producerが直接キューにメッセージを送信しないことである。そのかわりにExchangeにメッセージを送信するだけで、それ以降どのキューに配信されるかは知る必要がない。ExchangeはProducerからメッセージを受け取り、特定のキューに追加するか、複数のキューに追加するか、それとも破棄するかをExchangeタイプによって判断してくれる。
Hello Worldの記事で紹介した構成が、まさにPub/Subメッセージングモデルである。
今回はこの構成をつくっていく。
Exchange Types
Exchangeが行う処理は、以下の4つのタイプで定義することができる。- Direct exchange
- メッセージに付与されているルーティングキーと、Bindingに設定されているバインディングキーを見て、完全一致するキューに配送する
- Fanout exchange
- バインドされているすべてのキューにメッセージを配送する
- Topic exchange
- 基本的にはDirect exchangeと同じ
- より高度な振り分けができる(正規表現っぽくバインドできる)
- Headers exchange
- 任意のヘッダの値を比較することで、一致したキューに配送する
Exchange Typesを確認する
他のExchange Typeは以下のコマンドで確認できる。
$ rabbitmqctl list_exchanges
Listing exchanges ...
amq.fanout fanout
amq.rabbitmq.log topic
amq.topic topic
amq.match headers
amq.headers headers
amq.direct direct
amq.rabbitmq.trace topic
direct
このリストには、amq.* Exchangeとdefault Exchange(無名)がある。
Nameless exchange
前回まではExchangeについてまったく指定しなかったが、ちゃんとキューにメッセージが配送されていた。実は、Default Exchange(空文字)を使っていたためだ。
さきほどのlist_exchangesでいうと、最終行の「 (空文字) direct」がそれにあたる。
ch.sendToQueue('hello', Bugger.from('Hello World'))はNameless Exchangeを使っており、第1引数で指定したキュー名(hello)に送信していたことになる。
Exchangeの定義とPublish
今回は、すべてのConsumerにメッセージを送りたいので、FanoutタイプのExchangeを作成する。そしてそのExchangeにlogsという名前をつける。
そして、logs ExchangeにメッセージPublish(送信)する。
// logsという名前のExchangeを定義
ch.assertExchange('logs', 'fanout', { durable: false });
// logsにパブリッシュする
ch.publish('logs', '', Buffer.from('Hello World'));
logs ExchangeにPublishするだけなので、ch.publishの第2引数は空にして特定のキューに送らないようにする。
Temporary queues
前回までのようにキューにhelloやtask_queueという名前をつけることは、ProducerとConsumerとの間でキューを共有するときに重要だった。なぜなら同じキューを参照しないとメッセージの送受信ができなかったためだ。
しかし、今回のロギングシステムには名前付きのキューはいらない。一部のログや古いメッセージがほしいのではなく、最新のメッセージだけを取得して表示したいからだ。
そのため、RabbitMQに接続するたびに新たな空のキューをつくる。これを行うためには、ランダムな名前のキューを作成する必要があるのだが、サーバに用意させるのが手っ取り早い。
次に、Consumerが切断したら、もうランダムな名前のキューは必要ないので自動的に削除させる。(残しておくとメモリが圧迫されてしまう)
amqpl.nodeクライアントでキュー名を空にすると、ランダムに生成された名前で、永続化されていない(非durable)キューがつくられる。
ch.assertQueue('', { exclusive: true ], (err, q) => {
// RabbitMQによって生成されたランダムなキュー名
console.log(q.queue);
});
キュー名を指定せずにassetQueueをすると、戻り値のキューインスタンスにRabbitMQによって生成されたランダムな名前のキューが返ってくる。たとえば「amq.gen-JzTY20BRgKO-HjmUJj0wLg」のような値だ。
そして{ exclusive: true }を指定することで、他のConsumerが利用できないキューが作成され、また切断された時点でキューを自動で削除してくれる。
※ もちろんキューの名前を指定できるけど、他のConsumerが定義したキューとかぶらないように注意が必要
Bindings
ExchangeとTemporary Queuesのところで、すでにfanoutタイプのExchangeとランダム名のQueueは作成しているので、次はExchangeとQueueをどんな条件で関連付けるか設定する。この関連付けをBindingという。
ch.bindQueue(queue_name, 'logs', '');
第1引数にQueue、第2引数にExchangeを指定すると、ExchangeとQueueが関連付けられ、メッセージを登録できるようになる。
すでにあるBindigを確認するには、以下のコマンドを実行する。
$ rabbitmqctl list_bindings
Putting it all together
チュートリアル3をすべて終えると、最終的に以下のようなコードになる。
// emit_log.js
const amqp = require('amqplib/callback_api');
amqp.connect('amqp://admin:admin@localhost:5672', (err, conn) => {
conn.createChannel((err, ch) => {
// Exchangeの名前
const ex = 'logs';
// メッセージ
const msg = process.argv.slice(2).join('') || 'Hello World';
// 名前: logs、タイプ:fanoutのExchangeを定義
ch.assertExchange(ex, 'fanout', { durable: false });
// メッセージをパブリッシュする
ch.publish(ex, '', Buffer.from(msg));
console.log(` [x] Sent ${msg}`);
});
setTimeout(() => { conn.close(); process.exit(0); }, 500);
});
// receive_logs.js
const amqp = require('amqplib/callback_api');
amqp.connect('amqp://admin:admin@localhost:5672', (err, conn) => {
conn.createChannel((err, ch) => {
// Exchangeの名前
const ex = 'logs';
// 名前: logs、タイプ:fanoutのExchangeを定義
ch.assertExchange(ex, 'fanout', { durable: false });
// 無名のTemporary Queueを定義
ch.assertQueue('', { exclusive: true }, (err, q) => {
console.log(` [*] Waiting for messages in ${q.queue}. To exit press CTRL+C`);
// ExchangeとTemporary Queueを関連付ける(バインディングする)
ch.bindQueue(q.queue, ex, '');
ch.consume(q.queue, msg => {
console.log(` [x] ${msg.content.toString()}`);
}, { noAck: true });
});
});
});
まずはConsumerを2プロセス起動する。
shell1$ node receive_logs.js
[*] Waiting for messages in amq.gen-07YhjvHTRN4LfiRUQpcNLQ. To exit press CTRL+C
shell2$ node receive_logs.js
[*] Waiting for messages in amq.gen-07YhjvHTRN4LfiRUQpcNLQ. To exit press CTRL+C
次にログメッセージを発信して、shell1とshell2の両方のConsumerにメッセージが送信されることを確認する。
shell3$ node emit_log.js
[x] Sent Hello World
shell1: [x] Hello World
shell2: [x] Hello World
shell3$ node emit_log.js hello...
[x] Sent hello...
shell1: [x] hello...
shell2: [x] hello...
最後にExchangeやBinding、Queuesについて確認する。
# Consumerのプロセスが2つ起動している場合
## Exchangeを確認(logs fanout)
$ rabbitmqctl list_exchanges
Listing exchanges ...
amq.fanout fanout
amq.rabbitmq.log topic
amq.topic topic
amq.match headers
amq.headers headers
amq.direct direct
logs fanout
amq.rabbitmq.trace topic
direct
## Bindingを確認(logs exchange ↔ queueが関連づいていることを確認)
$ rabbitmqctl list_bindings
Listing bindings ...
exchange amq.gen-07YhjvHTRN4LfiRUQpcNLQ queue amq.gen-07YhjvHTRN4LfiRUQpcNLQ []
exchange amq.gen-pGx756YcHyzh38K6cOEUIg queue amq.gen-pGx756YcHyzh38K6cOEUIg []
exchange hello queue hello []
exchange task_queue queue task_queue []
logs exchange amq.gen-07YhjvHTRN4LfiRUQpcNLQ queue []
logs exchange amq.gen-pGx756YcHyzh38K6cOEUIg queue []
# Consumerプロセスが起動していない場合
## Bindingsを確認(ExchangeとQueueの関連付けがないことを確認)
$ rabbitmqctl list_bindings
Listing bindings ...
exchange hello queue hello []
exchange task_queue queue task_queue []
## Queueを確認(生成されたランダムなQueueがないことを確認)
$ rabbitmqctl list_queues
Listing queues ...
hello 0 ← チュートリアル1の残骸
task_queue 0 ← チュートリアル2の残骸
ここまででチュートリアル3「Publish/Subscribe」は終了。
次回はチュートリアル4「Routing」をやっていく。
以上
written by @bc_rikko
0 件のコメント :
コメントを投稿