Hatena::Grouprubyist

takuma104のRuby/Rails日記

ツッコミ大歓迎!間違等ありましたらご指摘ください! / はてダはこちらで書いてます。

2008-05-07

Ruby/EventMachine での非同期サーバを Generator を使って同期サーバっぽく書く [その2]

| 03:28 | Ruby/EventMachine での非同期サーバを Generator を使って同期サーバっぽく書く [その2] - takuma104のRuby/Rails日記 を含むブックマーク はてなブックマーク - Ruby/EventMachine での非同期サーバを Generator を使って同期サーバっぽく書く [その2] - takuma104のRuby/Rails日記 Ruby/EventMachine での非同期サーバを Generator を使って同期サーバっぽく書く [その2] - takuma104のRuby/Rails日記 のブックマークコメント

昨日の続きで、さらに TCPServer での Thread-loop 内と全く同じメソッドが EventMachine でも使えるように改良してみました。

使い方は、

ruby hoge.rb -e

とか -e オプションをつけると、EventMachine を使用したサーバになり、-eをつけないと TCPServer を使用したサーバになります。

ポイントは、on_accept メソッドを両者で使い回しているところです。EventMachine のほうを socket に合わせる形にしている FilberIO クラスにほとんど秘密があります(この名前が微妙だな。なにかいい名前無いかな??)

ソースは以下です。


#!/usr/bin/env ruby -KU

require 'rubygems'
require 'eventmachine'
require 'socket'
require 'generator'
require 'optparse'

class FiberIO
  def initialize(connection)
    @recv_data = ''
    @connection = connection
    Generator.new do |g|
      @generator = g
      yield(self)
    end
  end
  
  def read(size)
    loop do
      d = pop(size)
      if d
        return d
      else
        remain = size - @recv_data.size
        @generator.yield(remain)
      end
    end
  end
  
  def write(buf)
    @connection.send_data buf
  end
  
  def push(data)
    @recv_data << data
    if @generator.next?
      @generator.next
    end
  end
  
  def close
    @connection.close_connection_after_writing
  end
  
  def peeraddr
    addr = Socket.unpack_sockaddr_in(@connection.get_peername)
    ["AF_INET", addr[0], addr[1], addr[1]]
  end
  
private
  def pop(size)
    if @recv_data.size >= size
      r = @recv_data[0..size-1]
      @recv_data = @recv_data[size..-1]
      r
    else
      nil
    end
  end
end

class GeneratorSampleConnection < EventMachine::Connection
  def initialize(arg)
    @proc = arg
  end
  
  def post_init
    @io = FiberIO.new(self) {|io| @proc.call(io)}
  end

  def receive_data(data)
    @io.push(data)
  end
end

def on_accept(sock)
  puts "Connected from %s:%d" % [sock.peeraddr[3],sock.peeraddr[1]]
  puts sock.read(10)
  puts "---"
  puts sock.read(20)
  puts "---END"
  sock.write ">> OK\r\n"
  sock.close
end

def loop_eventmachine(host, port)
  EventMachine::run do
    EventMachine.start_server(host, port, GeneratorSampleConnection, Proc.new do |io|
      on_accept(io)
    end)
    puts "Now accepting connections on address port #{port} (EventMachine)"
  end
end

def loop_tcpserver(port)
  gs = TCPServer.open(port)
  puts "Now accepting connections on address port #{port} (TCPServer)"
  loop do
    Thread.start(gs.accept) do |sock|
      on_accept(sock)
    end
  end
end

if $0 == __FILE__
  host = "127.0.0.1"
  options = {:p=>3793,:e=>false}
  
  opt = OptionParser.new
  opt.on('-p VAL') {|v| options[:p] = v.to_i }
  opt.on('-e') {|v| options[:e] = true }
  opt.parse!(ARGV)

  if ARGV.size != 0
    puts "-e      : use EventMachine (default=TCPServer)"
    puts "-p port : listening port (default=3793)"
  end
  
  if options[:e]
    loop_eventmachine(host, options[:p])
  else
    loop_tcpserver(options[:p])
  end
end
トラックバック - http://rubyist.g.hatena.ne.jp/takuma104/20080507

2008-05-06

Ruby/EventMachine での非同期サーバを Generator を使って同期サーバっぽく書く

| 04:24 | Ruby/EventMachine での非同期サーバを Generator を使って同期サーバっぽく書く - takuma104のRuby/Rails日記 を含むブックマーク はてなブックマーク - Ruby/EventMachine での非同期サーバを Generator を使って同期サーバっぽく書く - takuma104のRuby/Rails日記 Ruby/EventMachine での非同期サーバを Generator を使って同期サーバっぽく書く - takuma104のRuby/Rails日記 のブックマークコメント

Ruby/EventMachine という、非同期サーバがお手軽に書けるライブラリを教えてもらいました。ほかにも、Rev というのもあるようですが、今回試したのは、EventMachine のほう。Ruby/EventMachine は、現在の実装だと select(2) に落ちているようです。Rev はすでに epoll(2) 実装になっているようです。


まず、EventMachine のインストールは、

$ sudo gem install eventmachine

とかでいけます。ネイティブエクステンションとかも勝手にコンパイルとかしてくれます。gems 便利!

簡単な使い方は、Young risk taker.: [Ruby] Ruby/EventMachineでネットワークプログラミング などがあります。このサンプルがちょっと間違っててそのまま動かないので、修正して必要最小限にしたのは以下のようになります。

require 'rubygems'
require 'eventmachine'

class CharacterCount < EventMachine::Connection
  def post_init
    puts "Received a new connection"
  end

  def receive_data(data)
    puts "Received data: #{data}"
    send_data "#{data.length} (characters)\r\n"
    close_connection_after_writing
  end
end

EventMachine::run do
  host, port = "127.0.0.1", 3793
  EventMachine.start_server(host, port, CharacterCount)
  puts "Now accepting connections on address #{host}, port #{port}"
end

これだけで 127.0.0.1 の Port 3793 で、listen を開始して、Ctrl-C とかするまで動き続けるサーバーが出来ます。telnet とかで、3793 に接続して文字入力して改行を押すと、文字数とかが返ってくると思います。

CharacterCount クラスが、EventMachine::Connection クラスを継承していて、EventMachine.start_server にて登録されています。この CharacterCount のインスタンスが、TCPコネクションが張られるたびに自動生成されるようです。EventMachine::Connection のメソッドは、

  • post_init が、コネクション貼られたときに呼び出される
  • receive_data がデータを受信した時に呼び出される
  • データ送信は、send_data です。

のようになります。で、例えば同期サーバ (TCPServer) で以下のような処理を書きたい場合に、

require "socket"

port = 3793
gs = TCPServer.open(port)

loop do
  Thread.start(gs.accept) do |s|
    puts "Now accepting connections on address port #{port}"
    puts s.read(10)
    puts "---"
    puts s.read(20)
    puts "---END"
    s.write(">> OK\r\n")
    s.close
  end
end

これをそのままEventMachineのような非同期サーバにそのまま変更しようと思うと、結構大変です。非同期サーバの作り方として、よくありがちなのが、状態遷移マシンを作って、例えばある状態の時は、10バイトまで読み続けて、ある状態になったら、応答を返してとかですかね。でもそれだと、このまま同期サーバーを非同期サーバに移植しようと思うと大変で気が滅入ります。

ということで、Ruby1.8におけるFiber (Co-routine, microthreadなどの呼び名がありますが、基本思想的には同じものです) 機構の代替えになるような Generator を使って、同期サーバーっぽく書けるようにしてみました。まだまだ改善余地があると思いますが、とりあえず、これで動作しています。

reader_setup の引数ブロックが、Generator 内部になっているので、reader メソッドで、上記の同期サーバーとほぼ同じ書き方が出来ています。*1

ちなみに、Generator は、遅いそうなので、これではあまり高速という話にはならないかもしれません。(ちゃんと計測したりとかはまだしてません)Ruby 1.9 の Fiber でも同様なことは書けると思いますので、速さを求めるなら、Ruby 1.9 の Fiber でも良いかもしれません。

#!/usr/bin/env ruby -KU

require 'rubygems'
require 'eventmachine'
require 'socket'
require 'generator'

class GeneratorSampleConnection < EventMachine::Connection
  def initialize
    @recv_data = ''
    @reader_generator = nil
  end
  
  def post_init
    addr = Socket.unpack_sockaddr_in(get_peername)
    puts "Connected from %s:%d" % [addr[1],addr[0]]
    reader_setup { reader }
  end

  def receive_data(data)
    push(data)
  end

private
  def reader
    puts read_bytes(10)
    puts "---"
    puts read_bytes(20)
    puts "---END"
    send_data ">> OK\r\n"
    close_connection_after_writing
  end

private
  def reader_setup
    Generator.new do |g|
      @reader_generator = g
      yield
    end
  end

  def push(data)
    @recv_data << data
    if @reader_generator.next?
      @reader_generator.next
    end
  end
  
  def pop(size)
    if @recv_data.size >= size
      r = @recv_data[0..size-1]
      @recv_data = @recv_data[size..-1]
      r
    else
      nil
    end
  end  

  def read_bytes(size)
    loop do
      d = pop(size)
      if d
        return d
      else
        remain = size - @recv_data.size
        @reader_generator.yield(remain)
      end
    end
  end
end

EventMachine::run do
  host, port = "127.0.0.1", 3793
  EventMachine.start_server(host, port, GeneratorSampleConnection)
  puts "Now accepting connections on address #{host}, port #{port}"
end

*1:ちなみにEventMachineはすべてシングルスレッドで動作するので、あまりRubyのグリーンスレッドも使いたくなかった、というのもあります

takuma104takuma1042008/05/08 03:31EventMachineとはなにか、というところから解説を追記しました。

トラックバック - http://rubyist.g.hatena.ne.jp/takuma104/20080506