Node.jsで習得するRabbitMQによるメッセージキューイング その3 (エクスチェンジと一時的なキュー)

スタッフブログ

皆様どうも、こんにちは!
こまりの自称バックエンドエンジニア、桑木です。

前回までの記事では、メッセージをキューに入れたり出したりする処理について、いくつかのパターンを試しました。これによってシステムで利用するための最低限の知識は得られたと思います。

今回からは、より効率的にメッセージをやりとりするための機能について説明していきます。

プロセス(コンシューマ)専用のキュー

ある1つのキューをconsumeしているプロセス(コンシューマ)が複数ある場合、そのキューに入ったある特定のメッセージを受信できるのはどれか1つのコンシューマだけです。この動作は結果的に負荷分散として機能しますが、特定のコンシューマに宛ててメッセージを送信する目的には適していません。

そこで、コンシューマごとに専用のキューを用意して独占的(排他的)に利用させることで、その特定のコンシューマにメッセージを受信させることができます。共通のキューと両方consumeすれば、負荷分散的な処理をしつつ個別のプロセス間メッセージをやり取りすることができるようになります。

こういった専用のキューは予め用意しておくことが難しいので、プロセス起動時に一意な名前のキューを作成し、プロセス終了時にはキューを削除する、という使い方をすることになると思います。RabbitMQはキューの作成時にキュー名の指定がない場合、一意な名前を生成してキューを作成します。また、キューのオプションで条件によって自動的にキューを削除する指定が可能です。

エクスチェンジ(交換局, exchange)

さて、これまでの記事ではキューにメッセージを追加するために、キューの名前を指定してメッセージを追加していました。RabbitMQではキューにメッセージを追加する別の方法として、エクスチェンジという仕組みが存在します。これはまずエクスチェンジに対してメッセージを送信し、エクスチェンジが条件に応じてキューにメッセージを配信(追加)する方法です。これは一般には出版購読型モデルやルーティングと呼ばれる方式を実現するものです。

この方式では、キューとエクスチェンジはバインディングによって紐付けられます。エクスチェンジに送信されたメッセージは、エクスチェンジやバインディングの設定値に応じてキューに配信されるかどうか判定されます。

キューやエクスチェンジにバインディングはいくつ設定してもよく、複数のバインディングにメッセージが適合すればそれぞれのキューにメッセージが追加されます。同じキューに複数のバインディングが適合する場合でも、同じメッセージが複数追加されることはありません。特に指定しなければ、どのバインディングにも適合しなかったメッセージは破棄されます。

また、エクスチェンジ同士をバインディングすることもできます。エクスチェンジをループ状にバインディングすることもでき、その時は同じエクスチェンジに到達した時点で判定を終了します。

エクスチェンジの種類とルーティングキー

RabbitMQには標準でFanout, Direct, Topic, Headersの4種類のエクスチェンジが用意されています。これらはメッセージがバインディングに適合するかどうか判定する方法に違いがあります。

バインディングにはルーティングキーなど判定用の値を設定しておき、メッセージごとに付加されたこれらの値と比較して適合するかどうかが判定されます。

Fanoutエクスチェンジ

適合するか判定すると言いつつも、Fanoutエクスチェンジでは全てのメッセージが無条件にバインディングに適合します。ルーティングキーは無視されます。

Directエクスチェンジ

Directエクスチェンジでは、メッセージのルーティングキーが完全に一致した場合に適合していると判定されます。

RabbitMQは無名のDirectエクスチェンジをデフォルトで持っており、全てのキューはキュー名をルーティングキーにしたバインディングが暗黙的に作成されています。よって、無名(空文字列)のエクスチェンジにキュー名をルーティングキーに指定したメッセージをRabbitMQに送信すると、そのキューにメッセージが追加されます。

Topicエクスチェンジ

前述のDirectエクスチェンジでは完全に一致した時だけ適合していましたが、Topicエクスチェンジではバインディングの設定にキーワード単位のワイルドカードを使用することができます。

メッセージのルーティングキーはkomari.okayama.japanのように、0個以上の「キーワード」をピリオド.で連結したものを使用します。連結後の文字数制限は255文字です。

バインディングの設定(ルーティングパターン)にも「キーワード」をピリオドで連結したものを設定しますが、*#の特別な「キーワード」を使用することができます。*は1つの任意のキーワード、#は0個以上の任意のキーワードとマッチします。これらはキーワード単位で判定されることに注意してください。

例えばメッセージのkomari.okayama.japanというルーティングキーは、komari.okayama.japan, komari.*.japan, komari.*.*, *.*.*, komari.#, #.komari.#, #.komari.#.okayama.#, #などにマッチします。しかし、ko*.okayama.japan, ko#, komari.*, komari.*.*.*, #.okayama.#.komari.#, *などにはマッチしません。

Headersエクスチェンジ

Headersエクスチェンジではルーティングキーは無視され、メッセージのヘッダ情報で判定されます。AMQPのメッセージにはメールやHTTPのようなヘッダが存在するので、これに任意のヘッダ情報を追加して利用します。

バインディングにはヘッダ名とその値のペアを0個以上設定することができ、メッセージの当該ヘッダの値が完全に一致すれば合致していると判定されます。複数のヘッダ項目を設定している場合、バインディングのx-matchヘッダにallもしくはanyを設定することで、全てのヘッダ項目が一致する必要があるのか、どれか1つのヘッダ項目が一致すれば良いのか指定することができます。

ヘッダ名には文字列しか使用できませんが、値には文字列以外にも数値型やブール型などいくつかの型が使用できることになっています。また文字数の制限などもないので、Topicエクスチェンジよりも柔軟なルーティングが可能です。しかし、RabbitMQでは処理効率の観点からヘッダ情報をあまり大きくすることは推奨されていません。他の3種類のエクスチェンジだけでもそれなりのことができるので、Headersエクスチェンジを利用する場合でも他の種類のエクスチェンジと併用できないか検討した方が良いかもしれません。

エクスチェンジにメッセージを送信する

amqplibではキューの作成にassertQueueメソッドを使用したのと同じように、エクスチェンジの作成にはassertExchangeメソッドを使用します。またエクスチェンジにメッセージを送るのもsendToQueueメソッドではなくpublishメソッドを使用します。

次のコードは、以前の記事のメッセージをキューに入れるコードを、エクスチェンジにメッセージを送信するように変更したものです。キュー用のメソッドをエクスチェンジ用のメソッドに変更したくらいで、ほとんど違いはありません。

// send-exchange.js
const amqp = require("amqplib");
// const QUEUE = "queue-test";
const EX = "exchange-test";
const NAME = process.argv[2] || "NoName";

function sleep(ms, random = 0) {
    return new Promise(resolve => setTimeout(resolve, ms + Math.random() * random));
}

(async function main() {
    const connection = await amqp.connect("");
    const channel = await connection.createChannel();
    // await channel.assertQueue(QUEUE);
    await channel.assertExchange(EX, "fanout"); // エクスチェンジの作成

    let count = 1;
    while (count <= 100) {
        const message = `${NAME}${count}`;
        // channel.sendToQueue(QUEUE, Buffer.from(message));
        channel.publish(EX, "", Buffer.from(message)); // エクスチェンジにメッセージを送る

        console.log(new Date().getTime(), "Send", message);
        await sleep(1500, 1000);
        count++;
    }

    await channel.close();
    await connection.close();
})();

エクスチェンジを作成するassertExchangeメソッドでは第2引数にエクスチェンジの種類を指定します。このコードではfanoutを指定しています。

エクスチェンジにメッセージを送信するpublishメソッドでは第2引数にメッセージのルーティングキーを指定します。Fanoutエクスチェンジはルーティングキーを無視するので、とりあえず空文字列を指定しています。

$ node send-exchange.js
1592285844778 Send NoName1
1592285846672 Send NoName2
1592285848232 Send NoName3
...

All queues(1). Messages Total 0
エクスチェンジには何もバインディングしていないので、メッセージはRabbitMQにより破棄されます。

専用キュー経由でエクスチェンジから受信する

次のコードは以前の記事のメッセージを受信するコードを改変して、エクスチェンジのメッセージを受信するようにしたものです。キューとエクスチェンジのバインディングにはbindQueueメソッドを使用します。

// receive-exchange.js
const amqp = require("amqplib");
const EX = "exchange-test";

(async function main() {
    const connection = await amqp.connect("");
    const channel = await connection.createChannel();
    const resQueue = await channel.assertQueue("", { exclusive: true }); // キューを作成

    await channel.assertExchange(EX, "fanout"); // エクスチェンジを作成
    await channel.bindQueue(resQueue.queue, EX, ""); // キューをエクスチェンジにバインド

    await channel.consume(resQueue.queue, async function (msg) {
        console.log(new Date().getTime(), "Receive", msg.content.toString());
        channel.ack(msg);
    });
})();

キューを作成するassertQueueメソッドの第1引数にはキュー名を指定しますが、空文字列を指定することでRabbitMQがランダム生成した名前が使用されます。この時生成されたキュー名は戻り値のqueueプロパティで参照できます。

このメソッドで指定しているexclusive: trueオプションは、そのキューを排他的なキューにします。このオプションが指定されたキューは、このキューを作成したコネクション以外では使えなくなります。よって、コネクションが閉じられるとキューも削除されます。

キューをエクスチェンジにバインドするbindQueueメソッドの引数は、前から順にキュー名、エクスチェンジ名、ルーティングキーを指定します。Fanoutエクスチェンジはルーティングキーを無視するので空文字列を指定しています。

先程のsend-exchange.jsを起動して、時間差でこのreceive-exchange.jsを2つ起動してみます。

$ node send-exchange.js
1592289869404 Send NoName1
1592289871831 Send NoName2
1592289873470 Send NoName3
1592289875871 Send NoName4
1592289878077 Send NoName5
1592289879679 Send NoName6
1592289881250 Send NoName7
1592289882887 Send NoName8
1592289884995 Send NoName9
1592289887150 Send NoName10
1592289889214 Send NoName11
1592289891430 Send NoName12
...
$ node receive-exchange.js
1592289879684 Receive NoName6
1592289881254 Receive NoName7
1592289882889 Receive NoName8
1592289884998 Receive NoName9
1592289887153 Receive NoName10
1592289889215 Receive NoName11
1592289891439 Receive NoName12
...
$ node receive-exchange.js
1592289885002 Receive NoName9
1592289887154 Receive NoName10
1592289889215 Receive NoName11
1592289891439 Receive NoName12
...

All queues(3). amq.gen-..., amq.gen-...
プログラムを実行するたびにキューを作成してバインドしているので、バインド前のメッセージを受信することはありません。そしてFanoutエクスチェンジはバインディングされている全てのキューにメッセージを入れるので、同じメッセージをそれぞれのreceive-exchange.jsが受信しています。

ここでreceive-exchange.jsの1つをCtrl+Cで終了させると、exclusiveオプションの影響によりキューが消えることが確認できます。
All queues(2). amq.gen-...

このプログラムではメッセージの受信者が自身でエクスチェンジにバインドしています。この場合、メッセージの送信者は誰が受信しているか気にする必要がなくなります。このように送信者は一方的に送信し、それを受信したい者だけが宣言して受信する方式のことを「出版購読型モデル」などと呼びます。

せっかくだからHeadersエクスチェンジを使ってみる

Topicエクスチェンジはそれなりに強力なので、Headersエクスチェンジならではの使い方が思いつきませんが物は試しで無理にでも使用してみます。ついでにエクスチェンジ同士のバインディングも試します。

先程のreceive-exchange.jsを修正してヘッダ情報を追加するようにします。送信先のエクスチェンジはfanoutのままにしておきます。

// send-exchange-headers.js
-       channel.publish(EX, '', Buffer.from(message));
+       const headers = {
+           "Fizz": count % 3 == 0,
+           "Buzz": count % 5 == 0,
+       };
+       channel.publish(EX, '', Buffer.from(message), { headers });

エクスチェンジ間のバインディングも試してみたいので、送信側が送信するFanoutエクスチェンジとは別にHeadersエクスチェンジを作成して、その間をバインディングします。

// receive-exchange-headers.js
const amqp = require("amqplib");
const EX = "exchange-test";
const EX_HEADER = "exchange-test.headers";

(async function main() {
    const connection = await amqp.connect("");
    const channel = await connection.createChannel();
    const resQueue = await channel.assertQueue("", { exclusive: true });

    await channel.assertExchange(EX, "fanout");
    await channel.assertExchange(EX_HEADER, "headers"); // Headersエクスチェンジを作成
    await channel.bindExchange(EX_HEADER, EX, ""); // EX_HEADERエクスチェンジをEXエクスチェンジにバインド( EX -> EX_HEADER )
    const args = {
        "x-match": "all",
        "Fizz": true,
        "Buzz": true,
    };
    await channel.bindQueue(resQueue.queue, EX_HEADER, "", args); // キューをエクスチェンジにバインド( EX_HEADER -> queue )

    await channel.consume(resQueue.queue, async function (msg) {
        console.log(new Date().getTime(), "Receive", msg.content.toString());
        channel.ack(msg);
    });
})();

メッセージを送信するpublishメソッド(もしくはsendToQueueメソッド)のオプションにheadersプロパティを指定することで、メッセージに任意のヘッダを追加できます。

キューとエクスチェンジのバインドにはbindQueueメソッドを使用しますが、エクスチェンジ同士のバインドにはbindExchangeメソッドを使用します。第2引数から第1引数に指定したエクスチェンジにメッセージが配信されます。

そしてbindQueueメソッド(もしくはbindExchangeメソッド)の第4引数に、バインディングに対する追加のパラメータを指定します。Headersエクスチェンジに対するバインディングの追加のパラメータは、チェックするヘッダ名とその値をそのまま追加パラメータとして与えることで指定できます。

先ほどと同様にこの2つのプログラムを実行してみます。

$ node send-exchange-headers.js
1592376328343 Send NoName1
1592376329990 Send NoName2
1592376331933 Send NoName3
...
$ node receive-exchange-headers.js
1592376354193 Receive NoName15
1592376385099 Receive NoName30
...

メッセージにはFizzヘッダとBuzzヘッダを追加しています。それぞれ3の倍数の時と5の倍数の時に値がtrueになります。

Headersエクスチェンジへのバインディングでは、その両方のヘッダがtrueの時だけマッチするように指定しているので、結果として15の倍数のメッセージだけ受信することができました。

とりあえずエクスチェンジについて試してみました

DirectとTopicのエクスチェンジは、空文字列を指定していたルーティングキーの部分に適切な文字列を設定すれば良いだけなので、この記事ではあえて試しません。

ここまでに説明した機能を駆使すれば、もうシステム構築には困らないのではないでしょうか。色々なシステムモデルが考えられますね。とはいえRabbitMQにはまだまだ細かい機能があります。次回以降ではRPCやDeadLetterなどについて説明していきたいと思います。