Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance StreamResponse handling and update dependencies #121

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 46 additions & 49 deletions src/Elastic.Transport/Components/Pipeline/DefaultResponseBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal static class ResponseBuilderDefaults

public static readonly Type[] SpecialTypes =
{
typeof(StringResponse), typeof(BytesResponse), typeof(VoidResponse), typeof(DynamicResponse)
typeof(StringResponse), typeof(BytesResponse), typeof(VoidResponse), typeof(DynamicResponse), typeof(StreamResponse)
};
}

Expand Down Expand Up @@ -224,68 +224,65 @@ private async ValueTask<TResponse> SetBodyCoreAsync<TResponse>(bool isAsync,
details.ResponseBodyInBytes = bytes;
}

using (responseStream)
{
if (SetSpecialTypes<TResponse>(mimeType, bytes, responseStream, requestData.MemoryStreamFactory, out var r)) return r;
if (SetSpecialTypes<TResponse>(mimeType, bytes, responseStream, requestData.MemoryStreamFactory, out var r)) return r;

if (details.HttpStatusCode.HasValue &&
requestData.SkipDeserializationForStatusCodes.Contains(details.HttpStatusCode.Value))
return null;
if (details.HttpStatusCode.HasValue &&
requestData.SkipDeserializationForStatusCodes.Contains(details.HttpStatusCode.Value))
return null;

var serializer = requestData.ConnectionSettings.RequestResponseSerializer;
var serializer = requestData.ConnectionSettings.RequestResponseSerializer;

TResponse response;
if (requestData.CustomResponseBuilder != null)
{
var beforeTicks = Stopwatch.GetTimestamp();
TResponse response;
if (requestData.CustomResponseBuilder != null)
{
var beforeTicks = Stopwatch.GetTimestamp();

if (isAsync)
response = await requestData.CustomResponseBuilder
.DeserializeResponseAsync(serializer, details, responseStream, cancellationToken)
.ConfigureAwait(false) as TResponse;
else
response = requestData.CustomResponseBuilder
.DeserializeResponse(serializer, details, responseStream) as TResponse;
if (isAsync)
response = await requestData.CustomResponseBuilder
.DeserializeResponseAsync(serializer, details, responseStream, cancellationToken)
.ConfigureAwait(false) as TResponse;
else
response = requestData.CustomResponseBuilder
.DeserializeResponse(serializer, details, responseStream) as TResponse;

var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);
if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);
var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);
if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);

return response;
}
return response;
}

// TODO: Handle empty data in a nicer way as throwing exceptions has a cost we'd like to avoid!
// ie. check content-length (add to ApiCallDetails)? Content-length cannot be retrieved from a GZip content stream which is annoying.
try
// TODO: Handle empty data in a nicer way as throwing exceptions has a cost we'd like to avoid!
// ie. check content-length (add to ApiCallDetails)? Content-length cannot be retrieved from a GZip content stream which is annoying.
try
{
if (requiresErrorDeserialization && TryGetError(details, requestData, responseStream, out var error) && error.HasError())
{
if (requiresErrorDeserialization && TryGetError(details, requestData, responseStream, out var error) && error.HasError())
{
response = new TResponse();
SetErrorOnResponse(response, error);
return response;
}
response = new TResponse();
SetErrorOnResponse(response, error);
return response;
}

if (!requestData.ValidateResponseContentType(mimeType))
return default;
if (!requestData.ValidateResponseContentType(mimeType))
return default;

var beforeTicks = Stopwatch.GetTimestamp();
var beforeTicks = Stopwatch.GetTimestamp();

if (isAsync)
response = await serializer.DeserializeAsync<TResponse>(responseStream, cancellationToken).ConfigureAwait(false);
else
response = serializer.Deserialize<TResponse>(responseStream);
if (isAsync)
response = await serializer.DeserializeAsync<TResponse>(responseStream, cancellationToken).ConfigureAwait(false);
else
response = serializer.Deserialize<TResponse>(responseStream);

var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);
var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);

if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);
if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);

return response;
}
catch (JsonException ex) when (ex.Message.Contains("The input does not contain any JSON tokens"))
{
return default;
}
return response;
}
catch (JsonException ex) when (ex.Message.Contains("The input does not contain any JSON tokens"))
{
return default;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,28 +161,32 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
{
unregisterWaitHandle?.Invoke();
}
responseStream ??= Stream.Null;

TResponse response;
var isStreamResponse = typeof(TResponse) == typeof(StreamResponse);

if (isAsync)
response = await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponseAsync<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats, cancellationToken)
.ConfigureAwait(false);
else
response = requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats);

if (OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners && (Activity.Current?.IsAllDataRequested ?? false))
using (isStreamResponse ? Stream.Null : responseStream ??= Stream.Null)
{
var attributes = requestData.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails);
foreach (var attribute in attributes)
TResponse response;

if (isAsync)
response = await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponseAsync<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats, cancellationToken)
.ConfigureAwait(false);
else
response = requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats);

if (OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners && (Activity.Current?.IsAllDataRequested ?? false))
{
Activity.Current?.SetTag(attribute.Key, attribute.Value);
var attributes = requestData.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails);
foreach (var attribute in attributes)
{
Activity.Current?.SetTag(attribute.Key, attribute.Value);
}
}
}

return response;
return response;
}
}

private static Dictionary<string, IEnumerable<string>> ParseHeaders(RequestData requestData, HttpWebResponse responseMessage, Dictionary<string, IEnumerable<string>> responseHeaders)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,24 +66,32 @@ public TResponse BuildResponse<TResponse>(RequestData requestData, byte[] respon
{
var body = responseBody ?? _responseBody;
var data = requestData.PostData;
if (data != null)

if (data is not null)
{
using (var stream = requestData.MemoryStreamFactory.Create())
using var stream = requestData.MemoryStreamFactory.Create();
if (requestData.HttpCompression)
{
using var zipStream = new GZipStream(stream, CompressionMode.Compress);
data.Write(zipStream, requestData.ConnectionSettings);
}
else
{
if (requestData.HttpCompression)
{
using var zipStream = new GZipStream(stream, CompressionMode.Compress);
data.Write(zipStream, requestData.ConnectionSettings);
}
else
data.Write(stream, requestData.ConnectionSettings);
data.Write(stream, requestData.ConnectionSettings);
}
}
requestData.MadeItToResponse = true;

var sc = statusCode ?? _statusCode;
Stream s = body != null ? requestData.MemoryStreamFactory.Create(body) : requestData.MemoryStreamFactory.Create(EmptyBody);
return requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse<TResponse>(requestData, _exception, sc, _headers, s, contentType ?? _contentType ?? RequestData.DefaultMimeType, body?.Length ?? 0, null, null);
Stream responseStream = body != null ? requestData.MemoryStreamFactory.Create(body) : requestData.MemoryStreamFactory.Create(EmptyBody);

var isStreamResponse = typeof(TResponse) == typeof(StreamResponse);

using (isStreamResponse ? Stream.Null : responseStream ??= Stream.Null)
{
return requestData.ConnectionSettings.ProductRegistration.ResponseBuilder
.ToResponse<TResponse>(requestData, _exception, sc, _headers, responseStream, contentType ?? _contentType ?? RequestData.DefaultMimeType, body?.Length ?? 0, null, null);
}
}

/// <inheritdoc cref="BuildResponse{TResponse}"/>>
Expand All @@ -93,26 +101,34 @@ public async Task<TResponse> BuildResponseAsync<TResponse>(RequestData requestDa
{
var body = responseBody ?? _responseBody;
var data = requestData.PostData;
if (data != null)

if (data is not null)
{
using (var stream = requestData.MemoryStreamFactory.Create())
using var stream = requestData.MemoryStreamFactory.Create();

if (requestData.HttpCompression)
{
if (requestData.HttpCompression)
{
using var zipStream = new GZipStream(stream, CompressionMode.Compress);
await data.WriteAsync(zipStream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
}
else
await data.WriteAsync(stream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
using var zipStream = new GZipStream(stream, CompressionMode.Compress);
await data.WriteAsync(zipStream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
}
else
{
await data.WriteAsync(stream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
}
}
requestData.MadeItToResponse = true;

var sc = statusCode ?? _statusCode;
Stream s = body != null ? requestData.MemoryStreamFactory.Create(body) : requestData.MemoryStreamFactory.Create(EmptyBody);
return await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder
.ToResponseAsync<TResponse>(requestData, _exception, sc, _headers, s, contentType ?? _contentType, body?.Length ?? 0, null, null, cancellationToken)
.ConfigureAwait(false);
}

Stream responseStream = body != null ? requestData.MemoryStreamFactory.Create(body) : requestData.MemoryStreamFactory.Create(EmptyBody);

var isStreamResponse = typeof(TResponse) == typeof(StreamResponse);

using (isStreamResponse ? Stream.Null : responseStream ??= Stream.Null)
{
return await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder
.ToResponseAsync<TResponse>(requestData, _exception, sc, _headers, responseStream, contentType ?? _contentType, body?.Length ?? 0, null, null, cancellationToken)
.ConfigureAwait(false);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.IO;
using System.Text.Json;
using System.Threading.Tasks;
using Elastic.Transport.IntegrationTests.Plumbing;
using Elastic.Transport.Products.Elasticsearch;
using Microsoft.AspNetCore.Mvc;
using Xunit;

namespace Elastic.Transport.IntegrationTests.Http;

public class StreamResponseTests(TransportTestServer instance) : AssemblyServerTestsBase(instance)
{
private const string Path = "/streamresponse";

[Fact]
public async Task StreamResponse_ShouldNotBeDisposed()
{
var nodePool = new SingleNodePool(Server.Uri);
var config = new TransportConfiguration(nodePool, productRegistration: new ElasticsearchProductRegistration(typeof(Clients.Elasticsearch.ElasticsearchClient)));
var transport = new DistributedTransport(config);

var response = await transport.PostAsync<StreamResponse>(Path, PostData.String("{}"));

var sr = new StreamReader(response.Body);
var responseString = sr.ReadToEndAsync();
}
}

[ApiController, Route("[controller]")]
public class StreamResponseController : ControllerBase
{
[HttpPost]
public Task<JsonElement> Post([FromBody] JsonElement body) => Task.FromResult(body);
}
Loading
Loading