2017/03/02

Node.js+RabbitMQでメッセージの送受信をする4(Routing)

仕事でRabbitMQまわりの一部システムを担当することになった。フロントエンド畑の人なので、Node.jsをつかってRabbitMQのチュートリアルをテキトーに和訳し、他で得た知識で補足しながら勉強した内容をまとめていこうと思う。

今回はチュートリアル4「Routing」を学ぶ。



関連記事




今回学ぶこと


  • RoutingKeyによるメッセージの振り分け
  • Direct Exchange


Routing


前回のチュートリアルでは、すべてのConsumerにログメッセージを送信するシンプルなロギングシステムをつくった。
今回は、そのロギングシステムにログレベル(重要度)によって受信するかどうかフィルタリングする機能を追加する。具体的には、ProducerはすべてのログをExchangeにPublishするけど、Consumerは自分が欲しいログ(重要度)のものしか受信しないようにする感じ。



Bindings


前回のチュートリアルでは、以下のようなBindingを実装し、ExchangeとQueueの関連付けを行った。
// ch.bindQueue(queue_name, exchange_name, '');
ch.bindQueue(q.queue, ex, '');

今回は、このBindingにルーティングキーを指定する。(Publishするときと、Subscribeするときの双方にルーティングキーを指定するので、混乱を避けるために、以降PublishするときにbindQueueするパラメータを「バインディングキー」と呼ぶ)
バインディングキーによって、受信するメッセージをフィルタリングできる。
// ch.bindQueue(queue_name, exchange_name, 'routingKey aka.bindingKey');
ch.bindQueue(q.queue, ex, 'black');

このBindingで、メッセージに「black」というバインディングキーを指定した。詳細は後述するが、Consumerはこのバインディングキーとルーティングキーが一致したときだけ受信できるようになる。
ちなみに、バインディングキーの意味付けはExchangerタイプに依存するので、前回のようなすべてのConsumerに向けてメッセージを発信するようなfanout Exchangeはバインディングキーが指定できず、無視される。



Direct exchange


前回のロギングシステムでは、fanoutタイプのExchangeを使うことですべてのConsumerにすべてのメッセージを配信していた。今回は重要度(severity)によってメッセージをフィルタリングできるように機能拡張する。

しかし、fanout Exchangeにはそのような柔軟性がないので、代わりにdirect Exchangeを使用する。direct Exchangeのルーティングアルゴリズムは、メッセージのバインディングキーとルーティングキーが一致したキューに送信される。

direct Exchangeについて、下図を使って説明する。
この設定では、direct Exchangeに2つのキューをバインドしている。1つはバインディングキーが「orange」、他方は「black」と「green」でバインドしている。
そのため、ProducerがExchangeにPublishすると、バインディングキーが「orange」だったら、ルーティングキーが「orange」のキューQ1に配信される。バインディングキーが「black」または「green」だったら、ルーティングキーが「black」と「green」のキューQ2に配信される。これ以外のすべてのメッセージは、ルーティングキーにマッチしないので破棄される。



Multiple bindings


同一のバインディングキーで複数のキューにバインドすることもできる。たとえばバインディングキーが「black」のメッセージをpublishしたら、ルーティングキーが「black」になっているすべてのキューに配信される。(fanout Exchangeみたいなイメージ)



Emmiting logs


前述のとおり、今回はfanoutではなくdirect Exchangeにメッセージをpublishする。その際、ログの重要度をバインディングキーに指定することで、受信側のプログラムは受信したい重要度を選択できるようになる。
// Exchange名
const ex = 'direct_logs';

// Exchangeを定義する
ch.assertExchange(ex, 'direct', { durable: false } );

// メッセージをPublishする(severityはログの重要度: info, warning, error がはいる)
ch.publish(ex, severity, Buffer.from(msg));



Subscribing


前回とほぼ同じだが、実行時の引数で受信したいログの重要度ごとにBindingsを作成する。
// argsには['warning', 'error']のようにログの重要度を指定する
args.forEach(severity => {
    ch.bindQueue(q.queue, ex, severity);
});



Putting it all together


チュートリアル4をすべて終えると、最終的に以下のようなコードになる。

Producer(emit_log_direct.js)


const amqp = require('amqplib/callback_api');

amqp.connect('amqp://admin:admin@localhost:5672', (err, conn) => {
    conn.createChannel((err, ch) => {
        // Exchange名
        const ex = 'direct_logs';
        // Message(ex. node emit_log_direct.js info message)
        const args = process.argv.slice(2);
        const msg = args.slice(1).join('') || 'Hello World';
        // Routing key
        const severity = (args.length > 0) ? args[0] : 'info';

        // directタイプのExchangeの定義
        ch.assertExchange(ex, 'direct', { durable: false });
        // direct_logs Exchangeにバインディングキーを指定してパブリッシュする
        // ex: Exchange名、 severity: binding key、 msg: メッセージ
        ch.publish(ex, severity, Buffer.from(msg));
        console.log(` [x] Sent ${severity}: ${msg}`);
    });

    setTimeout(() => {
        conn.close();
        process.exit(0);
    }, 500);
});


Consumer(receive_logs_direct.js)


const amqp = require('amqplib/callback_api');

const args = process.argv.slice(2);

if (args.length === 0) {
    console.log('Usage: receive_logs_direct.js [info] [warning] [error]');
    process.exit(1);
}

amqp.connect('amqp://admin:admin@localhost:5672', (err, conn) => {
    conn.createChannel((err, ch) => {
        // Exchange名
        const ex = 'direct_logs';

        // directタイプのExchangeを定義
        ch.assertExchange(ex, 'direct', { durable: false });

        // 無名キュー(Temporary Queue)を定義
        ch.assertQueue('', { exclusive: true }, (err, q) => {
            console.log(' [x] Waiting for logs. To exit press CTRL+C');

            // 引数で指定されたルーティングキー分のキューをバインドする
            args.forEach(severity => {
                // QueueとExchangeをRoutingkeyで関連付ける
                ch.bindQueue(q.queue, ex, severity);
            });

            ch.consume(q.queue, msg => {
                console.log(` [x] ${msg.fields.routingKey}: '${msg.content.toString()}'`);
            }, { noAck: true });
        });
    });
});


実行する


# 重要度がwarningとerrorだけ受信する
shell1$ node receive_logs_direct.js error warning
 [x] Waiting for logs. To exit press CTRL+C

# 重要度がerrorのメッセージを送信する
shell2$ node emit_logs_direct.js error 'test'
 [x] Sent error: test

shell1
  [x] error: 'test'


# 重要度がwarningのメッセージを送信する
shell2$ node emit_logs_direct.js warning 'test'
 [x] Sent error: test

shell1
 [x] warning: 'test'


# 重要度がinfoのメッセージを送信する
shell2$ node emit_logs_direct.js info 'test'
 [x] Sent info: test

shell1
(受信しない)



ここまででチュートリアル4「Routing」は終了。
次回はチュートリアル5「Topics」をやっていく。


以上

written by @bc_rikko

0 件のコメント :

コメントを投稿