D言語でアプリケーションサーバを書いていて、そういえば普段良く使っているRubyのPumaだと、どういう実装をされているんだろう? というのが気になったので少し読んでみる。
まず読み始めるのはPuma::Server
クラスから。
クラスのインスタンス化の際にinitialize
の中でスレッド数の下限と上限を変数で保持している。
...
def initialize(app, events=Events.stdio, options={})
...
@min_threads = 0
@max_threads = 16
@auto_trim_time = 30
@reaping_time = 1
@thread = nil
@thread_pool = nil
...
end
...
その数値を元にrun
メソッドでPuma::ThreadPool
クラスをインスタンス化し、ワーカスレッドを起動。ブロックに渡ってくる処理は一旦割愛。
最後に、handle_servers
でリクエストをワーカスレッドに引き渡す処理を始める。
...
def run(background=true)
...
@thread_pool = ThreadPool.new(@min_threads,
@max_threads,
IOBuffer) do |client, buffer|
...
end
...
if background
@thread = Thread.new { handle_servers }
return @thread
else
handle_servers
end
end
...
上で使われているPuma::ThreadPool
クラスのインスタンス化の際には何が行われているのかというと、Mutexで同期をとった上で最小ワーカスレッド数で指定されている数だけスレッドを立ち上げている。
...
def initialize(min, max, *extra, &block)
...
@mutex = Mutex.new
@todo = []
@spawned = 0
@waiting = 0
...
@min = Integer(min)
@max = Integer(max)
...
@workers = []
...
@mutex.synchronize do
@min.times { spawn_thread }
end
...
end
...
spawn_thread
はちょっと長いが、中で行われているのは、whileでのスレッドの処理ループだ。
initialize
の中で初期化された@todo
というインスタンス変数にワーカへの処理が積まれていくので、ワーカが起動している間(continue
がtrue)は、todo
に積まれているキューをshiftして、スレッドに渡されているブロックで実行している。また、todo
が空っぽの際はThread::ConditionVariableでスレッドをストップさせている。
また、作られたワーカスレッドは@workers
に追加される。
...
def spawn_thread
@spawned += 1
th = Thread.new(@spawned) do |spawned|
...
todo = @todo
...
while true
work = nil
...
mutex.synchronize do
while todo.empty?
...
@waiting += 1
not_full.signal
not_empty.wait mutex
@waiting -= 1
end
work = todo.shift if continue
end
...
begin
block.call(work, *extra)
rescue Exception => e
...
end
end
...
end
@workers << th
th
end
...
Puma::ThreadPool
には<<
のオペレータが定義されていて、インスタンスへtodo
を積み、スレッドを動かせるようになっている。
また、積む際に待っているワーカがいなければ、再び上で触れたspawn_thread
を実行して再び上限までワーカスレッドを起動しようとする。
...
todo
def <<(work)
@mutex.synchronize do
...
@todo << work
if @waiting < @todo.size and @spawned < @max
spawn_thread
end
@not_empty.signal
end
end
...
ちなみに、この<<
オペレータは一番最初に触れたPuma::Server
クラスの中にあるhandle_servers
メソッドの中で使われていて、ソケットで受けたリクエストからPuma::Cilent
クラスのインスタンスを生成し、その処理をスレッドプールに任せるために使われている模様。
...
def handle_servers
begin
...
pool = @thread_pool
...
while @status == :run
begin
ios = IO.select sockets
ios.first.each do |sock|
if sock == check
...
else
begin
if io = sock.accept_nonblock
client = Client.new io, @binder.env(sock)
...
pool << client
pool.wait_until_not_full
end
rescue SystemCallError
...
rescue Errno::ECONNABORTED
...
end
end
end
rescue Object => e
...
end
end
...
rescue Exception => e
...
ensure
...
end
...
end
...
そういえば、起動されたワーカが必要なくなったときの処理はどうしているんだろう? と思ったら、trim
というメソッドがPuma::ThreadPool
クラスの中にあるのを見つけた。このメソッドの中では待ちワーカがひとつ以上あり、かつワーカの数が下限を下回らない範囲で、ワーカスレッドを終了させるキュー(trim_requested
)を加算していく。
...
def trim(force=false)
@mutex.synchronize do
if (force or @waiting > 0) and @spawned - @trim_requested > @min
@trim_requested += 1
@not_empty.signal
end
end
end
...
@trim_requested
が加算されていくと、todo
が空っぽの際にワーカスレッドが自ら終了していく。この処理は上で触れたspawn_thread
の中のワーカスレッドの処理ループの中で行われている。
...
def spawn_thread
@spawned += 1
th = Thread.new(@spawned) do |spawned|
...
todo = @todo
...
while true
...
continue = true
mutex.synchronize do
while todo.empty?
if @trim_requested > 0
@trim_requested -= 1
continue = false
not_full.signal
break
end
...
end
...
end
break unless continue
...
end
mutex.synchronize do
@spawned -= 1
@workers.delete th
end
end
@workers << th
th
end
...
Puma::Server
クラスのrun
メソッドは、ワーカスレッドを起動するのと同じタイミングで、デフォルト30秒のインターバル(@auto_trim_time
で指定されている)毎にtrim
メソッドを呼び出すPuma::ThreadPool::AutoTrim
クラスのインスタンスを準備する(auto_trim!
)
def initialize(app, events=Events.stdio, options={})
...
@min_threads = 0
@max_threads = 16
@auto_trim_time = 30
@reaping_time = 1
...
end
...
def run(background=true)
...
@thread_pool = ThreadPool.new(@min_threads,
@max_threads,
IOBuffer) do |client, buffer|
...
end
...
if @auto_trim_time
@thread_pool.auto_trim!(@auto_trim_time)
end
...
end
Puma::ThreadPool::AutoTrim
クラスは@timeout
で指定されたインターバル毎に、Puma::ThreadPool
のtrim
メソッドを呼び出して余ったワーカスレッドの終了を試みる。
...
class AutoTrim
def initialize(pool, timeout)
@pool = pool
@timeout = timeout
@running = false
end
def start!
@running = true
@thread = Thread.new do
while @running
@pool.trim
sleep @timeout
end
end
end
...
end
def auto_trim!(timeout=30)
@auto_trim = AutoTrim.new(self, timeout)
@auto_trim.start!
end
...
Pumaでは、Puma::ThreadPool
クラスが、ワーカスレッドと、ワーカへの処理のキューを管理していて、キューが予約されるタイミングでワーカを増やしたり、減らしたりしているということが分かった。
今回は、スレッドプールの部分を中心に読んだだけなので、実装にリクエストをどう処理しているのか、クラスタリングの実装はどうなっているのか、あたりはまだしっかり読んでいない。けれども、既存のアプリケーションサーバの実装を読んでみることで、自分で1から作っていく際にどういう実装にするのがベターかというプラクティスを探るきっかけになりそうだな〜と思う◎