Skip to content

Commit

Permalink
mavlink: fix MAVLink message forwarding
Browse files Browse the repository at this point in the history
This switches from the horribly intertwined ringbuffer implementation to
the new VariableLengthRingbuffer implementation.

By ditching the previous implementation, we fix MAVLink message
forwarding, which didn't work reliably. The reason it didn't work is
that multiple mavlink messages could be added but only one of them was
sent out because the buffer didn't keep track of the messages properly
and only read the first one.

Signed-off-by: Julian Oes <[email protected]>
  • Loading branch information
julianoes committed Nov 9, 2023
1 parent 2edc3cf commit e5d92c5
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 187 deletions.
3 changes: 2 additions & 1 deletion src/modules/mavlink/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
############################################################################
#
# Copyright (c) 2015-2021 PX4 Development Team. All rights reserved.
# Copyright (c) 2015-2023 PX4 Development Team. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
Expand Down Expand Up @@ -140,6 +140,7 @@ px4_add_module(
mavlink_c
timesync
tunes
variable_length_ringbuffer
version
UNITY_BUILD
)
Expand Down
184 changes: 25 additions & 159 deletions src/modules/mavlink/mavlink_main.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/****************************************************************************
*
* Copyright (c) 2012-2021 PX4 Development Team. All rights reserved.
* Copyright (c) 2012-2023 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
Expand Down Expand Up @@ -178,6 +178,7 @@ Mavlink::~Mavlink()
perf_free(_loop_perf);
perf_free(_loop_interval_perf);
perf_free(_send_byte_error_perf);
perf_free(_forwarding_error_perf);
}

void
Expand Down Expand Up @@ -1218,117 +1219,16 @@ Mavlink::configure_stream_threadsafe(const char *stream_name, const float rate)
}
}

int
Mavlink::message_buffer_init(int size)
{
_message_buffer.size = size;
_message_buffer.write_ptr = 0;
_message_buffer.read_ptr = 0;
_message_buffer.data = (char *)malloc(_message_buffer.size);

int ret;

if (_message_buffer.data == nullptr) {
ret = PX4_ERROR;
_message_buffer.size = 0;

} else {
ret = OK;
}

return ret;
}

void
Mavlink::message_buffer_destroy()
{
_message_buffer.size = 0;
_message_buffer.write_ptr = 0;
_message_buffer.read_ptr = 0;
free(_message_buffer.data);
}

int
Mavlink::message_buffer_count()
{
int n = _message_buffer.write_ptr - _message_buffer.read_ptr;

if (n < 0) {
n += _message_buffer.size;
}

return n;
}

bool
Mavlink::message_buffer_write(const void *ptr, int size)
{
// bytes available to write
int available = _message_buffer.read_ptr - _message_buffer.write_ptr - 1;

if (available < 0) {
available += _message_buffer.size;
}

if (size > available) {
// buffer overflow
return false;
}

char *c = (char *) ptr;
int n = _message_buffer.size - _message_buffer.write_ptr; // bytes to end of the buffer

if (n < size) {
// message goes over end of the buffer
memcpy(&(_message_buffer.data[_message_buffer.write_ptr]), c, n);
_message_buffer.write_ptr = 0;

} else {
n = 0;
}

// now: n = bytes already written
int p = size - n; // number of bytes to write
memcpy(&(_message_buffer.data[_message_buffer.write_ptr]), &(c[n]), p);
_message_buffer.write_ptr = (_message_buffer.write_ptr + p) % _message_buffer.size;
return true;
}

int
Mavlink::message_buffer_get_ptr(void **ptr, bool *is_part)
{
// bytes available to read
int available = _message_buffer.write_ptr - _message_buffer.read_ptr;

if (available == 0) {
return 0; // buffer is empty
}

int n = 0;

if (available > 0) {
// read pointer is before write pointer, all available bytes can be read
n = available;
*is_part = false;

} else {
// read pointer is after write pointer, read bytes from read_ptr to end of the buffer
n = _message_buffer.size - _message_buffer.read_ptr;
*is_part = _message_buffer.write_ptr > 0;
}

*ptr = &(_message_buffer.data[_message_buffer.read_ptr]);
return n;
}

void
Mavlink::pass_message(const mavlink_message_t *msg)
{
/* size is 8 bytes plus variable payload */
/* size is 12 bytes plus variable payload */
int size = MAVLINK_NUM_NON_PAYLOAD_BYTES + msg->len;
pthread_mutex_lock(&_message_buffer_mutex);
message_buffer_write(msg, size);
pthread_mutex_unlock(&_message_buffer_mutex);
LockGuard lg{_message_buffer_mutex};

if (!_message_buffer.push_back(reinterpret_cast<const uint8_t *>(msg), size)) {
perf_count(_forwarding_error_perf);
}
}

MavlinkShell *
Expand Down Expand Up @@ -2211,7 +2111,7 @@ Mavlink::task_main(int argc, char *argv[])
return PX4_ERROR;
}

/* initialize send mutex */
pthread_mutex_init(&_message_buffer_mutex, nullptr);
pthread_mutex_init(&_send_mutex, nullptr);
pthread_mutex_init(&_radio_status_mutex, nullptr);

Expand All @@ -2221,13 +2121,12 @@ Mavlink::task_main(int argc, char *argv[])
* make space for two messages plus off-by-one space as we use the empty element
* marker ring buffer approach.
*/
if (OK != message_buffer_init(2 * sizeof(mavlink_message_t) + 1)) {
LockGuard lg{_message_buffer_mutex};

if (!_message_buffer.allocate(2 * sizeof(mavlink_message_t) + 1)) {
PX4_ERR("msg buf alloc fail");
return PX4_ERROR;
}

/* initialize message buffer mutex */
pthread_mutex_init(&_message_buffer_mutex, nullptr);
}

/* Activate sending the data by default (for the IRIDIUM mode it will be disabled after the first round of packages is sent)*/
Expand Down Expand Up @@ -2569,50 +2468,21 @@ Mavlink::task_main(int argc, char *argv[])

_events.update(t);

/* pass messages from other UARTs */
/* pass messages from other instances */
if (get_forwarding_on()) {

bool is_part;
uint8_t *read_ptr;
uint8_t *write_ptr;

pthread_mutex_lock(&_message_buffer_mutex);
int available = message_buffer_get_ptr((void **)&read_ptr, &is_part);
pthread_mutex_unlock(&_message_buffer_mutex);

if (available > 0) {
// Reconstruct message from buffer

mavlink_message_t msg;
write_ptr = (uint8_t *)&msg;

// Pull a single message from the buffer
size_t read_count = available;

if (read_count > sizeof(mavlink_message_t)) {
read_count = sizeof(mavlink_message_t);
}

memcpy(write_ptr, read_ptr, read_count);

// We hold the mutex until after we complete the second part of the buffer. If we don't
// we may end up breaking the empty slot overflow detection semantics when we mark the
// possibly partial read below.
pthread_mutex_lock(&_message_buffer_mutex);

message_buffer_mark_read(read_count);

/* write second part of buffer if there is some */
if (is_part && read_count < sizeof(mavlink_message_t)) {
write_ptr += read_count;
available = message_buffer_get_ptr((void **)&read_ptr, &is_part);
read_count = sizeof(mavlink_message_t) - read_count;
memcpy(write_ptr, read_ptr, read_count);
message_buffer_mark_read(available);
}

pthread_mutex_unlock(&_message_buffer_mutex);
mavlink_message_t msg;
size_t available_bytes;
{
// We only send one message at a time, not to put too much strain on a
// link from forwarded messages.
LockGuard lg{_message_buffer_mutex};
available_bytes = _message_buffer.pop_front(reinterpret_cast<uint8_t *>(&msg), sizeof(msg));
// We need to make sure to release the lock here before sending the
// bytes out via IP or UART which could potentially take longer.
}

if (available_bytes > 0) {
resend_message(&msg);
}
}
Expand Down Expand Up @@ -2662,18 +2532,14 @@ Mavlink::task_main(int argc, char *argv[])
_socket_fd = -1;
}

if (get_forwarding_on()) {
message_buffer_destroy();
pthread_mutex_destroy(&_message_buffer_mutex);
}

if (_mavlink_ulog) {
_mavlink_ulog->stop();
_mavlink_ulog = nullptr;
}

pthread_mutex_destroy(&_send_mutex);
pthread_mutex_destroy(&_radio_status_mutex);
pthread_mutex_destroy(&_message_buffer_mutex);

PX4_INFO("exiting channel %i", (int)_channel);

Expand Down
32 changes: 5 additions & 27 deletions src/modules/mavlink/mavlink_main.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/****************************************************************************
*
* Copyright (c) 2012-2018 PX4 Development Team. All rights reserved.
* Copyright (c) 2012-2023 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
Expand Down Expand Up @@ -60,6 +60,7 @@

#include <containers/List.hpp>
#include <parameters/param.h>
#include <lib/variable_length_ringbuffer/VariableLengthRingbuffer.hpp>
#include <perf/perf_counter.h>
#include <px4_platform_common/cli.h>
#include <px4_platform_common/px4_config.h>
Expand Down Expand Up @@ -419,11 +420,6 @@ class Mavlink final : public ModuleParams
bool get_wait_to_transmit() { return _wait_to_transmit; }
bool should_transmit() { return (_transmitting_enabled && (!_wait_to_transmit || (_wait_to_transmit && _received_messages))); }

bool message_buffer_write(const void *ptr, int size);

void lockMessageBufferMutex(void) { pthread_mutex_lock(&_message_buffer_mutex); }
void unlockMessageBufferMutex(void) { pthread_mutex_unlock(&_message_buffer_mutex); }

/**
* Count transmitted bytes
*/
Expand Down Expand Up @@ -651,16 +647,9 @@ class Mavlink final : public ModuleParams

ping_statistics_s _ping_stats {};

struct mavlink_message_buffer {
int write_ptr;
int read_ptr;
int size;
char *data;
};
pthread_mutex_t _message_buffer_mutex{};
VariableLengthRingbuffer _message_buffer{};

mavlink_message_buffer _message_buffer {};

pthread_mutex_t _message_buffer_mutex {};
pthread_mutex_t _send_mutex {};
pthread_mutex_t _radio_status_mutex {};

Expand All @@ -682,6 +671,7 @@ class Mavlink final : public ModuleParams
perf_counter_t _loop_perf{perf_alloc(PC_ELAPSED, MODULE_NAME": tx run elapsed")}; /**< loop performance counter */
perf_counter_t _loop_interval_perf{perf_alloc(PC_INTERVAL, MODULE_NAME": tx run interval")}; /**< loop interval performance counter */
perf_counter_t _send_byte_error_perf{perf_alloc(PC_COUNT, MODULE_NAME": send_bytes error")}; /**< send bytes error count */
perf_counter_t _forwarding_error_perf{perf_alloc(PC_COUNT, MODULE_NAME": forwarding error")}; /**< forwarding messages error count */

void mavlink_update_parameters();

Expand Down Expand Up @@ -711,18 +701,6 @@ class Mavlink final : public ModuleParams
*/
int configure_streams_to_default(const char *configure_single_stream = nullptr);

int message_buffer_init(int size);

void message_buffer_destroy();

int message_buffer_count();

int message_buffer_is_empty() const { return (_message_buffer.read_ptr == _message_buffer.write_ptr); }

int message_buffer_get_ptr(void **ptr, bool *is_part);

void message_buffer_mark_read(int n) { _message_buffer.read_ptr = (_message_buffer.read_ptr + n) % _message_buffer.size; }

void pass_message(const mavlink_message_t *msg);

void publish_telemetry_status();
Expand Down

0 comments on commit e5d92c5

Please sign in to comment.