Skip to content

Commit

Permalink
Upgrade redis-rs dependency. (#994)
Browse files Browse the repository at this point in the history
* ++redis-rs.

* Fix transactions not being routed.

Transactions without user routing were routed to a random node, and so almost by default caused a MOVED error and slot refresh.
`req_packed_commands` finds the right routing for the transaction.

* Fix python exception tests.

Longer error messages are shortened by pytest, so we need to use match instead of asserting over the string.
  • Loading branch information
nihohit authored Feb 19, 2024
1 parent a73ec76 commit 5d0a2e7
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 15 deletions.
15 changes: 7 additions & 8 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::connection_request::{
use crate::scripts_container::get_script;
use futures::FutureExt;
use logger_core::log_info;
use redis::aio::ConnectionLike;
use redis::cluster_async::ClusterConnection;
use redis::cluster_routing::{RoutingInfo, SingleNodeRoutingInfo};
use redis::RedisResult;
Expand Down Expand Up @@ -206,14 +207,12 @@ impl Client {
client.send_pipeline(pipeline, offset, 1).await
}

ClientWrapper::Cluster { ref mut client } => {
let route = match routing {
Some(RoutingInfo::SingleNode(route)) => route,
_ => SingleNodeRoutingInfo::Random,
};

client.route_pipeline(pipeline, offset, 1, route).await
}
ClientWrapper::Cluster { ref mut client } => match routing {
Some(RoutingInfo::SingleNode(route)) => {
client.route_pipeline(pipeline, offset, 1, route).await
}
_ => client.req_packed_commands(pipeline, offset, 1).await,
},
}?;

Self::get_transaction_values(pipeline, values, command_count, offset)
Expand Down
85 changes: 83 additions & 2 deletions glide-core/tests/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ mod utilities;
pub(crate) mod shared_client_tests {
use super::*;
use glide_core::client::Client;
use redis::RedisConnectionInfo;
use redis::Value;
use redis::{
cluster_routing::{MultipleNodeRoutingInfo, RoutingInfo},
FromRedisValue, InfoDict, RedisConnectionInfo, Value,
};
use rstest::rstest;
use utilities::cluster::*;
use utilities::BackingServer;
Expand Down Expand Up @@ -75,6 +77,85 @@ pub(crate) mod shared_client_tests {
});
}

#[rstest]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_pipeline_is_not_routed() {
// This test checks that a transaction without user routing isn't routed to a random node before reaching its target.
// This is tested by checking how many requests each node has received - one of the 6 nodes should have more requests than the others.
block_on_all(async {
let mut test_basics = setup_test_basics(
true,
TestConfiguration {
use_tls: false,
shared_server: true,
..Default::default()
},
)
.await;

// reset stats on all connections
let mut cmd = redis::cmd("CONFIG");
cmd.arg("RESETSTAT");
let _ = test_basics
.client
.send_command(
&cmd,
Some(RoutingInfo::MultiNode((
MultipleNodeRoutingInfo::AllNodes,
None,
))),
)
.await
.unwrap();

// Send a keyed transaction
let key = generate_random_string(6);
let mut pipe = redis::pipe();
pipe.cmd("GET").arg(key);
pipe.atomic();
let _ = test_basics
.client
.send_transaction(&pipe, None)
.await
.unwrap();

// Gather info from each server
let mut cmd = redis::cmd("INFO");
cmd.arg("STATS");
let values = test_basics
.client
.send_command(
&cmd,
Some(RoutingInfo::MultiNode((
MultipleNodeRoutingInfo::AllNodes,
None,
))),
)
.await
.unwrap();

let mut values: Vec<_> = match values {
Value::Map(map) => map.into_iter().filter_map(|(_, value)| {
InfoDict::from_owned_redis_value(value)
.unwrap()
.get::<u32>("total_commands_processed")
}),

_ => panic!("Expected map, got `{values:?}`"),
}
.collect();

// Check that all nodes except for one has processed the same number of commands.
values.sort();
assert_eq!(values.len(), 6);
assert_eq!(values[0], values[1]);
assert_eq!(values[1], values[2]);
assert_eq!(values[2], values[3]);
assert_eq!(values[3], values[4]);
assert_eq!(values[4] + 3, values[5]); // + MULTI, GET, EXEC
});
}

#[rstest]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_resp_support(#[values(false, true)] use_cluster: bool, #[values(2, 3)] protocol: i64) {
Expand Down
3 changes: 1 addition & 2 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,13 @@ async def test_can_connect_with_auth_requirepass(
["CONFIG", "SET", "requirepass", password]
)

with pytest.raises(ClosingError) as e:
with pytest.raises(ClosingError, match="NOAUTH"):
# Creation of a new client without password should fail
await create_client(
request,
is_cluster,
addresses=redis_client.config.addresses,
)
assert "NOAUTH" in str(e)

auth_client = await create_client(
request,
Expand Down
3 changes: 1 addition & 2 deletions python/python/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,8 @@ async def test_transaction_with_different_slots(self, redis_client: TRedisClient
)
transaction.set("key1", "value1")
transaction.set("key2", "value2")
with pytest.raises(RequestError) as e:
with pytest.raises(RequestError, match="CrossSlot"):
await redis_client.exec(transaction)
assert "Moved" in str(e)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
Expand Down

0 comments on commit 5d0a2e7

Please sign in to comment.