2017/02/24

Node.js+RabbitMQでメッセージの送受信をする2(Work queues)

仕事でRabbitMQまわりの一部システムを担当することになった。フロントエンド畑の人なので、Node.jsをつかってRabbitMQのチュートリアルをテキトーに和訳し、他で得た知識で補足しながら勉強した内容をまとめていこうと思う。

今回はチュートリアル2「Work queues」を学ぶ。



関連記事




今回学ぶこと


  • Work Queues
  • タスクの負荷分散
  • キューとメッセージの永続化



Work Queues


今回は複数のワーカーで時間のかかるタスクを分散するために使われるWork Queuesを作成する。Work Queuesは受信したメッセージをすぐにワーカーに割り当てていく。ワーカーは割り当てられたメッセージをあとで実行するために内部キューにスケジューリングを行い、逐一実行していく。
※ 前回までのQueuesと同じものだが、区別するためにWork Queuesと呼ぶ


Preparation


前回は「Hello World」という文字列を送受信した。今回はもうちょっと複雑に、文字列の中に含まれるドットの数だけ待機する非同期なプログラムをつくる。(実際は画像のリサイズやPDFのレンダーなどをしてかかる時間をsetTimeoutで擬似的に再現するプログラム)
例えば「Hello...」を送信すると3秒、「Hello.....」なら5秒待機してから完了メッセージを表示する。

Producer(new_task.js)

前回つくったsend.jsのコードを一部変更して、コマンドラインから任意のメッセージを送信できるようにする。Work Queueにタスクを登録するので、new_task.jsというファイル名にする。

const amqp = require('amqplib/callback_api');

// RabbitMQサーバに接続
amqp.connect('amqp://admin:admin@localhost:5672', (err, conn) => {
    conn.createChannel((err, ch) => {
        // キューの名前
        const q = 'task_queue';
        // $ node new_task.js {parameter} のパラメータを取得
        const msg = process.argv.slice(2).join('') || 'Hello World';

        // キュー送信
        ch.assertQueue(q, { durable: true });
        ch.sendToQueue(q,  Buffer.from(msg), { presistent: true });
        console.log(` [x] Sent "${msg}"`);

        // コネクションの切断
        setTimeout(() => {
            conn.close();
            process.exit(0);
        }, 500);
    });
});



Consumer(worker.js)

前回つくったreceive.jsのコードを変更して、new_task.jsによって登録されたタスクを取得し、ドットの数だけ待機してから処理を実行できるようにする。
Work Queuesからメッセージを取得してタスクを実行するので、worker.jsというファイル名にする。
const amqp = require('amqplib/callback_api');

// RabbitMQサーバに接続
amqp.connect('amqp://admin:admin@localhost:5672', (err, conn) => {
    conn.createChannel((err, ch) => {
        // キューの名前
        const q = 'task_queue';

        ch.assertQueue(q, {durable: false});
        
        console.log(` [*] Waiting for message in ${q}. To exit press CTRL+C`);
        ch.consume(q, msg => {
            // ドットの数をカウント
            const secs = msg.content.toString().split('.').length - 1;

            console.log(` [x] Received ${msg.content.toString()}`);
            // 遅延(重い処理をしているイメージ)
            setTimeout(() => {
                console.log(' [x] Done');
            }, secs * 1000);
        }, { noAck: true });
    });
});



実行する


ちゃんとドットの数だけ待機して処理してくれるか確認する。
ターミナルを2つ立ち上げて、以下を実行する。
# shell1でworkerを起動。メッセージを待機する
shell1$ node worker.js
 [*] Waiting for message in task_queue. To exit press CTRL+C
 [x] Received Hello World

# shell2でメッセージを送信
shell2$ node new_task.js
 [x] Sent "Hello World"

# shell1 すぐ表示される
 [x] Done


# shell2で新しいメッセージを送信
shell2$ node new_task.js Hello...
 [x] Sent "Hello..."

# shell1 3秒後くらい
 [x] Done



Round-robin dispatching


Work Queuesを使う利点のひとつに、処理を簡単に並列化(分散)できることがあげられる。RabbitMQはラウンドロビン方式(巡回的並列処理)で自動的にワーカーの負荷を分散してくれる。

worker.jsのプロセスを2つ同時に実行した状態で、new_task.jsでタスクを登録して、分散されていることを確認する。

まずはworker.jsを2プロセス同時に起動する。
# woker.jsを2プロセス起動する
shell1$ node worker.js
 [*] Waiting for message in task_queue. To exit press CTRL+C

shell2$ node worker.js
 [*] Waiting for message in task_queue. To exit press CTRL+C

次に、new_task.jsを実行し複数のタスクを一気に登録する。
# new_task.jsでキューを登録する
$ node new_task.js one.
 [x] Sent "one."
$ node new_task.js two..
 [x] Sent "two.."
$ node new_task.js three...
 [x] Sent "three..."
$ node new_task.js four....
 [x] Sent "four...."
$ node new_task.js five.....
 [x] Sent "five....."
$ node new_task.js six......
 [x] Sent "six......"

するとRabbitMQが自動的にshell1とshell2へ順番に分配してくれる。
# shell1
 [x] Received one.
 [x] Done
 [x] Received three...
 [x] Done
 [x] Received five.....
 [x] Done

# shell2
 [x] Received two..
 [x] Done
 [x] Received four....
 [x] Done
 [x] Received six......
 [x] Done



Message acknowledgement


いまのままの実装では、RabbitMQがConsumerにメッセージを送信した時点で、そのメッセージをメモリから削除してしまう。
もしConsumerが予期せぬエラーで落ちた場合、実行中のメッセージが失われてしまう。それだけでなく、RabbitMQがConsumerに送信したけどワーカーの内部キューに格納されている未実行タスクもすべて失われる。

このような状況を解消するために、RabbitMQにはConsumerの完了応答を待ってからメッセージを削除する「Message acknowledgement」という機能がある。この機能を使うと、Consumerが完了応答するまでRabbitMQからタスクが削除されることはない。(厳密には違うけど)

ConsumerからRabbitMQへ送る完了報告をackという。先ほどまでのコードには{ noAck: true }となっているので、タスクが割り振られた瞬間に削除するような設定になっている。{ noAck: false }に変更することで、Consumerの処理が終わり、完了応答(ch.ack(msg))をするまでタスクがRabbitMQ上に残り続ける。

完了報告(ch.ack)を忘れた場合、RabbitMQのWork Queuesに残り続けるため使用メモリが肥大化してしまうので注意が必要。

ch.consume(q, msg => {
    const secs = msg.content.toString().split('.').length - 1;

    console.log(` [x] Received ${msg.content.toString()}`);
    setTimeout(() => {
        console.log(' [x] Done');
        // 完了応答を送る
        ch.ack(msg);
    }, secs * 1000);
}, { noAck: false });



Message durability


Message acknowledgementでは、Consumerが落ちてもWork Queuesからメッセージを失われないようにする方法を学んだ。
しかし、このままではRabbitMQ本体が落ちた場合にキューやメッセージが失われてしまう。

その対策として「durable」「persistent」オプションがある。
durableオプションは、ProducerまたはConsumerがキューと定義するとき(assertQueue)に { durable: true }というオプションをつけることで、定義したキューが消えないようにできる。
persistentオプションは、Producerがメッセージを送信するとき(sendToQueue)に { persistent: true }というオプションをつけることで、送信したメッセージをメモリが消えないようにできる。

ただいまの実装のままではメッセージの永続化(durable、persistent)が機能しない。
なぜならhelloキューはすでに「永続化しない(durable: false)」と定義されてしまっているからだ。そのまま実行しようとすると「Error: Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello' in vhost '/': received 'false' but current is 'true'"」というエラーが発生する。

そのため「task_queue」という永続化された新たなキューを定義する。

conn.createChannel((err, ch) => {
  const q = 'task_queue';
  // いろいろ...
 
  // declare durable
  ch.assertQueue(q, { durable: true });
  // mark message as presistent
  ch.sendToQueue(q,  Buffer.from(msg), { presistent: true });

  // いろいろ...
});
conn.createChannel((err, ch) => {
   const q = 'task_queue';
    // いろいろ...

    // declare durable
    ch.assertQueue(q, {durable: true});

    // いろいろ...
});

※ 注意
persistentでメッセージの永続化を指定しても、RabbitMQがメモリからディスクに書き込むまでにラグがあるので、その間に落ちた場合はメッセージを失ってしまう。


Fair dispatch


RabbitMQはラウンドロビン方式でタスクを分配するので、例えばワーカーが2つあった場合、奇数番目のメッセージは重く、偶数番目のメッセージが軽い場合は、一方のワーカーに処理が偏ってしまう。
このようなことが起こる原因は、RabbitMQがConsumerの状態を確認せずに、ただ単純に順番に分配しているためである。そのため、片方のワーカーに処理を偏らせないために、ch.prefetch(上限数)で1つのワーカーに1度に送れるメッセージの上限数を指定することで負荷の偏りを回避できる。


Putting it all together


チュートリアル2をすべて終えると、最終的に以下のようなコードになる。

new_task.js


const amqp = require('amqplib/callback_api');

// RabbitMQサーバに接続
amqp.connect('amqp://admin:admin@localhost:5672', (err, conn) => {
    conn.createChannel((err, ch) => {
        // キューの名前
        const q = 'task_queue';
        const msg = process.argv.slice(2).join('') || 'Hello World';

        // RabbitMQが落ちてもメッセージを喪失しないようにdurableとpresistentを指定
        ch.assertQueue(q, { durable: true });
        ch.sendToQueue(q,  Buffer.from(msg), { presistent: true });

        console.log(` [x] Sent "${msg}"`);

        // コネクション切断
        setTimeout(() => {
            conn.close();
            process.exit(0);
        }, 500);
    });
});


worker.js


const amqp = require('amqplib/callback_api');

// RabbitMQサーバに接続
amqp.connect('amqp://admin:admin@localhost:5672', (err, conn) => {
    conn.createChannel((err, ch) => {
        // キューの名前
        const q = 'task_queue';

        // RabbitMQが落ちてもメッセージを喪失しないようにdurableを指定
        ch.assertQueue(q, {durable: true});
        // 受け取るメッセージ数の上限を指定
        ch.prefetch(1);
        
        console.log(` [*] Waiting for message in ${q}. To exit press CTRL+C`);

        // メッセージの受信
        ch.consume(q, msg => {
            const secs = msg.content.toString().split('.').length - 1;

            console.log(` [x] Received ${msg.content.toString()}`);
            setTimeout(() => {
                console.log(' [x] Done');
                // 処理が終わったことをRabbitMQに伝える
                ch.ack(msg);
            }, secs * 1000);
        }, { noAck: false });
    });
});


ここまででチュートリアル2「Work queues」は終了。
次回はチュートリアル3「Publish/Subscribe」をやっていく。


以上

written by @bc_rikko

0 件のコメント :

コメントを投稿