- その1 送信と受信
- その2 ACKとpersistent
- その3 エクスチェンジと一時的なキュー ←この記事
- その4 RPC
皆様どうも、こんにちは!
こまりの自称バックエンドエンジニア、桑木です。
前回までの記事では、メッセージをキューに入れたり出したりする処理について、いくつかのパターンを試しました。これによってシステムで利用するための最低限の知識は得られたと思います。
今回からは、より効率的にメッセージをやりとりするための機能について説明していきます。
この記事の内容
プロセス(コンシューマ)専用のキュー
ある1つのキューをconsume
しているプロセス(コンシューマ)が複数ある場合、そのキューに入ったある特定のメッセージを受信できるのはどれか1つのコンシューマだけです。この動作は結果的に負荷分散として機能しますが、特定のコンシューマに宛ててメッセージを送信する目的には適していません。
そこで、コンシューマごとに専用のキューを用意して独占的(排他的)に利用させることで、その特定のコンシューマにメッセージを受信させることができます。共通のキューと両方consume
すれば、負荷分散的な処理をしつつ個別のプロセス間メッセージをやり取りすることができるようになります。
こういった専用のキューは予め用意しておくことが難しいので、プロセス起動時に一意な名前のキューを作成し、プロセス終了時にはキューを削除する、という使い方をすることになると思います。RabbitMQはキューの作成時にキュー名の指定がない場合、一意な名前を生成してキューを作成します。また、キューのオプションで条件によって自動的にキューを削除する指定が可能です。
エクスチェンジ(交換局, exchange)
さて、これまでの記事ではキューにメッセージを追加するために、キューの名前を指定してメッセージを追加していました。RabbitMQではキューにメッセージを追加する別の方法として、エクスチェンジという仕組みが存在します。これはまずエクスチェンジに対してメッセージを送信し、エクスチェンジが条件に応じてキューにメッセージを配信(追加)する方法です。これは一般には出版購読型モデルやルーティングと呼ばれる方式を実現するものです。
この方式では、キューとエクスチェンジはバインディングによって紐付けられます。エクスチェンジに送信されたメッセージは、エクスチェンジやバインディングの設定値に応じてキューに配信されるかどうか判定されます。
キューやエクスチェンジにバインディングはいくつ設定してもよく、複数のバインディングにメッセージが適合すればそれぞれのキューにメッセージが追加されます。同じキューに複数のバインディングが適合する場合でも、同じメッセージが複数追加されることはありません。特に指定しなければ、どのバインディングにも適合しなかったメッセージは破棄されます。
また、エクスチェンジ同士をバインディングすることもできます。エクスチェンジをループ状にバインディングすることもでき、その時は同じエクスチェンジに到達した時点で判定を終了します。
エクスチェンジの種類とルーティングキー
RabbitMQには標準でFanout, Direct, Topic, Headersの4種類のエクスチェンジが用意されています。これらはメッセージがバインディングに適合するかどうか判定する方法に違いがあります。
バインディングにはルーティングキーなど判定用の値を設定しておき、メッセージごとに付加されたこれらの値と比較して適合するかどうかが判定されます。
Fanoutエクスチェンジ
適合するか判定すると言いつつも、Fanoutエクスチェンジでは全てのメッセージが無条件にバインディングに適合します。ルーティングキーは無視されます。
Directエクスチェンジ
Directエクスチェンジでは、メッセージのルーティングキーが完全に一致した場合に適合していると判定されます。
RabbitMQは無名のDirectエクスチェンジをデフォルトで持っており、全てのキューはキュー名をルーティングキーにしたバインディングが暗黙的に作成されています。よって、無名(空文字列)のエクスチェンジにキュー名をルーティングキーに指定したメッセージをRabbitMQに送信すると、そのキューにメッセージが追加されます。
Topicエクスチェンジ
前述のDirectエクスチェンジでは完全に一致した時だけ適合していましたが、Topicエクスチェンジではバインディングの設定にキーワード単位のワイルドカードを使用することができます。
メッセージのルーティングキーはkomari.okayama.japan
のように、0個以上の「キーワード」をピリオド.
で連結したものを使用します。連結後の文字数制限は255文字です。
バインディングの設定(ルーティングパターン)にも「キーワード」をピリオドで連結したものを設定しますが、*
と#
の特別な「キーワード」を使用することができます。*
は1つの任意のキーワード、#
は0個以上の任意のキーワードとマッチします。これらはキーワード単位で判定されることに注意してください。
例えばメッセージのkomari.okayama.japan
というルーティングキーは、komari.okayama.japan
, komari.*.japan
, komari.*.*
, *.*.*
, komari.#
, #.komari.#
, #.komari.#.okayama.#
, #
などにマッチします。しかし、ko*.okayama.japan
, ko#
, komari.*
, komari.*.*.*
, #.okayama.#.komari.#
, *
などにはマッチしません。
Headersエクスチェンジ
Headersエクスチェンジではルーティングキーは無視され、メッセージのヘッダ情報で判定されます。AMQPのメッセージにはメールやHTTPのようなヘッダが存在するので、これに任意のヘッダ情報を追加して利用します。
バインディングにはヘッダ名とその値のペアを0個以上設定することができ、メッセージの当該ヘッダの値が完全に一致すれば合致していると判定されます。複数のヘッダ項目を設定している場合、バインディングのx-match
ヘッダにall
もしくはany
を設定することで、全てのヘッダ項目が一致する必要があるのか、どれか1つのヘッダ項目が一致すれば良いのか指定することができます。
ヘッダ名には文字列しか使用できませんが、値には文字列以外にも数値型やブール型などいくつかの型が使用できることになっています。また文字数の制限などもないので、Topicエクスチェンジよりも柔軟なルーティングが可能です。しかし、RabbitMQでは処理効率の観点からヘッダ情報をあまり大きくすることは推奨されていません。他の3種類のエクスチェンジだけでもそれなりのことができるので、Headersエクスチェンジを利用する場合でも他の種類のエクスチェンジと併用できないか検討した方が良いかもしれません。
エクスチェンジにメッセージを送信する
amqplibではキューの作成にassertQueue
メソッドを使用したのと同じように、エクスチェンジの作成にはassertExchange
メソッドを使用します。またエクスチェンジにメッセージを送るのもsendToQueue
メソッドではなくpublish
メソッドを使用します。
次のコードは、以前の記事のメッセージをキューに入れるコードを、エクスチェンジにメッセージを送信するように変更したものです。キュー用のメソッドをエクスチェンジ用のメソッドに変更したくらいで、ほとんど違いはありません。
// send-exchange.js
const amqp = require("amqplib");
// const QUEUE = "queue-test";
const EX = "exchange-test";
const NAME = process.argv[2] || "NoName";
function sleep(ms, random = 0) {
return new Promise(resolve => setTimeout(resolve, ms + Math.random() * random));
}
(async function main() {
const connection = await amqp.connect("");
const channel = await connection.createChannel();
// await channel.assertQueue(QUEUE);
await channel.assertExchange(EX, "fanout"); // エクスチェンジの作成
let count = 1;
while (count <= 100) {
const message = `${NAME}${count}`;
// channel.sendToQueue(QUEUE, Buffer.from(message));
channel.publish(EX, "", Buffer.from(message)); // エクスチェンジにメッセージを送る
console.log(new Date().getTime(), "Send", message);
await sleep(1500, 1000);
count++;
}
await channel.close();
await connection.close();
})();
エクスチェンジを作成するassertExchange
メソッドでは第2引数にエクスチェンジの種類を指定します。このコードではfanout
を指定しています。
エクスチェンジにメッセージを送信するpublish
メソッドでは第2引数にメッセージのルーティングキーを指定します。Fanoutエクスチェンジはルーティングキーを無視するので、とりあえず空文字列を指定しています。
$ node send-exchange.js
1592285844778 Send NoName1
1592285846672 Send NoName2
1592285848232 Send NoName3
...
エクスチェンジには何もバインディングしていないので、メッセージはRabbitMQにより破棄されます。
専用キュー経由でエクスチェンジから受信する
次のコードは以前の記事のメッセージを受信するコードを改変して、エクスチェンジのメッセージを受信するようにしたものです。キューとエクスチェンジのバインディングにはbindQueue
メソッドを使用します。
// receive-exchange.js
const amqp = require("amqplib");
const EX = "exchange-test";
(async function main() {
const connection = await amqp.connect("");
const channel = await connection.createChannel();
const resQueue = await channel.assertQueue("", { exclusive: true }); // キューを作成
await channel.assertExchange(EX, "fanout"); // エクスチェンジを作成
await channel.bindQueue(resQueue.queue, EX, ""); // キューをエクスチェンジにバインド
await channel.consume(resQueue.queue, async function (msg) {
console.log(new Date().getTime(), "Receive", msg.content.toString());
channel.ack(msg);
});
})();
キューを作成するassertQueue
メソッドの第1引数にはキュー名を指定しますが、空文字列を指定することでRabbitMQがランダム生成した名前が使用されます。この時生成されたキュー名は戻り値のqueue
プロパティで参照できます。
このメソッドで指定しているexclusive: true
オプションは、そのキューを排他的なキューにします。このオプションが指定されたキューは、このキューを作成したコネクション以外では使えなくなります。よって、コネクションが閉じられるとキューも削除されます。
キューをエクスチェンジにバインドするbindQueue
メソッドの引数は、前から順にキュー名、エクスチェンジ名、ルーティングキーを指定します。Fanoutエクスチェンジはルーティングキーを無視するので空文字列を指定しています。
先程のsend-exchange.js
を起動して、時間差でこのreceive-exchange.js
を2つ起動してみます。
$ node send-exchange.js
1592289869404 Send NoName1
1592289871831 Send NoName2
1592289873470 Send NoName3
1592289875871 Send NoName4
1592289878077 Send NoName5
1592289879679 Send NoName6
1592289881250 Send NoName7
1592289882887 Send NoName8
1592289884995 Send NoName9
1592289887150 Send NoName10
1592289889214 Send NoName11
1592289891430 Send NoName12
...
$ node receive-exchange.js
1592289879684 Receive NoName6
1592289881254 Receive NoName7
1592289882889 Receive NoName8
1592289884998 Receive NoName9
1592289887153 Receive NoName10
1592289889215 Receive NoName11
1592289891439 Receive NoName12
...
$ node receive-exchange.js
1592289885002 Receive NoName9
1592289887154 Receive NoName10
1592289889215 Receive NoName11
1592289891439 Receive NoName12
...
プログラムを実行するたびにキューを作成してバインドしているので、バインド前のメッセージを受信することはありません。そしてFanoutエクスチェンジはバインディングされている全てのキューにメッセージを入れるので、同じメッセージをそれぞれのreceive-exchange.js
が受信しています。
ここでreceive-exchange.js
の1つをCtrl
+C
で終了させると、exclusive
オプションの影響によりキューが消えることが確認できます。
このプログラムではメッセージの受信者が自身でエクスチェンジにバインドしています。この場合、メッセージの送信者は誰が受信しているか気にする必要がなくなります。このように送信者は一方的に送信し、それを受信したい者だけが宣言して受信する方式のことを「出版購読型モデル」などと呼びます。
せっかくだからHeadersエクスチェンジを使ってみる
Topicエクスチェンジはそれなりに強力なので、Headersエクスチェンジならではの使い方が思いつきませんが物は試しで無理にでも使用してみます。ついでにエクスチェンジ同士のバインディングも試します。
先程のreceive-exchange.js
を修正してヘッダ情報を追加するようにします。送信先のエクスチェンジはfanoutのままにしておきます。
// send-exchange-headers.js
- channel.publish(EX, '', Buffer.from(message));
+ const headers = {
+ "Fizz": count % 3 == 0,
+ "Buzz": count % 5 == 0,
+ };
+ channel.publish(EX, '', Buffer.from(message), { headers });
エクスチェンジ間のバインディングも試してみたいので、送信側が送信するFanoutエクスチェンジとは別にHeadersエクスチェンジを作成して、その間をバインディングします。
// receive-exchange-headers.js
const amqp = require("amqplib");
const EX = "exchange-test";
const EX_HEADER = "exchange-test.headers";
(async function main() {
const connection = await amqp.connect("");
const channel = await connection.createChannel();
const resQueue = await channel.assertQueue("", { exclusive: true });
await channel.assertExchange(EX, "fanout");
await channel.assertExchange(EX_HEADER, "headers"); // Headersエクスチェンジを作成
await channel.bindExchange(EX_HEADER, EX, ""); // EX_HEADERエクスチェンジをEXエクスチェンジにバインド( EX -> EX_HEADER )
const args = {
"x-match": "all",
"Fizz": true,
"Buzz": true,
};
await channel.bindQueue(resQueue.queue, EX_HEADER, "", args); // キューをエクスチェンジにバインド( EX_HEADER -> queue )
await channel.consume(resQueue.queue, async function (msg) {
console.log(new Date().getTime(), "Receive", msg.content.toString());
channel.ack(msg);
});
})();
メッセージを送信するpublish
メソッド(もしくはsendToQueue
メソッド)のオプションにheaders
プロパティを指定することで、メッセージに任意のヘッダを追加できます。
キューとエクスチェンジのバインドにはbindQueue
メソッドを使用しますが、エクスチェンジ同士のバインドにはbindExchange
メソッドを使用します。第2引数から第1引数に指定したエクスチェンジにメッセージが配信されます。
そしてbindQueue
メソッド(もしくはbindExchange
メソッド)の第4引数に、バインディングに対する追加のパラメータを指定します。Headersエクスチェンジに対するバインディングの追加のパラメータは、チェックするヘッダ名とその値をそのまま追加パラメータとして与えることで指定できます。
先ほどと同様にこの2つのプログラムを実行してみます。
$ node send-exchange-headers.js
1592376328343 Send NoName1
1592376329990 Send NoName2
1592376331933 Send NoName3
...
$ node receive-exchange-headers.js
1592376354193 Receive NoName15
1592376385099 Receive NoName30
...
メッセージにはFizzヘッダとBuzzヘッダを追加しています。それぞれ3の倍数の時と5の倍数の時に値がtrue
になります。
Headersエクスチェンジへのバインディングでは、その両方のヘッダがtrue
の時だけマッチするように指定しているので、結果として15の倍数のメッセージだけ受信することができました。
とりあえずエクスチェンジについて試してみました
DirectとTopicのエクスチェンジは、空文字列を指定していたルーティングキーの部分に適切な文字列を設定すれば良いだけなので、この記事ではあえて試しません。
ここまでに説明した機能を駆使すれば、もうシステム構築には困らないのではないでしょうか。色々なシステムモデルが考えられますね。とはいえRabbitMQにはまだまだ細かい機能があります。次回以降ではRPCやDeadLetterなどについて説明していきたいと思います。