Merge remote-tracking branch 'upstream/master' into mueller_HybridIterator
This commit is contained in:
commit
e10cf44c8d
@ -18,8 +18,8 @@ public:
|
|||||||
*/
|
*/
|
||||||
static QueueFactory* instance();
|
static QueueFactory* instance();
|
||||||
|
|
||||||
MessageQueueIF* createMessageQueue(uint32_t message_depth = 3,
|
MessageQueueIF* createMessageQueue(uint32_t messageDepth = 3,
|
||||||
uint32_t max_message_size = MessageQueueMessage::MAX_MESSAGE_SIZE);
|
size_t maxMessageSize = MessageQueueMessage::MAX_MESSAGE_SIZE);
|
||||||
|
|
||||||
void deleteMessageQueue(MessageQueueIF* queue);
|
void deleteMessageQueue(MessageQueueIF* queue);
|
||||||
private:
|
private:
|
||||||
|
@ -25,8 +25,8 @@ QueueFactory::~QueueFactory() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
MessageQueueIF* QueueFactory::createMessageQueue(uint32_t message_depth,
|
MessageQueueIF* QueueFactory::createMessageQueue(uint32_t message_depth,
|
||||||
uint32_t max_message_size) {
|
size_t maxMessageSize) {
|
||||||
return new MessageQueue(message_depth, max_message_size);
|
return new MessageQueue(message_depth, maxMessageSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueueFactory::deleteMessageQueue(MessageQueueIF* queue) {
|
void QueueFactory::deleteMessageQueue(MessageQueueIF* queue) {
|
||||||
|
@ -1,10 +1,7 @@
|
|||||||
#include <framework/serviceinterface/ServiceInterfaceStream.h>
|
#include <framework/serviceinterface/ServiceInterfaceStream.h>
|
||||||
#include <unistd.h>
|
|
||||||
#include <limits.h>
|
|
||||||
#include <signal.h>
|
|
||||||
#include <errno.h>
|
|
||||||
#include <framework/osal/linux/FixedTimeslotTask.h>
|
#include <framework/osal/linux/FixedTimeslotTask.h>
|
||||||
|
|
||||||
|
#include <limits.h>
|
||||||
|
|
||||||
uint32_t FixedTimeslotTask::deadlineMissedCount = 0;
|
uint32_t FixedTimeslotTask::deadlineMissedCount = 0;
|
||||||
const size_t PeriodicTaskIF::MINIMUM_STACK_SIZE = PTHREAD_STACK_MIN;
|
const size_t PeriodicTaskIF::MINIMUM_STACK_SIZE = PTHREAD_STACK_MIN;
|
||||||
@ -23,7 +20,7 @@ void* FixedTimeslotTask::taskEntryPoint(void* arg) {
|
|||||||
FixedTimeslotTask *originalTask(reinterpret_cast<FixedTimeslotTask*>(arg));
|
FixedTimeslotTask *originalTask(reinterpret_cast<FixedTimeslotTask*>(arg));
|
||||||
//The task's functionality is called.
|
//The task's functionality is called.
|
||||||
originalTask->taskFunctionality();
|
originalTask->taskFunctionality();
|
||||||
return NULL;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
ReturnValue_t FixedTimeslotTask::startTask() {
|
ReturnValue_t FixedTimeslotTask::startTask() {
|
||||||
|
@ -8,7 +8,20 @@
|
|||||||
|
|
||||||
class FixedTimeslotTask: public FixedTimeslotTaskIF, public PosixThread {
|
class FixedTimeslotTask: public FixedTimeslotTaskIF, public PosixThread {
|
||||||
public:
|
public:
|
||||||
FixedTimeslotTask(const char* name_, int priority_, size_t stackSize_, uint32_t periodMs_);
|
/**
|
||||||
|
* Create a generic periodic task.
|
||||||
|
* @param name_
|
||||||
|
* Name, maximum allowed size of linux is 16 chars, everything else will
|
||||||
|
* be truncated.
|
||||||
|
* @param priority_
|
||||||
|
* Real-time priority, ranges from 1 to 99 for Linux.
|
||||||
|
* See: https://man7.org/linux/man-pages/man7/sched.7.html
|
||||||
|
* @param stackSize_
|
||||||
|
* @param period_
|
||||||
|
* @param deadlineMissedFunc_
|
||||||
|
*/
|
||||||
|
FixedTimeslotTask(const char* name_, int priority_, size_t stackSize_,
|
||||||
|
uint32_t periodMs_);
|
||||||
virtual ~FixedTimeslotTask();
|
virtual ~FixedTimeslotTask();
|
||||||
|
|
||||||
virtual ReturnValue_t startTask();
|
virtual ReturnValue_t startTask();
|
||||||
@ -17,7 +30,9 @@ public:
|
|||||||
|
|
||||||
virtual uint32_t getPeriodMs() const;
|
virtual uint32_t getPeriodMs() const;
|
||||||
|
|
||||||
virtual ReturnValue_t addSlot(object_id_t componentId, uint32_t slotTimeMs, int8_t executionStep);
|
virtual ReturnValue_t addSlot(object_id_t componentId, uint32_t slotTimeMs,
|
||||||
|
int8_t executionStep);
|
||||||
|
|
||||||
virtual ReturnValue_t checkSequence() const;
|
virtual ReturnValue_t checkSequence() const;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -34,11 +49,10 @@ public:
|
|||||||
protected:
|
protected:
|
||||||
/**
|
/**
|
||||||
* @brief This function holds the main functionality of the thread.
|
* @brief This function holds the main functionality of the thread.
|
||||||
*
|
* @details
|
||||||
*
|
* Holding the main functionality of the task, this method is most important.
|
||||||
* @details Holding the main functionality of the task, this method is most important.
|
* It links the functionalities provided by FixedSlotSequence with the
|
||||||
* It links the functionalities provided by FixedSlotSequence with the OS's System Calls
|
* OS's System Calls to keep the timing of the periods.
|
||||||
* to keep the timing of the periods.
|
|
||||||
*/
|
*/
|
||||||
virtual void taskFunctionality();
|
virtual void taskFunctionality();
|
||||||
|
|
||||||
@ -46,8 +60,13 @@ private:
|
|||||||
/**
|
/**
|
||||||
* @brief This is the entry point in a new thread.
|
* @brief This is the entry point in a new thread.
|
||||||
*
|
*
|
||||||
* @details This method, that is the entry point in the new thread and calls taskFunctionality of the child class.
|
* @details
|
||||||
* Needs a valid pointer to the derived class.
|
* This method, that is the entry point in the new thread and calls
|
||||||
|
* taskFunctionality of the child class. Needs a valid pointer to the
|
||||||
|
* derived class.
|
||||||
|
*
|
||||||
|
* The void* returnvalue is not used yet but could be used to return
|
||||||
|
* arbitrary data.
|
||||||
*/
|
*/
|
||||||
static void* taskEntryPoint(void* arg);
|
static void* taskEntryPoint(void* arg);
|
||||||
FixedSlotSequence pst;
|
FixedSlotSequence pst;
|
||||||
|
@ -1,56 +1,37 @@
|
|||||||
#include <framework/serviceinterface/ServiceInterfaceStream.h>
|
#include <framework/serviceinterface/ServiceInterfaceStream.h>
|
||||||
#include <fcntl.h> /* For O_* constants */
|
|
||||||
#include <sys/stat.h> /* For mode constants */
|
|
||||||
#include <mqueue.h>
|
|
||||||
#include <cstring>
|
|
||||||
#include <errno.h>
|
|
||||||
#include <framework/osal/linux/MessageQueue.h>
|
#include <framework/osal/linux/MessageQueue.h>
|
||||||
|
|
||||||
|
#include <fstream>
|
||||||
|
|
||||||
MessageQueue::MessageQueue(size_t message_depth, size_t max_message_size) :
|
#include <fcntl.h> /* For O_* constants */
|
||||||
id(0), lastPartner(0), defaultDestination(NO_QUEUE) {
|
#include <sys/stat.h> /* For mode constants */
|
||||||
|
#include <cstring>
|
||||||
|
#include <errno.h>
|
||||||
|
|
||||||
|
|
||||||
|
MessageQueue::MessageQueue(uint32_t messageDepth, size_t maxMessageSize):
|
||||||
|
id(MessageQueueIF::NO_QUEUE),lastPartner(MessageQueueIF::NO_QUEUE),
|
||||||
|
defaultDestination(MessageQueueIF::NO_QUEUE) {
|
||||||
//debug << "MessageQueue::MessageQueue: Creating a queue" << std::endl;
|
//debug << "MessageQueue::MessageQueue: Creating a queue" << std::endl;
|
||||||
mq_attr attributes;
|
mq_attr attributes;
|
||||||
this->id = 0;
|
this->id = 0;
|
||||||
//Set attributes
|
//Set attributes
|
||||||
attributes.mq_curmsgs = 0;
|
attributes.mq_curmsgs = 0;
|
||||||
attributes.mq_maxmsg = message_depth;
|
attributes.mq_maxmsg = messageDepth;
|
||||||
attributes.mq_msgsize = max_message_size;
|
attributes.mq_msgsize = maxMessageSize;
|
||||||
attributes.mq_flags = 0; //Flags are ignored on Linux during mq_open
|
attributes.mq_flags = 0; //Flags are ignored on Linux during mq_open
|
||||||
//Set the name of the queue
|
//Set the name of the queue. The slash is mandatory!
|
||||||
sprintf(name, "/Q%u\n", queueCounter++);
|
sprintf(name, "/FSFW_MQ%u\n", queueCounter++);
|
||||||
|
|
||||||
//Create a nonblocking queue if the name is available (the queue is Read and
|
// Create a nonblocking queue if the name is available (the queue is read
|
||||||
// writable for the owner as well as the group)
|
// and writable for the owner as well as the group)
|
||||||
mqd_t tempId = mq_open(name, O_NONBLOCK | O_RDWR | O_CREAT | O_EXCL,
|
int oflag = O_NONBLOCK | O_RDWR | O_CREAT | O_EXCL;
|
||||||
S_IWUSR | S_IREAD | S_IWGRP | S_IRGRP | S_IROTH | S_IWOTH, &attributes);
|
mode_t mode = S_IWUSR | S_IREAD | S_IWGRP | S_IRGRP | S_IROTH | S_IWOTH;
|
||||||
|
mqd_t tempId = mq_open(name, oflag, mode, &attributes);
|
||||||
if (tempId == -1) {
|
if (tempId == -1) {
|
||||||
//An error occured during open
|
handleError(&attributes, messageDepth);
|
||||||
//We need to distinguish if it is caused by an already created queue
|
|
||||||
if (errno == EEXIST) {
|
|
||||||
//There's another queue with the same name
|
|
||||||
//We unlink the other queue
|
|
||||||
int status = mq_unlink(name);
|
|
||||||
if (status != 0) {
|
|
||||||
sif::error << "mq_unlink Failed with status: " << strerror(errno)
|
|
||||||
<< std::endl;
|
|
||||||
} else {
|
|
||||||
//Successful unlinking, try to open again
|
|
||||||
mqd_t tempId = mq_open(name,
|
|
||||||
O_NONBLOCK | O_RDWR | O_CREAT | O_EXCL,
|
|
||||||
S_IWUSR | S_IREAD | S_IWGRP | S_IRGRP, &attributes);
|
|
||||||
if (tempId != -1) {
|
|
||||||
//Successful mq_open
|
|
||||||
this->id = tempId;
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
else {
|
||||||
}
|
|
||||||
//Failed either the first time or the second time
|
|
||||||
sif::error << "MessageQueue::MessageQueue: Creating Queue " << std::hex
|
|
||||||
<< name << std::dec << " failed with status: "
|
|
||||||
<< strerror(errno) << std::endl;
|
|
||||||
} else {
|
|
||||||
//Successful mq_open call
|
//Successful mq_open call
|
||||||
this->id = tempId;
|
this->id = tempId;
|
||||||
}
|
}
|
||||||
@ -69,6 +50,73 @@ MessageQueue::~MessageQueue() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ReturnValue_t MessageQueue::handleError(mq_attr* attributes,
|
||||||
|
uint32_t messageDepth) {
|
||||||
|
switch(errno) {
|
||||||
|
case(EINVAL): {
|
||||||
|
sif::error << "MessageQueue::MessageQueue: Invalid name or attributes"
|
||||||
|
" for message size" << std::endl;
|
||||||
|
size_t defaultMqMaxMsg = 0;
|
||||||
|
// Not POSIX conformant, but should work for all UNIX systems.
|
||||||
|
// Just an additional helpful printout :-)
|
||||||
|
if(std::ifstream("/proc/sys/fs/mqueue/msg_max",std::ios::in) >>
|
||||||
|
defaultMqMaxMsg and defaultMqMaxMsg < messageDepth) {
|
||||||
|
// See: https://www.man7.org/linux/man-pages/man3/mq_open.3.html
|
||||||
|
// This happens if the msg_max value is not large enough
|
||||||
|
// It is ignored if the executable is run in privileged mode.
|
||||||
|
// Run the unlockRealtime script or grant the mode manually by using:
|
||||||
|
// sudo setcap 'CAP_SYS_RESOURCE=+ep' <pathToBinary>
|
||||||
|
|
||||||
|
// Persistent solution for session:
|
||||||
|
// echo <newMsgMax> | sudo tee /proc/sys/fs/mqueue/msg_max
|
||||||
|
|
||||||
|
// Permanent solution:
|
||||||
|
// sudo nano /etc/sysctl.conf
|
||||||
|
// Append at end: fs/mqueue/msg_max = <newMsgMaxLen>
|
||||||
|
// Apply changes with: sudo sysctl -p
|
||||||
|
sif::error << "MessageQueue::MessageQueue: Default MQ size "
|
||||||
|
<< defaultMqMaxMsg << " is too small for requested size "
|
||||||
|
<< messageDepth << std::endl;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case(EEXIST): {
|
||||||
|
// An error occured during open
|
||||||
|
// We need to distinguish if it is caused by an already created queue
|
||||||
|
//There's another queue with the same name
|
||||||
|
//We unlink the other queue
|
||||||
|
int status = mq_unlink(name);
|
||||||
|
if (status != 0) {
|
||||||
|
sif::error << "mq_unlink Failed with status: " << strerror(errno)
|
||||||
|
<< std::endl;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// Successful unlinking, try to open again
|
||||||
|
mqd_t tempId = mq_open(name,
|
||||||
|
O_NONBLOCK | O_RDWR | O_CREAT | O_EXCL,
|
||||||
|
S_IWUSR | S_IREAD | S_IWGRP | S_IRGRP, attributes);
|
||||||
|
if (tempId != -1) {
|
||||||
|
//Successful mq_open
|
||||||
|
this->id = tempId;
|
||||||
|
return HasReturnvaluesIF::RETURN_OK;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
// Failed either the first time or the second time
|
||||||
|
sif::error << "MessageQueue::MessageQueue: Creating Queue " << std::hex
|
||||||
|
<< name << std::dec << " failed with status: "
|
||||||
|
<< strerror(errno) << std::endl;
|
||||||
|
|
||||||
|
}
|
||||||
|
return HasReturnvaluesIF::RETURN_FAILED;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
ReturnValue_t MessageQueue::sendMessage(MessageQueueId_t sendTo,
|
ReturnValue_t MessageQueue::sendMessage(MessageQueueId_t sendTo,
|
||||||
MessageQueueMessage* message, bool ignoreFault) {
|
MessageQueueMessage* message, bool ignoreFault) {
|
||||||
return sendMessageFrom(sendTo, message, this->getId(), false);
|
return sendMessageFrom(sendTo, message, this->getId(), false);
|
||||||
@ -108,12 +156,13 @@ ReturnValue_t MessageQueue::receiveMessage(MessageQueueMessage* message) {
|
|||||||
//Success but no message received
|
//Success but no message received
|
||||||
return MessageQueueIF::EMPTY;
|
return MessageQueueIF::EMPTY;
|
||||||
} else {
|
} else {
|
||||||
//No message was received. Keep lastPartner anyway, I might send something later.
|
//No message was received. Keep lastPartner anyway, I might send
|
||||||
//But still, delete packet content.
|
//something later. But still, delete packet content.
|
||||||
memset(message->getData(), 0, message->MAX_DATA_SIZE);
|
memset(message->getData(), 0, message->MAX_DATA_SIZE);
|
||||||
switch(errno){
|
switch(errno){
|
||||||
case EAGAIN:
|
case EAGAIN:
|
||||||
//O_NONBLOCK or MQ_NONBLOCK was set and there are no messages currently on the specified queue.
|
//O_NONBLOCK or MQ_NONBLOCK was set and there are no messages
|
||||||
|
//currently on the specified queue.
|
||||||
return MessageQueueIF::EMPTY;
|
return MessageQueueIF::EMPTY;
|
||||||
case EBADF:
|
case EBADF:
|
||||||
//mqdes doesn't represent a valid queue open for reading.
|
//mqdes doesn't represent a valid queue open for reading.
|
||||||
@ -123,9 +172,12 @@ ReturnValue_t MessageQueue::receiveMessage(MessageQueueMessage* message) {
|
|||||||
case EINVAL:
|
case EINVAL:
|
||||||
/*
|
/*
|
||||||
* This value indicates one of the following:
|
* This value indicates one of the following:
|
||||||
* * The pointer to the buffer for storing the received message, msg_ptr, is NULL.
|
* - The pointer to the buffer for storing the received message,
|
||||||
* * The number of bytes requested, msg_len is less than zero.
|
* msg_ptr, is NULL.
|
||||||
* * msg_len is anything other than the mq_msgsize of the specified queue, and the QNX extended option MQ_READBUF_DYNAMIC hasn't been set in the queue's mq_flags.
|
* - The number of bytes requested, msg_len is less than zero.
|
||||||
|
* - msg_len is anything other than the mq_msgsize of the specified
|
||||||
|
* queue, and the QNX extended option MQ_READBUF_DYNAMIC hasn't
|
||||||
|
* been set in the queue's mq_flags.
|
||||||
*/
|
*/
|
||||||
sif::error << "MessageQueue::receive: configuration error "
|
sif::error << "MessageQueue::receive: configuration error "
|
||||||
<< strerror(errno) << std::endl;
|
<< strerror(errno) << std::endl;
|
||||||
@ -133,8 +185,12 @@ ReturnValue_t MessageQueue::receiveMessage(MessageQueueMessage* message) {
|
|||||||
case EMSGSIZE:
|
case EMSGSIZE:
|
||||||
/*
|
/*
|
||||||
* This value indicates one of the following:
|
* This value indicates one of the following:
|
||||||
* * the QNX extended option MQ_READBUF_DYNAMIC hasn't been set, and the given msg_len is shorter than the mq_msgsize for the given queue.
|
* - the QNX extended option MQ_READBUF_DYNAMIC hasn't been set,
|
||||||
* * the extended option MQ_READBUF_DYNAMIC has been set, but the given msg_len is too short for the message that would have been received.
|
* and the given msg_len is shorter than the mq_msgsize for
|
||||||
|
* the given queue.
|
||||||
|
* - the extended option MQ_READBUF_DYNAMIC has been set, but the
|
||||||
|
* given msg_len is too short for the message that would have
|
||||||
|
* been received.
|
||||||
*/
|
*/
|
||||||
sif::error << "MessageQueue::receive: configuration error "
|
sif::error << "MessageQueue::receive: configuration error "
|
||||||
<< strerror(errno) << std::endl;
|
<< strerror(errno) << std::endl;
|
||||||
@ -182,9 +238,10 @@ ReturnValue_t MessageQueue::flush(uint32_t* count) {
|
|||||||
case EINVAL:
|
case EINVAL:
|
||||||
/*
|
/*
|
||||||
* This value indicates one of the following:
|
* This value indicates one of the following:
|
||||||
* * mq_attr is NULL.
|
* - mq_attr is NULL.
|
||||||
* * MQ_MULT_NOTIFY had been set for this queue, and the given mq_flags includes a 0 in the MQ_MULT_NOTIFY bit. Once MQ_MULT_NOTIFY has been turned on, it may never be turned off.
|
* - MQ_MULT_NOTIFY had been set for this queue, and the given
|
||||||
*
|
* mq_flags includes a 0 in the MQ_MULT_NOTIFY bit. Once
|
||||||
|
* MQ_MULT_NOTIFY has been turned on, it may never be turned off.
|
||||||
*/
|
*/
|
||||||
default:
|
default:
|
||||||
return HasReturnvaluesIF::RETURN_FAILED;
|
return HasReturnvaluesIF::RETURN_FAILED;
|
||||||
@ -233,7 +290,8 @@ ReturnValue_t MessageQueue::sendMessageFromMessageQueue(MessageQueueId_t sendTo,
|
|||||||
//TODO: Check if we're in ISR.
|
//TODO: Check if we're in ISR.
|
||||||
if (result != 0) {
|
if (result != 0) {
|
||||||
if(!ignoreFault){
|
if(!ignoreFault){
|
||||||
InternalErrorReporterIF* internalErrorReporter = objectManager->get<InternalErrorReporterIF>(
|
InternalErrorReporterIF* internalErrorReporter =
|
||||||
|
objectManager->get<InternalErrorReporterIF>(
|
||||||
objects::INTERNAL_ERROR_REPORTER);
|
objects::INTERNAL_ERROR_REPORTER);
|
||||||
if (internalErrorReporter != NULL) {
|
if (internalErrorReporter != NULL) {
|
||||||
internalErrorReporter->queueMessageNotSent();
|
internalErrorReporter->queueMessageNotSent();
|
||||||
@ -241,10 +299,13 @@ ReturnValue_t MessageQueue::sendMessageFromMessageQueue(MessageQueueId_t sendTo,
|
|||||||
}
|
}
|
||||||
switch(errno){
|
switch(errno){
|
||||||
case EAGAIN:
|
case EAGAIN:
|
||||||
//The O_NONBLOCK flag was set when opening the queue, or the MQ_NONBLOCK flag was set in its attributes, and the specified queue is full.
|
//The O_NONBLOCK flag was set when opening the queue, or the
|
||||||
|
//MQ_NONBLOCK flag was set in its attributes, and the
|
||||||
|
//specified queue is full.
|
||||||
return MessageQueueIF::FULL;
|
return MessageQueueIF::FULL;
|
||||||
case EBADF:
|
case EBADF:
|
||||||
//mq_des doesn't represent a valid message queue descriptor, or mq_des wasn't opened for writing.
|
//mq_des doesn't represent a valid message queue descriptor,
|
||||||
|
//or mq_des wasn't opened for writing.
|
||||||
sif::error << "MessageQueue::sendMessage: Configuration error "
|
sif::error << "MessageQueue::sendMessage: Configuration error "
|
||||||
<< strerror(errno) << " in mq_send mqSendTo: " << sendTo
|
<< strerror(errno) << " in mq_send mqSendTo: " << sendTo
|
||||||
<< " sent from " << sentFrom << std::endl;
|
<< " sent from " << sentFrom << std::endl;
|
||||||
@ -254,18 +315,22 @@ ReturnValue_t MessageQueue::sendMessageFromMessageQueue(MessageQueueId_t sendTo,
|
|||||||
case EINVAL:
|
case EINVAL:
|
||||||
/*
|
/*
|
||||||
* This value indicates one of the following:
|
* This value indicates one of the following:
|
||||||
* * msg_ptr is NULL.
|
* - msg_ptr is NULL.
|
||||||
* * msg_len is negative.
|
* - msg_len is negative.
|
||||||
* * msg_prio is greater than MQ_PRIO_MAX.
|
* - msg_prio is greater than MQ_PRIO_MAX.
|
||||||
* * msg_prio is less than 0.
|
* - msg_prio is less than 0.
|
||||||
* * MQ_PRIO_RESTRICT is set in the mq_attr of mq_des,
|
* - MQ_PRIO_RESTRICT is set in the mq_attr of mq_des, and
|
||||||
* and msg_prio is greater than the priority of the calling process.
|
* msg_prio is greater than the priority of the calling process.
|
||||||
* */
|
*/
|
||||||
sif::error << "MessageQueue::sendMessage: Configuration error "
|
sif::error << "MessageQueue::sendMessage: Configuration error "
|
||||||
<< strerror(errno) << " in mq_send" << std::endl;
|
<< strerror(errno) << " in mq_send" << std::endl;
|
||||||
/*NO BREAK*/
|
/*NO BREAK*/
|
||||||
case EMSGSIZE:
|
case EMSGSIZE:
|
||||||
//The msg_len is greater than the msgsize associated with the specified queue.
|
// The msg_len is greater than the msgsize associated with
|
||||||
|
//the specified queue.
|
||||||
|
sif::error << "MessageQueue::sendMessage: Size error [" <<
|
||||||
|
strerror(errno) << "] in mq_send" << std::endl;
|
||||||
|
/*NO BREAK*/
|
||||||
default:
|
default:
|
||||||
return HasReturnvaluesIF::RETURN_FAILED;
|
return HasReturnvaluesIF::RETURN_FAILED;
|
||||||
}
|
}
|
||||||
|
@ -4,21 +4,26 @@
|
|||||||
#include <framework/internalError/InternalErrorReporterIF.h>
|
#include <framework/internalError/InternalErrorReporterIF.h>
|
||||||
#include <framework/ipc/MessageQueueIF.h>
|
#include <framework/ipc/MessageQueueIF.h>
|
||||||
#include <framework/ipc/MessageQueueMessage.h>
|
#include <framework/ipc/MessageQueueMessage.h>
|
||||||
|
|
||||||
|
#include <mqueue.h>
|
||||||
/**
|
/**
|
||||||
* @brief This class manages sending and receiving of message queue messages.
|
* @brief This class manages sending and receiving of message queue messages.
|
||||||
*
|
*
|
||||||
* @details Message queues are used to pass asynchronous messages between processes.
|
* @details
|
||||||
|
* Message queues are used to pass asynchronous messages between processes.
|
||||||
* They work like post boxes, where all incoming messages are stored in FIFO
|
* They work like post boxes, where all incoming messages are stored in FIFO
|
||||||
* order. This class creates a new receiving queue and provides methods to fetch
|
* order. This class creates a new receiving queue and provides methods to fetch
|
||||||
* received messages. Being a child of MessageQueueSender, this class also provides
|
* received messages. Being a child of MessageQueueSender, this class also
|
||||||
* methods to send a message to a user-defined or a default destination. In addition
|
* provides methods to send a message to a user-defined or a default destination.
|
||||||
* it also provides a reply method to answer to the queue it received its last message
|
* In addition it also provides a reply method to answer to the queue it
|
||||||
* from.
|
* received its last message from.
|
||||||
* The MessageQueue should be used as "post box" for a single owning object. So all
|
*
|
||||||
* message queue communication is "n-to-one".
|
* The MessageQueue should be used as "post box" for a single owning object.
|
||||||
* For creating the queue, as well as sending and receiving messages, the class makes
|
* So all message queue communication is "n-to-one".
|
||||||
* use of the operating system calls provided.
|
*
|
||||||
* \ingroup message_queue
|
* The creation of message queues, as well as sending and receiving messages,
|
||||||
|
* makes use of the operating system calls provided.
|
||||||
|
* @ingroup message_queue
|
||||||
*/
|
*/
|
||||||
class MessageQueue : public MessageQueueIF {
|
class MessageQueue : public MessageQueueIF {
|
||||||
friend class MessageQueueSenderIF;
|
friend class MessageQueueSenderIF;
|
||||||
@ -35,7 +40,8 @@ public:
|
|||||||
* @param max_message_size With this parameter, the maximum message size can be adjusted.
|
* @param max_message_size With this parameter, the maximum message size can be adjusted.
|
||||||
* This should be left default.
|
* This should be left default.
|
||||||
*/
|
*/
|
||||||
MessageQueue( size_t message_depth = 3, size_t max_message_size = MessageQueueMessage::MAX_MESSAGE_SIZE );
|
MessageQueue(uint32_t messageDepth = 3,
|
||||||
|
size_t maxMessageSize = MessageQueueMessage::MAX_MESSAGE_SIZE );
|
||||||
/**
|
/**
|
||||||
* @brief The destructor deletes the formerly created message queue.
|
* @brief The destructor deletes the formerly created message queue.
|
||||||
* @details This is accomplished by using the delete call provided by the operating system.
|
* @details This is accomplished by using the delete call provided by the operating system.
|
||||||
@ -168,6 +174,8 @@ private:
|
|||||||
char name[5];
|
char name[5];
|
||||||
|
|
||||||
static uint16_t queueCounter;
|
static uint16_t queueCounter;
|
||||||
|
|
||||||
|
ReturnValue_t handleError(mq_attr* attributes, uint32_t messageDepth);
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif /* MESSAGEQUEUE_H_ */
|
#endif /* MESSAGEQUEUE_H_ */
|
||||||
|
@ -3,9 +3,10 @@
|
|||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <framework/osal/linux/PeriodicPosixTask.h>
|
#include <framework/osal/linux/PeriodicPosixTask.h>
|
||||||
|
|
||||||
PeriodicPosixTask::PeriodicPosixTask(const char* name_, int priority_, size_t stackSize_, uint32_t period_, void(deadlineMissedFunc_)()):PosixThread(name_,priority_,stackSize_),objectList(),started(false),periodMs(period_),deadlineMissedFunc(
|
PeriodicPosixTask::PeriodicPosixTask(const char* name_, int priority_,
|
||||||
deadlineMissedFunc_) {
|
size_t stackSize_, uint32_t period_, void(deadlineMissedFunc_)()):
|
||||||
|
PosixThread(name_,priority_,stackSize_),objectList(),started(false),
|
||||||
|
periodMs(period_),deadlineMissedFunc(deadlineMissedFunc_) {
|
||||||
}
|
}
|
||||||
|
|
||||||
PeriodicPosixTask::~PeriodicPosixTask() {
|
PeriodicPosixTask::~PeriodicPosixTask() {
|
||||||
@ -37,7 +38,8 @@ ReturnValue_t PeriodicPosixTask::sleepFor(uint32_t ms) {
|
|||||||
|
|
||||||
ReturnValue_t PeriodicPosixTask::startTask(void){
|
ReturnValue_t PeriodicPosixTask::startTask(void){
|
||||||
started = true;
|
started = true;
|
||||||
createTask(&taskEntryPoint,this);
|
//sif::info << stackSize << std::endl;
|
||||||
|
PosixThread::createTask(&taskEntryPoint,this);
|
||||||
return HasReturnvaluesIF::RETURN_OK;
|
return HasReturnvaluesIF::RETURN_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -56,9 +58,11 @@ void PeriodicPosixTask::taskFunctionality(void){
|
|||||||
char name[20] = {0};
|
char name[20] = {0};
|
||||||
int status = pthread_getname_np(pthread_self(),name,sizeof(name));
|
int status = pthread_getname_np(pthread_self(),name,sizeof(name));
|
||||||
if(status==0){
|
if(status==0){
|
||||||
sif::error << "ObjectTask: " << name << " Deadline missed." << std::endl;
|
sif::error << "PeriodicPosixTask " << name << ": Deadline "
|
||||||
|
"missed." << std::endl;
|
||||||
}else{
|
}else{
|
||||||
sif::error << "ObjectTask: X Deadline missed. " << status << std::endl;
|
sif::error << "PeriodicPosixTask X: Deadline missed. " <<
|
||||||
|
status << std::endl;
|
||||||
}
|
}
|
||||||
if (this->deadlineMissedFunc != NULL) {
|
if (this->deadlineMissedFunc != NULL) {
|
||||||
this->deadlineMissedFunc();
|
this->deadlineMissedFunc();
|
||||||
|
@ -9,9 +9,22 @@
|
|||||||
|
|
||||||
class PeriodicPosixTask: public PosixThread, public PeriodicTaskIF {
|
class PeriodicPosixTask: public PosixThread, public PeriodicTaskIF {
|
||||||
public:
|
public:
|
||||||
|
/**
|
||||||
|
* Create a generic periodic task.
|
||||||
|
* @param name_
|
||||||
|
* Name, maximum allowed size of linux is 16 chars, everything else will
|
||||||
|
* be truncated.
|
||||||
|
* @param priority_
|
||||||
|
* Real-time priority, ranges from 1 to 99 for Linux.
|
||||||
|
* See: https://man7.org/linux/man-pages/man7/sched.7.html
|
||||||
|
* @param stackSize_
|
||||||
|
* @param period_
|
||||||
|
* @param deadlineMissedFunc_
|
||||||
|
*/
|
||||||
PeriodicPosixTask(const char* name_, int priority_, size_t stackSize_,
|
PeriodicPosixTask(const char* name_, int priority_, size_t stackSize_,
|
||||||
uint32_t period_, void(*deadlineMissedFunc_)());
|
uint32_t period_, void(*deadlineMissedFunc_)());
|
||||||
virtual ~PeriodicPosixTask();
|
virtual ~PeriodicPosixTask();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief The method to start the task.
|
* @brief The method to start the task.
|
||||||
* @details The method starts the task with the respective system call.
|
* @details The method starts the task with the respective system call.
|
||||||
|
@ -1,8 +1,13 @@
|
|||||||
#include <framework/serviceinterface/ServiceInterfaceStream.h>
|
#include <framework/serviceinterface/ServiceInterfaceStream.h>
|
||||||
|
#include <framework/osal/linux/PosixThread.h>
|
||||||
#include <cstring>
|
#include <cstring>
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <framework/osal/linux/PosixThread.h>
|
|
||||||
|
|
||||||
|
PosixThread::PosixThread(const char* name_, int priority_, size_t stackSize_):
|
||||||
|
thread(0),priority(priority_),stackSize(stackSize_) {
|
||||||
|
name[0] = '\0';
|
||||||
|
std::strncat(name, name_, PTHREAD_MAX_NAMELEN - 1);
|
||||||
|
}
|
||||||
|
|
||||||
PosixThread::~PosixThread() {
|
PosixThread::~PosixThread() {
|
||||||
//No deletion and no free of Stack Pointer
|
//No deletion and no free of Stack Pointer
|
||||||
@ -55,10 +60,6 @@ void PosixThread::resume(){
|
|||||||
pthread_kill(thread,SIGUSR1);
|
pthread_kill(thread,SIGUSR1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
bool PosixThread::delayUntil(uint64_t* const prevoiusWakeTime_ms,
|
bool PosixThread::delayUntil(uint64_t* const prevoiusWakeTime_ms,
|
||||||
const uint64_t delayTime_ms) {
|
const uint64_t delayTime_ms) {
|
||||||
uint64_t nextTimeToWake_ms;
|
uint64_t nextTimeToWake_ms;
|
||||||
@ -113,12 +114,6 @@ uint64_t PosixThread::getCurrentMonotonicTimeMs(){
|
|||||||
return currentTime_ms;
|
return currentTime_ms;
|
||||||
}
|
}
|
||||||
|
|
||||||
PosixThread::PosixThread(const char* name_, int priority_, size_t stackSize_):
|
|
||||||
thread(0),priority(priority_),stackSize(stackSize_) {
|
|
||||||
strcpy(name,name_);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void PosixThread::createTask(void* (*fnc_)(void*), void* arg_) {
|
void PosixThread::createTask(void* (*fnc_)(void*), void* arg_) {
|
||||||
//sif::debug << "PosixThread::createTask" << std::endl;
|
//sif::debug << "PosixThread::createTask" << std::endl;
|
||||||
@ -135,14 +130,24 @@ void PosixThread::createTask(void* (*fnc_)(void*), void* arg_) {
|
|||||||
sif::error << "Posix Thread attribute init failed with: " <<
|
sif::error << "Posix Thread attribute init failed with: " <<
|
||||||
strerror(status) << std::endl;
|
strerror(status) << std::endl;
|
||||||
}
|
}
|
||||||
void* sp;
|
void* stackPointer;
|
||||||
status = posix_memalign(&sp, sysconf(_SC_PAGESIZE), stackSize);
|
status = posix_memalign(&stackPointer, sysconf(_SC_PAGESIZE), stackSize);
|
||||||
if(status != 0){
|
if(status != 0){
|
||||||
sif::error << "Posix Thread stack init failed with: " <<
|
sif::error << "PosixThread::createTask: Stack init failed with: " <<
|
||||||
strerror(status) << std::endl;
|
strerror(status) << std::endl;
|
||||||
|
if(errno == ENOMEM) {
|
||||||
|
uint64_t stackMb = stackSize/10e6;
|
||||||
|
sif::error << "PosixThread::createTask: Insufficient memory for"
|
||||||
|
" the requested " << stackMb << " MB" << std::endl;
|
||||||
|
}
|
||||||
|
else if(errno == EINVAL) {
|
||||||
|
sif::error << "PosixThread::createTask: Wrong alignment argument!"
|
||||||
|
<< std::endl;
|
||||||
|
}
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
status = pthread_attr_setstack(&attributes, sp, stackSize);
|
status = pthread_attr_setstack(&attributes, stackPointer, stackSize);
|
||||||
if(status != 0){
|
if(status != 0){
|
||||||
sif::error << "Posix Thread attribute setStack failed with: " <<
|
sif::error << "Posix Thread attribute setStack failed with: " <<
|
||||||
strerror(status) << std::endl;
|
strerror(status) << std::endl;
|
||||||
@ -188,8 +193,18 @@ void PosixThread::createTask(void* (*fnc_)(void*), void* arg_) {
|
|||||||
|
|
||||||
status = pthread_setname_np(thread,name);
|
status = pthread_setname_np(thread,name);
|
||||||
if(status != 0){
|
if(status != 0){
|
||||||
sif::error << "Posix Thread setname failed with: " <<
|
sif::error << "PosixThread::createTask: setname failed with: " <<
|
||||||
strerror(status) << std::endl;
|
strerror(status) << std::endl;
|
||||||
|
if(status == ERANGE) {
|
||||||
|
sif::error << "PosixThread::createTask: Task name length longer"
|
||||||
|
" than 16 chars. Truncating.." << std::endl;
|
||||||
|
name[15] = '\0';
|
||||||
|
status = pthread_setname_np(thread,name);
|
||||||
|
if(status != 0){
|
||||||
|
sif::error << "PosixThread::createTask: Setting name"
|
||||||
|
" did not work.." << std::endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
status = pthread_attr_destroy(&attributes);
|
status = pthread_attr_destroy(&attributes);
|
||||||
|
@ -1,15 +1,15 @@
|
|||||||
#ifndef FRAMEWORK_OSAL_LINUX_POSIXTHREAD_H_
|
#ifndef FRAMEWORK_OSAL_LINUX_POSIXTHREAD_H_
|
||||||
#define FRAMEWORK_OSAL_LINUX_POSIXTHREAD_H_
|
#define FRAMEWORK_OSAL_LINUX_POSIXTHREAD_H_
|
||||||
|
|
||||||
#include <pthread.h>
|
|
||||||
#include <sched.h>
|
|
||||||
#include <signal.h>
|
|
||||||
#include <stdlib.h>
|
|
||||||
#include <unistd.h>
|
|
||||||
#include <framework/returnvalues/HasReturnvaluesIF.h>
|
#include <framework/returnvalues/HasReturnvaluesIF.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
#include <signal.h>
|
||||||
|
#include <cstdlib>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
class PosixThread {
|
class PosixThread {
|
||||||
public:
|
public:
|
||||||
|
static constexpr uint8_t PTHREAD_MAX_NAMELEN = 16;
|
||||||
PosixThread(const char* name_, int priority_, size_t stackSize_);
|
PosixThread(const char* name_, int priority_, size_t stackSize_);
|
||||||
virtual ~PosixThread();
|
virtual ~PosixThread();
|
||||||
/**
|
/**
|
||||||
@ -54,21 +54,24 @@ protected:
|
|||||||
pthread_t thread;
|
pthread_t thread;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Function that has to be called by derived class because the derived class pointer has to be valid as argument
|
* @brief Function that has to be called by derived class because the
|
||||||
* @details This function creates a pthread with the given parameters. As the function requires a pointer to the derived object
|
* derived class pointer has to be valid as argument.
|
||||||
* it has to be called after the this pointer of the derived object is valid. Sets the taskEntryPoint as
|
* @details
|
||||||
* function to be called by new a thread.
|
* This function creates a pthread with the given parameters. As the
|
||||||
* @param name_ Name of the task
|
* function requires a pointer to the derived object it has to be called
|
||||||
* @param priority_ Priority of the task according to POSIX
|
* after the this pointer of the derived object is valid.
|
||||||
* @param stackSize_ Size of the stack attached to that task
|
* Sets the taskEntryPoint as function to be called by new a thread.
|
||||||
* @param arg_ argument of the taskEntryPoint function, needs to be this pointer of derived class
|
* @param fnc_ Function which will be executed by the thread.
|
||||||
|
* @param arg_
|
||||||
|
* argument of the taskEntryPoint function, needs to be this pointer
|
||||||
|
* of derived class
|
||||||
*/
|
*/
|
||||||
void createTask(void* (*fnc_)(void*),void* arg_);
|
void createTask(void* (*fnc_)(void*),void* arg_);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
char name[10];
|
char name[PTHREAD_MAX_NAMELEN];
|
||||||
int priority;
|
int priority;
|
||||||
size_t stackSize;
|
size_t stackSize = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif /* FRAMEWORK_OSAL_LINUX_POSIXTHREAD_H_ */
|
#endif /* FRAMEWORK_OSAL_LINUX_POSIXTHREAD_H_ */
|
||||||
|
@ -5,16 +5,18 @@
|
|||||||
#include <framework/serviceinterface/ServiceInterfaceStream.h>
|
#include <framework/serviceinterface/ServiceInterfaceStream.h>
|
||||||
#include <cstring>
|
#include <cstring>
|
||||||
|
|
||||||
QueueFactory* QueueFactory::factoryInstance = NULL;
|
QueueFactory* QueueFactory::factoryInstance = nullptr;
|
||||||
|
|
||||||
|
|
||||||
ReturnValue_t MessageQueueSenderIF::sendMessage(MessageQueueId_t sendTo,
|
ReturnValue_t MessageQueueSenderIF::sendMessage(MessageQueueId_t sendTo,
|
||||||
MessageQueueMessage* message, MessageQueueId_t sentFrom,bool ignoreFault) {
|
MessageQueueMessage* message, MessageQueueId_t sentFrom,
|
||||||
return MessageQueue::sendMessageFromMessageQueue(sendTo,message,sentFrom,ignoreFault);
|
bool ignoreFault) {
|
||||||
|
return MessageQueue::sendMessageFromMessageQueue(sendTo,message,
|
||||||
|
sentFrom,ignoreFault);
|
||||||
}
|
}
|
||||||
|
|
||||||
QueueFactory* QueueFactory::instance() {
|
QueueFactory* QueueFactory::instance() {
|
||||||
if (factoryInstance == NULL) {
|
if (factoryInstance == nullptr) {
|
||||||
factoryInstance = new QueueFactory;
|
factoryInstance = new QueueFactory;
|
||||||
}
|
}
|
||||||
return factoryInstance;
|
return factoryInstance;
|
||||||
@ -26,9 +28,9 @@ QueueFactory::QueueFactory() {
|
|||||||
QueueFactory::~QueueFactory() {
|
QueueFactory::~QueueFactory() {
|
||||||
}
|
}
|
||||||
|
|
||||||
MessageQueueIF* QueueFactory::createMessageQueue(uint32_t message_depth,
|
MessageQueueIF* QueueFactory::createMessageQueue(uint32_t messageDepth,
|
||||||
uint32_t max_message_size) {
|
size_t maxMessageSize) {
|
||||||
return new MessageQueue(message_depth, max_message_size);
|
return new MessageQueue(messageDepth, maxMessageSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueueFactory::deleteMessageQueue(MessageQueueIF* queue) {
|
void QueueFactory::deleteMessageQueue(MessageQueueIF* queue) {
|
||||||
|
@ -13,12 +13,20 @@ TaskFactory* TaskFactory::instance() {
|
|||||||
return TaskFactory::factoryInstance;
|
return TaskFactory::factoryInstance;
|
||||||
}
|
}
|
||||||
|
|
||||||
PeriodicTaskIF* TaskFactory::createPeriodicTask(TaskName name_,TaskPriority taskPriority_,TaskStackSize stackSize_,TaskPeriod periodInSeconds_,TaskDeadlineMissedFunction deadLineMissedFunction_) {
|
PeriodicTaskIF* TaskFactory::createPeriodicTask(TaskName name_,
|
||||||
return static_cast<PeriodicTaskIF*>(new PeriodicPosixTask(name_, taskPriority_,stackSize_,periodInSeconds_ * 1000,deadLineMissedFunction_));
|
TaskPriority taskPriority_,TaskStackSize stackSize_,
|
||||||
|
TaskPeriod periodInSeconds_,
|
||||||
|
TaskDeadlineMissedFunction deadLineMissedFunction_) {
|
||||||
|
return new PeriodicPosixTask(name_, taskPriority_,stackSize_,
|
||||||
|
periodInSeconds_ * 1000, deadLineMissedFunction_);
|
||||||
}
|
}
|
||||||
|
|
||||||
FixedTimeslotTaskIF* TaskFactory::createFixedTimeslotTask(TaskName name_,TaskPriority taskPriority_,TaskStackSize stackSize_,TaskPeriod periodInSeconds_,TaskDeadlineMissedFunction deadLineMissedFunction_) {
|
FixedTimeslotTaskIF* TaskFactory::createFixedTimeslotTask(TaskName name_,
|
||||||
return static_cast<FixedTimeslotTaskIF*>(new FixedTimeslotTask(name_, taskPriority_,stackSize_,periodInSeconds_*1000));
|
TaskPriority taskPriority_,TaskStackSize stackSize_,
|
||||||
|
TaskPeriod periodInSeconds_,
|
||||||
|
TaskDeadlineMissedFunction deadLineMissedFunction_) {
|
||||||
|
return new FixedTimeslotTask(name_, taskPriority_,stackSize_,
|
||||||
|
periodInSeconds_*1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
ReturnValue_t TaskFactory::deleteTask(PeriodicTaskIF* task) {
|
ReturnValue_t TaskFactory::deleteTask(PeriodicTaskIF* task) {
|
||||||
|
@ -49,9 +49,9 @@ QueueFactory::QueueFactory() {
|
|||||||
QueueFactory::~QueueFactory() {
|
QueueFactory::~QueueFactory() {
|
||||||
}
|
}
|
||||||
|
|
||||||
MessageQueueIF* QueueFactory::createMessageQueue(uint32_t message_depth,
|
MessageQueueIF* QueueFactory::createMessageQueue(uint32_t messageDepth,
|
||||||
uint32_t max_message_size) {
|
size_t maxMessageSize) {
|
||||||
return new MessageQueue(message_depth, max_message_size);
|
return new MessageQueue(messageDepth, maxMessageSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueueFactory::deleteMessageQueue(MessageQueueIF* queue) {
|
void QueueFactory::deleteMessageQueue(MessageQueueIF* queue) {
|
||||||
|
Loading…
Reference in New Issue
Block a user