Skip to content

Commit

Permalink
Refactor Protocol::onPackage() (#4445)
Browse files Browse the repository at this point in the history
* Refactor Protocol::dispatch

* revert name, add comments

* fix

* format
  • Loading branch information
matyhtf authored Oct 25, 2021
1 parent 1d90d9a commit e73f48c
Show file tree
Hide file tree
Showing 14 changed files with 117 additions and 82 deletions.
10 changes: 10 additions & 0 deletions include/swoole.h
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,16 @@ struct EventData {
char data[SW_IPC_BUFFER_SIZE];
};

struct SendData {
DataHead info;
const char *data;
};

struct RecvData {
DataHead info;
const char *data;
};

struct ThreadGlobal {
uint16_t id;
uint8_t type;
Expand Down
2 changes: 1 addition & 1 deletion include/swoole_http.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ char *url_encode(char const *str, size_t len);
#ifdef SW_USE_HTTP2
ssize_t get_package_length(Protocol *protocol, network::Socket *conn, const char *data, uint32_t length);
uint8_t get_package_length_size(network::Socket *conn);
int dispatch_frame(Protocol *protocol, network::Socket *conn, const char *data, uint32_t length);
int dispatch_frame(const Protocol *protocol, network::Socket *conn, const RecvData *rdata);
#endif

//-----------------------------------------------------------------
Expand Down
12 changes: 10 additions & 2 deletions include/swoole_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,17 @@ struct Protocol {
void *private_data;
void *private_data_2;
uint16_t real_header_length;
uint16_t ext_flags;

int (*onPackage)(Protocol *, network::Socket *, const char *, uint32_t);
/**
* callback this function when a complete data packet is received
*/
int (*onPackage)(const Protocol *, network::Socket *, const RecvData *);
/**
* parse the length value in the received data
* @return 0: more data needs to be received
* @return -1: abnormal value, connection should be closed
* @return >0: the length of the data packet
*/
LengthFunc get_package_length;
uint8_t (*get_package_length_size)(network::Socket *);

Expand Down
12 changes: 1 addition & 11 deletions include/swoole_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,16 +142,6 @@ struct Connection {
sw_atomic_t lock;
};

struct SendData {
DataHead info;
const char *data;
};

struct RecvData {
DataHead info;
const char *data;
};

struct PipeBuffer {
DataHead info;
char data[0];
Expand Down Expand Up @@ -1399,7 +1389,7 @@ class Server {
#endif
static int accept_command_result(Reactor *reactor, Event *event);
static int close_connection(Reactor *reactor, network::Socket *_socket);
static int dispatch_task(Protocol *proto, network::Socket *_socket, const char *data, uint32_t length);
static int dispatch_task(const Protocol *proto, network::Socket *_socket, const RecvData *rdata);

int send_to_connection(SendData *);
ssize_t send_to_worker_from_worker(Worker *dst_worker, const void *buf, size_t len, int flags);
Expand Down
2 changes: 1 addition & 1 deletion include/swoole_websocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ static inline bool decode(Frame *frame, String *str) {
}

ssize_t get_package_length(Protocol *protocol, network::Socket *conn, const char *data, uint32_t length);
int dispatch_frame(Protocol *protocol, network::Socket *conn, const char *data, uint32_t length);
int dispatch_frame(const Protocol *protocol, network::Socket *conn, const RecvData *rdata);

} // namespace websocket
} // namespace swoole
2 changes: 1 addition & 1 deletion src/core/base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ pid_t swoole_fork(int flags) {
swoole_fatal_error(SW_ERROR_OPERATION_NOT_SUPPORT, "must be forked outside the coroutine");
}
if (SwooleTG.async_threads) {
swoole_trace("aio_task_num=%d, reactor=%p", SwooleTG.async_threads->task_num, sw_reactor());
swoole_trace("aio_task_num=%lu, reactor=%p", SwooleTG.async_threads->task_num, sw_reactor());
swoole_fatal_error(SW_ERROR_OPERATION_NOT_SUPPORT,
"can not create server after using async file operation");
}
Expand Down
6 changes: 3 additions & 3 deletions src/network/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ static int Client_onWrite(Reactor *reactor, Event *event);
static int Client_onError(Reactor *reactor, Event *event);
static void Client_onTimeout(Timer *timer, TimerNode *tnode);
static void Client_onResolveCompleted(AsyncEvent *event);
static int Client_onPackage(Protocol *proto, Socket *conn, const char *data, uint32_t length);
static int Client_onPackage(const Protocol *proto, Socket *conn, const RecvData *rdata);

static sw_inline void execute_onConnect(Client *cli) {
if (cli->timer) {
Expand Down Expand Up @@ -938,9 +938,9 @@ static int Client_https_proxy_handshake(Client *cli) {
}
#endif

static int Client_onPackage(Protocol *proto, Socket *conn, const char *data, uint32_t length) {
static int Client_onPackage(const Protocol *proto, Socket *conn, const RecvData *rdata) {
Client *cli = (Client *) conn->object;
cli->onReceive(cli, data, length);
cli->onReceive(cli, rdata->data, rdata->info.len);
return conn->close_wait ? SW_ERR : SW_OK;
}

Expand Down
27 changes: 18 additions & 9 deletions src/protocol/base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ ssize_t Protocol::default_length_func(Protocol *protocol, network::Socket *socke
// Protocol length is not legitimate, out of bounds or exceed the allocated length
if (body_length < 0) {
swoole_warning("invalid package (size=%d) from socket#%u<%s:%d>",
size,
socket->fd,
socket->info.get_ip(),
socket->info.get_port());
size,
socket->fd,
socket->info.get_ip(),
socket->info.get_port());
return SW_ERR;
}
swoole_debug("length=%d", protocol->package_body_offset + body_length);
Expand All @@ -60,14 +60,17 @@ ssize_t Protocol::default_length_func(Protocol *protocol, network::Socket *socke
}

int Protocol::recv_split_by_eof(network::Socket *socket, String *buffer) {
RecvData rdata{};
int retval;

if (buffer->length < package_eof_len) {
return SW_CONTINUE;
}

int retval;

ssize_t n = buffer->split(package_eof, package_eof_len, [&](const char *data, size_t length) -> int {
if (onPackage(this, socket, data, length) < 0) {
rdata.info.len = length;
rdata.data = data;
if (onPackage(this, socket, &rdata) < 0) {
retval = SW_CLOSE;
return false;
}
Expand Down Expand Up @@ -108,6 +111,7 @@ int Protocol::recv_split_by_eof(network::Socket *socket, String *buffer) {
* @return SW_OK: continue
*/
int Protocol::recv_with_length_protocol(network::Socket *socket, String *buffer) {
RecvData rdata{};
ssize_t package_length;
uint8_t _package_length_size = get_package_length_size ? get_package_length_size(socket) : package_length_size;
uint32_t recv_size;
Expand Down Expand Up @@ -152,7 +156,9 @@ int Protocol::recv_with_length_protocol(network::Socket *socket, String *buffer)
if (socket->recv_wait) {
if (buffer->length >= (size_t) buffer->offset) {
_do_dispatch:
if (onPackage(this, socket, buffer->str, buffer->offset) < 0) {
rdata.info.len = buffer->offset;
rdata.data = buffer->str;
if (onPackage(this, socket, &rdata) < 0) {
return SW_ERR;
}
if (socket->removed) {
Expand Down Expand Up @@ -228,6 +234,7 @@ int Protocol::recv_with_length_protocol(network::Socket *socket, String *buffer)
int Protocol::recv_with_eof_protocol(network::Socket *socket, String *buffer) {
bool recv_again = false;
int buf_size;
RecvData rdata{};

_recv_data:
buf_size = buffer->size - buffer->length;
Expand Down Expand Up @@ -268,7 +275,9 @@ int Protocol::recv_with_eof_protocol(network::Socket *socket, String *buffer) {
}
} else if (memcmp(buffer->str + buffer->length - package_eof_len, package_eof, package_eof_len) == 0) {
buffer->offset = buffer->length;
if (onPackage(this, socket, buffer->str, buffer->length) < 0) {
rdata.info.len = buffer->length;
rdata.data = buffer->str;
if (onPackage(this, socket, &rdata) < 0) {
return SW_ERR;
}
if (socket->removed) {
Expand Down
6 changes: 3 additions & 3 deletions src/protocol/http.cc
Original file line number Diff line number Diff line change
Expand Up @@ -736,12 +736,12 @@ uint8_t get_package_length_size(Socket *socket) {
}
}

int dispatch_frame(Protocol *proto, Socket *socket, const char *data, uint32_t length) {
int dispatch_frame(const Protocol *proto, Socket *socket, const RecvData *rdata) {
Connection *conn = (Connection *) socket->object;
if (conn->websocket_status >= websocket::STATUS_HANDSHAKE) {
return websocket::dispatch_frame(proto, socket, data, length);
return websocket::dispatch_frame(proto, socket, rdata);
} else if (conn->http2_stream) {
return Server::dispatch_task(proto, socket, data, length);
return Server::dispatch_task(proto, socket, rdata);
} else {
protocol_status_error(socket, conn);
return SW_ERR;
Expand Down
6 changes: 4 additions & 2 deletions src/protocol/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ int recv_packet(Protocol *protocol, Connection *conn, String *buffer) {
int ret;
char *buf_ptr;
size_t buf_size;

RecvData rdata{};
Request *request;
network::Socket *socket = conn->socket;

Expand Down Expand Up @@ -148,7 +148,9 @@ int recv_packet(Protocol *protocol, Connection *conn, String *buffer) {
buffer->offset = buffer->length;

if (request->n_lines_received == request->n_lines_total) {
if (protocol->onPackage(protocol, socket, buffer->str, buffer->length) < 0) {
rdata.info.len = buffer->length;
rdata.data = buffer->str;
if (protocol->onPackage(protocol, socket, &rdata) < 0) {
return SW_ERR;
}
if (socket->removed) {
Expand Down
62 changes: 34 additions & 28 deletions src/protocol/websocket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,10 @@ bool decode(Frame *frame, char *data, size_t length) {
}

swoole_trace_log(SW_TRACE_WEBSOCKET,
"decode frame, payload_length=%ld, mask=%d, opcode=%d",
payload_length,
frame->header.MASK,
frame->header.OPCODE);
"decode frame, payload_length=%ld, mask=%d, opcode=%d",
payload_length,
frame->header.MASK,
frame->header.OPCODE);

if (payload_length == 0) {
frame->header_length = header_length;
Expand Down Expand Up @@ -239,11 +239,13 @@ void print_frame(Frame *frame) {
}
}

int dispatch_frame(Protocol *proto, Socket *_socket, const char *data, uint32_t length) {
int dispatch_frame(const Protocol *proto, Socket *_socket, const RecvData *rdata) {
Server *serv = (Server *) proto->private_data_2;
Connection *conn = (Connection *) _socket->object;

String send_frame = {};
RecvData dispatch_data{};
String send_frame{};
const char *data = rdata->data;
const uint32_t length = rdata->info.len;
char buf[SW_WEBSOCKET_HEADER_LEN + SW_WEBSOCKET_CLOSE_CODE_LEN + SW_WEBSOCKET_CLOSE_REASON_MAX_LEN];
send_frame.str = buf;
send_frame.size = sizeof(buf);
Expand Down Expand Up @@ -275,9 +277,10 @@ int dispatch_frame(Protocol *proto, Socket *_socket, const char *data, uint32_t
frame_buffer->append(data + offset, frame_length);
// frame is finished, do dispatch
if (ws.header.FIN) {
proto->ext_flags = conn->websocket_buffer->offset;
proto->ext_flags |= FLAG_FIN;
Server::dispatch_task(proto, _socket, frame_buffer->str, frame_buffer->length);
dispatch_data.info.ext_flags = conn->websocket_buffer->offset | FLAG_FIN;
dispatch_data.info.len = frame_buffer->length;
dispatch_data.data = frame_buffer->str;
Server::dispatch_task(proto, _socket, &dispatch_data);
delete frame_buffer;
conn->websocket_buffer = nullptr;
}
Expand All @@ -286,40 +289,42 @@ int dispatch_frame(Protocol *proto, Socket *_socket, const char *data, uint32_t
case OPCODE_TEXT:
case OPCODE_BINARY: {
offset = length - ws.payload_length;
proto->ext_flags = get_ext_flags(ws.header.OPCODE, get_flags(&ws));

int ext_flags = get_ext_flags(ws.header.OPCODE, get_flags(&ws));
if (!ws.header.FIN) {
if (conn->websocket_buffer) {
swoole_warning("merging incomplete frame, bad request. remote_addr=%s:%d",
conn->info.get_ip(),
conn->info.get_port());
conn->info.get_ip(),
conn->info.get_port());
return SW_ERR;
}
conn->websocket_buffer = new swoole::String(data + offset, length - offset);
conn->websocket_buffer->offset = proto->ext_flags;
conn->websocket_buffer->offset = ext_flags;
} else {
Server::dispatch_task(proto, _socket, data + offset, length - offset);
dispatch_data.info.ext_flags = ext_flags;
dispatch_data.info.len = length - offset;
dispatch_data.data = data + offset;
Server::dispatch_task(proto, _socket, &dispatch_data);
}
break;
}
case OPCODE_PING:
case OPCODE_PONG:
if (length >= (sizeof(buf) - SW_WEBSOCKET_HEADER_LEN)) {
swoole_warning("%s frame application data is too big. remote_addr=%s:%d",
ws.header.OPCODE == OPCODE_PING ? "ping" : "pong",
conn->info.get_ip(),
conn->info.get_port());
ws.header.OPCODE == OPCODE_PING ? "ping" : "pong",
conn->info.get_ip(),
conn->info.get_port());
return SW_ERR;
} else if (length == SW_WEBSOCKET_HEADER_LEN) {
data = nullptr;
length = 0;
dispatch_data.data = nullptr;
dispatch_data.info.len = 0;
} else {
offset = ws.header.MASK ? SW_WEBSOCKET_HEADER_LEN + SW_WEBSOCKET_MASK_LEN : SW_WEBSOCKET_HEADER_LEN;
data += offset;
length -= offset;
dispatch_data.data = data + offset;
dispatch_data.info.len = length - offset;
}
proto->ext_flags = get_ext_flags(ws.header.OPCODE, get_flags(&ws));
Server::dispatch_task(proto, _socket, data, length);
dispatch_data.info.ext_flags = get_ext_flags(ws.header.OPCODE, get_flags(&ws));
Server::dispatch_task(proto, _socket, &dispatch_data);
break;

case OPCODE_CLOSE:
Expand All @@ -330,9 +335,10 @@ int dispatch_frame(Protocol *proto, Socket *_socket, const char *data, uint32_t
if (conn->websocket_status != STATUS_CLOSING) {
// Dispatch the frame with the same format of message frame
offset = length - ws.payload_length;
proto->ext_flags = get_ext_flags(ws.header.OPCODE, get_flags(&ws));

Server::dispatch_task(proto, _socket, data + offset, length - offset);
dispatch_data.info.ext_flags = get_ext_flags(ws.header.OPCODE, get_flags(&ws));
dispatch_data.info.len = length - offset;
dispatch_data.data = data + offset;
Server::dispatch_task(proto, _socket, &dispatch_data);

// Client attempt to close
send_frame.str[0] = 0x88; // FIN | OPCODE: WEBSOCKET_OPCODE_CLOSE
Expand Down
14 changes: 11 additions & 3 deletions src/server/port.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ static int Port_onRead_raw(Reactor *reactor, ListenPort *port, Event *event) {
Socket *_socket = event->socket;
Connection *conn = (Connection *) _socket->object;
Server *serv = (Server *) reactor->ptr;
RecvData rdata{};

String *buffer = serv->get_recv_buffer(_socket);
if (!buffer) {
Expand All @@ -335,7 +336,9 @@ static int Port_onRead_raw(Reactor *reactor, ListenPort *port, Event *event) {
return SW_OK;
} else {
buffer->offset = buffer->length = n;
return Server::dispatch_task(&port->protocol, _socket, buffer->str, n);
rdata.info.len = n;
rdata.data = buffer->str;
return Server::dispatch_task(&port->protocol, _socket, &rdata);
}
}

Expand Down Expand Up @@ -379,6 +382,7 @@ static int Port_onRead_http(Reactor *reactor, ListenPort *port, Event *event) {
Socket *_socket = event->socket;
Connection *conn = (Connection *) _socket->object;
Server *serv = (Server *) reactor->ptr;
RecvData dispatch_data {};

if (conn->websocket_status >= websocket::STATUS_HANDSHAKE) {
if (conn->http_upgrade == 0) {
Expand Down Expand Up @@ -540,7 +544,9 @@ static int Port_onRead_http(Reactor *reactor, ListenPort *port, Event *event) {
// send static file content directly in the reactor thread
if (!serv->enable_static_handler || !serv->select_static_handler(request, conn)) {
// dynamic request, dispatch to worker
Server::dispatch_task(protocol, _socket, buffer->str, request->header_length_);
dispatch_data.info.len = request->header_length_;
dispatch_data.data = buffer->str;
Server::dispatch_task(protocol, _socket, &dispatch_data);
}
if (!conn->active || _socket->removed) {
return SW_OK;
Expand Down Expand Up @@ -637,7 +643,9 @@ static int Port_onRead_http(Reactor *reactor, ListenPort *port, Event *event) {
}

buffer->offset = request_length;
Server::dispatch_task(protocol, _socket, buffer->str, buffer->length);
dispatch_data.data = buffer->str;
dispatch_data.info.len = buffer->length;
Server::dispatch_task(protocol, _socket, &dispatch_data);

if (conn->active && !_socket->removed) {
serv->destroy_http_request(conn);
Expand Down
Loading

0 comments on commit e73f48c

Please sign in to comment.