subscription mechanism almost complete

This commit is contained in:
Robin Müller 2020-10-14 20:05:39 +02:00
parent a9a31308ae
commit ab603abada
3 changed files with 64 additions and 4 deletions

View File

@ -96,8 +96,11 @@ public:
* @details * @details
* Can be overriden by the child class to handle changed datasets. * Can be overriden by the child class to handle changed datasets.
* @param sid * @param sid
* @param storeId If a snapshot was requested, data will be located inside
* the IPC store with this store ID.
*/ */
virtual void handleChangedDatasetOrVariable(sid_t sid) { virtual void handleChangedDataset(sid_t sid,
store_address_t storeId = storeId::INVALID_STORE_ADDRESS) {
return; return;
} }
@ -107,8 +110,11 @@ public:
* @details * @details
* Can be overriden by the child class to handle changed pool IDs. * Can be overriden by the child class to handle changed pool IDs.
* @param sid * @param sid
* @param storeId If a snapshot was requested, data will be located inside
* the IPC store with this store ID.
*/ */
virtual void handleChangedPoolVariable(lp_id_t poolId) { virtual void handleChangedPoolVariable(lp_id_t poolId,
store_address_t storeId = storeId::INVALID_STORE_ADDRESS) {
return; return;
} }

View File

@ -129,6 +129,14 @@ ReturnValue_t LocalDataPoolManager::performHkOperation() {
} }
if(poolObj->hasChanged()) { if(poolObj->hasChanged()) {
// prepare and send update notification. // prepare and send update notification.
CommandMessage notification;
HousekeepingMessage::setUpdateNotificationVariableCommand(
&notification, receiver.dataId.localPoolId);
ReturnValue_t result = hkQueue->sendMessage(
receiver.destinationQueue, &notification);
if(result != HasReturnvaluesIF::RETURN_OK) {
status = result;
}
toReset = poolObj; toReset = poolObj;
} }
@ -169,6 +177,16 @@ ReturnValue_t LocalDataPoolManager::performHkOperation() {
} }
if(poolObj->hasChanged()) { if(poolObj->hasChanged()) {
// prepare and send update snapshot. // prepare and send update snapshot.
CommandMessage notification;
// todo: serialize into store with timestamp.
store_address_t storeId;
HousekeepingMessage::setUpdateSnapshotSetCommand(
&notification, receiver.dataId.sid, storeId);
ReturnValue_t result = hkQueue->sendMessage(
receiver.destinationQueue, &notification);
if(result != HasReturnvaluesIF::RETURN_OK) {
status = result;
}
toReset = poolObj; toReset = poolObj;
} }
} }
@ -180,6 +198,16 @@ ReturnValue_t LocalDataPoolManager::performHkOperation() {
} }
if(dataSet->hasChanged()) { if(dataSet->hasChanged()) {
// prepare and send update snapshot. // prepare and send update snapshot.
CommandMessage notification;
// todo: serialize into store with timestamp.
store_address_t storeId;
HousekeepingMessage::setUpdateSnapshotVariableCommand(
&notification, receiver.dataId.localPoolId, storeId);
ReturnValue_t result = hkQueue->sendMessage(
receiver.destinationQueue, &notification);
if(result != HasReturnvaluesIF::RETURN_OK) {
status = result;
}
toReset = dataSet; toReset = dataSet;
} }
} }
@ -384,6 +412,7 @@ ReturnValue_t LocalDataPoolManager::handleHousekeepingMessage(
sid_t sid = HousekeepingMessage::getSid(message); sid_t sid = HousekeepingMessage::getSid(message);
ReturnValue_t result = HasReturnvaluesIF::RETURN_OK; ReturnValue_t result = HasReturnvaluesIF::RETURN_OK;
switch(command) { switch(command) {
// Houskeeping interface handling.
case(HousekeepingMessage::ENABLE_PERIODIC_DIAGNOSTICS_GENERATION): { case(HousekeepingMessage::ENABLE_PERIODIC_DIAGNOSTICS_GENERATION): {
result = togglePeriodicGeneration(sid, true, true); result = togglePeriodicGeneration(sid, true, true);
break; break;
@ -438,8 +467,28 @@ ReturnValue_t LocalDataPoolManager::handleHousekeepingMessage(
dataSet, true); dataSet, true);
} }
// Notification handling.
case(HousekeepingMessage::UPDATE_NOTIFICATION_SET): { case(HousekeepingMessage::UPDATE_NOTIFICATION_SET): {
owner->handleChangedDatasetOrVariable(sid); owner->handleChangedDataset(sid);
return HasReturnvaluesIF::RETURN_OK;
}
case(HousekeepingMessage::UPDATE_NOTIFICATION_VARIABLE): {
lp_id_t locPoolId = HousekeepingMessage::
getUpdateNotificationVariableCommand(message);
owner->handleChangedPoolVariable(locPoolId);
return HasReturnvaluesIF::RETURN_OK;
}
case(HousekeepingMessage::UPDATE_SNAPSHOT_SET): {
store_address_t storeId;
HousekeepingMessage::getUpdateSnapshotSetCommand(message, &storeId);
owner->handleChangedDataset(sid, storeId);
return HasReturnvaluesIF::RETURN_OK;
}
case(HousekeepingMessage::UPDATE_SNAPSHOT_VARIABLE): {
store_address_t storeId;
lp_id_t localPoolId = HousekeepingMessage::
getUpdateSnapshotVariableCommand(message, &storeId);
owner->handleChangedPoolVariable(localPoolId, storeId);
return HasReturnvaluesIF::RETURN_OK; return HasReturnvaluesIF::RETURN_OK;
} }

View File

@ -3,16 +3,21 @@
#include <cstdint> #include <cstdint>
namespace storeId {
static constexpr uint32_t INVALID_STORE_ADDRESS = 0xffffffff;
}
/** /**
* This union defines the type that identifies where a data packet is * This union defines the type that identifies where a data packet is
* stored in the store. It comprises of a raw part to read it as raw value and * stored in the store. It comprises of a raw part to read it as raw value and
* a structured part to use it in pool-like stores. * a structured part to use it in pool-like stores.
*/ */
union store_address_t { union store_address_t {
/** /**
* Default Constructor, initializing to INVALID_ADDRESS * Default Constructor, initializing to INVALID_ADDRESS
*/ */
store_address_t():raw(0xFFFFFFFF){} store_address_t(): raw(storeId::INVALID_STORE_ADDRESS){}
/** /**
* Constructor to create an address object using the raw address * Constructor to create an address object using the raw address
* *