switched to new tmtc stack API

This commit is contained in:
2022-07-20 22:21:15 +02:00
parent 9860061fc6
commit be35bd53a6
51 changed files with 753 additions and 374 deletions

View File

@ -8,6 +8,7 @@
#include "fsfw/tmtcpacket/pus/tm.h"
#include "fsfw/tmtcservices/AcceptsTelemetryIF.h"
#include "fsfw/tmtcservices/TmTcMessage.h"
#include "fsfw/tmtcservices/sendAndStoreHelper.h"
object_id_t CommandingServiceBase::defaultPacketSource = objects::NO_OBJECT;
object_id_t CommandingServiceBase::defaultPacketDestination = objects::NO_OBJECT;
@ -19,17 +20,19 @@ CommandingServiceBase::CommandingServiceBase(object_id_t setObjectId, uint16_t a
apid(apid),
service(service),
timeoutSeconds(commandTimeoutSeconds),
tmStoreHelper(apid, nullptr),
tmSendHelper(nullptr),
commandMap(numberOfParallelCommands) {
commandQueue = QueueFactory::instance()->createMessageQueue(queueDepth);
requestQueue = QueueFactory::instance()->createMessageQueue(queueDepth);
}
void CommandingServiceBase::setPacketSource(object_id_t packetSource) {
this->packetSource = packetSource;
void CommandingServiceBase::setPacketSource(object_id_t packetSource_) {
packetSource = packetSource_;
}
void CommandingServiceBase::setPacketDestination(object_id_t packetDestination) {
this->packetDestination = packetDestination;
void CommandingServiceBase::setPacketDestination(object_id_t packetDestination_) {
packetDestination = packetDestination_;
}
CommandingServiceBase::~CommandingServiceBase() {
@ -58,13 +61,12 @@ ReturnValue_t CommandingServiceBase::initialize() {
if (packetDestination == objects::NO_OBJECT) {
packetDestination = defaultPacketDestination;
}
AcceptsTelemetryIF* packetForwarding =
ObjectManager::instance()->get<AcceptsTelemetryIF>(packetDestination);
auto* packetForwarding = ObjectManager::instance()->get<AcceptsTelemetryIF>(packetDestination);
if (packetSource == objects::NO_OBJECT) {
packetSource = defaultPacketSource;
}
PUSDistributorIF* distributor = ObjectManager::instance()->get<PUSDistributorIF>(packetSource);
auto* distributor = ObjectManager::instance()->get<PUSDistributorIF>(packetSource);
if (packetForwarding == nullptr or distributor == nullptr) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
@ -78,10 +80,10 @@ ReturnValue_t CommandingServiceBase::initialize() {
distributor->registerService(this);
requestQueue->setDefaultDestination(packetForwarding->getReportReceptionQueue());
IPCStore = ObjectManager::instance()->get<StorageManagerIF>(objects::IPC_STORE);
TCStore = ObjectManager::instance()->get<StorageManagerIF>(objects::TC_STORE);
ipcStore = ObjectManager::instance()->get<StorageManagerIF>(objects::IPC_STORE);
tcStore = ObjectManager::instance()->get<StorageManagerIF>(objects::TC_STORE);
if (IPCStore == nullptr or TCStore == nullptr) {
if (ipcStore == nullptr or tcStore == nullptr) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "CommandingServiceBase::intialize: IPC store or TC store "
"not initialized yet!"
@ -89,7 +91,20 @@ ReturnValue_t CommandingServiceBase::initialize() {
#endif
return ObjectManagerIF::CHILD_INIT_FAILED;
}
if (tmStoreHelper.getTmStore() == nullptr) {
auto* tmStore = ObjectManager::instance()->get<StorageManagerIF>(objects::TM_STORE);
if (tmStore == nullptr) {
return ObjectManagerIF::CHILD_INIT_FAILED;
}
tmStoreHelper.setTmStore(tmStore);
}
if (errReporter == nullptr) {
errReporter =
ObjectManager::instance()->get<InternalErrorReporterIF>(objects::INTERNAL_ERROR_REPORTER);
if (errReporter != nullptr) {
tmSendHelper.setInternalErrorReporter(errReporter);
}
}
return RETURN_OK;
}
@ -226,23 +241,31 @@ void CommandingServiceBase::handleRequestQueue() {
TmTcMessage message;
ReturnValue_t result;
store_address_t address;
TcPacketStoredPus packet;
MessageQueueId_t queue;
object_id_t objectId;
for (result = requestQueue->receiveMessage(&message); result == RETURN_OK;
result = requestQueue->receiveMessage(&message)) {
address = message.getStorageId();
packet.setStoreAddress(address, &packet);
const uint8_t* dataPtr;
size_t dataLen = 0;
result = tcStore->getData(message.getStorageId(), &dataPtr, &dataLen);
if (result != HasReturnvaluesIF::RETURN_OK) {
// TODO: Warning?
}
tcReader.setReadOnlyData(dataPtr, dataLen);
if ((packet.getSubService() == 0) or (isValidSubservice(packet.getSubService()) != RETURN_OK)) {
rejectPacket(tc_verification::START_FAILURE, &packet, INVALID_SUBSERVICE);
if ((tcReader.getSubService() == 0) or
(isValidSubservice(tcReader.getSubService()) != RETURN_OK)) {
rejectPacket(tc_verification::START_FAILURE, address, &tcReader, INVALID_SUBSERVICE);
continue;
}
result = getMessageQueueAndObject(packet.getSubService(), packet.getApplicationData(),
packet.getApplicationDataSize(), &queue, &objectId);
size_t appDataLen = 0;
const uint8_t* appData = tcReader.getUserData(appDataLen);
result =
getMessageQueueAndObject(tcReader.getSubService(), appData, appDataLen, &queue, &objectId);
if (result != HasReturnvaluesIF::RETURN_OK) {
rejectPacket(tc_verification::START_FAILURE, &packet, result);
rejectPacket(tc_verification::START_FAILURE, address, &tcReader, result);
continue;
}
@ -253,33 +276,26 @@ void CommandingServiceBase::handleRequestQueue() {
if (iter != commandMap.end()) {
result = iter->second.fifo.insert(address);
if (result != RETURN_OK) {
rejectPacket(tc_verification::START_FAILURE, &packet, OBJECT_BUSY);
rejectPacket(tc_verification::START_FAILURE, address, &tcReader, OBJECT_BUSY);
}
} else {
CommandInfo newInfo; // Info will be set by startExecution if neccessary
newInfo.objectId = objectId;
result = commandMap.insert(queue, newInfo, &iter);
if (result != RETURN_OK) {
rejectPacket(tc_verification::START_FAILURE, &packet, BUSY);
rejectPacket(tc_verification::START_FAILURE, address, &tcReader, BUSY);
} else {
startExecution(&packet, iter);
startExecution(address, &tcReader, iter);
}
}
}
}
ReturnValue_t CommandingServiceBase::sendTmPacket(uint8_t subservice, const uint8_t* data,
size_t dataLen, const uint8_t* headerData,
size_t headerSize) {
#if FSFW_USE_PUS_C_TELEMETRY == 0
TmPacketStoredPusA tmPacketStored(this->apid, this->service, subservice, this->tmPacketCounter,
data, dataLen, headerData, headerSize);
#else
TmPacketStoredPusC tmPacketStored(this->apid, this->service, subservice, this->tmPacketCounter,
data, dataLen, headerData, headerSize);
#endif
ReturnValue_t result =
tmPacketStored.sendPacket(requestQueue->getDefaultDestination(), requestQueue->getId());
ReturnValue_t CommandingServiceBase::sendTmPacket(uint8_t subservice, const uint8_t* sourceData,
size_t sourceDataLen) {
tmStoreHelper.preparePacket(service, subservice, tmPacketCounter);
tmStoreHelper.setSourceDataRaw(sourceData, sourceDataLen);
ReturnValue_t result = tm::storeAndSendTmPacket(tmStoreHelper, tmSendHelper);
if (result == HasReturnvaluesIF::RETURN_OK) {
this->tmPacketCounter++;
}
@ -288,54 +304,38 @@ ReturnValue_t CommandingServiceBase::sendTmPacket(uint8_t subservice, const uint
ReturnValue_t CommandingServiceBase::sendTmPacket(uint8_t subservice, object_id_t objectId,
const uint8_t* data, size_t dataLen) {
uint8_t buffer[sizeof(object_id_t)];
uint8_t* pBuffer = buffer;
size_t size = 0;
SerializeAdapter::serialize(&objectId, &pBuffer, &size, sizeof(object_id_t),
SerializeIF::Endianness::BIG);
#if FSFW_USE_PUS_C_TELEMETRY == 0
TmPacketStoredPusA tmPacketStored(this->apid, this->service, subservice, this->tmPacketCounter,
data, dataLen, buffer, size);
#else
TmPacketStoredPusC tmPacketStored(this->apid, this->service, subservice, this->tmPacketCounter,
data, dataLen, buffer, size);
#endif
ReturnValue_t result =
tmPacketStored.sendPacket(requestQueue->getDefaultDestination(), requestQueue->getId());
tm::SourceDataWithObjectIdPrefix dataWithObjId(objectId, data, dataLen);
tmStoreHelper.preparePacket(service, subservice, tmPacketCounter);
tmStoreHelper.setSourceDataSerializable(&dataWithObjId);
ReturnValue_t result = tm::storeAndSendTmPacket(tmStoreHelper, tmSendHelper);
if (result == HasReturnvaluesIF::RETURN_OK) {
this->tmPacketCounter++;
}
return result;
}
ReturnValue_t CommandingServiceBase::sendTmPacket(uint8_t subservice, SerializeIF* content,
SerializeIF* header) {
#if FSFW_USE_PUS_C_TELEMETRY == 0
TmPacketStoredPusA tmPacketStored(this->apid, this->service, subservice, this->tmPacketCounter,
content, header);
#else
TmPacketStoredPusC tmPacketStored(this->apid, this->service, subservice, this->tmPacketCounter,
content, header);
#endif
ReturnValue_t result =
tmPacketStored.sendPacket(requestQueue->getDefaultDestination(), requestQueue->getId());
ReturnValue_t CommandingServiceBase::sendTmPacket(uint8_t subservice, SerializeIF* sourceData) {
tmStoreHelper.preparePacket(service, subservice, tmPacketCounter);
tmStoreHelper.setSourceDataSerializable(sourceData);
ReturnValue_t result = tm::storeAndSendTmPacket(tmStoreHelper, tmSendHelper);
if (result == HasReturnvaluesIF::RETURN_OK) {
this->tmPacketCounter++;
}
return result;
}
void CommandingServiceBase::startExecution(TcPacketStoredPus* storedPacket, CommandMapIter iter) {
void CommandingServiceBase::startExecution(store_address_t storeId, PusTcReader* storedPacket,
CommandMapIter iter) {
ReturnValue_t result = RETURN_OK;
CommandMessage command;
// TcPacketPusBase* tcPacketBase = storedPacket->getPacketBase();
if (storedPacket == nullptr) {
return;
}
iter->second.subservice = storedPacket->getSubService();
result = prepareCommand(&command, iter->second.subservice, storedPacket->getApplicationData(),
storedPacket->getApplicationDataSize(), &iter->second.state,
iter->second.objectId);
size_t appDataLen = 0;
const uint8_t* appData = storedPacket->getUserData(appDataLen);
result = prepareCommand(&command, iter->second.subservice, appData, appDataLen,
&iter->second.state, iter->second.objectId);
ReturnValue_t sendResult = RETURN_OK;
switch (result) {
@ -349,12 +349,12 @@ void CommandingServiceBase::startExecution(TcPacketStoredPus* storedPacket, Comm
iter->second.subservice = storedPacket->getSubService();
iter->second.command = command.getCommand();
iter->second.tcInfo.ackFlags = storedPacket->getAcknowledgeFlags();
iter->second.tcInfo.tcPacketId = storedPacket->getPacketId();
iter->second.tcInfo.tcSequenceControl = storedPacket->getPacketSeqCtrl();
acceptPacket(tc_verification::START_SUCCESS, storedPacket);
iter->second.tcInfo.tcPacketId = storedPacket->getPacketIdRaw();
iter->second.tcInfo.tcSequenceControl = storedPacket->getPacketSeqCtrlRaw();
acceptPacket(tc_verification::START_SUCCESS, storeId, storedPacket);
} else {
command.clearCommandMessage();
rejectPacket(tc_verification::START_FAILURE, storedPacket, sendResult);
rejectPacket(tc_verification::START_FAILURE, storeId, storedPacket, sendResult);
checkAndExecuteFifo(iter);
}
break;
@ -364,33 +364,32 @@ void CommandingServiceBase::startExecution(TcPacketStoredPus* storedPacket, Comm
sendResult = commandQueue->sendMessage(iter.value->first, &command);
}
if (sendResult == RETURN_OK) {
verificationReporter.sendSuccessReport(tc_verification::START_SUCCESS,
storedPacket->getPacketBase());
acceptPacket(tc_verification::COMPLETION_SUCCESS, storedPacket);
verificationReporter.sendSuccessReport(tc_verification::START_SUCCESS, storedPacket);
acceptPacket(tc_verification::COMPLETION_SUCCESS, storeId, storedPacket);
checkAndExecuteFifo(iter);
} else {
command.clearCommandMessage();
rejectPacket(tc_verification::START_FAILURE, storedPacket, sendResult);
rejectPacket(tc_verification::START_FAILURE, storeId, storedPacket, sendResult);
checkAndExecuteFifo(iter);
}
break;
default:
rejectPacket(tc_verification::START_FAILURE, storedPacket, result);
rejectPacket(tc_verification::START_FAILURE, storeId, storedPacket, result);
checkAndExecuteFifo(iter);
break;
}
}
void CommandingServiceBase::rejectPacket(uint8_t reportId, TcPacketStoredPus* packet,
ReturnValue_t errorCode) {
verificationReporter.sendFailureReport(reportId, dynamic_cast<TcPacketPusBase*>(packet),
errorCode);
packet->deletePacket();
void CommandingServiceBase::rejectPacket(uint8_t reportId, store_address_t tcStoreId,
PusTcReader* correspondingTc, ReturnValue_t errorCode) {
verificationReporter.sendFailureReport(reportId, correspondingTc, errorCode);
tcStore->deleteData(tcStoreId);
}
void CommandingServiceBase::acceptPacket(uint8_t reportId, TcPacketStoredPus* packet) {
verificationReporter.sendSuccessReport(reportId, dynamic_cast<TcPacketPusBase*>(packet));
packet->deletePacket();
void CommandingServiceBase::acceptPacket(uint8_t reportId, store_address_t tcStoreId,
PusTcReader* packet) {
verificationReporter.sendSuccessReport(reportId, packet);
tcStore->deleteData(tcStoreId);
}
void CommandingServiceBase::checkAndExecuteFifo(CommandMapIter& iter) {
@ -398,8 +397,15 @@ void CommandingServiceBase::checkAndExecuteFifo(CommandMapIter& iter) {
if (iter->second.fifo.retrieve(&address) != RETURN_OK) {
commandMap.erase(&iter);
} else {
TcPacketStoredPus newPacket(address);
startExecution(&newPacket, iter);
const uint8_t* dataPtr;
size_t dataLen = 0;
ReturnValue_t result = tcStore->getData(address, &dataPtr, &dataLen);
if (result == HasReturnvaluesIF::RETURN_OK) {
tcReader.setReadOnlyData(dataPtr, dataLen);
startExecution(address, &tcReader, iter);
} else {
// TODO: Warning?
}
}
}
@ -426,3 +432,7 @@ void CommandingServiceBase::checkTimeout() {
}
void CommandingServiceBase::setTaskIF(PeriodicTaskIF* task_) { executingTask = task_; }
void CommandingServiceBase::setCustomTmStore(StorageManagerIF* store) {
tmStoreHelper.setTmStore(store);
}