Skip to content

Commit

Permalink
Fixes #1152: force-close idle half-closed tcp connections (#1163) (#1167
Browse files Browse the repository at this point in the history
)

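This feature is disabled by default. It is enabled by defining the
environment variable "SKUPPER_ROUTER_ENABLE_1152". If the variable's value is a positive integer, it is used as the idle timeout in seconds; otherwise a default of 240 seconds applies.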

(cherry picked from commit b1a804e)
  • Loading branch information
kgiusti authored Jul 17, 2023
1 parent e71d251 commit 507f6e4
Show file tree
Hide file tree
Showing 3 changed files with 258 additions and 6 deletions.
113 changes: 107 additions & 6 deletions src/adaptors/tcp/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
//
const uint32_t TCP_MAX_CAPACITY = 121635 * 6 * 2; // 1,459,620

static qd_duration_t half_closed_idle_timeout = 0; // 0 = disabled

ALLOC_DEFINE(qdr_tcp_stats_t);
ALLOC_DEFINE(qd_tcp_listener_t);
ALLOC_DEFINE(qd_tcp_connector_t);
Expand Down Expand Up @@ -91,12 +93,12 @@ struct qdr_tcp_connection_t {
bool flow_enabled;
bool is_egress_dispatcher_conn;
bool connector_closed;//only used if egress_dispatcher=true
bool in_list; // This connection is in the adaptor's connections list
sys_atomic_t raw_closed_read; // proton event seen
sys_atomic_t raw_closed_write; // proton event seen or write_close called
bool in_list; // This connection is in the adaptor's connections list
bool raw_read_shutdown; // stream closed
bool read_eos_seen;
bool window_disabled; // true: ignore unacked byte window
sys_atomic_t raw_closed_read; // proton event seen
sys_atomic_t raw_closed_write; // proton event seen or write_close called
qdr_delivery_t *initial_delivery;
qd_timer_t *activate_timer;
qd_tcp_adaptor_config_t *config; // config
Expand All @@ -118,17 +120,19 @@ struct qdr_tcp_connection_t {
uint64_t opened_time;
uint64_t last_in_time;
uint64_t last_out_time;
uint64_t half_closed_bytes; // bytes_in+_out snapshot
sys_atomic_t half_closed_expired; // check for idle connection
qd_timer_t *half_closed_timer;

qd_adaptor_buffer_list_t out_buffs; // Buffers for writing

qd_message_stream_data_t *previous_stream_data; // previous segment (received in full)
qd_message_stream_data_t *outgoing_stream_data; // current segment
qd_message_stream_data_t *release_up_to;
size_t outgoing_body_bytes; // bytes received from current segment
int outgoing_body_offset; // buffer offset into current segment

pn_raw_buffer_t outgoing_buffs[WRITE_BUFFERS];
int outgoing_body_offset; // buffer offset into current segment
int outgoing_buff_count; // number of buffers with data
pn_raw_buffer_t outgoing_buffs[WRITE_BUFFERS];
int outgoing_buff_idx; // first buffer with data
bool require_tls; // Is TLS required on this connection ?
DEQ_LINKS(qdr_tcp_connection_t);
Expand Down Expand Up @@ -165,6 +169,16 @@ static void qdr_process_app_properties(qdr_tcp_connection_t *tc, qd_message_t *m
static void qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t *tc);
static void handle_outgoing(qdr_tcp_connection_t *conn);
static void encrypt_outgoing_tls(qdr_tcp_connection_t *conn, qd_adaptor_buffer_t *unencrypted_buff, bool write_buffers);
static void start_half_closed_monitoring(qdr_tcp_connection_t *conn);
static void stop_half_closed_monitoring(qdr_tcp_connection_t *conn);
static bool check_half_closed_timeout(qdr_tcp_connection_t *conn);

// Report whether the connection is half-closed, i.e. exactly one of the
// raw_closed_read / raw_closed_write flags is currently set. Returns false
// when both directions are still open or both have been closed.
//
inline static bool tcp_conn_is_half_closed(qdr_tcp_connection_t *conn)
{
    const bool read_side_closed  = IS_ATOMIC_FLAG_SET(&conn->raw_closed_read);
    const bool write_side_closed = IS_ATOMIC_FLAG_SET(&conn->raw_closed_write);

    // half-closed <=> the two flags disagree
    return read_side_closed != write_side_closed;
}

// is the incoming byte window full
//
Expand Down Expand Up @@ -576,7 +590,9 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t *tc)
free(tc->alpn_protocol);
sys_atomic_destroy(&tc->raw_closed_read);
sys_atomic_destroy(&tc->raw_closed_write);
sys_atomic_destroy(&tc->half_closed_expired);
qd_timer_free(tc->activate_timer);
qd_timer_free(tc->half_closed_timer);
sys_mutex_free(&tc->activation_lock);

// Free tls related stuff if need be.
Expand Down Expand Up @@ -1030,13 +1046,19 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
case PN_RAW_CONNECTION_CLOSED_READ: {
SET_ATOMIC_FLAG(&conn->raw_closed_read);
handle_incoming(conn, "PNRC_CLOSED_READ");
if (!IS_ATOMIC_FLAG_SET(&conn->raw_closed_write)) {
start_half_closed_monitoring(conn);
}
qd_log(log, QD_LOG_DEBUG, "[C%" PRIu64 "] PN_RAW_CONNECTION_CLOSED_READ %s", conn->conn_id,
qdr_tcp_connection_role_name(conn));
break;
}
case PN_RAW_CONNECTION_CLOSED_WRITE: {
SET_ATOMIC_FLAG(&conn->raw_closed_write);
int num_drained_write_buffers = qd_raw_connection_drain_write_buffers(conn->pn_raw_conn);
if (!IS_ATOMIC_FLAG_SET(&conn->raw_closed_read)) {
start_half_closed_monitoring(conn);
}
qd_log(log, QD_LOG_DEBUG, "[C%" PRIu64 "] PN_RAW_CONNECTION_CLOSED_WRITE %s, drained %i write buffers",
conn->conn_id, qdr_tcp_connection_role_name(conn), num_drained_write_buffers);
break;
Expand All @@ -1060,6 +1082,8 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
qd_log(log, QD_LOG_INFO, "[C%" PRIu64 "] PN_RAW_CONNECTION_DISCONNECTED %s, drained_buffers=%i", conn->conn_id,
qdr_tcp_connection_role_name(conn), drained_buffers);

stop_half_closed_monitoring(conn);

LOCK(&conn->activation_lock);
pn_raw_connection_set_context(conn->pn_raw_conn, 0);
conn->pn_raw_conn = 0;
Expand Down Expand Up @@ -1090,6 +1114,16 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
qd_log(log, QD_LOG_DEBUG, "[C%" PRIu64 "] PN_RAW_CONNECTION_WAKE %s", conn->conn_id,
qdr_tcp_connection_role_name(conn));
while (qdr_connection_process(conn->qdr_conn)) {}
if (tcp_conn_is_half_closed(conn) && check_half_closed_timeout(conn)) {
qd_log(log, QD_LOG_WARNING,
"[C%" PRIu64 "] Idle half-closed TCP connection detected, forcing close", conn->conn_id);
pn_condition_t *cond = pn_raw_connection_condition(conn->pn_raw_conn);
if (!!cond) {
(void) pn_condition_set_name(cond, "connection:forced");
(void) pn_condition_set_description(cond, "connection closed due to half-closed idle timeout");
}
pn_raw_connection_close(conn->pn_raw_conn);
}
break;
}
case PN_RAW_CONNECTION_DRAIN_BUFFERS: {
Expand Down Expand Up @@ -1211,6 +1245,7 @@ static qdr_tcp_connection_t *qdr_tcp_connection(qd_tcp_listener_t *listener, qd_
tc->context.handler = &handle_connection_event;
sys_atomic_init(&tc->raw_closed_read, 0);
sys_atomic_init(&tc->raw_closed_write, 0);
sys_atomic_init(&tc->half_closed_expired, 0);
sys_mutex_init(&tc->activation_lock);
tc->is_egress_dispatcher_conn = is_egress_dispatcher_conn;
tc->ingress = ingress;
Expand Down Expand Up @@ -2180,6 +2215,7 @@ static void qdr_tcp_adaptor_init(qdr_core_t *core, void **adaptor_context)
{
qd_adaptor_common_init();
qdr_tcp_adaptor_t *adaptor = NEW(qdr_tcp_adaptor_t);
ZERO(adaptor);
adaptor->core = core;
adaptor->adaptor = qdr_protocol_adaptor(core,
"tcp", // name
Expand All @@ -2205,6 +2241,21 @@ static void qdr_tcp_adaptor_init(qdr_core_t *core, void **adaptor_context)
sys_mutex_init(&adaptor->listener_lock);
*adaptor_context = adaptor;

// ISSUE-1152: Check if half-close idle timeout enabled via the environment. If the value is a positive integer, use
// it for the timeout value (in seconds).
//
char *half_close_enabled = getenv("SKUPPER_ROUTER_ENABLE_1152");
if (half_close_enabled) {
int timeout;
// This default is based on 2x the conventional maximum segment lifetime (2 minutes)
half_closed_idle_timeout = 240000;
if (sscanf(half_close_enabled, "%d", &timeout) == 1) {
if (timeout > 0) {
half_closed_idle_timeout = timeout * 1000;
}
}
}

tcp_adaptor = adaptor;
}

Expand Down Expand Up @@ -2521,3 +2572,53 @@ static void detach_links(qdr_tcp_connection_t *conn)
conn->outgoing_link = 0;
}
}

// Callback for the half-closed idle timer; context is the qdr_tcp_connection_t the timer was created for.
//
// It marks the connection's half_closed_expired flag and then wakes the raw connection; the actual idle check is
// done by the PN_RAW_CONNECTION_WAKE handling, not here.
//
// Note this callback may run in parallel with an I/O thread handling events for this connection. It is safe to
// access the raw connection since this timer will be cancelled before the raw connection is closed.
//
static void on_idle_timer(void *context)
{
    qdr_tcp_connection_t *tc = (qdr_tcp_connection_t *) context;

    // the flag must be raised before the wake so the woken handler observes it
    SET_ATOMIC_FLAG(&tc->half_closed_expired);
    pn_raw_connection_wake(tc->pn_raw_conn);
}

// ISSUE-1152: the connection has entered half-closed state; begin watching it for idleness.
//
// Does nothing when the feature is disabled (half_closed_idle_timeout is zero). Otherwise it snapshots the
// current bytes_in + bytes_out total as the baseline for the idle check, creates the idle timer (callback
// on_idle_timer, context conn) and schedules it to fire after half_closed_idle_timeout.
//
static void start_half_closed_monitoring(qdr_tcp_connection_t *conn)
{
    if (half_closed_idle_timeout == 0) {
        return;  // feature disabled
    }

    // expected to be entered once per connection, while only one direction has closed
    assert(tcp_conn_is_half_closed(conn));
    assert(conn->half_closed_timer == 0);

    conn->half_closed_bytes = conn->bytes_in + conn->bytes_out;

    qd_timer_t *idle_timer = qd_timer(tcp_adaptor->core->qd, on_idle_timer, conn);
    conn->half_closed_timer = idle_timer;
    qd_timer_schedule(idle_timer, half_closed_idle_timeout);
}

// Cancel the half-closed idle timer if one was ever created for this connection. The timer is only cancelled
// here, not freed; a connection that never entered monitoring has no timer and is left untouched.
//
static void stop_half_closed_monitoring(qdr_tcp_connection_t *conn)
{
    qd_timer_t *idle_timer = conn->half_closed_timer;

    if (!idle_timer) {
        return;  // monitoring was never started
    }
    qd_timer_cancel(idle_timer);
}

// Decide whether a half-closed connection has been idle for a full timeout period.
//
// Consumes (clears) the half_closed_expired flag. If the flag was not set the idle timer has not fired since the
// last check and false is returned. If it was set, the current bytes_in + bytes_out total is compared with the
// half_closed_bytes snapshot:
//   - unchanged: no traffic passed during the interval, return true (caller should force-close)
//   - changed:   refresh the snapshot, re-arm the idle timer for another interval and return false
//
// NOTE(review): relies on CLEAR_ATOMIC_FLAG evaluating to the flag's previous value - confirm against its
// definition.
//
static bool check_half_closed_timeout(qdr_tcp_connection_t *conn)
{
    if (!CLEAR_ATOMIC_FLAG(&conn->half_closed_expired)) {
        return false;  // timer has not expired since the last check
    }

    const uint64_t total_bytes = conn->bytes_in + conn->bytes_out;
    if (total_bytes == conn->half_closed_bytes) {
        // no traffic passed during idle time
        return true;
    }

    // not idle, try again later
    conn->half_closed_bytes = total_bytes;
    qd_timer_schedule(conn->half_closed_timer, half_closed_idle_timeout);
    return false;
}
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ foreach(py_test_module
system_tests_http1_over_tcp
system_tests_tcp_adaptor
${SYSTEM_TESTS_ADAPTOR_TLS_TESTS}
system_tests_tcp_half_close
system_tests_heartbeats
system_tests_address_watch
system_tests_router_annotations
Expand Down
150 changes: 150 additions & 0 deletions tests/system_tests_tcp_half_close.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import os
import socket
import time

from system_test import Qdrouterd, TIMEOUT, TestCase, unittest
from system_test import main_module, retry_exception, retry


class TcpAdaptorIdleHalfClosedTest(TestCase):
    """
    Test the TCP adaptor half-closed idle connection timeout functionality
    """
    @classmethod
    def setUpClass(cls):
        """
        Create a single router with a tcpConnector and tcpListener. The
        router is launched with SKUPPER_ROUTER_ENABLE_1152 set to
        cls.idle_timeout (seconds) so the half-closed idle timeout is active.
        """
        super(TcpAdaptorIdleHalfClosedTest, cls).setUpClass()

        cls.idle_timeout = 3
        cls.service_addr = "idle/timeout"
        cls.listener_port = cls.tester.get_port()
        cls.connector_port = cls.tester.get_port()
        config = Qdrouterd.Config([
            ('router', {'mode': 'interior', 'id': 'TcpIdleTimeout'}),
            ('listener', {'role': 'normal',
                          'port': cls.tester.get_port()}),
            ('tcpListener', {'port': cls.listener_port,
                             'address': cls.service_addr}),
            ('tcpConnector', {'host': '127.0.0.1',
                              'port': cls.connector_port,
                              'address': cls.service_addr}),
            ('address', {'prefix': 'closest', 'distribution': 'closest'}),
            ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
        ])

        # The variable must only be visible while the router process is
        # being started. Use try/finally so a startup failure cannot leak it
        # into the environment of whatever runs next in this process.
        os.environ["SKUPPER_ROUTER_ENABLE_1152"] = str(cls.idle_timeout)
        try:
            cls.router = cls.tester.qdrouterd('TcpIdleTimeout',
                                              config,
                                              wait=False)
            cls.router.wait_startup_message()
        finally:
            os.environ.pop("SKUPPER_ROUTER_ENABLE_1152", None)

    def _get_tcp_conn_count(self):
        """
        Return the number of currently active TCP connections (the
        'egress-dispatch' connection is not counted)
        """
        CONNECTION_TYPE = 'io.skupper.router.connection'
        mgmt = self.router.management
        conns = mgmt.query(type=CONNECTION_TYPE, attribute_names=['protocol',
                                                                  'container',
                                                                  'host']).get_dicts()
        results = [c for c in conns
                   if c['protocol'] == 'tcp'
                   and c['container'] == 'TcpAdaptor'
                   and c['host'] != 'egress-dispatch']
        return len(results)

    def _is_socket_closed(self, sock):
        """
        Return True if a recv() on sock reports end-of-stream (empty read).
        Return False if data arrived or the recv() raised (e.g. timed out).
        """
        sock.settimeout(TIMEOUT)
        try:
            data = sock.recv(4096)
            if data == b'':
                return True
        except Exception as exc:
            print(f"Socket Recv Failed! Error={exc}", flush=True)
        return False

    def test_01_detect_idle_conn(self):
        """
        Connect a client and server that perform a half-close. Wait for the
        idle timeout to expire and verify the TCP connections have been
        force-closed by the router.
        """

        # create the server listening socket
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listener.bind(("", self.connector_port))
            listener.settimeout(TIMEOUT)
            listener.listen(1)

            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
                retry_exception(lambda cs=client:
                                cs.connect(("localhost",
                                            self.listener_port)),
                                delay=0.25,
                                exception=ConnectionRefusedError)
                client.settimeout(TIMEOUT)

                # accept the client connection
                server, _ = listener.accept()
                try:
                    # Ensure both conns are established
                    self.assertTrue(retry(lambda: self._get_tcp_conn_count() == 2))

                    # now half-close the client
                    client.shutdown(socket.SHUT_WR)

                    # send a continuous stream of data from the server to the
                    # client for longer than the timeout. Since there is
                    # active traffic the connection must not drop

                    deadline = time.time() + self.idle_timeout + 1
                    while time.time() < deadline:
                        time.sleep(0.25)
                        server.sendall(b'ping')
                        data = client.recv(4096)
                        self.assertNotEqual(b'', data)

                    # verify the connections are still present
                    self.assertEqual(2, self._get_tcp_conn_count())

                    # Now wait until both conns are torn down
                    self.assertTrue(retry(lambda: self._get_tcp_conn_count() == 0))

                    # verify that the client and server sockets are closed
                    self.assertTrue(retry(lambda sock=client:
                                          self._is_socket_closed(sock)))
                    self.assertTrue(retry(lambda sock=server:
                                          self._is_socket_closed(sock)))

                finally:
                    server.close()

        # verify the vanflow event was generated
        self.router.wait_log_message("result=connection:forced reason=connection closed due to half-closed idle timeout")


# Allow this test module to be executed directly as a script.
if __name__ == '__main__':
    unittest.main(main_module())

0 comments on commit 507f6e4

Please sign in to comment.