D言語でアプリケーションサーバを書いていて、そういえば普段良く使っているRubyのPumaだと、どういう実装をされているんだろう? というのが気になったので少し読んでみる。
server.rb
まず読み始めるのはPuma::Server
クラスから。
クラスのインスタンス化の際にinitialize
の中でスレッド数の下限と上限を変数で保持している。
... def initialize(app, events=Events.stdio, options={}) ... @min_threads = 0 # デフォルトの最小スレッド数 @max_threads = 16 # デフォルトの最大スレッド数 @auto_trim_time = 30 @reaping_time = 1 @thread = nil @thread_pool = nil ... end ...
その数値を元にrun
メソッドでPuma::ThreadPool
クラスをインスタンス化し、ワーカスレッドを起動。ブロックに渡ってくる処理は一旦割愛。
最後に、handle_servers
でリクエストをワーカスレッドに引き渡す処理を始める。
... def run(background=true) ... @thread_pool = ThreadPool.new(@min_threads, @max_threads, IOBuffer) do |client, buffer| ... end ... if background @thread = Thread.new { handle_servers } return @thread else handle_servers end end ...
thread_pool.rb
上で使われているPuma::ThreadPool
クラスのインスタンス化の際には何が行われているのかというと、Mutexで同期をとった上で最小ワーカスレッド数で指定されている数だけスレッドを立ち上げている。
... def initialize(min, max, *extra, &block) ... @mutex = Mutex.new @todo = [] # ワーカへの処理を積むキュー @spawned = 0 # 起動しているワーカの数 @waiting = 0 # todoを待っているワーカの数 ... @min = Integer(min) # min_threads @max = Integer(max) # max_threads ... @workers = [] # spawn_threadで起動されたワーカのアレイ ... @mutex.synchronize do @min.times { spawn_thread } # ワーカスレッドを起動 end ... end ...
spawn_thread
はちょっと長いが、中で行われているのは、whileでのスレッドの処理ループだ。
initialize
の中で初期化された@todo
というインスタンス変数にワーカへの処理が積まれていくので、ワーカが起動している間(continue
がtrue)は、todo
に積まれているキューをshiftして、スレッドに渡されているブロックで実行している。また、todo
が空っぽの際はThread::ConditionVariableでスレッドをストップさせている。
また、作られたワーカスレッドは@workers
に追加される。
... def spawn_thread @spawned += 1 th = Thread.new(@spawned) do |spawned| ... todo = @todo ... while true work = nil ... mutex.synchronize do while todo.empty? # ワーカが処理するものは何もない ... @waiting += 1 not_full.signal not_empty.wait mutex # todoが入ってくるまでスレッドを止める @waiting -= 1 end work = todo.shift if continue # todoからひとつもらってworkにいれる end ... begin block.call(work, *extra) # workの処理をスレッドに引き渡す。 rescue Exception => e ... end end ... end @workers << th # 作ったワーカスレッドを追加 th end ...
Puma::ThreadPool
には<<
のオペレータが定義されていて、インスタンスへtodo
を積み、スレッドを動かせるようになっている。
また、積む際に待っているワーカがいなければ、再び上で触れたspawn_thread
を実行して再び上限までワーカスレッドを起動しようとする。
... # Add +work+ to the todo list for a Thread to pickup and process. def <<(work) @mutex.synchronize do ... @todo << work if @waiting < @todo.size and @spawned < @max spawn_thread end @not_empty.signal # 空ではなくなったことを通知して、スレッドを動かす end end ...
ちなみに、この<<
オペレータは一番最初に触れたPuma::Server
クラスの中にあるhandle_servers
メソッドの中で使われていて、ソケットで受けたリクエストからPuma::Cilent
クラスのインスタンスを生成し、その処理をスレッドプールに任せるために使われている模様。
... def handle_servers begin ... pool = @thread_pool ... while @status == :run begin ios = IO.select sockets ios.first.each do |sock| if sock == check ... else begin if io = sock.accept_nonblock client = Client.new io, @binder.env(sock) ... pool << client # スレッドプールへtodoを積む pool.wait_until_not_full end rescue SystemCallError ... rescue Errno::ECONNABORTED ... end end end rescue Object => e ... end end ... rescue Exception => e ... ensure ... end ... end ...
そういえば、起動されたワーカが必要なくなったときの処理はどうしているんだろう? と思ったら、trim
というメソッドがPuma::ThreadPool
クラスの中にあるのを見つけた。このメソッドの中では待ちワーカがひとつ以上あり、かつワーカの数が下限を下回らない範囲で、ワーカスレッドを終了させるキュー(trim_requested
)を加算していく。
... # If too many threads are in the pool, tell one to finish go ahead # and exit. If +force+ is true, then a trim request is requested # even if all threads are being utilized. # def trim(force=false) @mutex.synchronize do if (force or @waiting > 0) and @spawned - @trim_requested > @min @trim_requested += 1 @not_empty.signal end end end ...
@trim_requested
が加算されていくと、todo
が空っぽの際にワーカスレッドが自ら終了していく。この処理は上で触れたspawn_thread
の中のワーカスレッドの処理ループの中で行われている。
... def spawn_thread @spawned += 1 th = Thread.new(@spawned) do |spawned| ... todo = @todo ... while true ... continue = true # ワーカスレッドを起動し続けるかどうか mutex.synchronize do while todo.empty? if @trim_requested > 0 # 終了がリクエストされている @trim_requested -= 1 continue = false # ワーカスレッドの終了をセット not_full.signal break end ... end ... end break unless continue # ワーカスレッドの処理ループを抜ける。 ... end mutex.synchronize do @spawned -= 1 # 動いているワーカの数をひとつ減らす @workers.delete th # ワーカのアレイから削除 end end @workers << th th end ...
Puma::Server
クラスのrun
メソッドは、ワーカスレッドを起動するのと同じタイミングで、デフォルト30秒のインターバル(@auto_trim_time
で指定されている)毎にtrim
メソッドを呼び出すPuma::ThreadPool::AutoTrim
クラスのインスタンスを準備する(auto_trim!
)
def initialize(app, events=Events.stdio, options={}) ... @min_threads = 0 @max_threads = 16 @auto_trim_time = 30 # auto_trim!のインターバル @reaping_time = 1 ... end ... def run(background=true) ... @thread_pool = ThreadPool.new(@min_threads, @max_threads, IOBuffer) do |client, buffer| ... end ... if @auto_trim_time # インターバルが指定されていれば、auto_trimを始める。 @thread_pool.auto_trim!(@auto_trim_time) end ... end
Puma::ThreadPool::AutoTrim
クラスは@timeout
で指定されたインターバル毎に、Puma::ThreadPool
のtrim
メソッドを呼び出して余ったワーカスレッドの終了を試みる。
... class AutoTrim def initialize(pool, timeout) @pool = pool @timeout = timeout @running = false end def start! @running = true @thread = Thread.new do while @running @pool.trim sleep @timeout end end end ... end def auto_trim!(timeout=30) @auto_trim = AutoTrim.new(self, timeout) @auto_trim.start! end ...
Pumaでは、Puma::ThreadPool
クラスが、ワーカスレッドと、ワーカへの処理のキューを管理していて、キューが予約されるタイミングでワーカを増やしたり、減らしたりしているということが分かった。
今回は、スレッドプールの部分を中心に読んだだけなので、実装にリクエストをどう処理しているのか、クラスタリングの実装はどうなっているのか、あたりはまだしっかり読んでいない。けれども、既存のアプリケーションサーバの実装を読んでみることで、自分で1から作っていく際にどういう実装にするのがベターかというプラクティスを探るきっかけになりそうだな〜と思う◎