diff --git a/spec-dtslint/observables/dom/fetch-spec.ts b/spec-dtslint/observables/dom/fetch-spec.ts new file mode 100644 index 0000000000..ac155b544c --- /dev/null +++ b/spec-dtslint/observables/dom/fetch-spec.ts @@ -0,0 +1,18 @@ +import { fromFetch } from 'rxjs/fetch'; +import { a$ } from '../../helpers'; + +it('should emit the fetch Response by default', () => { + const a = fromFetch("a"); // $ExpectType Observable +}); + +it('should support a selector that returns a Response promise', () => { + const a = fromFetch("a", { selector: response => response.text() }); // $ExpectType Observable +}); + +it('should support a selector that returns an arbitrary type', () => { + const a = fromFetch("a", { selector: response => a$ }); // $ExpectType Observable +}); + +it('should error for selectors that don\'t return an ObservableInput', () => { + const a = fromFetch("a", { selector: response => 42 }); // $ExpectError +}); diff --git a/spec-dtslint/tsconfig.json b/spec-dtslint/tsconfig.json index 6339f6dd64..86e857a69b 100644 --- a/spec-dtslint/tsconfig.json +++ b/spec-dtslint/tsconfig.json @@ -6,7 +6,11 @@ "noEmit": true, "paths": { "rxjs": ["../dist/types"], - "rxjs/operators": ["../dist/types/operators"] + "rxjs/ajax": ["../dist/types/ajax"], + "rxjs/fetch": ["../dist/types/fetch"], + "rxjs/operators": ["../dist/types/operators"], + "rxjs/testing": ["../dist/types/testing"], + "rxjs/webSocket": ["../dist/types/webSocket"] }, "skipLibCheck": true, "strict": true, diff --git a/spec/observables/dom/fetch-spec.ts b/spec/observables/dom/fetch-spec.ts index ce914e6942..81e111e0f9 100644 --- a/spec/observables/dom/fetch-spec.ts +++ b/spec/observables/dom/fetch-spec.ts @@ -264,4 +264,57 @@ describe('fromFetch', () => { } }); }); + + it('should support a selector', done => { + mockFetch.respondWith = { + ...OK_RESPONSE, + text: () => Promise.resolve('bar') + }; + const fetch$ = fromFetch('/foo', { + selector: response => response.text() + }); + expect(mockFetch.calls.length).to.equal(0); + expect(MockAbortController.created).to.equal(0); + + fetch$.subscribe({ + next: text => { + expect(text).to.equal('bar'); + }, + error: done, + complete: () => { + // Wait until the complete and the subsequent unsubscribe are finished + // before testing these expectations: + setTimeout(() => { + expect(MockAbortController.created).to.equal(1); + expect(mockFetch.calls.length).to.equal(1); + expect(mockFetch.calls[0].input).to.equal('/foo'); + expect(mockFetch.calls[0].init!.signal).not.to.be.undefined; + expect(mockFetch.calls[0].init!.signal!.aborted).to.be.false; + done(); + }, 0); + } + }); + }); + + it('should abort when unsubscribed and a selector is specified', () => { + mockFetch.respondWith = { + ...OK_RESPONSE, + text: () => Promise.resolve('bar') + }; + const fetch$ = fromFetch('/foo', { + selector: response => response.text() + }); + expect(mockFetch.calls.length).to.equal(0); + expect(MockAbortController.created).to.equal(0); + const subscription = fetch$.subscribe(); + + expect(MockAbortController.created).to.equal(1); + expect(mockFetch.calls.length).to.equal(1); + expect(mockFetch.calls[0].input).to.equal('/foo'); + expect(mockFetch.calls[0].init!.signal).not.to.be.undefined; + expect(mockFetch.calls[0].init!.signal!.aborted).to.be.false; + + subscription.unsubscribe(); + expect(mockFetch.calls[0].init!.signal!.aborted).to.be.true; + }); }); diff --git a/src/internal/observable/dom/fetch.ts b/src/internal/observable/dom/fetch.ts index 0a7e6ab2b6..5ff824bfea 100644 --- a/src/internal/observable/dom/fetch.ts +++ b/src/internal/observable/dom/fetch.ts @@ -1,5 +1,19 @@ import { Observable } from '../../Observable'; import { Subscription } from '../../Subscription'; +import { from } from '../../observable/from'; +import { ObservableInput } from '../../types'; + +export function fromFetch( + input: string | Request, + init: RequestInit & { + selector: (response: Response) => ObservableInput + } +): Observable; + +export function fromFetch( + input: string | Request, + init?: RequestInit +): Observable; /** * Uses [the Fetch API](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) to @@ -42,7 +56,36 @@ import { Subscription } from '../../Subscription'; * data$.subscribe({ * next: result => console.log(result), * complete: () => console.log('done') - * }) + * }); + * ``` + * + * ### Use with Chunked Transfer Encoding + * + * With HTTP responses that use [chunked transfer encoding](https://tools.ietf.org/html/rfc7230#section-3.3.1), + * the promise returned by `fetch` will resolve as soon as the response's headers are + * received. + * + * That means the `fromFetch` observable will emit a `Response` - and will + * then complete - before the body is received. When one of the methods on the + * `Response` - like `text()` or `json()` - is called, the returned promise will not + * resolve until the entire body has been received. Unsubscribing from any observable + * that uses the promise as an observable input will not abort the request. + * + * To facilitate aborting the retrieval of responses that use chunked transfer encoding, + * a `selector` can be specified via the `init` parameter: + * + * ```ts + * import { of } from 'rxjs'; + * import { fromFetch } from 'rxjs/fetch'; + * + * const data$ = fromFetch('https://api.github.com/users?per_page=5', { + * selector: response => response.json() + * }); + * + * data$.subscribe({ + * next: result => console.log(result), + * complete: () => console.log('done') + * }); * ``` * * @param input The resource you would like to fetch. Can be a url or a request object. @@ -51,8 +94,14 @@ import { Subscription } from '../../Subscription'; * @returns An Observable, that when subscribed to performs an HTTP request using the native `fetch` * function. The {@link Subscription} is tied to an `AbortController` for the the fetch. */ -export function fromFetch(input: string | Request, init?: RequestInit): Observable { - return new Observable(subscriber => { +export function fromFetch( + input: string | Request, + initWithSelector: RequestInit & { + selector?: (response: Response) => ObservableInput + } = {} +): Observable { + const { selector, ...init } = initWithSelector; + return new Observable(subscriber => { const controller = new AbortController(); const signal = controller.signal; let abortable = true; @@ -91,9 +140,26 @@ export function fromFetch(input: string | Request, init?: RequestInit): Observab } fetch(input, perSubscriberInit).then(response => { - abortable = false; - subscriber.next(response); - subscriber.complete(); + if (selector) { + subscription.add(from(selector(response)).subscribe( + value => subscriber.next(value), + err => { + abortable = false; + if (!unsubscribed) { + // Only forward the error if it wasn't an abort. + subscriber.error(err); + } + }, + () => { + abortable = false; + subscriber.complete(); + } + )); + } else { + abortable = false; + subscriber.next(response); + subscriber.complete(); + } }).catch(err => { abortable = false; if (!unsubscribed) {