- その1 送信と受信
- その2 ACKとpersistent
- その3 エクスチェンジと一時的なキュー
- その4 RPC ←この記事
皆様どうも、こんにちは!
こまりの自称バックエンドエンジニア、桑木です。
今までの記事では、メッセージを送信するプログラムと受信するプログラムを分けて考えていました。しかし、実際に運用するプログラムで、そのどちらかだけということはあまりないと思います。今回はもう少し実用的なプログラムを考えてみます。
マイクロサービスのような分散システムでは一般的にRPCと呼ばれる手法が使われることも多いかと思います。RPCはメッセージの送信と受信が1対1になりがちなので、サンプルとしては丁度良さそうです。今回はシンプルなRPCを実現するコードをざっくりと実装してみます。
この記事の内容
RPC(Remote Procedure Call)
その前にRPCとは何でしょうか。文字通り解釈すると、外部(リモート)の処理を呼び出していれば何でもRPCになりそうです。それこそHTTPでREST APIを実行するのでも外部の処理には違いないはずです。しかし、これをRPCとは呼ばない気がします。
調べてみてもはっきりした定義がわかりませんが、どうやらRPCとは外部の処理を呼び出すコードの記述、もしくは呼び出される側のコードの記述に着目した命名のようです。つまりは、呼び出される側は普通に関数を定義していて、呼び出す側もローカルにある関数を実行するように呼び出しているなら、それがRPCだ、ということで良さそうです。
replyTo と correlationId
ところで、MQをRPCに使うのであれば呼び出しに対する戻り値(レスポンス)が必要になります。MQのモデルではメッセージは一方通行です。特定の相手にメッセージを送りたい場合、その相手が受信(consume)しているであろうキューに対してメッセージを送信しますが、受信側はそれが誰から送られてきたメッセージか知る方法はありません。
そこでRPCの呼出し側(RPCクライアント)は、メッセージに自身が受信しているキューの名前を明示的に含めることで受信側(RPCサーバ)にレスポンスの送り先を伝えます。RPCサーバはレスポンスのメッセージを、そのキューに対して送ることで戻り値がRPCクライアントに伝わります。
この時、どんな方法でキュー名をメッセージに含めても良いのですが、メッセージのreply-to
プロパティを使用する実装が一般的なようです。
また、RPCクライアントで非同期処理などをしていると、同じRPCクライアントから同時に複数のRPC呼出しを実行することがあります。この場合、レスポンスが呼出し順に返ってくるとも限らないので、どのRPC呼出しに対応するレスポンスか判別する必要があります。
これには、一意の識別情報(ID)をRPC呼出しのメッセージに含めて送信し、RPCサーバはレスポンスに同じ識別情報をそのまま含めることで、どのRPC呼出しに対するレスポンスか判別します。
この識別情報にはメッセージのcorrelation-id
プロパティを使用する実装が一般的なようです。
RPCサーバ
以上のことを踏まえて、RPCを受け付けるためのクラスRPCServer
を作成します。
// RPCServer.mjs
import amqp from 'amqplib';
export class RPCServer {
constructor(methods) {
this.methods = methods;
}
async open(url = '', queue) {
this.connection = await amqp.connect(url);
this.channel = await this.connection.createConfirmChannel();
await this.channel.assertQueue(queue);
await this.channel.consume(queue, (msg) => this.request(msg));
}
コンストラクタで呼出しを受け付けるメソッドを実装したオブジェクトを受け取って、open
メソッドでキューをconsume
してメッセージを待ち受けます。メッセージを受信したらrequest
メソッドが実行されます。
async request(msg) {
const replyTo = msg.properties.replyTo;
const id = msg.properties.correlationId;
const requestJSON = msg.content.toString();
console.log(Date.now(), 'req', replyTo, id, requestJSON);
let result;
let headers;
try {
const [name, ...args] = JSON.parse(requestJSON);
result = await this.methods[name](...args);
} catch (err) {
result = err?.message;
headers = {
err: true,
};
}
await this.send(replyTo, id, result, headers);
console.log(Date.now(), 'res', replyTo, id, result);
this.channel.ack(msg);
}
send(queue, id, data, headers) {
return new Promise((resolve, reject) => {
const buff = Buffer.from(JSON.stringify(data));
const opt = {
correlationId: id,
headers,
};
this.channel.sendToQueue(queue, buff, opt, (err) => {
if (err) {
return reject(err);
}
resolve();
});
});
}
}
簡便化のために、リクエストメッセージの形式はメソッド名と引数の配列をJSON化した文字列、レスポンスメッセージの形式は戻り値をそのままJSON化した文字列とします。
レスポンスメッセージを送るキューは、リクエストメッセージのreply-to
プロパティの値(amqplib
がmsg.properties.replyTo
に格納します)をそのまま使用し、correlation-id
もリクエストメッセージと同じ値をそのまま使用します。
メッセージのパースやメソッドの実行でエラーが発生したら、エラーメッセージを戻り値としてヘッダにerr: true
を設定したレスポンスメッセージを返します。
レスポンスメッセージを送信したらack
でリクエストメッセージをキューから消費します。メソッドがエラーだったとしても、リクエストの処理そのものはそれが正常な結果なので、nack
ではなくack
をします。
// server.mjs
import { RPCServer } from './RPCServer.mjs';
const QUEUE = 'RPC-server-test';
const sleep = (ms, random = 0) => new Promise((resolve) => setTimeout(resolve, ms + Math.random() * random));
const methods = {
async add(a, b) {
await sleep(1000, 500);
return a + b;
},
async double(num) {
await sleep(2000, 500);
return num * 2;
},
err() {
throw new Error('エラー');
},
};
(async function main() {
const serv = new RPCServer(methods);
await serv.open('', QUEUE);
})();
RPCで呼出しを受け付けるメソッドをRPCServer
に渡してopen
で待ち受けます。
とりあえず、メソッドは適当にウェイトを入れつつ合計値を返すadd
、2倍の値を返すdouble
、わざとエラーを発生させるerr
を実装しました。
RPCクライアント
今度はRPCを呼び出すためのクラスRPCClient
を作成します。
// RPCClient.mjs
import amqp from 'amqplib';
export class RPCClient {
async open(url = '', remote) {
this.remote = remote;
this.connection = await amqp.connect(url);
this.channel = await this.connection.createChannel();
const tempQueue = await this.channel.assertQueue('', { exclusive: true });
this.queue = tempQueue.queue;
await this.channel.consume(this.queue, (msg) => this.response(msg));
return this.getRemote();
}
async close() {
await this.connection.close();
}
サーバ側と似たような処理ですが、open
メソッドでレスポンスメッセージ用のプライベートなキューを作成してconsume
します。メッセージを受信したらresponse
メソッドが実行されます。
getRemote() {
const handler = {
get: (_, p) => {
if (p === 'then') {
return undefined;
}
return (...args) => this.invoke(p, ...args);
},
};
return new Proxy({}, handler);
}
また、Proxy
でリモートのメソッドを呼び出すためのオブジェクトを作成して返しています。このオブジェクトがPromise
と認識されないようにthen
だけundefined
を返すようにしています。それ以外はプロパティ名を引数にinvoke
メソッドを実行する関数を返します。
resolves = new Map();
id = 0;
async invoke(name, ...args) {
this.id++;
const id = String(this.id);
const data = [name, ...args];
this.send(this.remote, id, data);
return new Promise((resolve) => {
this.resolves.set(id, resolve);
});
}
send(queue, id, data, headers) {
const buff = Buffer.from(JSON.stringify(data));
const opt = {
replyTo: this.queue,
correlationId: id,
headers,
};
this.channel.sendToQueue(queue, buff, opt);
}
RPC呼出しを実行するメソッドinvoke
です。戻り値はPromise
ですが、このPromise
を解決するresolve
関数はIDと紐付けて保持しておきます。
IDは連番を文字列化したものを使用して、メッセージのcorrelation-id
プロパティに設定して送信します。reply-to
プロパティには上のopen
で作成したキューの名前を設定します。
async response(msg) {
const id = msg.properties.correlationId;
let result;
let isErr = false;
try {
const responseJSON = msg.content.toString();
result = JSON.parse(responseJSON);
} catch (err) {
result = err;
isErr = true;
}
if (msg.properties.headers.err || isErr) {
result = Promise.reject(result);
}
this.resolves.get(id)?.(result);
this.resolves.delete(id);
this.channel.ack(msg);
}
}
レスポンスメッセージを受信したら、IDと紐付けておいたresolve
関数を実行します。メッセージのパースに失敗するか、RPCの結果がエラーならPromise
をreject
します。
// client.mjs
import { RPCClient } from "./RPCClient.mjs";
const QUEUE = 'RPC-server-test';
(async function main() {
const client = new RPCClient();
const remote = await client.open('', QUEUE);
const result = await Promise.allSettled([
remote.double(30),
remote.add(30, 25),
remote.double(25),
remote.err(),
remote.nothing(),
]);
console.log(result);
await client.close();
})();
RPCサーバのメソッドを結果を待たずに複数呼び出しています。全て実行が完了すると結果を出力して終了します。
実行してみる
先にserver.mjs
を起動した状態で、client.mjs
を実行します。
$ node server.mjs
1660095482087 req amq.gen-zJ0ulHm_Nm4jCVbyU8OR0A 1 ["double",30]
1660095482088 req amq.gen-zJ0ulHm_Nm4jCVbyU8OR0A 2 ["add",30,25]
1660095482089 req amq.gen-zJ0ulHm_Nm4jCVbyU8OR0A 3 ["double",25]
1660095482089 req amq.gen-zJ0ulHm_Nm4jCVbyU8OR0A 4 ["err"]
1660095482089 req amq.gen-zJ0ulHm_Nm4jCVbyU8OR0A 5 ["nothing"]
1660095482091 res amq.gen-zJ0ulHm_Nm4jCVbyU8OR0A 4 エラー
1660095482091 res amq.gen-zJ0ulHm_Nm4jCVbyU8OR0A 5 this.methods[name] is not a function
1660095483490 res amq.gen-zJ0ulHm_Nm4jCVbyU8OR0A 2 55
1660095484484 res amq.gen-zJ0ulHm_Nm4jCVbyU8OR0A 1 60
1660095484546 res amq.gen-zJ0ulHm_Nm4jCVbyU8OR0A 3 50
add
とdouble
メソッドにはウェイトがあり、double
の方が少しウェイトが長いのでレスポンスを返す順番が前後しています。
$ node client.mjs
[
{ status: 'fulfilled', value: 60 },
{ status: 'fulfilled', value: 55 },
{ status: 'fulfilled', value: 50 },
{ status: 'rejected', reason: 'エラー' },
{
status: 'rejected',
reason: 'this.methods[name] is not a function'
}
]
$
呼出し側では、きちんと呼出しに対応した戻り値が取得できていることがわかります。また、エラーもエラーとして取得できています。
100回ループしてみる
server.mjs
のmethods.add
からウェイトを外します。
// server.mjs
async add(a, b) {
- await sleep(1000, 500);
return a + b;
},
// client2.mjs
import { RPCClient } from "./RPCClient.mjs";
const QUEUE = 'RPC-server-test';
(async function main() {
const client = new RPCClient();
const remote = await client.open('', QUEUE);
console.time();
let sum = 0;
for (let i = 1; i <= 100; i++) {
sum = await remote.add(sum, i);
}
console.log(sum);
console.timeEnd();
await client.close();
})();
$ node client2.mjs
5050
default: 103.428ms
$
何回か実行してみましたが大体50〜120msくらいでした。これはRabbitMQを同一ホストで動かしている場合なので、ネットワーク越しだともう少し時間が掛かりそうですが、オーバーヘッドとしては許容範囲かなぁ、という感じです。
RPCClient
のconsumeをnoAckにしても平均で10ms下がるかどうかという感じです。
RPCを実装してみました
ひとまずローカルで試す分には十分動作すると思いますが、実際には色々な例外を考慮する必要があります。タイムアウトの処理やRPCサーバが動作しているかどうかのチェック、データの型検査などです。場合によっては、Protocol Buffersとかの他のシリアライズ方法やデータの圧縮も検討した方が良いでしょう。
また、例えばRPCサーバでメソッドの実行中、ack
を返す前に何らかの原因でプロセスが落ちたとします。この時、再起動後には再び同じリクエストメッセージを処理することになるので、何度実行しても問題ないように冪等性を確保した処理にする必要があります。