Compare commits

...

10 Commits

5 changed files with 160 additions and 345 deletions

View File

@@ -456,6 +456,15 @@ ReturnValue_t DeviceHandlerBase::insertInCommandMap(DeviceCommandId_t deviceComm
}
}
size_t DeviceHandlerBase::getNextReplyLength(DeviceCommandId_t commandId){
DeviceReplyIter iter = deviceReplyMap.find(commandId);
if(iter != deviceReplyMap.end()) {
return iter->second.replyLen;
}else{
return 0;
}
}
ReturnValue_t DeviceHandlerBase::updateReplyMapEntry(DeviceCommandId_t deviceReply,
uint16_t delayCycles, uint16_t maxDelayCycles, bool periodic) {
auto replyIter = deviceReplyMap.find(deviceReply);
@@ -646,16 +655,12 @@ void DeviceHandlerBase::doGetWrite() {
void DeviceHandlerBase::doSendRead() {
ReturnValue_t result;
size_t requestLen = 0;
size_t replyLen = 0;
if(cookieInfo.pendingCommand != deviceCommandMap.end()) {
DeviceReplyIter iter = deviceReplyMap.find(
cookieInfo.pendingCommand->first);
if(iter != deviceReplyMap.end()) {
requestLen = iter->second.replyLen;
}
replyLen = getNextReplyLength(cookieInfo.pendingCommand->first);
}
result = communicationInterface->requestReceiveMessage(comCookie, requestLen);
result = communicationInterface->requestReceiveMessage(comCookie, replyLen);
if (result == RETURN_OK) {
cookieInfo.state = COOKIE_READ_SENT;

View File

@@ -471,13 +471,27 @@ protected:
ReturnValue_t insertInReplyMap(DeviceCommandId_t deviceCommand,
uint16_t maxDelayCycles, LocalPoolDataSetBase* dataSet = nullptr,
size_t replyLen = 0, bool periodic = false);
/**
* @brief A simple command to add a command to the commandList.
* @brief A simple command to add a command to the commandList.
* @param deviceCommand The command to add
* @return - @c RETURN_OK when the command was successfully inserted,
* - @c RETURN_FAILED else.
*/
ReturnValue_t insertInCommandMap(DeviceCommandId_t deviceCommand);
/**
* @brief This function returns the reply length of the next reply to read.
*
* @param deviceCommand The command which triggered the device reply.
*
* @details The default implementation assumes only one reply is triggered by the command. In
* case the command triggers multiple replies (e.g. one acknowledgment, one data,
* and one execution status reply), this function can be overwritten to get the
* reply length of the next reply to read.
*/
virtual size_t getNextReplyLength(DeviceCommandId_t deviceCommand);
/**
* @brief This is a helper method to facilitate updating entries
* in the reply map.
@@ -953,7 +967,7 @@ protected:
* - NO_REPLY_EXPECTED if there was no reply found. This is not an
* error case as many commands do not expect a reply.
*/
virtual ReturnValue_t enableReplyInReplyMap(DeviceCommandMap::iterator cmd,
virtual ReturnValue_t enableReplyInReplyMap(DeviceCommandMap::iterator command,
uint8_t expectedReplies = 1, bool useAlternateId = false,
DeviceCommandId_t alternateReplyID = 0);

View File

@@ -12,54 +12,56 @@ PosixThread::PosixThread(const char* name_, int priority_, size_t stackSize_):
}
PosixThread::~PosixThread() {
//No deletion and no free of Stack Pointer
//No deletion and no free of Stack Pointer
}
ReturnValue_t PosixThread::sleep(uint64_t ns) {
//TODO sleep might be better with timer instead of sleep()
timespec time;
time.tv_sec = ns/1000000000;
time.tv_nsec = ns - time.tv_sec*1e9;
//TODO sleep might be better with timer instead of sleep()
timespec time;
time.tv_sec = ns/1000000000;
time.tv_nsec = ns - time.tv_sec*1e9;
//Remaining Time is not set here
int status = nanosleep(&time,NULL);
if(status != 0){
switch(errno){
case EINTR:
//The nanosleep() function was interrupted by a signal.
return HasReturnvaluesIF::RETURN_FAILED;
case EINVAL:
//The rqtp argument specified a nanosecond value less than zero or
// greater than or equal to 1000 million.
return HasReturnvaluesIF::RETURN_FAILED;
default:
return HasReturnvaluesIF::RETURN_FAILED;
}
//Remaining Time is not set here
int status = nanosleep(&time,NULL);
if(status != 0){
switch(errno){
case EINTR:
//The nanosleep() function was interrupted by a signal.
return HasReturnvaluesIF::RETURN_FAILED;
case EINVAL:
//The rqtp argument specified a nanosecond value less than zero or
// greater than or equal to 1000 million.
return HasReturnvaluesIF::RETURN_FAILED;
default:
return HasReturnvaluesIF::RETURN_FAILED;
}
}
return HasReturnvaluesIF::RETURN_OK;
}
return HasReturnvaluesIF::RETURN_OK;
}
void PosixThread::suspend() {
//Wait for SIGUSR1
int caughtSig = 0;
sigset_t waitSignal;
sigemptyset(&waitSignal);
sigaddset(&waitSignal, SIGUSR1);
sigwait(&waitSignal, &caughtSig);
if (caughtSig != SIGUSR1) {
//Wait for SIGUSR1
int caughtSig = 0;
sigset_t waitSignal;
sigemptyset(&waitSignal);
sigaddset(&waitSignal, SIGUSR1);
sigwait(&waitSignal, &caughtSig);
if (caughtSig != SIGUSR1) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "FixedTimeslotTask: Unknown Signal received: " <<
caughtSig << std::endl;
sif::error << "FixedTimeslotTask: Unknown Signal received: " <<
caughtSig << std::endl;
#endif
}
}
}
void PosixThread::resume(){
/* Signal the thread to start. Makes sense to call kill to start or? ;)
According to POSIX raise(signal) will call pthread_kill(pthread_self(), sig),
but as the call must be done from the thread itself this is not possible here */
pthread_kill(thread,SIGUSR1);
/* Signal the thread to start. Makes sense to call kill to start or? ;)
*
* According to Posix raise(signal) will call pthread_kill(pthread_self(), sig),
* but as the call must be done from the thread itsself this is not possible here
*/
pthread_kill(thread,SIGUSR1);
}
bool PosixThread::delayUntil(uint64_t* const prevoiusWakeTime_ms,
@@ -91,95 +93,95 @@ bool PosixThread::delayUntil(uint64_t* const prevoiusWakeTime_ms,
}
}
/* Update the wake time ready for the next call. */
/* Update the wake time ready for the next call. */
(*prevoiusWakeTime_ms) = nextTimeToWake_ms;
(*prevoiusWakeTime_ms) = nextTimeToWake_ms;
if (shouldDelay) {
uint64_t sleepTime = nextTimeToWake_ms - currentTime_ms;
PosixThread::sleep(sleepTime * 1000000ull);
return true;
}
/* We are shifting the time in case the deadline was missed like RTEMS */
(*prevoiusWakeTime_ms) = currentTime_ms;
return false;
if (shouldDelay) {
uint64_t sleepTime = nextTimeToWake_ms - currentTime_ms;
PosixThread::sleep(sleepTime * 1000000ull);
return true;
}
//We are shifting the time in case the deadline was missed like rtems
(*prevoiusWakeTime_ms) = currentTime_ms;
return false;
}
uint64_t PosixThread::getCurrentMonotonicTimeMs(){
timespec timeNow;
clock_gettime(CLOCK_MONOTONIC_RAW, &timeNow);
uint64_t currentTime_ms = (uint64_t) timeNow.tv_sec * 1000
+ timeNow.tv_nsec / 1000000;
timespec timeNow;
clock_gettime(CLOCK_MONOTONIC_RAW, &timeNow);
uint64_t currentTime_ms = (uint64_t) timeNow.tv_sec * 1000
+ timeNow.tv_nsec / 1000000;
return currentTime_ms;
return currentTime_ms;
}
void PosixThread::createTask(void* (*fnc_)(void*), void* arg_) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
//sif::debug << "PosixThread::createTask" << std::endl;
//sif::debug << "PosixThread::createTask" << std::endl;
#endif
/*
* The attr argument points to a pthread_attr_t structure whose contents
* are used at thread creation time to determine attributes for the new
* thread; this structure is initialized using pthread_attr_init(3) and
* related functions. If attr is NULL, then the thread is created with
* default attributes.
*/
pthread_attr_t attributes;
int status = pthread_attr_init(&attributes);
if(status != 0){
/*
* The attr argument points to a pthread_attr_t structure whose contents
are used at thread creation time to determine attributes for the new
thread; this structure is initialized using pthread_attr_init(3) and
related functions. If attr is NULL, then the thread is created with
default attributes.
*/
pthread_attr_t attributes;
int status = pthread_attr_init(&attributes);
if(status != 0){
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "Posix Thread attribute init failed with: " <<
strerror(status) << std::endl;
sif::error << "Posix Thread attribute init failed with: " <<
strerror(status) << std::endl;
#endif
}
void* stackPointer;
status = posix_memalign(&stackPointer, sysconf(_SC_PAGESIZE), stackSize);
if(status != 0){
}
void* stackPointer;
status = posix_memalign(&stackPointer, sysconf(_SC_PAGESIZE), stackSize);
if(status != 0){
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "PosixThread::createTask: Stack init failed with: " <<
strerror(status) << std::endl;
sif::error << "PosixThread::createTask: Stack init failed with: " <<
strerror(status) << std::endl;
#endif
if(errno == ENOMEM) {
size_t stackMb = stackSize/10e6;
if(errno == ENOMEM) {
size_t stackMb = stackSize/10e6;
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "PosixThread::createTask: Insufficient memory for"
" the requested " << stackMb << " MB" << std::endl;
sif::error << "PosixThread::createTask: Insufficient memory for"
" the requested " << stackMb << " MB" << std::endl;
#else
sif::printError("PosixThread::createTask: Insufficient memory for "
"the requested %lu MB\n", static_cast<unsigned long>(stackMb));
sif::printError("PosixThread::createTask: Insufficient memory for "
"the requested %lu MB\n", static_cast<unsigned long>(stackMb));
#endif
}
else if(errno == EINVAL) {
}
else if(errno == EINVAL) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "PosixThread::createTask: Wrong alignment argument!"
<< std::endl;
sif::error << "PosixThread::createTask: Wrong alignment argument!"
<< std::endl;
#else
sif::printError("PosixThread::createTask: "
"Wrong alignment argument!\n");
sif::printError("PosixThread::createTask: "
"Wrong alignment argument!\n");
#endif
}
return;
}
}
return;
}
status = pthread_attr_setstack(&attributes, stackPointer, stackSize);
if(status != 0){
status = pthread_attr_setstack(&attributes, stackPointer, stackSize);
if(status != 0){
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "PosixThread::createTask: pthread_attr_setstack "
" failed with: " << strerror(status) << std::endl;
sif::error << "Make sure the specified stack size is valid and is "
"larger than the minimum allowed stack size." << std::endl;
sif::error << "PosixThread::createTask: pthread_attr_setstack "
" failed with: " << strerror(status) << std::endl;
sif::error << "Make sure the specified stack size is valid and is "
"larger than the minimum allowed stack size." << std::endl;
#endif
}
}
status = pthread_attr_setinheritsched(&attributes, PTHREAD_EXPLICIT_SCHED);
if(status != 0){
status = pthread_attr_setinheritsched(&attributes, PTHREAD_EXPLICIT_SCHED);
if(status != 0){
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "Posix Thread attribute setinheritsched failed with: " <<
strerror(status) << std::endl;
sif::error << "Posix Thread attribute setinheritsched failed with: " <<
strerror(status) << std::endl;
#endif
}
#ifndef FSFW_USE_REALTIME_FOR_LINUX
@@ -190,18 +192,18 @@ void PosixThread::createTask(void* (*fnc_)(void*), void* arg_) {
status = pthread_attr_setschedpolicy(&attributes,SCHED_FIFO);
if(status != 0){
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "Posix Thread attribute schedule policy failed with: " <<
strerror(status) << std::endl;
sif::error << "Posix Thread attribute schedule policy failed with: " <<
strerror(status) << std::endl;
#endif
}
}
sched_param scheduleParams;
scheduleParams.__sched_priority = priority;
status = pthread_attr_setschedparam(&attributes, &scheduleParams);
if(status != 0){
sched_param scheduleParams;
scheduleParams.__sched_priority = priority;
status = pthread_attr_setschedparam(&attributes, &scheduleParams);
if(status != 0){
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "Posix Thread attribute schedule params failed with: " <<
strerror(status) << std::endl;
sif::error << "Posix Thread attribute schedule params failed with: " <<
strerror(status) << std::endl;
#endif
}
#endif
@@ -212,14 +214,14 @@ void PosixThread::createTask(void* (*fnc_)(void*), void* arg_) {
status = pthread_sigmask(SIG_BLOCK, &waitSignal, NULL);
if(status != 0){
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "Posix Thread sigmask failed failed with: " <<
strerror(status) << " errno: " << strerror(errno) << std::endl;
sif::error << "Posix Thread sigmask failed failed with: " <<
strerror(status) << " errno: " << strerror(errno) << std::endl;
#endif
}
}
status = pthread_create(&thread,&attributes,fnc_,arg_);
if(status != 0){
status = pthread_create(&thread,&attributes,fnc_,arg_);
if(status != 0){
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "PosixThread::createTask: Failed with: " <<
strerror(status) << std::endl;
@@ -232,35 +234,35 @@ void PosixThread::createTask(void* (*fnc_)(void*), void* arg_) {
"\"all sudo setcap 'cap_sys_nice=eip'\" on the application or set "
"/etc/security/limit.conf\n");
#endif
}
}
status = pthread_setname_np(thread,name);
if(status != 0){
status = pthread_setname_np(thread,name);
if(status != 0){
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "PosixThread::createTask: setname failed with: " <<
strerror(status) << std::endl;
sif::error << "PosixThread::createTask: setname failed with: " <<
strerror(status) << std::endl;
#endif
if(status == ERANGE) {
if(status == ERANGE) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "PosixThread::createTask: Task name length longer"
" than 16 chars. Truncating.." << std::endl;
sif::error << "PosixThread::createTask: Task name length longer"
" than 16 chars. Truncating.." << std::endl;
#endif
name[15] = '\0';
status = pthread_setname_np(thread,name);
if(status != 0){
name[15] = '\0';
status = pthread_setname_np(thread,name);
if(status != 0){
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "PosixThread::createTask: Setting name"
" did not work.." << std::endl;
sif::error << "PosixThread::createTask: Setting name"
" did not work.." << std::endl;
#endif
}
}
}
}
}
}
status = pthread_attr_destroy(&attributes);
if(status!=0){
status = pthread_attr_destroy(&attributes);
if(status!=0){
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "Posix Thread attribute destroy failed with: " <<
strerror(status) << std::endl;
sif::error << "Posix Thread attribute destroy failed with: " <<
strerror(status) << std::endl;
#endif
}
}
}

View File

@@ -1,157 +0,0 @@
#include "TmTcUnixUdpBridge.h"
#include "tcpipHelpers.h"
#include "../../serviceinterface/ServiceInterface.h"
#include "../../ipc/MutexGuard.h"
#include <arpa/inet.h>
#include <unistd.h>
#include <netdb.h>
#include <cstring>
//! Debugging preprocessor define.
#define FSFW_UDP_SEND_WIRETAPPING_ENABLED 0
const std::string TmTcUnixUdpBridge::DEFAULT_UDP_SERVER_PORT = tcpip::DEFAULT_UDP_SERVER_PORT;
TmTcUnixUdpBridge::TmTcUnixUdpBridge(object_id_t objectId, object_id_t tcDestination,
object_id_t tmStoreId, object_id_t tcStoreId, std::string udpServerPort):
TmTcBridge(objectId, tcDestination, tmStoreId, tcStoreId) {
if(udpServerPort == "") {
this->udpServerPort = DEFAULT_UDP_SERVER_PORT;
}
else {
this->udpServerPort = udpServerPort;
}
mutex = MutexFactory::instance()->createMutex();
communicationLinkUp = false;
}
ReturnValue_t TmTcUnixUdpBridge::initialize() {
using namespace tcpip;
ReturnValue_t result = TmTcBridge::initialize();
if(result != HasReturnvaluesIF::RETURN_OK) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "TmTcUnixUdpBridge::initialize: TmTcBridge initialization failed!"
<< std::endl;
#endif
return result;
}
struct addrinfo *addrResult = nullptr;
struct addrinfo hints;
std::memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_protocol = IPPROTO_UDP;
hints.ai_flags = AI_PASSIVE;
/* Set up UDP socket:
https://man7.org/linux/man-pages/man3/getaddrinfo.3.html
Passing nullptr as the first parameter and specifying AI_PASSIVE in hints will cause
getaddrinfo to assign the address 0.0.0.0 (any address) */
int retval = getaddrinfo(nullptr, udpServerPort.c_str(), &hints, &addrResult);
if (retval != 0) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::warning << "TmTcWinUdpBridge::TmTcWinUdpBridge: Retrieving address info failed!" <<
std::endl;
#endif
return HasReturnvaluesIF::RETURN_FAILED;
}
/* Set up UDP socket: https://man7.org/linux/man-pages/man7/ip.7.html */
serverSocket = socket(addrResult->ai_family, addrResult->ai_socktype, addrResult->ai_protocol);
if(serverSocket < 0) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "TmTcUnixUdpBridge::TmTcUnixUdpBridge: Could not open UDP socket!" <<
std::endl;
#else
sif::printError("TmTcUnixUdpBridge::TmTcUnixUdpBridge: Could not open UDP socket!\n");
#endif /* FSFW_CPP_OSTREAM_ENABLED == 1 */
freeaddrinfo(addrResult);
handleError(Protocol::UDP, ErrorSources::SOCKET_CALL);
return HasReturnvaluesIF::RETURN_FAILED;
}
retval = bind(serverSocket, addrResult->ai_addr, static_cast<int>(addrResult->ai_addrlen));
if(retval != 0) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::warning << "TmTcWinUdpBridge::TmTcWinUdpBridge: Could not bind "
"local port (" << udpServerPort << ") to server socket!" << std::endl;
#endif
freeaddrinfo(addrResult);
handleError(Protocol::UDP, ErrorSources::BIND_CALL);
return HasReturnvaluesIF::RETURN_FAILED;
}
return HasReturnvaluesIF::RETURN_OK;
}
TmTcUnixUdpBridge::~TmTcUnixUdpBridge() {
if(mutex != nullptr) {
MutexFactory::instance()->deleteMutex(mutex);
}
close(serverSocket);
}
ReturnValue_t TmTcUnixUdpBridge::sendTm(const uint8_t *data, size_t dataLen) {
int flags = 0;
/* The target address can be set by different threads so this lock ensures thread-safety */
MutexGuard lock(mutex, timeoutType, mutexTimeoutMs);
#if FSFW_CPP_OSTREAM_ENABLED == 1 && FSFW_UDP_SEND_WIRETAPPING_ENABLED == 1
char ipAddress [15];
sif::debug << "IP Address Sender: "<<
inet_ntop(AF_INET,&clientAddress.sin_addr.s_addr, ipAddress, 15) << std::endl;
#endif
ssize_t bytesSent = sendto(
serverSocket,
data,
dataLen,
flags,
reinterpret_cast<sockaddr*>(&clientAddress),
clientAddressLen
);
if(bytesSent < 0) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::warning << "TmTcUnixUdpBridge::sendTm: Send operation failed." << std::endl;
#endif
tcpip::handleError(tcpip::Protocol::UDP, tcpip::ErrorSources::SENDTO_CALL);
}
#if FSFW_CPP_OSTREAM_ENABLED == 1 && FSFW_UDP_SEND_WIRETAPPING_ENABLED == 1
sif::debug << "TmTcUnixUdpBridge::sendTm: " << bytesSent << " bytes were"
" sent." << std::endl;
#endif
return HasReturnvaluesIF::RETURN_OK;
}
void TmTcUnixUdpBridge::checkAndSetClientAddress(sockaddr_in& newAddress) {
/* The target address can be set by different threads so this lock ensures thread-safety */
MutexGuard lock(mutex, timeoutType, mutexTimeoutMs);
#if FSFW_CPP_OSTREAM_ENABLED == 1 && FSFW_UDP_RCV_WIRETAPPING_ENABLED == 1
char ipAddress [15];
sif::debug << "IP Address Sender: "<< inet_ntop(AF_INET,
&newAddress.sin_addr.s_addr, ipAddress, 15) << std::endl;
sif::debug << "IP Address Old: " << inet_ntop(AF_INET,
&clientAddress.sin_addr.s_addr, ipAddress, 15) << std::endl;
#endif
registerCommConnect();
/* Set new IP address to reply to. */
clientAddress = newAddress;
clientAddressLen = sizeof(clientAddress);
}
void TmTcUnixUdpBridge::setMutexProperties(MutexIF::TimeoutType timeoutType,
dur_millis_t timeoutMs) {
this->timeoutType = timeoutType;
this->mutexTimeoutMs = timeoutMs;
}

View File

@@ -1,49 +0,0 @@
#ifndef FRAMEWORK_OSAL_LINUX_TMTCUNIXUDPBRIDGE_H_
#define FRAMEWORK_OSAL_LINUX_TMTCUNIXUDPBRIDGE_H_
#include "../../tmtcservices/AcceptsTelecommandsIF.h"
#include "../../tmtcservices/TmTcBridge.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/udp.h>
class TmTcUnixUdpBridge:
public TmTcBridge {
friend class TcUnixUdpPollingTask;
public:
/* The ports chosen here should not be used by any other process.
List of used ports on Linux: /etc/services */
static const std::string DEFAULT_UDP_SERVER_PORT;
TmTcUnixUdpBridge(object_id_t objectId, object_id_t tcDestination,
object_id_t tmStoreId, object_id_t tcStoreId,
std::string serverPort = "");
virtual~ TmTcUnixUdpBridge();
/**
* Set properties of internal mutex.
*/
void setMutexProperties(MutexIF::TimeoutType timeoutType, dur_millis_t timeoutMs);
ReturnValue_t initialize() override;
void checkAndSetClientAddress(sockaddr_in& clientAddress);
protected:
virtual ReturnValue_t sendTm(const uint8_t * data, size_t dataLen) override;
private:
int serverSocket = 0;
std::string udpServerPort;
struct sockaddr_in clientAddress;
socklen_t clientAddressLen = 0;
//! Access to the client address is mutex protected as it is set by another task.
MutexIF::TimeoutType timeoutType = MutexIF::TimeoutType::WAITING;
dur_millis_t mutexTimeoutMs = 20;
MutexIF* mutex;
};
#endif /* FRAMEWORK_OSAL_LINUX_TMTCUNIXUDPBRIDGE_H_ */