Skip to content

Commit

Permalink
Various fixes for ThreadLocalVar and FiberLocalVar
Browse files Browse the repository at this point in the history
  • Loading branch information
eregon committed Jan 23, 2023
1 parent 4959d76 commit 87ad3a8
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 117 deletions.
93 changes: 57 additions & 36 deletions lib/concurrent-ruby/concurrent/atomic/fiber_local_var.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,50 @@
require_relative 'locals'

module Concurrent
class FiberLocalVar
LOCALS = FiberLocals.new(:concurrent_fiber_local_var)

# @!macro fiber_local_var_method_initialize
#
# Creates a fiber local variable.
#
# @param [Object] default the default value when otherwise unset
# @param [Proc] default_block Optional block that gets called to obtain the
# default value for each fiber

# @!macro fiber_local_var_method_get
#
# Returns the value in the current fiber's copy of this fiber-local variable.
#
# @return [Object] the current value

# @!macro fiber_local_var_method_set
#
# Sets the current fiber's copy of this fiber-local variable to the specified value.
#
# @param [Object] value the value to set
# @return [Object] the new value
# A `FiberLocalVar` is a variable where the value is different for each fiber.
# Each variable may have a default value, but when you modify the variable only
# the current fiber will ever see that change.
#
# This is similar to Ruby's built-in fiber-local variables (`Thread.current[:name]`),
# but with these major advantages:
# * `FiberLocalVar` has its own identity, it doesn't need a Symbol.
# * Each Ruby's built-in fiber-local variable leaks some memory forever (it's a Symbol held forever on the fiber),
# so it's only OK to create a small amount of them.
# `FiberLocalVar` has no such issue and it is fine to create many of them.
# * Ruby's built-in fiber-local variables leak forever the value set on each fiber (unless set to nil explicitly).
# `FiberLocalVar` automatically removes the mapping for each fiber once the `FiberLocalVar` instance is GC'd.
#
# @example
# v = FiberLocalVar.new(14)
# v.value #=> 14
# v.value = 2
# v.value #=> 2
#
# @example
# v = FiberLocalVar.new(14)
#
# Fiber.new do
# v.value #=> 14
# v.value = 1
# v.value #=> 1
# end.resume
#
# Fiber.new do
# v.value #=> 14
# v.value = 2
# v.value #=> 2
# end.resume
#
# v.value #=> 14
class FiberLocalVar
LOCALS = FiberLocals.new

# @!macro fiber_local_var_method_bind
# Creates a fiber local variable.
#
# Bind the given value to fiber local storage during
# execution of the given block.
#
# @param [Object] value the value to bind
# @yield the operation to be performed with the bound variable
# @return [Object] the value

# @param [Object] default the default value when otherwise unset
# @param [Proc] default_block Optional block that gets called to obtain the
# default value for each fiber
def initialize(default = nil, &default_block)
if default && block_given?
raise ArgumentError, "Cannot use both value and block as default value"
Expand All @@ -51,22 +62,32 @@ def initialize(default = nil, &default_block)
@index = LOCALS.next_index(self)
end

# @!macro fiber_local_var_method_get
# Returns the value in the current fiber's copy of this fiber-local variable.
#
# @return [Object] the current value
def value
LOCALS.fetch(@index) {default}
LOCALS.fetch(@index) { default }
end

# @!macro fiber_local_var_method_set
# Sets the current fiber's copy of this fiber-local variable to the specified value.
#
# @param [Object] value the value to set
# @return [Object] the new value
def value=(value)
LOCALS.set(@index, value)
end

# @!macro fiber_local_var_method_bind
def bind(value, &block)
# Bind the given value to fiber local storage during
# execution of the given block.
#
# @param [Object] value the value to bind
# @yield the operation to be performed with the bound variable
# @return [Object] the value
def bind(value)
if block_given?
old_value = self.value
self.value = value
begin
self.value = value
yield
ensure
self.value = old_value
Expand Down
77 changes: 36 additions & 41 deletions lib/concurrent-ruby/concurrent/atomic/locals.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,11 @@ module Concurrent
# them. This is why we need to use a finalizer to clean up the locals array
# when the EC goes out of scope.
class AbstractLocals
def initialize(name_prefix = :concurrent_locals)
def initialize
@free = []
@lock = Mutex.new
@all_locals = {}
@all_arrays = {}
@next = 0

@name = :"#{name_prefix}_#{object_id}"
end

def synchronize
Expand All @@ -53,7 +51,7 @@ def weak_synchronize
alias_method :weak_synchronize, :synchronize
end

def next_index(target)
def next_index(local)
index = synchronize do
if @free.empty?
@next += 1
Expand All @@ -62,11 +60,11 @@ def next_index(target)
end
end

# When the target goes out of scope, we should free the associated index
# When the local goes out of scope, we should free the associated index
# and all values stored into it.
ObjectSpace.define_finalizer(target, target_finalizer(index))
ObjectSpace.define_finalizer(local, local_finalizer(index))

return index
index
end

def free_index(index)
Expand All @@ -78,7 +76,7 @@ def free_index(index)
#
# DO NOT use each_value which might conflict with new pair assignment
# into the hash in #set method.
@all_locals.values.each do |locals|
@all_arrays.values.each do |locals|
locals[index] = nil
end

Expand All @@ -87,18 +85,13 @@ def free_index(index)
end
end

def fetch(index, default = nil)
if locals = self.locals
value = locals[index]
end
def fetch(index)
locals = self.locals
value = locals ? locals[index] : nil

if value.nil?
if block_given?
yield
else
default
end
elsif value.equal?(NULL)
if nil == value
yield
elsif NULL.equal?(value)
nil
else
value
Expand All @@ -107,25 +100,25 @@ def fetch(index, default = nil)

def set(index, value)
locals = self.locals!
locals[index] = (value.nil? ? NULL : value)
locals[index] = (nil == value ? NULL : value)

value
end

private

# When the target index goes out of scope, clean up that slot across all locals currently assigned.
def target_finalizer(index)
# When the local goes out of scope, clean up that slot across all locals currently assigned.
def local_finalizer(index)
proc do
free_index(index)
end
end

# When a target (locals) goes out of scope, delete the locals from all known locals.
def locals_finalizer(locals_object_id)
proc do |locals_id|
# When a thread/fiber goes out of scope, remove the array from @all_arrays.
def thread_fiber_finalizer(array_object_id)
proc do
weak_synchronize do
@all_locals.delete(locals_object_id)
@all_arrays.delete(array_object_id)
end
end
end
Expand All @@ -146,23 +139,23 @@ def locals!
# An array-backed storage of indexed variables per thread.
class ThreadLocals < AbstractLocals
def locals
Thread.current.thread_variable_get(@name)
Thread.current.thread_variable_get(:concurrent_thread_locals)
end

def locals!
thread = Thread.current
locals = thread.thread_variable_get(@name)
locals = thread.thread_variable_get(:concurrent_thread_locals)

unless locals
locals = thread.thread_variable_set(@name, [])
locals = thread.thread_variable_set(:concurrent_thread_locals, [])
weak_synchronize do
@all_locals[locals.object_id] = locals
# When the thread goes out of scope, we should delete the associated locals:
ObjectSpace.define_finalizer(thread, locals_finalizer(locals.object_id))
@all_arrays[locals.object_id] = locals
end
# When the thread goes out of scope, we should delete the associated locals:
ObjectSpace.define_finalizer(thread, thread_fiber_finalizer(locals.object_id))
end

return locals
locals
end
end

Expand All @@ -171,23 +164,25 @@ def locals!
# An array-backed storage of indexed variables per fiber.
class FiberLocals < AbstractLocals
def locals
Thread.current[@name]
Thread.current[:concurrent_fiber_locals]
end

def locals!
thread = Thread.current
locals = thread[@name]
locals = thread[:concurrent_fiber_locals]

unless locals
locals = thread[@name] = []
locals = thread[:concurrent_fiber_locals] = []
weak_synchronize do
@all_locals[locals.object_id] = locals
# When the thread goes out of scope, we should delete the associated locals:
ObjectSpace.define_finalizer(Fiber.current, locals_finalizer(locals.object_id))
@all_arrays[locals.object_id] = locals
end
# When the fiber goes out of scope, we should delete the associated locals:
ObjectSpace.define_finalizer(Fiber.current, thread_fiber_finalizer(locals.object_id))
end

return locals
locals
end
end

private_constant :AbstractLocals, :ThreadLocals, :FiberLocals
end
11 changes: 9 additions & 2 deletions lib/concurrent-ruby/concurrent/atomic/lock_local_var.rb
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
require 'concurrent/utility/engine'
require_relative 'fiber_local_var'
require_relative 'thread_local_var'

module Concurrent
# @!visibility private
def self.mutex_owned_per_thread?
mutex = Mutex.new
return false if Concurrent.on_jruby? || Concurrent.on_truffleruby?

mutex = Mutex.new
# Lock the mutex:
mutex.synchronize do
# Check if the mutex is still owned in a child fiber:
Fiber.new{mutex.owned?}.resume
Fiber.new { mutex.owned? }.resume
end
end

Expand All @@ -18,4 +20,9 @@ def self.mutex_owned_per_thread?
else
LockLocalVar = FiberLocalVar
end

# Either {FiberLocalVar} or {ThreadLocalVar} depending on whether Mutex (and Monitor)
# are held, respectively, per Fiber or per Thread.
class LockLocalVar
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def initialize
@Counter = AtomicFixnum.new(0) # single integer which represents lock state
@ReadQueue = Synchronization::Lock.new # used to queue waiting readers
@WriteQueue = Synchronization::Lock.new # used to queue waiting writers
@HeldCount = LockLocalVar.new(0) # indicates # of R & W locks held by this thread
@HeldCount = LockLocalVar.new(0) # indicates # of R & W locks held by this thread
end

# Execute a block operation within a read lock.
Expand Down
Loading

0 comments on commit 87ad3a8

Please sign in to comment.