Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added handling to "?" and NULL hostnames in CLUSTER SLOTS #104

Merged
merged 2 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion redis/src/cluster_topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,20 @@ pub(crate) fn parse_and_count_slots(
if node.len() < 2 {
return None;
}

// According to the CLUSTER SLOTS documentation:
// If the received hostname is an empty string or NULL, clients should utilize the hostname of the responding node.
// However, if the received hostname is "?", it should be regarded as an indication of an unknown node.
let hostname = if let Value::BulkString(ref ip) = node[0] {
let hostname = String::from_utf8_lossy(ip);
if hostname.is_empty() {
addr_of_answering_node.into()
} else if hostname == "?" {
return None;
} else {
hostname
}
} else if let Value::Nil = node[0] {
addr_of_answering_node.into()
} else {
return None;
};
Expand Down Expand Up @@ -141,6 +147,13 @@ pub(crate) fn parse_and_count_slots(
slots.push(Slot::new(start, end, nodes.pop().unwrap(), replicas));
}
}
if slots.is_empty() {
return Err(RedisError::from((
ErrorKind::ResponseError,
"Error parsing slots: No healthy node found",
format!("Raw slot map response: {:?}", raw_slot_resp),
)));
}
// we sort the slots, because different nodes in a cluster might return the same slot view
// in different orders, which might cause the views to be considered evaluated as not equal.
slots.sort_unstable_by(|first, second| match first.start().cmp(&second.start()) {
Expand Down
12 changes: 8 additions & 4 deletions redis/tests/support/mock_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ impl MockConnectionBehavior {
}
}

pub fn add_new_mock_connection_behavior(name: &str, handler: Handler) {
MOCK_CONN_BEHAVIORS
.write()
.unwrap()
.insert(name.to_string(), MockConnectionBehavior::new(name, handler));
}

pub fn modify_mock_connection_behavior(name: &str, func: impl FnOnce(&mut MockConnectionBehavior)) {
func(
MOCK_CONN_BEHAVIORS
Expand Down Expand Up @@ -433,10 +440,7 @@ impl MockEnv {
.unwrap();

let id = id.to_string();
MOCK_CONN_BEHAVIORS.write().unwrap().insert(
id.clone(),
MockConnectionBehavior::new(&id, Arc::new(move |cmd, port| handler(cmd, port))),
);
add_new_mock_connection_behavior(&id, Arc::new(move |cmd, port| handler(cmd, port)));
let client = client_builder.build().unwrap();
let connection = client.get_generic_connection().unwrap();
#[cfg(feature = "cluster-async")]
Expand Down
59 changes: 59 additions & 0 deletions redis/tests/test_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,65 @@ fn test_cluster_can_connect_to_server_that_sends_cluster_slots_without_host_name
assert_eq!(value, Ok(Value::Nil));
}

#[test]
fn test_cluster_can_connect_to_server_that_sends_cluster_slots_with_null_host_name() {
let name = "test_cluster_can_connect_to_server_that_sends_cluster_slots_with_null_host_name";

let MockEnv { mut connection, .. } = MockEnv::new(name, move |cmd: &[u8], _| {
if contains_slice(cmd, b"PING") {
Err(Ok(Value::SimpleString("OK".into())))
} else if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") {
Err(Ok(Value::Array(vec![Value::Array(vec![
Value::Int(0),
Value::Int(16383),
Value::Array(vec![Value::Nil, Value::Int(6379)]),
])])))
} else {
Err(Ok(Value::Nil))
}
});

let value = cmd("GET").arg("test").query::<Value>(&mut connection);

assert_eq!(value, Ok(Value::Nil));
}

#[test]
fn test_cluster_can_connect_to_server_that_sends_cluster_slots_with_partial_nodes_with_unknown_host_name(
) {
let name = "test_cluster_can_connect_to_server_that_sends_cluster_slots_with_partial_nodes_with_unknown_host_name";

let MockEnv { mut connection, .. } = MockEnv::new(name, move |cmd: &[u8], _| {
if contains_slice(cmd, b"PING") {
Err(Ok(Value::SimpleString("OK".into())))
} else if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") {
Err(Ok(Value::Array(vec![
Value::Array(vec![
Value::Int(0),
Value::Int(7000),
Value::Array(vec![
Value::BulkString(name.as_bytes().to_vec()),
Value::Int(6379),
]),
]),
Value::Array(vec![
Value::Int(7001),
Value::Int(16383),
Value::Array(vec![
Value::BulkString("?".as_bytes().to_vec()),
Value::Int(6380),
]),
]),
])))
} else {
Err(Ok(Value::Nil))
}
});

let value = cmd("GET").arg("test").query::<Value>(&mut connection);
assert_eq!(value, Ok(Value::Nil));
}

#[test]
fn test_cluster_pipeline_command_ordering() {
let cluster = TestClusterContext::new(3, 0);
Expand Down
107 changes: 107 additions & 0 deletions redis/tests/test_cluster_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,113 @@ fn test_cluster_async_can_connect_to_server_that_sends_cluster_slots_without_hos
assert_eq!(value, Ok(Value::Nil));
}

#[test]
fn test_cluster_async_can_connect_to_server_that_sends_cluster_slots_with_null_host_name() {
let name =
"test_cluster_async_can_connect_to_server_that_sends_cluster_slots_with_null_host_name";

let MockEnv {
runtime,
async_connection: mut connection,
..
} = MockEnv::new(name, move |cmd: &[u8], _| {
if contains_slice(cmd, b"PING") {
Err(Ok(Value::SimpleString("OK".into())))
} else if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") {
Err(Ok(Value::Array(vec![Value::Array(vec![
Value::Int(0),
Value::Int(16383),
Value::Array(vec![Value::Nil, Value::Int(6379)]),
])])))
} else {
Err(Ok(Value::Nil))
}
});

let value = runtime.block_on(
cmd("GET")
.arg("test")
.query_async::<_, Value>(&mut connection),
);

assert_eq!(value, Ok(Value::Nil));
}

#[test]
fn test_cluster_async_cannot_connect_to_server_with_unknown_host_name() {
let name = "test_cluster_async_cannot_connect_to_server_with_unknown_host_name";
let handler = move |cmd: &[u8], _| {
if contains_slice(cmd, b"PING") {
Err(Ok(Value::SimpleString("OK".into())))
} else if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") {
Err(Ok(Value::Array(vec![Value::Array(vec![
Value::Int(0),
Value::Int(16383),
Value::Array(vec![
Value::BulkString("?".as_bytes().to_vec()),
Value::Int(6379),
]),
])])))
} else {
Err(Ok(Value::Nil))
}
};
let client_builder = ClusterClient::builder(vec![&*format!("redis://{name}")]);
let client = client_builder.build().unwrap();
add_new_mock_connection_behavior(name, Arc::new(handler));
let connection = client.get_generic_connection::<MockConnection>();
assert!(connection.is_err());
let err = connection.err().unwrap();
assert!(err
.to_string()
.contains("Error parsing slots: No healthy node found"))
}

#[test]
fn test_cluster_async_can_connect_to_server_that_sends_cluster_slots_with_partial_nodes_with_unknown_host_name(
) {
let name = "test_cluster_async_can_connect_to_server_that_sends_cluster_slots_with_partial_nodes_with_unknown_host_name";

let MockEnv {
runtime,
async_connection: mut connection,
..
} = MockEnv::new(name, move |cmd: &[u8], _| {
if contains_slice(cmd, b"PING") {
Err(Ok(Value::SimpleString("OK".into())))
} else if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") {
Err(Ok(Value::Array(vec![
Value::Array(vec![
Value::Int(0),
Value::Int(7000),
Value::Array(vec![
Value::BulkString(name.as_bytes().to_vec()),
Value::Int(6379),
]),
]),
Value::Array(vec![
Value::Int(7001),
Value::Int(16383),
Value::Array(vec![
Value::BulkString("?".as_bytes().to_vec()),
Value::Int(6380),
]),
]),
])))
} else {
Err(Ok(Value::Nil))
}
});

let value = runtime.block_on(
cmd("GET")
.arg("test")
.query_async::<_, Value>(&mut connection),
);

assert_eq!(value, Ok(Value::Nil));
}

#[test]
fn test_async_cluster_retries() {
let name = "tryagain";
Expand Down
Loading