decoupling from raw reciever, linux mq improvements

This commit is contained in:
Robin Müller 2020-06-06 12:41:17 +02:00
parent 8e7593d68a
commit d35524ecbc
5 changed files with 82 additions and 52 deletions

View File

@ -15,7 +15,7 @@
#include <iomanip>
object_id_t DeviceHandlerBase::powerSwitcherId = objects::NO_OBJECT;
object_id_t DeviceHandlerBase::rawDataReceiverId = 0;
object_id_t DeviceHandlerBase::rawDataReceiverId = objects::NO_OBJECT;
object_id_t DeviceHandlerBase::defaultFDIRParentId = 0;
DeviceHandlerBase::DeviceHandlerBase(object_id_t setObjectId,
@ -36,7 +36,7 @@ DeviceHandlerBase::DeviceHandlerBase(object_id_t setObjectId,
childTransitionDelay(5000),
transitionSourceMode(_MODE_POWER_DOWN), transitionSourceSubMode(
SUBMODE_NONE), deviceSwitch(setDeviceSwitch) {
commandQueue = QueueFactory::instance()->createMessageQueue(cmdQueueSize,
commandQueue = QueueFactory::instance()->createMessageQueue(1,
CommandMessage::MAX_MESSAGE_SIZE);
cookieInfo.state = COOKIE_UNUSED;
insertInCommandMap(RAW_COMMAND_ID);
@ -121,15 +121,16 @@ ReturnValue_t DeviceHandlerBase::initialize() {
return RETURN_FAILED;
}
AcceptsDeviceResponsesIF *rawReceiver = objectManager->get<
AcceptsDeviceResponsesIF>(rawDataReceiverId);
if(rawDataReceiverId != objects::NO_OBJECT) {
AcceptsDeviceResponsesIF *rawReceiver = objectManager->get<
AcceptsDeviceResponsesIF>(rawDataReceiverId);
if (rawReceiver == NULL) {
return RETURN_FAILED;
if (rawReceiver == NULL) {
return RETURN_FAILED;
}
defaultRawReceiver = rawReceiver->getDeviceQueue();
}
defaultRawReceiver = rawReceiver->getDeviceQueue();
if(powerSwitcherId != objects::NO_OBJECT) {
powerSwitcher = objectManager->get<PowerSwitchIF>(powerSwitcherId);
if (powerSwitcher == NULL) {
@ -139,6 +140,8 @@ ReturnValue_t DeviceHandlerBase::initialize() {
result = healthHelper.initialize();
if (result != RETURN_OK) {
sif::error << "DeviceHandlerBase::initialize: Health Helper "
"initialization failure" << std::endl;
return result;
}
@ -614,7 +617,7 @@ void DeviceHandlerBase::doGetRead() {
replyRawData(receivedData, receivedDataLen, requestedRawTraffic);
}
if (mode == MODE_RAW) {
if (mode == MODE_RAW and defaultRawReceiver != MessageQueueIF::NO_QUEUE) {
replyRawReplyIfnotWiretapped(receivedData, receivedDataLen);
}
else {
@ -727,7 +730,7 @@ ReturnValue_t DeviceHandlerBase::getStorageData(store_address_t storageAddress,
void DeviceHandlerBase::replyRawData(const uint8_t *data, size_t len,
MessageQueueId_t sendTo, bool isCommand) {
if (IPCStore == NULL || len == 0) {
if (IPCStore == NULL || len == 0 || sendTo == MessageQueueIF::NO_QUEUE) {
return;
}
store_address_t address;
@ -1131,35 +1134,47 @@ void DeviceHandlerBase::handleDeviceTM(SerializeIF* data,
return;
}
DeviceTmReportingWrapper wrapper(getObjectId(), replyId, data);
if (iter->second.command != deviceCommandMap.end()) {//replies to a command
//replies to a command
if (iter->second.command != deviceCommandMap.end())
{
MessageQueueId_t queueId = iter->second.command->second.sendReplyTo;
if (queueId != NO_COMMANDER) {
//This may fail, but we'll ignore the fault.
actionHelper.reportData(queueId, replyId, data);
}
//This check should make sure we get any TM but don't get anything doubled.
if (wiretappingMode == TM && (requestedRawTraffic != queueId)) {
actionHelper.reportData(requestedRawTraffic, replyId, &wrapper);
} else if (forceDirectTm && (defaultRawReceiver != queueId)) {
// hiding of sender needed so the service will handle it as unexpected Data, no matter what state
//(progress or completed) it is in
actionHelper.reportData(defaultRawReceiver, replyId, &wrapper,
true);
}
} else { //unrequested/aperiodic replies
if (wiretappingMode == TM) {
actionHelper.reportData(requestedRawTraffic, replyId, &wrapper);
} else if (forceDirectTm) {
// hiding of sender needed so the service will handle it as unexpected Data, no matter what state
//(progress or completed) it is in
else if (forceDirectTm and (defaultRawReceiver != queueId) and
(defaultRawReceiver != MessageQueueIF::NO_QUEUE))
{
// hiding of sender needed so the service will handle it as
// unexpected Data, no matter what state (progress or completed)
// it is in
actionHelper.reportData(defaultRawReceiver, replyId, &wrapper,
true);
true);
}
}
//Try to cast to GlobDataSet and commit data.
//unrequested/aperiodic replies
else
{
if (wiretappingMode == TM) {
actionHelper.reportData(requestedRawTraffic, replyId, &wrapper);
}
else if (forceDirectTm and defaultRawReceiver !=
MessageQueueIF::NO_QUEUE)
{
// hiding of sender needed so the service will handle it as
// unexpected Data, no matter what state (progress or completed)
// it is in
actionHelper.reportData(defaultRawReceiver, replyId, &wrapper,
true);
}
}
//Try to cast to GlobDataSet and commit data.
if (!neverInDataPool) {
GlobDataSet* dataSet = dynamic_cast<GlobDataSet*>(data);
if (dataSet != NULL) {

View File

@ -577,7 +577,7 @@ protected:
* Used when there is no method of finding a recipient, ie raw mode and
* reporting erroneous replies
*/
MessageQueueId_t defaultRawReceiver = 0;
MessageQueueId_t defaultRawReceiver = MessageQueueIF::NO_QUEUE;
store_address_t storedRawData;
/**

View File

@ -40,6 +40,7 @@ void HealthHelper::setParentQeueue(MessageQueueId_t parentQueue) {
ReturnValue_t HealthHelper::initialize() {
healthTable = objectManager->get<HealthTableIF>(objects::HEALTH_TABLE);
eventSender = objectManager->get<EventReportingProxyIF>(objectId);
// TODO: Better returnvalues
if ((healthTable == NULL) || eventSender == NULL) {
return HasReturnvaluesIF::RETURN_FAILED;
}

View File

@ -7,7 +7,7 @@
MessageQueue::MessageQueue(size_t messageDepth, size_t maxMessageSize): id(0),
MessageQueue::MessageQueue(uint32_t messageDepth, size_t maxMessageSize): id(0),
lastPartner(0), defaultDestination(NO_QUEUE) {
//debug << "MessageQueue::MessageQueue: Creating a queue" << std::endl;
mq_attr attributes;
@ -25,7 +25,7 @@ MessageQueue::MessageQueue(size_t messageDepth, size_t maxMessageSize): id(0),
mqd_t tempId = mq_open(name, O_NONBLOCK | O_RDWR | O_CREAT | O_EXCL,
S_IWUSR | S_IREAD | S_IWGRP | S_IRGRP | S_IROTH | S_IWOTH, &attributes);
if (tempId == -1) {
ReturnValue_t result = handleError(&attributes);
handleError(&attributes);
}
else {
//Successful mq_open call
@ -47,33 +47,47 @@ MessageQueue::~MessageQueue() {
}
ReturnValue_t MessageQueue::handleError(mq_attr* attributes) {
// An error occured during open
// We need to distinguish if it is caused by an already created queue
if (errno == EEXIST) {
//There's another queue with the same name
//We unlink the other queue
int status = mq_unlink(name);
if (status != 0) {
sif::error << "mq_unlink Failed with status: " << strerror(errno)
<< std::endl;
}
else {
// Successful unlinking, try to open again
mqd_t tempId = mq_open(name,
O_NONBLOCK | O_RDWR | O_CREAT | O_EXCL,
S_IWUSR | S_IREAD | S_IWGRP | S_IRGRP, attributes);
if (tempId != -1) {
//Successful mq_open
this->id = tempId;
return HasReturnvaluesIF::RETURN_OK;
switch(errno) {
case(EINVAL): {
sif::error << "MessageQueue::MessageQueue: Invalid Name " << std::endl;
break;
}
case(EEXIST): {
// An error occured during open
// We need to distinguish if it is caused by an already created queue
if (errno == EEXIST) {
//There's another queue with the same name
//We unlink the other queue
int status = mq_unlink(name);
if (status != 0) {
sif::error << "mq_unlink Failed with status: " << strerror(errno)
<< std::endl;
}
else {
// Successful unlinking, try to open again
mqd_t tempId = mq_open(name,
O_NONBLOCK | O_RDWR | O_CREAT | O_EXCL,
S_IWUSR | S_IREAD | S_IWGRP | S_IRGRP, attributes);
if (tempId != -1) {
//Successful mq_open
this->id = tempId;
return HasReturnvaluesIF::RETURN_OK;
}
}
}
break;
}
default:
// Failed either the first time or the second time
sif::error << "MessageQueue::MessageQueue: Creating Queue " << std::hex
<< name << std::dec << " failed with status: "
<< strerror(errno) << std::endl;
return HasReturnvaluesIF::RETURN_FAILED;
<< name << std::dec << " failed with status: "
<< strerror(errno) << std::endl;
}
return HasReturnvaluesIF::RETURN_FAILED;
}

View File

@ -40,7 +40,7 @@ public:
* @param max_message_size With this parameter, the maximum message size can be adjusted.
* This should be left default.
*/
MessageQueue(size_t messageDepth = 3,
MessageQueue(uint32_t messageDepth = 3,
size_t maxMessageSize = MessageQueueMessage::MAX_MESSAGE_SIZE );
/**
* @brief The destructor deletes the formerly created message queue.