Skip to content

Commit

Permalink
Core: Fixed a bug that may have caused drops in the SHM Layer (#1198)
Browse files Browse the repository at this point in the history
Prior to this commit, SHM data was dropped, if the subscriber wasn't able to access it in 5ms. Now, the subscriber re-tries, until it got access to it, or new data is available.
  • Loading branch information
FlorianReimold committed Oct 27, 2023
1 parent 7d7457f commit 0adb692
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 16 deletions.
39 changes: 24 additions & 15 deletions ecal/core/src/io/ecal_memfile_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace eCAL
m_created(false),
m_do_stop(false),
m_is_observing(false),
m_timeout_read(0)
m_time_of_last_life_signal(std::chrono::steady_clock::now())
{
}

Expand Down Expand Up @@ -137,7 +137,7 @@ namespace eCAL
{
if (!m_is_observing) return false;

m_timeout_read = 0;
m_time_of_last_life_signal = std::chrono::steady_clock::now();

return true;
}
Expand All @@ -147,21 +147,38 @@ namespace eCAL
// internal clock sample update checking
uint64_t last_sample_clock(0);

// Boolean that tells whether the SHM file has new data that we have NOT already accessed
bool has_unprocessed_data = false;

// runs as long as there is no timeout and no external stop request
while((m_timeout_read < timeout_) && !m_do_stop)
while(std::chrono::steady_clock::now() - std::chrono::steady_clock::time_point(m_time_of_last_life_signal) < std::chrono::milliseconds(timeout_)
&& !m_do_stop)
{
// loop start in ms
auto loop_start = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
if (!has_unprocessed_data)
{
// Only wait for the new-data-event, if we haven't processed the data, yet
// check for memory file update event from shm writer (20 ms)
has_unprocessed_data = gWaitForEvent(m_event_snd, 20);

if (has_unprocessed_data)
{
// We got a signal from the publisher! It is alive! So we reset the time since the last live signal
m_time_of_last_life_signal = std::chrono::steady_clock::now();
}
}

// check for memory file update event from shm writer (20 ms)
if(gWaitForEvent(m_event_snd, 20))
// If we have unprocessed data, we try to access (and process!) it
if(has_unprocessed_data)
{
// last chance to stop ..
if(m_do_stop) break;

// try to open memory file (timeout 5 ms)
if(m_memfile.GetReadAccess(5))
{
// We have gotten access! Now the data qualifies as processed, so next loop we will wait for the signal for new data, again.
has_unprocessed_data = false;

// read the file header
SMemFileHeader mfile_hdr;
ReadFileHeader(mfile_hdr);
Expand Down Expand Up @@ -237,14 +254,6 @@ namespace eCAL
}
}
}

// reset timeout
m_timeout_read = 0;
}
else
{
// increase timeout in ms
m_timeout_read += std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count() - loop_start;
}
}

Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/io/ecal_memfile_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ namespace eCAL
std::atomic<bool> m_do_stop;
std::atomic<bool> m_is_observing;

std::atomic<long long> m_timeout_read;
std::atomic<std::chrono::steady_clock::time_point> m_time_of_last_life_signal;

MemFileDataCallbackT m_data_callback;

Expand Down

0 comments on commit 0adb692

Please sign in to comment.