diff --git a/Flow/Callbacker.swift b/Flow/Callbacker.swift index 01966e2..a581fc4 100644 --- a/Flow/Callbacker.swift +++ b/Flow/Callbacker.swift @@ -21,20 +21,22 @@ public final class Callbacker { private var callbacks = Callbacks.none private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } public init() { - mutex.initialize() + withMutex { $0.initialize() } } deinit { - mutex.deinitialize() + withMutex { $0.deinitialize() } } /// - Returns: True if no callbacks has been registered. public var isEmpty: Bool { - mutex.lock() - defer { mutex.unlock() } + withMutex { $0.lock() } + defer { withMutex { $0.unlock() } } switch callbacks { case .none: return true @@ -46,8 +48,8 @@ public final class Callbacker { /// Register a callback to be called when `callAll` is executed. /// - Returns: A `Disposable` to be disposed to unregister the callback. public func addCallback(_ callback: @escaping (Value) -> Void) -> Disposable { - mutex.lock() - defer { mutex.unlock() } + withMutex { $0.lock() } + defer { withMutex { $0.unlock() } } let key = generateKey() @@ -63,8 +65,8 @@ public final class Callbacker { } return NoLockKeyDisposer(key) { key in - self.mutex.lock() - defer { self.mutex.unlock() } + self.withMutex { $0.lock() } + defer { self.withMutex { $0.unlock() } } switch self.callbacks { case .single(let singleKey, _) where singleKey == key: @@ -82,9 +84,9 @@ public final class Callbacker { /// Will call all registered callbacks with `value` public func callAll(with value: Value) { - mutex.lock() + withMutex { $0.lock() } let callbacks = self.callbacks - mutex.unlock() + withMutex { $0.unlock() } switch callbacks { case .none: break diff --git a/Flow/Disposable.swift b/Flow/Disposable.swift index 9b3bcc3..5fa4b42 100644 --- a/Flow/Disposable.swift +++ b/Flow/Disposable.swift @@ -29,24 +29,26 @@ public struct NilDisposer: Disposable { public final class Disposer: Disposable { private var disposer: (() -> ())? private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } /// Pass a closure to be called when being disposed public init(_ disposer: @escaping () -> () = {}) { self.disposer = disposer - mutex.initialize() + withMutex { $0.initialize() } } deinit { dispose() - mutex.deinitialize() + withMutex { $0.deinitialize() } } public func dispose() { - mutex.lock() + withMutex { $0.lock() } let disposer = self.disposer self.disposer = nil - mutex.unlock() + withMutex { $0.unlock() } disposer?() } } @@ -59,48 +61,50 @@ public final class Disposer: Disposable { public final class DisposeBag: Disposable { private var disposables: [Disposable] private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } /// Create an empty instance public init() { self.disposables = [] - mutex.initialize() + withMutex { $0.initialize() } } /// Create an instance already containing `disposables` public init(_ disposables: S) where S.Iterator.Element == Disposable { self.disposables = Array(disposables) - mutex.initialize() + withMutex { $0.initialize() } } /// Create an instance already containing `disposables` public init(_ disposables: Disposable...) { self.disposables = disposables - mutex.initialize() + withMutex { $0.initialize() } } deinit { dispose() - mutex.deinitialize() + withMutex { $0.deinitialize() } } /// Returns true if there is currently no disposables to dispose. public var isEmpty: Bool { - return mutex.protect { disposables.isEmpty } + return withMutex { $0.protect { disposables.isEmpty } } } public func dispose() { - mutex.lock() + withMutex { $0.lock() } let disposables = self.disposables // make sure to make a copy in the case any call to dispose will recursivaly call us back. self.disposables = [] - mutex.unlock() + withMutex { $0.unlock() } for disposable in disposables { disposable.dispose() } } /// Add `disposable` to `self` public func add(_ disposable: Disposable) { - mutex.lock() - defer { mutex.unlock() } + withMutex { $0.lock() } + defer { withMutex { $0.unlock() } } disposables.append(disposable) } } diff --git a/Flow/Future+Combiners.swift b/Flow/Future+Combiners.swift index 26dcc41..1d39dd5 100644 --- a/Flow/Future+Combiners.swift +++ b/Flow/Future+Combiners.swift @@ -65,9 +65,9 @@ public func join(_ futures: [Future], cancelNonCompleted: Bool = true) -> var results = [T?](repeating: nil, count: futures.count) let mutex = Mutex() func onValue(_ i: Int, _ val: T) { - mutex.protect { - results[i] = val - } + mutex.lock() + results[i] = val + mutex.unlock() } var future = futures.first!.onValue(on: .none) { onValue(0, $0) } @@ -220,7 +220,9 @@ public final class SingleTaskPerformer { mutex.unlock() // unlock while calling out as we might either recurs or always might execute at once. let singleFuture = function().always(on: .none) { - self.mutex.protect { self.future = nil } + self.mutex.lock() + self.future = nil + self.mutex.unlock() } mutex.lock() @@ -233,7 +235,9 @@ public final class SingleTaskPerformer { } public var isPerforming: Bool { - return mutex.protect { self.future != nil } + mutex.lock() + defer { mutex.unlock() } + return self.future != nil } } diff --git a/Flow/Future.swift b/Flow/Future.swift index 19fad38..26a0b80 100644 --- a/Flow/Future.swift +++ b/Flow/Future.swift @@ -107,7 +107,7 @@ public final class Future { try onResult(completion, Mover(shouldClone: true)) } } - mutex.initialize() + withMutex { $0.initialize() } scheduler.async { do { @@ -143,13 +143,13 @@ public final class Future { state = .completed(result) clone = { Future(result: result) } - mutex.initialize() + withMutex { $0.initialize() } } deinit { OSAtomicDecrement32(&futureUnitTestAliveCount) memPrint("Future deinit", futureUnitTestAliveCount) - mutex.deinitialize() + withMutex { $0.deinitialize() } } } @@ -327,18 +327,20 @@ func memPrint(_ str: String, _ count: Int32) { } private extension Future { - var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } private var protectedState: State { - return mutex.protect { state } + return withMutex { $0.protect { state } } } func lock() { - mutex.lock() + withMutex { $0.lock() } } func unlock() { - mutex.unlock() + withMutex { $0.unlock() } } func completeWithResult(_ result: Result) { diff --git a/Flow/FutureQueue.swift b/Flow/FutureQueue.swift index 931f9e2..f058111 100644 --- a/Flow/FutureQueue.swift +++ b/Flow/FutureQueue.swift @@ -61,9 +61,9 @@ public extension FutureQueue { return Future { completion in let item = QueueItem(operation: operation, completion: completion) - self.mutex.protect { - self.items.append(item) - } + self.withMutex { $0.lock() } + self.items.append(item) + self.withMutex { $0.unlock() } self.executeNextItem() @@ -119,7 +119,7 @@ public extension FutureQueue { public extension FutureQueue { /// Do we have any enqueued operations? var isEmpty: Bool { - return mutex.protect { items.isEmpty } + return withMutex { $0.protect { items.isEmpty } } } /// Returns a signal that will signal when `isEmpty` is changed. @@ -164,19 +164,22 @@ public extension FutureQueue { /// The error passed to `abortQueuedExecutionWithError()` if called with `shouldCloseQueue` as true. var closedError: Error? { - return mutex.protect { _closedError } + return withMutex { $0.protect { _closedError } } } } private extension FutureQueue { - var mutex: PThreadMutex { return PThreadMutex(&_mutex) } - func lock() { mutex.lock() } - func unlock() { mutex.unlock() } + private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } + + func lock() { withMutex { $0.lock() } } + func unlock() { withMutex { $0.unlock() } } func removeItem(_ item: Executable) { - mutex.protect { - _ = items.firstIndex { $0 === item }.map { items.remove(at: $0) } - } + withMutex { $0.lock() } + _ = items.firstIndex { $0 === item }.map { items.remove(at: $0) } + withMutex { $0.unlock() } } func executeNextItem() { @@ -188,9 +191,9 @@ private extension FutureQueue { unlock() item.execute(on: queueScheduler) { - self.mutex.protect { - self.concurrentCount -= 1 - } + self.lock() + self.concurrentCount -= 1 + self.unlock() self.removeItem(item) self.executeNextItem() } @@ -215,25 +218,27 @@ private final class QueueItem: Executable { private weak var future: Future? private var hasBeenCancelled = false private var _mutex = pthread_mutex_t() + private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } init(operation: @escaping () throws -> Future, completion: @escaping (Result) -> ()) { self.completion = completion self.operation = operation - mutex.initialize() + withMutex { $0.initialize() } OSAtomicIncrement32(&queueItemUnitTestAliveCount) memPrint("Queue Item init", queueItemUnitTestAliveCount) } deinit { - mutex.deinitialize() + withMutex { $0.deinitialize() } OSAtomicDecrement32(&queueItemUnitTestAliveCount) memPrint("Queue Item deinit", queueItemUnitTestAliveCount) } - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } - private func lock() { mutex.lock() } - private func unlock() { mutex.unlock() } + private func lock() { withMutex { $0.lock() } } + private func unlock() { withMutex { $0.unlock() } } private func complete(_ result: (Result)) { lock() diff --git a/Flow/Locking.swift b/Flow/Locking.swift index 416be1b..8ca8472 100644 --- a/Flow/Locking.swift +++ b/Flow/Locking.swift @@ -11,32 +11,26 @@ import Foundation /// A reference wrapper around a POSIX thread mutex public final class Mutex { private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } public init() { - mutex.initialize() + withMutex { $0.initialize() } } deinit { - mutex.deinitialize() + withMutex { $0.deinitialize() } } /// Attempt to acquire the lock, blocking a thread’s execution until the lock can be acquired. public func lock() { - mutex.lock() + withMutex { $0.lock() } } /// Releases a previously acquired lock. public func unlock() { - mutex.unlock() - } - - /// Will lock `self`, call `block`, then unlock `self` - @discardableResult - public func protect(_ block: () throws -> T) rethrows -> T { - mutex.lock() - defer { mutex.unlock() } - return try block() + withMutex { $0.unlock() } } } @@ -87,16 +81,18 @@ final class StateAndCallback: Disposable { var val: State fileprivate var disposables = [Disposable]() private var _mutex = pthread_mutex_t() - fileprivate var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + fileprivate func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } init(state: State, callback: @escaping (Value) -> ()) { val = state self.callback = callback - mutex.initialize() + withMutex { $0.initialize() } } deinit { - mutex.deinitialize() + withMutex { $0.deinitialize() } dispose() } @@ -106,27 +102,27 @@ final class StateAndCallback: Disposable { } func lock() { - mutex.lock() + withMutex { $0.lock() } } func unlock() { - mutex.unlock() + withMutex { $0.unlock() } } @discardableResult func protect(_ block: () throws -> T) rethrows -> T { - mutex.lock() - defer { mutex.unlock() } + withMutex { $0.lock() } + defer { withMutex { $0.unlock() } } return try block() } func dispose() { - mutex.lock() + withMutex { $0.lock() } let disposables = self.disposables // make sure to make a copy in the case any call to dispose will recursivaly call us back. callback = nil exclusiveQueue = [] self.disposables = [] - mutex.unlock() + withMutex { $0.unlock() } for disposable in disposables { disposable.dispose() } } @@ -192,12 +188,12 @@ extension StateAndCallback where Value == () { func +=(bag: StateAndCallback, disposable: Disposable?) { guard let disposable = disposable else { return } - bag.mutex.lock() + bag.withMutex { $0.lock() } let hasBeenDisposed = bag.callback == nil if !hasBeenDisposed { bag.disposables.append(disposable) } - bag.mutex.unlock() + bag.withMutex { $0.unlock() } if hasBeenDisposed { disposable.dispose() } diff --git a/Flow/OrderedCallbacker.swift b/Flow/OrderedCallbacker.swift index 04e63ac..6c83100 100644 --- a/Flow/OrderedCallbacker.swift +++ b/Flow/OrderedCallbacker.swift @@ -15,19 +15,21 @@ import Foundation public final class OrderedCallbacker { private var callbacks: [Key: (OrderedValue, (CallbackValue) -> Future<()>)] = [:] private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } public init() { - mutex.initialize() + withMutex { $0.initialize() } } deinit { - mutex.deinitialize() + withMutex { $0.deinitialize() } } /// - Returns: True if no callbacks has been registered. public var isEmpty: Bool { - return mutex.protect { callbacks.isEmpty } + return withMutex { $0.protect { callbacks.isEmpty } } } /// Register a callback and orderedValue to be called when `callAll` is executed. @@ -35,11 +37,13 @@ public final class OrderedCallbacker { /// - Parameter orderedValue: The value used to order this callback /// - Returns: A `Disposable` to be disposed to unregister the callback. public func addCallback(_ callback: @escaping (CallbackValue) -> Future<()>, orderedBy orderedValue: OrderedValue) -> Disposable { - return mutex.protect { - let key = generateKey() - callbacks[key] = (orderedValue, callback) - return Disposer { - self.mutex.protect { self.callbacks[key] = nil } + return withMutex { + $0.protect { + let key = generateKey() + callbacks[key] = (orderedValue, callback) + return Disposer { + self.withMutex { $0.protect { self.callbacks[key] = nil } } + } } } } @@ -48,9 +52,11 @@ public final class OrderedCallbacker { /// - Returns: A `Future` that will complete when all callbacks has been called. @discardableResult public func callAll(with value: CallbackValue, isOrderedBefore: (OrderedValue, OrderedValue) -> Bool) -> Future<()> { - return mutex.protect { - callbacks.values.sorted { isOrderedBefore($0.0, $1.0) }.map { $1 } - }.mapToFuture { $0(value) }.toVoid() + return withMutex { + $0.protect { + callbacks.values.sorted { isOrderedBefore($0.0, $1.0) }.map { $1 } + }.mapToFuture { $0(value) }.toVoid() + } } } diff --git a/Flow/Signal+Construction.swift b/Flow/Signal+Construction.swift index ee9697e..fc98885 100644 --- a/Flow/Signal+Construction.swift +++ b/Flow/Signal+Construction.swift @@ -113,27 +113,29 @@ private final class CallbackState: Disposable { let sharedKey: Key private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } init(shared: SharedState? = nil, getValue: (() -> Value)?, callback: @escaping (EventType) -> Void) { self.shared = shared self.sharedKey = shared == nil ? 0 : generateKey() self.getValue = getValue self.callback = callback - mutex.initialize() + withMutex { $0.initialize() } } deinit { - mutex.deinitialize() + withMutex { $0.deinitialize() } shared?.remove(key: sharedKey) } func lock() { - mutex.lock() + withMutex { $0.lock() } } func unlock() { - mutex.unlock() + withMutex { $0.unlock() } } // For efficiency `Self` could also also behave as a `NoLockKeyDisposer``, saving us an allocation for each listener. @@ -293,7 +295,9 @@ private final class CallbackState: Disposable { final class SharedState { private let getValue: (() -> Value)? private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } typealias Callback = (EventType) -> Void var firstCallback: (key: Key, value: Callback)? @@ -303,19 +307,19 @@ final class SharedState { init(getValue: (() -> Value)? = nil) { self.getValue = getValue - mutex.initialize() + withMutex { $0.initialize() } } deinit { - mutex.deinitialize() + withMutex { $0.deinitialize() } } func lock() { - mutex.lock() + withMutex { $0.lock() } } func unlock() { - mutex.unlock() + withMutex { $0.unlock() } } func remove(key: Key) { diff --git a/Flow/Signal+Scheduling.swift b/Flow/Signal+Scheduling.swift index 36bc088..14e959b 100644 --- a/Flow/Signal+Scheduling.swift +++ b/Flow/Signal+Scheduling.swift @@ -120,48 +120,51 @@ internal extension CoreSignal { private final class OnEventTypeDisposer: Disposable { private var disposable: Disposable? private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } + private let scheduler: Scheduler private var callback: ((EventType) -> Void)? init(on scheduler: Scheduler, callback: @escaping (EventType) -> Void, onEventType: @escaping (@escaping (EventType) -> Void) -> Disposable) { self.scheduler = scheduler self.callback = callback - mutex.initialize() + withMutex { $0.initialize() } let disposable = onEventType { [weak self] in self?.handleEventType($0) } - mutex.lock() + withMutex { $0.lock() } if self.callback == nil { disposable.dispose() } else { self.disposable = disposable } - mutex.unlock() + withMutex { $0.unlock() } } deinit { dispose() - mutex.deinitialize() + withMutex { $0.deinitialize() } } public func dispose() { - mutex.lock() + withMutex { $0.lock() } let disposable = self.disposable self.disposable = nil callback = nil - mutex.unlock() + withMutex { $0.unlock() } disposable?.dispose() } func handleEventType(_ eventType: EventType) { - mutex.lock() + withMutex { $0.lock() } guard let callback = self.callback else { - return mutex.unlock() + return withMutex { $0.unlock() } } - mutex.unlock() + withMutex { $0.unlock() } if scheduler.isImmediate { validate(eventType: eventType) @@ -176,14 +179,14 @@ private final class OnEventTypeDisposer: Disposable { scheduler.async { [weak self] in guard let `self` = self else { return } // At the time we are scheduled, we might already been disposed - self.mutex.lock() + self.withMutex { $0.lock() } guard let callback = self.callback else { - return self.mutex.unlock() + return self.withMutex { $0.unlock() } } self.validate(eventType: eventType) - self.mutex.unlock() + self.withMutex { $0.unlock() } callback(eventType) if case .event(.end) = eventType { self.dispose() diff --git a/Flow/Signal+Transforms.swift b/Flow/Signal+Transforms.swift index 8877b10..80359e3 100644 --- a/Flow/Signal+Transforms.swift +++ b/Flow/Signal+Transforms.swift @@ -741,14 +741,20 @@ private extension SignalProvider { let mutex = Mutex() var setter: ((T) -> ())? func setValue(_ value: T) { - let setValue = mutex.protect { setter ?? transform(signal.getter()!).setter! } + mutex.lock() + let setValue = setter ?? transform(signal.getter()!).setter! + mutex.unlock() setValue(value) } return CoreSignal(setValue: setValue, onEventType: { callback in let latestBag = DisposeBag() let bag = DisposeBag(latestBag) - bag += { mutex.protect { setter = nil } } + bag += { + mutex.lock() + setter = nil + mutex.unlock() + } bag += signal.onEventType(on: scheduler) { eventType in switch eventType { @@ -756,13 +762,17 @@ private extension SignalProvider { callback(.initial(nil)) case .initial(let val?): let signal = scheduler.sync { transform(val) } - mutex.protect { setter = signal.setter } + mutex.lock() + setter = signal.setter + mutex.unlock() latestBag += signal.onEventType(callback) case let .event(.value(val)): let isFirstEvent = latestBag.isEmpty latestBag.dispose() let signal = transform(val) - mutex.protect { setter = signal.setter } + mutex.lock() + setter = signal.setter + mutex.unlock() latestBag += signal.onEventType { eventType in switch eventType { case .initial(let val?) where KO.isReadable: diff --git a/FlowTests/FutureSchedulingTests.swift b/FlowTests/FutureSchedulingTests.swift index 17ecc64..16767ce 100644 --- a/FlowTests/FutureSchedulingTests.swift +++ b/FlowTests/FutureSchedulingTests.swift @@ -184,7 +184,9 @@ class FutureNewSchedulingTests: FutureTest { var f = Future(v).delay(by: delay) f = f.map(on: .concurrentBackground) { $0*2 } return f/*assertValue(v*2)*/.assert(on: .main).always(on: .concurrentBackground) { - mutex.protect { completeCount += 1 } + mutex.lock() + completeCount += 1 + mutex.unlock() } }).onCancel { e.fulfill() } diff --git a/FlowTests/SignalProviderTests.swift b/FlowTests/SignalProviderTests.swift index dcc2e31..590f856 100644 --- a/FlowTests/SignalProviderTests.swift +++ b/FlowTests/SignalProviderTests.swift @@ -1718,13 +1718,19 @@ class SignalProviderTests: XCTestCase { _ = Signal(callbacker: callbacker).start(with: 1).take(first: 2).onEvent(on: .concurrentBackground) { event in switch event { case .value(let val): - mutex.protect { result.append(val) } + mutex.lock() + result.append(val) + mutex.unlock() backgroundQueue.async { callbacker.callAll(with: val + 1) } - mutex.protect { result.append(val*10) } + mutex.lock() + result.append(val*10) + mutex.unlock() case .end: - XCTAssertEqual(mutex.protect { result }, [1, 10, 2, 20]) + mutex.lock() + XCTAssertEqual(result, [1, 10, 2, 20]) + mutex.unlock() } } } @@ -2718,13 +2724,15 @@ final class SimulatedTimer { func schedule(at time: TimeInterval, execute work: @escaping () -> ()) -> Disposable { let key = UUID() - mutex.protect { - assert(time >= self.time) - scheduledWork[key] = (time, work) - } + mutex.lock() + assert(time >= self.time) + scheduledWork[key] = (time, work) + mutex.unlock() return Disposer { - self.mutex.protect { self.scheduledWork[key] = nil } + self.mutex.lock() + self.scheduledWork[key] = nil + self.mutex.unlock() } } @@ -2748,7 +2756,9 @@ final class SimulatedTimer { //print("call", next) next.value.work() - mutex.protect { count -= 1 } + mutex.lock() + count -= 1 + mutex.unlock() mainQueue.async { self.release() } } }