fsfw/osal/host/MessageQueue.cpp

154 lines
4.9 KiB
C++
Raw Normal View History

2020-09-05 21:19:53 +02:00
#include "MessageQueue.h"
#include "QueueMapManager.h"
2020-09-05 20:18:52 +02:00
#include "../../serviceinterface/ServiceInterfaceStream.h"
#include "../../ipc/MutexFactory.h"
2021-03-20 12:49:15 +01:00
#include "../../ipc/MutexGuard.h"
2020-09-05 20:18:52 +02:00
2021-05-12 14:28:37 +02:00
#include <cstring>
2020-09-05 20:18:52 +02:00
MessageQueue::MessageQueue(size_t messageDepth, size_t maxMessageSize):
messageSize(maxMessageSize), messageDepth(messageDepth) {
queueLock = MutexFactory::instance()->createMutex();
auto result = QueueMapManager::instance()->addMessageQueue(this, &mqId);
if(result != HasReturnvaluesIF::RETURN_OK) {
2021-01-03 14:16:52 +01:00
#if FSFW_CPP_OSTREAM_ENABLED == 1
2020-09-05 21:19:53 +02:00
sif::error << "MessageQueue::MessageQueue:"
<< " Could not be created" << std::endl;
#endif
2020-09-05 20:18:52 +02:00
}
}
MessageQueue::~MessageQueue() {
MutexFactory::instance()->deleteMutex(queueLock);
}
ReturnValue_t MessageQueue::sendMessage(MessageQueueId_t sendTo,
MessageQueueMessageIF* message, bool ignoreFault) {
return sendMessageFrom(sendTo, message, this->getId(), ignoreFault);
}
ReturnValue_t MessageQueue::sendToDefault(MessageQueueMessageIF* message) {
return sendToDefaultFrom(message, this->getId());
}
ReturnValue_t MessageQueue::sendToDefaultFrom(MessageQueueMessageIF* message,
MessageQueueId_t sentFrom, bool ignoreFault) {
return sendMessageFrom(defaultDestination,message,sentFrom,ignoreFault);
}
ReturnValue_t MessageQueue::reply(MessageQueueMessageIF* message) {
2020-12-05 10:40:53 +01:00
if (this->lastPartner != MessageQueueIF::NO_QUEUE) {
2020-09-05 20:18:52 +02:00
return sendMessageFrom(this->lastPartner, message, this->getId());
} else {
return MessageQueueIF::NO_REPLY_PARTNER;
}
}
ReturnValue_t MessageQueue::sendMessageFrom(MessageQueueId_t sendTo,
MessageQueueMessageIF* message, MessageQueueId_t sentFrom,
bool ignoreFault) {
return sendMessageFromMessageQueue(sendTo, message, sentFrom,
ignoreFault);
}
ReturnValue_t MessageQueue::receiveMessage(MessageQueueMessageIF* message,
MessageQueueId_t* receivedFrom) {
ReturnValue_t status = this->receiveMessage(message);
if(status == HasReturnvaluesIF::RETURN_OK) {
*receivedFrom = this->lastPartner;
}
return status;
}
ReturnValue_t MessageQueue::receiveMessage(MessageQueueMessageIF* message) {
if(messageQueue.empty()) {
return MessageQueueIF::EMPTY;
}
2021-03-20 12:49:15 +01:00
MutexGuard mutexLock(queueLock, MutexIF::TimeoutType::WAITING, 20);
2021-04-08 20:53:54 +02:00
std::copy(messageQueue.front().data(), messageQueue.front().data() + messageSize,
message->getBuffer());
2020-09-05 20:18:52 +02:00
messageQueue.pop();
// The last partner is the first uint32_t field in the message
this->lastPartner = message->getSender();
return HasReturnvaluesIF::RETURN_OK;
}
MessageQueueId_t MessageQueue::getLastPartner() const {
return lastPartner;
}
ReturnValue_t MessageQueue::flush(uint32_t* count) {
*count = messageQueue.size();
// Clears the queue.
2021-04-08 20:53:54 +02:00
messageQueue = std::queue<std::vector<uint8_t>>();
2020-09-05 20:18:52 +02:00
return HasReturnvaluesIF::RETURN_OK;
}
MessageQueueId_t MessageQueue::getId() const {
return mqId;
}
void MessageQueue::setDefaultDestination(MessageQueueId_t defaultDestination) {
defaultDestinationSet = true;
this->defaultDestination = defaultDestination;
}
MessageQueueId_t MessageQueue::getDefaultDestination() const {
return defaultDestination;
}
bool MessageQueue::isDefaultDestinationSet() const {
return defaultDestinationSet;
}
// static core function to send messages.
ReturnValue_t MessageQueue::sendMessageFromMessageQueue(MessageQueueId_t sendTo,
MessageQueueMessageIF* message, MessageQueueId_t sentFrom,
bool ignoreFault) {
2021-04-08 19:07:03 +02:00
if(message == nullptr) {
return HasReturnvaluesIF::RETURN_FAILED;
}
2020-12-05 10:40:53 +01:00
message->setSender(sentFrom);
2020-09-05 20:18:52 +02:00
if(message->getMessageSize() > message->getMaximumMessageSize()) {
// Actually, this should never happen or an error will be emitted
// in MessageQueueMessage.
// But I will still return a failure here.
return HasReturnvaluesIF::RETURN_FAILED;
}
MessageQueue* targetQueue = dynamic_cast<MessageQueue*>(
QueueMapManager::instance()->getMessageQueue(sendTo));
if(targetQueue == nullptr) {
if(not ignoreFault) {
InternalErrorReporterIF* internalErrorReporter =
objectManager->get<InternalErrorReporterIF>(
objects::INTERNAL_ERROR_REPORTER);
if (internalErrorReporter != nullptr) {
internalErrorReporter->queueMessageNotSent();
}
}
// TODO: Better returnvalue
return HasReturnvaluesIF::RETURN_FAILED;
}
if(targetQueue->messageQueue.size() < targetQueue->messageDepth) {
2021-04-08 19:07:03 +02:00
MutexGuard mutexLock(targetQueue->queueLock, MutexIF::TimeoutType::WAITING, 20);
2021-04-08 20:53:54 +02:00
targetQueue->messageQueue.push(std::vector<uint8_t>(message->getMaximumMessageSize()));
memcpy(targetQueue->messageQueue.back().data(), message->getBuffer(),
message->getMaximumMessageSize());
2020-09-05 20:18:52 +02:00
}
else {
return MessageQueueIF::FULL;
}
return HasReturnvaluesIF::RETURN_OK;
}
2020-09-05 21:19:53 +02:00
ReturnValue_t MessageQueue::lockQueue(MutexIF::TimeoutType timeoutType,
dur_millis_t lockTimeout) {
return queueLock->lockMutex(timeoutType, lockTimeout);
2020-09-05 20:18:52 +02:00
}
ReturnValue_t MessageQueue::unlockQueue() {
return queueLock->unlockMutex();
}