Node.jsで習得するRabbitMQによるメッセージキューイング その4 (RPC)

スタッフブログ

皆様どうも、こんにちは!
こまりの自称バックエンドエンジニア、桑木です。

今までの記事では、メッセージを送信するプログラムと受信するプログラムを分けて考えていました。しかし、実際に運用するプログラムで、そのどちらかだけということはあまりないと思います。今回はもう少し実用的なプログラムを考えてみます。

マイクロサービスのような分散システムでは一般的にRPCと呼ばれる手法が使われることも多いかと思います。RPCはメッセージの送信と受信が1対1になりがちなので、サンプルとしては丁度良さそうです。今回はシンプルなRPCを実現するコードをざっくりと実装してみます。

RPC(Remote Procedure Call)

その前にRPCとは何でしょうか。文字通り解釈すると、外部(リモート)の処理を呼び出していれば何でもRPCになりそうです。それこそHTTPでREST APIを実行するのでも外部の処理には違いないはずです。しかし、これをRPCとは呼ばない気がします。

調べてみてもはっきりした定義がわかりませんが、どうやらRPCとは外部の処理を呼び出すコードの記述、もしくは呼び出される側のコードの記述に着目した命名のようです。つまりは、呼び出される側は普通に関数を定義していて、呼び出す側もローカルにある関数を実行するように呼び出しているなら、それがRPCだ、ということで良さそうです。

replyTo と correlationId

ところで、MQをRPCに使うのであれば呼び出しに対する戻り値(レスポンス)が必要になります。MQのモデルではメッセージは一方通行です。特定の相手にメッセージを送りたい場合、その相手が受信(consume)しているであろうキューに対してメッセージを送信しますが、受信側はそれが誰から送られてきたメッセージか知る方法はありません。

そこでRPCの呼出し側(RPCクライアント)は、メッセージに自身が受信しているキューの名前を明示的に含めることで受信側(RPCサーバ)にレスポンスの送り先を伝えます。RPCサーバはレスポンスのメッセージを、そのキューに対して送ることで戻り値がRPCクライアントに伝わります。

この時、どんな方法でキュー名をメッセージに含めても良いのですが、メッセージのreply-toプロパティを使用する実装が一般的なようです。

また、RPCクライアントで非同期処理などをしていると、同じRPCクライアントから同時に複数のRPC呼出しを実行することがあります。この場合、レスポンスが呼出し順に返ってくるとも限らないので、どのRPC呼出しに対応するレスポンスか判別する必要があります。

これには、一意の識別情報(ID)をRPC呼出しのメッセージに含めて送信し、RPCサーバはレスポンスに同じ識別情報をそのまま含めることで、どのRPC呼出しに対するレスポンスか判別します。

この識別情報にはメッセージのcorrelation-idプロパティを使用する実装が一般的なようです。

RPCサーバ

以上のことを踏まえて、RPCを受け付けるためのクラスRPCServerを作成します。

// RPCServer.mjs
import amqp from 'amqplib';

export class RPCServer {
    constructor(methods) {
        this.methods = methods;
    }

    async open(url = '', queue) {
        this.connection = await amqp.connect(url);
        this.channel = await this.connection.createConfirmChannel();
        await this.channel.assertQueue(queue);

        await this.channel.consume(queue, (msg) => this.request(msg));
    }

コンストラクタで呼出しを受け付けるメソッドを実装したオブジェクトを受け取って、openメソッドでキューをconsumeしてメッセージを待ち受けます。メッセージを受信したらrequestメソッドが実行されます。

    async request(msg) {
        const replyTo = msg.properties.replyTo;
        const id = msg.properties.correlationId;
        const requestJSON = msg.content.toString();
        console.log(Date.now(), 'req', replyTo, id, requestJSON);

        let result;
        let headers;
        try {
            const [name, ...args] = JSON.parse(requestJSON);
            result = await this.methods[name](...args);
        } catch (err) {
            result = err?.message;
            headers = {
                err: true,
            };
        }

        await this.send(replyTo, id, result, headers);
        console.log(Date.now(), 'res', replyTo, id, result);

        this.channel.ack(msg);
    }

    send(queue, id, data, headers) {
        return new Promise((resolve, reject) => {
            const buff = Buffer.from(JSON.stringify(data));
            const opt = {
                correlationId: id,
                headers,
            };
            this.channel.sendToQueue(queue, buff, opt, (err) => {
                if (err) {
                    return reject(err);
                }
                resolve();
            });
        });
    }
}

簡便化のために、リクエストメッセージの形式はメソッド名と引数の配列をJSON化した文字列、レスポンスメッセージの形式は戻り値をそのままJSON化した文字列とします。

レスポンスメッセージを送るキューは、リクエストメッセージのreply-toプロパティの値(amqplibmsg.properties.replyToに格納します)をそのまま使用し、correlation-idもリクエストメッセージと同じ値をそのまま使用します。

メッセージのパースやメソッドの実行でエラーが発生したら、エラーメッセージを戻り値としてヘッダにerr: trueを設定したレスポンスメッセージを返します。

レスポンスメッセージを送信したらackでリクエストメッセージをキューから消費します。メソッドがエラーだったとしても、リクエストの処理そのものはそれが正常な結果なので、nackではなくackをします。

// server.mjs
import { RPCServer } from './RPCServer.mjs';

const QUEUE = 'RPC-server-test';

const sleep = (ms, random = 0) => new Promise((resolve) => setTimeout(resolve, ms + Math.random() * random));

const methods = {
    async add(a, b) {
        await sleep(1000, 500);
        return a + b;
    },
    async double(num) {
        await sleep(2000, 500);
        return num * 2;
    },
    err() {
        throw new Error('エラー');
    },
};

(async function main() {
    const serv = new RPCServer(methods);
    await serv.open('', QUEUE);
})();

RPCで呼出しを受け付けるメソッドをRPCServerに渡してopenで待ち受けます。

とりあえず、メソッドは適当にウェイトを入れつつ合計値を返すadd、2倍の値を返すdouble、わざとエラーを発生させるerrを実装しました。

RPCクライアント

今度はRPCを呼び出すためのクラスRPCClientを作成します。

// RPCClient.mjs
import amqp from 'amqplib';

export class RPCClient {
    async open(url = '', remote) {
        this.remote = remote;

        this.connection = await amqp.connect(url);
        this.channel = await this.connection.createChannel();
        const tempQueue = await this.channel.assertQueue('', { exclusive: true });
        this.queue = tempQueue.queue;

        await this.channel.consume(this.queue, (msg) => this.response(msg));

        return this.getRemote();
    }

    async close() {
        await this.connection.close();
    }

サーバ側と似たような処理ですが、openメソッドでレスポンスメッセージ用のプライベートなキューを作成してconsumeします。メッセージを受信したらresponseメソッドが実行されます。

    getRemote() {
        const handler = {
            get: (_, p) => {
                if (p === 'then') {
                    return undefined;
                }
                return (...args) => this.invoke(p, ...args);
            },
        };
        return new Proxy({}, handler);
    }

また、Proxyでリモートのメソッドを呼び出すためのオブジェクトを作成して返しています。このオブジェクトがPromiseと認識されないようにthenだけundefinedを返すようにしています。それ以外はプロパティ名を引数にinvokeメソッドを実行する関数を返します。

    resolves = new Map();
    id = 0;

    async invoke(name, ...args) {
        this.id++;
        const id = String(this.id);
        const data = [name, ...args];

        this.send(this.remote, id, data);

        return new Promise((resolve) => {
            this.resolves.set(id, resolve);
        });
    }

    send(queue, id, data, headers) {
        const buff = Buffer.from(JSON.stringify(data));
        const opt = {
            replyTo: this.queue,
            correlationId: id,
            headers,
        };
        this.channel.sendToQueue(queue, buff, opt);
    }

RPC呼出しを実行するメソッドinvokeです。戻り値はPromiseですが、このPromiseを解決するresolve関数はIDと紐付けて保持しておきます。

IDは連番を文字列化したものを使用して、メッセージのcorrelation-idプロパティに設定して送信します。reply-toプロパティには上のopenで作成したキューの名前を設定します。

    async response(msg) {
        const id = msg.properties.correlationId;

        let result;
        let isErr = false;
        try {
            const responseJSON = msg.content.toString();
            result = JSON.parse(responseJSON);
        } catch (err) {
            result = err;
            isErr = true;
        }
        if (msg.properties.headers.err || isErr) {
            result = Promise.reject(result);
        }

        this.resolves.get(id)?.(result);
        this.resolves.delete(id);

        this.channel.ack(msg);
    }
}

レスポンスメッセージを受信したら、IDと紐付けておいたresolve関数を実行します。メッセージのパースに失敗するか、RPCの結果がエラーならPromiserejectします。

// client.mjs
import { RPCClient } from "./RPCClient.mjs";

const QUEUE = 'RPC-server-test';

(async function main() {
    const client = new RPCClient();
    const remote = await client.open('', QUEUE);

    const result = await Promise.allSettled([
        remote.double(30),
        remote.add(30, 25),
        remote.double(25),
        remote.err(),
        remote.nothing(),
    ]);
    console.log(result);

    await client.close();
})();

RPCサーバのメソッドを結果を待たずに複数呼び出しています。全て実行が完了すると結果を出力して終了します。

実行してみる

先にserver.mjsを起動した状態で、client.mjsを実行します。

$ node server.mjs
1660095482087 req amq.gen-zJ0ulHm_Nm4jCVbyU8OR0A 1 ["double",30]
1660095482088 req amq.gen-zJ0ulHm_Nm4jCVbyU8OR0A 2 ["add",30,25]
1660095482089 req amq.gen-zJ0ulHm_Nm4jCVbyU8OR0A 3 ["double",25]
1660095482089 req amq.gen-zJ0ulHm_Nm4jCVbyU8OR0A 4 ["err"]
1660095482089 req amq.gen-zJ0ulHm_Nm4jCVbyU8OR0A 5 ["nothing"]
1660095482091 res amq.gen-zJ0ulHm_Nm4jCVbyU8OR0A 4 エラー
1660095482091 res amq.gen-zJ0ulHm_Nm4jCVbyU8OR0A 5 this.methods[name] is not a function
1660095483490 res amq.gen-zJ0ulHm_Nm4jCVbyU8OR0A 2 55
1660095484484 res amq.gen-zJ0ulHm_Nm4jCVbyU8OR0A 1 60
1660095484546 res amq.gen-zJ0ulHm_Nm4jCVbyU8OR0A 3 50

adddoubleメソッドにはウェイトがあり、doubleの方が少しウェイトが長いのでレスポンスを返す順番が前後しています。

$ node client.mjs
[
  { status: 'fulfilled', value: 60 },
  { status: 'fulfilled', value: 55 },
  { status: 'fulfilled', value: 50 },
  { status: 'rejected', reason: 'エラー' },
  {
    status: 'rejected',
    reason: 'this.methods[name] is not a function'
  }
]
$

呼出し側では、きちんと呼出しに対応した戻り値が取得できていることがわかります。また、エラーもエラーとして取得できています。

100回ループしてみる

server.mjsmethods.addからウェイトを外します。

// server.mjs
    async add(a, b) {
-       await sleep(1000, 500);
        return a + b;
    },
// client2.mjs
import { RPCClient } from "./RPCClient.mjs";

const QUEUE = 'RPC-server-test';

(async function main() {
    const client = new RPCClient();
    const remote = await client.open('', QUEUE);

    console.time();
    let sum = 0;
    for (let i = 1; i <= 100; i++) {
        sum = await remote.add(sum, i);
    }
    console.log(sum);
    console.timeEnd();

    await client.close();
})();
$ node client2.mjs
5050
default: 103.428ms
$

何回か実行してみましたが大体50〜120msくらいでした。これはRabbitMQを同一ホストで動かしている場合なので、ネットワーク越しだともう少し時間が掛かりそうですが、オーバーヘッドとしては許容範囲かなぁ、という感じです。

RPCClientのconsumeをnoAckにしても平均で10ms下がるかどうかという感じです。

RPCを実装してみました

ひとまずローカルで試す分には十分動作すると思いますが、実際には色々な例外を考慮する必要があります。タイムアウトの処理やRPCサーバが動作しているかどうかのチェック、データの型検査などです。場合によっては、Protocol Buffersとかの他のシリアライズ方法やデータの圧縮も検討した方が良いでしょう。

また、例えばRPCサーバでメソッドの実行中、ackを返す前に何らかの原因でプロセスが落ちたとします。この時、再起動後には再び同じリクエストメッセージを処理することになるので、何度実行しても問題ないように冪等性を確保した処理にする必要があります。