Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] add/improved IsPublished/IsSubscribed logic (5.13.x) #1632

Merged
merged 4 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ecal/core/include/ecal/ecal_subscriber.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
18 changes: 13 additions & 5 deletions ecal/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -295,8 +295,12 @@ namespace eCAL

iter->second->ApplyLocLayerParameter(process_id, topic_id, tlayer.type(), writer_par);
}
// inform for local publisher connection
iter->second->ApplyLocPublication(process_id, topic_id, topic_info);
// we only inform the subscriber when the publisher has already recognized at least one local subscriber
// this should avoid to set the "IsPublished" state before the publisher is able to send data
if (ecal_sample_.topic().connections_loc() > 0)
{
iter->second->ApplyLocPublication(process_id, topic_id, topic_info);
}
}
}

Expand Down Expand Up @@ -345,8 +349,12 @@ namespace eCAL
const std::string writer_par = tlayer.par_layer().SerializeAsString();
iter->second->ApplyExtLayerParameter(host_name, tlayer.type(), writer_par);
}
// inform for external publisher connection
iter->second->ApplyExtPublication(host_name, process_id, topic_id, topic_info);
// we only inform the subscriber when the publisher has already recognized at least one external subscriber
// this should avoid to set the "IsPublished" state before the publisher is able to send data
if (ecal_sample_.topic().connections_ext() > 0)
{
iter->second->ApplyExtPublication(host_name, process_id, topic_id, topic_info);
}
}
}

Expand Down
8 changes: 7 additions & 1 deletion ecal/core/src/pubsub/ecal_subscriber.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -250,6 +250,12 @@ namespace eCAL
return(m_datareader->RemEventCallback(type_));
}

//bool CSubscriber::IsPublished() const
//{
// if (m_datareader == nullptr) return(false);
// return(m_datareader->IsPublished());
//}

size_t CSubscriber::GetPublisherCount() const
{
if(m_datareader == nullptr) return(0);
Expand Down
3 changes: 2 additions & 1 deletion ecal/core/src/readwrite/ecal_reader.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -96,6 +96,7 @@ namespace eCAL

bool IsCreated() const {return(m_created);}

bool IsPublished() const { return(m_loc_published || m_ext_published); }
size_t GetPublisherCount() const
{
const std::lock_guard<std::mutex> lock(m_pub_map_sync);
Expand Down
98 changes: 95 additions & 3 deletions testing/ecal/pubsub_test/src/pubsub_receive_test.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -161,8 +161,6 @@ TEST(PubSub, TimingSubscriberReceive)
EXPECT_EQ(0, eCAL::Finalize());
}



// This tests test for sporadically received empty messages which were a problem.
TEST(PubSub, SporadicEmptyReceives)
{
Expand Down Expand Up @@ -219,3 +217,97 @@ TEST(PubSub, SporadicEmptyReceives)
// finalize eCAL API
EXPECT_EQ(0, eCAL::Finalize());
}

TEST(PubSub, TestSubscriberSeen)
{
// initialize eCAL API
EXPECT_EQ(0, eCAL::Initialize(0, nullptr, "subscriber_seen"));

// enable data loopback
eCAL::Util::EnableLoopback(true);

std::atomic<bool> subscriber_seen_at_publication_start(false);
std::atomic<bool> subscriber_seen_at_publication_end(false);

std::atomic<bool> do_start_publication(false);
std::atomic<bool> publication_finished(false);

// publishing thread
auto publisher_thread = [&]() {
eCAL::CPublisher pub("blob");
pub.ShmSetAcknowledgeTimeout(500);

int cnt(0);
const auto max_runs(1000);
while (eCAL::Ok())
{
if (do_start_publication && cnt < max_runs)
{
if (cnt == 0)
{
subscriber_seen_at_publication_start = pub.IsSubscribed();
}

pub.Send(std::to_string(cnt));
cnt++;

if (cnt == max_runs)
{
subscriber_seen_at_publication_end = pub.IsSubscribed();
publication_finished = true;
break;
}
}
}
};

// subscribing thread
auto subscriber_thread = [&]() {
eCAL::CSubscriber sub("blob");
bool received(false);
auto max_lines(10);
auto receive_lambda = [&received, &max_lines](const char* /*topic_name_*/, const struct eCAL::SReceiveCallbackData* data_)
{
if (max_lines != 0)
{
// the final log should look like this
// -----------------------------------
// Receiving 0
// Receiving 1
// Receiving 2
// Receiving 3
// Receiving 4
// Receiving 5
// Receiving 6
// Receiving 7
// Receiving 8
// Receiving 9
// -----------------------------------
std::cout << "Receiving " << std::string(static_cast<const char*>(data_->buf), data_->size) << std::endl;
max_lines--;
}
};
sub.AddReceiveCallback(receive_lambda);

while (eCAL::Ok() && !publication_finished)
{
//if (sub.IsPublished()) do_start_publication = true;
if (sub.GetPublisherCount() > 0) do_start_publication = true;
}
};

// create threads for publisher and subscriber
std::thread pub_thread(publisher_thread);
std::thread sub_thread(subscriber_thread);

// join threads to the main thread
pub_thread.join();
sub_thread.join();

// finalize eCAL API
eCAL::Finalize();

// check if the publisher has seen the subscriber
EXPECT_TRUE(subscriber_seen_at_publication_start);
EXPECT_TRUE(subscriber_seen_at_publication_end);
}
Loading