Refactor TM handling #450

Merged
muellerr merged 47 commits from refactor_tm_handling into develop 2023-03-11 15:05:22 +01:00
18 changed files with 96 additions and 55 deletions
Showing only changes of commit 8b26d13070 - Show all commits

View File

@ -772,7 +772,7 @@ ReturnValue_t ObjectFactory::createCcsdsComponents(CcsdsComponentArgs& args) {
new VirtualChannelWithQueue(objects::PTME_VC0_LIVE_TM, ccsds::VC0, "PTME VC0 LIVE TM", *ptme,
LINK_STATE, args.tmStore, 500);
args.liveDestination = vcWithQueue;
new LiveTmTask(objects::LIVE_TM_TASK, *vcWithQueue);
new LiveTmTask(objects::LIVE_TM_TASK, args.pusFunnel, args.cfdpFunnel, *vcWithQueue);
// Set up log store.
auto* vc = new VirtualChannel(objects::PTME_VC1_LOG_TM, ccsds::VC1, "PTME VC1 LOG TM", *ptme,

View File

@ -26,16 +26,21 @@ namespace ObjectFactory {
struct CcsdsComponentArgs {
CcsdsComponentArgs(LinuxLibgpioIF& gpioIF, StorageManagerIF& ipcStore, StorageManagerIF& tmStore,
PersistentTmStores& stores, CcsdsIpCoreHandler** ipCoreHandler)
PersistentTmStores& stores, PusTmFunnel& pusFunnel, CfdpTmFunnel& cfdpFunnel,
CcsdsIpCoreHandler** ipCoreHandler)
: gpioComIF(gpioIF),
ipcStore(ipcStore),
tmStore(tmStore),
stores(stores),
pusFunnel(pusFunnel),
cfdpFunnel(cfdpFunnel),
ipCoreHandler(ipCoreHandler) {}
LinuxLibgpioIF& gpioComIF;
StorageManagerIF& ipcStore;
StorageManagerIF& tmStore;
PersistentTmStores& stores;
PusTmFunnel& pusFunnel;
CfdpTmFunnel& cfdpFunnel;
CcsdsIpCoreHandler** ipCoreHandler;
AcceptsTelemetryIF* liveDestination = nullptr;
};

View File

@ -116,10 +116,10 @@ void scheduling::initTasks() {
if (result != returnvalue::OK) {
scheduling::printAddObjectError("CFDP_DISTRIBUTOR", objects::CFDP_DISTRIBUTOR);
}
result = tmTcDistributor->addComponent(objects::TM_FUNNEL);
if (result != returnvalue::OK) {
scheduling::printAddObjectError("TM_FUNNEL", objects::TM_FUNNEL);
}
// result = tmTcDistributor->addComponent(objects::TM_FUNNEL);
// if (result != returnvalue::OK) {
// scheduling::printAddObjectError("TM_FUNNEL", objects::TM_FUNNEL);
// }
#if OBSW_ADD_TCPIP_SERVERS == 1
#if OBSW_ADD_TMTC_UDP_SERVER == 1
@ -181,6 +181,7 @@ void scheduling::initTasks() {
scheduling::printAddObjectError("PDEC Handler", objects::PDEC_HANDLER);
}
#endif /* OBSW_ADD_CCSDS_IP_CORE == 1 */
// All the TM store tasks run in permanent loops, frequency does not matter
PeriodicTaskIF* liveTmTask =
factory->createPeriodicTask("LIVE_TM", 60, PeriodicTaskIF::MINIMUM_STACK_SIZE, 1.0, nullptr);
@ -206,7 +207,6 @@ void scheduling::initTasks() {
if (result != returnvalue::OK) {
scheduling::printAddObjectError("CFDP_STORE_AND_TM", objects::CFDP_STORE_AND_TM_TASK);
}
#endif /* OBSW_ADD_CCSDS_IP_CORE == 1 */
#if OBSW_ADD_CFDP_COMPONENTS == 1
PeriodicTaskIF* cfdpTask = factory->createPeriodicTask(
@ -261,7 +261,6 @@ void scheduling::initTasks() {
PeriodicTaskIF* acsSysTask = factory->createPeriodicTask(
"ACS_SYS_TASK", 55, PeriodicTaskIF::MINIMUM_STACK_SIZE * 2, 0.4, missedDeadlineFunc);
static_cast<void>(acsSysTask);
result = acsSysTask->addComponent(objects::ACS_SUBSYSTEM);
if (result != returnvalue::OK) {
scheduling::printAddObjectError("ACS_SUBSYSTEM", objects::ACS_SUBSYSTEM);
@ -395,11 +394,11 @@ void scheduling::initTasks() {
genericSysTask->startTask();
#if OBSW_ADD_CCSDS_IP_CORES == 1
pdecHandlerTask->startTask();
#endif /* OBSW_ADD_CCSDS_IP_CORES == 1 */
liveTmTask->startTask();
logTmTask->startTask();
hkTmTask->startTask();
cfdpTmTask->startTask();
#endif /* OBSW_ADD_CCSDS_IP_CORES == 1 */
coreCtrlTask->startTask();
#if OBSW_ADD_SA_DEPL == 1

View File

@ -77,7 +77,8 @@ void ObjectFactory::produce(void* args) {
#endif /* OBSW_ADD_STAR_TRACKER == 1 */
#if OBSW_ADD_CCSDS_IP_CORES == 1
CcsdsIpCoreHandler* ipCoreHandler = nullptr;
CcsdsComponentArgs ccsdsArgs(*gpioComIF, *ipcStore, *tmStore, stores, &ipCoreHandler);
CcsdsComponentArgs ccsdsArgs(*gpioComIF, *ipcStore, *tmStore, stores, *pusFunnel, *cfdpFunnel,
&ipCoreHandler);
createCcsdsComponents(ccsdsArgs);
#if OBSW_TM_TO_PTME == 1
if (ccsdsArgs.liveDestination != nullptr) {

View File

@ -2,6 +2,7 @@
#include <fsfw/ipc/MutexGuard.h>
#include <fsfw/timemanager/Countdown.h>
#include <fsfw/timemanager/Stopwatch.h>
#include <unistd.h>
#include <cstring>

View File

@ -45,10 +45,13 @@ static constexpr uint32_t SA_DEPL_MAX_BURN_TIME = 180;
static constexpr uint32_t CCSDS_HANDLER_QUEUE_SIZE = 50;
static constexpr uint8_t NUMBER_OF_VIRTUAL_CHANNELS = 4;
static constexpr uint8_t VC0_QUEUE_SIZE = 80;
static constexpr uint8_t VC1_QUEUE_SIZE = 80;
static constexpr uint8_t VC2_QUEUE_SIZE = 50;
static constexpr uint8_t VC3_QUEUE_SIZE = 50;
static constexpr uint32_t VC0_LIVE_TM_QUEUE_SIZE = 300;
// There are three individual log stores!
static constexpr uint32_t MISC_STORE_QUEUE_SIZE = 200;
static constexpr uint32_t OK_STORE_QUEUE_SIZE = 350;
static constexpr uint32_t NOK_STORE_QUEUE_SIZE = 350;
static constexpr uint32_t HK_STORE_QUEUE_SIZE = 300;
static constexpr uint32_t CFDP_STORE_QUEUE_SIZE = 300;
static constexpr uint32_t MAX_PUS_FUNNEL_QUEUE_DEPTH = 100;

View File

@ -160,7 +160,8 @@ void ObjectFactory::produceGenericObjects(HealthTableIF** healthTable_, PusTmFun
PersistentTmStoreArgs storeArgs(objects::MISC_TM_STORE, "tm", "misc",
RolloverInterval::HOURLY, 2, **tmStore, sdcMan);
stores.miscStore = new PersistentTmStoreWithTmQueue(storeArgs, "MISC STORE", 500);
stores.miscStore =
new PersistentTmStoreWithTmQueue(storeArgs, "MISC STORE", config::MISC_STORE_QUEUE_SIZE);
(*pusFunnel)
->addPersistentTmStoreRouting(filters::miscFilter(),
stores.miscStore->getReportReceptionQueue(0));
@ -171,7 +172,8 @@ void ObjectFactory::produceGenericObjects(HealthTableIF** healthTable_, PusTmFun
{
PersistentTmStoreArgs storeArgs(objects::OK_TM_STORE, "tm", "ok", RolloverInterval::MINUTELY,
30, **tmStore, sdcMan);
stores.okStore = new PersistentTmStoreWithTmQueue(storeArgs, "OK STORE", 500);
stores.okStore =
new PersistentTmStoreWithTmQueue(storeArgs, "OK STORE", config::OK_STORE_QUEUE_SIZE);
(*pusFunnel)
->addPersistentTmStoreRouting(filters::okFilter(),
stores.okStore->getReportReceptionQueue(0));
@ -182,7 +184,8 @@ void ObjectFactory::produceGenericObjects(HealthTableIF** healthTable_, PusTmFun
{
PersistentTmStoreArgs storeArgs(objects::NOT_OK_TM_STORE, "tm", "nok",
RolloverInterval::MINUTELY, 30, **tmStore, sdcMan);
stores.notOkStore = new PersistentTmStoreWithTmQueue(storeArgs, "NOT OK STORE", 500);
stores.notOkStore =
new PersistentTmStoreWithTmQueue(storeArgs, "NOT OK STORE", config::NOK_STORE_QUEUE_SIZE);
(*pusFunnel)
->addPersistentTmStoreRouting(filters::notOkFilter(),
stores.notOkStore->getReportReceptionQueue(0));
@ -193,7 +196,8 @@ void ObjectFactory::produceGenericObjects(HealthTableIF** healthTable_, PusTmFun
{
PersistentTmStoreArgs storeArgs(objects::HK_TM_STORE, "tm", "hk", RolloverInterval::MINUTELY,
15, **tmStore, sdcMan);
stores.hkStore = new PersistentTmStoreWithTmQueue(storeArgs, "HK STORE", 500);
stores.hkStore =
new PersistentTmStoreWithTmQueue(storeArgs, "HK STORE", config::HK_STORE_QUEUE_SIZE);
(*pusFunnel)
->addPersistentTmStoreRouting(filters::hkFilter(),
stores.hkStore->getReportReceptionQueue(0));
@ -204,7 +208,8 @@ void ObjectFactory::produceGenericObjects(HealthTableIF** healthTable_, PusTmFun
{
PersistentTmStoreArgs storeArgs(objects::CFDP_TM_STORE, "tm", "cfdp",
RolloverInterval::MINUTELY, 30, **tmStore, sdcMan);
stores.cfdpStore = new PersistentTmStoreWithTmQueue(storeArgs, "CFDP STORE", 500);
stores.cfdpStore =
new PersistentTmStoreWithTmQueue(storeArgs, "CFDP STORE", config::CFDP_STORE_QUEUE_SIZE);
(*pusFunnel)
->addPersistentTmStoreRouting(filters::cfdpFilter(),
@ -221,8 +226,6 @@ void ObjectFactory::produceGenericObjects(HealthTableIF** healthTable_, PusTmFun
(*pusFunnel)->addLiveDestination("TCP Server", *tcpBridge, 0);
#endif
#endif
// Every TM packet goes through this funnel
new TmFunnelHandler(objects::TM_FUNNEL, **pusFunnel, **cfdpFunnel);
// PUS service stack
new Service1TelecommandVerification(objects::PUS_SERVICE_1_VERIFICATION, config::EIVE_PUS_APID,

View File

@ -1,16 +1,24 @@
#include "LiveTmTask.h"
#include <fsfw/tasks/TaskFactory.h>
#include <fsfw/timemanager/Stopwatch.h>
LiveTmTask::LiveTmTask(object_id_t objectId, VirtualChannelWithQueue& channel)
: SystemObject(objectId), channel(channel) {}
LiveTmTask::LiveTmTask(object_id_t objectId, PusTmFunnel& pusFunnel, CfdpTmFunnel& cfdpFunnel,
VirtualChannelWithQueue& channel)
: SystemObject(objectId), pusFunnel(pusFunnel), cfdpFunnel(cfdpFunnel), channel(channel) {}
ReturnValue_t LiveTmTask::performOperation(uint8_t opCode) {
while (true) {
// The funnel tasks are scheduled here directly as well.
ReturnValue_t result = channel.sendNextTm();
if (result == MessageQueueIF::EMPTY) {
// 5 ms IDLE delay. Might tweak this in the future.
TaskFactory::delayTask(5);
if (tmFunnelCd.hasTimedOut()) {
pusFunnel.performOperation(0);
cfdpFunnel.performOperation(0);
tmFunnelCd.resetTimer();
}
// 40 ms IDLE delay. Might tweak this in the future.
TaskFactory::delayTask(40);
}
}
}

View File

@ -3,15 +3,22 @@
#include <fsfw/objectmanager/SystemObject.h>
#include <fsfw/tasks/ExecutableObjectIF.h>
#include <fsfw/timemanager/Countdown.h>
#include <mission/tmtc/CfdpTmFunnel.h>
#include <mission/tmtc/PusTmFunnel.h>
#include <mission/tmtc/VirtualChannelWithQueue.h>
class LiveTmTask : public SystemObject, public ExecutableObjectIF {
public:
LiveTmTask(object_id_t objectId, VirtualChannelWithQueue& channel);
LiveTmTask(object_id_t objectId, PusTmFunnel& pusFunnel, CfdpTmFunnel& cfdpFunnel,
VirtualChannelWithQueue& channel);
ReturnValue_t performOperation(uint8_t opCode) override;
private:
Countdown tmFunnelCd = Countdown(100);
PusTmFunnel& pusFunnel;
CfdpTmFunnel& cfdpFunnel;
VirtualChannelWithQueue& channel;
};

View File

@ -1,6 +1,7 @@
#include "PersistentLogTmStoreTask.h"
#include <fsfw/tasks/TaskFactory.h>
#include <fsfw/timemanager/Stopwatch.h>
PersistentLogTmStoreTask::PersistentLogTmStoreTask(object_id_t objectId, StorageManagerIF& ipcStore,
LogStores stores, VirtualChannel& channel,
@ -14,30 +15,30 @@ ReturnValue_t PersistentLogTmStoreTask::performOperation(uint8_t opCode) {
}
bool someonesBusy = false;
bool busy = false;
busy = handleOneStore(stores.okStore);
busy = handleOneStore(stores.okStore, tcHandlingCd);
if (busy) {
someonesBusy = true;
}
busy = handleOneStore(stores.notOkStore);
busy = handleOneStore(stores.notOkStore, tcHandlingCd);
if (busy) {
someonesBusy = true;
}
busy = handleOneStore(stores.miscStore);
busy = handleOneStore(stores.miscStore, tcHandlingCd);
if (busy) {
someonesBusy = true;
}
if (not someonesBusy) {
TaskFactory::delayTask(10);
TaskFactory::delayTask(40);
}
}
}
void PersistentLogTmStoreTask::initStoresIfPossible() {
bool PersistentLogTmStoreTask::initStoresIfPossible() {
if (sdcMan.isSdCardUsable(std::nullopt)) {
stores.okStore.initializeTmStore();
stores.miscStore.initializeTmStore();
stores.notOkStore.initializeTmStore();
storesInitialized = true;
return true;
}
return false;
}

View File

@ -28,8 +28,9 @@ class PersistentLogTmStoreTask : public TmStoreTaskBase, public ExecutableObject
private:
LogStores stores;
Countdown tcHandlingCd = Countdown(400);
void initStoresIfPossible();
bool initStoresIfPossible();
};
#endif /* MISSION_TMTC_PERSISTENTLOGTMSTORETASK_H_ */

View File

@ -1,4 +1,5 @@
#include <fsfw/tasks/TaskFactory.h>
#include <fsfw/timemanager/Stopwatch.h>
#include <mission/tmtc/PersistentSingleTmStoreTask.h>
PersistentSingleTmStoreTask::PersistentSingleTmStoreTask(object_id_t objectId,
@ -10,14 +11,21 @@ PersistentSingleTmStoreTask::PersistentSingleTmStoreTask(object_id_t objectId,
ReturnValue_t PersistentSingleTmStoreTask::performOperation(uint8_t opCode) {
while (true) {
// Delay done by the check
if (not cyclicStoreCheck()) {
continue;
}
bool busy = handleOneStore(storeWithQueue);
bool busy = handleOneStore(storeWithQueue, tcHandlingCd);
if (not busy) {
TaskFactory::delayTask(10);
TaskFactory::delayTask(40);
}
}
}
void PersistentSingleTmStoreTask::initStoresIfPossible() {}
bool PersistentSingleTmStoreTask::initStoresIfPossible() {
if (sdcMan.isSdCardUsable(std::nullopt)) {
storeWithQueue.initializeTmStore();
return true;
}
return false;
}

View File

@ -17,8 +17,9 @@ class PersistentSingleTmStoreTask : public TmStoreTaskBase, public ExecutableObj
private:
PersistentTmStoreWithTmQueue& storeWithQueue;
Countdown tcHandlingCd = Countdown(400);
void initStoresIfPossible();
bool initStoresIfPossible();
};
#endif /* MISSION_TMTC_PERSISTENTSINGLETMSTORETASK_H_ */

View File

@ -66,8 +66,8 @@ ReturnValue_t PusTmFunnel::handleTmPacket(TmTcMessage &message) {
MessageQueueId_t destination;
if (persistentTmMap.packetMatches(packet, destination)) {
store_address_t storageId;
TmTcMessage msg(storageId);
result = tmStore.addData(&storageId, packetData, size);
TmTcMessage msg(storageId);
if (result != returnvalue::OK) {
sif::error << "PusLiveDemux::handlePacket: Store too full to create data copy" << std::endl;
} else {

View File

@ -1,25 +1,22 @@
#include "TmStoreTaskBase.h"
#include <fsfw/tasks/TaskFactory.h>
#include <fsfw/timemanager/Stopwatch.h>
TmStoreTaskBase::TmStoreTaskBase(object_id_t objectId, StorageManagerIF& ipcStore,
VirtualChannel& channel, SdCardMountedIF& sdcMan)
: SystemObject(objectId), ipcStore(ipcStore), channel(channel), sdcMan(sdcMan) {}
bool TmStoreTaskBase::handleOneStore(PersistentTmStoreWithTmQueue& store) {
bool TmStoreTaskBase::handleOneStore(PersistentTmStoreWithTmQueue& store, Countdown& tcHandlingCd) {
ReturnValue_t result;
bool tmToStoreReceived = true;
bool tcRequestReceived = true;
bool dumpsPerformed = false;
// Store TM persistently
ReturnValue_t result = store.handleNextTm();
if (result == MessageQueueIF::NO_QUEUE) {
result = store.handleNextTm();
if (result == MessageQueueIF::EMPTY) {
tmToStoreReceived = false;
}
// Handle TC requests, for example deletion or retrieval requests.
result = store.handleCommandQueue(ipcStore);
if (result == MessageQueueIF::NO_QUEUE) {
tcRequestReceived = false;
}
// Dump TMs when applicable
if (store.getState() == PersistentTmStore::State::DUMPING) {
size_t dumpedLen;
@ -31,6 +28,12 @@ bool TmStoreTaskBase::handleOneStore(PersistentTmStoreWithTmQueue& store) {
if (result == returnvalue::OK) {
dumpsPerformed = true;
}
} else {
// Handle TC requests, for example deletion or retrieval requests.
result = store.handleCommandQueue(ipcStore);
if (result == MessageQueueIF::EMPTY) {
tcRequestReceived = false;
}
}
if (tcRequestReceived or tmToStoreReceived or dumpsPerformed) {
return true;
@ -40,7 +43,7 @@ bool TmStoreTaskBase::handleOneStore(PersistentTmStoreWithTmQueue& store) {
bool TmStoreTaskBase::cyclicStoreCheck() {
if (not storesInitialized) {
initStoresIfPossible();
storesInitialized = initStoresIfPossible();
if (not storesInitialized) {
TaskFactory::delayTask(400);
return false;

View File

@ -16,7 +16,7 @@ class TmStoreTaskBase : public SystemObject {
* @param store
* @return
*/
bool handleOneStore(PersistentTmStoreWithTmQueue& store);
bool handleOneStore(PersistentTmStoreWithTmQueue& store, Countdown& tcHandlingCd);
/**
* Occasionally check whether SD card is okay to be used. If not, poll whether it is ready to
@ -24,7 +24,7 @@ class TmStoreTaskBase : public SystemObject {
*/
bool cyclicStoreCheck();
virtual void initStoresIfPossible() = 0;
virtual bool initStoresIfPossible() = 0;
StorageManagerIF& ipcStore;
Countdown sdCardCheckCd = Countdown(800);

View File

@ -11,7 +11,7 @@ VirtualChannelWithQueue::VirtualChannelWithQueue(object_id_t objectId, uint8_t v
const char* vcName, PtmeIF& ptme,
const std::atomic_bool& linkStateProvider,
StorageManagerIF& tmStore, uint32_t tmQueueDepth)
: VirtualChannel(objectId, vcId, vcName, ptme, linkStateProvider) {
: VirtualChannel(objectId, vcId, vcName, ptme, linkStateProvider), tmStore(tmStore) {
auto mqArgs = MqArgs(getObjectId(), reinterpret_cast<void*>(getVcid()));
tmQueue = QueueFactory::instance()->createMessageQueue(
tmQueueDepth, MessageQueueMessage::MAX_MESSAGE_SIZE, &mqArgs);
@ -28,16 +28,16 @@ ReturnValue_t VirtualChannelWithQueue::sendNextTm() {
store_address_t storeId = message.getStorageId();
const uint8_t* data = nullptr;
size_t size = 0;
result = tmStore->getData(storeId, &data, &size);
result = tmStore.getData(storeId, &data, &size);
if (result != returnvalue::OK) {
sif::warning << "VirtualChannel::performOperation: Failed to read data from TM store"
<< std::endl;
tmStore->deleteData(storeId);
tmStore.deleteData(storeId);
return result;
}
write(data, size);
tmStore->deleteData(storeId);
tmStore.deleteData(storeId);
if (result != returnvalue::OK) {
return result;
}

View File

@ -38,5 +38,5 @@ class VirtualChannelWithQueue : public VirtualChannel, public AcceptsTelemetryIF
private:
MessageQueueIF* tmQueue = nullptr;
StorageManagerIF* tmStore = nullptr;
StorageManagerIF& tmStore;
};