tcp keep open implementation done
This commit is contained in:
parent
68fe94d594
commit
d6b3167922
@ -5,8 +5,7 @@
|
|||||||
#include <cstddef>
|
#include <cstddef>
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Circular buffer implementation, useful for buffering
|
* @brief Circular buffer implementation, useful for buffering into data streams.
|
||||||
* into data streams.
|
|
||||||
* @details
|
* @details
|
||||||
* Note that the deleteData() has to be called to increment the read pointer.
|
* Note that the deleteData() has to be called to increment the read pointer.
|
||||||
* This class allocated dynamically, so
|
* This class allocated dynamically, so
|
||||||
@ -20,8 +19,8 @@ public:
|
|||||||
* @param size
|
* @param size
|
||||||
* @param overwriteOld If the ring buffer is overflowing at a write
|
* @param overwriteOld If the ring buffer is overflowing at a write
|
||||||
* operation, the oldest data will be overwritten.
|
* operation, the oldest data will be overwritten.
|
||||||
* @param maxExcessBytes These additional bytes will be allocated in addtion
|
* @param maxExcessBytes These additional bytes will be allocated in addition
|
||||||
* to the specified size to accomodate contiguous write operations
|
* to the specified size to accommodate continuous write operations
|
||||||
* with getFreeElement.
|
* with getFreeElement.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
@ -32,10 +31,10 @@ public:
|
|||||||
* @param buffer
|
* @param buffer
|
||||||
* @param size
|
* @param size
|
||||||
* @param overwriteOld
|
* @param overwriteOld
|
||||||
* If the ring buffer is overflowing at a write operartion, the oldest data
|
* If the ring buffer is overflowing at a write operation, the oldest data
|
||||||
* will be overwritten.
|
* will be overwritten.
|
||||||
* @param maxExcessBytes
|
* @param maxExcessBytes
|
||||||
* If the buffer can accomodate additional bytes for contigous write
|
* If the buffer can accommodate additional bytes for contiguous write
|
||||||
* operations with getFreeElement, this is the maximum allowed additional
|
* operations with getFreeElement, this is the maximum allowed additional
|
||||||
* size
|
* size
|
||||||
*/
|
*/
|
||||||
@ -48,7 +47,7 @@ public:
|
|||||||
* Write to circular buffer and increment write pointer by amount.
|
* Write to circular buffer and increment write pointer by amount.
|
||||||
* @param data
|
* @param data
|
||||||
* @param amount
|
* @param amount
|
||||||
* @return -@c RETURN_OK if write operation was successfull
|
* @return -@c RETURN_OK if write operation was successful
|
||||||
* -@c RETURN_FAILED if
|
* -@c RETURN_FAILED if
|
||||||
*/
|
*/
|
||||||
ReturnValue_t writeData(const uint8_t* data, size_t amount);
|
ReturnValue_t writeData(const uint8_t* data, size_t amount);
|
||||||
@ -108,7 +107,7 @@ public:
|
|||||||
* Delete data by incrementing read pointer.
|
* Delete data by incrementing read pointer.
|
||||||
* @param amount
|
* @param amount
|
||||||
* @param deleteRemaining
|
* @param deleteRemaining
|
||||||
* If the amount specified is larger than the remaing size to read and this
|
* If the amount specified is larger than the remaining size to read and this
|
||||||
* is set to true, the remaining amount will be deleted as well
|
* is set to true, the remaining amount will be deleted as well
|
||||||
* @param trueAmount [out]
|
* @param trueAmount [out]
|
||||||
* If deleteRemaining was set to true, the amount deleted will be assigned
|
* If deleteRemaining was set to true, the amount deleted will be assigned
|
||||||
|
@ -29,12 +29,11 @@
|
|||||||
const std::string TcpTmTcServer::DEFAULT_SERVER_PORT = tcpip::DEFAULT_SERVER_PORT;
|
const std::string TcpTmTcServer::DEFAULT_SERVER_PORT = tcpip::DEFAULT_SERVER_PORT;
|
||||||
|
|
||||||
TcpTmTcServer::TcpTmTcServer(object_id_t objectId, object_id_t tmtcTcpBridge,
|
TcpTmTcServer::TcpTmTcServer(object_id_t objectId, object_id_t tmtcTcpBridge,
|
||||||
size_t receptionBufferSize, std::string customTcpServerPort):
|
size_t receptionBufferSize, size_t ringBufferSize, std::string customTcpServerPort,
|
||||||
SystemObject(objectId), tmtcBridgeId(tmtcTcpBridge),
|
ReceptionModes receptionMode):
|
||||||
tcpPort(customTcpServerPort), receptionBuffer(receptionBufferSize) {
|
SystemObject(objectId), tmtcBridgeId(tmtcTcpBridge), receptionMode(receptionMode),
|
||||||
if(tcpPort == "") {
|
tcpConfig(customTcpServerPort), receptionBuffer(receptionBufferSize),
|
||||||
tcpPort = DEFAULT_SERVER_PORT;
|
ringBuffer(ringBufferSize, true) {
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ReturnValue_t TcpTmTcServer::initialize() {
|
ReturnValue_t TcpTmTcServer::initialize() {
|
||||||
@ -45,6 +44,16 @@ ReturnValue_t TcpTmTcServer::initialize() {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
switch(receptionMode) {
|
||||||
|
case(ReceptionModes::SPACE_PACKETS): {
|
||||||
|
// For now, hardcode a maximum of 5 store packets here and no split packets are allowed
|
||||||
|
spacePacketParser = new SpacePacketParser(5, false);
|
||||||
|
if(spacePacketParser == nullptr) {
|
||||||
|
return HasReturnvaluesIF::RETURN_FAILED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
tcStore = ObjectManager::instance()->get<StorageManagerIF>(objects::TC_STORE);
|
tcStore = ObjectManager::instance()->get<StorageManagerIF>(objects::TC_STORE);
|
||||||
if (tcStore == nullptr) {
|
if (tcStore == nullptr) {
|
||||||
#if FSFW_CPP_OSTREAM_ENABLED == 1
|
#if FSFW_CPP_OSTREAM_ENABLED == 1
|
||||||
@ -67,7 +76,7 @@ ReturnValue_t TcpTmTcServer::initialize() {
|
|||||||
hints.ai_flags = AI_PASSIVE;
|
hints.ai_flags = AI_PASSIVE;
|
||||||
|
|
||||||
// Listen to all addresses (0.0.0.0) by using AI_PASSIVE in the hint flags
|
// Listen to all addresses (0.0.0.0) by using AI_PASSIVE in the hint flags
|
||||||
retval = getaddrinfo(nullptr, tcpPort.c_str(), &hints, &addrResult);
|
retval = getaddrinfo(nullptr, tcpConfig.tcpPort.c_str(), &hints, &addrResult);
|
||||||
if (retval != 0) {
|
if (retval != 0) {
|
||||||
handleError(Protocol::TCP, ErrorSources::GETADDRINFO_CALL);
|
handleError(Protocol::TCP, ErrorSources::GETADDRINFO_CALL);
|
||||||
return HasReturnvaluesIF::RETURN_FAILED;
|
return HasReturnvaluesIF::RETURN_FAILED;
|
||||||
@ -109,7 +118,7 @@ ReturnValue_t TcpTmTcServer::performOperation(uint8_t opCode) {
|
|||||||
|
|
||||||
// Listen for connection requests permanently for lifetime of program
|
// Listen for connection requests permanently for lifetime of program
|
||||||
while(true) {
|
while(true) {
|
||||||
retval = listen(listenerTcpSocket, tcpBacklog);
|
retval = listen(listenerTcpSocket, tcpConfig.tcpBacklog);
|
||||||
if(retval == SOCKET_ERROR) {
|
if(retval == SOCKET_ERROR) {
|
||||||
handleError(Protocol::TCP, ErrorSources::LISTEN_CALL, 500);
|
handleError(Protocol::TCP, ErrorSources::LISTEN_CALL, 500);
|
||||||
continue;
|
continue;
|
||||||
@ -149,7 +158,7 @@ ReturnValue_t TcpTmTcServer::initializeAfterTaskCreation() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void TcpTmTcServer::handleServerOperation(socket_t connSocket) {
|
void TcpTmTcServer::handleServerOperation(socket_t connSocket) {
|
||||||
int retval = 0;
|
//int retval = 0;
|
||||||
using namespace std::chrono_literals;
|
using namespace std::chrono_literals;
|
||||||
|
|
||||||
// Receive until the peer shuts down the connection, use select to do this
|
// Receive until the peer shuts down the connection, use select to do this
|
||||||
@ -163,8 +172,8 @@ void TcpTmTcServer::handleServerOperation(socket_t connSocket) {
|
|||||||
FD_SET(connSocket, &efds);
|
FD_SET(connSocket, &efds);
|
||||||
|
|
||||||
timeval tv;
|
timeval tv;
|
||||||
tv.tv_sec = selectTimeoutMs / 1000;
|
tv.tv_sec = 0;//tcpConfig.selectTimeoutMs / 1000;
|
||||||
tv.tv_usec = (selectTimeoutMs % 1000) * 1000;
|
tv.tv_usec = 0;//(tcpConfig.selectTimeoutMs % 1000) * 1000;
|
||||||
|
|
||||||
int nfds = connSocket + 1;
|
int nfds = connSocket + 1;
|
||||||
|
|
||||||
@ -189,7 +198,6 @@ void TcpTmTcServer::handleServerOperation(socket_t connSocket) {
|
|||||||
// }
|
// }
|
||||||
// } while(retval > 0);
|
// } while(retval > 0);
|
||||||
while (true) {
|
while (true) {
|
||||||
uint32_t index = 0;
|
|
||||||
int retval = select(nfds, &rfds, nullptr, &efds, &tv);
|
int retval = select(nfds, &rfds, nullptr, &efds, &tv);
|
||||||
if(retval < 0) {
|
if(retval < 0) {
|
||||||
// client might have shut down connection
|
// client might have shut down connection
|
||||||
@ -202,40 +210,57 @@ void TcpTmTcServer::handleServerOperation(socket_t connSocket) {
|
|||||||
connSocket,
|
connSocket,
|
||||||
reinterpret_cast<char*>(receptionBuffer.data()),
|
reinterpret_cast<char*>(receptionBuffer.data()),
|
||||||
receptionBuffer.capacity(),
|
receptionBuffer.capacity(),
|
||||||
tcpFlags
|
tcpConfig.tcpFlags
|
||||||
);
|
);
|
||||||
handleTcReception(retval);
|
ringBuffer.writeData(receptionBuffer.data(), retval);
|
||||||
//int result = receiveData();
|
|
||||||
//if(result == 0) {
|
|
||||||
// break;
|
|
||||||
//}
|
|
||||||
}
|
}
|
||||||
if(FD_ISSET(connSocket, &efds)) {
|
if(FD_ISSET(connSocket, &efds)) {
|
||||||
//spdlog::error("{}: Exception detected on receive FD", tcpip::SERVER_PR);
|
#if FSFW_CPP_OSTREAM_ENABLED == 1
|
||||||
|
sif::warning << "TcpTmTcServer::handleServerOperation: "
|
||||||
|
"Exception detected" << std::endl;
|
||||||
|
#else
|
||||||
|
sif::printWarning("TcpTmTcServer::handleServerOperation: Exception detected\n");
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
// no data available. Send back telemetry now
|
// no data available. Check whether any packets have been read, then send back
|
||||||
handleTmSending(connSocket);
|
// telemetry now
|
||||||
//TaskFactory::delayTask(500);
|
bool tcAvailable = false;
|
||||||
|
bool tmSent = false;
|
||||||
|
size_t availableReadData = ringBuffer.getAvailableReadData();
|
||||||
|
if(availableReadData > lastRingBufferSize) {
|
||||||
|
tcAvailable = true;
|
||||||
|
handleRingBufferData(availableReadData);
|
||||||
|
}
|
||||||
|
ReturnValue_t result = handleTmSending(connSocket, tmSent);
|
||||||
|
if(result == CONN_BROKEN) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if(not tcAvailable and not tmSent) {
|
||||||
|
TaskFactory::delayTask(DEFAULT_LOOP_DELAY_MS);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ReturnValue_t TcpTmTcServer::handleTcReception(size_t bytesRecvd) {
|
ReturnValue_t TcpTmTcServer::handleTcReception(uint8_t* spacePacket, size_t packetSize) {
|
||||||
#if FSFW_TCP_RECV_WIRETAPPING_ENABLED == 1
|
#if FSFW_TCP_RECV_WIRETAPPING_ENABLED == 1
|
||||||
arrayprinter::print(receptionBuffer.data(), bytesRead);
|
arrayprinter::print(receptionBuffer.data(), bytesRead);
|
||||||
#endif
|
#endif
|
||||||
|
if(spacePacket == nullptr or packetSize == 0) {
|
||||||
|
return HasReturnvaluesIF::RETURN_FAILED;
|
||||||
|
}
|
||||||
store_address_t storeId;
|
store_address_t storeId;
|
||||||
ReturnValue_t result = tcStore->addData(&storeId, receptionBuffer.data(), bytesRecvd);
|
ReturnValue_t result = tcStore->addData(&storeId, spacePacket, packetSize);
|
||||||
if (result != HasReturnvaluesIF::RETURN_OK) {
|
if (result != HasReturnvaluesIF::RETURN_OK) {
|
||||||
#if FSFW_VERBOSE_LEVEL >= 1
|
#if FSFW_VERBOSE_LEVEL >= 1
|
||||||
#if FSFW_CPP_OSTREAM_ENABLED == 1
|
#if FSFW_CPP_OSTREAM_ENABLED == 1
|
||||||
sif::warning << "TcpTmTcServer::handleServerOperation: Data storage with packet size" <<
|
sif::warning << "TcpTmTcServer::handleServerOperation: Data storage with packet size" <<
|
||||||
bytesRecvd << " failed" << std::endl;
|
packetSize << " failed" << std::endl;
|
||||||
#else
|
#else
|
||||||
sif::printWarning("TcpTmTcServer::handleServerOperation: Data storage with packet size %d "
|
sif::printWarning("TcpTmTcServer::handleServerOperation: Data storage with packet size %d "
|
||||||
"failed\n", bytesRecvd);
|
"failed\n", packetSize);
|
||||||
#endif /* FSFW_CPP_OSTREAM_ENABLED == 1 */
|
#endif /* FSFW_CPP_OSTREAM_ENABLED == 1 */
|
||||||
#endif /* FSFW_VERBOSE_LEVEL >= 1 */
|
#endif /* FSFW_VERBOSE_LEVEL >= 1 */
|
||||||
return result;
|
return result;
|
||||||
@ -259,21 +284,25 @@ ReturnValue_t TcpTmTcServer::handleTcReception(size_t bytesRecvd) {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
void TcpTmTcServer::setTcpBacklog(uint8_t tcpBacklog) {
|
|
||||||
this->tcpBacklog = tcpBacklog;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::string TcpTmTcServer::getTcpPort() const {
|
std::string TcpTmTcServer::getTcpPort() const {
|
||||||
return tcpPort;
|
return tcpConfig.tcpPort;
|
||||||
}
|
}
|
||||||
|
|
||||||
ReturnValue_t TcpTmTcServer::handleTmSending(socket_t connSocket) {
|
void TcpTmTcServer::setSpacePacketParsingOptions(uint8_t parserFifoSize) {
|
||||||
|
}
|
||||||
|
|
||||||
|
TcpTmTcServer::TcpConfig& TcpTmTcServer::getTcpConfigStruct() {
|
||||||
|
return tcpConfig;
|
||||||
|
}
|
||||||
|
|
||||||
|
ReturnValue_t TcpTmTcServer::handleTmSending(socket_t connSocket, bool& tmSent) {
|
||||||
// Access to the FIFO is mutex protected because it is filled by the bridge
|
// Access to the FIFO is mutex protected because it is filled by the bridge
|
||||||
MutexGuard(tmtcBridge->mutex, tmtcBridge->timeoutType, tmtcBridge->mutexTimeoutMs);
|
MutexGuard(tmtcBridge->mutex, tmtcBridge->timeoutType, tmtcBridge->mutexTimeoutMs);
|
||||||
store_address_t storeId;
|
store_address_t storeId;
|
||||||
while((not tmtcBridge->tmFifo->empty()) and
|
while((not tmtcBridge->tmFifo->empty()) and
|
||||||
(tmtcBridge->packetSentCounter < tmtcBridge->sentPacketsPerCycle)) {
|
(tmtcBridge->packetSentCounter < tmtcBridge->sentPacketsPerCycle)) {
|
||||||
tmtcBridge->tmFifo->retrieve(&storeId);
|
// Send can fail, so only peek from the FIFO
|
||||||
|
tmtcBridge->tmFifo->peek(&storeId);
|
||||||
|
|
||||||
// Using the store accessor will take care of deleting TM from the store automatically
|
// Using the store accessor will take care of deleting TM from the store automatically
|
||||||
ConstStorageAccessor storeAccessor(storeId);
|
ConstStorageAccessor storeAccessor(storeId);
|
||||||
@ -284,10 +313,81 @@ ReturnValue_t TcpTmTcServer::handleTmSending(socket_t connSocket) {
|
|||||||
int retval = send(connSocket,
|
int retval = send(connSocket,
|
||||||
reinterpret_cast<const char*>(storeAccessor.data()),
|
reinterpret_cast<const char*>(storeAccessor.data()),
|
||||||
storeAccessor.size(),
|
storeAccessor.size(),
|
||||||
tcpTmFlags);
|
tcpConfig.tcpTmFlags);
|
||||||
if(retval != static_cast<int>(storeAccessor.size())) {
|
if(retval != static_cast<int>(storeAccessor.size())) {
|
||||||
tcpip::handleError(tcpip::Protocol::TCP, tcpip::ErrorSources::SEND_CALL);
|
// Assume that the client has closed the connection here for now
|
||||||
|
handleSocketError(storeAccessor);
|
||||||
|
return CONN_BROKEN;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// Packet sent, clear FIFO entry
|
||||||
|
tmtcBridge->tmFifo->pop();
|
||||||
|
tmSent = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return HasReturnvaluesIF::RETURN_OK;
|
return HasReturnvaluesIF::RETURN_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ReturnValue_t TcpTmTcServer::handleRingBufferData(size_t availableReadData) {
|
||||||
|
ReturnValue_t status = HasReturnvaluesIF::RETURN_OK;
|
||||||
|
ReturnValue_t result = HasReturnvaluesIF::RETURN_OK;
|
||||||
|
lastRingBufferSize = availableReadData;
|
||||||
|
if(availableReadData == ringBuffer.getMaxSize()) {
|
||||||
|
#if FSFW_VERBOSE_LEVEL >= 1
|
||||||
|
#if FSFW_CPP_OSTREAM_ENABLED == 1
|
||||||
|
// Possible configuration error, too much data or/and data coming in too fast,
|
||||||
|
// requiring larger buffers
|
||||||
|
sif::warning << "TcpTmTcServer::handleServerOperation: Ring buffer reached " <<
|
||||||
|
"fill count" << std::endl;
|
||||||
|
#else
|
||||||
|
sif::printWarning("TcpTmTcServer::handleServerOperation: Ring buffer reached "
|
||||||
|
"fill count");
|
||||||
|
#endif
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
ringBuffer.readData(receptionBuffer.data(), availableReadData, true);
|
||||||
|
result = spacePacketParser->parsePusPackets(receptionBuffer.data(),
|
||||||
|
receptionBuffer.size());
|
||||||
|
if(result == SpacePacketParser::NO_PACKET_FOUND) {
|
||||||
|
ringBuffer.deleteData(availableReadData);
|
||||||
|
lastRingBufferSize = ringBuffer.getAvailableReadData();
|
||||||
|
}
|
||||||
|
else if(result == HasReturnvaluesIF::RETURN_OK) {
|
||||||
|
// Space Packets were found. Handle them here
|
||||||
|
auto fifo = spacePacketParser->fifo();
|
||||||
|
SpacePacketParser::IndexSizePair idxSizePair;
|
||||||
|
while(not fifo.empty()) {
|
||||||
|
fifo.retrieve(&idxSizePair);
|
||||||
|
result = handleTcReception(receptionBuffer.data() + idxSizePair.first,
|
||||||
|
idxSizePair.second);
|
||||||
|
if(result != HasReturnvaluesIF::RETURN_OK) {
|
||||||
|
status = result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
void TcpTmTcServer::handleSocketError(ConstStorageAccessor &accessor) {
|
||||||
|
// Don't delete data
|
||||||
|
accessor.release();
|
||||||
|
auto socketError = getLastSocketError();
|
||||||
|
switch(socketError) {
|
||||||
|
#if defined PLATFORM_WIN
|
||||||
|
case(WSAECONNRESET): {
|
||||||
|
// See https://docs.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-send
|
||||||
|
// Remote client might have shut down connection
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
#else
|
||||||
|
case(EPIPE): {
|
||||||
|
// See https://man7.org/linux/man-pages/man2/send.2.html
|
||||||
|
// Remote client might have shut down connection
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
default: {
|
||||||
|
tcpip::handleError(tcpip::Protocol::TCP, tcpip::ErrorSources::SEND_CALL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -1,11 +1,13 @@
|
|||||||
#ifndef FSFW_OSAL_COMMON_TCP_TMTC_SERVER_H_
|
#ifndef FSFW_OSAL_COMMON_TCP_TMTC_SERVER_H_
|
||||||
#define FSFW_OSAL_COMMON_TCP_TMTC_SERVER_H_
|
#define FSFW_OSAL_COMMON_TCP_TMTC_SERVER_H_
|
||||||
|
|
||||||
|
#include <fsfw/tmtcservices/SpacePacketParser.h>
|
||||||
#include "TcpIpBase.h"
|
#include "TcpIpBase.h"
|
||||||
|
|
||||||
#include "fsfw/platform.h"
|
#include "fsfw/platform.h"
|
||||||
#include "fsfw/osal/common/tcpipHelpers.h"
|
#include "fsfw/osal/common/tcpipHelpers.h"
|
||||||
#include "fsfw/ipc/messageQueueDefinitions.h"
|
#include "fsfw/ipc/messageQueueDefinitions.h"
|
||||||
|
#include "fsfw/container/SimpleRingBuffer.h"
|
||||||
#include "fsfw/ipc/MessageQueueIF.h"
|
#include "fsfw/ipc/MessageQueueIF.h"
|
||||||
#include "fsfw/objectmanager/frameworkObjects.h"
|
#include "fsfw/objectmanager/frameworkObjects.h"
|
||||||
#include "fsfw/objectmanager/SystemObject.h"
|
#include "fsfw/objectmanager/SystemObject.h"
|
||||||
@ -42,10 +44,37 @@ class TcpTmTcServer:
|
|||||||
public TcpIpBase,
|
public TcpIpBase,
|
||||||
public ExecutableObjectIF {
|
public ExecutableObjectIF {
|
||||||
public:
|
public:
|
||||||
|
enum class ReceptionModes {
|
||||||
|
SPACE_PACKETS
|
||||||
|
};
|
||||||
|
|
||||||
|
struct TcpConfig {
|
||||||
|
public:
|
||||||
|
TcpConfig(std::string tcpPort): tcpPort(tcpPort) {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Passed to the recv call
|
||||||
|
*/
|
||||||
|
int tcpFlags = 0;
|
||||||
|
int tcpBacklog = 3;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Passed to the select call which is used to ensure non-blocking TC reception
|
||||||
|
*/
|
||||||
|
//uint32_t selectTimeoutMs = DEFAULT_SELECT_TIMEOUT_MS;
|
||||||
|
/**
|
||||||
|
* Passed to the send call
|
||||||
|
*/
|
||||||
|
int tcpTmFlags = 0;
|
||||||
|
|
||||||
|
const std::string tcpPort;
|
||||||
|
};
|
||||||
|
|
||||||
static const std::string DEFAULT_SERVER_PORT;
|
static const std::string DEFAULT_SERVER_PORT;
|
||||||
|
|
||||||
static constexpr size_t ETHERNET_MTU_SIZE = 1500;
|
static constexpr size_t ETHERNET_MTU_SIZE = 1500;
|
||||||
static constexpr uint32_t DEFAULT_SELECT_TIMEOUT_MS = 200;
|
static constexpr size_t RING_BUFFER_SIZE = ETHERNET_MTU_SIZE * 3;
|
||||||
|
static constexpr uint32_t DEFAULT_LOOP_DELAY_MS = 200;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* TCP Server Constructor
|
* TCP Server Constructor
|
||||||
@ -56,12 +85,19 @@ public:
|
|||||||
* @param customTcpServerPort The user can specify another port than the default (7301) here.
|
* @param customTcpServerPort The user can specify another port than the default (7301) here.
|
||||||
*/
|
*/
|
||||||
TcpTmTcServer(object_id_t objectId, object_id_t tmtcTcpBridge,
|
TcpTmTcServer(object_id_t objectId, object_id_t tmtcTcpBridge,
|
||||||
size_t receptionBufferSize = ETHERNET_MTU_SIZE + 1,
|
size_t receptionBufferSize = RING_BUFFER_SIZE,
|
||||||
std::string customTcpServerPort = "");
|
size_t ringBufferSize = RING_BUFFER_SIZE,
|
||||||
|
std::string customTcpServerPort = DEFAULT_SERVER_PORT,
|
||||||
|
ReceptionModes receptionMode = ReceptionModes::SPACE_PACKETS);
|
||||||
virtual~ TcpTmTcServer();
|
virtual~ TcpTmTcServer();
|
||||||
|
|
||||||
void setSelectTimeout(uint32_t timeout);
|
/**
|
||||||
void setTcpBacklog(uint8_t tcpBacklog);
|
* Get a handle to the TCP configuration struct, which can be used to configure TCP
|
||||||
|
* properties
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
TcpConfig& getTcpConfigStruct();
|
||||||
|
void setSpacePacketParsingOptions(uint8_t parserFifoSize);
|
||||||
|
|
||||||
ReturnValue_t initialize() override;
|
ReturnValue_t initialize() override;
|
||||||
ReturnValue_t performOperation(uint8_t opCode) override;
|
ReturnValue_t performOperation(uint8_t opCode) override;
|
||||||
@ -73,26 +109,28 @@ protected:
|
|||||||
StorageManagerIF* tcStore = nullptr;
|
StorageManagerIF* tcStore = nullptr;
|
||||||
StorageManagerIF* tmStore = nullptr;
|
StorageManagerIF* tmStore = nullptr;
|
||||||
private:
|
private:
|
||||||
|
static constexpr ReturnValue_t CONN_BROKEN = HasReturnvaluesIF::makeReturnCode(1, 0);
|
||||||
//! TMTC bridge is cached.
|
//! TMTC bridge is cached.
|
||||||
object_id_t tmtcBridgeId = objects::NO_OBJECT;
|
object_id_t tmtcBridgeId = objects::NO_OBJECT;
|
||||||
TcpTmTcBridge* tmtcBridge = nullptr;
|
TcpTmTcBridge* tmtcBridge = nullptr;
|
||||||
|
|
||||||
std::string tcpPort;
|
ReceptionModes receptionMode;
|
||||||
int tcpFlags = 0;
|
TcpConfig tcpConfig;
|
||||||
uint32_t selectTimeoutMs = DEFAULT_SELECT_TIMEOUT_MS;
|
|
||||||
socket_t listenerTcpSocket = 0;
|
|
||||||
struct sockaddr tcpAddress;
|
struct sockaddr tcpAddress;
|
||||||
|
socket_t listenerTcpSocket = 0;
|
||||||
|
|
||||||
MessageQueueId_t targetTcDestination = MessageQueueIF::NO_QUEUE;
|
MessageQueueId_t targetTcDestination = MessageQueueIF::NO_QUEUE;
|
||||||
int tcpAddrLen = sizeof(tcpAddress);
|
|
||||||
int tcpBacklog = 3;
|
|
||||||
|
|
||||||
std::vector<uint8_t> receptionBuffer;
|
std::vector<uint8_t> receptionBuffer;
|
||||||
int tcpSockOpt = 0;
|
SimpleRingBuffer ringBuffer;
|
||||||
int tcpTmFlags = 0;
|
SpacePacketParser* spacePacketParser = nullptr;
|
||||||
|
uint8_t lastRingBufferSize = 0;
|
||||||
|
|
||||||
void handleServerOperation(socket_t connSocket);
|
void handleServerOperation(socket_t connSocket);
|
||||||
ReturnValue_t handleTcReception(size_t bytesRecvd);
|
ReturnValue_t handleTcReception(uint8_t* spacePacket, size_t packetSize);
|
||||||
ReturnValue_t handleTmSending(socket_t connSocket);
|
ReturnValue_t handleTmSending(socket_t connSocket, bool& tmSent);
|
||||||
|
ReturnValue_t handleRingBufferData(size_t availableReadData);
|
||||||
|
void handleSocketError(ConstStorageAccessor& accessor);
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif /* FSFW_OSAL_COMMON_TCP_TMTC_SERVER_H_ */
|
#endif /* FSFW_OSAL_COMMON_TCP_TMTC_SERVER_H_ */
|
||||||
|
@ -6,4 +6,5 @@ target_sources(${LIB_FSFW_NAME}
|
|||||||
TmTcBridge.cpp
|
TmTcBridge.cpp
|
||||||
TmTcMessage.cpp
|
TmTcMessage.cpp
|
||||||
VerificationReporter.cpp
|
VerificationReporter.cpp
|
||||||
|
SpacePacketParser.cpp
|
||||||
)
|
)
|
@ -1,11 +1,11 @@
|
|||||||
#include "PusParser.h"
|
|
||||||
#include <fsfw/serviceinterface/ServiceInterface.h>
|
#include <fsfw/serviceinterface/ServiceInterface.h>
|
||||||
|
#include <fsfw/tmtcservices/SpacePacketParser.h>
|
||||||
|
|
||||||
PusParser::PusParser(uint16_t maxExpectedPusPackets, bool storeSplitPackets):
|
SpacePacketParser::SpacePacketParser(uint16_t maxExpectedPusPackets, bool storeSplitPackets):
|
||||||
indexSizePairFIFO(maxExpectedPusPackets) {
|
indexSizePairFIFO(maxExpectedPusPackets) {
|
||||||
}
|
}
|
||||||
|
|
||||||
ReturnValue_t PusParser::parsePusPackets(const uint8_t *frame,
|
ReturnValue_t SpacePacketParser::parsePusPackets(const uint8_t *frame,
|
||||||
size_t frameSize) {
|
size_t frameSize) {
|
||||||
if(frame == nullptr or frameSize < 5) {
|
if(frame == nullptr or frameSize < 5) {
|
||||||
#if FSFW_CPP_OSTREAM_ENABLED == 1
|
#if FSFW_CPP_OSTREAM_ENABLED == 1
|
||||||
@ -36,7 +36,7 @@ ReturnValue_t PusParser::parsePusPackets(const uint8_t *frame,
|
|||||||
// Size of a pus packet is the value in the packet length field plus 7.
|
// Size of a pus packet is the value in the packet length field plus 7.
|
||||||
if(packetSize > frameSize) {
|
if(packetSize > frameSize) {
|
||||||
if(storeSplitPackets) {
|
if(storeSplitPackets) {
|
||||||
indexSizePairFIFO.insert(indexSizePair(0, frameSize));
|
indexSizePairFIFO.insert(IndexSizePair(0, frameSize));
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
#if FSFW_VERBOSE_LEVEL >= 1
|
#if FSFW_VERBOSE_LEVEL >= 1
|
||||||
@ -56,7 +56,7 @@ ReturnValue_t PusParser::parsePusPackets(const uint8_t *frame,
|
|||||||
return SPLIT_PACKET;
|
return SPLIT_PACKET;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
indexSizePairFIFO.insert(indexSizePair(0, packetSize));
|
indexSizePairFIFO.insert(IndexSizePair(0, packetSize));
|
||||||
if(packetSize == frameSize) {
|
if(packetSize == frameSize) {
|
||||||
return HasReturnvaluesIF::RETURN_OK;
|
return HasReturnvaluesIF::RETURN_OK;
|
||||||
}
|
}
|
||||||
@ -66,7 +66,7 @@ ReturnValue_t PusParser::parsePusPackets(const uint8_t *frame,
|
|||||||
return readMultiplePackets(frame, frameSize, packetSize);
|
return readMultiplePackets(frame, frameSize, packetSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
ReturnValue_t PusParser::readMultiplePackets(const uint8_t *frame,
|
ReturnValue_t SpacePacketParser::readMultiplePackets(const uint8_t *frame,
|
||||||
size_t frameSize, size_t startIndex) {
|
size_t frameSize, size_t startIndex) {
|
||||||
while (startIndex < frameSize) {
|
while (startIndex < frameSize) {
|
||||||
ReturnValue_t result = readNextPacket(frame, frameSize, startIndex);
|
ReturnValue_t result = readNextPacket(frame, frameSize, startIndex);
|
||||||
@ -77,17 +77,17 @@ ReturnValue_t PusParser::readMultiplePackets(const uint8_t *frame,
|
|||||||
return HasReturnvaluesIF::RETURN_OK;
|
return HasReturnvaluesIF::RETURN_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
DynamicFIFO<PusParser::indexSizePair>* PusParser::fifo(){
|
DynamicFIFO<SpacePacketParser::IndexSizePair>& SpacePacketParser::fifo(){
|
||||||
return &indexSizePairFIFO;
|
return indexSizePairFIFO;
|
||||||
}
|
}
|
||||||
|
|
||||||
PusParser::indexSizePair PusParser::getNextFifoPair() {
|
SpacePacketParser::IndexSizePair SpacePacketParser::getNextFifoPair() {
|
||||||
indexSizePair nextIndexSizePair;
|
IndexSizePair nextIndexSizePair;
|
||||||
indexSizePairFIFO.retrieve(&nextIndexSizePair);
|
indexSizePairFIFO.retrieve(&nextIndexSizePair);
|
||||||
return nextIndexSizePair;
|
return nextIndexSizePair;
|
||||||
}
|
}
|
||||||
|
|
||||||
ReturnValue_t PusParser::readNextPacket(const uint8_t *frame,
|
ReturnValue_t SpacePacketParser::readNextPacket(const uint8_t *frame,
|
||||||
size_t frameSize, size_t& currentIndex) {
|
size_t frameSize, size_t& currentIndex) {
|
||||||
// sif::debug << startIndex << std::endl;
|
// sif::debug << startIndex << std::endl;
|
||||||
if(currentIndex + 5 > frameSize) {
|
if(currentIndex + 5 > frameSize) {
|
||||||
@ -107,7 +107,7 @@ ReturnValue_t PusParser::readNextPacket(const uint8_t *frame,
|
|||||||
if(nextPacketSize > remainingSize)
|
if(nextPacketSize > remainingSize)
|
||||||
{
|
{
|
||||||
if(storeSplitPackets) {
|
if(storeSplitPackets) {
|
||||||
indexSizePairFIFO.insert(indexSizePair(currentIndex, remainingSize));
|
indexSizePairFIFO.insert(IndexSizePair(currentIndex, remainingSize));
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
#if FSFW_VERBOSE_LEVEL >= 1
|
#if FSFW_VERBOSE_LEVEL >= 1
|
||||||
@ -127,7 +127,7 @@ ReturnValue_t PusParser::readNextPacket(const uint8_t *frame,
|
|||||||
return SPLIT_PACKET;
|
return SPLIT_PACKET;
|
||||||
}
|
}
|
||||||
|
|
||||||
ReturnValue_t result = indexSizePairFIFO.insert(indexSizePair(currentIndex,
|
ReturnValue_t result = indexSizePairFIFO.insert(IndexSizePair(currentIndex,
|
||||||
nextPacketSize));
|
nextPacketSize));
|
||||||
if (result != HasReturnvaluesIF::RETURN_OK) {
|
if (result != HasReturnvaluesIF::RETURN_OK) {
|
||||||
// FIFO full.
|
// FIFO full.
|
@ -11,11 +11,11 @@
|
|||||||
* @brief This small helper class scans a given buffer for PUS packets.
|
* @brief This small helper class scans a given buffer for PUS packets.
|
||||||
* Can be used if PUS packets are serialized in a tightly packed frame.
|
* Can be used if PUS packets are serialized in a tightly packed frame.
|
||||||
* @details
|
* @details
|
||||||
* The parser uses the payload length field of PUS packets to find
|
* The parser uses the length field field of the space packets to find
|
||||||
* the respective PUS packet sizes.
|
* the respective space packet sizes.
|
||||||
*
|
*
|
||||||
* The parser parses a buffer by taking a pointer and the maximum size to scan.
|
* The parser parses a buffer by taking a pointer and the maximum size to scan.
|
||||||
* If PUS packets are found, they are stored in a FIFO which stores pairs
|
* If space packets are found, they are stored in a FIFO which stores pairs
|
||||||
* consisting of the index in the buffer and the respective packet sizes.
|
* consisting of the index in the buffer and the respective packet sizes.
|
||||||
*
|
*
|
||||||
* If the parser detects split packets (which means that the size of the
|
* If the parser detects split packets (which means that the size of the
|
||||||
@ -23,17 +23,18 @@
|
|||||||
* store that split packet or throw away the packet.
|
* store that split packet or throw away the packet.
|
||||||
* @author R. Mueller
|
* @author R. Mueller
|
||||||
*/
|
*/
|
||||||
class PusParser {
|
class SpacePacketParser {
|
||||||
public:
|
public:
|
||||||
//! The first entry is the index inside the buffer while the second index
|
//! The first entry is the index inside the buffer while the second index
|
||||||
//! is the size of the PUS packet starting at that index.
|
//! is the size of the PUS packet starting at that index.
|
||||||
using indexSizePair = std::pair<size_t, size_t>;
|
using IndexSizePair = std::pair<size_t, size_t>;
|
||||||
|
|
||||||
static constexpr uint8_t INTERFACE_ID = CLASS_ID::PUS_PARSER;
|
static constexpr uint8_t INTERFACE_ID = CLASS_ID::PUS_PARSER;
|
||||||
static constexpr ReturnValue_t NO_PACKET_FOUND = MAKE_RETURN_CODE(0x00);
|
static constexpr ReturnValue_t NO_PACKET_FOUND = MAKE_RETURN_CODE(0x00);
|
||||||
static constexpr ReturnValue_t SPLIT_PACKET = MAKE_RETURN_CODE(0x01);
|
static constexpr ReturnValue_t SPLIT_PACKET = MAKE_RETURN_CODE(0x01);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parser constructor.
|
* @brief Parser constructor.
|
||||||
* @param maxExpectedPusPackets
|
* @param maxExpectedPusPackets
|
||||||
* Maximum expected number of PUS packets. A good estimate is to divide
|
* Maximum expected number of PUS packets. A good estimate is to divide
|
||||||
* the frame size by the minimum size of a PUS packet (12 bytes)
|
* the frame size by the minimum size of a PUS packet (12 bytes)
|
||||||
@ -41,31 +42,34 @@ public:
|
|||||||
* Specifies whether split packets are also stored inside the FIFO,
|
* Specifies whether split packets are also stored inside the FIFO,
|
||||||
* with the size being the remaining frame size.
|
* with the size being the remaining frame size.
|
||||||
*/
|
*/
|
||||||
PusParser(uint16_t maxExpectedPusPackets, bool storeSplitPackets);
|
SpacePacketParser(uint16_t maxExpectedPusPackets, bool storeSplitPackets);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parse a given frame for PUS packets
|
* Parse a given frame for PUS packets
|
||||||
* @param frame
|
* @param frame
|
||||||
* @param frameSize
|
* @param frameSize
|
||||||
* @return -@c NO_PACKET_FOUND if no packet was found.
|
* @return
|
||||||
|
* -@c NO_PACKET_FOUND if no packet was found
|
||||||
|
* -@c SPLIT_PACKET if splitting is enabled and a split packet was found
|
||||||
|
* -@c RETURN_OK if a packet was found. The index and sizes are stored in the internal FIFO
|
||||||
*/
|
*/
|
||||||
ReturnValue_t parsePusPackets(const uint8_t* frame, size_t frameSize);
|
ReturnValue_t parsePusPackets(const uint8_t* frame, size_t frameSize);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Accessor function to get a reference to the internal FIFO which
|
* Accessor function to get a reference to the internal FIFO which
|
||||||
* stores pairs of indexi and packet sizes. This FIFO is filled
|
* stores pairs of index and packet sizes. This FIFO is filled
|
||||||
* by the parsePusPackets() function.
|
* by the #parsePusPackets function.
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
DynamicFIFO<indexSizePair>* fifo();
|
DynamicFIFO<IndexSizePair>& fifo();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieve the next index and packet size pair from the FIFO.
|
* Retrieve the next index and packet size pair from the FIFO.
|
||||||
* This also removed it from the FIFO. Please note that if the FIFO
|
* This also removes it from the FIFO. Please note that if the FIFO
|
||||||
* is empty, an empty pair will be returned.
|
* is empty, an empty pair will be returned.
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
indexSizePair getNextFifoPair();
|
IndexSizePair getNextFifoPair();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/**
|
/**
|
||||||
@ -73,7 +77,7 @@ private:
|
|||||||
* inside the receive buffer. The maximum number of entries is defined
|
* inside the receive buffer. The maximum number of entries is defined
|
||||||
* by the first constructor argument.
|
* by the first constructor argument.
|
||||||
*/
|
*/
|
||||||
DynamicFIFO<indexSizePair> indexSizePairFIFO;
|
DynamicFIFO<IndexSizePair> indexSizePairFIFO;
|
||||||
|
|
||||||
bool storeSplitPackets = false;
|
bool storeSplitPackets = false;
|
||||||
|
|
Loading…
Reference in New Issue
Block a user