スタッフブログ・バックエンド

Node.jsで習得するRabbitMQによるメッセージキューイング その1 (送信と受信)

皆様どうも、こんにちは!
こまりのバックエンドエンジニア、桑木です。

新たにサービスを開発するにあたって、マイクロサービスに近しいアーキテクチャにしようということになりました。その上でサービス間通信をどうするかいくつか検討した結果、RabbitMQによるメッセージキューイングを採用することにしました。

何ヶ月か前に、一度ドキュメントを読みながら試してある程度の理解はしたのですが、他の作業をしているうちに細かいところを忘れてしまいました。今回は再度ドキュメントを読むついでに理解したことを記事として残しておこうと思います。

メッセージキューイング(Message Queueing)とは

プログラム間でデータを通信する方式の1つで、データをメッセージと呼ばれる単位で扱って、メッセージの処理にキュー(queue, 待ち行列)を利用するものです。メッセージは任意のサイズのバイナリ列ですが、通常は意味のあるひと塊のデータを1つのメッセージにします。

そしてキューとは待ち行列のことで、メッセージは到着した順に列(キュー)の後ろに並んで、列(キュー)の先頭から順番に出ていくという特徴があります。

RabbitMQなどのMQシステムは、このメッセージキューの管理を専門とするシステムです。最も基本的な機能として、メッセージを送りたいプログラムからメッセージを受け取ってキューに追加する機能。メッセージを受け取りたいプログラムの求めに応じて、キューからメッセージを取り出して送る機能があります。

バッファや負荷分散としての役割が期待できる

MQの本質は前述のキューイングにあります。プログラム間でメッセージを直接送らずにキューを経由させることで、受け取る側が任意のタイミングで処理可能な分量だけメッセージを受け取ることが可能になります。また、送る側も受け取る側の状況を考慮せずに任意のタイミングでメッセージを送ることが可能になります。よって、たとえ受け取る側が停止していたとしても、キューに追加しておけばいずれ起動した時に処理されることが期待できます。

マイクロサービスのように複数のサービスで構成されるシステムの場合、負荷に応じてインスタンスをリアルタイムで増減させることもありえます。この場合も、それぞれのインスタンスが処理可能な分量を適宜取り出すことによって結果的に負荷分散が実現されます。

Node.jsからamqplibを使用してRabbitMQにアクセス

では実際にNode.jsからRabbitMQを利用してみます。RabbitMQは公式サイトを参考に好きな方法で用意してください。私はとりあえずWindows版の実行形式バイナリでローカルPCにインストールしました。(インストーラを実行したらErlangが必要と言われるので、それもインストールします)

Node.jsからRabbitMQへのアクセスに使用できるライブラリはいくつかあるようですが、ここではRabbitMQ公式ページのチュートリアルでも使われているamqplibを利用します。このライブラリは高度な抽象化をせずにプロトコルAMQP0.9.1の機能をそのまま操作できることが特徴だと書いてあります。

このライブラリでRabbitMQを利用する流れとしては、まずライブラリのconnectメソッドでコネクションを取得する必要があります。そのコネクションのcreateChannelメソッドかcreateConfirmChannelメソッドでチャネルを作成し、チャネルのメソッドで各種操作(キューの作成やメッセージの送受信等々)をすることになります。基本的にこのプロトコルでは何をするにもチャネル経由になります。不正な操作などでエラーが出たときは、そのチャネルだけがクローズされます。

まぁ、そんな細かい話はともかく、とりあえずメッセージを出したり入れたりしてみましょう。

キューにメッセージを入れる(enqueue)

次のコードは、ウェイトを挟みつつキューに100個のメッセージを入れる動作をします。そのメッセージには指定した名前に連番を結合した文字列を使用します。キューに入れる毎に2秒前後のランダムな待ち時間を設定しています。途中で止めるにはCtrl+Cを押します。

// send.js
const amqp = require("amqplib");
const QUEUE = "queue-test";
const NAME = process.argv[2] || "NoName";

function sleep(ms, random = 0) {
    return new Promise(resolve => setTimeout(resolve, ms + Math.random() * random));
}

(async function main() {
    const connection = await amqp.connect("");
    const channel = await connection.createChannel();
    await channel.assertQueue(QUEUE);

    let count = 1;
    while (count <= 100) {
        const message = `${NAME}${count}`;
        // キューにメッセージを入れる
        channel.sendToQueue(QUEUE, Buffer.from(message));

        console.log(new Date().getTime(), "Send", message);
        await sleep(1500, 1000);
        count++;
    }

    await channel.close();
    await connection.close();
})();

チャネルのsendToQueueメソッドは指定したキューにメッセージを入れるためのメソッドです。このメソッドは同期処理ですが、メッセージをエンコード(≒メモリ内部でコピー)したらメソッドから処理が戻ってきます。実際に送信させるには1度イベントループを回す必要があります。その前にprocess.exit()connection.close()してしまうと送信されません。

キューからメッセージを取り出す(dequeue)

次のコードは、キューにメッセージが来たらそのメッセージを取り出して内容を表示する動作をします。受け取った後に1.5秒前後のランダムな待ち時間を設定しています。終了するにはCtrl+Cを押します。

// receive.js
const amqp = require("amqplib");
const QUEUE = "queue-test";

function sleep(ms, random = 0) {
    return new Promise(resolve => setTimeout(resolve, ms + Math.random() * random));
}

(async function main() {
    const connection = await amqp.connect("");
    const channel = await connection.createChannel();
    await channel.prefetch(1);
    await channel.assertQueue(QUEUE);

    // メッセージを受信したらコールバック関数が呼び出される
    await channel.consume(QUEUE, async function (msg) {
        console.log(new Date().getTime(), "Receive", msg.content.toString());
        await sleep(1000, 1000);
        channel.ack(msg);
    });
})();

チャネルのconsumeメソッドでキューからメッセージの受信を宣言します。メッセージ毎に指定したコールバック関数が呼び出されます。もちろんこのコールバックは非同期です。consumeは「(全て)消費する」という意味ですが、キューに存在するメッセージを全て受信した後も、新たなメッセージがキューに入れば即座に受信します。なので、実質的には「コンシューマ(消費者)登録」とか「購読」といった類いの動作になります。

同時に複数実行してみる

というわけで、コンソールを複数起動して上記のreceive.jssend.jsを幾つか同時に実行してみましょう。

まずreceive.jsを起動してメッセージを待ち受けます。

$ node receive.js

まだキューにメッセージがないので何も出力されません。次にsend.jsを起動してメッセージを送ります。

$ node send.js A
1585224372336 Send A1
1585224374130 Send A2
1585224376146 Send A3
...
~~~~~~~~~~ receive
1585224372336 Receive A1
1585224374128 Receive A2
1585224376147 Receive A3
...

Sendとほぼ同時にReceiveが表示されていて、これはほぼリアルタイムで処理されているので、更にsend.jsを起動してメッセージを増やします。

$ node send.js B
1585224380696 Send B1
1585224382711 Send B2
1585224385094 Send B3
...
~~~~~~~~~~ receive
...
1585224381196 Receive B1
1585224382585 Receive A6
1585224384463 Receive B2
...

すると、ウェイトの関係でキューにメッセージが溜まってくるので、receive.jsを更に追加で起動して処理させます。

$ node receive.js
1585224402291 Receive B8
1585224403457 Receive B9
1585224405340 Receive B10
...

という感じで実行して出力された物を適当に整形したものが以下になります。スペースの都合で同じ行にまとめましたが、同じ行なら右側の方が時間が後になります。

send.js A   | send.js B   | receive.js  | receive.js 
-----------------------------------------------------
Send A1     |             | Receive A1  |            
Send A2     |             | Receive A2  |            
Send A3     |             | Receive A3  |            
Send A4     |             | Receive A4  |            
Send A5     |             | Receive A5  |            
            | Send B1     | Receive B1  |            
Send A6     |             | Receive A6  |            
            | Send B2     |             |            
Send A7     |             | Receive B2  |            
            | Send B3     |             |            
Send A8     |             | Receive A7  |            
            |             | Receive B3  |            
Send A9     | Send B4     | Receive A8  |            
Send A10    | Send B5     | Receive A9  |            
            | Send B6     |             |            
Send A11    |             | Receive B4  |            
            | Send B7     | Receive A10 |            
Send A12    | Send B8     | Receive B5  |            
Send A13    |             | Receive B6  |            
            | Send B9     |             |            
Send A14    |             | Receive A11 |            
            | Send B10    |             |            
Send A15    |             | Receive B7  |            
            | Send B11    | Receive A12 |            
Send A16    | Send B12    |             | Receive B8 
            |             | Receive A13 |            
Send A17    |             |             | Receive B9 
            |             | Receive A14 |            
            | Send B13    |             |            
Send A18    |             |             | Receive B10
            |             | Receive A15 |            
            | Send B14    |             |            
Send A19    |             | Receive B11 | Receive A16
            | Send B15    |             |            
            |             | Receive B12 |            
Send A20    |             |             | Receive A17
            |             |             | Receive B13
            | Send B16    | Receive A18 |            
Send A21    |             |             | Receive B14
            |             | Receive A19 |            
            | Send B17    |             | Receive B15
Send A22    |             | Receive A20 |            
            | Send B18    | Receive B16 | Receive A21
Send A23    | Send B19    | Receive B17 |            
Send A24    |             |             | Receive A22
            |             | Receive B18 |            
            | Send B20    |             | Receive A23
Send A25    |             | Receive B19 |            
            | Send B21    |             | Receive A24
Send A26    |             | Receive B20 | Receive A25
            | Send B22    |             |            
Send A27    |             | Receive B21 | Receive A26
            | Send B23    |             | Receive B22
            |             | Receive A27 |            
Send A28    | Send B24    |             | Receive B23
            |             | Receive A28 |            
Send A29    |             |             | Receive B24
            | Send B25    |             | Receive A29
            |             | Receive B25 |            
Send A30    |             | Receive A30 |            
            | Send B26    |             | Receive B26

ウェイトをsend.jsは2.0s/M(メッセージ)、receive.jsは1.5s/Mにしてあるので、1対1の時は遅延なしで処理していますが、2対1だとsend.jsが上回って処理待ちが発生しています。これが2対2になると再びreceive.jsが上回って徐々に追いついていることがわかると思います。

とりあえず基本のキューイングを試しました

これだけの機能でも、それなりに通信できそうな気がしますし、ロジックを工夫すれば実用に耐えるアプリも作成できるでしょう。しかし、システムとは気まぐれなもので様々な要因で障害が発生します。

次回以降では、それらの対策に使えそうな機能や今回の説明しなかったことを説明していきたいと思います。

View Prev 一覧へ戻る View Next

関連記事

KOMARI Member

Category