diff --git a/.changeset/tough-lobsters-guess.md b/.changeset/tough-lobsters-guess.md new file mode 100644 index 00000000000..50a130bd188 --- /dev/null +++ b/.changeset/tough-lobsters-guess.md @@ -0,0 +1,5 @@ +--- +"effect": minor +--- + +add option to .releaseLock a ReadableStream on finalization diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index bbcf0bd7026..b06fd6ca747 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -2120,10 +2120,16 @@ export const fromTQueue: (queue: TDequeue) => Stream = internal.fromTQu * @since 2.0.0 * @category constructors */ -export const fromReadableStream: ( - evaluate: LazyArg>, - onError: (error: unknown) => E -) => Stream = internal.fromReadableStream +export const fromReadableStream: { + ( + options: { + readonly evaluate: LazyArg> + readonly onError: (error: unknown) => E + readonly releaseLockOnEnd?: boolean | undefined + } + ): Stream + (evaluate: LazyArg>, onError: (error: unknown) => E): Stream +} = internal.fromReadableStream /** * Creates a stream from a `ReadableStreamBYOBReader`. @@ -2134,11 +2140,21 @@ export const fromReadableStream: ( * @since 2.0.0 * @category constructors */ -export const fromReadableStreamByob: ( - evaluate: LazyArg>, - onError: (error: unknown) => E, - allocSize?: number -) => Stream = internal.fromReadableStreamByob +export const fromReadableStreamByob: { + ( + options: { + readonly evaluate: LazyArg> + readonly onError: (error: unknown) => E + readonly bufferSize?: number | undefined + readonly releaseLockOnEnd?: boolean | undefined + } + ): Stream + ( + evaluate: LazyArg>, + onError: (error: unknown) => E, + allocSize?: number + ): Stream +} = internal.fromReadableStreamByob /** * Creates a stream from a `Schedule` that does not require any further diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index 76805839833..76bc6bbbf48 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -18,7 +18,7 @@ import * as MergeDecision from "../MergeDecision.js" import * as Option from "../Option.js" import type * as Order from "../Order.js" import { pipeArguments } from "../Pipeable.js" -import { hasProperty, isTagged, type Predicate, type Refinement } from "../Predicate.js" +import { hasProperty, type Predicate, type Refinement } from "../Predicate.js" import * as PubSub from "../PubSub.js" import * as Queue from "../Queue.js" import * as RcRef from "../RcRef.js" @@ -3261,14 +3261,38 @@ export const fromSchedule = (schedule: Schedule.Schedule): ) /** @internal */ -export const fromReadableStream = ( - evaluate: LazyArg>, - onError: (error: unknown) => E -): Stream.Stream => - unwrapScoped(Effect.map( +export const fromReadableStream: { + ( + options: { + readonly evaluate: LazyArg> + readonly onError: (error: unknown) => E + readonly releaseLockOnEnd?: boolean | undefined + } + ): Stream.Stream + ( + evaluate: LazyArg>, + onError: (error: unknown) => E + ): Stream.Stream +} = ( + ...args: [options: { + readonly evaluate: LazyArg> + readonly onError: (error: unknown) => E + readonly releaseLockOnEnd?: boolean | undefined + }] | [ + evaluate: LazyArg>, + onError: (error: unknown) => E + ] +): Stream.Stream => { + const evaluate = args.length === 1 ? args[0].evaluate : args[0] + const onError = args.length === 1 ? args[0].onError : args[1] + const releaseLockOnEnd = args.length === 1 ? args[0].releaseLockOnEnd === true : false + return unwrapScoped(Effect.map( Effect.acquireRelease( Effect.sync(() => evaluate().getReader()), - (reader) => Effect.promise(() => reader.cancel()) + (reader) => + releaseLockOnEnd + ? Effect.sync(() => reader.releaseLock()) + : Effect.promise(() => reader.cancel()) ), (reader) => repeatEffectOption( @@ -3281,34 +3305,59 @@ export const fromReadableStream = ( ) ) )) +} /** @internal */ -export const fromReadableStreamByob = ( - evaluate: LazyArg>, - onError: (error: unknown) => E, - allocSize = 4096 -): Stream.Stream => - unwrapScoped(Effect.map( +export const fromReadableStreamByob: { + ( + options: { + readonly evaluate: LazyArg> + readonly onError: (error: unknown) => E + readonly bufferSize?: number | undefined + readonly releaseLockOnEnd?: boolean | undefined + } + ): Stream.Stream + ( + evaluate: LazyArg>, + onError: (error: unknown) => E, + allocSize?: number + ): Stream.Stream +} = ( + ...args: [options: { + readonly evaluate: LazyArg> + readonly onError: (error: unknown) => E + readonly bufferSize?: number | undefined + readonly releaseLockOnEnd?: boolean | undefined + }] | [ + evaluate: LazyArg>, + onError: (error: unknown) => E, + allocSize?: number | undefined + ] +): Stream.Stream => { + const evaluate = args.length === 1 ? args[0].evaluate : args[0] + const onError = args.length === 1 ? args[0].onError : args[1] + const allocSize = (args.length === 1 ? args[0].bufferSize : args[2]) ?? 4096 + const releaseLockOnEnd = args.length === 1 ? args[0].releaseLockOnEnd === true : false + return unwrapScoped(Effect.map( Effect.acquireRelease( Effect.sync(() => evaluate().getReader({ mode: "byob" })), - (reader) => Effect.promise(() => reader.cancel()) + (reader) => releaseLockOnEnd ? Effect.sync(() => reader.releaseLock()) : Effect.promise(() => reader.cancel()) ), (reader) => catchAll( forever(readChunkStreamByobReader(reader, onError, allocSize)), - (error) => isTagged(error, "EOF") ? empty : fail(error as E) + (error) => error === EOF ? empty : fail(error) ) )) - -interface EOF { - readonly _tag: "EOF" } +const EOF = Symbol.for("effect/Stream/EOF") + const readChunkStreamByobReader = ( reader: ReadableStreamBYOBReader, onError: (error: unknown) => E, size: number -): Stream.Stream => { +): Stream.Stream => { const buffer = new ArrayBuffer(size) return paginateEffect(0, (offset) => Effect.flatMap( @@ -3318,7 +3367,7 @@ const readChunkStreamByobReader = ( }), ({ done, value }) => { if (done) { - return Effect.fail({ _tag: "EOF" }) + return Effect.fail(EOF) } const newOffset = offset + value.byteLength return Effect.succeed([ diff --git a/packages/effect/test/Stream/constructors.test.ts b/packages/effect/test/Stream/constructors.test.ts index 15926537f87..f5aea08fec4 100644 --- a/packages/effect/test/Stream/constructors.test.ts +++ b/packages/effect/test/Stream/constructors.test.ts @@ -229,10 +229,10 @@ describe("Stream", () => { } const result = yield* $( - Stream.fromReadableStream( - () => new ReadableStream(new NumberSource()), - (error) => new FromReadableStreamError(error) - ), + Stream.fromReadableStream({ + evaluate: () => new ReadableStream(new NumberSource()), + onError: (error) => new FromReadableStreamError(error) + }), Stream.take(10), Stream.runCollect )