Skip to content

Commit

Permalink
iox-eclipse-iceoryx#218 Polling receiver implementation.
Browse files Browse the repository at this point in the history
Signed-off-by: Ithier Jeff (CC-AD/EYF1) <[email protected]>
  • Loading branch information
orecham committed Sep 7, 2020
1 parent f2da5b2 commit 8b8e23d
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,18 @@ void receiving()
iox::popo::TypedSubscriber<Position> mySubscriber({"Odometry", "Position", "Vehicle"});
mySubscriber.subscribe(10);

// Setting receiver handler


// Polling receiver
while (!killswitch)
{
auto result = mySubscriber.receive();
if(result.has_value())
{
std::cout << "Got val: " << result->get() << std::endl;
auto position = result->get();
std::cout << "Got val: (" << position->x << ", " << position->y << ", " << position->z << ")" << std::endl;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}

mySubscriber.unsubscribe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,31 +31,74 @@ template<typename T, typename recvport_t>
inline cxx::expected<SubscriberError>
BaseSubscriber<T, recvport_t>::subscribe(const uint64_t cacheSize) noexcept
{
std::cout << "subscribe" << std::endl;
m_subscriptionRequested = true;
uint32_t size = cacheSize;
if (size > MAX_RECEIVER_QUEUE_CAPACITY)
{
LogWarn() << "Cache size for subscribe too large " << size
<< ", limiting to MAX_RECEIVER_QUEUE_CAPACITY = " << MAX_RECEIVER_QUEUE_CAPACITY;
size = MAX_RECEIVER_QUEUE_CAPACITY;
}
m_port.subscribe(true, size);
return cxx::success<>();
}

template<typename T, typename recvport_t>
inline SubscriptionState
BaseSubscriber<T, recvport_t>::getSubscriptionState() const noexcept
{
std::cout << "getSubscriptionState" << std::endl;
return SubscriptionState::NOT_SUBSCRIBED;
if (!m_subscriptionRequested)
{
return SubscriptionState::NOT_SUBSCRIBED;
}
else
{
if (m_port.isSubscribed())
{
return SubscriptionState::SUBSCRIBED;
}
else
{
return SubscriptionState::SUBSCRIPTION_PENDING;
}
}
}

template<typename T, typename recvport_t>
inline void
BaseSubscriber<T, recvport_t>::unsubscribe() noexcept
{
std::cout << "unsubscribe" << std::endl;
m_port.unsubscribe();
m_subscriptionRequested = false;
}

template<typename T, typename recvport_t>
inline bool
BaseSubscriber<T, recvport_t>::hasData() const noexcept
{
return m_port.newData();
}

template<typename T, typename recvport_t>
inline cxx::optional<cxx::unique_ptr<T>>
BaseSubscriber<T, recvport_t>::receive() noexcept
{
std::cout << "receive" << std::endl;
return cxx::nullopt_t();

const mepoo::ChunkHeader* header = nullptr;
if (m_port.getChunk(header))
{
return cxx::optional<cxx::unique_ptr<T>>(cxx::unique_ptr<T>(
reinterpret_cast<T*>(header->payload()),
[this](T* const p){
auto header = iox::mepoo::convertPayloadPointerToChunkHeader(reinterpret_cast<void*>(p));
this->m_port.releaseChunk(header);
}
));
}
else
{
return cxx::nullopt_t();
}
}

// ======================================== Typed Subscriber ======================================== //
Expand Down Expand Up @@ -86,6 +129,13 @@ TypedSubscriber<T>::unsubscribe() noexcept
return BaseSubscriber<T>::unsubscribe();
}

template<typename T>
inline bool
TypedSubscriber<T>::hasData() const noexcept
{
return BaseSubscriber<T>::hasData();
}

template<typename T>
inline cxx::optional<cxx::unique_ptr<T>>
TypedSubscriber<T>::receive() noexcept
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ template<typename T>
class TypedPublisher : protected iox::popo::BasePublisher<T>
{
public:

TypedPublisher(const capro::ServiceDescription& service);
TypedPublisher(const TypedPublisher& other) = delete;
TypedPublisher& operator=(const TypedPublisher&) = delete;
Expand All @@ -182,26 +181,23 @@ class TypedPublisher : protected iox::popo::BasePublisher<T>
cxx::expected<Sample<T>, AllocationError> loan() noexcept;
void release(Sample<T>& sample) noexcept;
cxx::expected<AllocationError> publish(Sample<T>& sample) noexcept;
cxx::expected<AllocationError> publishCopyOf(const T& val) noexcept; /// @todo - move to typed API
cxx::expected<SampleRecallError> previousSample() const noexcept;

// This does not quite work yet. Need to understand how to identify generic callables in C++11.
cxx::expected<AllocationError> publishCopyOf(const T& val) noexcept;
template<typename Callable, typename... ArgTypes>
cxx::expected<AllocationError> publishResultOf(Callable c, ArgTypes... args) noexcept;

cxx::expected<SampleRecallError> previousSample() const noexcept;

void offer() noexcept;
void stopOffer() noexcept;
bool isOffered() noexcept;
bool hasSubscribers() noexcept;

};

// ======================================== Untyped Publisher ======================================== //

class UntypedPublisher : protected iox::popo::BasePublisher<void>
{
public:

UntypedPublisher(const capro::ServiceDescription& service);
UntypedPublisher(const UntypedPublisher& other) = delete;
UntypedPublisher& operator=(const UntypedPublisher&) = delete;
Expand All @@ -220,7 +216,6 @@ class UntypedPublisher : protected iox::popo::BasePublisher<void>
void stopOffer() noexcept;
bool isOffered() noexcept;
bool hasSubscribers() noexcept;

};

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ class BaseSubscriber {
SubscriptionState getSubscriptionState() const noexcept;
void unsubscribe() noexcept;


bool hasData() const noexcept;

///
/// @brief receive Receive the next sample if available.
/// @return
Expand Down Expand Up @@ -129,6 +132,9 @@ class BaseSubscriber {
protected:
BaseSubscriber(const capro::ServiceDescription& service);

protected:
bool m_subscriptionRequested;

private:
recvport_t m_port{nullptr};

Expand All @@ -148,6 +154,8 @@ class TypedSubscriber : protected BaseSubscriber<T>
cxx::expected<SubscriberError> subscribe(const uint64_t cacheSize = MAX_RECEIVER_QUEUE_CAPACITY) noexcept;
SubscriptionState getSubscriptionState() const noexcept;
void unsubscribe() noexcept;

bool hasData() const noexcept;
cxx::optional<cxx::unique_ptr<T>> receive() noexcept;
};

Expand Down

0 comments on commit 8b8e23d

Please sign in to comment.