Skip to content

Commit

Permalink
Revert back to shared implementation of "locals" storage array.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Jan 12, 2023
1 parent 21cea96 commit 3efea70
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 18 deletions.
9 changes: 6 additions & 3 deletions lib/concurrent-ruby/concurrent/atomic/fiber_local_var.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
require 'concurrent/constants'
require_relative 'locals'

module Concurrent
class FiberLocalVar
LOCALS = FiberLocals.new(:concurrent_fiber_local_var)

def initialize(default = nil, &default_block)
if default && block_given?
raise ArgumentError, "Cannot use both value and block as default value"
Expand All @@ -15,17 +18,17 @@ def initialize(default = nil, &default_block)
@default = default
end

@name = :"concurrent_variable_#{object_id}"
@index = LOCALS.next_index(self)
end

# @!macro thread_local_var_method_get
def value
Thread.current.fetch(@name) {default}
LOCALS.fetch(@index) {default}
end

# @!macro thread_local_var_method_set
def value=(value)
Thread.current[@name] = value
LOCALS.set(@index, value)
end

# @!macro thread_local_var_method_bind
Expand Down
179 changes: 179 additions & 0 deletions lib/concurrent-ruby/concurrent/atomic/locals.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
require 'concurrent/constants'

module Concurrent
# @!visibility private
# @!macro internal_implementation_note
#
# An abstract implementation of local storage, with sub-classes for
# per-thread and per-fiber locals.
#
# Each execution context (EC, thread or fiber) has a lazily initialized array
# of local variable values. Each time a new local variable is created, we
# allocate an "index" for it.
#
# For example, if the allocated index is 1, that means slot #1 in EVERY EC's
# locals array will be used for the value of that variable.
#
# The good thing about using a per-EC structure to hold values, rather than
# a global, is that no synchronization is needed when reading and writing
# those values (since the structure is only ever accessed by a single
# thread).
#
# Of course, when a local variable is GC'd, 1) we need to recover its index
# for use by other new local variables (otherwise the locals arrays could
# get bigger and bigger with time), and 2) we need to null out all the
# references held in the now-unused slots (both to avoid blocking GC of those
# objects, and also to prevent "stale" values from being passed on to a new
# local when the index is reused).
#
# Because we need to null out freed slots, we need to keep references to
# ALL the locals arrays, so we can null out the appropriate slots in all of
# them. This is why we need to use a finalizer to clean up the locals array
# when the EC goes out of scope.
class AbstractLocals
def initialize(name_prefix = :concurrent_locals)
@free = []
@lock = Mutex.new
@all_locals = {}
@next = 0

@name = :"#{name_prefix}_#{object_id}"
end

def synchronize
@lock.synchronize { yield }
end

if Concurrent.on_cruby?
def weak_synchronize
yield
end
else
alias_method :weak_synchronize, :synchronize
end

def next_index(target)
index = synchronize do
if @free.empty?
@next += 1
else
@free.pop
end
end

# When the target goes out of scope, we should free the associated index and all values stored into it.
ObjectSpace.define_finalizer(target, target_finalizer(index))

return index
end

def free_index(index)
weak_synchronize do
@all_locals.values.each do |locals|
locals[index] = nil
end

@free << index
end
end

def fetch(index, default = nil)
if locals = self.locals
value = locals[index]
end

if value.nil?
if block_given?
yield
else
default
end
elsif value.equal?(NULL)
nil
else
value
end
end

def set(index, value)
locals = self.locals!
locals[index] = (value.nil? ? NULL : value)

value
end

private

# When the target index goes out of scope, clean up that slot across all locals currently assigned.
def target_finalizer(index)
proc do
free_index(index)
end
end

# When a target (locals) goes out of scope, delete the locals from all known locals.
def locals_finalizer(locals_object_id)
proc do |locals_id|
weak_synchronize do
@all_locals.delete(locals_object_id)
end
end
end

# Returns the locals for the current scope, or nil if none exist.
def locals
raise NotImplementedError
end

# Returns the locals for the current scope, creating them if necessary.
def locals!
raise NotImplementedError
end
end

# An array-backed storage of indexed variables per thread.
class ThreadLocals < AbstractLocals
def locals
Thread.current.thread_variable_get(@name)
end

def locals!
thread = Thread.current
locals = thread.thread_variable_get(@name)

unless locals
locals = thread.thread_variable_set(@name, [])
weak_synchronize do
@all_locals[locals.object_id] = locals
# When the thread goes out of scope, we should delete the associated locals:
ObjectSpace.define_finalizer(thread, locals_finalizer(locals.object_id))
end
end

return locals
end
end

# An array-backed storage of indexed variables per fiber.
class FiberLocals < AbstractLocals
def locals
Thread.current[@name]
end

def locals!
thread = Thread.current
locals = thread[@name]

unless locals
locals = thread[@name] = []
weak_synchronize do
@all_locals[locals.object_id] = locals
# When the thread goes out of scope, we should delete the associated locals:
ObjectSpace.define_finalizer(Fiber.current, locals_finalizer(locals.object_id))
end
end

return locals
end
end
end
21 changes: 6 additions & 15 deletions lib/concurrent-ruby/concurrent/atomic/thread_local_var.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
require 'concurrent/constants'
require_relative 'locals'

module Concurrent

# @!macro thread_local_var
class ThreadLocalVar
LOCALS = ThreadLocals.new(:concurrent_fiber_local_var)

# @!macro thread_local_var_method_initialize
def initialize(default = nil, &default_block)
if default && block_given?
Expand All @@ -18,29 +21,17 @@ def initialize(default = nil, &default_block)
@default = default
end

@name = :"concurrent_variable_#{object_id}"
@index = LOCALS.next_index(self)
end

# @!macro thread_local_var_method_get
def value
value = Thread.current.thread_variable_get(@name)

if value.nil?
default
elsif value.equal?(NULL)
nil
else
value
end
LOCALS.fetch(@index) {default}
end

# @!macro thread_local_var_method_set
def value=(value)
if value.nil?
value = NULL
end

Thread.current.thread_variable_set(@name, value)
LOCALS.set(@index, value)
end

# @!macro thread_local_var_method_bind
Expand Down

0 comments on commit 3efea70

Please sign in to comment.