Skip to content

Commit

Permalink
Enhance StreamResponse handling and update dependencies
Browse files Browse the repository at this point in the history
Updated `ResponseBuilderDefaults` to include `StreamResponse` in `SpecialTypes`. Refactored `SetBodyCoreAsync` in `DefaultResponseBuilder.cs` for readability and removed unnecessary `using` statements. Modified `RequestCoreAsync` in `HttpWebRequestInvoker.cs` and `BuildResponseAsync` in `InMemoryRequestInvoker.cs` to handle `StreamResponse` types with proper disposal. Updated `Elastic.Transport.csproj` to reference `System.Text.Json` version `8.0.5`.
  • Loading branch information
stevejgordon committed Oct 22, 2024
1 parent 63e90cb commit 160eff8
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 69 deletions.
95 changes: 46 additions & 49 deletions src/Elastic.Transport/Components/Pipeline/DefaultResponseBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal static class ResponseBuilderDefaults

public static readonly Type[] SpecialTypes =
{
typeof(StringResponse), typeof(BytesResponse), typeof(VoidResponse), typeof(DynamicResponse)
typeof(StringResponse), typeof(BytesResponse), typeof(VoidResponse), typeof(DynamicResponse), typeof(StreamResponse)
};
}

Expand Down Expand Up @@ -224,68 +224,65 @@ private async ValueTask<TResponse> SetBodyCoreAsync<TResponse>(bool isAsync,
details.ResponseBodyInBytes = bytes;
}

using (responseStream)
{
if (SetSpecialTypes<TResponse>(mimeType, bytes, responseStream, requestData.MemoryStreamFactory, out var r)) return r;
if (SetSpecialTypes<TResponse>(mimeType, bytes, responseStream, requestData.MemoryStreamFactory, out var r)) return r;

if (details.HttpStatusCode.HasValue &&
requestData.SkipDeserializationForStatusCodes.Contains(details.HttpStatusCode.Value))
return null;
if (details.HttpStatusCode.HasValue &&
requestData.SkipDeserializationForStatusCodes.Contains(details.HttpStatusCode.Value))
return null;

var serializer = requestData.ConnectionSettings.RequestResponseSerializer;
var serializer = requestData.ConnectionSettings.RequestResponseSerializer;

TResponse response;
if (requestData.CustomResponseBuilder != null)
{
var beforeTicks = Stopwatch.GetTimestamp();
TResponse response;
if (requestData.CustomResponseBuilder != null)
{
var beforeTicks = Stopwatch.GetTimestamp();

if (isAsync)
response = await requestData.CustomResponseBuilder
.DeserializeResponseAsync(serializer, details, responseStream, cancellationToken)
.ConfigureAwait(false) as TResponse;
else
response = requestData.CustomResponseBuilder
.DeserializeResponse(serializer, details, responseStream) as TResponse;
if (isAsync)
response = await requestData.CustomResponseBuilder
.DeserializeResponseAsync(serializer, details, responseStream, cancellationToken)
.ConfigureAwait(false) as TResponse;
else
response = requestData.CustomResponseBuilder
.DeserializeResponse(serializer, details, responseStream) as TResponse;

var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);
if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);
var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);
if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);

return response;
}
return response;
}

// TODO: Handle empty data in a nicer way as throwing exceptions has a cost we'd like to avoid!
// ie. check content-length (add to ApiCallDetails)? Content-length cannot be retrieved from a GZip content stream which is annoying.
try
// TODO: Handle empty data in a nicer way as throwing exceptions has a cost we'd like to avoid!
// ie. check content-length (add to ApiCallDetails)? Content-length cannot be retrieved from a GZip content stream which is annoying.
try
{
if (requiresErrorDeserialization && TryGetError(details, requestData, responseStream, out var error) && error.HasError())
{
if (requiresErrorDeserialization && TryGetError(details, requestData, responseStream, out var error) && error.HasError())
{
response = new TResponse();
SetErrorOnResponse(response, error);
return response;
}
response = new TResponse();
SetErrorOnResponse(response, error);
return response;
}

if (!requestData.ValidateResponseContentType(mimeType))
return default;
if (!requestData.ValidateResponseContentType(mimeType))
return default;

var beforeTicks = Stopwatch.GetTimestamp();
var beforeTicks = Stopwatch.GetTimestamp();

if (isAsync)
response = await serializer.DeserializeAsync<TResponse>(responseStream, cancellationToken).ConfigureAwait(false);
else
response = serializer.Deserialize<TResponse>(responseStream);
if (isAsync)
response = await serializer.DeserializeAsync<TResponse>(responseStream, cancellationToken).ConfigureAwait(false);
else
response = serializer.Deserialize<TResponse>(responseStream);

var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);
var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);

if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);
if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);

return response;
}
catch (JsonException ex) when (ex.Message.Contains("The input does not contain any JSON tokens"))
{
return default;
}
return response;
}
catch (JsonException ex) when (ex.Message.Contains("The input does not contain any JSON tokens"))
{
return default;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,28 +161,32 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
{
unregisterWaitHandle?.Invoke();
}
responseStream ??= Stream.Null;

TResponse response;
var isStreamResponse = typeof(TResponse) == typeof(StreamResponse);

if (isAsync)
response = await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponseAsync<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats, cancellationToken)
.ConfigureAwait(false);
else
response = requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats);

if (OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners && (Activity.Current?.IsAllDataRequested ?? false))
using (isStreamResponse ? Stream.Null : responseStream ??= Stream.Null)
{
var attributes = requestData.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails);
foreach (var attribute in attributes)
TResponse response;

if (isAsync)
response = await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponseAsync<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats, cancellationToken)
.ConfigureAwait(false);
else
response = requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats);

if (OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners && (Activity.Current?.IsAllDataRequested ?? false))
{
Activity.Current?.SetTag(attribute.Key, attribute.Value);
var attributes = requestData.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails);
foreach (var attribute in attributes)
{
Activity.Current?.SetTag(attribute.Key, attribute.Value);
}
}
}

return response;
return response;
}
}

private static Dictionary<string, IEnumerable<string>> ParseHeaders(RequestData requestData, HttpWebResponse responseMessage, Dictionary<string, IEnumerable<string>> responseHeaders)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,16 @@ public async Task<TResponse> BuildResponseAsync<TResponse>(RequestData requestDa
requestData.MadeItToResponse = true;

var sc = statusCode ?? _statusCode;
Stream s = body != null ? requestData.MemoryStreamFactory.Create(body) : requestData.MemoryStreamFactory.Create(EmptyBody);
return await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder
.ToResponseAsync<TResponse>(requestData, _exception, sc, _headers, s, contentType ?? _contentType, body?.Length ?? 0, null, null, cancellationToken)

Stream responseStream = body != null ? requestData.MemoryStreamFactory.Create(body) : requestData.MemoryStreamFactory.Create(EmptyBody);

var isStreamResponse = typeof(TResponse) == typeof(StreamResponse);

using (isStreamResponse ? Stream.Null : responseStream ??= Stream.Null)
{
return await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder
.ToResponseAsync<TResponse>(requestData, _exception, sc, _headers, responseStream, contentType ?? _contentType, body?.Length ?? 0, null, null, cancellationToken)
.ConfigureAwait(false);
}
}

}

0 comments on commit 160eff8

Please sign in to comment.