2017/03/06

Node.js+RabbitMQでメッセージの送受信をする5(Topics)

仕事でRabbitMQまわりの一部システムを担当することになった。フロントエンド畑の人なので、Node.jsをつかってRabbitMQのチュートリアルをテキトーに和訳し、他で得た知識で補足しながら勉強した内容をまとめていこうと思う。

今回はチュートリアル5「Topic」を学ぶ。



関連記事




今回学ぶこと


  • 高度なメッセージの振り分け
  • Topic Exchange


Topics


前回のチュートリアルでは、ロギングシステムにdirect Exchangeを使った重要度によるフィルタリングを追加した。しかし、direct Exchangeには制限があり、バインディングキーとルーティングキーの完全一致でしかルーティングできない。たとえば、複数条件や正規表現的な複雑なルーティングができないのだ。

今回は、複雑なルーティングができるように改良する。
具体的には、ログの重要度だけでなく、送信されるログの中身(ルーティングキーの内容)をみてフィルタリングを行う、syslogのような考え方だ。そのため、direct Exchangeの代わりにtopic Exchangeを使う。



Topic exchange


topic Exchangeはdirect Exchangeに似ているが、任意のルーティングキーではなく、ドット区切りの単語リストを使ってより高度なルーティングができる。たとえば「stock.usd.nyse」や「nyse.vmw」、「quick.orange.rabbit」などだ。このルーティングキーには最大255バイトまで指定できる。もちろんバインディングキーにも同様な形式が必要だ。

topic Exchangeでは、バインディングキーに2つの特殊文字が指定できる。

  • *(アスタリスク): ドットで区切られた1単語に一致する
  • #(ハッシュ): ゼロ、もしくは複数の単語に一致する(正規表現の?みたいな感じ)


図をつかって詳しく説明する。
上図では、動物をあらわすメッセージを送信している。メッセージには3単語からなるルーティングキーが使われており、{スピード}.{色}.{種類} のように指定する。たとえば「quick.orange.fox」や「lazy.pink.rabbit」など。


  • key: *.orange.*  →  Q1に配信される
    • すべてのオレンジ色の動物
    • quick.orange.foxやlazy.orange.catなど
  • key: *.*.rabbit  →  Q2に配信される
    • すべてのウサギ
    • quick.pink.rabbitやlazy.brown.rabbitなど
  • key: lazy.#  →  Q2に配信される
    • すべてののろまな動物
    • lazy.orange.foxやlazy.brown.rabbit.maleなど
    • #の場合は、4単語あってもlazyな動物だったら配信される


ルーティングキーが一致しないメッセージは、direct Exchange同様に破棄される。


topic exchangeについて補足

topic Exchangeの機能は充実しているので、他のfanoutやdirectタイプのExchangeと同じように扱える。キューとバインディングキーが「#」でバインドされていれば、fanout Exchangeのようにすべてのメッセージを受信する。特殊文字の「*」や「#」を使わなければ、direct Exchangeのように完全一致のメッセージを受信する。

万能なので、とりあえずtopic Exchange選んどけば大丈夫的なところはある。(私感)



Putting it all together


チュートリアル5をすべて終えると、最終的に以下のようなコードになる。

Producer(emit_log_topic.js)


// emit_log_topic.js
const amqp = require('amqplib/callback_api');

amqp.connect('amqp://admin:admin@localhost:5672', (err, conn) => {
    conn.createChannel((err, ch) => {
        // Exchange名
        const ex = 'topic_logs';
        // 引数からRoutingKeyを定義
        const args = process.argv.slice(2);
        const key = (args.length > 0) ? args[0] : 'anonymous.info';
        const msg = args.slice(1).join('') || 'Hello World';

        // topicタイプのExchangeを定義
        ch.assertExchange(ex, 'topic', { durable: false });
        // topic_logs Exchangeにルーティングキーを指定してパブリッシュする
        ch.publish(ex, key, Buffer.from(msg));
        console.log(` [x] Sent ${key}: '${msg}`);
    });

    setTimeout(() => {
        conn.close();
        process.exit(0);
    }, 500);
});


Consumer(receive_logs_topic.js)


// receive_log_topic.js
const amqp = require('amqplib/callback_api');

const args = process.argv.slice(2);

if (args.length === 0) {
    console.log('Usage: receive_logs_topic.js <facility>.<severity>');
    process.exit(1);
}

amqp.connect('amqp://admin:admin@localhost:5672', (err, conn) => {
    conn.createChannel((err, ch) => {
        // Exchange名
        const ex = 'topic_logs';

        // topicタイプのExchangeを定義
        ch.assertExchange(ex, 'topic', { durable: false });

        ch.assertQueue('', { exclusive: true }, (err, q) => {
            console.log(' [*] Waiting for logs. To exit press CTRL+C');

            // 引数で指定されたルーティングキー分のキューをバインドする
            args.forEach(key => {
                // QueueとExchangeをRoutingkeyで関連付ける
                ch.bindQueue(q.queue, ex, key);
            });

            ch.consume(q.queue, msg => {
                console.log(` [x] ${msg.fields.routingKey}: '${msg.content.toString()}`);
            }, { noAck: true });
        });
    });
});



実行する


# すべてのログを受信する
shell1$ node receive_logs_topic.js "#"
 [*] Waiting for logs. To exit press CTRL+C

# shell2$ node emit_log_topic.js "kern.critical" "A critical kernel error"
 [x] Sent kern.critical: 'A critical kernel error

shell1: 
 [x] kern.critical: 'A critical kernel error


# "kern.*" のログを受信する
shell1$ node receive_logs_topic.js "kern.*"
 [*] Waiting for logs. To exit press CTRL+C

# shell2$ node emit_log_topic.js "kern.critical" "A critical kernel error"
 [x] Sent kern.critical: 'A critical kernel error

shell1: 
 [x] kern.critical: 'A critical kernel error


# "*.critical"のログを受信する
shell1$ node receive_logs_topic.js "*.critical"
 [*] Waiting for logs. To exit press CTRL+C

# shell2$ node emit_log_topic.js "kern.critical" "A critical kernel error"
 [x] Sent kern.critical: 'A critical kernel error

shell1: 
 [x] kern.critical: 'A critical kernel error


# "kern.*", "*.critical"のログを受信する
shell1$ node receive_logs_topic.js "kern.*" "*.critical"
 [*] Waiting for logs. To exit press CTRL+C

# shell2$ node emit_log_topic.js "kern.critical" "A critical kernel error"
 [x] Sent kern.critical: 'A critical kernel error

shell1: 
 [x] kern.critical: 'A critical kernel error



# "kern..critical"のログを受信する
shell1$ node receive_logs_topic.js "kern..critical"
 [*] Waiting for logs. To exit press CTRL+C

# shell2$ node emit_log_topic.js "kern.critical" "A critical kernel error"
 [x] Sent kern.critical: 'A critical kernel error

shell1: 
 [x] kern.critical: 'A critical kernel error

ここまででチュートリアル5「Topic」は終了。
次回はチュートリアル6「RPC(Remote produre call)」をやっていく。



以上

written by @bc_rikko

0 件のコメント :

コメントを投稿