Skip to content

Commit

Permalink
Remove warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
niil-ohlin committed Mar 30, 2020
1 parent e9da9fc commit c8d4a96
Show file tree
Hide file tree
Showing 12 changed files with 178 additions and 130 deletions.
24 changes: 13 additions & 11 deletions Flow/Callbacker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,22 @@ public final class Callbacker<Value> {

private var callbacks = Callbacks.none
private var _mutex = pthread_mutex_t()
private var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
private func withMutex<T>(_ body: (PThreadMutex) throws -> T) rethrows -> T {
try withUnsafeMutablePointer(to: &_mutex, body)
}

public init() {
mutex.initialize()
withMutex { $0.initialize() }
}

deinit {
mutex.deinitialize()
withMutex { $0.deinitialize() }
}

/// - Returns: True if no callbacks has been registered.
public var isEmpty: Bool {
mutex.lock()
defer { mutex.unlock() }
withMutex { $0.lock() }
defer { withMutex { $0.unlock() } }

switch callbacks {
case .none: return true
Expand All @@ -46,8 +48,8 @@ public final class Callbacker<Value> {
/// Register a callback to be called when `callAll` is executed.
/// - Returns: A `Disposable` to be disposed to unregister the callback.
public func addCallback(_ callback: @escaping (Value) -> Void) -> Disposable {
mutex.lock()
defer { mutex.unlock() }
withMutex { $0.lock() }
defer { withMutex { $0.unlock() } }

let key = generateKey()

Expand All @@ -63,8 +65,8 @@ public final class Callbacker<Value> {
}

return NoLockKeyDisposer(key) { key in
self.mutex.lock()
defer { self.mutex.unlock() }
self.withMutex { $0.lock() }
defer { self.withMutex { $0.unlock() } }

switch self.callbacks {
case .single(let singleKey, _) where singleKey == key:
Expand All @@ -82,9 +84,9 @@ public final class Callbacker<Value> {

/// Will call all registered callbacks with `value`
public func callAll(with value: Value) {
mutex.lock()
withMutex { $0.lock() }
let callbacks = self.callbacks
mutex.unlock()
withMutex { $0.unlock() }

switch callbacks {
case .none: break
Expand Down
34 changes: 19 additions & 15 deletions Flow/Disposable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,26 @@ public struct NilDisposer: Disposable {
public final class Disposer: Disposable {
private var disposer: (() -> ())?
private var _mutex = pthread_mutex_t()
private var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
private func withMutex<T>(_ body: (PThreadMutex) throws -> T) rethrows -> T {
try withUnsafeMutablePointer(to: &_mutex, body)
}

/// Pass a closure to be called when being disposed
public init(_ disposer: @escaping () -> () = {}) {
self.disposer = disposer
mutex.initialize()
withMutex { $0.initialize() }
}

deinit {
dispose()
mutex.deinitialize()
withMutex { $0.deinitialize() }
}

public func dispose() {
mutex.lock()
withMutex { $0.lock() }
let disposer = self.disposer
self.disposer = nil
mutex.unlock()
withMutex { $0.unlock() }
disposer?()
}
}
Expand All @@ -59,48 +61,50 @@ public final class Disposer: Disposable {
public final class DisposeBag: Disposable {
private var disposables: [Disposable]
private var _mutex = pthread_mutex_t()
private var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
private func withMutex<T>(_ body: (PThreadMutex) throws -> T) rethrows -> T {
try withUnsafeMutablePointer(to: &_mutex, body)
}

/// Create an empty instance
public init() {
self.disposables = []
mutex.initialize()
withMutex { $0.initialize() }
}

/// Create an instance already containing `disposables`
public init<S: Sequence>(_ disposables: S) where S.Iterator.Element == Disposable {
self.disposables = Array(disposables)
mutex.initialize()
withMutex { $0.initialize() }
}

/// Create an instance already containing `disposables`
public init(_ disposables: Disposable...) {
self.disposables = disposables
mutex.initialize()
withMutex { $0.initialize() }
}

deinit {
dispose()
mutex.deinitialize()
withMutex { $0.deinitialize() }
}

/// Returns true if there is currently no disposables to dispose.
public var isEmpty: Bool {
return mutex.protect { disposables.isEmpty }
return withMutex { $0.protect { disposables.isEmpty } }
}

public func dispose() {
mutex.lock()
withMutex { $0.lock() }
let disposables = self.disposables // make sure to make a copy in the case any call to dispose will recursivaly call us back.
self.disposables = []
mutex.unlock()
withMutex { $0.unlock() }
for disposable in disposables { disposable.dispose() }
}

/// Add `disposable` to `self`
public func add(_ disposable: Disposable) {
mutex.lock()
defer { mutex.unlock() }
withMutex { $0.lock() }
defer { withMutex { $0.unlock() } }
disposables.append(disposable)
}
}
Expand Down
14 changes: 9 additions & 5 deletions Flow/Future+Combiners.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ public func join<T>(_ futures: [Future<T>], cancelNonCompleted: Bool = true) ->
var results = [T?](repeating: nil, count: futures.count)
let mutex = Mutex()
func onValue(_ i: Int, _ val: T) {
mutex.protect {
results[i] = val
}
mutex.lock()
results[i] = val
mutex.unlock()
}

var future = futures.first!.onValue(on: .none) { onValue(0, $0) }
Expand Down Expand Up @@ -220,7 +220,9 @@ public final class SingleTaskPerformer<Value> {

mutex.unlock() // unlock while calling out as we might either recurs or always might execute at once.
let singleFuture = function().always(on: .none) {
self.mutex.protect { self.future = nil }
self.mutex.lock()
self.future = nil
self.mutex.unlock()
}
mutex.lock()

Expand All @@ -233,7 +235,9 @@ public final class SingleTaskPerformer<Value> {
}

public var isPerforming: Bool {
return mutex.protect { self.future != nil }
mutex.lock()
defer { mutex.unlock() }
return self.future != nil
}
}

Expand Down
16 changes: 9 additions & 7 deletions Flow/Future.swift
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public final class Future<Value> {
try onResult(completion, Mover(shouldClone: true))
}
}
mutex.initialize()
withMutex { $0.initialize() }

scheduler.async {
do {
Expand Down Expand Up @@ -143,13 +143,13 @@ public final class Future<Value> {

state = .completed(result)
clone = { Future(result: result) }
mutex.initialize()
withMutex { $0.initialize() }
}

deinit {
OSAtomicDecrement32(&futureUnitTestAliveCount)
memPrint("Future deinit", futureUnitTestAliveCount)
mutex.deinitialize()
withMutex { $0.deinitialize() }
}
}

Expand Down Expand Up @@ -327,18 +327,20 @@ func memPrint(_ str: String, _ count: Int32) {
}

private extension Future {
var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
private func withMutex<T>(_ body: (PThreadMutex) throws -> T) rethrows -> T {
try withUnsafeMutablePointer(to: &_mutex, body)
}

private var protectedState: State {
return mutex.protect { state }
return withMutex { $0.protect { state } }
}

func lock() {
mutex.lock()
withMutex { $0.lock() }
}

func unlock() {
mutex.unlock()
withMutex { $0.unlock() }
}

func completeWithResult(_ result: Result<Value>) {
Expand Down
43 changes: 24 additions & 19 deletions Flow/FutureQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ public extension FutureQueue {
return Future { completion in
let item = QueueItem<Output>(operation: operation, completion: completion)

self.mutex.protect {
self.items.append(item)
}
self.withMutex { $0.lock() }
self.items.append(item)
self.withMutex { $0.unlock() }

self.executeNextItem()

Expand Down Expand Up @@ -119,7 +119,7 @@ public extension FutureQueue {
public extension FutureQueue {
/// Do we have any enqueued operations?
var isEmpty: Bool {
return mutex.protect { items.isEmpty }
return withMutex { $0.protect { items.isEmpty } }
}

/// Returns a signal that will signal when `isEmpty` is changed.
Expand Down Expand Up @@ -164,19 +164,22 @@ public extension FutureQueue {

/// The error passed to `abortQueuedExecutionWithError()` if called with `shouldCloseQueue` as true.
var closedError: Error? {
return mutex.protect { _closedError }
return withMutex { $0.protect { _closedError } }
}
}

private extension FutureQueue {
var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
func lock() { mutex.lock() }
func unlock() { mutex.unlock() }
private func withMutex<T>(_ body: (PThreadMutex) throws -> T) rethrows -> T {
try withUnsafeMutablePointer(to: &_mutex, body)
}

func lock() { withMutex { $0.lock() } }
func unlock() { withMutex { $0.unlock() } }

func removeItem(_ item: Executable) {
mutex.protect {
_ = items.firstIndex { $0 === item }.map { items.remove(at: $0) }
}
withMutex { $0.lock() }
_ = items.firstIndex { $0 === item }.map { items.remove(at: $0) }
withMutex { $0.unlock() }
}

func executeNextItem() {
Expand All @@ -188,9 +191,9 @@ private extension FutureQueue {
unlock()

item.execute(on: queueScheduler) {
self.mutex.protect {
self.concurrentCount -= 1
}
self.lock()
self.concurrentCount -= 1
self.unlock()
self.removeItem(item)
self.executeNextItem()
}
Expand All @@ -215,25 +218,27 @@ private final class QueueItem<Output>: Executable {
private weak var future: Future<Output>?
private var hasBeenCancelled = false
private var _mutex = pthread_mutex_t()
private func withMutex<T>(_ body: (PThreadMutex) throws -> T) rethrows -> T {
try withUnsafeMutablePointer(to: &_mutex, body)
}

init(operation: @escaping () throws -> Future<Output>, completion: @escaping (Result<Output>) -> ()) {
self.completion = completion
self.operation = operation
mutex.initialize()
withMutex { $0.initialize() }

OSAtomicIncrement32(&queueItemUnitTestAliveCount)
memPrint("Queue Item init", queueItemUnitTestAliveCount)
}

deinit {
mutex.deinitialize()
withMutex { $0.deinitialize() }
OSAtomicDecrement32(&queueItemUnitTestAliveCount)
memPrint("Queue Item deinit", queueItemUnitTestAliveCount)
}

private var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
private func lock() { mutex.lock() }
private func unlock() { mutex.unlock() }
private func lock() { withMutex { $0.lock() } }
private func unlock() { withMutex { $0.unlock() } }

private func complete(_ result: (Result<Output>)) {
lock()
Expand Down
Loading

0 comments on commit c8d4a96

Please sign in to comment.