#include "MessageQueue.h"
#include "unixUtility.h"

#include "../../serviceinterface/ServiceInterface.h"
#include "../../objectmanager/ObjectManager.h"

#include <fstream>
#include <vector>

#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <cstring>
#include <errno.h>

/**
 * @brief   Creates a new, uniquely named, non-blocking POSIX message queue.
 * @details
 * The queue name is derived from the static queueCounter. If mq_open fails,
 * handleOpenError() is called, which prints diagnostics and, for an already
 * existing name, unlinks the stale queue and retries once.
 * @param messageDepth      Maximum number of messages the queue can hold
 *                          (passed as mq_maxmsg).
 * @param maxMessageSize    Maximum size of a single message in bytes
 *                          (passed as mq_msgsize).
 */
MessageQueue::MessageQueue(uint32_t messageDepth, size_t maxMessageSize):
        id(MessageQueueIF::NO_QUEUE), lastPartner(MessageQueueIF::NO_QUEUE),
        defaultDestination(MessageQueueIF::NO_QUEUE),
        maxMessageSize(maxMessageSize) {
    mq_attr attributes;
    this->id = 0;
    //Set attributes
    attributes.mq_curmsgs = 0;
    attributes.mq_maxmsg = messageDepth;
    attributes.mq_msgsize = maxMessageSize;
    attributes.mq_flags = 0; //Flags are ignored on Linux during mq_open
    //Set the name of the queue. The slash is mandatory!
    //NOTE(review): the trailing '\n' becomes part of the queue name. It is
    //kept because the destructor and handleOpenError() unlink by this name.
    sprintf(name, "/FSFW_MQ%u\n", queueCounter++);

    // Create a nonblocking queue if the name is available. The requested
    // mode grants read and write access to owner, group and others.
    // NOTE(review): the retry in handleOpenError() requests owner and group
    // access only - confirm which of the two modes is intended.
    int oflag = O_NONBLOCK | O_RDWR | O_CREAT | O_EXCL;
    mode_t mode = S_IWUSR | S_IREAD | S_IWGRP | S_IRGRP | S_IROTH | S_IWOTH;
    mqd_t tempId = mq_open(name, oflag, mode, &attributes);
    if (tempId == -1) {
        handleOpenError(&attributes, messageDepth);
    }
    else {
        //Successful mq_open call
        this->id = tempId;
    }
}

/**
 * @brief   Closes the queue descriptor and unlinks the queue name.
 * @details Failures of either step are only printed.
 */
MessageQueue::~MessageQueue() {
    int status = mq_close(this->id);
    if(status != 0){
        utility::printUnixErrorGeneric(CLASS_NAME, "~MessageQueue", "close");
    }
    status = mq_unlink(name);
    if(status != 0){
        utility::printUnixErrorGeneric(CLASS_NAME, "~MessageQueue", "unlink");
    }
}

/**
 * @brief   Sends a message to the given queue with this queue as sender.
 * @param sendTo        Destination queue ID.
 * @param message       Message to send.
 * @param ignoreFault   Forwarded to sendMessageFrom(); if true, a failed send
 *                      is not reported to the internal error reporter.
 * @return  Result of sendMessageFrom().
 */
ReturnValue_t MessageQueue::sendMessage(MessageQueueId_t sendTo,
        MessageQueueMessageIF* message, bool ignoreFault) {
    // The caller's ignoreFault must be forwarded instead of a hard-coded
    // false, otherwise the parameter has no effect.
    return sendMessageFrom(sendTo, message, this->getId(), ignoreFault);
}

/**
 * @brief   Sends a message to the default destination with this queue as
 *          sender.
 * @return  Result of sendToDefaultFrom().
 */
ReturnValue_t MessageQueue::sendToDefault(MessageQueueMessageIF* message) {
    return sendToDefaultFrom(message, this->getId());
}

/**
 * @brief   Sends a message to the sender of the last received message.
 * @return  NO_REPLY_PARTNER if no partner is stored, otherwise the result of
 *          sendMessageFrom().
 */
ReturnValue_t MessageQueue::reply(MessageQueueMessageIF* message) {
    // Compare against the same sentinel lastPartner is initialised with.
    if (this->lastPartner == MessageQueueIF::NO_QUEUE) {
        return NO_REPLY_PARTNER;
    }
    return sendMessageFrom(this->lastPartner, message, this->getId());
}

/**
 * @brief   Receives a message and additionally reports its sender.
 * @param message       Buffer for the received message.
 * @param receivedFrom  If not nullptr, set to the stored last partner after
 *                      the receive attempt (also on failure).
 * @return  Result of receiveMessage(message).
 */
ReturnValue_t MessageQueue::receiveMessage(MessageQueueMessageIF* message,
        MessageQueueId_t* receivedFrom) {
    ReturnValue_t status = this->receiveMessage(message);
    if (receivedFrom != nullptr) {
        *receivedFrom = this->lastPartner;
    }
    return status;
}

/**
 * @brief   Tries to fetch one message from the queue without blocking.
 * @param message   Buffer for the received message. Its maximum size must be
 *                  at least the maximum message size of this queue.
 * @return  RETURN_OK if a valid message was received,
 *          MessageQueueIF::EMPTY if no message was available,
 *          RETURN_FAILED for invalid arguments, undersized messages and all
 *          other mq_receive errors.
 */
ReturnValue_t MessageQueue::receiveMessage(MessageQueueMessageIF* message) {
    if(message == nullptr) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
        sif::error << "MessageQueue::receiveMessage: Message is "
                "nullptr!" << std::endl;
#endif
        return HasReturnvaluesIF::RETURN_FAILED;
    }

    if(message->getMaximumMessageSize() < maxMessageSize) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
        sif::error << "MessageQueue::receiveMessage: Message size "
                << message->getMaximumMessageSize()
                << " too small to receive data!" << std::endl;
#endif
        return HasReturnvaluesIF::RETURN_FAILED;
    }

    unsigned int messagePriority = 0;
    int status = mq_receive(id, reinterpret_cast<char*>(message->getBuffer()),
            message->getMaximumMessageSize(), &messagePriority);
    if (status > 0) {
        this->lastPartner = message->getSender();
        //Check size of incoming message.
        if (message->getMessageSize() < message->getMinimumMessageSize()) {
            return HasReturnvaluesIF::RETURN_FAILED;
        }
        return HasReturnvaluesIF::RETURN_OK;
    }
    else if (status == 0) {
        //Success but no message received
        return MessageQueueIF::EMPTY;
    }

    // Capture errno before any further call has a chance to overwrite it.
    const int receiveErrno = errno;
    //No message was received. Keep lastPartner anyway, I might send
    //something later. But still, delete packet content.
    memset(message->getData(), 0, message->getMaximumDataSize());
    // Restore errno in case the print helper below reads it.
    errno = receiveErrno;
    switch(receiveErrno) {
    case EAGAIN:
        //O_NONBLOCK or MQ_NONBLOCK was set and there are no messages
        //currently on the specified queue.
        return MessageQueueIF::EMPTY;
    case EBADF: {
        //mqdes doesn't represent a valid queue open for reading.
        utility::printUnixErrorGeneric(CLASS_NAME, "receiveMessage", "EBADF");
        break;
    }
    case EINVAL: {
        /*
         * This value indicates one of the following:
         * - The pointer to the buffer for storing the received message,
         *   msg_ptr, is NULL.
         * - The number of bytes requested, msg_len is less than zero.
         * - msg_len is anything other than the mq_msgsize of the specified
         *   queue, and the QNX extended option MQ_READBUF_DYNAMIC hasn't
         *   been set in the queue's mq_flags.
         */
        utility::printUnixErrorGeneric(CLASS_NAME, "receiveMessage", "EINVAL");
        break;
    }
    case EMSGSIZE: {
        /*
         * This value indicates one of the following:
         * - the QNX extended option MQ_READBUF_DYNAMIC hasn't been set,
         *   and the given msg_len is shorter than the mq_msgsize for
         *   the given queue.
         * - the extended option MQ_READBUF_DYNAMIC has been set, but the
         *   given msg_len is too short for the message that would have
         *   been received.
         */
        utility::printUnixErrorGeneric(CLASS_NAME, "receiveMessage", "EMSGSIZE");
        break;
    }
    case EINTR: {
        //The operation was interrupted by a signal.
        utility::printUnixErrorGeneric(CLASS_NAME, "receiveMessage", "EINTR");
        break;
    }
    case ETIMEDOUT: {
        //A timeout expired before a message arrived. This is only expected
        //from timed receive variants.
        utility::printUnixErrorGeneric(CLASS_NAME, "receiveMessage", "ETIMEDOUT");
        break;
    }
    default:
        return HasReturnvaluesIF::RETURN_FAILED;
    }
    return HasReturnvaluesIF::RETURN_FAILED;
}

/**
 * @brief   Returns the sender of the last successfully received message.
 */
MessageQueueId_t MessageQueue::getLastPartner() const {
    return this->lastPartner;
}

/**
 * @brief   Removes the messages currently pending in the queue.
 * @details
 * mq_setattr can only change mq_flags, so writing mq_curmsgs = 0 does not
 * remove anything. The queue is therefore drained with mq_receive. At most
 * the number of messages pending at the time of the call is removed, so a
 * concurrently sending task cannot keep this function busy forever.
 * A scratch buffer of mq_msgsize bytes is allocated on the heap per call.
 * @param count If not nullptr, set to the number of messages actually
 *              removed. Not written if the queue attributes can't be read.
 * @return  RETURN_OK if the queue was drained, RETURN_FAILED if the queue
 *          attributes could not be read or receiving failed.
 */
ReturnValue_t MessageQueue::flush(uint32_t* count) {
    mq_attr attrib;
    int status = mq_getattr(id, &attrib);
    if(status != 0){
        switch(errno){
        case EBADF:
            //mqdes doesn't represent a valid message queue.
            utility::printUnixErrorGeneric(CLASS_NAME, "flush", "EBADF");
            break;
        case EINVAL:
            //mq_attr is NULL
            utility::printUnixErrorGeneric(CLASS_NAME, "flush", "EINVAL");
            break;
        default:
            return HasReturnvaluesIF::RETURN_FAILED;
        }
        return HasReturnvaluesIF::RETURN_FAILED;
    }

    // mq_receive requires a buffer of at least mq_msgsize bytes.
    std::vector<char> scratch(static_cast<size_t>(attrib.mq_msgsize));
    const long pending = attrib.mq_curmsgs;
    uint32_t flushed = 0;
    ReturnValue_t result = HasReturnvaluesIF::RETURN_OK;
    while (static_cast<long>(flushed) < pending) {
        if (mq_receive(id, scratch.data(), scratch.size(), nullptr) >= 0) {
            flushed++;
            continue;
        }
        if (errno == EINTR) {
            //Interrupted by a signal, simply try again.
            continue;
        }
        if (errno != EAGAIN) {
            utility::printUnixErrorGeneric(CLASS_NAME, "flush", "mq_receive");
            result = HasReturnvaluesIF::RETURN_FAILED;
        }
        //EAGAIN: the queue ran empty earlier than expected, which is fine.
        break;
    }

    if (count != nullptr) {
        *count = flushed;
    }
    return result;
}

/**
 * @brief   Returns the ID (message queue descriptor) of this queue.
 */
MessageQueueId_t MessageQueue::getId() const {
    return this->id;
}

/**
 * @brief   Sets the destination used by the sendToDefault functions.
 */
void MessageQueue::setDefaultDestination(MessageQueueId_t defaultDestination) {
    this->defaultDestination = defaultDestination;
}

/**
 * @brief   Sends a message to the default destination with an arbitrary
 *          sender ID.
 * @return  Result of sendMessageFrom().
 */
ReturnValue_t MessageQueue::sendToDefaultFrom(MessageQueueMessageIF* message,
        MessageQueueId_t sentFrom, bool ignoreFault) {
    return sendMessageFrom(defaultDestination, message, sentFrom, ignoreFault);
}

/**
 * @brief   Sends a message to the given queue with an arbitrary sender ID.
 * @return  Result of sendMessageFromMessageQueue().
 */
ReturnValue_t MessageQueue::sendMessageFrom(MessageQueueId_t sendTo,
        MessageQueueMessageIF* message, MessageQueueId_t sentFrom,
        bool ignoreFault) {
    return sendMessageFromMessageQueue(sendTo, message, sentFrom, ignoreFault);
}

/**
 * @brief   Returns the currently configured default destination.
 */
MessageQueueId_t MessageQueue::getDefaultDestination() const {
    return this->defaultDestination;
}

/**
 * @brief   Returns true if a default destination other than NO_QUEUE is set.
 */
bool MessageQueue::isDefaultDestinationSet() const {
    return (defaultDestination != NO_QUEUE);
}

/** Running number used to generate unique queue names. */
uint16_t MessageQueue::queueCounter = 0;

/**
 * @brief   Stamps the sender into the message and sends it with mq_send.
 * @param sendTo        Destination queue ID (used as message queue
 *                      descriptor).
 * @param message       Message to send, must not be nullptr.
 * @param sentFrom      Sender ID written into the message.
 * @param ignoreFault   If false, a failed send is reported to the internal
 *                      error reporter (if that object is available).
 * @return  RETURN_OK on success, MessageQueueIF::FULL if the destination
 *          queue is full, DESTINATION_INVALID for an invalid descriptor,
 *          RETURN_FAILED otherwise.
 */
ReturnValue_t MessageQueue::sendMessageFromMessageQueue(MessageQueueId_t sendTo,
        MessageQueueMessageIF *message, MessageQueueId_t sentFrom,
        bool ignoreFault) {
    if(message == nullptr) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
        sif::error << "MessageQueue::sendMessageFromMessageQueue: Message is nullptr!" << std::endl;
#else
        sif::printError("MessageQueue::sendMessageFromMessageQueue: Message is nullptr!\n");
#endif
        return HasReturnvaluesIF::RETURN_FAILED;
    }

    message->setSender(sentFrom);
    int result = mq_send(sendTo,
            reinterpret_cast<const char*>(message->getBuffer()),
            message->getMessageSize(), 0);

    //TODO: Check if we're in ISR.
    if (result == 0) {
        return HasReturnvaluesIF::RETURN_OK;
    }

    // Capture errno right away: the object lookup and the reporter call
    // below may overwrite it before it is evaluated.
    const int sendErrno = errno;
    if(!ignoreFault){
        InternalErrorReporterIF* internalErrorReporter =
                ObjectManager::instance()->get<InternalErrorReporterIF>(
                        objects::INTERNAL_ERROR_REPORTER);
        if (internalErrorReporter != nullptr) {
            internalErrorReporter->queueMessageNotSent();
        }
    }
    // Restore errno in case the print helper below reads it.
    errno = sendErrno;

    switch(sendErrno){
    case EAGAIN:
        //The O_NONBLOCK flag was set when opening the queue, or the
        //MQ_NONBLOCK flag was set in its attributes, and the
        //specified queue is full.
        return MessageQueueIF::FULL;
    case EBADF: {
        //mq_des doesn't represent a valid message queue descriptor,
        //or mq_des wasn't opened for writing.
        utility::printUnixErrorGeneric(CLASS_NAME, "sendMessageFromMessageQueue", "EBADF");
#if FSFW_CPP_OSTREAM_ENABLED == 1
        sif::warning << "mq_send to: " << sendTo << " sent from "
                << sentFrom << " failed" << std::endl;
#else
        sif::printWarning("mq_send to: %d sent from %d failed\n", sendTo, sentFrom);
#endif
        return DESTINATION_INVALID;
    }
    case EINTR:
        //The call was interrupted by a signal.
        utility::printUnixErrorGeneric(CLASS_NAME, "sendMessageFromMessageQueue", "EINTR");
        break;
    case EINVAL:
        /*
         * This value indicates one of the following:
         * - msg_ptr is NULL.
         * - msg_len is negative.
         * - msg_prio is greater than MQ_PRIO_MAX.
         * - msg_prio is less than 0.
         * - MQ_PRIO_RESTRICT is set in the mq_attr of mq_des, and
         *   msg_prio is greater than the priority of the calling process.
         */
        utility::printUnixErrorGeneric(CLASS_NAME, "sendMessageFromMessageQueue", "EINVAL");
        break;
    case EMSGSIZE:
        // The msg_len is greater than the msgsize associated with
        //the specified queue.
        utility::printUnixErrorGeneric(CLASS_NAME, "sendMessageFromMessageQueue", "EMSGSIZE");
        break;
    default:
        return HasReturnvaluesIF::RETURN_FAILED;
    }
    return HasReturnvaluesIF::RETURN_FAILED;
}

/**
 * @brief   Handles a failed mq_open call of the constructor.
 * @details
 * Evaluates errno. For EINVAL a hint is printed if the system wide msg_max
 * limit is smaller than the requested depth. For EEXIST the existing queue
 * with the same name is unlinked and mq_open is retried once.
 * @param attributes    Queue attributes to use for the retry.
 * @param messageDepth  Requested queue depth, used for the diagnostic output.
 * @return  RETURN_OK if the retry opened the queue (id is set in that case),
 *          RETURN_FAILED otherwise.
 */
ReturnValue_t MessageQueue::handleOpenError(mq_attr* attributes,
        uint32_t messageDepth) {
    switch(errno) {
    case(EINVAL): {
        utility::printUnixErrorGeneric(CLASS_NAME, "MessageQueue", "EINVAL");
        size_t defaultMqMaxMsg = 0;
        // Not POSIX conformant, but should work for all UNIX systems.
        // Just an additional helpful printout :-)
        if(std::ifstream("/proc/sys/fs/mqueue/msg_max",std::ios::in) >>
                defaultMqMaxMsg and defaultMqMaxMsg < messageDepth) {
            /*
            See: https://www.man7.org/linux/man-pages/man3/mq_open.3.html
            This happens if the msg_max value is not large enough
            It is ignored if the executable is run in privileged mode.
            Run the unlockRealtime script or grant the mode manually by using:
            sudo setcap 'CAP_SYS_RESOURCE=+ep' <pathToBinary>

            Persistent solution for session:
            echo <newMsgMax> | sudo tee /proc/sys/fs/mqueue/msg_max

            Permanent solution:
            sudo nano /etc/sysctl.conf
            Append at end: fs/mqueue/msg_max = <newMsgMax>
            Apply changes with: sudo sysctl -p
             */
#if FSFW_CPP_OSTREAM_ENABLED == 1
            sif::error << "MessageQueue::MessageQueue: Default MQ size "
                    << defaultMqMaxMsg << " is too small for requested size "
                    << messageDepth << std::endl;
            sif::error << "This error can be fixed by setting the maximum "
                    "allowed message size higher!" << std::endl;
#else
            // The format arguments must actually be passed; the casts make
            // the argument types match %lu on every platform.
            sif::printError("MessageQueue::MessageQueue: Default MQ size %lu is too small for "
                    "requested size %lu\n",
                    static_cast<unsigned long>(defaultMqMaxMsg),
                    static_cast<unsigned long>(messageDepth));
            sif::printError("This error can be fixed by setting the maximum allowed "
                    "message size higher!\n");
#endif
        }
        break;
    }
    case(EEXIST): {
        // An error occured during open.
        // We need to distinguish if it is caused by an already created queue
        // There's another queue with the same name
        // We unlink the other queue
        int status = mq_unlink(name);
        if (status != 0) {
            utility::printUnixErrorGeneric(CLASS_NAME, "MessageQueue", "EEXIST");
        }
        else {
            // Successful unlinking, try to open again.
            // NOTE(review): this mode grants owner and group access only,
            // while the first attempt in the constructor also includes
            // others - confirm which one is intended.
            mqd_t tempId = mq_open(name,
                    O_NONBLOCK | O_RDWR | O_CREAT | O_EXCL,
                    S_IWUSR | S_IREAD | S_IWGRP | S_IRGRP, attributes);
            if (tempId != -1) {
                //Successful mq_open
                this->id = tempId;
                return HasReturnvaluesIF::RETURN_OK;
            }
            // Don't fail silently if the second attempt does not work either.
            utility::printUnixErrorGeneric(CLASS_NAME, "MessageQueue", "Reopen");
        }
        break;
    }
    default: {
        // Any other mq_open error
        utility::printUnixErrorGeneric(CLASS_NAME, "MessageQueue", "Unknown");
    }
    }
    return HasReturnvaluesIF::RETURN_FAILED;
}