diff --git a/cpp/mrc/include/mrc/codable/fundamental_types.hpp b/cpp/mrc/include/mrc/codable/fundamental_types.hpp index 0cbf4e6fe..11199a822 100644 --- a/cpp/mrc/include/mrc/codable/fundamental_types.hpp +++ b/cpp/mrc/include/mrc/codable/fundamental_types.hpp @@ -22,6 +22,7 @@ #include "mrc/codable/encode.hpp" #include "mrc/codable/encoding_options.hpp" #include "mrc/codable/types.hpp" +#include "mrc/memory/literals.hpp" #include "mrc/memory/memory_kind.hpp" #include "mrc/utils/tuple_utils.hpp" @@ -157,6 +158,8 @@ struct codable_protocol> mrc::codable::Encoder2>& encoder, const mrc::codable::EncodingOptions& opts) { + using namespace mrc::memory::literals; + // First put in the size mrc::codable::encode2(obj.size(), encoder, opts); @@ -164,7 +167,7 @@ struct codable_protocol> { // Since these are fundamental types, just encode in a single memory block encoder.write_descriptor({obj.data(), obj.size() * sizeof(T), memory::memory_kind::host}, - DescriptorKind::Deferred); + obj.size() * sizeof(T) > 1_MiB ? DescriptorKind::Eager : DescriptorKind::Deferred); } else { diff --git a/cpp/mrc/include/mrc/coroutines/semaphore.hpp b/cpp/mrc/include/mrc/coroutines/semaphore.hpp new file mode 100644 index 000000000..eb5c09939 --- /dev/null +++ b/cpp/mrc/include/mrc/coroutines/semaphore.hpp @@ -0,0 +1,468 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Original Source: https://github.com/jbaldwin/libcoro + * Original License: Apache License, Version 2.0; included below + */ + +/** + * Copyright 2021 Josh Baldwin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "mrc/core/expected.hpp" +#include "mrc/coroutines/schedule_policy.hpp" +#include "mrc/coroutines/thread_local_context.hpp" +#include "mrc/coroutines/thread_pool.hpp" + +#include + +#include +#include +#include +#include +#include + +namespace mrc::coroutines { + +enum class RingBufferOpStatus +{ + Success, + Stopped, +}; + +class Semaphore +{ + using mutex_type = std::mutex; + + public: + struct Options + { + // capacity of ring buffer + std::size_t capacity{8}; + + // when there is an awaiting reader, the active execution context of the next writer will resume the awaiting + // reader, the schedule_policy_t dictates how that is accomplished. 
+        SchedulePolicy reader_policy{SchedulePolicy::Reschedule};
+
+        // when there is an awaiting writer, the active execution context of the next reader will resume the awaiting
+        // writer, the producer_policy_t dictates how that is accomplished.
+        SchedulePolicy writer_policy{SchedulePolicy::Reschedule};
+    };
+
+    /**
+     * @throws std::runtime_error If `num_elements` == 0.
+     */
+    explicit Semaphore(Options opts = {}) :
+      m_num_elements(opts.capacity),
+      m_writer_policy(opts.writer_policy),
+      m_reader_policy(opts.reader_policy)
+    {
+        if (m_num_elements == 0)
+        {
+            throw std::runtime_error{"num_elements cannot be zero"};
+        }
+    }
+
+    ~Semaphore()
+    {
+        // // Wake up anyone still using the ring buffer.
+        // notify_waiters();
+    }
+
+    Semaphore(const Semaphore&) = delete;
+    Semaphore(Semaphore&&) = delete;
+
+    auto operator=(const Semaphore&) noexcept -> Semaphore& = delete;
+    auto operator=(Semaphore&&) noexcept -> Semaphore& = delete;
+
+    struct WriteOperation : ThreadLocalContext
+    {
+        WriteOperation(Semaphore& rb) : m_rb(rb), m_policy(m_rb.m_writer_policy) {}
+
+        auto await_ready() noexcept -> bool
+        {
+            // return immediately if the buffer is closed
+            if (m_rb.m_stopped.load(std::memory_order::acquire))
+            {
+                m_stopped = true;
+                return true;
+            }
+
+            // start a span to time the write - this would include time suspended if the buffer is full
+            // m_write_span->AddEvent("start_on", {{"thread.id", mrc::this_thread::get_id()}});
+
+            // the lock is owned by the operation, not scoped to the await_ready function
+            m_lock = std::unique_lock(m_rb.m_mutex);
+            return m_rb.try_write_locked(m_lock);
+        }
+
+        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
+        {
+            // m_lock was acquired as part of await_ready; await_suspend is responsible for releasing the lock
+            auto lock = std::move(m_lock);  // use RAII
+
+            ThreadLocalContext::suspend_thread_local_context();
+
+            m_awaiting_coroutine = awaiting_coroutine;
+            m_next = m_rb.m_write_waiters;
+            m_rb.m_write_waiters = this;
+            return true;
+        }
+
+        /**
+         * @return write_result
+         */
+        auto await_resume() -> RingBufferOpStatus
+        {
+            ThreadLocalContext::resume_thread_local_context();
+            return (!m_stopped ? RingBufferOpStatus::Success : RingBufferOpStatus::Stopped);
+        }
+
+        WriteOperation& use_scheduling_policy(SchedulePolicy policy) &
+        {
+            m_policy = policy;
+            return *this;
+        }
+
+        WriteOperation use_scheduling_policy(SchedulePolicy policy) &&
+        {
+            m_policy = policy;
+            return std::move(*this);
+        }
+
+        WriteOperation& resume_immediately() &
+        {
+            m_policy = SchedulePolicy::Immediate;
+            return *this;
+        }
+
+        WriteOperation resume_immediately() &&
+        {
+            m_policy = SchedulePolicy::Immediate;
+            return std::move(*this);
+        }
+
+        WriteOperation& resume_on(ThreadPool* thread_pool) &
+        {
+            m_policy = SchedulePolicy::Reschedule;
+            set_resume_on_thread_pool(thread_pool);
+            return *this;
+        }
+
+        WriteOperation resume_on(ThreadPool* thread_pool) &&
+        {
+            m_policy = SchedulePolicy::Reschedule;
+            set_resume_on_thread_pool(thread_pool);
+            return std::move(*this);
+        }
+
+      private:
+        friend Semaphore;
+
+        void resume()
+        {
+            if (m_policy == SchedulePolicy::Immediate)
+            {
+                set_resume_on_thread_pool(nullptr);
+            }
+            resume_coroutine(m_awaiting_coroutine);
+        }
+
+        /// The lock is acquired in await_ready; if ready it is released there; otherwise, await_suspend releases it
+        std::unique_lock<std::mutex> m_lock;
+        /// The ring buffer the element is being written into.
+        Semaphore& m_rb;
+        /// If the operation needs to suspend, the coroutine to resume when the element can be written.
+        std::coroutine_handle<> m_awaiting_coroutine;
+        /// Linked list of write operations that are waiting to write their element.
+        WriteOperation* m_next{nullptr};
+        /// Was the operation stopped?
+        bool m_stopped{false};
+        /// Scheduling Policy - default provided by the Semaphore, but can be overridden by the owner of the Awaiter
+        SchedulePolicy m_policy;
+        /// Span to measure the duration the writer spent writing data
+        // trace::Handle m_write_span{nullptr};
+    };
+
+    struct ReadOperation : ThreadLocalContext
+    {
+        explicit ReadOperation(Semaphore& rb) : m_rb(rb), m_policy(m_rb.m_reader_policy) {}
+
+        auto await_ready() noexcept -> bool
+        {
+            // the lock is owned by the operation, not scoped to the await_ready function
+            m_lock = std::unique_lock(m_rb.m_mutex);
+            // m_read_span->AddEvent("start_on", {{"thread.id", mrc::this_thread::get_id()}});
+            return m_rb.try_read_locked(m_lock, this);
+        }
+
+        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
+        {
+            // m_lock was acquired as part of await_ready; await_suspend is responsible for releasing the lock
+            auto lock = std::move(m_lock);
+
+            // // the buffer is empty; don't suspend if the stop signal has been set.
+            // if (m_rb.m_stopped.load(std::memory_order::acquire))
+            // {
+            //     m_stopped = true;
+            //     return false;
+            // }
+
+            // m_read_span->AddEvent("buffer_empty");
+            ThreadLocalContext::suspend_thread_local_context();
+
+            m_awaiting_coroutine = awaiting_coroutine;
+            m_next = m_rb.m_read_waiters;
+            m_rb.m_read_waiters = this;
+            return true;
+        }
+
+        /**
+         * @return The consumed element, or an unexpected RingBufferOpStatus::Stopped if the read has failed.
+         */
+        auto await_resume() -> mrc::expected
+        {
+            ThreadLocalContext::resume_thread_local_context();
+
+            if (m_stopped)
+            {
+                return mrc::unexpected(RingBufferOpStatus::Stopped);
+            }
+
+            return std::move(m_e);
+        }
+
+        ReadOperation& use_scheduling_policy(SchedulePolicy policy)
+        {
+            m_policy = policy;
+            return *this;
+        }
+
+        ReadOperation& resume_immediately()
+        {
+            m_policy = SchedulePolicy::Immediate;
+            return *this;
+        }
+
+        ReadOperation& resume_on(ThreadPool* thread_pool)
+        {
+            m_policy = SchedulePolicy::Reschedule;
+            set_resume_on_thread_pool(thread_pool);
+            return *this;
+        }
+
+      private:
+        friend Semaphore;
+
+        void resume()
+        {
+            if (m_policy == SchedulePolicy::Immediate)
+            {
+                set_resume_on_thread_pool(nullptr);
+            }
+            resume_coroutine(m_awaiting_coroutine);
+        }
+
+        /// The lock is acquired in await_ready; if ready it is released there; otherwise, await_suspend releases it
+        std::unique_lock<std::mutex> m_lock;
+        /// The ring buffer to read an element from.
+        Semaphore& m_rb;
+        /// If the operation needs to suspend, the coroutine to resume when the element can be consumed.
+        std::coroutine_handle<> m_awaiting_coroutine;
+        /// Linked list of read operations that are waiting to read an element.
+        ReadOperation* m_next{nullptr};
+        /// Was the operation stopped?
+        bool m_stopped{false};
+        /// Scheduling Policy - default provided by the Semaphore, but can be overridden by the owner of the Awaiter
+        SchedulePolicy m_policy;
+        /// Span to measure the time spent awaiting data to read
+        // trace::Handle m_read_span;
+    };
+
+    /**
+     * Acquires a slot in the ring buffer. This operation will suspend until a slot
+     * in the ring buffer becomes available.
+     */
+    [[nodiscard]] auto write() -> WriteOperation
+    {
+        return WriteOperation{*this};
+    }
+
+    /**
+     * Consumes an element from the ring buffer. This operation will suspend until an element in
+     * the ring buffer becomes available.
+     */
+    [[nodiscard]] auto read() -> ReadOperation
+    {
+        return ReadOperation{*this};
+    }
+
+    /**
+     * @return The current number of elements contained in the ring buffer.
+     */
+    auto size() const -> size_t
+    {
+        std::atomic_thread_fence(std::memory_order::acquire);
+        return m_used;
+    }
+
+    /**
+     * @return True if the ring buffer contains zero elements.
+     */
+    auto empty() const -> bool
+    {
+        return size() == 0;
+    }
+
+    // /**
+    //  * Wakes up all currently awaiting writers and readers. Their await_resume() function
+    //  * will return an expected read result that the ring buffer has stopped.
+    //  */
+    // auto notify_waiters() -> void
+    // {
+    //     // Only wake up waiters once.
+    //     if (m_stopped.load(std::memory_order::acquire))
+    //     {
+    //         return;
+    //     }
+
+    //     std::unique_lock lk{m_mutex};
+    //     m_stopped.exchange(true, std::memory_order::release);
+
+    //     while (m_write_waiters != nullptr)
+    //     {
+    //         auto* to_resume = m_write_waiters;
+    //         to_resume->m_stopped = true;
+    //         m_write_waiters = m_write_waiters->m_next;
+
+    //         lk.unlock();
+    //         to_resume->resume();
+    //         lk.lock();
+    //     }
+
+    //     while (m_read_waiters != nullptr)
+    //     {
+    //         auto* to_resume = m_read_waiters;
+    //         to_resume->m_stopped = true;
+    //         m_read_waiters = m_read_waiters->m_next;
+
+    //         lk.unlock();
+    //         to_resume->resume();
+    //         lk.lock();
+    //     }
+    // }
+
+  private:
+    friend WriteOperation;
+    friend ReadOperation;
+
+    mutex_type m_mutex{};
+
+    const std::size_t m_num_elements;
+    const SchedulePolicy m_writer_policy;
+    const SchedulePolicy m_reader_policy;
+
+    /// Set once the buffer has been stopped; awaiting writers check this flag before suspending.
+    std::atomic<bool> m_stopped{false};
+
+    /// The number of items in the ring buffer.
+    size_t m_used{0};
+
+    /// The LIFO list of write waiters - with a single writer, order is preserved
+    // Note: if there are multiple writers order cannot be guaranteed, so no need for FIFO
+    WriteOperation* m_write_waiters{nullptr};
+    /// The LIFO list of read waiters.
+    ReadOperation* m_read_waiters{nullptr};
+
+    auto try_write_locked(std::unique_lock<std::mutex>& lk) -> bool
+    {
+        if (m_used == m_num_elements)
+        {
+            DCHECK(m_read_waiters == nullptr);
+            return false;
+        }
+
+        ++m_used;
+
+        ReadOperation* to_resume = nullptr;
+
+        if (m_read_waiters != nullptr)
+        {
+            to_resume = m_read_waiters;
+            m_read_waiters = m_read_waiters->m_next;
+
+            // Since the read operation suspended it needs to be provided an element to read.
+            --m_used;  // And the waiting reader just consumed the item we added.
+        }
+
+        // release lock
+        lk.unlock();
+
+        if (to_resume != nullptr)
+        {
+            to_resume->resume();
+        }
+
+        return true;
+    }
+
+    auto try_read_locked(std::unique_lock<std::mutex>& lk, ReadOperation* op) -> bool
+    {
+        if (m_used == 0)
+        {
+            return false;
+        }
+
+        --m_used;
+
+        WriteOperation* to_resume = nullptr;
+
+        if (m_write_waiters != nullptr)
+        {
+            to_resume = m_write_waiters;
+            m_write_waiters = m_write_waiters->m_next;
+
+            // Since the write operation suspended it needs to be provided a slot to place its element.
+            ++m_used;  // And the waiting writer just wrote another item.
+ } + + // release lock + lk.unlock(); + + if (to_resume != nullptr) + { + to_resume->resume(); + } + + return true; + } +}; + +} // namespace mrc::coroutines diff --git a/cpp/mrc/src/internal/data_plane/data_plane_resources.cpp b/cpp/mrc/src/internal/data_plane/data_plane_resources.cpp index 642aa0c3e..323f7f6b9 100644 --- a/cpp/mrc/src/internal/data_plane/data_plane_resources.cpp +++ b/cpp/mrc/src/internal/data_plane/data_plane_resources.cpp @@ -258,6 +258,11 @@ DataPlaneResources2::DataPlaneResources2() : // << dec_message->object_id << ") " << start_tokens << " -> " << dec_message->tokens << " -> " // << remaining_tokens << ". Destroying."; + for (const auto& payload : remote_descriptor->encoded_object().payloads()) + { + m_registration_cache->drop_block(payload.address(), payload.bytes()); + } + m_remote_descriptor_by_id.erase(dec_message->object_id); } else diff --git a/cpp/mrc/src/internal/data_plane/data_plane_resources.hpp b/cpp/mrc/src/internal/data_plane/data_plane_resources.hpp index 44a48a9ee..9a55354a5 100644 --- a/cpp/mrc/src/internal/data_plane/data_plane_resources.hpp +++ b/cpp/mrc/src/internal/data_plane/data_plane_resources.hpp @@ -236,7 +236,7 @@ class DataPlaneResources2 std::map> m_endpoints_by_address; std::map> m_endpoints_by_id; - std::atomic_size_t m_next_object_id{0}; + std::atomic_size_t m_next_object_id{1000}; std::unique_ptr>> m_inbound_channel; diff --git a/cpp/mrc/src/internal/ucx/registration_cache.cpp b/cpp/mrc/src/internal/ucx/registration_cache.cpp index 2f556ac03..00dd4649a 100644 --- a/cpp/mrc/src/internal/ucx/registration_cache.cpp +++ b/cpp/mrc/src/internal/ucx/registration_cache.cpp @@ -19,6 +19,7 @@ #include "internal/ucx/utils.hpp" +#include #include #include @@ -169,4 +170,28 @@ std::optional> RegistrationCache3::lookup(ui return this->lookup(reinterpret_cast(addr)); } +std::size_t RegistrationCache3::drop_block(const void* addr, std::size_t bytes) +{ + std::lock_guard lock(m_mutex); + + auto found = m_memory_handle_by_address.find(addr); + + if (found == m_memory_handle_by_address.end()) + { + throw std::runtime_error("Memory block not found"); + } + + auto handle = found->second; + + m_memory_handle_by_address.erase(addr); + + DCHECK_EQ(handle->getSize(), bytes); + + return handle->getSize(); +} + +std::size_t RegistrationCache3::drop_block(uintptr_t addr, std::size_t bytes) +{ + return this->drop_block(reinterpret_cast(addr), bytes); +} } // namespace mrc::ucx diff --git a/cpp/mrc/src/internal/ucx/registration_cache.hpp b/cpp/mrc/src/internal/ucx/registration_cache.hpp index 3ffad0832..924c954bf 100644 --- a/cpp/mrc/src/internal/ucx/registration_cache.hpp +++ b/cpp/mrc/src/internal/ucx/registration_cache.hpp @@ -215,6 +215,17 @@ class RegistrationCache3 final std::optional> lookup(uintptr_t addr) const noexcept; + /** + * @brief Deregister a contiguous block of memory from the ucx context and remove the cache entry + * + * @param addr + * @param bytes + * @return std::size_t + */ + std::size_t drop_block(const void* addr, std::size_t bytes); + + std::size_t drop_block(uintptr_t addr, std::size_t bytes); + private: mutable std::mutex m_mutex; const std::shared_ptr m_context; diff --git a/cpp/mrc/tests/test_executor.cpp b/cpp/mrc/tests/test_executor.cpp index c8322d3f7..f04f93772 100644 --- a/cpp/mrc/tests/test_executor.cpp +++ b/cpp/mrc/tests/test_executor.cpp @@ -196,7 +196,7 @@ class TestExecutor : public ::testing::Test static std::unique_ptr make_pipeline() { - const size_t msg_count = 10; + const size_t msg_count = 10000; using transfer_t 
= TrackTimings; auto pipeline = mrc::make_pipeline(); @@ -714,7 +714,7 @@ TEST_F(TestExecutor, MultiNodeA) auto& mapping_1 = machine_1.register_pipeline(std::move(pipeline_1)); - mapping_1.get_segment("seg_2").set_enabled(false); + // mapping_1.get_segment("seg_2").set_enabled(false); mapping_1.get_segment("seg_4").set_enabled(false); auto start_1 = boost::fibers::async([&] { @@ -741,7 +741,7 @@ TEST_F(TestExecutor, MultiNodeB) auto& mapping_2 = machine_2.register_pipeline(std::move(pipeline_2)); mapping_2.get_segment("seg_1").set_enabled(false); - mapping_2.get_segment("seg_3").set_enabled(false); + // mapping_2.get_segment("seg_3").set_enabled(false); auto start_2 = boost::fibers::async([&] { machine_2.start(); diff --git a/debug_ray.py b/debug_ray.py index 06db67ee7..bd9eabbd7 100644 --- a/debug_ray.py +++ b/debug_ray.py @@ -53,7 +53,7 @@ class Step: def step(self, x: TrackTimings): - # y = len(large_object) + y = len(large_object) x.add_timing() @@ -75,7 +75,7 @@ def step(self, x: TrackTimings): print("==============Starting Ray DAG==============") ray_start = time.time() -timings = ray.get([dag.execute(TrackTimings(i, np.zeros((0 * 1024 * 1024) // 8))) for i in range(num_msg)]) +timings = ray.get([dag.execute(TrackTimings(i, np.zeros((4 * 1024 * 1024) // 8))) for i in range(num_msg)]) for timing in timings: if (timing.id == num_msg - 1):
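
Usage sketch for the new mrc::coroutines::Semaphore (illustrative only): the example below shows how the write()/read() awaiters could be used to cap the number of in-flight items at the semaphore's capacity. It assumes the Task, sync_wait and when_all utilities that MRC ports from libcoro; the producer/retire_one coroutine names and the capacity value are hypothetical. Since read()'s await_resume() in this diff still references ring-buffer element state, the consumer side here is only indicative.

#include "mrc/coroutines/semaphore.hpp"
#include "mrc/coroutines/sync_wait.hpp"
#include "mrc/coroutines/task.hpp"
#include "mrc/coroutines/when_all.hpp"

using namespace mrc::coroutines;

// Acquire a slot before starting work; suspends once `capacity` slots are outstanding.
Task<void> producer(Semaphore& sem)
{
    if (co_await sem.write() != RingBufferOpStatus::Success)
    {
        co_return;  // the semaphore was stopped before a slot became available
    }
    // ... issue the in-flight work here ...
    co_return;
}

// Retire one slot, waking a suspended writer (LIFO order) if there is one.
Task<void> retire_one(Semaphore& sem)
{
    auto result = co_await sem.read();
    (void)result;
    co_return;
}

int main()
{
    Semaphore sem{Semaphore::Options{.capacity = 4}};
    sync_wait(when_all(producer(sem), retire_one(sem)));
    return 0;
}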