eive-obsw/gomspace/libgscsp/lib/libcsp/examples/zmqproxy.c

83 lines
2.2 KiB
C

#include <zmq.h>
#include <assert.h>
#include <stdio.h>
#include <pthread.h>
#include <malloc.h>
#include <unistd.h>
#include <csp/csp.h>
static void * task_capture(void *ctx) {
/* Subscriber (RX) */
void *subscriber = zmq_socket(ctx, ZMQ_SUB);
assert(zmq_connect(subscriber, "tcp://localhost:7000") == 0);
assert(zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0) == 0);
while (1) {
zmq_msg_t msg;
zmq_msg_init_size(&msg, 1024);
/* Receive data */
if (zmq_msg_recv(&msg, subscriber, 0) < 0) {
zmq_msg_close(&msg);
csp_log_error("ZMQ: %s\r\n", zmq_strerror(zmq_errno()));
continue;
}
int datalen = zmq_msg_size(&msg);
if (datalen < 5) {
csp_log_warn("ZMQ: Too short datalen: %u\r\n", datalen);
while(zmq_msg_recv(&msg, subscriber, ZMQ_NOBLOCK) > 0)
zmq_msg_close(&msg);
continue;
}
/* Create new csp packet */
csp_packet_t * packet = malloc(1024);
if (packet == NULL) {
zmq_msg_close(&msg);
continue;
}
/* Copy the data from zmq to csp */
char * satidptr = ((char *) &packet->id) - 1;
memcpy(satidptr, zmq_msg_data(&msg), datalen);
packet->length = datalen - 4 - 1;
printf("Input: Src %u, Dst %u, Dport %u, Sport %u, Pri %u, Flags 0x%02X, Size %"PRIu16"\r\n",
packet->id.src, packet->id.dst, packet->id.dport,
packet->id.sport, packet->id.pri, packet->id.flags, packet->length);
free(packet);
zmq_msg_close(&msg);
}
}
int main(int argc, char ** argv) {
/**
* ZMQ PROXY
*/
void * ctx = zmq_ctx_new();
assert(ctx);
void *frontend = zmq_socket(ctx, ZMQ_XSUB);
assert(frontend);
assert(zmq_bind (frontend, "tcp://*:6000") == 0);
void *backend = zmq_socket(ctx, ZMQ_XPUB);
assert(backend);
assert(zmq_bind(backend, "tcp://*:7000") == 0);
pthread_t capworker;
pthread_create(&capworker, NULL, task_capture, ctx);
printf("Starting ZMQproxy\r\n");
zmq_proxy(frontend, backend, NULL);
printf("Closing ZMQproxy\r\n");
zmq_ctx_destroy(ctx);
return 0;
}