MessagePack-RPCサーバのはまりどころ

MessagePack-RPCを使っていて「こういうときどうするんだっけ?」というのがあり、しかもけっこうはまりどころがあるので以下にまとめることにする。
古橋さんの以下の記事が大変参考になっております。
Ruby で MessagePack-RPC - Blog by Sadayuki Furuhashi

また以下で紹介したコードはgistにもはっつけております。
add.rb · GitHub

登場人物

簡易なMapReduceもどきを考えてみる。
あるサーバから別の複数サーバに対してリクエストを出し、なんらかのレスポンスをクライアント返すようなもの。

  • addサーバ(複数台、とりあえず例では3台とする)
    • 与えられた文字列をevalして、mrサーバに返す。
  • mr(MapReduce)サーバ(1台)
    • 与えられた配列の1要素をevalサーバに渡し、計算結果を合算してclientに返す
  • client
    • mrサーバに対し、合算したい計算の配列を渡す

関連図

                      +---+
                  +-->|add|
 +------+   +--+  |   +---+
 |client|<->|mr|<-+-->|add|
 +------+   +--+  |   +---+
                  +-->|add|
                      +---+

add.rb

require 'msgpack/rpc'

class Add
  def add(str)
    return eval(str)
  end
end

if $0 == __FILE__ then
  $sv_loop = Cool.io::Loop.default
  server = MessagePack::RPC::Server.new($sv_loop)
  server.listen('0.0.0.0', ARGV.first,  Add.new)
  server.run
end

mr.rb

require 'msgpack/rpc'

class MapReduce
  ADD1 = {host: '0.0.0.0', port: 10001}
  ADD2 = {host: '0.0.0.0', port: 10002}
  ADD3 = {host: '0.0.0.0', port: 10003}
end

if $0 == __FILE__ then
  $sv_loop = Cool.io::Loop.default
  server = MessagePack::RPC::Server.new($sv_loop)
  server.listen('0.0.0.0', ARGV.first,  MapReduce.new)
  server.run
end

mr.rbに対してメソッドをいろいろ追加していく。

それぞれ以下のように起動しておく。

$ ruby add.rb 10001 &
$ ruby add.rb 10002 &
$ ruby add.rb 10003 &
$ ruby mr.rb 10000

client.rb

# $ ruby client.rb method num args..
require 'msgpack/rpc'

concurrency = ARGV.shift.to_i
method = ARGV.shift
th = []
concurrency.times do
  th << Thread.start do
    MessagePack::RPC::Client.open('0.0.0.0', 10000) do |c|
      c.call(method, *ARGV)
    end
  end
end
th.map(&:join)

同時に呼び出す人数,呼び出すメソッド名,メソッドへの引数.. をARGVにとる。

同期呼び出しで実現

  #mr.rb
  def mr_leak(add1, add2, add3)
    clis = [MessagePack::RPC::Client.new(ADD1[:host], ADD1[:port])]
    clis << MessagePack::RPC::Client.new(ADD2[:host], ADD2[:port])
    clis << MessagePack::RPC::Client.new(ADD3[:host], ADD3[:port])

    r = clis[0].call(:add, add1)
    r += clis[1].call(:add, add2)
    r += clis[2].call(:add, add3)

    return r
  end

ふつうに考えたらこうなるし、ちゃんと結果も帰ってくる。
が、ひとつ忘れているのはclientのclose処理。
mr.rbはサーバであり長く生きるプロセスなので、clientをちゃんとcloseしないとコネクションがリークしてしたままになってしまう。

  def mr_close(add1, add2, add3)
    # ...略...
    clis.map(&:close)
    return r
  end

同期呼び出しの場合はopenが使える。

  #mr.rb
  def mr_close_with_open(add1, add2, add3)
    r = 0
    MessagePack::RPC::Client.open(ADD1[:host], ADD1[:port]) do |cli|
      r += cli.call(:add, add1)
    end
    MessagePack::RPC::Client.open(ADD2[:host], ADD2[:port]) do |cli|
      r += cli.call(:add, add2)
    end
    MessagePack::RPC::Client.open(ADD3[:host], ADD3[:port]) do |cli|
      r += cli.call(:add, add3)
    end

    return r
  end
% ruby client.rb 1 mr_close_with_open 1+2 2+3 3+4
15

ちゃんと計算結果が返る。

非同期呼び出しで実現

同期呼び出しの例だと、それぞれのaddサーバの計算をまってから、次のaddサーバにリクエストをおこなう。
ほぼ同時に呼び出したいので、非同期呼び出しを使う。

  #mr.rb
  def mr_async(add1, add2, add3)
    clis = [MessagePack::RPC::Client.new(ADD1[:host], ADD1[:port])]
    clis << MessagePack::RPC::Client.new(ADD2[:host], ADD2[:port])
    clis << MessagePack::RPC::Client.new(ADD3[:host], ADD3[:port])

    r = 0
    fs = [clis[0].callback(:add, add1){|res| r += res.get}]
    fs << clis[1].callback(:add, add2){|res| r += res.get}
    fs << clis[2].callback(:add, add3){|res| r += res.get}

    fs.map(&:join)
    clis.map(&:close)
    return r
  end

ここで大事なのは、callbackで得たfeatureをちゃんと最後にjoinする、という部分。
msgpack-rpcのクライアントは何も指定しないとイベントループを新規作成するので、そのループをちゃんと走らせないとレスポンスを受け取れないわけだ。

mrサーバをスケールさせる(サーバのイベントループを使った非同期呼び出し)

mrサーバをスケールさせたいとする。addサーバ1の処理に2秒かかる処理があるとしよう。
それを3人が同時に呼び出ぶと...6秒もかかってしまった。

% time ruby client.rb 3 mr_async 'sleep(2);1+2' 2+3 3+4
15
15
15
... 6.302 total

これを解決するために、まずaddサーバをスレッドを使って並行処理できるようにする。

  # add.rb
  def con_add(str)
    as = MessagePack::RPC::AsyncResult.new
    Thread.start do 
      as.result(eval(str))
    end
    return as
  end

次に、mrサーバでjoinしている部分を削除したい。
そのためにサーバ側のイベントループを使った非同期呼び出しをおこなってみよう。

  require "securerandom"
  def mr_async_on_sv_loop(add1, add2, add3)
    uuid = SecureRandom.uuid
    cli1 = MessagePack::RPC::Client.new(ADD1[:host], ADD1[:port], $sv_loop)
    cli2 = MessagePack::RPC::Client.new(ADD2[:host], ADD2[:port], $sv_loop)
    cli3 = MessagePack::RPC::Client.new(ADD3[:host], ADD3[:port], $sv_loop)

    @res ||= {}
    @res[uuid] = {res: 0, finished: 0}
    as = MessagePack::RPC::AsyncResult.new
    cb = ->(c){
      ->(res){
        @res[uuid][:res] += res.get
        @res[uuid][:finished] += 1
        if @res[uuid][:finished] == 3
          as.result(@res[uuid][:res])
          @res.delete(uuid)
        end
        c.close
      }
    }
    cli1.call_async(:con_add, add1).attach_callback(cb.(cli1))
    cli2.call_async(:con_add, add2).attach_callback(cb.(cli2))
    cli3.call_async(:con_add, add3).attach_callback(cb.(cli3))
    return as
  end

こうするとサーバ側のイベントループでaddサーバのレスポンスを待ち受けるようになる。
サーバ側のイベントループは自分へのリクエストも待ち受けているため、addサーバのレスポンスを受け取る前にリクエストがわりこんだりする。

% time ruby client.rb 3 mr_async_with_sv_loop 'sleep(2);1+2' 2+3 3+4
15
15
15
... 2.359 total

このようにサーバ側のイベントループをなるべく使うようにすれば、わりとスケールする。
が、コールバック内の処理は逐次実行なので処理時間が短くなければイベントループが詰まってしまう。
そのため、コールバック内で非同期呼び出し&コールバック、の中でコールバック、といったコールバック地獄がおこる可能性がある。
もしくはDBへの接続はどうするかとか、時間がかかりそうな処理をどう対処していくかが問題になってくる。

mrサーバをスケールさせる(別スレッドで非同期呼び出し)

  # mr.rb
  def mr_async_on_thread(add1, add2, add3)
    clis = [MessagePack::RPC::Client.new(ADD1[:host], ADD1[:port])]
    clis << MessagePack::RPC::Client.new(ADD2[:host], ADD2[:port])
    clis << MessagePack::RPC::Client.new(ADD3[:host], ADD3[:port])

    as = MessagePack::RPC::AsyncResult.new
    Thread.start(0, clis, add1, add2, add3) do |r, clis, add1, add2, add3|
      begin
        fs = [clis[0].callback(:con_add, add1){|res| r += res.get}]
        fs << clis[1].callback(:con_add, add2){|res| r += res.get}
        fs << clis[2].callback(:con_add, add3){|res| r += res.get}

        fs.map(&:join)
        as.result(r)
      ensure
        clis.map(&:close)
      end
    end
    return as
  end
% time ruby client.rb 3 mr_async_on_thread 'sleep(2);1+2' 2+3 3+4
15
15
15
... 2.947 total

かなり処理がシンプルになったが、前の例と比べてスレッド生成のコストが少しかかってしまった。
処理が長時間におよぶ場合はスレッド生成のコストなど微々たるものなので、上記のスレッドの例を使える。
この例のように、2秒もかかる処理ならスレッドの例の方がよい。
また、addサーバが落ちていた場合はcallにタイムアウト分の時間がかかる可能性もあるので、スレッドの例でも問題ないだろう。

スレッドをある程度プールしておけばある程度性能は上がるだろうが、そこまでする必要があるのか、という気もする。

clientはスレッドを使っても共通化できる?

サーバが使うクライアントは共有化したいとする。

  # mr.rb
  def mr_on_thread_with_shared_client(add1, add2, add3)
    @clis ||= [MessagePack::RPC::Client.new(ADD1[:host], ADD1[:port]),
               MessagePack::RPC::Client.new(ADD2[:host], ADD2[:port]),
               MessagePack::RPC::Client.new(ADD3[:host], ADD3[:port])]

    as = MessagePack::RPC::AsyncResult.new
    Thread.start(0, @clis, add1, add2, add3) do |r, clis, add1, add2, add3|
      begin
        r += clis[0].call(:con_add, add1)
        r += clis[1].call(:con_add, add2)
        r += clis[2].call(:con_add, add3)
        as.result(r)
      end
    end
    return as
  end
$ ruby client.rb 3 mr_on_thread_with_shared_client 'sleep(2);1+2' 2+3 3+4

上記のサーバは上記コマンドで高確率で以下のエラーを吐く。

ruby: loop.c:192: Coolio_Loop_run_once: Assertion `loop_data->ev_loop && !loop_data->events_received' failed.
zsh: abort      ruby mr.rb 10000

要するに「イベントループ実行中に再度ループ実行しようとしている」というエラーだ。
mr_on_thread_with_shared_clientはスレッドを作って並行にclientのcallを呼び出す可能性があり、MessagePack::RPC::Clientはこのような使い方をしてはならない。
スレッドを使う場合は、そのスレッド毎にクライアントを作り、スレットが死んだ時にきちんとcloseするようにしよう。

サーバのイベントループを使った非同期呼び出しの方法であれば並行に動くことはないため共通化可能だ。

  def mr_async_on_sv_loop_with_shared_client(add1, add2, add3)
    uuid = SecureRandom.uuid
    @cli1 ||= MessagePack::RPC::Client.new(ADD1[:host], ADD1[:port], $sv_loop)
    @cli2 ||= MessagePack::RPC::Client.new(ADD2[:host], ADD2[:port], $sv_loop)
    @cli3 ||= MessagePack::RPC::Client.new(ADD3[:host], ADD3[:port], $sv_loop)

    @res ||= {}
    @res[uuid] = {res: 0, finished: 0}
    as = MessagePack::RPC::AsyncResult.new
    cb = ->(res){
      begin
        @res[uuid][:res] += res.get
        @res[uuid][:finished] += 1
        if @res[uuid][:finished] == 3
          as.result(@res[uuid][:res])
          @res.delete(uuid)
        end
      end
    }
    @cli1.call_async(:con_add, add1).attach_callback(cb)
    @cli2.call_async(:con_add, add2).attach_callback(cb)
    @cli3.call_async(:con_add, add3).attach_callback(cb)
    return as
  end

呼び出し先サーバが落ちていた場合の対処

MessagePack-RPCクライアントの接続時間タイムアウトはデフォルトで10秒。

1つのaddサーバが落ちていても、他の2つのaddサーバにリクエストを投げたい場合は、非同期呼び出しでjoinする(mr.rbのmr_async()を参照)
タイムアウトの例外はres.getでraiseされるのでそれをrescueし、よしなに処理する。

1つでもサーバが落ちていたら残りのサーバにリクエストさせない場合、同期呼び出しする(mr.rbのmr_close_with_open()を参照)。
これはcallの時点でタイムアウトの例外がraiseされる。

計算の途中経過がクライアント側で知りたい

MessagePack-RPCにはHTTP/1.1でいうところのChunked transfer encodingがないので、ちょびちょび結果を受け取ることができない。
処理にものすごく時間がかかる場合は計算の途中結果を随時知りたい場合もあるはず。
そのため、途中経過をポーリングするようなAPIを新しく設けてやる。

  #mr.rb
  def mr_for_polling(add1, add2, add3)
    uuid = SecureRandom.uuid
    @res ||= {}
    @res[uuid] = {res: 0, finished: 0}
    clis = [MessagePack::RPC::Client.new(ADD1[:host], ADD1[:port])]
    clis << MessagePack::RPC::Client.new(ADD2[:host], ADD2[:port])
    clis << MessagePack::RPC::Client.new(ADD3[:host], ADD3[:port])

    Thread.start(0, clis, add1, add2, add3) do |r, clis, add1, add2, add3|
      begin
        @res[uuid][:res] += clis[0].call(:con_add, add1)
        @res[uuid][:finished] += 1
        @res[uuid][:res] += clis[1].call(:con_add, add2)
        @res[uuid][:finished] += 1
        @res[uuid][:res] += clis[2].call(:con_add, add3)
        @res[uuid][:finished] += 1
        as.result(r)
      ensure
        clis.map(&:close)
      end
    end
    return uuid
  end

  def mr_for_polling_result(uuid)
    res = @res[uuid]
    if @res[uuid] && @res[uuid][:finished] == 3
      @res.delete(uuid)
    end
    return res
  end

クライアント側では以下のようにポーリングする。

# $ ruby client.rb method num args..
require 'msgpack/rpc'

concurrency = ARGV.shift.to_i
method = ARGV.shift
th = []
concurrency.times do
  th << Thread.start do
    MessagePack::RPC::Client.open('0.0.0.0', 10000) do |c|
      uuid = c.call(method, *ARGV)
      loop do
        r = c.call(method+"_result", uuid)
        p r
        break if r.nil?
        sleep 0.1
      end
    end
  end
end
th.map(&:join)
% ruby client_for_polling.rb 1 mr_for_polling '1+2' 2+3 3+4
{"res"=>0, "finished"=>0}
{"res"=>3, "finished"=>1}
{"res"=>3, "finished"=>1}
{"res"=>3, "finished"=>1}
{"res"=>3, "finished"=>1}
{"res"=>3, "finished"=>1}
{"res"=>3, "finished"=>1}
{"res"=>3, "finished"=>1}
{"res"=>3, "finished"=>1}
{"res"=>3, "finished"=>1}
{"res"=>3, "finished"=>1}
{"res"=>8, "finished"=>2}
{"res"=>8, "finished"=>2}
{"res"=>8, "finished"=>2}
{"res"=>8, "finished"=>2}
{"res"=>8, "finished"=>2}
{"res"=>8, "finished"=>2}
{"res"=>15, "finished"=>3}
nil

Msgpack-RPCはこの辺が一番いけてないと思いますよね(チラチラ

MessagePack-PPCサーバ側の処理時間が見積もれない場合

サーバ側の処理時間が見積もれない場合は以下の2つの選択肢がある。

  1. clientのタイムアウトをとんでもなく長くして対応する
  2. 前に述べたポーリングで結果を受け取っていく

1.は接続先サーバがおかしい場合(停止しているとき)にも影響するので、なるべくこの方法はとりたくない。
したがって、2.をつかってやることになる。

あぁ、いけてませんねぇ。いけてない。

まとめ

  • 同期・非同期呼び出しは、ほぼ同時に呼びたいか・呼びたくないかの違い
  • サーバをスケールさせる方法は以下の2種類
    • サーバのイベントループを使ってコールバックで頑張る方法
      • 並行に処理されないので、スレッドセーフとかそんなめんどいことを考えなくてよいという利点がある
    • スレッドを作る方法
  • クライアントのcloseには気を配る
  • クライアントを共通するさいの並行処理は注意
  • MessagePack-PPCサーバの処理時間はなるべく短くするのが正解
  • 処理時間が見積もれない場合
    • タイムアウトを長くする
    • ポーリングで結果を得る
      • こちらの方がよさそげ

追記: サーバのイベントループとクライアントのイベントループを同じにすると同期呼び出しできない

あ、そうだ。これを書くの忘れてた。
サーバ側のイベントループとクライアントのイベントループを同じにすると同期呼び出しできない。

  def mr_fail_on_sv_loop(add1, add2, add3)
    clis = [ADD1, ADD2, ADD3].each_with_object([]) do |add, clis|
      clis << MessagePack::RPC::Client.new(add[:host], add[:port], $sv_loop)
    end

    [add1, add2, add3].zip(clis).inject(0) do |r, (a, c)|
      r += c.call(:add, a)
    end
  end

# わかりやすいようにベタに書いてきたが、each_with_objectやzip,injectを使えばもっと綺麗に書ける。
実行するとmr.rbの方で以下のエラーを吐く。

ruby: loop.c:192: Coolio_Loop_run_once: Assertion `loop_data->ev_loop && !loop_data->events_received' failed.
zsh: abort      ruby mr.rb 10000

これはサーバの待受ループとクライアントの待受ループを1つのイベントループでおこなおうとするから。