Split up FIFO into StaticFIFO and normale FIFO

This commit is contained in:
Robin Müller 2020-07-05 23:53:13 +02:00
parent 2395e487ae
commit ebec074655
12 changed files with 309 additions and 84 deletions

View File

@ -1,82 +1,26 @@
#ifndef FIFO_H_ #ifndef FRAMEWORK_CONTAINER_FIFO_H_
#define FIFO_H_ #define FRAMEWORK_CONTAINER_FIFO_H_
#include <framework/returnvalues/HasReturnvaluesIF.h> #include <framework/container/FIFOBase.h>
#include <vector>
/** /**
* @brief Simple First-In-First-Out data structure * @brief Simple First-In-First-Out data structure. The maximum size
* can be set in the constructor. THe public interface of
* FIFOBase exposes the user interface for the FIFO.
* @tparam T Entry Type * @tparam T Entry Type
* @tparam capacity Maximum capacity * @tparam capacity Maximum capacity
*/ */
template<typename T, uint8_t capacity> template<typename T>
class FIFO { class FIFO: public FIFOBase<T> {
private:
uint8_t readIndex, writeIndex, currentSize;
T data[capacity];
uint8_t next(uint8_t current) {
++current;
if (current == capacity) {
current = 0;
}
return current;
}
public: public:
FIFO() : FIFO(size_t maxCapacity): FIFOBase<T>(values.data(), maxCapacity) {
readIndex(0), writeIndex(0), currentSize(0) { values.reserve(maxCapacity);
} values.resize(maxCapacity);
};
bool empty() { private:
return (currentSize == 0); std::vector<T> values;
}
bool full() {
return (currentSize == capacity);
}
uint8_t size(){
return currentSize;
}
ReturnValue_t insert(T value) {
if (full()) {
return FULL;
} else {
data[writeIndex] = value;
writeIndex = next(writeIndex);
++currentSize;
return HasReturnvaluesIF::RETURN_OK;
}
}
ReturnValue_t retrieve(T *value) {
if (empty()) {
return EMPTY;
} else {
*value = data[readIndex];
readIndex = next(readIndex);
--currentSize;
return HasReturnvaluesIF::RETURN_OK;
}
}
ReturnValue_t peek(T * value) {
if(empty()) {
return EMPTY;
} else {
*value = data[readIndex];
return HasReturnvaluesIF::RETURN_OK;
}
}
ReturnValue_t pop() {
T value;
return this->retrieve(&value);
}
static const uint8_t INTERFACE_ID = CLASS_ID::FIFO_CLASS;
static const ReturnValue_t FULL = MAKE_RETURN_CODE(1);
static const ReturnValue_t EMPTY = MAKE_RETURN_CODE(2);
}; };
#endif /* FIFO_H_ */ #endif /* FRAMEWORK_CONTAINER_FIFO_H_ */

59
container/FIFOBase.h Normal file
View File

@ -0,0 +1,59 @@
#ifndef FRAMEWORK_CONTAINER_FIFOBASE_H_
#define FRAMEWORK_CONTAINER_FIFOBASE_H_
#include <framework/returnvalues/HasReturnvaluesIF.h>
#include <cstddef>
template <typename T>
class FIFOBase {
public:
static const uint8_t INTERFACE_ID = CLASS_ID::FIFO_CLASS;
static const ReturnValue_t FULL = MAKE_RETURN_CODE(1);
static const ReturnValue_t EMPTY = MAKE_RETURN_CODE(2);
/** Default ctor, no input arguments required. */
FIFOBase(T* values, const size_t maxCapacity);
/**
* Insert value into FIFO
* @param value
* @return
*/
ReturnValue_t insert(T value);
/**
* Retrieve item from FIFO. This removes the item from the FIFO.
* @param value
* @return
*/
ReturnValue_t retrieve(T *value);
/**
* Retrieve item from FIFO without removing it from FIFO.
* @param value
* @return
*/
ReturnValue_t peek(T * value);
/**
* Remove item from FIFO.
* @return
*/
ReturnValue_t pop();
bool empty();
bool full();
size_t size();
private:
T* values;
size_t maxCapacity;
size_t readIndex = 0;
size_t writeIndex = 0;
size_t currentSize = 0;
size_t next(size_t current);
};
#include <framework/container/FIFOBase.tpp>
#endif /* FRAMEWORK_CONTAINER_FIFOBASE_H_ */

76
container/FIFOBase.tpp Normal file
View File

@ -0,0 +1,76 @@
#ifndef FRAMEWORK_CONTAINER_FIFOBASE_TPP_
#define FRAMEWORK_CONTAINER_FIFOBASE_TPP_
#ifndef FRAMEWORK_CONTAINER_FIFOBASE_H_
#error Include FIFOBase.h before FIFOBase.tpp!
#endif
template<typename T>
inline FIFOBase<T>::FIFOBase(T* values, const size_t maxCapacity):
values(values), maxCapacity(maxCapacity) {};
template<typename T>
inline ReturnValue_t FIFOBase<T>::insert(T value) {
if (full()) {
return FULL;
} else {
values[writeIndex] = value;
writeIndex = next(writeIndex);
++currentSize;
return HasReturnvaluesIF::RETURN_OK;
}
};
template<typename T>
inline ReturnValue_t FIFOBase<T>::retrieve(T* value) {
if (empty()) {
return EMPTY;
} else {
*value = values[readIndex];
readIndex = next(readIndex);
--currentSize;
return HasReturnvaluesIF::RETURN_OK;
}
};
template<typename T>
inline ReturnValue_t FIFOBase<T>::peek(T* value) {
if(empty()) {
return EMPTY;
} else {
*value = values[readIndex];
return HasReturnvaluesIF::RETURN_OK;
}
};
template<typename T>
inline ReturnValue_t FIFOBase<T>::pop() {
T value;
return this->retrieve(&value);
};
template<typename T>
inline bool FIFOBase<T>::empty() {
return (currentSize == 0);
};
template<typename T>
inline bool FIFOBase<T>::full() {
return (currentSize == maxCapacity);
}
template<typename T>
inline size_t FIFOBase<T>::size() {
return currentSize;
}
template<typename T>
inline size_t FIFOBase<T>::next(size_t current) {
++current;
if (current == maxCapacity) {
current = 0;
}
return current;
}
#endif

23
container/StaticFIFO.h Normal file
View File

@ -0,0 +1,23 @@
#ifndef FRAMEWORK_CONTAINER_STATICFIFO_H_
#define FRAMEWORK_CONTAINER_STATICFIFO_H_
#include <framework/returnvalues/HasReturnvaluesIF.h>
#include <framework/container/FIFOBase.h>
/**
* @brief Simple First-In-First-Out data structure with size fixed at
* compile time. The public interface of FIFOBase exposes
* the user interface for the FIFO.
* @tparam T Entry Type
* @tparam capacity Maximum capacity
*/
template<typename T, size_t capacity>
class StaticFIFO: public FIFOBase<T> {
public:
StaticFIFO(): FIFOBase<T>(values.data(), capacity) {};
private:
std::array<T, capacity> values;
};
#endif /* FRAMEWORK_CONTAINERS_STATICFIFO_H_ */

View File

@ -90,3 +90,10 @@ double timevalOperations::toDouble(const timeval timeval) {
double result = timeval.tv_sec * 1000000. + timeval.tv_usec; double result = timeval.tv_sec * 1000000. + timeval.tv_usec;
return result / 1000000.; return result / 1000000.;
} }
timeval timevalOperations::toTimeval(const double seconds) {
timeval tval;
tval.tv_sec = seconds;
tval.tv_usec = seconds *(double) 1e6 - (tval.tv_sec *1e6);
return tval;
}

View File

@ -41,6 +41,7 @@ namespace timevalOperations {
* @return seconds * @return seconds
*/ */
double toDouble(const timeval timeval); double toDouble(const timeval timeval);
timeval toTimeval(const double seconds);
} }
#endif /* TIMEVALOPERATIONS_H_ */ #endif /* TIMEVALOPERATIONS_H_ */

View File

@ -1,16 +1,80 @@
#include <framework/osal/linux/TcUnixUdpPollingTask.h> #include <framework/osal/linux/TcUnixUdpPollingTask.h>
TcSocketPollingTask::TcSocketPollingTask(object_id_t objectId, TcSocketPollingTask::TcSocketPollingTask(object_id_t objectId,
object_id_t tmtcUnixUdpBridge): SystemObject(objectId) { object_id_t tmtcUnixUdpBridge, size_t frameSize,
double timeoutSeconds): SystemObject(objectId),
tmtcBridgeId(tmtcUnixUdpBridge) {
if(frameSize > 0) {
this->frameSize = frameSize;
}
else {
this->frameSize = DEFAULT_MAX_FRAME_SIZE;
}
// Set up reception buffer with specified frame size.
// For now, it is assumed that only one frame is held in the buffer!
receptionBuffer.reserve(this->frameSize);
receptionBuffer.resize(this->frameSize);
if(timeoutSeconds == -1) {
receptionTimeout = DEFAULT_TIMEOUT;
}
else {
receptionTimeout = timevalOperations::toTimeval(timeoutSeconds);
}
// Set receive timeout.
int result = setsockopt(serverUdpSocket, SOL_SOCKET, SO_RCVTIMEO,
&receptionTimeout, sizeof(receptionTimeout));
if(result == -1) {
sif::error << "TcSocketPollingTask::TcSocketPollingTask: Setting receive"
"timeout failed with " << strerror(errno) << std::endl;
return;
}
} }
TcSocketPollingTask::~TcSocketPollingTask() { TcSocketPollingTask::~TcSocketPollingTask() {
} }
ReturnValue_t TcSocketPollingTask::performOperation(uint8_t opCode) { ReturnValue_t TcSocketPollingTask::performOperation(uint8_t opCode) {
// Poll for new data permanently. The call will block until the specified
// length of bytes has been received or a timeout occured.
while(1) {
//! Sender Address is cached here.
struct sockaddr_in senderAddress;
socklen_t senderSockLen = 0;
ssize_t bytesReceived = recvfrom(serverUdpSocket,
receptionBuffer.data(), frameSize, receptionFlags,
reinterpret_cast<sockaddr*>(&senderAddress), &senderSockLen);
if(bytesReceived < 0) {
//handle error
sif::error << "TcSocketPollingTask::performOperation: recvfrom "
"failed with " << strerror(errno) << std::endl;
continue;
}
sif::debug << "TcSocketPollingTask::performOperation: " << bytesReceived
<< " bytes received" << std::endl;
ReturnValue_t result = handleSuccessfullTcRead();
tmtcBridge->checkAndSetClientAddress(senderAddress);
}
return HasReturnvaluesIF::RETURN_OK; return HasReturnvaluesIF::RETURN_OK;
} }
ReturnValue_t TcSocketPollingTask::initialize() { ReturnValue_t TcSocketPollingTask::initialize() {
tmtcBridge = objectManager->get<TmTcUnixUdpBridge>(tmtcBridgeId);
if(tmtcBridge == nullptr) {
sif::error << "TcSocketPollingTask::TcSocketPollingTask: Invalid"
" TMTC bridge object!" << std::endl;
return ObjectManagerIF::CHILD_INIT_FAILED;
}
serverUdpSocket = tmtcBridge->serverSocket;
return HasReturnvaluesIF::RETURN_OK;
}
ReturnValue_t TcSocketPollingTask::handleSuccessfullTcRead() {
return HasReturnvaluesIF::RETURN_OK; return HasReturnvaluesIF::RETURN_OK;
} }

View File

@ -1,8 +1,11 @@
#ifndef FRAMEWORK_OSAL_LINUX_TCSOCKETPOLLINGTASK_H_ #ifndef FRAMEWORK_OSAL_LINUX_TCSOCKETPOLLINGTASK_H_
#define FRAMEWORK_OSAL_LINUX_TCSOCKETPOLLINGTASK_H_ #define FRAMEWORK_OSAL_LINUX_TCSOCKETPOLLINGTASK_H_
#include <framework/objectmanager/SystemObject.h> #include <framework/objectmanager/SystemObject.h>
#include <framework/osal/linux/TmTcUnixUdpBridge.h>
#include <framework/tasks/ExecutableObjectIF.h> #include <framework/tasks/ExecutableObjectIF.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <vector>
/** /**
* @brief This class can be used to implement the polling of a Unix socket, * @brief This class can be used to implement the polling of a Unix socket,
@ -17,15 +20,34 @@ class TcSocketPollingTask: public SystemObject,
public ExecutableObjectIF { public ExecutableObjectIF {
friend class TmTcUnixUdpBridge; friend class TmTcUnixUdpBridge;
public: public:
TcSocketPollingTask(object_id_t objectId, object_id_t tmtcUnixUdpBridge); static constexpr size_t DEFAULT_MAX_FRAME_SIZE = 2048;
//! 0.5 default milliseconds timeout for now.
static constexpr timeval DEFAULT_TIMEOUT = {.tv_sec = 0, .tv_usec = 500};
TcSocketPollingTask(object_id_t objectId, object_id_t tmtcUnixUdpBridge,
size_t frameSize = 0, double timeoutSeconds = -1);
virtual~ TcSocketPollingTask(); virtual~ TcSocketPollingTask();
virtual ReturnValue_t performOperation(uint8_t opCode) override; virtual ReturnValue_t performOperation(uint8_t opCode) override;
virtual ReturnValue_t initialize() override; virtual ReturnValue_t initialize() override;
private: private:
//! Sender Address is cached here. //! TMTC bridge is cached.
const struct sockaddr senderAddress; object_id_t tmtcBridgeId = objects::NO_OBJECT;
}; TmTcUnixUdpBridge* tmtcBridge = nullptr;
//! Reception flags: https://linux.die.net/man/2/recvfrom.
int receptionFlags = 0;
//! Server socket, which is member of TMTC bridge and is assigned in
//! constructor
int serverUdpSocket = 0;
std::vector<uint8_t> receptionBuffer;
size_t frameSize = 0;
timeval receptionTimeout;
ReturnValue_t handleSuccessfullTcRead();

View File

@ -1,11 +1,14 @@
#include <framework/osal/linux/TmTcUnixUdpBridge.h> #include <framework/osal/linux/TmTcUnixUdpBridge.h>
#include <framework/serviceinterface/ServiceInterfaceStream.h> #include <framework/serviceinterface/ServiceInterfaceStream.h>
#include <errno.h> #include <errno.h>
#include <framework/ipc/MutexHelper.h>
TmTcUnixUdpBridge::TmTcUnixUdpBridge(object_id_t objectId, TmTcUnixUdpBridge::TmTcUnixUdpBridge(object_id_t objectId,
object_id_t ccsdsPacketDistributor, uint16_t serverPort, object_id_t ccsdsPacketDistributor, uint16_t serverPort,
uint16_t clientPort): uint16_t clientPort):
TmTcBridge(objectId, ccsdsPacketDistributor) { TmTcBridge(objectId, ccsdsPacketDistributor) {
mutex = MutexFactory::instance()->createMutex();
uint16_t setServerPort = DEFAULT_UDP_SERVER_PORT; uint16_t setServerPort = DEFAULT_UDP_SERVER_PORT;
if(serverPort != 0xFFFF) { if(serverPort != 0xFFFF) {
setServerPort = serverPort; setServerPort = serverPort;
@ -16,6 +19,7 @@ TmTcUnixUdpBridge::TmTcUnixUdpBridge(object_id_t objectId,
setClientPort = clientPort; setClientPort = clientPort;
} }
// Set up UDP socket: https://man7.org/linux/man-pages/man7/ip.7.html
serverSocket = socket(AF_INET, SOCK_DGRAM, 0); serverSocket = socket(AF_INET, SOCK_DGRAM, 0);
if(socket < 0) { if(socket < 0) {
sif::error << "TmTcUnixUdpBridge::TmTcUnixUdpBridge: Could not open" sif::error << "TmTcUnixUdpBridge::TmTcUnixUdpBridge: Could not open"
@ -26,14 +30,16 @@ TmTcUnixUdpBridge::TmTcUnixUdpBridge(object_id_t objectId,
} }
serverAddress.sin_family = AF_INET; serverAddress.sin_family = AF_INET;
// Accept packets from any interface.
serverAddress.sin_addr.s_addr = htonl(INADDR_ANY); serverAddress.sin_addr.s_addr = htonl(INADDR_ANY);
serverAddress.sin_port = htons(setServerPort); serverAddress.sin_port = htons(setServerPort);
setsockopt(serverSocket, SOL_SOCKET, SO_REUSEADDR, &serverSocketOptions, setsockopt(serverSocket, SOL_SOCKET, SO_REUSEADDR, &serverSocketOptions,
sizeof(serverSocketOptions)); sizeof(serverSocketOptions));
serverSocketLen = sizeof(serverAddress);
int result = bind(serverSocket, int result = bind(serverSocket,
reinterpret_cast<struct sockaddr*>(&serverAddress), reinterpret_cast<struct sockaddr*>(&serverAddress),
sizeof(serverAddress)); serverSocketLen);
if(result == -1) { if(result == -1) {
sif::error << "TmTcUnixUdpBridge::TmTcUnixUdpBridge: Could not bind " sif::error << "TmTcUnixUdpBridge::TmTcUnixUdpBridge: Could not bind "
"local port " << setServerPort << " to server socket!" "local port " << setServerPort << " to server socket!"
@ -81,6 +87,17 @@ void TmTcUnixUdpBridge::handleSocketError() {
} }
} }
void TmTcUnixUdpBridge::setTimeout(float timeoutSeconds) {
}
void TmTcUnixUdpBridge::checkAndSetClientAddress(sockaddr_in newAddress) {
MutexHelper lock(mutex, 10);
// Set new IP address if it has changed.
if(clientAddress.sin_addr.s_addr != newAddress.sin_addr.s_addr) {
clientAddress.sin_addr.s_addr = newAddress.sin_addr.s_addr;
}
}
void TmTcUnixUdpBridge::handleBindError() { void TmTcUnixUdpBridge::handleBindError() {
// See: https://man7.org/linux/man-pages/man2/bind.2.html // See: https://man7.org/linux/man-pages/man2/bind.2.html
switch(errno) { switch(errno) {

View File

@ -7,16 +7,20 @@
#include <netinet/udp.h> #include <netinet/udp.h>
class TmTcUnixUdpBridge: public TmTcBridge { class TmTcUnixUdpBridge: public TmTcBridge {
friend class TcSocketPollingTask;
public: public:
// The ports chosen here should not be used by any other process. // The ports chosen here should not be used by any other process.
// List of used ports on Linux: /etc/services // List of used ports on Linux: /etc/services
static constexpr int DEFAULT_UDP_SERVER_PORT = 7301; static constexpr uint16_t DEFAULT_UDP_SERVER_PORT = 7301;
static constexpr int DEFAULT_UDP_CLIENT_PORT = 7302; static constexpr uint16_t DEFAULT_UDP_CLIENT_PORT = 7302;
TmTcUnixUdpBridge(object_id_t objectId, object_id_t ccsdsPacketDistributor, TmTcUnixUdpBridge(object_id_t objectId, object_id_t ccsdsPacketDistributor,
uint16_t serverPort = 0xFFFF,uint16_t clientPort = 0xFFFF); uint16_t serverPort = 0xFFFF,uint16_t clientPort = 0xFFFF);
virtual~ TmTcUnixUdpBridge(); virtual~ TmTcUnixUdpBridge();
void setTimeout(float timeoutSeconds);
void checkAndSetClientAddress(sockaddr_in clientAddress);
protected: protected:
virtual ReturnValue_t receiveTc(uint8_t ** recvBuffer, virtual ReturnValue_t receiveTc(uint8_t ** recvBuffer,
size_t * size) override; size_t * size) override;
@ -24,8 +28,16 @@ protected:
private: private:
int serverSocket = 0; int serverSocket = 0;
const int serverSocketOptions = 0; const int serverSocketOptions = 0;
struct sockaddr_in clientAddress; struct sockaddr_in clientAddress;
socklen_t clientSocketLen = 0;
struct sockaddr_in serverAddress; struct sockaddr_in serverAddress;
socklen_t serverSocketLen = 0;
//! Access to the client address is mutex protected as it is set
//! by another task.
MutexIF* mutex;
void handleSocketError(); void handleSocketError();
void handleBindError(); void handleBindError();

View File

@ -10,7 +10,7 @@
#include <framework/tmtcservices/VerificationReporter.h> #include <framework/tmtcservices/VerificationReporter.h>
#include <framework/ipc/CommandMessage.h> #include <framework/ipc/CommandMessage.h>
#include <framework/container/FixedMap.h> #include <framework/container/FixedMap.h>
#include <framework/container/FIFO.h> #include <framework/container/StaticFIFO.h>
#include <framework/serialize/SerializeIF.h> #include <framework/serialize/SerializeIF.h>
class TcPacketStored; class TcPacketStored;
@ -199,7 +199,7 @@ protected:
uint32_t state; uint32_t state;
Command_t command; Command_t command;
object_id_t objectId; object_id_t objectId;
FIFO<store_address_t, 3> fifo; StaticFIFO<store_address_t, 3> fifo;
}; };
using CommandMapIter = FixedMap<MessageQueueId_t, using CommandMapIter = FixedMap<MessageQueueId_t,

View File

@ -8,7 +8,7 @@
#include <framework/objectmanager/SystemObject.h> #include <framework/objectmanager/SystemObject.h>
#include <framework/tmtcservices/TmTcMessage.h> #include <framework/tmtcservices/TmTcMessage.h>
#include <framework/container/FIFO.h> #include <framework/container/StaticFIFO.h>
class TmTcBridge : public AcceptsTelemetryIF, class TmTcBridge : public AcceptsTelemetryIF,
public ExecutableObjectIF, public ExecutableObjectIF,
@ -143,7 +143,7 @@ protected:
* This fifo can be used to store downlink data * This fifo can be used to store downlink data
* which can not be sent at the moment. * which can not be sent at the moment.
*/ */
FIFO<store_address_t, LIMIT_DOWNLINK_PACKETS_STORED> tmFifo; StaticFIFO<store_address_t, LIMIT_DOWNLINK_PACKETS_STORED> tmFifo;
uint8_t sentPacketsPerCycle = DEFAULT_STORED_DATA_SENT_PER_CYCLE; uint8_t sentPacketsPerCycle = DEFAULT_STORED_DATA_SENT_PER_CYCLE;
uint8_t maxNumberOfPacketsStored = DEFAULT_DOWNLINK_PACKETS_STORED; uint8_t maxNumberOfPacketsStored = DEFAULT_DOWNLINK_PACKETS_STORED;
}; };