2017/02/28

Node.js+RabbitMQでメッセージの送受信をする3(Publish/Subscribe)

仕事でRabbitMQまわりの一部システムを担当することになった。フロントエンド畑の人なので、Node.jsをつかってRabbitMQのチュートリアルをテキトーに和訳し、他で得た知識で補足しながら勉強した内容をまとめていこうと思う。

今回はチュートリアル3「Publish/Subscribe」を学ぶ。



関連記事




今回学ぶこと


  • Publish/Subscribeメッセージモデル
  • Exchange
  • Temporary Queues
  • Binding



Publish/Subscrive


前回のチュートリアルでは、タスクを1つのワーカーに確実に配信するためにWork Queuesをつくった。

今回はWork Queuesと違い、複数のConsumerにメッセージを配信する仕組みについて学ぶ。このパターンは「Publish/Subscribe」と呼ばれている。
このパターンを説明するために、シンプルなロギングシステムを作成する。

ロギングシステムは以下の2つのプログラムで構成されている。

  • emit_log.js: ログメッセージを発信する
  • receive_log.js: ログメッセージを受信と表示をする

さらに起動しているすべての受信プログラムにメッセージを配信できるようにする。



Exchanges


前回までは、直接キューを指定しメッセージの送受信していた。今回はRabbitMQの古メッセージングモデル(Pub/Subメッセージングモデル)を導入する。


復習

あたらしいメッセージングモデルを学ぶ前に、前回までの復習をする。

  • Producer: メッセージを送るユーザアプリケーション
  • Queue: メッセージを格納するバッファ(RabbitMQ内にある)
  • Consumer: メッセージを受け取るユーザアプリケーション

Pub/Subメッセージングモデル

Pub/Subメッセージングモデルのアイデアは、Producerが直接キューにメッセージを送信しないことである。そのかわりにExchangeにメッセージを送信するだけで、それ以降どのキューに配信されるかは知る必要がない。

ExchangeはProducerからメッセージを受け取り、特定のキューに追加するか、複数のキューに追加するか、それとも破棄するかをExchangeタイプによって判断してくれる。


Hello Worldの記事で紹介した構成が、まさにPub/Subメッセージングモデルである。
今回はこの構成をつくっていく。



Exchange Types

Exchangeが行う処理は、以下の4つのタイプで定義することができる。

  • Direct exchange
    • メッセージに付与されているルーティングキーと、Bindingに設定されているバインディングキーを見て、完全一致するキューに配送する
  • Fanout exchange
    • バインドされているすべてのキューにメッセージを配送する
  • Topic exchange
    • 基本的にはDirect exchangeと同じ
    • より高度な振り分けができる(正規表現っぽくバインドできる)
  • Headers exchange
    • 任意のヘッダの値を比較することで、一致したキューに配送する


Exchange Typesを確認する


他のExchange Typeは以下のコマンドで確認できる。
$ rabbitmqctl list_exchanges
Listing exchanges ...
amq.fanout          fanout
amq.rabbitmq.log    topic
amq.topic           topic
amq.match           headers
amq.headers         headers
amq.direct          direct
amq.rabbitmq.trace  topic
    direct

このリストには、amq.* Exchangeとdefault Exchange(無名)がある。

Nameless exchange


前回まではExchangeについてまったく指定しなかったが、ちゃんとキューにメッセージが配送されていた。実は、Default Exchange(空文字)を使っていたためだ。
さきほどのlist_exchangesでいうと、最終行の「 (空文字) direct」がそれにあたる。
ch.sendToQueue('hello', Bugger.from('Hello World'))はNameless Exchangeを使っており、第1引数で指定したキュー名(hello)に送信していたことになる。


Exchangeの定義とPublish



今回は、すべてのConsumerにメッセージを送りたいので、FanoutタイプのExchangeを作成する。そしてそのExchangeにlogsという名前をつける。
そして、logs ExchangeにメッセージPublish(送信)する。
// logsという名前のExchangeを定義
ch.assertExchange('logs', 'fanout', { durable: false });
 
// logsにパブリッシュする
ch.publish('logs', '', Buffer.from('Hello World'));

logs ExchangeにPublishするだけなので、ch.publishの第2引数は空にして特定のキューに送らないようにする。



Temporary queues


前回までのようにキューにhelloやtask_queueという名前をつけることは、ProducerとConsumerとの間でキューを共有するときに重要だった。なぜなら同じキューを参照しないとメッセージの送受信ができなかったためだ。

しかし、今回のロギングシステムには名前付きのキューはいらない。一部のログや古いメッセージがほしいのではなく、最新のメッセージだけを取得して表示したいからだ。

そのため、RabbitMQに接続するたびに新たな空のキューをつくる。これを行うためには、ランダムな名前のキューを作成する必要があるのだが、サーバに用意させるのが手っ取り早い。

次に、Consumerが切断したら、もうランダムな名前のキューは必要ないので自動的に削除させる。(残しておくとメモリが圧迫されてしまう)
amqpl.nodeクライアントでキュー名を空にすると、ランダムに生成された名前で、永続化されていない(非durable)キューがつくられる。

ch.assertQueue('', { exclusive: true ], (err, q) => {
    // RabbitMQによって生成されたランダムなキュー名
    console.log(q.queue);
});

キュー名を指定せずにassetQueueをすると、戻り値のキューインスタンスにRabbitMQによって生成されたランダムな名前のキューが返ってくる。たとえば「amq.gen-JzTY20BRgKO-HjmUJj0wLg」のような値だ。
そして{ exclusive: true }を指定することで、他のConsumerが利用できないキューが作成され、また切断された時点でキューを自動で削除してくれる。

※ もちろんキューの名前を指定できるけど、他のConsumerが定義したキューとかぶらないように注意が必要



Bindings


ExchangeとTemporary Queuesのところで、すでにfanoutタイプのExchangeとランダム名のQueueは作成しているので、次はExchangeとQueueをどんな条件で関連付けるか設定する。この関連付けをBindingという。
ch.bindQueue(queue_name, 'logs', '');

第1引数にQueue、第2引数にExchangeを指定すると、ExchangeとQueueが関連付けられ、メッセージを登録できるようになる。

すでにあるBindigを確認するには、以下のコマンドを実行する。
$ rabbitmqctl list_bindings



Putting it all together


チュートリアル3をすべて終えると、最終的に以下のようなコードになる。
const amqp = require('amqplib/callback_api');

amqp.connect('amqp://admin:admin@localhost:5672', (err, conn) => {
    conn.createChannel((err, ch) => {
        // Exchangeの名前
        const ex = 'logs';

        // メッセージ
        const msg = process.argv.slice(2).join('') || 'Hello World';

        // 名前: logs、タイプ:fanoutのExchangeを定義
        ch.assertExchange(ex, 'fanout', { durable: false });
        // メッセージをパブリッシュする
        ch.publish(ex, '', Buffer.from(msg));
        console.log(` [x] Sent ${msg}`);
    });

    setTimeout(() => { conn.close(); process.exit(0); }, 500);
});

const amqp = require('amqplib/callback_api');

amqp.connect('amqp://admin:admin@localhost:5672', (err, conn) => {
    conn.createChannel((err, ch) => {
        // Exchangeの名前
        const ex = 'logs';

        // 名前: logs、タイプ:fanoutのExchangeを定義
        ch.assertExchange(ex, 'fanout', { durable: false });

        // 無名のTemporary Queueを定義
        ch.assertQueue('', { exclusive: true }, (err, q) => {
            console.log(` [*] Waiting for messages in ${q.queue}. To exit press CTRL+C`);

            // ExchangeとTemporary Queueを関連付ける(バインディングする)
            ch.bindQueue(q.queue, ex, '');

            ch.consume(q.queue, msg => {
                console.log(` [x] ${msg.content.toString()}`);
            }, { noAck: true });
        });
    });
});


まずはConsumerを2プロセス起動する。
shell1$ node receive_logs.js
 [*] Waiting for messages in amq.gen-07YhjvHTRN4LfiRUQpcNLQ. To exit press CTRL+C

shell2$ node receive_logs.js
 [*] Waiting for messages in amq.gen-07YhjvHTRN4LfiRUQpcNLQ. To exit press CTRL+C

次にログメッセージを発信して、shell1とshell2の両方のConsumerにメッセージが送信されることを確認する。
shell3$ node emit_log.js
 [x] Sent Hello World

shell1: [x] Hello World
shell2: [x] Hello World

shell3$ node emit_log.js hello...
   [x] Sent hello...

shell1: [x] hello...
shell2: [x] hello...


最後にExchangeやBinding、Queuesについて確認する。
# Consumerのプロセスが2つ起動している場合

## Exchangeを確認(logs fanout)
$ rabbitmqctl list_exchanges
Listing exchanges ...
amq.fanout fanout
amq.rabbitmq.log topic
amq.topic topic
amq.match headers
amq.headers headers
amq.direct direct
logs fanout
amq.rabbitmq.trace topic
 direct

## Bindingを確認(logs exchange ↔ queueが関連づいていることを確認)
$ rabbitmqctl list_bindings
Listing bindings ...
 exchange amq.gen-07YhjvHTRN4LfiRUQpcNLQ queue amq.gen-07YhjvHTRN4LfiRUQpcNLQ []
 exchange amq.gen-pGx756YcHyzh38K6cOEUIg queue amq.gen-pGx756YcHyzh38K6cOEUIg []
 exchange hello queue hello []
 exchange task_queue queue task_queue []
logs exchange amq.gen-07YhjvHTRN4LfiRUQpcNLQ queue  []
logs exchange amq.gen-pGx756YcHyzh38K6cOEUIg queue  []


# Consumerプロセスが起動していない場合
## Bindingsを確認(ExchangeとQueueの関連付けがないことを確認)
$ rabbitmqctl list_bindings
Listing bindings ...
 exchange hello queue hello []
 exchange task_queue queue task_queue []

##  Queueを確認(生成されたランダムなQueueがないことを確認)
$ rabbitmqctl list_queues
Listing queues ...
hello 0  ← チュートリアル1の残骸
task_queue 0  ← チュートリアル2の残骸


ここまででチュートリアル3「Publish/Subscribe」は終了。
次回はチュートリアル4「Routing」をやっていく。



以上

written by @bc_rikko

0 件のコメント :

コメントを投稿