Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create method ThreadPoolExecutor#active_count to expose the number of threads that are actively executing tasks #1002

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ module Concurrent
# The number of tasks that have been completed by the pool since construction.
# @return [Integer] The number of tasks that have been completed by the pool since construction.

# @!macro thread_pool_executor_method_active_count
# The number of threads that are actively executing tasks.
# @return [Integer] The number of threads that are actively executing tasks.

# @!macro thread_pool_executor_attr_reader_idletime
# The number of seconds that a thread may be idle before being reclaimed.
# @return [Integer] The number of seconds that a thread may be idle before being reclaimed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ def completed_task_count
@executor.getCompletedTaskCount
end

# @!macro thread_pool_executor_method_active_count
def active_count
@executor.getActiveCount
end

# @!macro thread_pool_executor_attr_reader_idletime
def idletime
@executor.getKeepAliveTime(java.util.concurrent.TimeUnit::SECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ def completed_task_count
synchronize { @completed_task_count }
end

# @!macro thread_pool_executor_method_active_count
def active_count
synchronize do
@pool.length - @ready.length
end
end

# @!macro executor_service_method_can_overflow_question
def can_overflow?
synchronize { ns_limited_queue? }
Expand Down
47 changes: 39 additions & 8 deletions spec/concurrent/executor/thread_pool_executor_shared.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require_relative 'thread_pool_shared'
require 'concurrent/atomic/atomic_fixnum'
require 'concurrent/atomic/cyclic_barrier'

RSpec.shared_examples :thread_pool_executor do

Expand Down Expand Up @@ -258,6 +259,36 @@
end
end

context '#active_count' do
subject do
described_class.new(
min_threads: 5,
max_threads: 10,
idletime: 60,
max_queue: 0,
fallback_policy: :discard
)
end

it 'returns the number of threads that are actively executing tasks.' do
barrier = Concurrent::CyclicBarrier.new(4)
latch = Concurrent::CountDownLatch.new(1)

3.times do
subject.post do
barrier.wait
latch.wait
end
end
barrier.wait

expect(subject.active_count).to eq 3

# release
latch.count_down
end
end

context '#fallback_policy' do

let!(:min_threads){ 1 }
Expand Down Expand Up @@ -627,33 +658,33 @@
max_threads: 1,
max_queue: 1,
fallback_policy: :caller_runs)

worker_unblocker = Concurrent::CountDownLatch.new(1)
executor_unblocker = Concurrent::CountDownLatch.new(1)
queue_done = Concurrent::CountDownLatch.new(1)

# Block the worker thread
executor << proc { worker_unblocker.wait }

# Fill the queue
executor << proc { log.push :queued; queue_done.count_down }

# Block in a caller_runs job
caller_runs_thread = Thread.new {
executor << proc { executor_unblocker.wait; log.push :unblocked }
}

# Wait until the caller_runs job is blocked
Thread.pass until caller_runs_thread.status == 'sleep'

# Now unblock the worker thread
worker_unblocker.count_down
queue_done.wait
executor_unblocker.count_down

# Tidy up
caller_runs_thread.join

# We will see the queued jobs run before the caller_runs job unblocks
expect([log.pop, log.pop]).to eq [:queued, :unblocked]
end
Expand Down