Skip to content

Commit

Permalink
Addressed review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Gonzalo de Pedro <[email protected]>
  • Loading branch information
Gonzalo de Pedro committed Dec 30, 2021
1 parent 02e0d11 commit cc79818
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 43 deletions.
7 changes: 4 additions & 3 deletions rclcpp/include/rclcpp/any_subscription_callback.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -485,9 +485,10 @@ class AnySubscriptionCallback
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(msg, *ptr);
return std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, ros_message_type_deleter_);
} else {
throw std::runtime_error(
"convert_custom_type_to_ros_message_unique_ptr "
"unexpectedly called without TypeAdapter");
static_assert(
!sizeof(MessageT*),
"convert_custom_type_to_ros_message_unique_ptr() "
"unexpectedly called without specialized TypeAdapter");
}
}

Expand Down
22 changes: 12 additions & 10 deletions rclcpp/include/rclcpp/experimental/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ class IntraProcessManager
} else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
sub_ids.take_shared_subscriptions.size() <= 1)
{
// There is at maximum 1 buffer that does not require ownership.
// Merge the two vector of ids into a unique one
std::vector<uint64_t> concatenated_vector(sub_ids.take_shared_subscriptions);
concatenated_vector.insert(
Expand Down Expand Up @@ -344,16 +345,15 @@ class IntraProcessManager
std::shared_ptr<const MessageT> message,
std::vector<uint64_t> subscription_ids)
{
using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;
using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;

using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;
using PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, Alloc>;
using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type;
using PublishedTypeDeleter = allocator::Deleter<PublishedTypeAllocator, PublishedType>;


for (auto id : subscription_ids) {
auto subscription_it = subscriptions_.find(id);
if (subscription_it == subscriptions_.end()) {
Expand All @@ -374,9 +374,10 @@ class IntraProcessManager
if (nullptr == ros_message_subscription) {
throw std::runtime_error(
"failed to dynamic cast SubscriptionIntraProcessBase to "
"SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, which "
"can happen when the publisher and subscription use different "
"allocator types, which is not supported");
"SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, and "
"to ROSMessageIntraProcessBuffer<ROSMessageType,ROSMessageTypeAllocator,"
" ROSMessageTypeDeleter> which can happen when the publisher and "
"subscription use different allocator types, which is not supported");
} else {
if constexpr (rclcpp::TypeAdapter<MessageT>::is_specialized::value) {
ROSMessageType ros_msg;
Expand Down Expand Up @@ -421,12 +422,12 @@ class IntraProcessManager
{
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;
using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;

using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;

using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;
using PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, Alloc>;
using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type;
using PublishedTypeDeleter = allocator::Deleter<PublishedTypeAllocator, PublishedType>;
Expand All @@ -450,10 +451,11 @@ class IntraProcessManager

if (nullptr == ros_message_subscription) {
throw std::runtime_error(
"--failed to dynamic cast SubscriptionIntraProcessBase to "
"SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, which "
"can happen when the publisher and subscription use different "
"allocator types, which is not supported");
"failed to dynamic cast SubscriptionIntraProcessBase to "
"SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, and "
"to ROSMessageIntraProcessBuffer<ROSMessageType,ROSMessageTypeAllocator,"
" ROSMessageTypeDeleter> which can happen when the publisher and "
"subscription use different allocator types, which is not supported");
} else {
if constexpr (rclcpp::TypeAdapter<MessageT>::is_specialized::value) {
ROSMessageTypeAllocator ros_message_alloc(allocator);
Expand Down
2 changes: 2 additions & 0 deletions rclcpp/test/rclcpp/test_publisher_with_type_adapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ TEST_F(TestPublisher, conversion_exception_is_passed_up) {

initialize(options);
auto pub = node->create_publisher<BadStringTypeAdapter>("topic_name", 1);
//A subscription is created to ensure the existence of a buffer in the intra proccess
//manager which will trigger the faulty conversion.
auto sub = node->create_subscription<rclcpp::msg::String>("topic_name", 1, callback);
EXPECT_THROW(pub->publish(1), std::runtime_error);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ TEST_F(
[message_data, &is_received](
const std::string & msg) -> void {
is_received = true;
ASSERT_STREQ(message_data.c_str(), msg.c_str());
EXPECT_STREQ(message_data.c_str(), msg.c_str());
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);

Expand All @@ -133,8 +133,8 @@ TEST_F(
[message_data, &is_received](
const std::string & msg, const rclcpp::MessageInfo & message_info) -> void {
is_received = true;
ASSERT_STREQ(message_data.c_str(), msg.c_str());
ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process);
EXPECT_STREQ(message_data.c_str(), msg.c_str());
EXPECT_TRUE(message_info.get_rmw_message_info().from_intra_process);
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);

Expand All @@ -153,7 +153,7 @@ TEST_F(
[message_data, &is_received](
std::unique_ptr<std::string> msg) -> void {
is_received = true;
ASSERT_STREQ(message_data.c_str(), (*msg).c_str());
EXPECT_STREQ(message_data.c_str(), (*msg).c_str());
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);

Expand All @@ -172,8 +172,8 @@ TEST_F(
std::unique_ptr<std::string> msg,
const rclcpp::MessageInfo & message_info) -> void {
is_received = true;
ASSERT_STREQ(message_data.c_str(), (*msg).c_str());
ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process);
EXPECT_STREQ(message_data.c_str(), (*msg).c_str());
EXPECT_TRUE(message_info.get_rmw_message_info().from_intra_process);
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);

Expand All @@ -192,7 +192,7 @@ TEST_F(
[message_data, &is_received](
std::shared_ptr<std::string> msg) -> void {
is_received = true;
ASSERT_STREQ(message_data.c_str(), (*msg).c_str());
EXPECT_STREQ(message_data.c_str(), (*msg).c_str());
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);

Expand All @@ -211,8 +211,8 @@ TEST_F(
std::shared_ptr<std::string> msg,
const rclcpp::MessageInfo & message_info) -> void {
is_received = true;
ASSERT_STREQ(message_data.c_str(), (*msg).c_str());
ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process);
EXPECT_STREQ(message_data.c_str(), (*msg).c_str());
EXPECT_TRUE(message_info.get_rmw_message_info().from_intra_process);
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);

Expand All @@ -231,7 +231,7 @@ TEST_F(
[message_data, &is_received](
std::shared_ptr<const std::string> msg) -> void {
is_received = true;
ASSERT_STREQ(message_data.c_str(), (*msg).c_str());
EXPECT_STREQ(message_data.c_str(), (*msg).c_str());
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);

Expand All @@ -250,8 +250,8 @@ TEST_F(
std::shared_ptr<const std::string> msg,
const rclcpp::MessageInfo & message_info) -> void {
is_received = true;
ASSERT_STREQ(message_data.c_str(), (*msg).c_str());
ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process);
EXPECT_STREQ(message_data.c_str(), (*msg).c_str());
EXPECT_TRUE(message_info.get_rmw_message_info().from_intra_process);
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);

Expand All @@ -270,7 +270,7 @@ TEST_F(
[message_data, &is_received](
const std::shared_ptr<const std::string> & msg) -> void {
is_received = true;
ASSERT_STREQ(message_data.c_str(), (*msg).c_str());
EXPECT_STREQ(message_data.c_str(), (*msg).c_str());
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);

Expand All @@ -290,8 +290,8 @@ TEST_F(
const std::shared_ptr<const std::string> & msg,
const rclcpp::MessageInfo & message_info) -> void {
is_received = true;
ASSERT_STREQ(message_data.c_str(), (*msg).c_str());
ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process);
EXPECT_STREQ(message_data.c_str(), (*msg).c_str());
EXPECT_TRUE(message_info.get_rmw_message_info().from_intra_process);
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);

Expand Down Expand Up @@ -325,7 +325,7 @@ TEST_F(
[message_data, &is_received](
const std::string & msg) -> void {
is_received = true;
ASSERT_STREQ(message_data.c_str(), msg.c_str());
EXPECT_STREQ(message_data.c_str(), msg.c_str());
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);

Expand All @@ -343,8 +343,8 @@ TEST_F(
[message_data, &is_received](
const std::string & msg, const rclcpp::MessageInfo & message_info) -> void {
is_received = true;
ASSERT_STREQ(message_data.c_str(), msg.c_str());
ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process);
EXPECT_STREQ(message_data.c_str(), msg.c_str());
EXPECT_TRUE(message_info.get_rmw_message_info().from_intra_process);
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);

Expand All @@ -363,7 +363,7 @@ TEST_F(
[message_data, &is_received](
std::unique_ptr<std::string> msg) -> void {
is_received = true;
ASSERT_STREQ(message_data.c_str(), (*msg).c_str());
EXPECT_STREQ(message_data.c_str(), (*msg).c_str());
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);

Expand All @@ -382,8 +382,8 @@ TEST_F(
std::unique_ptr<std::string> msg,
const rclcpp::MessageInfo & message_info) -> void {
is_received = true;
ASSERT_STREQ(message_data.c_str(), (*msg).c_str());
ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process);
EXPECT_STREQ(message_data.c_str(), (*msg).c_str());
EXPECT_TRUE(message_info.get_rmw_message_info().from_intra_process);
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);

Expand All @@ -402,7 +402,7 @@ TEST_F(
[message_data, &is_received](
std::shared_ptr<std::string> msg) -> void {
is_received = true;
ASSERT_STREQ(message_data.c_str(), (*msg).c_str());
EXPECT_STREQ(message_data.c_str(), (*msg).c_str());
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);

Expand All @@ -421,8 +421,8 @@ TEST_F(
std::shared_ptr<std::string> msg,
const rclcpp::MessageInfo & message_info) -> void {
is_received = true;
ASSERT_STREQ(message_data.c_str(), (*msg).c_str());
ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process);
EXPECT_STREQ(message_data.c_str(), (*msg).c_str());
EXPECT_TRUE(message_info.get_rmw_message_info().from_intra_process);
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);

Expand All @@ -441,7 +441,7 @@ TEST_F(
[message_data, &is_received](
std::shared_ptr<const std::string> msg) -> void {
is_received = true;
ASSERT_STREQ(message_data.c_str(), (*msg).c_str());
EXPECT_STREQ(message_data.c_str(), (*msg).c_str());
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);

Expand All @@ -461,8 +461,8 @@ TEST_F(
std::shared_ptr<const std::string> msg,
const rclcpp::MessageInfo & message_info) -> void {
is_received = true;
ASSERT_STREQ(message_data.c_str(), (*msg).c_str());
ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process);
EXPECT_STREQ(message_data.c_str(), (*msg).c_str());
EXPECT_TRUE(message_info.get_rmw_message_info().from_intra_process);
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);

Expand All @@ -481,7 +481,7 @@ TEST_F(
[message_data, &is_received](
const std::shared_ptr<const std::string> & msg) -> void {
is_received = true;
ASSERT_STREQ(message_data.c_str(), (*msg).c_str());
EXPECT_STREQ(message_data.c_str(), (*msg).c_str());
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);

Expand All @@ -501,8 +501,8 @@ TEST_F(
const std::shared_ptr<const std::string> & msg,
const rclcpp::MessageInfo & message_info) -> void {
is_received = true;
ASSERT_STREQ(message_data.c_str(), (*msg).c_str());
ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process);
EXPECT_STREQ(message_data.c_str(), (*msg).c_str());
EXPECT_TRUE(message_info.get_rmw_message_info().from_intra_process);
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);

Expand Down

0 comments on commit cc79818

Please sign in to comment.