fixing last bugs

This commit is contained in:
Robin Müller 2021-09-27 20:00:01 +02:00
parent 71036bf6b1
commit 9b7da4d9e6
No known key found for this signature in database
GPG Key ID: 11D4952C8CCEF814

View File

@ -31,9 +31,9 @@ const std::string TcpTmTcServer::DEFAULT_SERVER_PORT = tcpip::DEFAULT_SERVER_POR
TcpTmTcServer::TcpTmTcServer(object_id_t objectId, object_id_t tmtcTcpBridge,
size_t receptionBufferSize, size_t ringBufferSize, std::string customTcpServerPort,
ReceptionModes receptionMode):
SystemObject(objectId), tmtcBridgeId(tmtcTcpBridge), receptionMode(receptionMode),
tcpConfig(customTcpServerPort), receptionBuffer(receptionBufferSize),
ringBuffer(ringBufferSize, true) {
SystemObject(objectId), tmtcBridgeId(tmtcTcpBridge), receptionMode(receptionMode),
tcpConfig(customTcpServerPort), receptionBuffer(receptionBufferSize),
ringBuffer(ringBufferSize, true) {
}
ReturnValue_t TcpTmTcServer::initialize() {
@ -127,6 +127,7 @@ ReturnValue_t TcpTmTcServer::performOperation(uint8_t opCode) {
//connSocket = accept(listenerTcpSocket, &clientSockAddr, &connectorSockAddrLen);
connSocket = accept(listenerTcpSocket, nullptr, nullptr);
sif::debug << "accepted new conn socket " << connSocket << std::endl;
if(connSocket == INVALID_SOCKET) {
handleError(Protocol::TCP, ErrorSources::ACCEPT_CALL, 500);
closeSocket(connSocket);
@ -142,6 +143,7 @@ ReturnValue_t TcpTmTcServer::performOperation(uint8_t opCode) {
handleError(Protocol::TCP, ErrorSources::SHUTDOWN_CALL);
}
closeSocket(connSocket);
connSocket = 0;
}
return HasReturnvaluesIF::RETURN_OK;
}
@ -160,24 +162,24 @@ ReturnValue_t TcpTmTcServer::initializeAfterTaskCreation() {
void TcpTmTcServer::handleServerOperation(socket_t& connSocket) {
//int retval = 0;
using namespace std::chrono_literals;
// Receive until the peer shuts down the connection, use select to do this
fd_set rfds;
fd_set efds;
FD_ZERO(&rfds);
FD_SET(connSocket, &rfds);
FD_ZERO(&efds);
FD_SET(connSocket, &efds);
timeval tv;
tv.tv_sec = 0;//tcpConfig.selectTimeoutMs / 1000;
tv.tv_usec = 0;//(tcpConfig.selectTimeoutMs % 1000) * 1000;
int nfds = connSocket + 1;
// using namespace std::chrono_literals;
//
// // Receive until the peer shuts down the connection, use select to do this
// fd_set rfds;
// fd_set efds;
//
// FD_ZERO(&rfds);
// FD_SET(connSocket, &rfds);
//
// FD_ZERO(&efds);
// FD_SET(connSocket, &efds);
//
// timeval tv = {};
// tv.tv_sec = 0;//tcpConfig.selectTimeoutMs / 1000;
// tv.tv_usec = 0;// DEFAULT_LOOP_DELAY_MS * 1000;//(tcpConfig.selectTimeoutMs % 1000) * 1000;
//
// int nfds = connSocket + 1;
//setsockopt(connSocket, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof tv);
// do {
// // Read all telecommands sent by the client
// retval = recv(
@ -198,64 +200,73 @@ void TcpTmTcServer::handleServerOperation(socket_t& connSocket) {
// tcpip::handleError(tcpip::Protocol::TCP, tcpip::ErrorSources::RECV_CALL);
// }
// } while(retval > 0);
// data available
// int test = recv(
// connSocket,
// reinterpret_cast<char*>(receptionBuffer.data()),
// receptionBuffer.capacity(),
// tcpConfig.tcpFlags
// );
// sif::debug << "Received " << test << " bytes" << std::endl;
while (true) {
sif::debug << "polling fd.." << std::endl;
int retval = select(nfds, &rfds, nullptr, &efds, &tv);
// data available
int test = recv(
int retval = recv(
connSocket,
reinterpret_cast<char*>(receptionBuffer.data()),
receptionBuffer.capacity(),
tcpConfig.tcpFlags
MSG_DONTWAIT//tcpConfig.tcpFlags
);
sif::debug << "Received " << retval << " bytes" << std::endl;
if(retval < 0) {
// client might have shut down connection
if(retval == 0) {
// Client closed connection
return;
}
else if(retval > 0) {
sif::debug << "some descriptor set.." << std::endl;
if(FD_ISSET(connSocket, &rfds)) {
// data available
int retval = recv(
connSocket,
reinterpret_cast<char*>(receptionBuffer.data()),
receptionBuffer.capacity(),
tcpConfig.tcpFlags
);
sif::debug << "Received " << retval << " bytes" << std::endl;
ringBuffer.writeData(receptionBuffer.data(), retval);
sif::debug << "Received " << retval << " bytes" << std::endl;
ringBuffer.writeData(receptionBuffer.data(), retval);
sif::debug << "select retval: " << retval << std::endl;
}
else if(retval < 0) {
if(errno == EAGAIN) {
// no data available. Check whether any packets have been read, then send back
// telemetry now
bool tcAvailable = false;
bool tmSent = false;
size_t availableReadData = ringBuffer.getAvailableReadData();
//sif::debug << "ring buffer data: " << availableReadData << std::endl;
//sif::debug << "last buf size: " << lastRingBufferSize << std::endl;
if(availableReadData > lastRingBufferSize) {
sif::debug << "ring buffer size changed" << std::endl;
tcAvailable = true;
handleRingBufferData(availableReadData);
}
ReturnValue_t result = handleTmSending(connSocket, tmSent);
if(result == CONN_BROKEN) {
return;
}
if(not tcAvailable and not tmSent) {
TaskFactory::delayTask(DEFAULT_LOOP_DELAY_MS);
}
}
if(FD_ISSET(connSocket, &efds)) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::warning << "TcpTmTcServer::handleServerOperation: "
"Exception detected" << std::endl;
#else
sif::printWarning("TcpTmTcServer::handleServerOperation: Exception detected\n");
#endif
}
}
else {
// no data available. Check whether any packets have been read, then send back
// telemetry now
bool tcAvailable = false;
bool tmSent = false;
size_t availableReadData = ringBuffer.getAvailableReadData();
//sif::debug << "ring buffer data: " << availableReadData << std::endl;
if(availableReadData > lastRingBufferSize) {
sif::debug << "ring buffer size changed" << std::endl;
tcAvailable = true;
handleRingBufferData(availableReadData);
}
ReturnValue_t result = handleTmSending(connSocket, tmSent);
if(result == CONN_BROKEN) {
return;
}
if(not tcAvailable and not tmSent) {
TaskFactory::delayTask(DEFAULT_LOOP_DELAY_MS);
if(errno == ETIMEDOUT) {
retval = 0;
}
}
// data available
// int retval = recv(
// connSocket,
// reinterpret_cast<char*>(receptionBuffer.data()),
// receptionBuffer.capacity(),
// tcpConfig.tcpFlags
// );
//sif::debug << "recv retval: " << retval << std::endl;
// data available
// int test = recv(
// connSocket,
// reinterpret_cast<char*>(receptionBuffer.data()),
// receptionBuffer.capacity(),
// tcpConfig.tcpFlags
// );
}
}
@ -317,12 +328,14 @@ ReturnValue_t TcpTmTcServer::handleTmSending(socket_t connSocket, bool& tmSent)
while((not tmtcBridge->tmFifo->empty()) and
(tmtcBridge->packetSentCounter < tmtcBridge->sentPacketsPerCycle)) {
// Send can fail, so only peek from the FIFO
sif::debug << "sending TM" << std::endl;
tmtcBridge->tmFifo->peek(&storeId);
// Using the store accessor will take care of deleting TM from the store automatically
ConstStorageAccessor storeAccessor(storeId);
ReturnValue_t result = tmStore->getData(storeId, storeAccessor);
if(result != HasReturnvaluesIF::RETURN_OK) {
sif::debug << "oh no" << std::endl;
return result;
}
int retval = send(connSocket,
@ -331,12 +344,14 @@ ReturnValue_t TcpTmTcServer::handleTmSending(socket_t connSocket, bool& tmSent)
tcpConfig.tcpTmFlags);
if(retval != static_cast<int>(storeAccessor.size())) {
// Assume that the client has closed the connection here for now
sif::debug << "conn broken?" << std::endl;
handleSocketError(storeAccessor);
return CONN_BROKEN;
}
else {
// Packet sent, clear FIFO entry
tmtcBridge->tmFifo->pop();
sif::debug << "fifo size: " << tmtcBridge->tmFifo->size() << std::endl;
tmSent = true;
}
}
@ -366,6 +381,7 @@ ReturnValue_t TcpTmTcServer::handleRingBufferData(size_t availableReadData) {
if(result == SpacePacketParser::NO_PACKET_FOUND) {
ringBuffer.deleteData(availableReadData);
lastRingBufferSize = ringBuffer.getAvailableReadData();
sif::debug << lastRingBufferSize << std::endl;
}
else if(result == HasReturnvaluesIF::RETURN_OK) {
// Space Packets were found. Handle them here
@ -373,13 +389,18 @@ ReturnValue_t TcpTmTcServer::handleRingBufferData(size_t availableReadData) {
SpacePacketParser::IndexSizePair idxSizePair;
while(not fifo.empty()) {
fifo.retrieve(&idxSizePair);
sif::debug << "handle tc" << std::endl;
result = handleTcReception(receptionBuffer.data() + idxSizePair.first,
idxSizePair.second);
ringBuffer.deleteData(idxSizePair.second);
if(result != HasReturnvaluesIF::RETURN_OK) {
status = result;
}
lastRingBufferSize = ringBuffer.getAvailableReadData();
sif::debug << lastRingBufferSize << std::endl;
}
}
std::memset(receptionBuffer.data(), 0, receptionBuffer.size());
return status;
}