今回はチュートリアル6「RPC」を学ぶ。
関連記事
- CentOS7.3にRabbitMQをインストールしてGUIで管理できるようにする
- チュートリアル1「Hello World」
- チュートリアル2「Work queues」
- チュートリアル3「Publish/Subscribe」
- チュートリアル4「Routing」
- チュートリアル5「Topics」
- チュートリアル6「RPC」 ←いまここ
今回学ぶこと
- Remote procedure call(RPC)
- Callback queue
- 非同期のCallbackを実行する
Remote procedure call(RPC)
チュートリアル2では、複数のワーカーに長時間かかるタスクを配信するためワークキューの使い方について学んだ。
もしリモートコンピュータ上の関数を実行してから、その結果を待つ必要があるとした場合、ワークキューとは違った対応が必要になる。それを実現するために「Remote Procedure Call(RPC)」というパターンがある。
今回は、RPCシステムを構築する。ワークキューのチュートリアル同様に長時間の処理をさせる必要があるので、フィボナッチ数を返すRPCサービスを実装する。
Callback queue
RabbitMQを使うとRPCシステムを簡単に構築できる。
クライアントはリクエストメッセージを送信し、サーバはレスポンスメッセージを返す。レスポンスを受け取るために、リクエストとともにcallbackキューについての情報を送信する必要がある。サーバにcallbackキューにcallbackのメッセージを登録してもらうためだ。
(今回は、クライアントもサーバもメッセージの送受信をするのでProducer/ConsumerではなくServer/Clientと呼ぶ)
今回はExchangeなどは使わず、チュートリアル2で実装したデフォルトキューを使う。
// 無名のTemporary Queueを定義
ch.assertQueue('', { exclusive: true });
// rpc_queueキューにメッセージを送信。返信するキュー名を指定
ch.sendToQueue('rpc_queue', Buffer.from('10'), { replyTo: queue_name });
// callback_queueからのレスポンスを受けるコードがつづく
Message properties
AMQPプロトコルには、メッセージを送信するために14個のプロパティが用意されている。しかし、ほとんどの場合は以下の4つのプロパティだけで事足りる。
- persistent
- チュートリアル2で学んだメッセージを永続化するプロパティ
- { persistent: true }の場合、メッセージを永続化(ディスクに保存)する
- { persistent: false }(デフォルト)の場合、メモリ上にしか保存されないのでRabbitMQが落ちるとメッセージも消えてしまう
- contentType
- mime-typeを指定する
- たとえばJSONエンコーディングが必要なら { contentType: 'application/json' }と指定する
- replyTo
- callbackキューの名前を指定する
- { replyTo: 'cb_queue' }で、サーバ側の処理が終わったらcb_queueにメッセージを送る
- correlationId
- リクエストとRPCレスポンスの紐付けを行うためのユニークなID
- 詳しくは次の節で説明する
Correlation Id
さきほどのコードではRPCリクエストのたびにcallbackキューが作られてしまうので非効率だ。その代わりに、クライアントごとに1つのcallbackキューをつくる方法がオススメだ。
しかし、callbackキューを使うと新たな問題が発生する。レスポンスを受信するときに、どのリクエストに対するレスポンスなのかわからない。そこでcorrelationIdプロパティを使用する。
すべてのリクエストにユニークな値を設定することで、callbackキューでメッセージを受信したときにリクエストとレスポンスを関連付けできる。不明なcorrelationIdが指定された場合は、紐づくリクエストがないのでメッセージは安全に破棄される。(correlationIdを偽装して悪意のあるcallbackを実行させずに破棄できる)
エラーで落とすより、なぜcallbackキューで不明なメッセージを破棄(無視)すべきなのか。それはサーバサイドで競合状態が発生する可能性があるためだ。RPCサーバがレスポンス送信直後ではなく、リクエストに対する確認メッセージ(レスポンス)を送信する前に落ちた場合、再起動されたRPCサーバはリクエストを再度処理しようとする。そうなるとクライアント側で重複したレスポンスを適切に処理する必要があり、PRCは(理想的には)べき等であるためだ。
Summary
RPCシステムは以下のように動作する。
- クライアントが起動と同時に、無名で排他的なcallbackキューを作成する
- ch.assertQueue('', { exclusive: true }, callback)
- RPCサーバにリクエストを送るときに、クライアントは以下の2つのプロパティを指定してメッセージを送信する
- replyTo: callbackキューの名前を指定
- correlationId: リクエストごとにユニークな値を設定
- ch.sendToQueue('rpc_queue', Buffer.from('message'), { correlationId: 'xxx', replyTo: 'callback_queue' })
- RPCワーカー(RPCサーバ)は、rpc_queueにリクエストメッセージが登録されるまで待機する。リクエストがくるとジョブを実行し、replyToに指定されているキューを使用して、クライアントにレスポンスメッセージを返信する
- ch.sendToQueue(msg.properties.replyTo, Buffer.from('result'), { correlationId: msg.properties.correlationId })
- クライアントはcallbackキューにレスポンスが登録されるまで待機する。メッセージが到着すると、correlationIdプロパティをチェックし、リクエストの値と一致する場合に、レスポンスを処理する
- ch.consume(q.queue, msg => { if (msg.properties.correlationId === corr) { console.log('done'); } };
Putting it all together
チュートリアル6を最後までやると、最終的に以下のようになる。
フィボナッチ数を返す関数
正の整数のみを受け取ることを想定。
※ 大きな値にすると動作しない or かなり時間がかかるので注意
※ 1core/1GBのインスタンスで「50」を渡したらレスポンスが返ってこなくなった(30くらいでテストするのが妥当)
function fibonacci (n) {
if (n === 0 || n === 1) {
return n;
} else {
return fibonacci(n - 1) + fibonacci(n - 2);
}
}
RPCサーバ(rpc_server.js)
// rpc_server.js
const amqp = require('amqplib/callback_api');
amqp.connect('amqp://admin:admin@localhost:5672', (err, conn) => {
conn.createChannel((err, ch) => {
const q = 'rpc_queue';
// rpc_queue Queueを定義
ch.assertQueue(q, { durable: false });
// メッセージを受け取る上限は1つまで
ch.prefetch(1);
console.log(' [x] Awaiting RPC requests');
ch.consume(q, msg => {
const n = parseInt(msg.content.toString());
console.log(` [.] fib(${n})`);
// フィボナッチ数列の計算
const r = fibonacci(n);
// メッセージを送る
ch.sendToQueue(msg.properties.replyTo,
Buffer.from(r.toString()),
{ correlationId: msg.properties.correlationId });
// ackを送信
ch.ack(msg);
});
});
});
function fibonacci (n) {
if (n === 0 || n === 1) {
return n;
} else {
return fibonacci(n - 1) + fibonacci(n - 2);
}
}
クライアント(rpc_client.js)
// rpc_client.js
const amqp = require('amqplib/callback_api');
const args = process.argv.slice(2);
if (args.length === 0) {
console.log('Usage: rpc_client.js {num}');
process.exit(1);
}
amqp.connect('amqp://admin:admin@localhost:5672', (err, conn) => {
conn.createChannel((err, ch) => {
// 無名のTemporary Queueを定義
ch.assertQueue('', { exclusive: true }, (err, q) => {
const corr = generateUuid();
const num = parseInt(args[0]);
ch.consume(q.queue, msg => {
if (msg.properties.correlationId === corr) {
console.log(` [.] Got ${msg.content.toString()}`);
setTimeout(() => {
conn.close();
process.exit(0);
}, 500);
}
}, { noAck: true });
ch.sendToQueue('rpc_queue', Buffer.from(num.toString()), {
correlationId: corr,
replyTo: q.queue
});
});
});
});
function generateUuid() {
return Math.random().toString() + Math.random().toString() + Math.random().toString();
}
実行する
# RPCサーバを起動
shell1$ node rpc_server.js
[x] Awaiting RPC requests
# クライアントを起動し、処理を実行
shell2$ node rpc_client.js 30
# すぐ表示される
shell1
[.] fib(30)
# RPCサーバのレスポンスが来たら表示
shell2
[.] Got 832040
今回のコードはシンプルだが、いろいろ問題を抱えている。
- 実行中のRPCサーバがない場合、クライアントはどう振る舞うべきか?
- クライアントはRPCサーバのレスポンスが遅い場合、タイムアウトにするべきか?
- RPCサーバで例外が発生した場合、クライアントにどのように伝えるべきか?
- 処理前に不正なメッセージを受け取ることを防ぐ必要がある
などなど。
以上でRabbitMQのチュートリアルは終了。
お疲れさまでした。
以上
written by @bc_rikko
0 件のコメント :
コメントを投稿