- その1 送信と受信
- その2 ACKとpersistent ←この記事
- その3 エクスチェンジと一時的なキュー
- その4 RPC
皆様どうも、こんにちは!
こまりのバックエンドエンジニア、桑木です。
前回の記事では、MQ(メッセージキューイング)の基本であるキューへの出し入れを試しました。ローカルで試す限りでは問題なく動作したと思います。しかし、現実のシステムでは予期しない理由によりプログラムやネットワークが正常に動作しないことは珍しくありません。ということは、基本であるキューへの出し入れすら正常にできるとは限らないことになります。
今回は、そういった障害への対応や、前回の記事であえて言及しなかったことなどについて説明していきます。
この記事の内容
メッセージの紛失と重複
MQに限った話ではありませんが、別のプログラムやシステムと通信して何らかの処理をする場合、障害が発生するタイミングや障害の内容によってはデータの不整合が起こります。MQでいえばメッセージの紛失や重複(二重処理)です。
RabbitMQではメッセージの送受信に、よくあるack
応答の仕組みを使うこともできるので紛失は防ぐことができます。しかし、再送によって同じメッセージが重複する、ということは原理上避けられないので、冪等性の確保や楽観的ロックなど一般的な分散システムとしての対策をする必要があります。
ack 応答の利用もしくは不使用
RabbitMQはキューからメッセージを取り出してクライアントに送信した後、ack
応答があるまで実際にはメッセージを消去しません。nack
応答やエラー発生時には、メッセージをキューの元の位置に戻します。
オプションでack
応答を待たないようにすることもできます。consume
メソッドのオプションにnoAck: true
を指定した場合、RabbitMQはメッセージをネットワークに送信したら直ちに消去します。
逆にキューにメッセージを入れる時、通常のモード(amqplibのcreateChannel
メソッドで作成したチャネル)ではRabbitMQはack
などの応答を返しません。RabbitMQにack
応答を要求する場合はcreateConfirmChannel
で作成した確認モードのチャネルを使用します。
メッセージの持続性
RabbitMQではキューにメッセージを入れる時、「一時的なメッセージ」にするか「持続的(persistent)なメッセージ」にするか指定できます。一時的なメッセージはRabbitMQが再起動したら消えますが、持続的なメッセージは消えません。そのため持続的なメッセージはキューに入った時にファイルシステムに記録されます。
前述の確認モードチャネルで持続的なメッセージを送信した場合、ファイルにフラッシュ(fsyncシステムコール)してからack
応答が返されます。RabbitMQの負荷状況によっては複数メッセージをまとめてフラッシュするので、ack
応答まで数百ミリ秒掛かる可能性があります。
システムパフォーマンス(確実性と速度のトレードオフ、および並列化)
MQは通信方式の1つである、と前回の記事で述べました。システム開発者は通信を担うミドルウェアに「確実性」と「速度」を期待します。もちろん現実には、どちらかの性能を高めればもう片方の性能は低くなります。
前述のnoAck
や一時的なメッセージを使用するとパフォーマンスは向上しますが、その代わりにメッセージ紛失の可能性は高まります。システムの設計や状況によっては紛失が許容できるメッセージもあるので、メッセージの用途に応じて使い分けると良いでしょう。例として、RPCの通信にRabbitMQを利用しているシステムでRabbitMQサーバが停止するような事態が起こったとします。持続的なメッセージであれば再起動後にも消えずに残っていますが、RPCは既にタイムアウトしているのでメッセージは不要になると考えられます。
また、システム全体のパフォーマンスは通信だけで決まるものではありません。システムの性質にもよりますが、複数のメッセージをまとめて処理するとコンピュータリソースが有効に使える場面は多いです。この場合、適度な並列化はパフォーマンスの向上をもたらします。
RabbitMQはあえて指定しない限りconsume
したクライアント(コンシューマ)に送信可能なメッセージを全て送信します。未応答メッセージの最大数(プリフェッチカウント)を指定することでRabbitMQが送信するメッセージ数を制御して、クライアントが同時に処理するメッセージ数を制限することができます。
RabbitMQ管理画面
インストールしたRabbitMQの設定を変更していなければ、ブラウザからアクセスできる管理画面が15672番ポートに開いていると思います。ローカルなら http://localhost:15672/ でアクセスできます。初期ユーザとして、ユーザ名とパスワードがともにguest
のユーザが存在するので、それでログインします。
管理画面上部の'Queues'タブで現在のキューの状況を確認することができます。キューが存在すればキューの名前が一覧表示されていると思います。
ここからは管理画面でキューの状態を適宜確認しながら、Node.jsでRabbitMQを操作していきます。
キューの作成
前回の記事では、connect
メソッドやassertQueue
メソッドについて言及しませんでしたが、その動作を確認しておきます。
次のコードは、RabbitMQに接続してqueue-test
という名前のキューを(なければ)作成します。
const amqp = require("amqplib");
const QUEUE = "queue-test";
(async function main() {
const connection = await amqp.connect("");
const channel = await connection.createChannel();
// キューがなければ作成される
await channel.assertQueue(QUEUE);
await channel.close();
await connection.close();
})();
connect
メソッドの第1引数には接続情報(プロトコル、ホスト、ユーザ等)を指定します。省略した部分はデフォルト値が使用されるので、完全に省略するとamqp://guest:guest@localhost:5672
が使用されます。
そして、キューを作成するためにはチャネルのassertQueue
メソッドを使用します。assertという名称からは想像しにくいかもしれませんが、このメソッドは指定した通りのキューが存在するか確認するメソッドです。キューの名前を第1引数に指定して、オプションがあれば第2引数に指定します。既に存在するキューとオプションの値が一致すれば何も起こりませんが、異なるとエラーが発生します。
ややこしいことに、指定した名前のキューが存在しなければ新しく作成されます。そして、amqplibではこのメソッド以外にキューを作成する方法はありません。
このコードを実行すると作成したキューが管理画面に表示されると思います。
確認モードでキューにメッセージを入れる
次のコードは前回の記事のコードを変更して、確認モードチャネルを使用するようにしたものです。ack
応答があれば画面にACK
と出力します。
// send-ack.js
const amqp = require("amqplib");
const QUEUE = "queue-test";
const NAME = process.argv[2] || "NoName";
function sleep(ms, random = 0) {
return new Promise(resolve => setTimeout(resolve, ms + Math.random() * random));
}
(async function main() {
const connection = await amqp.connect("");
const channel = await connection.createConfirmChannel(); // 確認モードを使用する
await channel.assertQueue(QUEUE);
let count = 1;
while (count <= 100) {
const message = `${NAME}${count}`;
// 確認モードチャネルのsendToQueueにはコールバック関数を指定する
channel.sendToQueue(QUEUE, Buffer.from(message), {}, function (err) {
// コールバック関数は ack もしくは nack 応答があれば実行される
console.log(new Date().getTime(), err ? "NACK" : "ACK", err);
});
console.log(new Date().getTime(), "Send", message);
await sleep(1500, 1000);
count++;
}
// 全ての応答を待つ
await channel.waitForConfirms();
await channel.close();
await connection.close();
})();
createChannel
メソッドをcreateConfirmChannel
メソッドに変更し、sendToQueue
メソッドの呼び出しにコールバック関数を指定しています。このコールバック関数はack
もしくはnack
応答時に呼び出されます。Node.jsの多くの非同期メソッドと同じように、コールバックの第1引数は、ack
応答であればfalsyな値、nack
応答であればtruthyな値になります。
また、チャネルを閉じる前にwaitForConfirms
メソッドを実行しています。このメソッドは確認モードチャネルで送信した全てのメッセージに応答(ack
やnack
)されるまで待機します。もし未応答のメッセージがある状態でチャネルをclose
すると、sendToQueue
に指定したコールバック関数はエラーオブジェクト(truthyな値)を引数にして呼び出されます。
$ node send-ack.js
1585879776624 Send NoName1
1585879776626 ACK null
1585879778687 Send NoName2
1585879778690 ACK null
1585879781011 Send NoName3
1585879781016 ACK null
...
基本的にack
応答がありますが、RabbitMQの仕様ではネットワークエラーやRabbitMQの内部エラー時にnack
応答になるようです。
持続的(persistent)メッセージをキューに入れる
上のsend-ack.js
のコードを修正して、sendToQueue
メソッドのオプションにpersistent: true
を指定することで「持続的なメッセージ」になります。amqplibでは特に指定しなければ「一時的なメッセージ」になります。
// send-ack-persistent.js
- channel.sendToQueue(QUEUE, Buffer.from(message), {}, function (err) {
+ channel.sendToQueue(QUEUE, Buffer.from(message), { persistent: true }, function (err) {
$ node send-ack-persistent.js
1585879788498 Send NoName1
1585879788512 ACK null
1585879790700 Send NoName2
1585879790709 ACK null
1585879792848 Send NoName3
1585879792860 ACK null
...
一時的メッセージの時はack
応答まで数ミリ秒だったのに対して、持続的メッセージは十数ミリ秒掛かっていることが確認できます。
この状態でRabbitMQ管理画面からqueue-test
キューの詳細を閲覧すると、メッセージ数がTotal 200, Persistent 100であることが確認できます。
試しにRabbitMQを再起動してみると、Total 100, Persistent 100となって一時的なメッセージが消えていることが確認できます。
キューからメッセージを取り出して、一部を nack する
次のコードは前回の記事のコードを変更して、受信したメッセージを半分の確率でnack
するようにしたものです。前回のコードでは全てack
していました。
// receive-nack.js
const amqp = require("amqplib");
const QUEUE = "queue-test";
function sleep(ms, random = 0) {
return new Promise(resolve => setTimeout(resolve, ms + Math.random() * random));
}
(async function main() {
const connection = await amqp.connect("");
const channel = await connection.createChannel();
await channel.prefetch(1); // 未応答の最大メッセージ数を 1 つにする
await channel.assertQueue(QUEUE);
await channel.consume(QUEUE, async function (msg) {
console.log(new Date().getTime(), "Receive", msg.content.toString());
await sleep(1000, 1000);
// 半分の確率で nack する
if (Math.random() > 0.5) {
console.log(new Date().getTime(), "ACK");
channel.ack(msg);
} else {
console.log(new Date().getTime(), "NACK");
channel.nack(msg);
}
});
})();
前回の記事では特に説明していませんでしたが、チャネルのprefetch
メソッドでプリフェッチカウントを1つに指定しています。これを指定しないと、一度に全てのメッセージを受信して、メッセージの個数分のコールバック関数が一斉に呼び出されることになります。
このコードではチャネルに対してプリフェッチカウントを設定しているように見えますが、この設定値はプリフェッチカウントを設定した状態でconsume
したコンシューマに対して適応されます。追加のオプションでチャネルに対するプリフェッチカウントを設定することもできます。
メッセージへの応答は、チャネルのack
メソッドやnack
メソッドを使用します。このメソッドの第1引数には、コールバックで渡されたメッセージオブジェクトを指定して応答する対象のメッセージを特定します。ackAll
やnackAll
といった未応答の全てのメッセージを対象とするメソッドもあります。
$ node receive-nack.js
1585817129472 Receive NoName1
1585817130855 ACK
1585817130861 Receive NoName2
1585817132650 ACK
1585817132654 Receive NoName3
1585817134172 NACK
1585817134176 Receive NoName3
1585817135272 ACK
1585817135275 Receive NoName4
1585817136324 NACK
1585817136328 Receive NoName4
1585817137427 ACK
1585817137429 Receive NoName5
...
nack
したメッセージは、可能な限りキューの同じ位置に戻ります。キューの先頭から取り出したものをキューの先頭に戻しているので、nack
した瞬間に同じメッセージを受信することになります。
未応答の状態で切断してみる
上のコードではウェイトの後にack
かnack
を実行していますが、前回のコードからack
メソッドの行をコメントアウトしてメッセージに応答しないようにします。
// receive-none.js
- channel.ack(msg);
+ // channel.ack(msg);
$ node receive-none.js
1585908967319 Receive NoName1
プリフェッチカウントを1に設定した上で受信したメッセージに応答していないので、これ以上は何も起きなくなります。この状態で管理画面からキューの状況を確認すると、"Total 100, Ready 99, Unacked 1"と送信可能なメッセージが99、未応答のメッセージが1つあると表示されています。
ここでCtrl
+C
を押してプログラムを強制終了すると、"Total 100, Ready 100, Unacked 0"となってメッセージがキューに戻っていることが確認できます。
とりあえず障害対策やパフォーマンスに関係する機能について試しました
ここまでに説明した機能を使えば、実用に耐えるシステムも十分に開発できると思います。しかし、単純に2つのシステム間でメッセージを送受信するだけではなく、配信やルーティングといった効率的に複数のシステム間でメッセージをやりとりする機能がMQには存在します。
次回からは、そういったRabbitMQの便利機能について説明していきたいと思います。