Robin Mueller 371ff931bf
Linux CommandExecutor
The CommandExecutor helper class can execute shell commands in blocking and non-blocking mode
This class is able to execute processes by using the Linux popen call. It also has the capability of writing
the read output of a process into a provided ring buffer.

The executor works by first loading the command which should be executed and specifying whether
it should be executed blocking or non-blocking. After that, execution can be started with the execute call.

Using non-blocking mode allows to execute commands which might take a longer time in the background,
and allowing the user thread to check completion status with the check function

Moved to HAL like requested in code review and unit tested with failing commands as well.
Also, Linux HAL components are compiled by default now unless explicitely disabled.
2022-01-26 12:11:52 +01:00

393 lines
14 KiB

#include "fsfw/osal/linux/MessageQueue.h"
#include "fsfw/osal/linux/unixUtility.h"
#include "fsfw/serviceinterface/ServiceInterface.h"
#include "fsfw/objectmanager/ObjectManager.h"
#include <fstream>
#include <fcntl.h> /* For O_* constants */
#include <sys/stat.h> /* For mode constants */
#include <cstring>
#include <errno.h>
MessageQueue::MessageQueue(uint32_t messageDepth, size_t maxMessageSize):
defaultDestination(MessageQueueIF::NO_QUEUE), maxMessageSize(maxMessageSize) {
mq_attr attributes;
this->id = 0;
//Set attributes
attributes.mq_curmsgs = 0;
attributes.mq_maxmsg = messageDepth;
attributes.mq_msgsize = maxMessageSize;
attributes.mq_flags = 0; //Flags are ignored on Linux during mq_open
//Set the name of the queue. The slash is mandatory!
sprintf(name, "/FSFW_MQ%u\n", queueCounter++);
// Create a nonblocking queue if the name is available (the queue is read
// and writable for the owner as well as the group)
int oflag = O_NONBLOCK | O_RDWR | O_CREAT | O_EXCL;
mode_t mode = S_IWUSR | S_IREAD | S_IWGRP | S_IRGRP | S_IROTH | S_IWOTH;
mqd_t tempId = mq_open(name, oflag, mode, &attributes);
if (tempId == -1) {
handleOpenError(&attributes, messageDepth);
else {
//Successful mq_open call
this->id = tempId;
MessageQueue::~MessageQueue() {
int status = mq_close(this->id);
if(status != 0){
utility::printUnixErrorGeneric(CLASS_NAME, "~MessageQueue", "close");
status = mq_unlink(name);
if(status != 0){
utility::printUnixErrorGeneric(CLASS_NAME, "~MessageQueue", "unlink");
ReturnValue_t MessageQueue::sendMessage(MessageQueueId_t sendTo,
MessageQueueMessageIF* message, bool ignoreFault) {
return sendMessageFrom(sendTo, message, this->getId(), false);
ReturnValue_t MessageQueue::sendToDefault(MessageQueueMessageIF* message) {
return sendToDefaultFrom(message, this->getId());
ReturnValue_t MessageQueue::reply(MessageQueueMessageIF* message) {
if (this->lastPartner != 0) {
return sendMessageFrom(this->lastPartner, message, this->getId());
} else {
ReturnValue_t MessageQueue::receiveMessage(MessageQueueMessageIF* message,
MessageQueueId_t* receivedFrom) {
ReturnValue_t status = this->receiveMessage(message);
*receivedFrom = this->lastPartner;
return status;
ReturnValue_t MessageQueue::receiveMessage(MessageQueueMessageIF* message) {
if(message == nullptr) {
sif::error << "MessageQueue::receiveMessage: Message is "
"nullptr!" << std::endl;
return HasReturnvaluesIF::RETURN_FAILED;
if(message->getMaximumMessageSize() < maxMessageSize) {
sif::error << "MessageQueue::receiveMessage: Message size "
<< message->getMaximumMessageSize()
<< " too small to receive data!" << std::endl;
return HasReturnvaluesIF::RETURN_FAILED;
unsigned int messagePriority = 0;
int status = mq_receive(id,reinterpret_cast<char*>(message->getBuffer()),
if (status > 0) {
this->lastPartner = message->getSender();
//Check size of incoming message.
if (message->getMessageSize() < message->getMinimumMessageSize()) {
return HasReturnvaluesIF::RETURN_FAILED;
return HasReturnvaluesIF::RETURN_OK;
else if (status==0) {
//Success but no message received
return MessageQueueIF::EMPTY;
else {
//No message was received. Keep lastPartner anyway, I might send
//something later. But still, delete packet content.
memset(message->getData(), 0, message->getMaximumDataSize());
case EAGAIN:
//O_NONBLOCK or MQ_NONBLOCK was set and there are no messages
//currently on the specified queue.
return MessageQueueIF::EMPTY;
case EBADF: {
//mqdes doesn't represent a valid queue open for reading.
utility::printUnixErrorGeneric(CLASS_NAME, "receiveMessage", "EBADF");
case EINVAL: {
* This value indicates one of the following:
* - The pointer to the buffer for storing the received message,
* msg_ptr, is NULL.
* - The number of bytes requested, msg_len is less than zero.
* - msg_len is anything other than the mq_msgsize of the specified
* queue, and the QNX extended option MQ_READBUF_DYNAMIC hasn't
* been set in the queue's mq_flags.
utility::printUnixErrorGeneric(CLASS_NAME, "receiveMessage", "EINVAL");
case EMSGSIZE: {
* This value indicates one of the following:
* - the QNX extended option MQ_READBUF_DYNAMIC hasn't been set,
* and the given msg_len is shorter than the mq_msgsize for
* the given queue.
* - the extended option MQ_READBUF_DYNAMIC has been set, but the
* given msg_len is too short for the message that would have
* been received.
utility::printUnixErrorGeneric(CLASS_NAME, "receiveMessage", "EMSGSIZE");
case EINTR: {
//The operation was interrupted by a signal.
utility::printUnixErrorGeneric(CLASS_NAME, "receiveMessage", "EINTR");
//The operation was interrupted by a signal.
utility::printUnixErrorGeneric(CLASS_NAME, "receiveMessage", "ETIMEDOUT");
return HasReturnvaluesIF::RETURN_FAILED;
return HasReturnvaluesIF::RETURN_FAILED;
MessageQueueId_t MessageQueue::getLastPartner() const {
return this->lastPartner;
ReturnValue_t MessageQueue::flush(uint32_t* count) {
mq_attr attrib;
int status = mq_getattr(id,&attrib);
if(status != 0){
case EBADF:
//mqdes doesn't represent a valid message queue.
utility::printUnixErrorGeneric(CLASS_NAME, "flush", "EBADF");
case EINVAL:
//mq_attr is NULL
utility::printUnixErrorGeneric(CLASS_NAME, "flush", "EINVAL");
return HasReturnvaluesIF::RETURN_FAILED;
return HasReturnvaluesIF::RETURN_FAILED;
*count = attrib.mq_curmsgs;
attrib.mq_curmsgs = 0;
status = mq_setattr(id,&attrib,NULL);
if(status != 0){
switch(errno) {
case EBADF:
//mqdes doesn't represent a valid message queue.
utility::printUnixErrorGeneric(CLASS_NAME, "flush", "EBADF");
case EINVAL:
* This value indicates one of the following:
* - mq_attr is NULL.
* - MQ_MULT_NOTIFY had been set for this queue, and the given
* mq_flags includes a 0 in the MQ_MULT_NOTIFY bit. Once
* MQ_MULT_NOTIFY has been turned on, it may never be turned off.
utility::printUnixErrorGeneric(CLASS_NAME, "flush", "EINVAL");
return HasReturnvaluesIF::RETURN_FAILED;
return HasReturnvaluesIF::RETURN_FAILED;
return HasReturnvaluesIF::RETURN_OK;
MessageQueueId_t MessageQueue::getId() const {
return this->id;
void MessageQueue::setDefaultDestination(MessageQueueId_t defaultDestination) {
this->defaultDestination = defaultDestination;
ReturnValue_t MessageQueue::sendToDefaultFrom(MessageQueueMessageIF* message,
MessageQueueId_t sentFrom, bool ignoreFault) {
return sendMessageFrom(defaultDestination, message, sentFrom, ignoreFault);
ReturnValue_t MessageQueue::sendMessageFrom(MessageQueueId_t sendTo,
MessageQueueMessageIF* message, MessageQueueId_t sentFrom,
bool ignoreFault) {
return sendMessageFromMessageQueue(sendTo,message, sentFrom,ignoreFault);
MessageQueueId_t MessageQueue::getDefaultDestination() const {
return this->defaultDestination;
bool MessageQueue::isDefaultDestinationSet() const {
return (defaultDestination != NO_QUEUE);
uint16_t MessageQueue::queueCounter = 0;
ReturnValue_t MessageQueue::sendMessageFromMessageQueue(MessageQueueId_t sendTo,
MessageQueueMessageIF *message, MessageQueueId_t sentFrom,
bool ignoreFault) {
if(message == nullptr) {
sif::error << "MessageQueue::sendMessageFromMessageQueue: Message is nullptr!" << std::endl;
sif::printError("MessageQueue::sendMessageFromMessageQueue: Message is nullptr!\n");
return HasReturnvaluesIF::RETURN_FAILED;
int result = mq_send(sendTo,
reinterpret_cast<const char*>(message->getBuffer()),
//TODO: Check if we're in ISR.
if (result != 0) {
InternalErrorReporterIF* internalErrorReporter = ObjectManager::instance()->
if (internalErrorReporter != NULL) {
case EAGAIN:
//The O_NONBLOCK flag was set when opening the queue, or the
//MQ_NONBLOCK flag was set in its attributes, and the
//specified queue is full.
return MessageQueueIF::FULL;
case EBADF: {
//mq_des doesn't represent a valid message queue descriptor,
//or mq_des wasn't opened for writing.
utility::printUnixErrorGeneric(CLASS_NAME, "sendMessageFromMessageQueue", "EBADF");
sif::warning << "mq_send to " << sendTo << " sent from "
<< sentFrom << " failed" << std::endl;
sif::printWarning("mq_send to %d sent from %d failed\n", sendTo, sentFrom);
case EINTR:
//The call was interrupted by a signal.
utility::printUnixErrorGeneric(CLASS_NAME, "sendMessageFromMessageQueue", "EINTR");
case EINVAL:
* This value indicates one of the following:
* - msg_ptr is NULL.
* - msg_len is negative.
* - msg_prio is greater than MQ_PRIO_MAX.
* - msg_prio is less than 0.
* - MQ_PRIO_RESTRICT is set in the mq_attr of mq_des, and
* msg_prio is greater than the priority of the calling process.
utility::printUnixErrorGeneric(CLASS_NAME, "sendMessageFromMessageQueue", "EINVAL");
// The msg_len is greater than the msgsize associated with
//the specified queue.
utility::printUnixErrorGeneric(CLASS_NAME, "sendMessageFromMessageQueue", "EMSGSIZE");
return HasReturnvaluesIF::RETURN_FAILED;
return HasReturnvaluesIF::RETURN_FAILED;
return HasReturnvaluesIF::RETURN_OK;
ReturnValue_t MessageQueue::handleOpenError(mq_attr* attributes,
uint32_t messageDepth) {
switch(errno) {
case(EINVAL): {
utility::printUnixErrorGeneric(CLASS_NAME, "MessageQueue", "EINVAL");
size_t defaultMqMaxMsg = 0;
// Not POSIX conformant, but should work for all UNIX systems.
// Just an additional helpful printout :-)
if(std::ifstream("/proc/sys/fs/mqueue/msg_max",std::ios::in) >>
defaultMqMaxMsg and defaultMqMaxMsg < messageDepth) {
See: https://www.man7.org/linux/man-pages/man3/mq_open.3.html
This happens if the msg_max value is not large enough
It is ignored if the executable is run in privileged mode.
Run the unlockRealtime script or grant the mode manually by using:
sudo setcap 'CAP_SYS_RESOURCE=+ep' <pathToBinary>
Persistent solution for session:
echo <newMsgMax> | sudo tee /proc/sys/fs/mqueue/msg_max
Permanent solution:
sudo nano /etc/sysctl.conf
Append at end: fs/mqueue/msg_max = <newMsgMaxLen>
Apply changes with: sudo sysctl -p
sif::error << "MessageQueue::MessageQueue: Default MQ size " << defaultMqMaxMsg <<
" is too small for requested size " << messageDepth << std::endl;
sif::error << "This error can be fixed by setting the maximum "
"allowed message size higher!" << std::endl;
sif::printError("MessageQueue::MessageQueue: Default MQ size %d is too small for"
"requested size %d\n");
sif::printError("This error can be fixes by setting the maximum allowed"
"message size higher!\n");
case(EEXIST): {
// An error occured during open.
// We need to distinguish if it is caused by an already created queue
// There's another queue with the same name
// We unlink the other queue
int status = mq_unlink(name);
if (status != 0) {
utility::printUnixErrorGeneric(CLASS_NAME, "MessageQueue", "EEXIST");
else {
// Successful unlinking, try to open again
mqd_t tempId = mq_open(name,
S_IWUSR | S_IREAD | S_IWGRP | S_IRGRP, attributes);
if (tempId != -1) {
//Successful mq_open
this->id = tempId;
return HasReturnvaluesIF::RETURN_OK;
default: {
// Failed either the first time or the second time
utility::printUnixErrorGeneric(CLASS_NAME, "MessageQueue", "Unknown");
return HasReturnvaluesIF::RETURN_FAILED;