Skip to content

Commit

Permalink
[Internal] Upgrade Resiliency: Refactors Code to Enable Replica Valid…
Browse files Browse the repository at this point in the history
…ation Feature Through `CosmosClientOptions` And Environment Variable (#3974)

* Code changes to use client options to enable or disable replica validation.

* Code changes to fix preview build failures.
  • Loading branch information
kundadebdatta authored Jul 11, 2023
1 parent 51c9d9c commit 4bb62d3
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 49 deletions.
12 changes: 12 additions & 0 deletions Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,18 @@ public Func<HttpClient> HttpClientFactory
set;
}

/// <summary>
/// Gets or sets the boolean flag to enable replica validation.
/// </summary>
/// <value>
/// The default value for this parameter is false.
/// </value>
public bool? EnableAdvancedReplicaSelectionForTcp
{
get;
set;
}

/// <summary>
/// (Direct/TCP) This is an advanced setting that controls the number of TCP connections that will be opened eagerly to each Cosmos DB back-end.
/// </summary>
Expand Down
12 changes: 12 additions & 0 deletions Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,17 @@ public ConnectionMode ConnectionMode
/// <seealso cref="TransactionalBatchItemRequestOptions.EnableContentResponseOnWrite"/>
public bool? EnableContentResponseOnWrite { get; set; }

/// <summary>
/// Gets or sets the advanced replica selection flag. The advanced replica selection logic keeps track of the replica connection
/// status, and based on status, it prioritizes the replicas which show healthy stable connections, so that the requests can be sent
/// confidently to the particular replica. This helps the cosmos client to become more resilient and effective to any connectivity issues.
/// The default value for this parameter is 'false'.
/// </summary>
/// <remarks>
/// <para>This is optimal for latency-sensitive workloads. Does not apply if <see cref="ConnectionMode.Gateway"/> is used.</para>
/// </remarks>
internal bool? EnableAdvancedReplicaSelectionForTcp { get; set; }

/// <summary>
/// (Direct/TCP) Controls the amount of idle time after which unused connections are closed.
/// </summary>
Expand Down Expand Up @@ -758,6 +769,7 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId)
EnablePartitionLevelFailover = this.EnablePartitionLevelFailover,
PortReuseMode = this.portReuseMode,
EnableTcpConnectionEndpointRediscovery = this.EnableTcpConnectionEndpointRediscovery,
EnableAdvancedReplicaSelectionForTcp = this.EnableAdvancedReplicaSelectionForTcp,
HttpClientFactory = this.httpClientFactory,
ServerCertificateCustomValidationCallback = this.ServerCertificateCustomValidationCallback
};
Expand Down
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public DocumentClient(Uri serviceEndpoint,

this.Initialize(serviceEndpoint, connectionPolicy, desiredConsistencyLevel);
this.initTaskCache = new AsyncCacheNonBlocking<string, bool>(cancellationToken: this.cancellationTokenSource.Token);
this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled();
this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled(connectionPolicy);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public GlobalAddressResolver(

this.enableTcpConnectionEndpointRediscovery = connectionPolicy.EnableTcpConnectionEndpointRediscovery;

this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled();
this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled(connectionPolicy);

this.maxEndpoints = maxBackupReadEndpoints + 2; // for write and alternate write endpoint (during failover)

Expand Down
9 changes: 8 additions & 1 deletion Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,20 @@ public static T GetEnvironmentVariable<T>(string variable, T defaultValue)
/// both preview and GA. The method will eventually be removed, once replica valdiatin is enabled by default
/// for both preview and GA.
/// </summary>
/// <param name="connectionPolicy">An instance of <see cref="ConnectionPolicy"/> containing the client options.</param>
/// <returns>A boolean flag indicating if replica validation is enabled.</returns>
public static bool IsReplicaAddressValidationEnabled()
public static bool IsReplicaAddressValidationEnabled(
ConnectionPolicy connectionPolicy)
{
bool replicaValidationDefaultValue = false;
#if PREVIEW
replicaValidationDefaultValue = true;
#endif
if (connectionPolicy != null
&& connectionPolicy.EnableAdvancedReplicaSelectionForTcp.HasValue)
{
return connectionPolicy.EnableAdvancedReplicaSelectionForTcp.Value;
}

return ConfigurationManager
.GetEnvironmentVariable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,58 @@ public async Task Cleanup()
}

[TestMethod]
public async Task ReadManyTypedTest()
[DataRow(true, DisplayName = "Validates Read Many scenario with advanced replica selection enabled.")]
[DataRow(false, DisplayName = "Validates Read Many scenario with advanced replica selection disabled.")]
public async Task ReadManyTypedTestWithAdvancedReplicaSelection(
bool advancedReplicaSelectionEnabled)
{
List<(string, PartitionKey)> itemList = new List<(string, PartitionKey)>();
for (int i=0; i<10; i++)
CosmosClientOptions clientOptions = new ()
{
itemList.Add((i.ToString(), new PartitionKey("pk" + i.ToString())));
}
EnableAdvancedReplicaSelectionForTcp = advancedReplicaSelectionEnabled,
};

FeedResponse<ToDoActivity> feedResponse= await this.Container.ReadManyItemsAsync<ToDoActivity>(itemList);
Assert.IsNotNull(feedResponse);
Assert.AreEqual(feedResponse.Count, 10);
Assert.IsTrue(feedResponse.Headers.RequestCharge > 0);
Assert.IsNotNull(feedResponse.Diagnostics);
Database database = null;
CosmosClient cosmosClient = TestCommon.CreateCosmosClient(clientOptions);
try
{
database = await cosmosClient.CreateDatabaseAsync("ReadManyTypedTestScenarioDb");
Container container = await database.CreateContainerAsync("ReadManyTypedTestContainer", "/pk");

int count = 0;
foreach (ToDoActivity item in feedResponse)
// Create items with different pk values
for (int i = 0; i < 500; i++)
{
ToDoActivity item = ToDoActivity.CreateRandomToDoActivity();
item.pk = "pk" + i.ToString();
item.id = i.ToString();
ItemResponse<ToDoActivity> itemResponse = await container.CreateItemAsync(item);
Assert.AreEqual(HttpStatusCode.Created, itemResponse.StatusCode);
}

List<(string, PartitionKey)> itemList = new List<(string, PartitionKey)>();
for (int i = 0; i < 20; i++)
{
itemList.Add((i.ToString(), new PartitionKey("pk" + i.ToString())));
}

FeedResponse<ToDoActivity> feedResponse = await container.ReadManyItemsAsync<ToDoActivity>(itemList);
Assert.IsNotNull(feedResponse);
Assert.AreEqual(20, feedResponse.Count);
Assert.IsTrue(feedResponse.Headers.RequestCharge > 0);
Assert.IsNotNull(feedResponse.Diagnostics);

int count = 0;
foreach (ToDoActivity item in feedResponse)
{
count++;
Assert.IsNotNull(item);
}
Assert.AreEqual(20, count);
}
finally
{
count++;
Assert.IsNotNull(item);
await database.DeleteAsync();
cosmosClient.Dispose();
}
Assert.AreEqual(count, 10);
}

[TestMethod]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,23 @@ public class CosmosBadReplicaTests
{
[TestMethod]
[Timeout(30000)]
[DataRow(true, DisplayName = "Validate when replica validation is enabled.")]
[DataRow(false, DisplayName = "Validate when replica validation is disabled.")]
[DataRow(true, true, false, DisplayName = "Validate when replica validation is enabled using environment variable.")]
[DataRow(false, true, false, DisplayName = "Validate when replica validation is disabled using environment variable.")]
[DataRow(true, false, true, DisplayName = "Validate when replica validation is enabled using cosmos client options.")]
[DataRow(false, false, true, DisplayName = "Validate when replica validation is disabled using cosmos client options.")]
public async Task TestGoneFromServiceScenarioAsync(
bool enableReplicaValidation)
bool enableReplicaValidation,
bool useEnvironmentVariable,
bool useCosmosClientOptions)
{
try
{
Environment.SetEnvironmentVariable(
variable: ConfigurationManager.ReplicaConnectivityValidationEnabled,
value: enableReplicaValidation.ToString());
if (useEnvironmentVariable)
{
Environment.SetEnvironmentVariable(
variable: ConfigurationManager.ReplicaConnectivityValidationEnabled,
value: enableReplicaValidation.ToString());
}

Mock<IHttpHandler> mockHttpHandler = new Mock<IHttpHandler>(MockBehavior.Strict);
Uri endpoint = MockSetupsHelper.SetupSingleRegionAccount(
Expand All @@ -56,28 +63,28 @@ public async Task TestGoneFromServiceScenarioAsync(
cRid,
out IReadOnlyList<string> partitionKeyRanges);

List<string> replicaIds1 = new List<string>()
{
"11111111111111111",
"22222222222222222",
"33333333333333333",
"44444444444444444",
};

HttpResponseMessage replicaSet1 = MockSetupsHelper.CreateAddresses(
replicaIds1,
partitionKeyRanges.First(),
"eastus",
cRid);
List<string> replicaIds1 = new List<string>()
{
"11111111111111111",
"22222222222222222",
"33333333333333333",
"44444444444444444",
};

// One replica changed on the refresh
List<string> replicaIds2 = new List<string>()
{
"11111111111111111",
"22222222222222222",
"33333333333333333",
"55555555555555555",
};
HttpResponseMessage replicaSet1 = MockSetupsHelper.CreateAddresses(
replicaIds1,
partitionKeyRanges.First(),
"eastus",
cRid);

// One replica changed on the refresh
List<string> replicaIds2 = new List<string>()
{
"11111111111111111",
"22222222222222222",
"33333333333333333",
"55555555555555555",
};

HttpResponseMessage replicaSet2 = MockSetupsHelper.CreateAddresses(
replicaIds2,
Expand Down Expand Up @@ -146,6 +153,11 @@ public async Task TestGoneFromServiceScenarioAsync(
TransportClientHandlerFactory = (original) => mockTransportClient.Object,
};

if (useCosmosClientOptions)
{
cosmosClientOptions.EnableAdvancedReplicaSelectionForTcp = enableReplicaValidation;
}

using (CosmosClient customClient = new CosmosClient(
endpoint.ToString(),
Convert.ToBase64String(Encoding.UTF8.GetBytes(Guid.NewGuid().ToString())),
Expand Down Expand Up @@ -203,9 +215,12 @@ public async Task TestGoneFromServiceScenarioAsync(
}
finally
{
Environment.SetEnvironmentVariable(
variable: ConfigurationManager.ReplicaConnectivityValidationEnabled,
value: null);
if (useEnvironmentVariable)
{
Environment.SetEnvironmentVariable(
variable: ConfigurationManager.ReplicaConnectivityValidationEnabled,
value: null);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated()
Assert.IsNull(clientOptions.HttpClientFactory);
Assert.AreNotEqual(consistencyLevel, clientOptions.ConsistencyLevel);
Assert.IsFalse(clientOptions.EnablePartitionLevelFailover);
Assert.IsFalse(clientOptions.EnableAdvancedReplicaSelectionForTcp.HasValue);

//Verify GetConnectionPolicy returns the correct values for default
ConnectionPolicy policy = clientOptions.GetConnectionPolicy(clientId: 0);
Expand All @@ -97,6 +98,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated()
Assert.IsNull(policy.HttpClientFactory);
Assert.AreNotEqual(Cosmos.ConsistencyLevel.Session, clientOptions.ConsistencyLevel);
Assert.IsFalse(policy.EnablePartitionLevelFailover);
Assert.IsFalse(clientOptions.EnableAdvancedReplicaSelectionForTcp.HasValue);

cosmosClientBuilder.WithApplicationRegion(region)
.WithConnectionModeGateway(maxConnections, webProxy)
Expand All @@ -112,6 +114,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated()

cosmosClient = cosmosClientBuilder.Build(new MockDocumentClient());
clientOptions = cosmosClient.ClientOptions;
clientOptions.EnableAdvancedReplicaSelectionForTcp = true;

//Verify all the values are updated
Assert.AreEqual(region, clientOptions.ApplicationRegion);
Expand All @@ -131,6 +134,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated()
Assert.IsTrue(clientOptions.AllowBulkExecution);
Assert.AreEqual(consistencyLevel, clientOptions.ConsistencyLevel);
Assert.IsTrue(clientOptions.EnablePartitionLevelFailover);
Assert.IsTrue(clientOptions.EnableAdvancedReplicaSelectionForTcp.HasValue && clientOptions.EnableAdvancedReplicaSelectionForTcp.Value);

//Verify GetConnectionPolicy returns the correct values
policy = clientOptions.GetConnectionPolicy(clientId: 0);
Expand All @@ -145,7 +149,8 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated()
Assert.AreEqual((int)maxRetryWaitTime.TotalSeconds, policy.RetryOptions.MaxRetryWaitTimeInSeconds);
Assert.AreEqual((Documents.ConsistencyLevel)consistencyLevel, clientOptions.GetDocumentsConsistencyLevel());
Assert.IsTrue(policy.EnablePartitionLevelFailover);

Assert.IsTrue(clientOptions.EnableAdvancedReplicaSelectionForTcp.Value);

IReadOnlyList<string> preferredLocations = new List<string>() { Regions.AustraliaCentral, Regions.AustraliaCentral2 };
//Verify Direct Mode settings
cosmosClientBuilder = new CosmosClientBuilder(
Expand Down

0 comments on commit 4bb62d3

Please sign in to comment.