From db54296990bbccf8eb5069a7440e733578045145 Mon Sep 17 00:00:00 2001 From: pavelsavara Date: Wed, 11 Sep 2024 15:33:21 +0200 Subject: [PATCH] wip --- eng/testing/tests.wasi.targets | 2 + .../System.Net.Sockets/Directory.Build.props | 3 +- .../src/System.Net.Sockets.csproj | 6 +- .../Net/Sockets/SocketAsyncEngine.Unix.cs | 8 +- .../Net/Sockets/SocketAsyncEngine.Wasi.cs | 65 +++++++++ .../src/System/Net/Sockets/SocketPal.Unix.cs | 14 ++ .../System.Net.Sockets.Tests.csproj | 3 +- src/mono/wasi/mono-include/descriptor_table.h | 130 ++++++++++++++++++ .../libs/System.Native/pal_networking.c | 29 ++-- 9 files changed, 242 insertions(+), 18 deletions(-) create mode 100644 src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Wasi.cs create mode 100644 src/mono/wasi/mono-include/descriptor_table.h diff --git a/eng/testing/tests.wasi.targets b/eng/testing/tests.wasi.targets index e076ac18c4b0b8..db27c71fcb20ab 100644 --- a/eng/testing/tests.wasi.targets +++ b/eng/testing/tests.wasi.targets @@ -50,6 +50,8 @@ <_XHarnessArgs >$(_XHarnessArgs) --engine-arg=--wasm --engine-arg=max-wasm-stack=134217728 <_XHarnessArgs >$(_XHarnessArgs) --engine-arg=--wasi --engine-arg=http <_XHarnessArgs >$(_XHarnessArgs) --engine-arg=--wasi --engine-arg=inherit-network + <_XHarnessArgs >$(_XHarnessArgs) --engine-arg=--wasi --engine-arg=tcp + <_XHarnessArgs >$(_XHarnessArgs) --engine-arg=--wasi --engine-arg=udp <_XHarnessArgs >$(_XHarnessArgs) --engine-arg=--wasi --engine-arg=allow-ip-name-lookup <_XHarnessArgs >$(_XHarnessArgs) --engine-arg=--env --engine-arg=DOTNET_WASI_PRINT_EXIT_CODE=1 <_XHarnessArgs Condition="'$(WasmXHarnessArgsCli)' != ''" >$(_XHarnessArgs) $(WasmXHarnessArgsCli) diff --git a/src/libraries/System.Net.Sockets/Directory.Build.props b/src/libraries/System.Net.Sockets/Directory.Build.props index bc799605d32edf..ce244cbea56199 100644 --- a/src/libraries/System.Net.Sockets/Directory.Build.props +++ b/src/libraries/System.Net.Sockets/Directory.Build.props @@ -3,7 +3,6 @@ Microsoft true - - browser;wasi + browser \ No newline at end of file diff --git a/src/libraries/System.Net.Sockets/src/System.Net.Sockets.csproj b/src/libraries/System.Net.Sockets/src/System.Net.Sockets.csproj index e7a4cfbcce3c9f..685e5c9b46bdcb 100644 --- a/src/libraries/System.Net.Sockets/src/System.Net.Sockets.csproj +++ b/src/libraries/System.Net.Sockets/src/System.Net.Sockets.csproj @@ -1,7 +1,7 @@ - $(NetCoreAppCurrent)-windows;$(NetCoreAppCurrent)-unix;$(NetCoreAppCurrent)-osx;$(NetCoreAppCurrent)-ios;$(NetCoreAppCurrent)-tvos;$(NetCoreAppCurrent) + $(NetCoreAppCurrent)-windows;$(NetCoreAppCurrent)-unix;$(NetCoreAppCurrent)-osx;$(NetCoreAppCurrent)-ios;$(NetCoreAppCurrent)-tvos;$(NetCoreAppCurrent)-wasi;$(NetCoreAppCurrent) true @@ -15,6 +15,7 @@ SR.SystemNetSockets_PlatformNotSupported true $(DefineConstants);SYSTEM_NET_SOCKETS_APPLE_PLATFROM + $(DefineConstants);TARGET_WASI @@ -183,11 +184,12 @@ Link="Common\System\Net\CompletionPortHelper.Windows.cs" /> - + + diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 43364203118470..88e556ab659f84 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -9,7 +9,7 @@ namespace System.Net.Sockets { - internal sealed unsafe class SocketAsyncEngine : IThreadPoolWorkItem + internal sealed unsafe partial class SocketAsyncEngine : IThreadPoolWorkItem { private const int EventBufferCount = #if DEBUG @@ -143,6 +143,7 @@ public void UnregisterSocket(IntPtr socketHandle) _handleToContextMap.TryRemove(socketHandle, out _); } +#if !TARGET_WASI private SocketAsyncEngine() { _port = (IntPtr)(-1); @@ -161,9 +162,11 @@ private SocketAsyncEngine() } } + Console.WriteLine("SocketAsyncEngine C"); fixed (Interop.Sys.SocketEvent** bufferPtr = &_buffer) { err = Interop.Sys.CreateSocketEventBuffer(EventBufferCount, bufferPtr); + Console.WriteLine("SocketAsyncEngine D"); if (err != Interop.Error.SUCCESS) { throw new InternalException(err); @@ -217,6 +220,7 @@ private void EventLoop() Environment.FailFast("Exception thrown from SocketAsyncEngine event loop: " + e.ToString(), e); } } +#endif private void UpdateEventQueueProcessingStage(bool isEventQueueEmpty) { @@ -303,6 +307,7 @@ void IThreadPoolWorkItem.Execute() } while (Environment.TickCount - startTimeMs < 15 && eventQueue.TryDequeue(out ev)); } +#if !TARGET_WASI private void FreeNativeResources() { if (_buffer != null) @@ -314,6 +319,7 @@ private void FreeNativeResources() Interop.Sys.CloseSocketEventPort(_port); } } +#endif // !TARGET_WASI // The JIT is allowed to arbitrarily extend the lifetime of locals, which may retain SocketAsyncContext references, // indirectly preventing Socket instances to be finalized, despite being no longer referenced by user code. diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Wasi.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Wasi.cs new file mode 100644 index 00000000000000..46b388a06c9ff6 --- /dev/null +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Wasi.cs @@ -0,0 +1,65 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Net.Sockets +{ + internal sealed partial class SocketAsyncEngine : IThreadPoolWorkItem + { + private SocketAsyncEngine() + { +#pragma warning disable CS4014 + WasiSocketsEventLoop(); +#pragma warning restore CS4014 + } + + private async Task WasiSocketsEventLoop() + { + try + { + SocketEventHandler handler = new SocketEventHandler(this); + while (true) + { + int numEvents = EventBufferCount; + await WaitForAnySocketPollable().ConfigureAwait(false); + + // The native shim is responsible for ensuring this condition. + Debug.Assert(numEvents > 0, $"Unexpected numEvents: {numEvents}"); + + // Only enqueue a work item if the stage is NotScheduled. + // Otherwise there must be a work item already queued or another thread already handling parallelization. + if (handler.HandleSocketEvents(numEvents) && + Interlocked.Exchange( + ref _eventQueueProcessingStage, + EventQueueProcessingStage.Scheduled) == EventQueueProcessingStage.NotScheduled) + { + ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); + } + } + } + catch (Exception e) + { + Environment.FailFast("Exception thrown from SocketAsyncEngine event loop: " + e.ToString(), e); + } + } + +#pragma warning disable CA1822 // TODO remove + private Task WaitForAnySocketPollable() + { + TaskCompletionSource todo = new TaskCompletionSource(); + /* TODO something like this, but with handle->pollable conversion + Interop.Error err = Interop.Sys.WaitForSocketEvents(_port, handler.Buffer, &numEvents); + if (err != Interop.Error.SUCCESS) + { + throw new InternalException(err); + }*/ + return todo.Task; + } + } +} diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs index 579c2dea66160c..446efb4c12484f 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs @@ -289,6 +289,9 @@ private static unsafe int SysSend(SafeSocketHandle socket, SocketFlags flags, IL int startIndex = bufferIndex, startOffset = offset; int maxBuffers = buffers.Count - startIndex; +#if TARGET_WASI // WASI doesn't have iovecs and recvmsg in preview2 + maxBuffers = Math.Max(maxBuffers, 1); +#endif bool allocOnStack = maxBuffers <= IovStackThreshold; Span handles = allocOnStack ? stackalloc GCHandle[IovStackThreshold] : new GCHandle[maxBuffers]; Span iovecs = allocOnStack ? stackalloc Interop.Sys.IOVector[IovStackThreshold] : new Interop.Sys.IOVector[maxBuffers]; @@ -376,6 +379,9 @@ private static unsafe int SysReceive(SafeSocketHandle socket, SocketFlags flags, Debug.Assert(socket.IsSocket); int maxBuffers = buffers.Count; +#if TARGET_WASI // WASI doesn't have iovecs and recvmsg in preview2 + maxBuffers = Math.Max(maxBuffers, 1); +#endif bool allocOnStack = maxBuffers <= IovStackThreshold; // When there are many buffers, reduce the number of pinned buffers based on available bytes. @@ -532,6 +538,10 @@ private static unsafe int SysReceiveMessageFrom( Debug.Assert(socket.IsSocket); int buffersCount = buffers.Count; +#if TARGET_WASI // WASI doesn't have iovecs and sendmsg in preview2 + buffersCount = Math.Max(buffersCount, 1); +#endif + bool allocOnStack = buffersCount <= IovStackThreshold; Span handles = allocOnStack ? stackalloc GCHandle[IovStackThreshold] : new GCHandle[buffersCount]; Span iovecs = allocOnStack ? stackalloc Interop.Sys.IOVector[IovStackThreshold] : new Interop.Sys.IOVector[buffersCount]; @@ -554,8 +564,10 @@ private static unsafe int SysReceiveMessageFrom( fixed (byte* sockAddr = socketAddress) fixed (Interop.Sys.IOVector* iov = iovecs) { +#if !TARGET_WASI // WASI doesn't have msg_control and sendmsg in preview2 int cmsgBufferLen = Interop.Sys.GetControlMessageBufferSize(Convert.ToInt32(isIPv4), Convert.ToInt32(isIPv6)); byte* cmsgBuffer = stackalloc byte[cmsgBufferLen]; +#endif var messageHeader = new Interop.Sys.MessageHeader { @@ -563,8 +575,10 @@ private static unsafe int SysReceiveMessageFrom( SocketAddressLen = socketAddress.Length, IOVectors = iov, IOVectorCount = iovCount, +#if !TARGET_WASI // WASI doesn't have msg_control and sendmsg in preview2 ControlBuffer = cmsgBuffer, ControlBufferLen = cmsgBufferLen +#endif }; long received = 0; diff --git a/src/libraries/System.Net.Sockets/tests/FunctionalTests/System.Net.Sockets.Tests.csproj b/src/libraries/System.Net.Sockets/tests/FunctionalTests/System.Net.Sockets.Tests.csproj index 2b5c1cea632f06..43844aea397681 100644 --- a/src/libraries/System.Net.Sockets/tests/FunctionalTests/System.Net.Sockets.Tests.csproj +++ b/src/libraries/System.Net.Sockets/tests/FunctionalTests/System.Net.Sockets.Tests.csproj @@ -2,9 +2,10 @@ true true - $(NetCoreAppCurrent)-windows;$(NetCoreAppCurrent)-unix;$(NetCoreAppCurrent)-browser + $(NetCoreAppCurrent)-windows;$(NetCoreAppCurrent)-unix;$(NetCoreAppCurrent)-browser;$(NetCoreAppCurrent)-wasi true true + true diff --git a/src/mono/wasi/mono-include/descriptor_table.h b/src/mono/wasi/mono-include/descriptor_table.h new file mode 100644 index 00000000000000..ce1be20763535f --- /dev/null +++ b/src/mono/wasi/mono-include/descriptor_table.h @@ -0,0 +1,130 @@ +// copy from https://github.com/WebAssembly/wasi-libc/blob/230d4be6c54bec93181050f9e25c87150506bdd0/libc-bottom-half/headers/private/wasi/descriptor_table.h +// https://github.com/WebAssembly/wasi-libc/blob/main/LICENSE-MIT + +#ifndef DESCRIPTOR_TABLE_H +#define DESCRIPTOR_TABLE_H + +#include + +typedef struct { + int dummy; +} tcp_socket_state_unbound_t; +typedef struct { + int dummy; +} tcp_socket_state_bound_t; +typedef struct { + int dummy; +} tcp_socket_state_connecting_t; +typedef struct { + int dummy; +} tcp_socket_state_listening_t; + +typedef struct { + streams_own_input_stream_t input; + poll_own_pollable_t input_pollable; + streams_own_output_stream_t output; + poll_own_pollable_t output_pollable; +} tcp_socket_state_connected_t; + +typedef struct { + network_error_code_t error_code; +} tcp_socket_state_connect_failed_t; + +// This is a tagged union. When adding/removing/renaming cases, be sure to keep the tag and union definitions in sync. +typedef struct { + enum { + TCP_SOCKET_STATE_UNBOUND, + TCP_SOCKET_STATE_BOUND, + TCP_SOCKET_STATE_CONNECTING, + TCP_SOCKET_STATE_CONNECTED, + TCP_SOCKET_STATE_CONNECT_FAILED, + TCP_SOCKET_STATE_LISTENING, + } tag; + union { + tcp_socket_state_unbound_t unbound; + tcp_socket_state_bound_t bound; + tcp_socket_state_connecting_t connecting; + tcp_socket_state_connected_t connected; + tcp_socket_state_connect_failed_t connect_failed; + tcp_socket_state_listening_t listening; + }; +} tcp_socket_state_t; + +typedef struct { + tcp_own_tcp_socket_t socket; + poll_own_pollable_t socket_pollable; + bool blocking; + bool fake_nodelay; + bool fake_reuseaddr; + network_ip_address_family_t family; + tcp_socket_state_t state; +} tcp_socket_t; + +typedef struct { + udp_own_incoming_datagram_stream_t incoming; + poll_own_pollable_t incoming_pollable; + udp_own_outgoing_datagram_stream_t outgoing; + poll_own_pollable_t outgoing_pollable; +} udp_socket_streams_t; + +typedef struct { + int dummy; +} udp_socket_state_unbound_t; +typedef struct { + int dummy; +} udp_socket_state_bound_nostreams_t; + +typedef struct { + udp_socket_streams_t streams; // Streams have no remote_address +} udp_socket_state_bound_streaming_t; + +typedef struct { + udp_socket_streams_t streams; // Streams have a remote_address +} udp_socket_state_connected_t; + +// This is a tagged union. When adding/removing/renaming cases, be sure to keep the tag and union definitions in sync. +// The "bound" state is split up into two distinct tags: +// - "bound_nostreams": Bound, but no datagram streams set up (yet). That will be done the first time send or recv is called. +// - "bound_streaming": Bound with active streams. +typedef struct { + enum { + UDP_SOCKET_STATE_UNBOUND, + UDP_SOCKET_STATE_BOUND_NOSTREAMS, + UDP_SOCKET_STATE_BOUND_STREAMING, + UDP_SOCKET_STATE_CONNECTED, + } tag; + union { + udp_socket_state_unbound_t unbound; + udp_socket_state_bound_nostreams_t bound_nostreams; + udp_socket_state_bound_streaming_t bound_streaming; + udp_socket_state_connected_t connected; + }; +} udp_socket_state_t; + +typedef struct { + udp_own_udp_socket_t socket; + poll_own_pollable_t socket_pollable; + bool blocking; + network_ip_address_family_t family; + udp_socket_state_t state; +} udp_socket_t; + +// This is a tagged union. When adding/removing/renaming cases, be sure to keep the tag and union definitions in sync. +typedef struct { + enum { + DESCRIPTOR_TABLE_ENTRY_TCP_SOCKET, + DESCRIPTOR_TABLE_ENTRY_UDP_SOCKET, + } tag; + union { + tcp_socket_t tcp_socket; + udp_socket_t udp_socket; + }; +} descriptor_table_entry_t; + +bool descriptor_table_insert(descriptor_table_entry_t entry, int *fd); + +bool descriptor_table_get_ref(int fd, descriptor_table_entry_t **entry); + +bool descriptor_table_remove(int fd, descriptor_table_entry_t *entry); + +#endif diff --git a/src/native/libs/System.Native/pal_networking.c b/src/native/libs/System.Native/pal_networking.c index 752df3926a73c5..e1afb97a1e1226 100644 --- a/src/native/libs/System.Native/pal_networking.c +++ b/src/native/libs/System.Native/pal_networking.c @@ -1550,15 +1550,18 @@ int32_t SystemNative_ReceiveMessage(intptr_t socket, MessageHeader* messageHeade } ssize_t res; -#if !HAVE_CMSGHDR - // TODO https://github.com/dotnet/runtime/issues/98957 - return Error_ENOTSUP; -#else // !HAVE_CMSGHDR +#if HAVE_CMSGHDR struct msghdr header; ConvertMessageHeaderToMsghdr(&header, messageHeader, fd); while ((res = recvmsg(fd, &header, socketFlags)) < 0 && errno == EINTR); +#else // HAVE_CMSGHDR + // we will only use 0th buffer + struct iovec* msg_iov = (struct iovec*)messageHeader->IOVectors; + while ((res = recvfrom(fd, msg_iov[0].iov_base, msg_iov[0].iov_len, socketFlags, (sockaddr *)messageHeader->SocketAddress, (socklen_t*) &(messageHeader->SocketAddressLen))) < 0 && errno == EINTR); +#endif // HAVE_CMSGHDR +#if HAVE_CMSGHDR assert(header.msg_name == messageHeader->SocketAddress); // should still be the same location as set in ConvertMessageHeaderToMsghdr assert(header.msg_control == messageHeader->ControlBuffer); @@ -1569,6 +1572,7 @@ int32_t SystemNative_ReceiveMessage(intptr_t socket, MessageHeader* messageHeade messageHeader->ControlBufferLen = Min((int32_t)header.msg_controllen, messageHeader->ControlBufferLen); messageHeader->Flags = ConvertSocketFlagsPlatformToPal(header.msg_flags); +#endif // HAVE_CMSGHDR if (res != -1) { @@ -1578,7 +1582,6 @@ int32_t SystemNative_ReceiveMessage(intptr_t socket, MessageHeader* messageHeade *received = 0; return SystemNative_ConvertErrorPlatformToPal(errno); -#endif // !HAVE_CMSGHDR } int32_t SystemNative_Send(intptr_t socket, void* buffer, int32_t bufferLen, int32_t flags, int32_t* sent) @@ -1632,11 +1635,8 @@ int32_t SystemNative_SendMessage(intptr_t socket, MessageHeader* messageHeader, return Error_ENOTSUP; } -#if !HAVE_CMSGHDR - // TODO https://github.com/dotnet/runtime/issues/98957 - return Error_ENOTSUP; -#else // !HAVE_CMSGHDR ssize_t res; +#if HAVE_CMSGHDR struct msghdr header; ConvertMessageHeaderToMsghdr(&header, messageHeader, fd); @@ -1649,6 +1649,12 @@ int32_t SystemNative_SendMessage(intptr_t socket, MessageHeader* messageHeader, #else while ((res = sendmsg(fd, &header, socketFlags)) < 0 && errno == EINTR); #endif +#else // HAVE_CMSGHDR + // we will only use 0th buffer + struct iovec* msg_iov = (struct iovec*)messageHeader->IOVectors; + while ((res = sendto(fd, msg_iov[0].iov_base, msg_iov[0].iov_len, socketFlags, (sockaddr *)messageHeader->SocketAddress, (socklen_t)messageHeader->SocketAddressLen)) < 0 && errno == EINTR); +#endif // HAVE_CMSGHDR + if (res != -1) { *sent = res; @@ -1657,7 +1663,6 @@ int32_t SystemNative_SendMessage(intptr_t socket, MessageHeader* messageHeader, *sent = 0; return SystemNative_ConvertErrorPlatformToPal(errno); -#endif // !HAVE_CMSGHDR } int32_t SystemNative_Accept(intptr_t socket, uint8_t* socketAddress, int32_t* socketAddressLen, intptr_t* acceptedSocket) @@ -3300,7 +3305,7 @@ static int32_t WaitForSocketEventsInner(int32_t port, SocketEvent* buffer, int32 return Error_SUCCESS; } -#else +#else // !HAVE_KQUEUE !HAVE_EPOLL static const size_t SocketEventBufferElementSize = 0; static int32_t CloseSocketEventPortInner(int32_t port) @@ -3322,7 +3327,7 @@ static int32_t WaitForSocketEventsInner(int32_t port, SocketEvent* buffer, int32 return Error_ENOSYS; } -#endif +#endif // HAVE_KQUEUE HAVE_EPOLL int32_t SystemNative_CreateSocketEventPort(intptr_t* port) {