Skip to content

Commit

Permalink
zero length payload was not transferred over shm in zero copy mode
Browse files Browse the repository at this point in the history
  • Loading branch information
rex-schilasky committed Jul 30, 2024
1 parent c9307ef commit 0705eaf
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 17 deletions.
28 changes: 20 additions & 8 deletions ecal/core/src/io/shm/ecal_memfile_pool.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
* Copyright (C) 2022 Eclipse Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -216,14 +216,26 @@ namespace eCAL
// -------------------------------------------------------------------------
if (zero_copy_allowed)
{
// acquire memory file payload pointer (no copying here)
const void* buf(nullptr);
if (m_memfile.GetReadAddress(buf, mfile_hdr.data_size) > 0)
if (m_data_callback)
{
// calculate data buffer offset
const char* data_buf = static_cast<const char*>(buf) + mfile_hdr.hdr_size;
// add sample to data reader (and call user callback function)
if (m_data_callback) m_data_callback(topic_name_, topic_id_, data_buf, mfile_hdr.data_size, (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash);
const char* data_buf = nullptr;
if (mfile_hdr.data_size > 0)
{
// acquire memory file payload pointer (no copying here)
const void* buf(nullptr);
if (m_memfile.GetReadAddress(buf, mfile_hdr.data_size) > 0)
{
// calculate user payload address
data_buf = static_cast<const char*>(buf) + mfile_hdr.hdr_size;
// call user callback function
m_data_callback(topic_name_, topic_id_, data_buf, mfile_hdr.data_size, (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash);
}
}
else
{
// call user callback function
m_data_callback(topic_name_, topic_id_, data_buf, mfile_hdr.data_size, (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash);
}
}
}
// -------------------------------------------------------------------------
Expand Down
33 changes: 24 additions & 9 deletions testing/ecal/pubsub_test/src/pubsub_test.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -681,10 +681,16 @@ TEST(PubSub, ZeroPayloadMessageSHM)
// create subscriber for topic "A"
eCAL::CSubscriber sub("A");

// create publisher for topic "A"
eCAL::CPublisher pub("A");
pub.SetLayerMode(eCAL::TLayer::tlayer_all, eCAL::TLayer::smode_off);
pub.SetLayerMode(eCAL::TLayer::tlayer_shm, eCAL::TLayer::smode_on);
// create publisher for topic "A" (no zero copy)
eCAL::CPublisher pub1("A");
pub1.SetLayerMode(eCAL::TLayer::tlayer_all, eCAL::TLayer::smode_off);
pub1.SetLayerMode(eCAL::TLayer::tlayer_shm, eCAL::TLayer::smode_on);

// create publisher for topic "A" (zero copy)
eCAL::CPublisher pub2("A");
pub2.SetLayerMode(eCAL::TLayer::tlayer_all, eCAL::TLayer::smode_off);
pub2.SetLayerMode(eCAL::TLayer::tlayer_shm, eCAL::TLayer::smode_on);
pub2.ShmEnableZeroCopy(true);

// add callback
EXPECT_EQ(true, sub.AddReceiveCallback(std::bind(OnReceive, std::placeholders::_1, std::placeholders::_2)));
Expand All @@ -695,21 +701,30 @@ TEST(PubSub, ZeroPayloadMessageSHM)
g_callback_received_bytes = 0;
g_callback_received_count = 0;

EXPECT_EQ(send_s.size(), pub.Send(send_s));
// send without zero copy
EXPECT_EQ(send_s.size(), pub1.Send(send_s));
eCAL::Process::SleepMS(DATA_FLOW_TIME);

EXPECT_EQ(send_s.size(), pub.Send(nullptr, 0));
EXPECT_EQ(send_s.size(), pub1.Send(nullptr, 0));
eCAL::Process::SleepMS(DATA_FLOW_TIME);

// send with zero copy
EXPECT_EQ(send_s.size(), pub2.Send(send_s));
eCAL::Process::SleepMS(DATA_FLOW_TIME);

EXPECT_EQ(send_s.size(), pub2.Send(nullptr, 0));
eCAL::Process::SleepMS(DATA_FLOW_TIME);

// check callback receive
EXPECT_EQ(send_s.size(), g_callback_received_bytes);
EXPECT_EQ(2, g_callback_received_count);
EXPECT_EQ(4, g_callback_received_count);

// destroy subscriber
sub.Destroy();

// destroy publisher
pub.Destroy();
pub1.Destroy();
pub2.Destroy();

// finalize eCAL API
eCAL::Finalize();
Expand Down

0 comments on commit 0705eaf

Please sign in to comment.