Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add TSubscriptionRef #2725

Merged
merged 10 commits into from
Oct 10, 2024
5 changes: 5 additions & 0 deletions .changeset/cold-cougars-pretend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

add TSubscriptionRef
5 changes: 5 additions & 0 deletions .changeset/shiny-squids-sell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

add Stream.fromTQueue & Stream.fromTPubSub
37 changes: 37 additions & 0 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import type * as Sink from "./Sink.js"
import type * as Emit from "./StreamEmit.js"
import type * as HaltStrategy from "./StreamHaltStrategy.js"
import type * as Take from "./Take.js"
import type { TPubSub } from "./TPubSub.js"
import type { TDequeue } from "./TQueue.js"
import type * as Tracer from "./Tracer.js"
import type { Covariant, NoInfer, TupleOf } from "./Types.js"
import type * as Unify from "./Unify.js"
Expand Down Expand Up @@ -2013,6 +2015,30 @@ export const fromPubSub: {
): Stream<A>
} = internal.fromPubSub

/**
* Creates a stream from a subscription to a `TPubSub`.
*
* @param shutdown If `true`, the `TPubSub` will be shutdown after the stream is evaluated (defaults to `false`)
* @since 3.10.0
* @category constructors
*/
export const fromTPubSub: {
<A>(
pubsub: TPubSub<A>,
options: {
readonly scoped: true
readonly shutdown?: boolean | undefined
}
): Effect.Effect<Stream<A>, never, Scope.Scope>
<A>(
pubsub: TPubSub<A>,
options?: {
readonly scoped?: false | undefined
readonly shutdown?: boolean | undefined
}
): Stream<A>
} = internal.fromTPubSub

/**
* Creates a new `Stream` from an iterable collection of values.
*
Expand Down Expand Up @@ -2094,6 +2120,17 @@ export const fromQueue: <A>(
}
) => Stream<A> = internal.fromQueue

/**
* Creates a stream from a TQueue of values
*
* @since 3.10.0
* @category constructors
*/
export const fromTQueue: <A>(
queue: TDequeue<A>,
options?: { readonly shutdown?: boolean | undefined }
) => Stream<A> = internal.fromTQueue

/**
* Creates a stream from a `ReadableStream`.
*
Expand Down
9 changes: 9 additions & 0 deletions packages/effect/src/TPubSub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ export const isEmpty: <A>(self: TPubSub<A>) => STM.STM<boolean> = internal.isEmp
*/
export const isFull: <A>(self: TPubSub<A>) => STM.STM<boolean> = internal.isFull

/**
* Interrupts any fibers that are suspended on `offer` or `take`. Future calls
* to `offer*` and `take*` will be interrupted immediately.
*
* @since 2.0.0
* @category utils
*/
export const shutdown: <A>(self: TPubSub<A>) => STM.STM<void> = internal.shutdown

/**
* Returns `true` if `shutdown` has been called, otherwise returns `false`.
*
Expand Down
14 changes: 7 additions & 7 deletions packages/effect/src/TQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ export const isTEnqueue: (u: unknown) => u is TEnqueue<unknown> = internal.isTEn
* @since 2.0.0
* @category mutations
*/
export const awaitShutdown: <A>(self: TQueue<A>) => STM.STM<void> = internal.awaitShutdown
export const awaitShutdown: <A>(self: TDequeue<A> | TEnqueue<A>) => STM.STM<void> = internal.awaitShutdown

/**
* Creates a bounded queue with the back pressure strategy. The queue will
Expand All @@ -226,7 +226,7 @@ export const bounded: <A>(requestedCapacity: number) => STM.STM<TQueue<A>> = int
* @since 2.0.0
* @category getters
*/
export const capacity: <A>(self: TQueue<A>) => number = internal.capacity
export const capacity: <A>(self: TDequeue<A> | TEnqueue<A>) => number = internal.capacity

/**
* Creates a bounded queue with the dropping strategy. The queue will drop new
Expand All @@ -245,7 +245,7 @@ export const dropping: <A>(requestedCapacity: number) => STM.STM<TQueue<A>> = in
* @since 2.0.0
* @category getters
*/
export const isEmpty: <A>(self: TQueue<A>) => STM.STM<boolean> = internal.isEmpty
export const isEmpty: <A>(self: TDequeue<A> | TEnqueue<A>) => STM.STM<boolean> = internal.isEmpty

/**
* Returns `true` if the `TQueue` contains at least one element, `false`
Expand All @@ -254,15 +254,15 @@ export const isEmpty: <A>(self: TQueue<A>) => STM.STM<boolean> = internal.isEmpt
* @since 2.0.0
* @category getters
*/
export const isFull: <A>(self: TQueue<A>) => STM.STM<boolean> = internal.isFull
export const isFull: <A>(self: TDequeue<A> | TEnqueue<A>) => STM.STM<boolean> = internal.isFull

/**
* Returns `true` if `shutdown` has been called, otherwise returns `false`.
*
* @since 2.0.0
* @category getters
*/
export const isShutdown: <A>(self: TQueue<A>) => STM.STM<boolean> = internal.isShutdown
export const isShutdown: <A>(self: TDequeue<A> | TEnqueue<A>) => STM.STM<boolean> = internal.isShutdown
mikearnaldi marked this conversation as resolved.
Show resolved Hide resolved

/**
* Places one value in the queue.
Expand Down Expand Up @@ -345,7 +345,7 @@ export const seek: {
* @since 2.0.0
* @category mutations
*/
export const shutdown: <A>(self: TQueue<A>) => STM.STM<void> = internal.shutdown
export const shutdown: <A>(self: TDequeue<A> | TEnqueue<A>) => STM.STM<void> = internal.shutdown

/**
* Retrieves the size of the queue, which is equal to the number of elements
Expand All @@ -355,7 +355,7 @@ export const shutdown: <A>(self: TQueue<A>) => STM.STM<void> = internal.shutdown
* @since 2.0.0
* @category getters
*/
export const size: <A>(self: TQueue<A>) => STM.STM<number> = internal.size
export const size: <A>(self: TDequeue<A> | TEnqueue<A>) => STM.STM<number> = internal.size

/**
* Creates a bounded queue with the sliding strategy. The queue will add new
Expand Down
3 changes: 2 additions & 1 deletion packages/effect/src/TRef.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type * as TxnId from "./internal/stm/stm/txnId.js"
import type * as Versioned from "./internal/stm/stm/versioned.js"
import * as internal from "./internal/stm/tRef.js"
import type * as Option from "./Option.js"
import type { Pipeable } from "./Pipeable.js"
import type * as STM from "./STM.js"
import type * as Types from "./Types.js"

Expand Down Expand Up @@ -34,7 +35,7 @@ export type TRefTypeId = typeof TRefTypeId
* @since 2.0.0
* @category models
*/
export interface TRef<in out A> extends TRef.Variance<A> {
export interface TRef<in out A> extends TRef.Variance<A>, Pipeable {
/**
* Note: the method is unbound, exposed only for potential extensions.
*/
Expand Down
192 changes: 192 additions & 0 deletions packages/effect/src/TSubscriptionRef.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/**
* @since 3.10.0
*/
import type * as Effect from "./Effect.js"
import * as internal from "./internal/stm/tSubscriptionRef.js"
import type * as Option from "./Option.js"
import type * as Scope from "./Scope.js"
import type * as STM from "./STM.js"
import type * as Stream from "./Stream.js"
import type * as TPubSub from "./TPubSub.js"
import type * as TQueue from "./TQueue.js"
import type * as TRef from "./TRef.js"
import type * as Types from "./Types.js"

/**
* @since 3.10.0
* @category symbols
*/
export const TSubscriptionRefTypeId: unique symbol = internal.TSubscriptionRefTypeId

/**
* @since 3.10.0
* @category symbols
*/
export type TSubscriptionRefTypeId = typeof TSubscriptionRefTypeId

/**
* A `TSubscriptionRef<A>` is a `TRef` that can be subscribed to in order to
* receive a `TDequeue<A>` of the current value and all committed changes to the value.
*
* @since 3.10.0
* @category models
*/
export interface TSubscriptionRef<in out A> extends TSubscriptionRef.Variance<A>, TRef.TRef<A> {
/** @internal */
readonly ref: TRef.TRef<A>
/** @internal */
readonly pubsub: TPubSub.TPubSub<A>
/** @internal */
modify<B>(f: (a: A) => readonly [B, A]): STM.STM<B>

/**
* A TDequeue containing the current value of the `Ref` as well as all changes
* to that value.
*/
readonly changes: STM.STM<TQueue.TDequeue<A>>
}

/**
* @since 3.10.0
*/
export declare namespace TSubscriptionRef {
/**
* @since 3.10.0
* @category models
*/
export interface Variance<in out A> {
readonly [TSubscriptionRefTypeId]: {
readonly _A: Types.Invariant<A>
}
}
}

/**
* @since 3.10.0
* @category mutations
*/
export const get: <A>(self: TSubscriptionRef<A>) => STM.STM<A> = internal.get

/**
* @since 3.10.0
* @category mutations
*/
export const getAndSet: {
<A>(value: A): (self: TSubscriptionRef<A>) => STM.STM<A>
<A>(self: TSubscriptionRef<A>, value: A): STM.STM<A>
} = internal.getAndSet

/**
* @since 3.10.0
* @category mutations
*/
export const getAndUpdate: {
<A>(f: (a: A) => A): (self: TSubscriptionRef<A>) => STM.STM<A>
<A>(self: TSubscriptionRef<A>, f: (a: A) => A): STM.STM<A>
} = internal.getAndUpdate

/**
* @since 3.10.0
* @category mutations
*/
export const getAndUpdateSome: {
<A>(f: (a: A) => Option.Option<A>): (self: TSubscriptionRef<A>) => STM.STM<A>
<A>(self: TSubscriptionRef<A>, f: (a: A) => Option.Option<A>): STM.STM<A>
} = internal.getAndUpdateSome

/**
* @since 3.10.0
* @category constructors
*/
export const make: <A>(value: A) => STM.STM<TSubscriptionRef<A>> = internal.make

/**
* @since 3.10.0
* @category mutations
*/
export const modify: {
<A, B>(f: (a: A) => readonly [B, A]): (self: TSubscriptionRef<A>) => STM.STM<B>
<A, B>(self: TSubscriptionRef<A>, f: (a: A) => readonly [B, A]): STM.STM<B>
} = internal.modify

/**
* @since 3.10.0
* @category mutations
*/
export const modifySome: {
<A, B>(fallback: B, f: (a: A) => Option.Option<readonly [B, A]>): (self: TSubscriptionRef<A>) => STM.STM<B>
<A, B>(self: TSubscriptionRef<A>, fallback: B, f: (a: A) => Option.Option<readonly [B, A]>): STM.STM<B>
} = internal.modifySome

/**
* @since 3.10.0
* @category mutations
*/
export const set: {
<A>(value: A): (self: TSubscriptionRef<A>) => STM.STM<void>
<A>(self: TSubscriptionRef<A>, value: A): STM.STM<void>
} = internal.set

/**
* @since 3.10.0
* @category mutations
*/
export const setAndGet: {
<A>(value: A): (self: TSubscriptionRef<A>) => STM.STM<A>
<A>(self: TSubscriptionRef<A>, value: A): STM.STM<A>
} = internal.setAndGet

/**
* @since 3.10.0
* @category mutations
*/
export const update: {
<A>(f: (a: A) => A): (self: TSubscriptionRef<A>) => STM.STM<void>
<A>(self: TSubscriptionRef<A>, f: (a: A) => A): STM.STM<void>
} = internal.update

/**
* @since 3.10.0
* @category mutations
*/
export const updateAndGet: {
<A>(f: (a: A) => A): (self: TSubscriptionRef<A>) => STM.STM<A>
<A>(self: TSubscriptionRef<A>, f: (a: A) => A): STM.STM<A>
} = internal.updateAndGet

/**
* @since 3.10.0
* @category mutations
*/
export const updateSome: {
<A>(f: (a: A) => Option.Option<A>): (self: TSubscriptionRef<A>) => STM.STM<void>
<A>(self: TSubscriptionRef<A>, f: (a: A) => Option.Option<A>): STM.STM<void>
} = internal.updateSome

/**
* @since 3.10.0
* @category mutations
*/
export const updateSomeAndGet: {
<A>(f: (a: A) => Option.Option<A>): (self: TSubscriptionRef<A>) => STM.STM<A>
<A>(self: TSubscriptionRef<A>, f: (a: A) => Option.Option<A>): STM.STM<A>
} = internal.updateSomeAndGet

/**
* @since 3.10.0
* @category mutations
*/
export const changesScoped: <A>(self: TSubscriptionRef<A>) => Effect.Effect<TQueue.TDequeue<A>, never, Scope.Scope> =
internal.changesScoped

/**
* @since 3.10.0
* @category mutations
*/
export const changesStream: <A>(self: TSubscriptionRef<A>) => Stream.Stream<A> = internal.changesStream

/**
* @since 3.10.0
* @category mutations
*/
export const changes: <A>(self: TSubscriptionRef<A>) => STM.STM<TQueue.TDequeue<A>> = (self) => self.changes
5 changes: 5 additions & 0 deletions packages/effect/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,11 @@ export * as TSemaphore from "./TSemaphore.js"
*/
export * as TSet from "./TSet.js"

/**
* @since 3.10.0
*/
export * as TSubscriptionRef from "./TSubscriptionRef.js"

/**
* @since 2.0.0
*/
Expand Down
3 changes: 1 addition & 2 deletions packages/effect/src/internal/stm/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ import { pipeArguments } from "../../Pipeable.js"
import { hasProperty } from "../../Predicate.js"
import type * as Scheduler from "../../Scheduler.js"
import type * as STM from "../../STM.js"
import { StreamTypeId } from "../../Stream.js"
import { YieldWrap } from "../../Utils.js"
import { ChannelTypeId } from "../core-stream.js"
import { withFiberRuntime } from "../core.js"
import { effectVariance } from "../effectable.js"
import { effectVariance, StreamTypeId } from "../effectable.js"
import { OP_COMMIT } from "../opCodes/effect.js"
import { SingleShotGen } from "../singleShotGen.js"
import { SinkTypeId } from "../sink.js"
Expand Down
1 change: 1 addition & 0 deletions packages/effect/src/internal/stm/tPubSub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ class TPubSubSubscriptionImpl<in out A> implements TQueue.TDequeue<A> {
capacity(): number {
return this.requestedCapacity
}

size: STM.STM<number> = core.withSTMRuntime((runtime) => {
let currentSubscriberHead = tRef.unsafeGet(this.subscriberHead, runtime.journal)
if (currentSubscriberHead === undefined) {
Expand Down
Loading