diff --git a/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb b/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb index 4de512a5f..993c3f1e3 100644 --- a/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb +++ b/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb @@ -39,6 +39,10 @@ module Concurrent # The number of tasks that have been completed by the pool since construction. # @return [Integer] The number of tasks that have been completed by the pool since construction. + # @!macro thread_pool_executor_method_active_count + # The number of threads that are actively executing tasks. + # @return [Integer] The number of threads that are actively executing tasks. + # @!macro thread_pool_executor_attr_reader_idletime # The number of seconds that a thread may be idle before being reclaimed. # @return [Integer] The number of seconds that a thread may be idle before being reclaimed. diff --git a/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb b/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb index 1213a95fb..598a5f91f 100644 --- a/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb @@ -73,6 +73,11 @@ def completed_task_count @executor.getCompletedTaskCount end + # @!macro thread_pool_executor_method_active_count + def active_count + @executor.getActiveCount + end + # @!macro thread_pool_executor_attr_reader_idletime def idletime @executor.getKeepAliveTime(java.util.concurrent.TimeUnit::SECONDS) diff --git a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb index 298dd7fed..9375acf38 100644 --- a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb @@ -61,6 +61,13 @@ def completed_task_count synchronize { @completed_task_count } end + # @!macro thread_pool_executor_method_active_count + def active_count + synchronize do + @pool.length - @ready.length + end + end + # @!macro executor_service_method_can_overflow_question def can_overflow? synchronize { ns_limited_queue? } diff --git a/spec/concurrent/executor/thread_pool_executor_shared.rb b/spec/concurrent/executor/thread_pool_executor_shared.rb index bb91b3d4b..1fbf8de6d 100644 --- a/spec/concurrent/executor/thread_pool_executor_shared.rb +++ b/spec/concurrent/executor/thread_pool_executor_shared.rb @@ -1,5 +1,6 @@ require_relative 'thread_pool_shared' require 'concurrent/atomic/atomic_fixnum' +require 'concurrent/atomic/cyclic_barrier' RSpec.shared_examples :thread_pool_executor do @@ -258,6 +259,36 @@ end end + context '#active_count' do + subject do + described_class.new( + min_threads: 5, + max_threads: 10, + idletime: 60, + max_queue: 0, + fallback_policy: :discard + ) + end + + it 'returns the number of threads that are actively executing tasks.' do + barrier = Concurrent::CyclicBarrier.new(4) + latch = Concurrent::CountDownLatch.new(1) + + 3.times do + subject.post do + barrier.wait + latch.wait + end + end + barrier.wait + + expect(subject.active_count).to eq 3 + + # release + latch.count_down + end + end + context '#fallback_policy' do let!(:min_threads){ 1 } @@ -627,33 +658,33 @@ max_threads: 1, max_queue: 1, fallback_policy: :caller_runs) - + worker_unblocker = Concurrent::CountDownLatch.new(1) executor_unblocker = Concurrent::CountDownLatch.new(1) queue_done = Concurrent::CountDownLatch.new(1) - + # Block the worker thread executor << proc { worker_unblocker.wait } - + # Fill the queue executor << proc { log.push :queued; queue_done.count_down } - + # Block in a caller_runs job caller_runs_thread = Thread.new { executor << proc { executor_unblocker.wait; log.push :unblocked } } - + # Wait until the caller_runs job is blocked Thread.pass until caller_runs_thread.status == 'sleep' - + # Now unblock the worker thread worker_unblocker.count_down queue_done.wait executor_unblocker.count_down - + # Tidy up caller_runs_thread.join - + # We will see the queued jobs run before the caller_runs job unblocks expect([log.pop, log.pop]).to eq [:queued, :unblocked] end