From 4104636456ae408c27be223840a479e39fe8b284 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Mon, 16 Jan 2023 11:44:03 +0100 Subject: [PATCH 01/10] build: Add `fair-software.eu` compliance checker --- .github/workflows/fair-software.yml | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 .github/workflows/fair-software.yml diff --git a/.github/workflows/fair-software.yml b/.github/workflows/fair-software.yml new file mode 100644 index 000000000..c9e50b289 --- /dev/null +++ b/.github/workflows/fair-software.yml @@ -0,0 +1,15 @@ +name: fair-software + +on: push + +jobs: + verify: + name: "fair-software" + runs-on: ubuntu-latest + steps: + - uses: fair-software/howfairis-github-action@0.2.1 + name: Measure compliance with fair-software.eu recommendations + env: + PYCHARM_HOSTED: "Trick colorama into displaying colored output" + with: + MY_REPO_URL: "https://github.com/${{ github.repository }}" From d3be9af9b61346d839d8776ffd6134dc1d56bbb2 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Fri, 12 Aug 2022 02:48:09 +0200 Subject: [PATCH 02/10] docs: Add our DOI badge --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index dfdb52a40..879e94117 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ -# FairMQ [![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT) +# FairMQ [![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT) [![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.1689985.svg)](https://doi.org/10.5281/zenodo.1689985) C++ Message Queuing Library and Framework From adf91d053d50a27ae99599aa1dff5b1a058954fb Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Mon, 16 Jan 2023 13:06:08 +0100 Subject: [PATCH 03/10] docs: Add OpenSSF Best Practices Badge --- README.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 879e94117..ee1999ae7 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,9 @@ -# FairMQ [![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT) [![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.1689985.svg)](https://doi.org/10.5281/zenodo.1689985) +# FairMQ + +[![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT) +[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.1689985.svg)](https://doi.org/10.5281/zenodo.1689985) +[![OpenSSF Best Practices](https://bestpractices.coreinfrastructure.org/projects/6915/badge)](https://bestpractices.coreinfrastructure.org/projects/6915) C++ Message Queuing Library and Framework From 1881986ccad77ad26b537628567b99fce80ce041 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Mon, 16 Jan 2023 13:08:13 +0100 Subject: [PATCH 04/10] docs: Add fair-software.eu compliance badge --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index ee1999ae7..dcc516859 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,7 @@ [![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT) [![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.1689985.svg)](https://doi.org/10.5281/zenodo.1689985) [![OpenSSF Best Practices](https://bestpractices.coreinfrastructure.org/projects/6915/badge)](https://bestpractices.coreinfrastructure.org/projects/6915) +[![fair-software.eu](https://img.shields.io/badge/fair--software.eu-%E2%97%8F%20%20%E2%97%8F%20%20%E2%97%8B%20%20%E2%97%8F%20%20%E2%97%8B-orange)](https://fair-software.eu) C++ Message Queuing Library and Framework From d16e473b9195e7f5ca6f91382db550c580409e22 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Mon, 16 Jan 2023 13:26:24 +0100 Subject: [PATCH 05/10] docs: Update fair-software.eu compliance badge And link to the GH workflow page instead of fair-software.eu --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index dcc516859..38cd57c26 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ [![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT) [![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.1689985.svg)](https://doi.org/10.5281/zenodo.1689985) [![OpenSSF Best Practices](https://bestpractices.coreinfrastructure.org/projects/6915/badge)](https://bestpractices.coreinfrastructure.org/projects/6915) -[![fair-software.eu](https://img.shields.io/badge/fair--software.eu-%E2%97%8F%20%20%E2%97%8F%20%20%E2%97%8B%20%20%E2%97%8F%20%20%E2%97%8B-orange)](https://fair-software.eu) +[![fair-software.eu](https://img.shields.io/badge/fair--software.eu-%E2%97%8F%20%20%E2%97%8F%20%20%E2%97%8B%20%20%E2%97%8F%20%20%E2%97%8F-yellow)](https://github.com/FairRootGroup/FairMQ/actions/workflows/fair-software.yml) C++ Message Queuing Library and Framework From a982d60ed78ed9e9aa4f66e000fc5c17827ed797 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 4 Oct 2022 11:50:11 +0200 Subject: [PATCH 06/10] example: fix incorrect config --- examples/region/sampler.cxx | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/examples/region/sampler.cxx b/examples/region/sampler.cxx index 12308d34e..a51cc86ee 100644 --- a/examples/region/sampler.cxx +++ b/examples/region/sampler.cxx @@ -39,12 +39,9 @@ struct Sampler : fair::mq::Device if (fExternalRegion) { regionCfg.id = 1; regionCfg.removeOnDestruction = false; - regionCfg.lock = false; // mlock region after creation - regionCfg.lock = false; // mlock region after creation - } else { - regionCfg.lock = true; // mlock region after creation - regionCfg.zero = true; // zero region content after creation } + regionCfg.lock = !fExternalRegion; // mlock region after creation + regionCfg.zero = !fExternalRegion; // zero region content after creation fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor( "data", // region is created using the transport of this channel... 0, // ... and this sub-channel From c3b273cec0fd2c0266078976bf06e28acc2bf5d9 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 4 Oct 2022 12:02:05 +0200 Subject: [PATCH 07/10] shm: Improve debug output a bit --- fairmq/shmem/Manager.h | 23 +++++++++++------------ fairmq/shmem/UnmanagedRegion.h | 25 +++++++++++++++---------- fairmq/shmem/UnmanagedRegionImpl.h | 6 +++--- 3 files changed, 29 insertions(+), 25 deletions(-) diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index aab01943a..052752b3b 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -207,22 +207,22 @@ class Manager fEventCounter = fManagementSegment.find(unique_instance).first; if (fEventCounter) { - LOG(debug) << "event counter found: " << fEventCounter->fCount; + LOG(trace) << "event counter found: " << fEventCounter->fCount; } else { - LOG(debug) << "no event counter found, creating one and initializing with 0"; + LOG(trace) << "no event counter found, creating one and initializing with 0"; fEventCounter = fManagementSegment.construct(unique_instance)(0); - LOG(debug) << "initialized event counter with: " << fEventCounter->fCount; + LOG(trace) << "initialized event counter with: " << fEventCounter->fCount; } fDeviceCounter = fManagementSegment.find(unique_instance).first; if (fDeviceCounter) { - LOG(debug) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing."; + LOG(trace) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing."; (fDeviceCounter->fCount)++; - LOG(debug) << "incremented device counter, now: " << fDeviceCounter->fCount; + LOG(trace) << "incremented device counter, now: " << fDeviceCounter->fCount; } else { - LOG(debug) << "no device counter found, creating one and initializing with 1"; + LOG(trace) << "no device counter found, creating one and initializing with 1"; fDeviceCounter = fManagementSegment.construct(unique_instance)(1); - LOG(debug) << "initialized device counter with: " << fDeviceCounter->fCount; + LOG(trace) << "initialized device counter with: " << fDeviceCounter->fCount; } fShmSegments = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); @@ -265,10 +265,10 @@ class Manager } } } - LOG(debug) << "Created/opened shared memory segment '" << "fmq_" << fShmId << "_m_" << fSegmentId << "'." - << " Size: " << boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId)) << " bytes." - << " Available: " << boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId)) << " bytes." - << " Allocation algorithm: " << allocationAlgorithm; + LOG(debug) << (createdSegment ? "Created" : "Opened") << " managed shared memory segment " << "fmq_" << fShmId << "_m_" << fSegmentId + << ". Size: " << boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId)) << " bytes." + << " Available: " << boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId)) << " bytes." + << " Allocation algorithm: " << allocationAlgorithm; } catch (interprocess_exception& bie) { LOG(error) << "Failed to create/open shared memory segment '" << "fmq_" << fShmId << "_m_" << fSegmentId << "': " << bie.what(); throw TransportError(tools::ToString("Failed to create/open shared memory segment '", "fmq_", fShmId, "_m_", fSegmentId, "': ", bie.what())); @@ -447,7 +447,6 @@ class Manager UnmanagedRegion* GetRegion(uint16_t id) { std::lock_guard lock(fLocalRegionsMtx); - // remote region could actually be a local one if a message originates from this device (has been sent out and returned) auto it = fRegions.find(id); if (it != fRegions.end()) { return it->second.get(); diff --git a/fairmq/shmem/UnmanagedRegion.h b/fairmq/shmem/UnmanagedRegion.h index 21ac12251..52b681f88 100644 --- a/fairmq/shmem/UnmanagedRegion.h +++ b/fairmq/shmem/UnmanagedRegion.h @@ -73,6 +73,8 @@ struct UnmanagedRegion // TODO: refactor this cfg.size = size; + const uint16_t id = cfg.id.value(); + bool created = false; LOG(debug) << "UnmanagedRegion(): " << fName << " | remote: " << remote << "."; @@ -94,23 +96,25 @@ struct UnmanagedRegion if (!fFile) { LOG(error) << "Failed to initialize file: " << fName; LOG(error) << "errno: " << errno << ": " << strerror(errno); - throw std::runtime_error(tools::ToString("Failed to initialize file for shared memory region: ", strerror(errno))); + throw TransportError(tools::ToString("Failed to initialize file for shared memory region: ", strerror(errno))); } fFileMapping = file_mapping(fName.c_str(), read_write); - LOG(debug) << "shmem: initialized file: " << fName; + LOG(debug) << "UnmanagedRegion(): initialized file: " << fName; fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, cfg.creationFlags); } else { try { // if opening fails, create try { fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write); + created = false; } catch (interprocess_exception& e) { - LOG(debug) << "Could not open " << (remote ? "remote" : "local") << " shared_memory_object for region id '" << cfg.id.value() << "': " << e.what() << ", creating..."; + LOG(debug) << "Could not open " << (remote ? "remote" : "local") << " shared_memory_object for region id '" << id << "': " << e.what() << ", creating..."; fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write); fShmemObject.truncate(size); + created = true; } } catch (interprocess_exception& e) { - LOG(error) << "Failed " << (remote ? "opening" : "creating") << " shared_memory_object for region id '" << cfg.id.value() << "': " << e.what(); + LOG(error) << "Failed initializing shared_memory_object for region id " << id << ": " << e.what(); throw; } @@ -121,27 +125,27 @@ struct UnmanagedRegion throw TransportError(tools::ToString("Created/opened region size (", fRegion.get_size(), ") does not match configured size (", size, ")")); } } catch (interprocess_exception& e) { - LOG(error) << "Failed mapping shared_memory_object for region id '" << cfg.id.value() << "': " << e.what(); + LOG(error) << "Failed mapping shared_memory_object for region id " << id << ": " << e.what(); throw; } } if (cfg.lock) { - LOG(debug) << "Locking region " << cfg.id.value() << "..."; + LOG(debug) << "Locking region " << id << "..."; Lock(); - LOG(debug) << "Successfully locked region " << cfg.id.value() << "."; + LOG(debug) << "Successfully locked region " << id << "."; } if (cfg.zero) { - LOG(debug) << "Zeroing free memory of region " << cfg.id.value() << "..."; + LOG(debug) << "Zeroing free memory of region " << id << "..."; Zero(); - LOG(debug) << "Successfully zeroed free memory of region " << cfg.id.value() << "."; + LOG(debug) << "Successfully zeroed free memory of region " << id << "."; } if (!remote) { Register(shmId, cfg); } - LOG(trace) << "shmem: initialized region: " << fName << " (" << (remote ? "remote" : "local") << ")"; + LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (remote ? "remote" : "local") << ")"; } UnmanagedRegion() = delete; @@ -246,6 +250,7 @@ struct UnmanagedRegion static void Register(const std::string& shmId, const RegionConfig& cfg) { using namespace boost::interprocess; + LOG(debug) << "Registering unmanaged shared memory region with id " << cfg.id.value(); managed_shared_memory mngSegment(open_or_create, std::string("fmq_" + shmId + "_mng").c_str(), kManagementSegmentSize); VoidAlloc alloc(mngSegment.get_segment_manager()); diff --git a/fairmq/shmem/UnmanagedRegionImpl.h b/fairmq/shmem/UnmanagedRegionImpl.h index 18ef5ef99..b9048b3d9 100644 --- a/fairmq/shmem/UnmanagedRegionImpl.h +++ b/fairmq/shmem/UnmanagedRegionImpl.h @@ -40,9 +40,9 @@ class UnmanagedRegionImpl final : public fair::mq::UnmanagedRegion , fRegion(nullptr) , fRegionId(0) { - auto result = fManager.CreateRegion(size, callback, bulkCallback, std::move(cfg)); - fRegion = result.first; - fRegionId = result.second; + auto [regionPtr, regionId] = fManager.CreateRegion(size, callback, bulkCallback, std::move(cfg)); + fRegion = regionPtr; + fRegionId = regionId; } UnmanagedRegionImpl(const UnmanagedRegionImpl&) = delete; From 58aa2b4f887cc9f3d153bc03a57f90f83631fc57 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 4 Oct 2022 12:10:32 +0200 Subject: [PATCH 08/10] shm: refactor UnamangedRegion: rename fRemote to fController --- fairmq/shmem/Manager.h | 6 ++--- fairmq/shmem/UnmanagedRegion.h | 41 +++++++++++++++++++--------------- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index 052752b3b..ba90ca140 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -396,7 +396,7 @@ class Manager const uint16_t id = cfg.id.value(); std::lock_guard lock(fLocalRegionsMtx); - auto& region = fRegions[id] = std::make_unique(fShmId, size, false, cfg); + auto& region = fRegions[id] = std::make_unique(fShmId, size, true, cfg); // LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; // start ack receiver only if a callback has been provided. @@ -463,7 +463,7 @@ class Manager } // LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; - auto r = fRegions.emplace(id, std::make_unique(fShmId, 0, true, std::move(cfg))); + auto r = fRegions.emplace(id, std::make_unique(fShmId, 0, false, std::move(cfg))); r.first->second->InitializeQueues(); r.first->second->StartAckSender(); return r.first->second.get(); @@ -554,7 +554,7 @@ class Manager if (it != fRegions.end()) { region = it->second.get(); } else { - auto r = fRegions.emplace(cfgIt->first, std::make_unique(fShmId, 0, true, cfgIt->second)); + auto r = fRegions.emplace(cfgIt->first, std::make_unique(fShmId, 0, false, cfgIt->second)); region = r.first->second.get(); region->InitializeQueues(); region->StartAckSender(); diff --git a/fairmq/shmem/UnmanagedRegion.h b/fairmq/shmem/UnmanagedRegion.h index 52b681f88..2981cf397 100644 --- a/fairmq/shmem/UnmanagedRegion.h +++ b/fairmq/shmem/UnmanagedRegion.h @@ -44,19 +44,19 @@ struct UnmanagedRegion friend class Monitor; UnmanagedRegion(const std::string& shmId, uint16_t id, uint64_t size) - : UnmanagedRegion(shmId, size, false, makeRegionConfig(id)) + : UnmanagedRegion(shmId, size, true, makeRegionConfig(id)) {} UnmanagedRegion(const std::string& shmId, uint64_t size, RegionConfig cfg) - : UnmanagedRegion(shmId, size, false, std::move(cfg)) + : UnmanagedRegion(shmId, size, true, std::move(cfg)) {} UnmanagedRegion(const std::string& shmId, RegionConfig cfg) - : UnmanagedRegion(shmId, cfg.size, false, std::move(cfg)) + : UnmanagedRegion(shmId, cfg.size, true, std::move(cfg)) {} - UnmanagedRegion(const std::string& shmId, uint64_t size, bool remote, RegionConfig cfg) - : fRemote(remote) + UnmanagedRegion(const std::string& shmId, uint64_t size, bool controlling, RegionConfig cfg) + : fControlling(controlling) , fRemoveOnDestruction(cfg.removeOnDestruction) , fLinger(cfg.linger) , fStopAcks(false) @@ -76,12 +76,12 @@ struct UnmanagedRegion const uint16_t id = cfg.id.value(); bool created = false; - LOG(debug) << "UnmanagedRegion(): " << fName << " | remote: " << remote << "."; + LOG(debug) << "UnmanagedRegion(): " << fName << " (" << (fControlling ? "controller" : "viewer") << ")"; if (!cfg.path.empty()) { fName = std::string(cfg.path + fName); - if (!fRemote) { + if (fControlling) { // create a file std::filebuf fbuf; if (fbuf.open(fName, std::ios_base::in | std::ios_base::out | std::ios_base::trunc | std::ios_base::binary)) { @@ -108,10 +108,15 @@ struct UnmanagedRegion fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write); created = false; } catch (interprocess_exception& e) { - LOG(debug) << "Could not open " << (remote ? "remote" : "local") << " shared_memory_object for region id '" << id << "': " << e.what() << ", creating..."; - fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write); - fShmemObject.truncate(size); - created = true; + if (fControlling) { + LOG(debug) << "Could not open controlling shared_memory_object for region " << id << ": " << e.what() << ", creating..."; + fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write); + fShmemObject.truncate(size); + created = true; + } else { + LOG(error) << "Could not open view for shared_memory_object for region " << id << ": " << e.what(); + throw TransportError(tools::ToString("Could not open view for shared_memory_object for region ", id, ": ", e.what())); + } } } catch (interprocess_exception& e) { LOG(error) << "Failed initializing shared_memory_object for region id " << id << ": " << e.what(); @@ -141,11 +146,11 @@ struct UnmanagedRegion LOG(debug) << "Successfully zeroed free memory of region " << id << "."; } - if (!remote) { + if (fControlling) { Register(shmId, cfg); } - LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (remote ? "remote" : "local") << ")"; + LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (fControlling ? "controller" : "viewer") << ")"; } UnmanagedRegion() = delete; @@ -177,7 +182,7 @@ struct UnmanagedRegion ~UnmanagedRegion() { - LOG(debug) << "~UnmanagedRegion(): " << fName << " | remote: " << fRemote << "."; + LOG(debug) << "~UnmanagedRegion(): " << fName << " (" << (fControlling ? "controller" : "viewer") << ")"; fStopAcks = true; if (fAcksSender.joinable()) { @@ -185,7 +190,7 @@ struct UnmanagedRegion fAcksSender.join(); } - if (!fRemote) { + if (fControlling) { if (fAcksReceiver.joinable()) { fAcksReceiver.join(); } @@ -211,14 +216,14 @@ struct UnmanagedRegion fclose(fFile); } } else { - // LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary"; + // LOG(debug) << "Region queue '" << fQueueName << "' is viewer, no cleanup necessary"; } - // LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed."; + // LOG(debug) << "Region '" << fName << "' (" << (fControlling ? "controller" : "viewer") << ") destructed."; } private: - bool fRemote; + bool fControlling; bool fRemoveOnDestruction; uint32_t fLinger; std::atomic fStopAcks; From 42ce691f57847383e6270e33a8927f34aad59931 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 4 Oct 2022 12:15:21 +0200 Subject: [PATCH 09/10] shm: error on duplicate region IDs --- fairmq/shmem/Manager.h | 20 ++++++++++++++++++-- fairmq/shmem/UnmanagedRegion.h | 19 +++++++++++++++---- 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index ba90ca140..ee87e73e6 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -396,7 +396,23 @@ class Manager const uint16_t id = cfg.id.value(); std::lock_guard lock(fLocalRegionsMtx); - auto& region = fRegions[id] = std::make_unique(fShmId, size, true, cfg); + + UnmanagedRegion* region = nullptr; + + auto it = fRegions.find(id); + if (it != fRegions.end()) { + region = it->second.get(); + if (region->fControlling) { + LOG(error) << "Unmanaged Region with id " << id << " already exists. Only unique IDs per session are allowed."; + throw TransportError(tools::ToString("Unmanaged Region with id ", id, " already exists. Only unique IDs per session are allowed.")); + } + + LOG(debug) << "Unmanaged region (view) already present, promoting to controller"; + region->BecomeController(cfg); + } else { + auto res = fRegions.emplace(id, std::make_unique(fShmId, size, true, cfg)); + region = res.first->second.get(); + } // LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; // start ack receiver only if a callback has been provided. @@ -406,7 +422,7 @@ class Manager region->StartAckSender(); region->StartAckReceiver(); } - result.first = region.get(); + result.first = region; result.second = id; } fRegionsGen += 1; // signal TL cache invalidation diff --git a/fairmq/shmem/UnmanagedRegion.h b/fairmq/shmem/UnmanagedRegion.h index 2981cf397..18edcfa74 100644 --- a/fairmq/shmem/UnmanagedRegion.h +++ b/fairmq/shmem/UnmanagedRegion.h @@ -146,7 +146,7 @@ struct UnmanagedRegion LOG(debug) << "Successfully zeroed free memory of region " << id << "."; } - if (fControlling) { + if (fControlling && created) { Register(shmId, cfg); } @@ -160,6 +160,13 @@ struct UnmanagedRegion UnmanagedRegion& operator=(const UnmanagedRegion&) = delete; UnmanagedRegion& operator=(UnmanagedRegion&&) = delete; + void BecomeController(RegionConfig& cfg) + { + fControlling = true; + fLinger = cfg.linger; + fRemoveOnDestruction = cfg.removeOnDestruction; + } + void Zero() { memset(fRegion.get_address(), 0x00, fRegion.get_size()); @@ -263,10 +270,14 @@ struct UnmanagedRegion EventCounter* eventCounter = mngSegment.find_or_construct(unique_instance)(0); - bool newShmRegionCreated = shmRegions->emplace(cfg.id.value(), RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, cfg.size, alloc)).second; - if (newShmRegionCreated) { - (eventCounter->fCount)++; + auto it = shmRegions->find(cfg.id.value()); + if (it != shmRegions->end()) { + LOG(error) << "Unmanaged Region with id " << cfg.id.value() << " has already been registered. Only unique IDs per session are allowed."; + throw TransportError(tools::ToString("Unmanaged Region with id ", cfg.id.value(), " has already been registered. Only unique IDs per session are allowed.")); } + + shmRegions->emplace(cfg.id.value(), RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, cfg.size, alloc)).second; + (eventCounter->fCount)++; } void SetCallbacks(RegionCallback callback, RegionBulkCallback bulkCallback) From 16275db12590f1a6c81aaca9e9bed5113a39d693 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 4 Oct 2022 12:23:00 +0200 Subject: [PATCH 10/10] Add test for externally (outside the session) created shmem region --- test/region/_region.cxx | 130 +++++++++++++++++++++++++++++++++------- 1 file changed, 109 insertions(+), 21 deletions(-) diff --git a/test/region/_region.cxx b/test/region/_region.cxx index c76eb1e34..6c850c2eb 100644 --- a/test/region/_region.cxx +++ b/test/region/_region.cxx @@ -6,6 +6,11 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ +#include +#include +#include +#include + #include #include #include @@ -16,8 +21,12 @@ #include +#include +#include #include // make_unique #include +#include // pair +#include // pair namespace { @@ -25,6 +34,34 @@ namespace using namespace std; using namespace fair::mq; +struct ShmOwner +{ + ShmOwner(const string& sessionId, + const vector>& segments, + const vector>& regions) + : fShmId(fair::mq::shmem::makeShmIdStr(sessionId)) + { + LOG(info) << "ShmOwner: creating segments"; + for (auto [id, size] : segments) { + fSegments.emplace(id, fair::mq::shmem::Segment(fShmId, id, size, fair::mq::shmem::rbTreeBestFit)); + } + LOG(info) << "ShmOwner: creating regions"; + for (auto [id, size] : regions) { + fRegions.emplace(id, make_unique(fShmId, id, size)); + } + } + + ~ShmOwner() + { + LOG(info) << "ShmOwner: cleaning up"; + fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{fShmId}); + } + + string fShmId; + map fSegments; + map> fRegions; +}; + void RegionsSizeMismatch() { size_t session = tools::UuidHash(); @@ -108,31 +145,69 @@ void RegionsCache(const string& transport, const string& address) } } -void RegionEventSubscriptions(const string& transport) +void RegionEventSubscriptions(const string& transport, bool external) { + fair::Logger::SetConsoleSeverity(fair::Severity::debug); + + unique_ptr shmOwner = nullptr; + size_t session{tools::UuidHash()}; + constexpr int sSize = 100000000; + constexpr int r1Size = 1000000; + constexpr int r2Size = 5000000; + constexpr uint16_t sId = 0; + constexpr uint16_t r1id = 100; + constexpr uint16_t r2id = 101; + + if (external) { + shmOwner = make_unique( + to_string(session), + vector>{ { sId, sSize } }, + vector>{ { r1id, r1Size }, { r2id, r2Size } } + ); + } + ProgOptions config; config.SetProperty("session", to_string(session)); - config.SetProperty("shm-segment-size", 100000000); + config.SetProperty("shm-segment-size", sSize); + if (external) { + config.SetProperty("shm-no-cleanup", true); + config.SetProperty("shm-monitor", false); + } auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config); - constexpr int size1 = 1000000; - constexpr int size2 = 5000000; constexpr int64_t userFlags = 12345; tools::Semaphore blocker; { - auto region1 = factory->CreateUnmanagedRegion(size1, [](void*, size_t, void*) {}); + fair::mq::RegionConfig r1Cfg; + if (external) { + r1Cfg.id = r1id; + r1Cfg.removeOnDestruction = false; + } + auto region1 = factory->CreateUnmanagedRegion(r1Size, [](void*, size_t, void*) {}, r1Cfg); void* ptr1 = region1->GetData(); uint64_t id1 = region1->GetId(); - ASSERT_EQ(region1->GetSize(), size1); - - auto region2 = factory->CreateUnmanagedRegion(size2, userFlags, [](void*, size_t, void*) {}); + if (external) { + ASSERT_EQ(id1, r1id); + } + ASSERT_EQ(region1->GetSize(), r1Size); + + fair::mq::RegionConfig r2Cfg; + r2Cfg.userFlags = userFlags; + if (external) { + r2Cfg.id = r2id; + r2Cfg.removeOnDestruction = false; + } + auto region2 = factory->CreateUnmanagedRegion(r2Size, [](void*, size_t, void*) {}, r2Cfg); void* ptr2 = region2->GetData(); uint64_t id2 = region2->GetId(); - ASSERT_EQ(region2->GetSize(), size2); + if (external) { + ASSERT_EQ(id2, r2id); + } + ASSERT_EQ(region2->GetSize(), r2Size); ASSERT_EQ(factory->SubscribedToRegionEvents(), false); factory->SubscribeToRegionEvents([&, id1, id2, ptr1, ptr2](RegionInfo info) { @@ -144,13 +219,15 @@ void RegionEventSubscriptions(const string& transport) << ", flags: " << info.flags; if (info.event == RegionEvent::created) { if (info.id == id1) { - ASSERT_EQ(info.size, size1); + ASSERT_EQ(info.size, r1Size); ASSERT_EQ(info.ptr, ptr1); blocker.Signal(); } else if (info.id == id2) { - ASSERT_EQ(info.size, size2); + ASSERT_EQ(info.size, r2Size); ASSERT_EQ(info.ptr, ptr2); - ASSERT_EQ(info.flags, userFlags); + if (!external) { + ASSERT_EQ(info.flags, userFlags); + } blocker.Signal(); } } else if (info.event == RegionEvent::destroyed) { @@ -170,10 +247,12 @@ void RegionEventSubscriptions(const string& transport) LOG(info) << "2 done."; } - blocker.Wait(); - LOG(info) << "3 done."; - blocker.Wait(); - LOG(info) << "4 done."; + if (!external) { + blocker.Wait(); + LOG(info) << "3 done."; + blocker.Wait(); + LOG(info) << "4 done."; + } LOG(info) << "All done."; factory->UnsubscribeFromRegionEvents(); @@ -185,9 +264,13 @@ void RegionCallbacks(const string& transport, const string& _address) size_t session(tools::UuidHash()); std::string address(tools::ToString(_address, "_", transport)); + constexpr size_t sSize = 100000000; + constexpr size_t r1Size = 2000000; + constexpr size_t r2Size = 3000000; + ProgOptions config; config.SetProperty("session", to_string(session)); - config.SetProperty("shm-segment-size", 100000000); + config.SetProperty("shm-segment-size", sSize); auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config); @@ -206,7 +289,7 @@ void RegionCallbacks(const string& transport, const string& _address) void* ptr2 = nullptr; size_t size2 = 200; - auto region1 = factory->CreateUnmanagedRegion(2000000, [&](void* ptr, size_t size, void* hint) { + auto region1 = factory->CreateUnmanagedRegion(r1Size, [&](void* ptr, size_t size, void* hint) { ASSERT_EQ(ptr, ptr1); ASSERT_EQ(size, size1); ASSERT_EQ(hint, intPtr1.get()); @@ -215,7 +298,7 @@ void RegionCallbacks(const string& transport, const string& _address) }); ptr1 = region1->GetData(); - auto region2 = factory->CreateUnmanagedRegion(3000000, [&](const std::vector& blocks) { + auto region2 = factory->CreateUnmanagedRegion(r2Size, [&](const std::vector& blocks) { ASSERT_EQ(blocks.size(), 1); ASSERT_EQ(blocks.at(0).ptr, ptr2); ASSERT_EQ(blocks.at(0).size, size2); @@ -263,12 +346,12 @@ TEST(Cache, shmem) TEST(EventSubscriptions, zeromq) { - RegionEventSubscriptions("zeromq"); + RegionEventSubscriptions("zeromq", false); } TEST(EventSubscriptions, shmem) { - RegionEventSubscriptions("shmem"); + RegionEventSubscriptions("shmem", false); } TEST(Callbacks, zeromq) @@ -281,4 +364,9 @@ TEST(Callbacks, shmem) RegionCallbacks("shmem", "ipc://test_region_callbacks"); } +TEST(EventSubscriptionsExternalRegion, shmem) +{ + RegionEventSubscriptions("shmem", true); +} + } // namespace