Skip to content

Commit

Permalink
UDP: numerous fixes, added test for copy break
Browse files Browse the repository at this point in the history
There were several bugs here, including use-after-free,
a problem when the copy limit was exceeded, and uninitialized
receive thresholds.
  • Loading branch information
gdamore committed Oct 13, 2024
1 parent 6d0143d commit 0bef8c0
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 40 deletions.
9 changes: 9 additions & 0 deletions include/nng/nng.h
Original file line number Diff line number Diff line change
Expand Up @@ -818,9 +818,18 @@ NNG_DECL nng_listener nng_pipe_listener(nng_pipe);
// low order 16 bits will be set. This is provided in native byte order,
// which makes it more convenient than using the NNG_OPT_LOCADDR option.
#define NNG_OPT_TCP_BOUND_PORT "tcp-bound-port"

// UDP options.

// Convenience alias for UDP: this expands to NNG_OPT_TCP_BOUND_PORT, so
// both names refer to the same option string ("tcp-bound-port").
#define NNG_OPT_UDP_BOUND_PORT NNG_OPT_TCP_BOUND_PORT

// UDP short message size (the "copy break" threshold), in bytes.
// Received messages smaller than (or equal to) this size are copied
// into a newly allocated message instead of being loaned up. This can
// allow for a faster pass up, as we can allocate a small message buffer
// instead of having to replace a full-size receive buffer.
// This is a size option; the UDP transport refuses to change it
// (NNG_EBUSY) once the endpoint has been started.
#define NNG_OPT_UDP_COPY_MAX "udp:copy-max"

// IPC options. These will largely vary depending on the platform,
// as POSIX systems have very different options than Windows.

Expand Down
132 changes: 92 additions & 40 deletions src/sp/transport/udp/udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ typedef enum udp_disc_reason {
#define NNG_UDP_RECVMAX 65000 // largest permitted by spec
#endif

// Default value, in bytes, for an endpoint's copy threshold (copymax).
// Received messages no larger than the threshold are copied into a
// right-sized message rather than loaned up. The #ifndef guard lets a
// build predefine a different default.
#ifndef NNG_UDP_COPYMAX // threshold for copying instead of loaning up
#define NNG_UDP_COPYMAX 1024
#endif

// Default refresh interval assigned to each endpoint: five seconds.
// The #ifndef guard lets a build predefine a different default.
#ifndef NNG_UDP_REFRESH
#define NNG_UDP_REFRESH (5 * NNI_SECOND)
#endif
Expand Down Expand Up @@ -213,12 +217,11 @@ struct udp_ep {
nng_duration refresh; // refresh interval for connections in seconds
udp_sp_msg rx_msg; // contains the received message header
uint16_t rcvmax; // max payload, trimmed to uint16_t
uint16_t short_msg;
uint16_t copymax;
udp_txring tx_ring;
nni_time next_wake;
nni_aio_completions complq;

#ifdef NNG_ENABLE_STATS
nni_stat_item st_rcv_max;
nni_stat_item st_rcv_reorder;
nni_stat_item st_rcv_toobig;
Expand All @@ -229,7 +232,7 @@ struct udp_ep {
nni_stat_item st_snd_toobig;
nni_stat_item st_snd_nobuf;
nni_stat_item st_peer_inactive;
#endif
nni_stat_item st_copy_max;
};

static void udp_ep_hold(udp_ep *ep);
Expand All @@ -253,8 +256,10 @@ udp_pipe_alloc(udp_pipe **pp, udp_ep *ep, uint32_t peer_id, nng_sockaddr *sa)
NNI_FREE_STRUCT(p);
return (rv);
}
udp_ep_hold(ep);
now = nni_clock();
nni_aio_list_init(&p->rx_aios);
nni_lmq_init(&p->rx_mq, NNG_UDP_RXQUEUE_LEN);
p->ep = ep;
p->dialer = ep->dialer;
p->self_seq = nni_random();
Expand All @@ -266,8 +271,6 @@ udp_pipe_alloc(udp_pipe **pp, udp_ep *ep, uint32_t peer_id, nng_sockaddr *sa)
p->expire = now + (p->dialer ? (5 * NNI_SECOND) : UDP_PIPE_TIMEOUT(p));
p->rcvmax = ep->rcvmax;
*pp = p;
nni_lmq_init(&p->rx_mq, NNG_UDP_RXQUEUE_LEN);
udp_ep_hold(ep);
return (0);
}

Expand Down Expand Up @@ -325,6 +328,12 @@ udp_pipe_destroy(udp_pipe *p)
{
nng_msg *m;

if (p->self_id != 0) {
nni_id_remove(&p->ep->pipes, p->self_id);
p->self_id = 0;
}
nni_list_node_remove(&p->node);

// call with ep->mtx lock held
while (!nni_lmq_empty(&p->rx_mq)) {
nni_lmq_get(&p->rx_mq, &m);
Expand All @@ -343,8 +352,6 @@ udp_pipe_fini(void *arg)
udp_ep *ep = p->ep;

nni_mtx_lock(&ep->mtx);
nni_id_remove(&ep->pipes, p->self_id);

udp_pipe_destroy(p);
udp_ep_rele(ep); // releases lock
}
Expand Down Expand Up @@ -605,8 +612,7 @@ udp_recv_disc(udp_ep *ep, udp_sp_disc *disc, nng_sockaddr *sa)
// For now we aren't validating the sequence numbers.
// This allows for an out of order DISC to cause the
// connection to be dropped, but it should self heal.
p->closed = true;
p->self_id = 0; // prevent it from being identified later
p->closed = true;
while ((aio = nni_list_first(&p->rx_aios)) != NULL) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ECLOSED);
Expand Down Expand Up @@ -641,6 +647,7 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa)
}
if (p->peer_id == 0) {
// connection isn't formed yet ... send another CREQ
nni_stat_inc(&ep->st_rcv_nomatch, 1);

Check warning on line 650 in src/sp/transport/udp/udp.c

View check run for this annotation

Codecov / codecov/patch

src/sp/transport/udp/udp.c#L650

Added line #L650 was not covered by tests
udp_send_creq(ep, p);
return;
}
Expand Down Expand Up @@ -675,7 +682,7 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa)
}

// Short message, just alloc and copy
if (len <= ep->short_msg) {
if (len <= ep->copymax) {
nni_stat_inc(&ep->st_rcv_copy, 1);
if (nng_msg_alloc(&msg, len) != 0) {
if (p->npipe != NULL) {
Expand All @@ -684,9 +691,9 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa)
return;
}
nni_msg_set_address(msg, sa);
nni_msg_clear(msg);
nni_msg_append(msg, nni_msg_body(ep->rx_payload), len);
memcpy(nni_msg_body(msg), nni_msg_body(ep->rx_payload), len);
nni_lmq_put(&p->rx_mq, msg);
nni_msg_realloc(ep->rx_payload, ep->rcvmax);
} else {
nni_stat_inc(&ep->st_rcv_nocopy, 1);
// Message size larger than copy break, do zero copy
Expand Down Expand Up @@ -998,7 +1005,8 @@ udp_pipe_send(void *arg, nni_aio *aio)
// Just queue it, or fail it.
udp_queue_tx(ep, &p->peer_addr, (void *) &dreq, msg);
nni_mtx_unlock(&ep->mtx);
nni_aio_finish(aio, 0, dreq.us_length);
nni_aio_finish(
aio, 0, msg ? nni_msg_len(msg) + nni_msg_header_len(msg) : 0);
}

static void
Expand Down Expand Up @@ -1190,21 +1198,22 @@ udp_ep_close(void *arg)

// close all pipes
uint32_t cursor = 0;
while (nni_id_visit(&ep->pipes, NULL, (void **) &p, &cursor)) {
p->closed = true;
uint64_t id;

// first we grab the connpipes that are not closed upstream
while ((p = nni_list_first(&ep->connpipes)) != NULL) {
udp_pipe_destroy(p);
ep->refcnt--;

Check warning on line 1206 in src/sp/transport/udp/udp.c

View check run for this annotation

Codecov / codecov/patch

src/sp/transport/udp/udp.c#L1205-L1206

Added lines #L1205 - L1206 were not covered by tests
}
while (nni_id_visit(&ep->pipes, &id, (void **) &p, &cursor)) {
if (p->peer_id != 0) {
udp_send_disc(ep, p, DISC_CLOSED);
}
p->closed = true;
while ((aio = nni_list_first(&p->rx_aios)) != NULL) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
if (p->npipe == NULL) {
nni_list_remove(&ep->connpipes, p);
nni_id_remove(&ep->pipes, p->self_id);
udp_pipe_destroy(p);
ep->refcnt--;
}
}
nni_aio_close(&ep->resaio);
nni_mtx_unlock(&ep->mtx);
Expand Down Expand Up @@ -1233,28 +1242,29 @@ udp_timer_cb(void *arg)
ep->next_wake = NNI_TIME_NEVER;
while (nni_id_visit(&ep->pipes, NULL, (void **) &p, &cursor)) {

if (p->closed) {
if (p->npipe == NULL) {
// pipe closed, but we have to clean it up
// ourselves
nni_id_remove(&ep->pipes, p->self_id);
udp_pipe_destroy(p);
ep->refcnt--;
}
continue;
}

if (now > p->expire) {
char buf[128];
nni_aio *aio;
nng_log_info("NNG-UDP-INACTIVE",
"Pipe peer %s timed out due to inactivity",
nng_str_sockaddr(&p->peer_addr, buf, sizeof(buf)));

// Possibly alert the dialer, so it can restart a new
// attempt.
if ((ep->dialer) && (p->peer_id == 0) &&
(aio = nni_list_first(&ep->connaios))) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ETIMEDOUT);
}

// If we're still on the connect list, then we need to
// take responsibility for cleaning this up.
if (nni_list_node_active(&p->node)) {
udp_pipe_destroy(p);
ep->refcnt--;
continue;

Check warning on line 1265 in src/sp/transport/udp/udp.c

View check run for this annotation

Codecov / codecov/patch

src/sp/transport/udp/udp.c#L1263-L1265

Added lines #L1263 - L1265 were not covered by tests
}

// This will probably not be received by the peer,
// since we aren't getting anything from them. But
// having it on the wire may help debugging later.
Expand Down Expand Up @@ -1312,6 +1322,7 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock)
ep->url = url;
ep->refresh = NNG_UDP_REFRESH; // five seconds by default
ep->rcvmax = NNG_UDP_RECVMAX;
ep->copymax = NNG_UDP_COPYMAX;
ep->refcnt = 1;
if ((rv = nni_msg_alloc(&ep->rx_payload,
ep->rcvmax + sizeof(ep->rx_msg)) != 0)) {
Expand All @@ -1331,7 +1342,6 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock)
nni_aio_init(&ep->resaio, udp_resolv_cb, ep);
nni_aio_completions_init(&ep->complq);

#ifdef NNG_ENABLE_STATS
static const nni_stat_info rcv_max_info = {
.si_name = "rcv_max",
.si_desc = "maximum receive size",
Expand Down Expand Up @@ -1402,8 +1412,16 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock)
.si_unit = NNG_UNIT_EVENTS,
.si_atomic = true,
};
// Descriptor for the "rcv_copy_max" statistic: a byte-valued level
// reporting the endpoint's copy threshold (copy instead of loan-up).
// NOTE(review): in the hunks visible here only st_rcv_max is given an
// initial value and registered with the dialer/listener; confirm that
// st_copy_max is likewise seeded with ep->copymax and registered.
static const nni_stat_info copy_max_info = {
.si_name = "rcv_copy_max",
.si_desc = "threshold to copy instead of loan-up",
.si_type = NNG_STAT_LEVEL,
.si_unit = NNG_UNIT_BYTES,
.si_atomic = true,
};

nni_stat_init(&ep->st_rcv_max, &rcv_max_info);
nni_stat_init(&ep->st_copy_max, &copy_max_info);
nni_stat_init(&ep->st_rcv_copy, &rcv_copy_info);
nni_stat_init(&ep->st_rcv_nocopy, &rcv_nocopy_info);
nni_stat_init(&ep->st_rcv_reorder, &rcv_reorder_info);
Expand All @@ -1413,7 +1431,8 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock)
nni_stat_init(&ep->st_snd_toobig, &snd_toobig_info);
nni_stat_init(&ep->st_snd_nobuf, &snd_nobuf_info);
nni_stat_init(&ep->st_peer_inactive, &peer_inactive_info);
#endif

nni_stat_set_value(&ep->st_rcv_max, ep->rcvmax);

// schedule our timer callback - forever for now
// adjusted automatically as we add pipes or other
Expand Down Expand Up @@ -1459,9 +1478,7 @@ udp_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer)
return (rv);
}

#ifdef NNG_ENABLE_STATS
nni_dialer_add_stat(ndialer, &ep->st_rcv_max);
#endif
*dp = ep;
return (0);
}
Expand All @@ -1485,9 +1502,7 @@ udp_listener_init(void **lp, nng_url *url, nni_listener *nlistener)
}
ep->self_sa = sa;

#ifdef NNG_ENABLE_STATS
nni_listener_add_stat(nlistener, &ep->st_rcv_max);
#endif

*lp = ep;
return (0);
Expand Down Expand Up @@ -1730,16 +1745,48 @@ udp_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t)
size_t val;
int rv;
if ((rv = nni_copyin_size(&val, v, sz, 0, 65000, t)) == 0) {
if ((val == 0) || (val > 65000)) {
val = 65000;
}
nni_mtx_lock(&ep->mtx);
if (ep->started) {
nni_mtx_unlock(&ep->mtx);
return (NNG_EBUSY);
}
ep->rcvmax = (uint16_t) val;
nni_mtx_unlock(&ep->mtx);
#ifdef NNG_ENABLE_STATS
nni_stat_set_value(&ep->st_rcv_max, val);
#endif
}
return (rv);
}

// udp_ep_get_copymax is the getter for the NNG_OPT_UDP_COPY_MAX option.
// It reads the endpoint's copymax field while holding ep->mtx and hands
// the value to nni_copyout_size, whose result is returned to the caller.
//
// arg is the udp_ep; v, szp and t are passed through to nni_copyout_size.
static int
udp_ep_get_copymax(void *arg, void *v, size_t *szp, nni_opt_type t)
{
	udp_ep *endpoint = arg;
	size_t  threshold;

	// Take a consistent snapshot of the threshold under the lock.
	nni_mtx_lock(&endpoint->mtx);
	threshold = endpoint->copymax;
	nni_mtx_unlock(&endpoint->mtx);

	return (nni_copyout_size(threshold, v, szp, t));
}

// udp_ep_set_copymax is the setter for the NNG_OPT_UDP_COPY_MAX option.
//
// The supplied value is decoded with nni_copyin_size using the bounds
// 0 and 65000 (presumably values outside that range are rejected by
// the helper -- confirm against nni_copyin_size); any error from the
// helper is returned unchanged.
//
// The threshold may only be changed before the endpoint is started;
// once ep->started is set this returns NNG_EBUSY and leaves the current
// value untouched. On success the new value is stored in ep->copymax,
// mirrored into the st_copy_max statistic, and 0 is returned.
static int
udp_ep_set_copymax(void *arg, const void *v, size_t sz, nni_opt_type t)
{
	udp_ep *ep = arg;
	size_t  val;
	int     rv;

	if ((rv = nni_copyin_size(&val, v, sz, 0, 65000, t)) != 0) {
		return (rv);
	}

	nni_mtx_lock(&ep->mtx);
	if (ep->started) {
		nni_mtx_unlock(&ep->mtx);
		return (NNG_EBUSY);
	}
	// The upper bound passed above keeps val within uint16_t range.
	ep->copymax = (uint16_t) val;
	nni_mtx_unlock(&ep->mtx);

	nni_stat_set_value(&ep->st_copy_max, val);
	return (0);
}
Expand Down Expand Up @@ -1834,6 +1881,11 @@ static const nni_option udp_ep_opts[] = {
.o_get = udp_ep_get_recvmaxsz,
.o_set = udp_ep_set_recvmaxsz,
},
{
.o_name = NNG_OPT_UDP_COPY_MAX,
.o_get = udp_ep_get_copymax,
.o_set = udp_ep_set_copymax,
},
{
.o_name = NNG_OPT_URL,
.o_get = udp_ep_get_url,
Expand Down
38 changes: 38 additions & 0 deletions src/sp/transport/udp/udp_tran_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ test_udp_recv_max(void)
NUTS_PASS(nng_socket_get_size(s0, NNG_OPT_RECVMAXSZ, &sz));
NUTS_TRUE(sz == 200);
NUTS_PASS(nng_listener_set_size(l, NNG_OPT_RECVMAXSZ, 100));
NUTS_PASS(nng_listener_get_size(l, NNG_OPT_RECVMAXSZ, &sz));
NUTS_TRUE(sz == 100);
NUTS_PASS(nng_listener_start(l, 0));

NUTS_OPEN(s1);
Expand All @@ -158,6 +160,41 @@ test_udp_recv_max(void)
NUTS_CLOSE(s1);
}

// test_udp_recv_copy exercises both sides of the UDP copy threshold.
// The listener's NNG_OPT_UDP_COPY_MAX is set to 100 bytes, then one
// message below the threshold (95 bytes) and one above it (150 bytes)
// are sent, and each is checked for the expected length and for intact
// first and last payload bytes.
void
test_udp_recv_copy(void)
{
	char         msg[256];
	char         buf[256];
	nng_socket   s0;
	nng_socket   s1;
	nng_listener l;
	size_t       sz;
	size_t       i;
	char        *addr;

	// Fill the payload with a known pattern so that we never transmit
	// indeterminate stack contents and can check what comes back.
	for (i = 0; i < sizeof(msg); i++) {
		msg[i] = (char) ('A' + (i % 26));
	}

	NUTS_ADDR(addr, "udp");

	NUTS_OPEN(s0);
	NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 100));
	NUTS_PASS(nng_listener_create(&l, s0, addr));
	NUTS_PASS(nng_listener_set_size(l, NNG_OPT_UDP_COPY_MAX, 100));
	NUTS_PASS(nng_listener_get_size(l, NNG_OPT_UDP_COPY_MAX, &sz));
	NUTS_TRUE(sz == 100);
	NUTS_PASS(nng_listener_start(l, 0));

	NUTS_OPEN(s1);
	NUTS_PASS(nng_dial(s1, addr, NULL, 0));
	nng_msleep(100);
	NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 100));

	// Below the threshold.  Without NNG_FLAG_ALLOC, nng_recv treats
	// *szp as the buffer capacity on entry, so it must be reset to
	// the real buffer size before every call.
	NUTS_PASS(nng_send(s1, msg, 95, 0));
	sz = sizeof(buf);
	NUTS_PASS(nng_recv(s0, buf, &sz, 0));
	NUTS_TRUE(sz == 95);
	NUTS_TRUE(buf[0] == msg[0]);
	NUTS_TRUE(buf[94] == msg[94]);

	// Above the threshold.
	NUTS_PASS(nng_send(s1, msg, 150, 0));
	sz = sizeof(buf);
	NUTS_PASS(nng_recv(s0, buf, &sz, 0));
	NUTS_TRUE(sz == 150);
	NUTS_TRUE(buf[0] == msg[0]);
	NUTS_TRUE(buf[149] == msg[149]);

	NUTS_CLOSE(s0);
	NUTS_CLOSE(s1);
}

NUTS_TESTS = {

{ "udp wild card connect fail", test_udp_wild_card_connect_fail },
Expand All @@ -167,5 +204,6 @@ NUTS_TESTS = {
{ "udp non-local address", test_udp_non_local_address },
{ "udp malformed address", test_udp_malformed_address },
{ "udp recv max", test_udp_recv_max },
{ "udp recv copy", test_udp_recv_copy },
{ NULL, NULL },
};

0 comments on commit 0bef8c0

Please sign in to comment.