diff --git a/include/swoole.h b/include/swoole.h index 1e795af4168..6befc8240ea 100644 --- a/include/swoole.h +++ b/include/swoole.h @@ -604,6 +604,16 @@ struct EventData { char data[SW_IPC_BUFFER_SIZE]; }; +struct SendData { + DataHead info; + const char *data; +}; + +struct RecvData { + DataHead info; + const char *data; +}; + struct ThreadGlobal { uint16_t id; uint8_t type; diff --git a/include/swoole_http.h b/include/swoole_http.h index 4d2e6a28397..bd92196b5df 100644 --- a/include/swoole_http.h +++ b/include/swoole_http.h @@ -156,7 +156,7 @@ char *url_encode(char const *str, size_t len); #ifdef SW_USE_HTTP2 ssize_t get_package_length(Protocol *protocol, network::Socket *conn, const char *data, uint32_t length); uint8_t get_package_length_size(network::Socket *conn); -int dispatch_frame(Protocol *protocol, network::Socket *conn, const char *data, uint32_t length); +int dispatch_frame(const Protocol *protocol, network::Socket *conn, const RecvData *rdata); #endif //----------------------------------------------------------------- diff --git a/include/swoole_protocol.h b/include/swoole_protocol.h index d296d8acebf..9ea48bbbd0e 100644 --- a/include/swoole_protocol.h +++ b/include/swoole_protocol.h @@ -42,9 +42,17 @@ struct Protocol { void *private_data; void *private_data_2; uint16_t real_header_length; - uint16_t ext_flags; - int (*onPackage)(Protocol *, network::Socket *, const char *, uint32_t); + /** + * callback this function when a complete data packet is received + */ + int (*onPackage)(const Protocol *, network::Socket *, const RecvData *); + /** + * parse the length value in the received data + * @return 0: more data needs to be received + * @return -1: abnormal value, connection should be closed + * @return >0: the length of the data packet + */ LengthFunc get_package_length; uint8_t (*get_package_length_size)(network::Socket *); diff --git a/include/swoole_server.h b/include/swoole_server.h index a6efb9b50f6..ea312827307 100644 --- a/include/swoole_server.h +++ b/include/swoole_server.h @@ -142,16 +142,6 @@ struct Connection { sw_atomic_t lock; }; -struct SendData { - DataHead info; - const char *data; -}; - -struct RecvData { - DataHead info; - const char *data; -}; - struct PipeBuffer { DataHead info; char data[0]; @@ -1399,7 +1389,7 @@ class Server { #endif static int accept_command_result(Reactor *reactor, Event *event); static int close_connection(Reactor *reactor, network::Socket *_socket); - static int dispatch_task(Protocol *proto, network::Socket *_socket, const char *data, uint32_t length); + static int dispatch_task(const Protocol *proto, network::Socket *_socket, const RecvData *rdata); int send_to_connection(SendData *); ssize_t send_to_worker_from_worker(Worker *dst_worker, const void *buf, size_t len, int flags); diff --git a/include/swoole_websocket.h b/include/swoole_websocket.h index 8a8cd59ef4b..99275d0dbd2 100644 --- a/include/swoole_websocket.h +++ b/include/swoole_websocket.h @@ -152,7 +152,7 @@ static inline bool decode(Frame *frame, String *str) { } ssize_t get_package_length(Protocol *protocol, network::Socket *conn, const char *data, uint32_t length); -int dispatch_frame(Protocol *protocol, network::Socket *conn, const char *data, uint32_t length); +int dispatch_frame(const Protocol *protocol, network::Socket *conn, const RecvData *rdata); } // namespace websocket } // namespace swoole diff --git a/src/core/base.cc b/src/core/base.cc index af2a4b7d9d5..01edf96ab0a 100644 --- a/src/core/base.cc +++ b/src/core/base.cc @@ -329,7 +329,7 @@ pid_t swoole_fork(int flags) { swoole_fatal_error(SW_ERROR_OPERATION_NOT_SUPPORT, "must be forked outside the coroutine"); } if (SwooleTG.async_threads) { - swoole_trace("aio_task_num=%d, reactor=%p", SwooleTG.async_threads->task_num, sw_reactor()); + swoole_trace("aio_task_num=%lu, reactor=%p", SwooleTG.async_threads->task_num, sw_reactor()); swoole_fatal_error(SW_ERROR_OPERATION_NOT_SUPPORT, "can not create server after using async file operation"); } diff --git a/src/network/client.cc b/src/network/client.cc index 55bbe3a7c15..1b36a42cc3f 100644 --- a/src/network/client.cc +++ b/src/network/client.cc @@ -51,7 +51,7 @@ static int Client_onWrite(Reactor *reactor, Event *event); static int Client_onError(Reactor *reactor, Event *event); static void Client_onTimeout(Timer *timer, TimerNode *tnode); static void Client_onResolveCompleted(AsyncEvent *event); -static int Client_onPackage(Protocol *proto, Socket *conn, const char *data, uint32_t length); +static int Client_onPackage(const Protocol *proto, Socket *conn, const RecvData *rdata); static sw_inline void execute_onConnect(Client *cli) { if (cli->timer) { @@ -938,9 +938,9 @@ static int Client_https_proxy_handshake(Client *cli) { } #endif -static int Client_onPackage(Protocol *proto, Socket *conn, const char *data, uint32_t length) { +static int Client_onPackage(const Protocol *proto, Socket *conn, const RecvData *rdata) { Client *cli = (Client *) conn->object; - cli->onReceive(cli, data, length); + cli->onReceive(cli, rdata->data, rdata->info.len); return conn->close_wait ? SW_ERR : SW_OK; } diff --git a/src/protocol/base.cc b/src/protocol/base.cc index 58356e28b27..49b1e0e1bae 100644 --- a/src/protocol/base.cc +++ b/src/protocol/base.cc @@ -47,10 +47,10 @@ ssize_t Protocol::default_length_func(Protocol *protocol, network::Socket *socke // Protocol length is not legitimate, out of bounds or exceed the allocated length if (body_length < 0) { swoole_warning("invalid package (size=%d) from socket#%u<%s:%d>", - size, - socket->fd, - socket->info.get_ip(), - socket->info.get_port()); + size, + socket->fd, + socket->info.get_ip(), + socket->info.get_port()); return SW_ERR; } swoole_debug("length=%d", protocol->package_body_offset + body_length); @@ -60,14 +60,17 @@ ssize_t Protocol::default_length_func(Protocol *protocol, network::Socket *socke } int Protocol::recv_split_by_eof(network::Socket *socket, String *buffer) { + RecvData rdata{}; + int retval; + if (buffer->length < package_eof_len) { return SW_CONTINUE; } - int retval; - ssize_t n = buffer->split(package_eof, package_eof_len, [&](const char *data, size_t length) -> int { - if (onPackage(this, socket, data, length) < 0) { + rdata.info.len = length; + rdata.data = data; + if (onPackage(this, socket, &rdata) < 0) { retval = SW_CLOSE; return false; } @@ -108,6 +111,7 @@ int Protocol::recv_split_by_eof(network::Socket *socket, String *buffer) { * @return SW_OK: continue */ int Protocol::recv_with_length_protocol(network::Socket *socket, String *buffer) { + RecvData rdata{}; ssize_t package_length; uint8_t _package_length_size = get_package_length_size ? get_package_length_size(socket) : package_length_size; uint32_t recv_size; @@ -152,7 +156,9 @@ int Protocol::recv_with_length_protocol(network::Socket *socket, String *buffer) if (socket->recv_wait) { if (buffer->length >= (size_t) buffer->offset) { _do_dispatch: - if (onPackage(this, socket, buffer->str, buffer->offset) < 0) { + rdata.info.len = buffer->offset; + rdata.data = buffer->str; + if (onPackage(this, socket, &rdata) < 0) { return SW_ERR; } if (socket->removed) { @@ -228,6 +234,7 @@ int Protocol::recv_with_length_protocol(network::Socket *socket, String *buffer) int Protocol::recv_with_eof_protocol(network::Socket *socket, String *buffer) { bool recv_again = false; int buf_size; + RecvData rdata{}; _recv_data: buf_size = buffer->size - buffer->length; @@ -268,7 +275,9 @@ int Protocol::recv_with_eof_protocol(network::Socket *socket, String *buffer) { } } else if (memcmp(buffer->str + buffer->length - package_eof_len, package_eof, package_eof_len) == 0) { buffer->offset = buffer->length; - if (onPackage(this, socket, buffer->str, buffer->length) < 0) { + rdata.info.len = buffer->length; + rdata.data = buffer->str; + if (onPackage(this, socket, &rdata) < 0) { return SW_ERR; } if (socket->removed) { diff --git a/src/protocol/http.cc b/src/protocol/http.cc index 44a7838d99a..a90db896946 100644 --- a/src/protocol/http.cc +++ b/src/protocol/http.cc @@ -736,12 +736,12 @@ uint8_t get_package_length_size(Socket *socket) { } } -int dispatch_frame(Protocol *proto, Socket *socket, const char *data, uint32_t length) { +int dispatch_frame(const Protocol *proto, Socket *socket, const RecvData *rdata) { Connection *conn = (Connection *) socket->object; if (conn->websocket_status >= websocket::STATUS_HANDSHAKE) { - return websocket::dispatch_frame(proto, socket, data, length); + return websocket::dispatch_frame(proto, socket, rdata); } else if (conn->http2_stream) { - return Server::dispatch_task(proto, socket, data, length); + return Server::dispatch_task(proto, socket, rdata); } else { protocol_status_error(socket, conn); return SW_ERR; diff --git a/src/protocol/redis.cc b/src/protocol/redis.cc index 424319a849b..f19a3d79e50 100644 --- a/src/protocol/redis.cc +++ b/src/protocol/redis.cc @@ -43,7 +43,7 @@ int recv_packet(Protocol *protocol, Connection *conn, String *buffer) { int ret; char *buf_ptr; size_t buf_size; - + RecvData rdata{}; Request *request; network::Socket *socket = conn->socket; @@ -148,7 +148,9 @@ int recv_packet(Protocol *protocol, Connection *conn, String *buffer) { buffer->offset = buffer->length; if (request->n_lines_received == request->n_lines_total) { - if (protocol->onPackage(protocol, socket, buffer->str, buffer->length) < 0) { + rdata.info.len = buffer->length; + rdata.data = buffer->str; + if (protocol->onPackage(protocol, socket, &rdata) < 0) { return SW_ERR; } if (socket->removed) { diff --git a/src/protocol/websocket.cc b/src/protocol/websocket.cc index 606c291184f..33c7543932b 100644 --- a/src/protocol/websocket.cc +++ b/src/protocol/websocket.cc @@ -181,10 +181,10 @@ bool decode(Frame *frame, char *data, size_t length) { } swoole_trace_log(SW_TRACE_WEBSOCKET, - "decode frame, payload_length=%ld, mask=%d, opcode=%d", - payload_length, - frame->header.MASK, - frame->header.OPCODE); + "decode frame, payload_length=%ld, mask=%d, opcode=%d", + payload_length, + frame->header.MASK, + frame->header.OPCODE); if (payload_length == 0) { frame->header_length = header_length; @@ -239,11 +239,13 @@ void print_frame(Frame *frame) { } } -int dispatch_frame(Protocol *proto, Socket *_socket, const char *data, uint32_t length) { +int dispatch_frame(const Protocol *proto, Socket *_socket, const RecvData *rdata) { Server *serv = (Server *) proto->private_data_2; Connection *conn = (Connection *) _socket->object; - - String send_frame = {}; + RecvData dispatch_data{}; + String send_frame{}; + const char *data = rdata->data; + const uint32_t length = rdata->info.len; char buf[SW_WEBSOCKET_HEADER_LEN + SW_WEBSOCKET_CLOSE_CODE_LEN + SW_WEBSOCKET_CLOSE_REASON_MAX_LEN]; send_frame.str = buf; send_frame.size = sizeof(buf); @@ -275,9 +277,10 @@ int dispatch_frame(Protocol *proto, Socket *_socket, const char *data, uint32_t frame_buffer->append(data + offset, frame_length); // frame is finished, do dispatch if (ws.header.FIN) { - proto->ext_flags = conn->websocket_buffer->offset; - proto->ext_flags |= FLAG_FIN; - Server::dispatch_task(proto, _socket, frame_buffer->str, frame_buffer->length); + dispatch_data.info.ext_flags = conn->websocket_buffer->offset | FLAG_FIN; + dispatch_data.info.len = frame_buffer->length; + dispatch_data.data = frame_buffer->str; + Server::dispatch_task(proto, _socket, &dispatch_data); delete frame_buffer; conn->websocket_buffer = nullptr; } @@ -286,19 +289,21 @@ int dispatch_frame(Protocol *proto, Socket *_socket, const char *data, uint32_t case OPCODE_TEXT: case OPCODE_BINARY: { offset = length - ws.payload_length; - proto->ext_flags = get_ext_flags(ws.header.OPCODE, get_flags(&ws)); - + int ext_flags = get_ext_flags(ws.header.OPCODE, get_flags(&ws)); if (!ws.header.FIN) { if (conn->websocket_buffer) { swoole_warning("merging incomplete frame, bad request. remote_addr=%s:%d", - conn->info.get_ip(), - conn->info.get_port()); + conn->info.get_ip(), + conn->info.get_port()); return SW_ERR; } conn->websocket_buffer = new swoole::String(data + offset, length - offset); - conn->websocket_buffer->offset = proto->ext_flags; + conn->websocket_buffer->offset = ext_flags; } else { - Server::dispatch_task(proto, _socket, data + offset, length - offset); + dispatch_data.info.ext_flags = ext_flags; + dispatch_data.info.len = length - offset; + dispatch_data.data = data + offset; + Server::dispatch_task(proto, _socket, &dispatch_data); } break; } @@ -306,20 +311,20 @@ int dispatch_frame(Protocol *proto, Socket *_socket, const char *data, uint32_t case OPCODE_PONG: if (length >= (sizeof(buf) - SW_WEBSOCKET_HEADER_LEN)) { swoole_warning("%s frame application data is too big. remote_addr=%s:%d", - ws.header.OPCODE == OPCODE_PING ? "ping" : "pong", - conn->info.get_ip(), - conn->info.get_port()); + ws.header.OPCODE == OPCODE_PING ? "ping" : "pong", + conn->info.get_ip(), + conn->info.get_port()); return SW_ERR; } else if (length == SW_WEBSOCKET_HEADER_LEN) { - data = nullptr; - length = 0; + dispatch_data.data = nullptr; + dispatch_data.info.len = 0; } else { offset = ws.header.MASK ? SW_WEBSOCKET_HEADER_LEN + SW_WEBSOCKET_MASK_LEN : SW_WEBSOCKET_HEADER_LEN; - data += offset; - length -= offset; + dispatch_data.data = data + offset; + dispatch_data.info.len = length - offset; } - proto->ext_flags = get_ext_flags(ws.header.OPCODE, get_flags(&ws)); - Server::dispatch_task(proto, _socket, data, length); + dispatch_data.info.ext_flags = get_ext_flags(ws.header.OPCODE, get_flags(&ws)); + Server::dispatch_task(proto, _socket, &dispatch_data); break; case OPCODE_CLOSE: @@ -330,9 +335,10 @@ int dispatch_frame(Protocol *proto, Socket *_socket, const char *data, uint32_t if (conn->websocket_status != STATUS_CLOSING) { // Dispatch the frame with the same format of message frame offset = length - ws.payload_length; - proto->ext_flags = get_ext_flags(ws.header.OPCODE, get_flags(&ws)); - - Server::dispatch_task(proto, _socket, data + offset, length - offset); + dispatch_data.info.ext_flags = get_ext_flags(ws.header.OPCODE, get_flags(&ws)); + dispatch_data.info.len = length - offset; + dispatch_data.data = data + offset; + Server::dispatch_task(proto, _socket, &dispatch_data); // Client attempt to close send_frame.str[0] = 0x88; // FIN | OPCODE: WEBSOCKET_OPCODE_CLOSE diff --git a/src/server/port.cc b/src/server/port.cc index bd5b3d0325d..cd574fcfb84 100644 --- a/src/server/port.cc +++ b/src/server/port.cc @@ -311,6 +311,7 @@ static int Port_onRead_raw(Reactor *reactor, ListenPort *port, Event *event) { Socket *_socket = event->socket; Connection *conn = (Connection *) _socket->object; Server *serv = (Server *) reactor->ptr; + RecvData rdata{}; String *buffer = serv->get_recv_buffer(_socket); if (!buffer) { @@ -335,7 +336,9 @@ static int Port_onRead_raw(Reactor *reactor, ListenPort *port, Event *event) { return SW_OK; } else { buffer->offset = buffer->length = n; - return Server::dispatch_task(&port->protocol, _socket, buffer->str, n); + rdata.info.len = n; + rdata.data = buffer->str; + return Server::dispatch_task(&port->protocol, _socket, &rdata); } } @@ -379,6 +382,7 @@ static int Port_onRead_http(Reactor *reactor, ListenPort *port, Event *event) { Socket *_socket = event->socket; Connection *conn = (Connection *) _socket->object; Server *serv = (Server *) reactor->ptr; + RecvData dispatch_data {}; if (conn->websocket_status >= websocket::STATUS_HANDSHAKE) { if (conn->http_upgrade == 0) { @@ -540,7 +544,9 @@ static int Port_onRead_http(Reactor *reactor, ListenPort *port, Event *event) { // send static file content directly in the reactor thread if (!serv->enable_static_handler || !serv->select_static_handler(request, conn)) { // dynamic request, dispatch to worker - Server::dispatch_task(protocol, _socket, buffer->str, request->header_length_); + dispatch_data.info.len = request->header_length_; + dispatch_data.data = buffer->str; + Server::dispatch_task(protocol, _socket, &dispatch_data); } if (!conn->active || _socket->removed) { return SW_OK; @@ -637,7 +643,9 @@ static int Port_onRead_http(Reactor *reactor, ListenPort *port, Event *event) { } buffer->offset = request_length; - Server::dispatch_task(protocol, _socket, buffer->str, buffer->length); + dispatch_data.data = buffer->str; + dispatch_data.info.len = buffer->length; + Server::dispatch_task(protocol, _socket, &dispatch_data); if (conn->active && !_socket->removed) { serv->destroy_http_request(conn); diff --git a/src/server/reactor_thread.cc b/src/server/reactor_thread.cc index d00d1f44cc4..284c74b9bb8 100644 --- a/src/server/reactor_thread.cc +++ b/src/server/reactor_thread.cc @@ -915,7 +915,7 @@ static void ReactorThread_resume_data_receiving(Timer *timer, TimerNode *tnode) /** * dispatch request data [only data frame] */ -int Server::dispatch_task(Protocol *proto, Socket *_socket, const char *data, uint32_t length) { +int Server::dispatch_task(const Protocol *proto, Socket *_socket, const RecvData *rdata) { Server *serv = (Server *) proto->private_data_2; SendData task; @@ -925,14 +925,13 @@ int Server::dispatch_task(Protocol *proto, Socket *_socket, const char *data, ui sw_memset_zero(&task.info, sizeof(task.info)); task.info.server_fd = conn->server_fd; task.info.reactor_id = conn->reactor_id; - task.info.ext_flags = proto->ext_flags; - proto->ext_flags = 0; + task.info.ext_flags = rdata->info.ext_flags; task.info.type = SW_SERVER_EVENT_RECV_DATA; task.info.time = conn->last_recv_time; int return_code = SW_OK; - swoole_trace("send string package, size=%ld bytes", (long) length); + swoole_trace("send string package, size=%u bytes", rdata->info.len); if (serv->stream_socket_file) { Stream *stream = Stream::create(serv->stream_socket_file, 0, SW_SOCK_UNIX_STREAM); @@ -955,22 +954,25 @@ int Server::dispatch_task(Protocol *proto, Socket *_socket, const char *data, ui return_code = SW_ERR; goto _return; } - if (stream->send(data, length) < 0) { + if (stream->send(rdata->data, rdata->info.len) < 0) { goto _cancel; } } else { task.info.fd = conn->fd; - task.info.len = length; - task.data = data; - if (length > 0) { - sw_atomic_fetch_add(&conn->recv_queued_bytes, length); - swoole_trace_log( - SW_TRACE_SERVER, "session_id=%ld, len=%d, qb=%d", conn->session_id, length, conn->recv_queued_bytes); + task.info.len = rdata->info.len; + task.data = rdata->data; + if (rdata->info.len > 0) { + sw_atomic_fetch_add(&conn->recv_queued_bytes, rdata->info.len); + swoole_trace_log(SW_TRACE_SERVER, + "session_id=%ld, len=%d, qb=%d", + conn->session_id, + rdata->info.len, + conn->recv_queued_bytes); } if (!serv->factory->dispatch(&task)) { return_code = SW_ERR; - if (length > 0) { - sw_atomic_fetch_sub(&conn->recv_queued_bytes, length); + if (rdata->info.len > 0) { + sw_atomic_fetch_sub(&conn->recv_queued_bytes, rdata->info.len); } } } diff --git a/src/server/worker.cc b/src/server/worker.cc index 85055e6d457..b9659d3f355 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -33,7 +33,7 @@ using namespace network; static int Worker_onPipeReceive(Reactor *reactor, Event *event); static int Worker_onStreamAccept(Reactor *reactor, Event *event); static int Worker_onStreamRead(Reactor *reactor, Event *event); -static int Worker_onStreamPackage(Protocol *proto, Socket *sock, const char *data, uint32_t length); +static int Worker_onStreamPackage(const Protocol *proto, Socket *sock, const RecvData *rdata); static int Worker_onStreamClose(Reactor *reactor, Event *event); static void Worker_reactor_try_to_exit(Reactor *reactor); @@ -175,14 +175,14 @@ static int Worker_onStreamClose(Reactor *reactor, Event *event) { return SW_OK; } -static int Worker_onStreamPackage(Protocol *proto, Socket *sock, const char *data, uint32_t length) { +static int Worker_onStreamPackage(const Protocol *proto, Socket *sock, const RecvData *rdata) { Server *serv = (Server *) proto->private_data_2; SendData task{}; - memcpy(&task.info, data + proto->package_length_size, sizeof(task.info)); - task.info.len = length - (uint32_t) sizeof(task.info) - proto->package_length_size; + memcpy(&task.info, rdata->data + proto->package_length_size, sizeof(task.info)); + task.info.len = rdata->info.len - (uint32_t) sizeof(task.info) - proto->package_length_size; if (task.info.len > 0) { - task.data = (char *) (data + proto->package_length_size + sizeof(task.info)); + task.data = (char *) (rdata->data + proto->package_length_size + sizeof(task.info)); } serv->last_stream_socket = sock;