Node.jsで習得するRabbitMQによるメッセージキューイング その2 (ACKとpersistent)

スタッフブログ

皆様どうも、こんにちは!
こまりのバックエンドエンジニア、桑木です。

前回の記事では、MQ(メッセージキューイング)の基本であるキューへの出し入れを試しました。ローカルで試す限りでは問題なく動作したと思います。しかし、現実のシステムでは予期しない理由によりプログラムやネットワークが正常に動作しないことは珍しくありません。ということは、基本であるキューへの出し入れすら正常にできるとは限らないことになります。

今回は、そういった障害への対応や、前回の記事であえて言及しなかったことなどについて説明していきます。

メッセージの紛失と重複

MQに限った話ではありませんが、別のプログラムやシステムと通信して何らかの処理をする場合、障害が発生するタイミングや障害の内容によってはデータの不整合が起こります。MQでいえばメッセージの紛失や重複(二重処理)です。

RabbitMQではメッセージの送受信に、よくあるack応答の仕組みを使うこともできるので紛失は防ぐことができます。しかし、再送によって同じメッセージが重複する、ということは原理上避けられないので、冪等性の確保や楽観的ロックなど一般的な分散システムとしての対策をする必要があります。

ack 応答の利用もしくは不使用

RabbitMQはキューからメッセージを取り出してクライアントに送信した後、ack応答があるまで実際にはメッセージを消去しません。nack応答やエラー発生時には、メッセージをキューの元の位置に戻します。

オプションでack応答を待たないようにすることもできます。consumeメソッドのオプションにnoAck: trueを指定した場合、RabbitMQはメッセージをネットワークに送信したら直ちに消去します。

逆にキューにメッセージを入れる時、通常のモード(amqplibのcreateChannelメソッドで作成したチャネル)ではRabbitMQはackなどの応答を返しません。RabbitMQにack応答を要求する場合はcreateConfirmChannelで作成した確認モードのチャネルを使用します。

メッセージの持続性

RabbitMQではキューにメッセージを入れる時、「一時的なメッセージ」にするか「持続的(persistent)なメッセージ」にするか指定できます。一時的なメッセージはRabbitMQが再起動したら消えますが、持続的なメッセージは消えません。そのため持続的なメッセージはキューに入った時にファイルシステムに記録されます。

前述の確認モードチャネルで持続的なメッセージを送信した場合、ファイルにフラッシュ(fsyncシステムコール)してからack応答が返されます。RabbitMQの負荷状況によっては複数メッセージをまとめてフラッシュするので、ack応答まで数百ミリ秒掛かる可能性があります。

システムパフォーマンス(確実性と速度のトレードオフ、および並列化)

MQは通信方式の1つである、と前回の記事で述べました。システム開発者は通信を担うミドルウェアに「確実性」と「速度」を期待します。もちろん現実には、どちらかの性能を高めればもう片方の性能は低くなります。

前述のnoAckや一時的なメッセージを使用するとパフォーマンスは向上しますが、その代わりにメッセージ紛失の可能性は高まります。システムの設計や状況によっては紛失が許容できるメッセージもあるので、メッセージの用途に応じて使い分けると良いでしょう。例として、RPCの通信にRabbitMQを利用しているシステムでRabbitMQサーバが停止するような事態が起こったとします。持続的なメッセージであれば再起動後にも消えずに残っていますが、RPCは既にタイムアウトしているのでメッセージは不要になると考えられます。

また、システム全体のパフォーマンスは通信だけで決まるものではありません。システムの性質にもよりますが、複数のメッセージをまとめて処理するとコンピュータリソースが有効に使える場面は多いです。この場合、適度な並列化はパフォーマンスの向上をもたらします。

RabbitMQはあえて指定しない限りconsumeしたクライアント(コンシューマ)に送信可能なメッセージを全て送信します。未応答メッセージの最大数(プリフェッチカウント)を指定することでRabbitMQが送信するメッセージ数を制御して、クライアントが同時に処理するメッセージ数を制限することができます。

RabbitMQ管理画面

インストールしたRabbitMQの設定を変更していなければ、ブラウザからアクセスできる管理画面が15672番ポートに開いていると思います。ローカルなら http://localhost:15672/ でアクセスできます。初期ユーザとして、ユーザ名とパスワードがともにguestのユーザが存在するので、それでログインします。

管理画面上部の'Queues'タブで現在のキューの状況を確認することができます。キューが存在すればキューの名前が一覧表示されていると思います。

All Queues(0) ... no queues ...

ここからは管理画面でキューの状態を適宜確認しながら、Node.jsでRabbitMQを操作していきます。

キューの作成

前回の記事では、connectメソッドやassertQueueメソッドについて言及しませんでしたが、その動作を確認しておきます。

次のコードは、RabbitMQに接続してqueue-testという名前のキューを(なければ)作成します。

const amqp = require("amqplib");
const QUEUE = "queue-test";

(async function main() {
    const connection = await amqp.connect("");
    const channel = await connection.createChannel();

    // キューがなければ作成される
    await channel.assertQueue(QUEUE);

    await channel.close();
    await connection.close();
})();

connectメソッドの第1引数には接続情報(プロトコル、ホスト、ユーザ等)を指定します。省略した部分はデフォルト値が使用されるので、完全に省略するとamqp://guest:guest@localhost:5672が使用されます。

そして、キューを作成するためにはチャネルのassertQueueメソッドを使用します。assertという名称からは想像しにくいかもしれませんが、このメソッドは指定した通りのキューが存在するか確認するメソッドです。キューの名前を第1引数に指定して、オプションがあれば第2引数に指定します。既に存在するキューとオプションの値が一致すれば何も起こりませんが、異なるとエラーが発生します。

ややこしいことに、指定した名前のキューが存在しなければ新しく作成されます。そして、amqplibではこのメソッド以外にキューを作成する方法はありません。

このコードを実行すると作成したキューが管理画面に表示されると思います。

All queues(1) - queue-test

確認モードでキューにメッセージを入れる

次のコードは前回の記事のコードを変更して、確認モードチャネルを使用するようにしたものです。ack応答があれば画面にACKと出力します。

// send-ack.js
const amqp = require("amqplib");
const QUEUE = "queue-test";
const NAME = process.argv[2] || "NoName";

function sleep(ms, random = 0) {
    return new Promise(resolve => setTimeout(resolve, ms + Math.random() * random));
}

(async function main() {
    const connection = await amqp.connect("");
    const channel = await connection.createConfirmChannel(); // 確認モードを使用する
    await channel.assertQueue(QUEUE);

    let count = 1;
    while (count <= 100) {
        const message = `${NAME}${count}`;

        // 確認モードチャネルのsendToQueueにはコールバック関数を指定する
        channel.sendToQueue(QUEUE, Buffer.from(message), {}, function (err) {
            // コールバック関数は ack もしくは nack 応答があれば実行される
            console.log(new Date().getTime(), err ? "NACK" : "ACK", err);
        });

        console.log(new Date().getTime(), "Send", message);
        await sleep(1500, 1000);
        count++;
    }
    // 全ての応答を待つ
    await channel.waitForConfirms();

    await channel.close();
    await connection.close();
})();

createChannelメソッドをcreateConfirmChannelメソッドに変更し、sendToQueueメソッドの呼び出しにコールバック関数を指定しています。このコールバック関数はackもしくはnack応答時に呼び出されます。Node.jsの多くの非同期メソッドと同じように、コールバックの第1引数は、ack応答であればfalsyな値、nack応答であればtruthyな値になります。

また、チャネルを閉じる前にwaitForConfirmsメソッドを実行しています。このメソッドは確認モードチャネルで送信した全てのメッセージに応答(acknack)されるまで待機します。もし未応答のメッセージがある状態でチャネルをcloseすると、sendToQueueに指定したコールバック関数はエラーオブジェクト(truthyな値)を引数にして呼び出されます。

$ node send-ack.js
1585879776624 Send NoName1
1585879776626 ACK null
1585879778687 Send NoName2
1585879778690 ACK null
1585879781011 Send NoName3
1585879781016 ACK null
...

基本的にack応答がありますが、RabbitMQの仕様ではネットワークエラーやRabbitMQの内部エラー時にnack応答になるようです。

持続的(persistent)メッセージをキューに入れる

上のsend-ack.jsのコードを修正して、sendToQueueメソッドのオプションにpersistent: trueを指定することで「持続的なメッセージ」になります。amqplibでは特に指定しなければ「一時的なメッセージ」になります。

// send-ack-persistent.js
-       channel.sendToQueue(QUEUE, Buffer.from(message), {}, function (err) {
+       channel.sendToQueue(QUEUE, Buffer.from(message), { persistent: true }, function (err) {
$ node send-ack-persistent.js
1585879788498 Send NoName1
1585879788512 ACK null
1585879790700 Send NoName2
1585879790709 ACK null
1585879792848 Send NoName3
1585879792860 ACK null
...

一時的メッセージの時はack応答まで数ミリ秒だったのに対して、持続的メッセージは十数ミリ秒掛かっていることが確認できます。

この状態でRabbitMQ管理画面からqueue-testキューの詳細を閲覧すると、メッセージ数がTotal 200, Persistent 100であることが確認できます。

file

試しにRabbitMQを再起動してみると、Total 100, Persistent 100となって一時的なメッセージが消えていることが確認できます。

file

キューからメッセージを取り出して、一部を nack する

次のコードは前回の記事のコードを変更して、受信したメッセージを半分の確率でnackするようにしたものです。前回のコードでは全てackしていました。

// receive-nack.js
const amqp = require("amqplib");
const QUEUE = "queue-test";

function sleep(ms, random = 0) {
    return new Promise(resolve => setTimeout(resolve, ms + Math.random() * random));
}

(async function main() {
    const connection = await amqp.connect("");
    const channel = await connection.createChannel();
    await channel.prefetch(1); // 未応答の最大メッセージ数を 1 つにする
    await channel.assertQueue(QUEUE);

    await channel.consume(QUEUE, async function (msg) {
        console.log(new Date().getTime(), "Receive", msg.content.toString());
        await sleep(1000, 1000);
        // 半分の確率で nack する
        if (Math.random() > 0.5) {
            console.log(new Date().getTime(), "ACK");
            channel.ack(msg);
        } else {
            console.log(new Date().getTime(), "NACK");
            channel.nack(msg);
        }
    });
})();

前回の記事では特に説明していませんでしたが、チャネルのprefetchメソッドでプリフェッチカウントを1つに指定しています。これを指定しないと、一度に全てのメッセージを受信して、メッセージの個数分のコールバック関数が一斉に呼び出されることになります。

このコードではチャネルに対してプリフェッチカウントを設定しているように見えますが、この設定値はプリフェッチカウントを設定した状態でconsumeしたコンシューマに対して適応されます。追加のオプションでチャネルに対するプリフェッチカウントを設定することもできます。

メッセージへの応答は、チャネルのackメソッドやnackメソッドを使用します。このメソッドの第1引数には、コールバックで渡されたメッセージオブジェクトを指定して応答する対象のメッセージを特定します。ackAllnackAllといった未応答の全てのメッセージを対象とするメソッドもあります。

$ node receive-nack.js
1585817129472 Receive NoName1
1585817130855 ACK
1585817130861 Receive NoName2
1585817132650 ACK
1585817132654 Receive NoName3
1585817134172 NACK
1585817134176 Receive NoName3
1585817135272 ACK
1585817135275 Receive NoName4
1585817136324 NACK
1585817136328 Receive NoName4
1585817137427 ACK
1585817137429 Receive NoName5
...

nackしたメッセージは、可能な限りキューの同じ位置に戻ります。キューの先頭から取り出したものをキューの先頭に戻しているので、nackした瞬間に同じメッセージを受信することになります。

未応答の状態で切断してみる

上のコードではウェイトの後にacknackを実行していますが、前回のコードからackメソッドの行をコメントアウトしてメッセージに応答しないようにします。

// receive-none.js
-       channel.ack(msg);
+       // channel.ack(msg);
$ node receive-none.js
1585908967319 Receive NoName1

プリフェッチカウントを1に設定した上で受信したメッセージに応答していないので、これ以上は何も起きなくなります。この状態で管理画面からキューの状況を確認すると、"Total 100, Ready 99, Unacked 1"と送信可能なメッセージが99、未応答のメッセージが1つあると表示されています。

file

ここでCtrl+Cを押してプログラムを強制終了すると、"Total 100, Ready 100, Unacked 0"となってメッセージがキューに戻っていることが確認できます。

file

とりあえず障害対策やパフォーマンスに関係する機能について試しました

ここまでに説明した機能を使えば、実用に耐えるシステムも十分に開発できると思います。しかし、単純に2つのシステム間でメッセージを送受信するだけではなく、配信やルーティングといった効率的に複数のシステム間でメッセージをやりとりする機能がMQには存在します。

次回からは、そういったRabbitMQの便利機能について説明していきたいと思います。