tmtcbridge tweaks.

UDP bridge now working :-D
This commit is contained in:
Robin Müller 2020-07-08 03:18:09 +02:00
parent 2efcda735f
commit 264914e86a
15 changed files with 121 additions and 118 deletions

View File

@ -155,10 +155,10 @@ ReturnValue_t MessageQueue::receiveMessage(MessageQueueMessageIF* message) {
}
if(message->getMaximumMessageSize() < maxMessageSize) {
sif::error << "MessageQueue::receiveMessage: Message size "
<< message->getMaximumMessageSize() <<
" too small to receive data!" << std::endl;
return HasReturnvaluesIF::RETURN_FAILED;
sif::error << "MessageQueue::receiveMessage: Message size "
<< message->getMaximumMessageSize()
<< " too small to receive data!" << std::endl;
return HasReturnvaluesIF::RETURN_FAILED;
}
unsigned int messagePriority = 0;

View File

@ -68,12 +68,12 @@ void PeriodicPosixTask::taskFunctionality(void) {
char name[20] = {0};
int status = pthread_getname_np(pthread_self(),name,sizeof(name));
if(status == 0){
sif::error << "PeriodicPosixTask " << name << ": Deadline "
"missed." << std::endl;
//sif::error << "PeriodicPosixTask " << name << ": Deadline "
// "missed." << std::endl;
}
else {
sif::error << "PeriodicPosixTask X: Deadline missed. " <<
status << std::endl;
//sif::error << "PeriodicPosixTask X: Deadline missed. " <<
// status << std::endl;
}
if (this->deadlineMissedFunc != nullptr) {
this->deadlineMissedFunc();

View File

@ -26,8 +26,7 @@ TcUnixUdpPollingTask::TcUnixUdpPollingTask(object_id_t objectId,
}
}
TcUnixUdpPollingTask::~TcUnixUdpPollingTask() {
}
TcUnixUdpPollingTask::~TcUnixUdpPollingTask() {}
ReturnValue_t TcUnixUdpPollingTask::performOperation(uint8_t opCode) {
// Poll for new UDP datagrams in permanent loop.
@ -59,6 +58,30 @@ ReturnValue_t TcUnixUdpPollingTask::performOperation(uint8_t opCode) {
return HasReturnvaluesIF::RETURN_OK;
}
ReturnValue_t TcUnixUdpPollingTask::handleSuccessfullTcRead(size_t bytesRead) {
store_address_t storeId = 0;
ReturnValue_t result = tcStore->addData(&storeId,
receptionBuffer.data(), bytesRead);
// arrayprinter::print(receptionBuffer.data(), bytesRead);
if (result != HasReturnvaluesIF::RETURN_OK) {
sif::debug << "TcSerialPollingTask::transferPusToSoftwareBus: Data "
"storage failed" << std::endl;
sif::debug << "Packet size: " << bytesRead << std::endl;
return HasReturnvaluesIF::RETURN_FAILED;
}
TmTcMessage message(storeId);
result = MessageQueueSenderIF::sendMessage(targetTcDestination, &message);
if (result != HasReturnvaluesIF::RETURN_OK) {
sif::error << "Serial Polling: Sending message to queue failed"
<< std::endl;
tcStore->deleteData(storeId);
}
return result;
}
ReturnValue_t TcUnixUdpPollingTask::initialize() {
tcStore = objectManager->get<StorageManagerIF>(objects::TC_STORE);
if (tcStore == nullptr) {
@ -74,41 +97,27 @@ ReturnValue_t TcUnixUdpPollingTask::initialize() {
return ObjectManagerIF::CHILD_INIT_FAILED;
}
targetTcDestination = tmtcBridge->getReportReceptionQueue();
serverUdpSocket = tmtcBridge->serverSocket;
// Set receive timeout.
// int result = setsockopt(serverUdpSocket, SOL_SOCKET, SO_RCVTIMEO,
// &receptionTimeout, sizeof(receptionTimeout));
// if(result == -1) {
// sif::error << "TcSocketPollingTask::TcSocketPollingTask: Setting "
// "receive timeout failed with " << strerror(errno) << std::endl;
// return ObjectManagerIF::CHILD_INIT_FAILED;
// }
return HasReturnvaluesIF::RETURN_OK;
}
ReturnValue_t TcUnixUdpPollingTask::handleSuccessfullTcRead(size_t bytesRead) {
store_address_t storeId = 0;
ReturnValue_t result = tcStore->addData(&storeId,
receptionBuffer.data(), bytesRead);
// arrayprinter::print(receptionBuffer.data(), bytesRead);
if (result != HasReturnvaluesIF::RETURN_OK) {
sif::debug << "TcSerialPollingTask::transferPusToSoftwareBus: Data "
"storage failed" << std::endl;
sif::debug << "Packet size: " << bytesRead << std::endl;
return HasReturnvaluesIF::RETURN_FAILED;
}
ReturnValue_t TcUnixUdpPollingTask::initializeAfterTaskCreation() {
// Initialize the destination after task creation. This ensures
// that the destination will be set in the TMTC bridge.
targetTcDestination = tmtcBridge->getRequestQueue();
return HasReturnvaluesIF::RETURN_OK;
}
TmTcMessage message(storeId);
result = MessageQueueSenderIF::sendMessage(targetTcDestination, &message);
if (result != HasReturnvaluesIF::RETURN_OK) {
sif::error << "Serial Polling: Sending message to queue failed"
<< std::endl;
tcStore->deleteData(storeId);
void TcUnixUdpPollingTask::setTimeout(double timeoutSeconds) {
timeval tval;
tval = timevalOperations::toTimeval(timeoutSeconds);
int result = setsockopt(serverUdpSocket, SOL_SOCKET, SO_RCVTIMEO,
&tval, sizeof(receptionTimeout));
if(result == -1) {
sif::error << "TcSocketPollingTask::TcSocketPollingTask: Setting "
"receive timeout failed with " << strerror(errno) << std::endl;
}
return result;
}

View File

@ -29,8 +29,16 @@ public:
size_t frameSize = 0, double timeoutSeconds = -1);
virtual~ TcUnixUdpPollingTask();
/**
* Turn on optional timeout for UDP polling. In the default mode,
* the receive function will block until a packet is received.
* @param timeoutSeconds
*/
void setTimeout(double timeoutSeconds);
virtual ReturnValue_t performOperation(uint8_t opCode) override;
virtual ReturnValue_t initialize() override;
virtual ReturnValue_t initializeAfterTaskCreation() override;
protected:
StorageManagerIF* tcStore = nullptr;

View File

@ -1,8 +1,9 @@
#include <framework/osal/linux/TmTcUnixUdpBridge.h>
#include <framework/serviceinterface/ServiceInterfaceStream.h>
#include <errno.h>
#include <framework/ipc/MutexHelper.h>
#include <errno.h>
TmTcUnixUdpBridge::TmTcUnixUdpBridge(object_id_t objectId,
object_id_t ccsdsPacketDistributor, uint16_t serverPort,
uint16_t clientPort):
@ -14,17 +15,16 @@ TmTcUnixUdpBridge::TmTcUnixUdpBridge(object_id_t objectId,
setServerPort = serverPort;
}
uint16_t setClientPort = DEFAULT_UDP_CLIENT_PORT;
if(clientPort != 0xFFFF) {
setClientPort = clientPort;
}
// uint16_t setClientPort = DEFAULT_UDP_CLIENT_PORT;
// if(clientPort != 0xFFFF) {
// setClientPort = clientPort;
// }
// Set up UDP socket: https://man7.org/linux/man-pages/man7/ip.7.html
serverSocket = socket(AF_INET, SOCK_DGRAM, 0);
if(socket < 0) {
sif::error << "TmTcUnixUdpBridge::TmTcUnixUdpBridge: Could not open"
" UDP socket!" << std::endl;
// check errno here.
handleSocketError();
return;
}
@ -44,7 +44,6 @@ TmTcUnixUdpBridge::TmTcUnixUdpBridge(object_id_t objectId,
sif::error << "TmTcUnixUdpBridge::TmTcUnixUdpBridge: Could not bind "
"local port " << setServerPort << " to server socket!"
<< std::endl;
// check errno here.
handleBindError();
return;
}
@ -57,14 +56,21 @@ ReturnValue_t TmTcUnixUdpBridge::sendTm(const uint8_t *data, size_t dataLen) {
int flags = 0;
ssize_t result = send(serverSocket, data, dataLen, flags);
if(result < 0) {
//handle error
// todo: handle errors
sif::error << "TmTcUnixUdpBridge::sendTm: Send operation failed "
"with error " << strerror(errno) << std::endl;
}
return HasReturnvaluesIF::RETURN_OK;
}
void TmTcUnixUdpBridge::checkAndSetClientAddress(sockaddr_in newAddress) {
MutexHelper lock(mutex, 10);
// Set new IP address if it has changed.
if(clientAddress.sin_addr.s_addr != newAddress.sin_addr.s_addr) {
clientAddress.sin_addr.s_addr = newAddress.sin_addr.s_addr;
}
}
void TmTcUnixUdpBridge::handleSocketError() {
// See: https://man7.org/linux/man-pages/man2/socket.2.html
@ -87,17 +93,6 @@ void TmTcUnixUdpBridge::handleSocketError() {
}
}
void TmTcUnixUdpBridge::setTimeout(float timeoutSeconds) {
}
void TmTcUnixUdpBridge::checkAndSetClientAddress(sockaddr_in newAddress) {
MutexHelper lock(mutex, 10);
// Set new IP address if it has changed.
if(clientAddress.sin_addr.s_addr != newAddress.sin_addr.s_addr) {
clientAddress.sin_addr.s_addr = newAddress.sin_addr.s_addr;
}
}
void TmTcUnixUdpBridge::handleBindError() {
// See: https://man7.org/linux/man-pages/man2/bind.2.html
switch(errno) {
@ -128,16 +123,10 @@ void TmTcUnixUdpBridge::handleBindError() {
<< " with " << strerror(errno) << std::endl;
break;
}
default: {
default:
sif::error << "TmTcUnixBridge::TmTcUnixBridge: Unknown error"
<< std::endl;
break;
}
}
}
ReturnValue_t TmTcUnixUdpBridge::receiveTc(uint8_t **recvBuffer, size_t *size) {
// TC reception handled by separate polling task because it is blocking.
return HasReturnvaluesIF::RETURN_OK;
}

View File

@ -1,6 +1,7 @@
#ifndef FRAMEWORK_OSAL_LINUX_TMTCUNIXUDPBRIDGE_H_
#define FRAMEWORK_OSAL_LINUX_TMTCUNIXUDPBRIDGE_H_
#include <framework/tmtcservices/AcceptsTelecommandsIF.h>
#include <framework/tmtcservices/TmTcBridge.h>
#include <sys/socket.h>
#include <netinet/in.h>
@ -18,13 +19,11 @@ public:
uint16_t serverPort = 0xFFFF,uint16_t clientPort = 0xFFFF);
virtual~ TmTcUnixUdpBridge();
void setTimeout(float timeoutSeconds);
void checkAndSetClientAddress(sockaddr_in clientAddress);
protected:
virtual ReturnValue_t receiveTc(uint8_t ** recvBuffer,
size_t * size) override;
virtual ReturnValue_t sendTm(const uint8_t * data, size_t dataLen) override;
private:
int serverSocket = 0;
const int serverSocketOptions = 0;
@ -43,6 +42,4 @@ private:
void handleBindError();
};
#endif /* FRAMEWORK_OSAL_LINUX_TMTCUNIXUDPBRIDGE_H_ */

View File

@ -9,7 +9,7 @@ CCSDSDistributor::CCSDSDistributor(uint16_t setDefaultApid,
CCSDSDistributor::~CCSDSDistributor() {}
TcDistributor::TcMessageQueueMapIter CCSDSDistributor::selectDestination() {
TcDistributor::TcMqMapIter CCSDSDistributor::selectDestination() {
// sif::debug << "CCSDSDistributor::selectDestination received: " <<
// this->currentMessage.getStorageId().pool_index << ", " <<
// this->currentMessage.getStorageId().packet_index << std::endl;
@ -25,7 +25,7 @@ TcDistributor::TcMessageQueueMapIter CCSDSDistributor::selectDestination() {
// sif:: info << "CCSDSDistributor::selectDestination has packet with APID "
// << std::hex << currentPacket.getAPID() << std::dec << std::endl;
TcMessageQueueMapIter position = this->queueMap.find(currentPacket.getAPID());
TcMqMapIter position = this->queueMap.find(currentPacket.getAPID());
if ( position != this->queueMap.end() ) {
return position;
} else {

View File

@ -24,14 +24,14 @@ public:
* TcDistributor ctor with a certain object id.
* @details
* @c tcStore is set in the @c initialize method.
* @param set_default_apid The default APID, where packets with unknown
* @param setDefaultApid The default APID, where packets with unknown
* destination are sent to.
*/
CCSDSDistributor( uint16_t setDefaultApid, object_id_t setObjectId );
CCSDSDistributor(uint16_t setDefaultApid, object_id_t setObjectId);
/**
* The destructor is empty.
*/
~CCSDSDistributor();
virtual ~CCSDSDistributor();
MessageQueueId_t getRequestQueue() override;
ReturnValue_t registerApplication( uint16_t apid,
@ -49,7 +49,7 @@ protected:
* where a Acceptance Failure message should be generated.
* @return Iterator to map entry of found APID or iterator to default APID.
*/
TcMessageQueueMapIter selectDestination();
TcMqMapIter selectDestination() override;
/**
* The default APID, where packets with unknown APID are sent to.
*/

View File

@ -12,9 +12,9 @@ PUSDistributor::PUSDistributor(uint16_t setApid, object_id_t setObjectId,
PUSDistributor::~PUSDistributor() {}
TcDistributor::TcMessageQueueMapIter PUSDistributor::selectDestination() {
TcDistributor::TcMqMapIter PUSDistributor::selectDestination() {
// debug << "PUSDistributor::handlePacket received: " << this->current_packet_id.store_index << ", " << this->current_packet_id.packet_index << std::endl;
TcMessageQueueMapIter queueMapIt = this->queueMap.end();
TcMqMapIter queueMapIt = this->queueMap.end();
this->currentPacket.setStoreAddress(this->currentMessage.getStorageId());
if (currentPacket.getWholeData() != NULL) {
tcStatus = checker.checkPacket(&currentPacket);

View File

@ -66,7 +66,7 @@ protected:
* @return Iterator to map entry of found service id
* or iterator to @c map.end().
*/
TcMessageQueueMapIter selectDestination();
TcMqMapIter selectDestination() override;
/**
* The callback here handles the generation of acceptance
* success/failure messages.

View File

@ -28,7 +28,7 @@ ReturnValue_t TcDistributor::performOperation(uint8_t opCode) {
ReturnValue_t TcDistributor::handlePacket() {
TcMessageQueueMapIter queueMapIt = this->selectDestination();
TcMqMapIter queueMapIt = this->selectDestination();
ReturnValue_t returnValue = RETURN_FAILED;
if (queueMapIt != this->queueMap.end()) {
returnValue = this->tcQueue->sendMessage(queueMapIt->second,
@ -40,7 +40,7 @@ ReturnValue_t TcDistributor::handlePacket() {
void TcDistributor::print() {
sif::debug << "Distributor content is: " << std::endl << "ID\t| message queue id"
<< std::endl;
for (TcMessageQueueMapIter it = this->queueMap.begin(); it != this->queueMap.end();
for (TcMqMapIter it = this->queueMap.begin(); it != this->queueMap.end();
it++) {
sif::debug << it->first << "\t| 0x" << std::hex << it->second << std::dec
<< std::endl;

View File

@ -32,7 +32,7 @@ class TcDistributor : public SystemObject,
public HasReturnvaluesIF {
public:
using TcMessageQueueMap = std::map<uint32_t, MessageQueueId_t>;
using TcMessageQueueMapIter = std::map<uint32_t, MessageQueueId_t>::iterator;
using TcMqMapIter = std::map<uint32_t, MessageQueueId_t>::iterator;
static const uint8_t INTERFACE_ID = CLASS_ID::PACKET_DISTRIBUTION;
static const ReturnValue_t PACKET_LOST = MAKE_RETURN_CODE( 1 );
@ -90,7 +90,7 @@ protected:
* packet and select the map entry which represents the packet's target.
* @return An iterator to the map element to forward to or queuMap.end().
*/
virtual TcMessageQueueMapIter selectDestination() = 0;
virtual TcMqMapIter selectDestination() = 0;
/**
* The handlePacket method calls the child class's selectDestination method
* and forwards the packet to its destination, if found.

View File

@ -1,5 +1,5 @@
#ifndef ACCEPTSTELECOMMANDSIF_H_
#define ACCEPTSTELECOMMANDSIF_H_
#ifndef FRAMEWORK_TMTCSERVICES_ACCEPTSTELECOMMANDSIF_H_
#define FRAMEWORK_TMTCSERVICES_ACCEPTSTELECOMMANDSIF_H_
#include <framework/ipc/MessageQueueSenderIF.h>
@ -26,9 +26,9 @@ public:
/**
* @brief Getter for the service id.
* @details Any receiving service (at least any PUS service) shall have a
* service id. If the receiver can handle Telecommands, but for
* service ID. If the receiver can handle Telecommands, but for
* some reason has no service id, it shall return 0.
* @return The service id or 0.
* @return The service ID or 0.
*/
virtual uint16_t getIdentifier() = 0;
/**
@ -40,4 +40,4 @@ public:
};
#endif /* ACCEPTSTELECOMMANDSIF_H_ */
#endif /* FRAMEWORK_TMTCSERVICES_ACCEPTSTELECOMMANDSIF_H_ */

View File

@ -9,7 +9,7 @@ TmTcBridge::TmTcBridge(object_id_t objectId,
object_id_t ccsdsPacketDistributor): SystemObject(objectId),
ccsdsPacketDistributor(ccsdsPacketDistributor)
{
TmTcReceptionQueue = QueueFactory::instance()->
tmTcReceptionQueue = QueueFactory::instance()->
createMessageQueue(TMTC_RECEPTION_QUEUE_DEPTH);
}
@ -55,7 +55,8 @@ ReturnValue_t TmTcBridge::initialize() {
if (tcDistributor == NULL) {
return RETURN_FAILED;
}
TmTcReceptionQueue->setDefaultDestination(tcDistributor->getRequestQueue());
tmTcReceptionQueue->setDefaultDestination(tcDistributor->getRequestQueue());
return RETURN_OK;
}
@ -73,10 +74,7 @@ ReturnValue_t TmTcBridge::performOperation(uint8_t operationCode) {
}
ReturnValue_t TmTcBridge::handleTc() {
uint8_t * recvBuffer = nullptr;
size_t recvLen = 0;
ReturnValue_t result = receiveTc(&recvBuffer, &recvLen);
return result;
return HasReturnvaluesIF::RETURN_OK;
}
ReturnValue_t TmTcBridge::handleTm() {
@ -97,8 +95,8 @@ ReturnValue_t TmTcBridge::handleTmQueue() {
TmTcMessage message;
const uint8_t* data = nullptr;
size_t size = 0;
for (ReturnValue_t result = TmTcReceptionQueue->receiveMessage(&message);
result == RETURN_OK; result = TmTcReceptionQueue->receiveMessage(&message))
for (ReturnValue_t result = tmTcReceptionQueue->receiveMessage(&message);
result == RETURN_OK; result = tmTcReceptionQueue->receiveMessage(&message))
{
if(communicationLinkUp == false) {
result = storeDownlinkData(&message);
@ -183,10 +181,20 @@ void TmTcBridge::registerCommDisconnect() {
}
MessageQueueId_t TmTcBridge::getReportReceptionQueue(uint8_t virtualChannel) {
return TmTcReceptionQueue->getId();
return tmTcReceptionQueue->getId();
}
void TmTcBridge::printData(uint8_t * data, size_t dataLen) {
arrayprinter::print(data, dataLen);
}
uint16_t TmTcBridge::getIdentifier() {
// This is no PUS service, so we just return 0
return 0;
}
MessageQueueId_t TmTcBridge::getRequestQueue() {
// Default implementation: Relay TC messages to TC distributor directly.
return tmTcReceptionQueue->getDefaultDestination();
}

View File

@ -1,16 +1,18 @@
#ifndef FRAMEWORK_TMTCSERVICES_TMTCBRIDGE_H_
#define FRAMEWORK_TMTCSERVICES_TMTCBRIDGE_H_
#include <framework/container/FIFO.h>
#include <framework/objectmanager/SystemObject.h>
#include <framework/tmtcservices/AcceptsTelemetryIF.h>
#include <framework/tasks/ExecutableObjectIF.h>
#include <framework/ipc/MessageQueueIF.h>
#include <framework/storagemanager/StorageManagerIF.h>
#include <framework/objectmanager/SystemObject.h>
#include <framework/tmtcservices/AcceptsTelecommandsIF.h>
#include <framework/container/FIFO.h>
#include <framework/tmtcservices/TmTcMessage.h>
class TmTcBridge : public AcceptsTelemetryIF,
public AcceptsTelecommandsIF,
public ExecutableObjectIF,
public HasReturnvaluesIF,
public SystemObject {
@ -57,17 +59,19 @@ public:
*/
virtual ReturnValue_t performOperation(uint8_t operationCode = 0) override;
/**
* Return TMTC Reception Queue
* @param virtualChannel
* @return
*/
/** AcceptsTelemetryIF override */
virtual MessageQueueId_t getReportReceptionQueue(
uint8_t virtualChannel = 0) override;
/** AcceptsTelecommandsIF override */
virtual uint16_t getIdentifier() override;
virtual MessageQueueId_t getRequestQueue() override;
protected:
//! Used to send and receive TMTC messages.
//! TmTcMessage is used to transport messages between tasks.
MessageQueueIF* TmTcReceptionQueue = nullptr;
MessageQueueIF* tmTcReceptionQueue = nullptr;
StorageManagerIF* tcStore = nullptr;
StorageManagerIF* tmStore = nullptr;
object_id_t ccsdsPacketDistributor = 0;
@ -79,23 +83,11 @@ protected:
* @brief Handle TC reception
* @details
* Default implementation provided, but is empty.
* Child handler should override this in most cases orsend TC to the
* TC distributor directly with the address of the reception queue by
* calling getReportRecptionQueue()
* In most cases, TC reception will be handled in a separate task anyway.
* @return
*/
virtual ReturnValue_t handleTc();
/**
* Implemented by child class. Perform receiving of Telecommand,
* for example by implementing specific drivers or wrappers,
* e.g. UART Communication or an ethernet stack
* @param recvBuffer [out] Received data
* @param size [out] Size of received data
* @return
*/
virtual ReturnValue_t receiveTc(uint8_t ** recvBuffer, size_t * size) = 0;
/**
* Handle Telemetry. Default implementation provided.
* Calls sendTm()