diff --git a/Makefile b/Makefile index dcca06f..e486b8a 100644 --- a/Makefile +++ b/Makefile @@ -5,38 +5,30 @@ LDFLAGS+=-lpthread -lm ifeq ($(PREFIX),) PREFIX := /usr/local endif + UNAME := $(shell uname) ifeq ($(UNAME),Linux) -#Conditional for Linux -CFLAGS+= $(shell pkg-config --cflags librtlsdr) -LDFLAGS+=$(shell pkg-config --libs librtlsdr) - + CFLAGS += $(shell pkg-config --cflags librtlsdr libusb-1.0) + LDFLAGS +=$(shell pkg-config --libs librtlsdr libusb-1.0) +else ifeq ($(UNAME),Darwin) + CFLAGS += $(shell pkg-config --cflags librtlsdr libusb-1.0) + LDFLAGS += $(shell pkg-config --libs librtlsdr libusb-1.0) else -# -#ADD THE CORRECT PATH FOR LIBUSB AND RTLSDR -#TODO: -# CMAKE will be much better or create a conditional pkg-config - - -# RTLSDR -RTLSDR_INCLUDE=/tmp/rtl-sdr/include -RTLSDR_LIB=/tmp/rtl-sdr/build/src + #ADD THE CORRECT PATH FOR LIBUSB AND RTLSDR + #TODO: + # CMAKE will be much better or create a conditional pkg-config -# LIBUSB -LIBUSB_INCLUDE=/tmp/libusb/include/libusb-1.0 -LIBUSB_LIB=/tmp/libusb/lib - -ifeq ($(UNAME),Darwin) -#Conditional for OSX -CFLAGS+= -I/usr/local/include/ -I$(LIBUSB_INCLUDE) -I$(RTLSDR_INCLUDE) -LDFLAGS+= -L/usr/local/lib -L$(LIBUSB_LIB) -L$(RTLSDR_LIB) -lrtlsdr -lusb-1.0 -else -#Conditional for Windows -CFLAGS+=-I $(LIBUSB_INCLUDE) -I $(RTLSDR_INCLUDE) -LDFLAGS+=-L$(LIBUSB_INCLUDE) -L$(RTLSDR_LIB) -L/usr/lib -lusb-1.0 -lrtlsdr -lWs2_32 -endif + # RTLSDR + RTLSDR_INCLUDE=/tmp/rtl-sdr/include + RTLSDR_LIB=/tmp/rtl-sdr/build/src + # LIBUSB + LIBUSB_INCLUDE=/opt/homebrew/Cellar/libusb/1.0.24/include + LIBUSB_LIB=/opt/homebrew/Cellar/libusb/1.0.24/lib + #Conditional for Windows + CFLAGS+=-I $(LIBUSB_INCLUDE) -I $(RTLSDR_INCLUDE) + LDFLAGS+=-L$(LIBUSB_INCLUDE) -L$(RTLSDR_LIB) -L/usr/lib -lusb-1.0 -lrtlsdr -lWs2_32 endif CC?=gcc diff --git a/aisdecoder/aisdecoder.c b/aisdecoder/aisdecoder.c index 5aeb9ab..ebe2d78 100644 --- a/aisdecoder/aisdecoder.c +++ b/aisdecoder/aisdecoder.c @@ -151,7 +151,7 @@ int send_nmea( const char *sentence, unsigned int length) { return 0; } -int init_ais_decoder(char * host, char * port ,int show_levels,int _debug_nmea,int buf_len,int time_print_stats, int use_tcp_listener, int tcp_keep_ais_time, int add_sample_num){ +int init_ais_decoder(char * host, char * port ,int show_levels,int _debug_nmea,int buf_len,int time_print_stats, int use_tcp_listener, int tcp_keep_ais_time, int tcp_stream_forever, int add_sample_num){ debug_nmea=_debug_nmea; use_tcp = use_tcp_listener; pthread_mutex_init(&message_mutex, NULL); @@ -165,7 +165,7 @@ int init_ais_decoder(char * host, char * port ,int show_levels,int _debug_nmea,i } } else { - if (!initTcpSocket(port, debug_nmea, tcp_keep_ais_time)) { + if (!initTcpSocket(port, debug_nmea, tcp_keep_ais_time, tcp_stream_forever)) { return EXIT_FAILURE; } } diff --git a/aisdecoder/aisdecoder.h b/aisdecoder/aisdecoder.h index 33f8d9e..30bc2e8 100644 --- a/aisdecoder/aisdecoder.h +++ b/aisdecoder/aisdecoder.h @@ -1,6 +1,6 @@ #ifndef __AIS_RL_AIS_INC_ #define __AIS_RL_AIS_INC_ -int init_ais_decoder(char * host, char * port,int show_levels,int _debug_nmea,int buf_len,int time_print_stats, int use_tcp_listener, int tcp_keep_ais_time, int add_sample_num); +int init_ais_decoder(char * host, char * port,int show_levels,int _debug_nmea,int buf_len,int time_print_stats, int use_tcp_listener, int tcp_keep_ais_time, int tcp_stream_forever, int add_sample_num); void run_rtlais_decoder(short * buff, int len); const char *aisdecoder_next_message(); int free_ais_decoder(void); diff --git a/aisdecoder/lib/protodec.c b/aisdecoder/lib/protodec.c index 90d44c7..ff33225 100644 --- a/aisdecoder/lib/protodec.c +++ b/aisdecoder/lib/protodec.c @@ -163,7 +163,7 @@ void protodec_generate_nmea(struct demod_state_t *d, int bufferlen, int fillbits int inc; unsigned char sentences, sentencenum, nmeachk, letter; - received_t=received_t; // not used here, avoid compiling warnings + (void)(received_t); // not used here, avoid compiling warnings //6bits to nmea-ascii. One sentence len max 82char //inc. head + tail.This makes inside datamax 62char multipart, 62 single senlen = 56; //this is normally not needed.For testing only. May be fixed number diff --git a/main.c b/main.c index 4c43d51..2b3fe6f 100644 --- a/main.c +++ b/main.c @@ -55,6 +55,7 @@ void usage(void) "\t[-P port (default: 10110)]\n" "\t[-T use TCP communication, rtl-ais is tcp server ( -h is ignored)\n" "\t[-t time to keep ais messages in sec, using tcp listener (default: 15)\n" + "\t[-k keep TCP socket open and write new messages to it as they arrive\n" "\t[-n log NMEA sentences to console (stderr) (default off)]\n" "\t[-I add sample index to NMEA messages (default off)]\n" "\t[-L log sound levels to console (stderr) (default off)]\n\n" @@ -76,7 +77,7 @@ void usage(void) static volatile int do_exit = 0; static void sighandler(int signum) { - signum = signum; + (void)(signum); // unused argument fprintf(stderr, "Signal caught, exiting!\n"); do_exit = 1; } @@ -105,7 +106,7 @@ int main(int argc, char **argv) config.host = strdup("127.0.0.1"); config.port = strdup("10110"); - while ((opt = getopt(argc, argv, "l:r:s:o:EODd:g:p:RATIt:P:h:nLS:?")) != -1) + while ((opt = getopt(argc, argv, "l:r:s:o:EODd:g:p:RATIkt:P:h:nLS:?")) != -1) { switch (opt) { case 'l': @@ -152,12 +153,15 @@ int main(int argc, char **argv) case 'P': config.port=strdup(optarg); break; - case 'T': - config.use_tcp_listener=1; - break; - case 't': - config.tcp_keep_ais_time = atoi(optarg); - break; + case 'T': + config.use_tcp_listener=1; + break; + case 't': + config.tcp_keep_ais_time = atoi(optarg); + break; + case 'k': + config.tcp_stream_forever = 1; + break; case 'h': config.host=strdup(optarg); break; diff --git a/rtl_ais.c b/rtl_ais.c index d553537..9803ac1 100644 --- a/rtl_ais.c +++ b/rtl_ais.c @@ -471,6 +471,7 @@ void rtl_ais_default_config(struct rtl_ais_config *config) config->dc_filter=1; config->edge = 0; config->use_tcp_listener = 0, config->tcp_keep_ais_time = 15; + config->tcp_stream_forever = 0; config->use_internal_aisdecoder=1; config->seconds_for_decoder_stats=0; /* Aisdecoder */ @@ -589,7 +590,7 @@ struct rtl_ais_context *rtl_ais_start(struct rtl_ais_config *config) } } else{ // Internal AIS decoder - int ret=init_ais_decoder(config->host,config->port,config->show_levels,config->debug_nmea,ctx->stereo.bl_len,config->seconds_for_decoder_stats, config->use_tcp_listener, config->tcp_keep_ais_time, config->add_sample_num); + int ret=init_ais_decoder(config->host,config->port,config->show_levels,config->debug_nmea,ctx->stereo.bl_len,config->seconds_for_decoder_stats, config->use_tcp_listener, config->tcp_keep_ais_time, config->tcp_stream_forever, config->add_sample_num); if(ret != 0){ fprintf(stderr,"Error initializing built-in AIS decoder\n"); rtlsdr_cancel_async(ctx->dev); @@ -648,7 +649,7 @@ int rtl_ais_isactive(struct rtl_ais_context *ctx) const char *rtl_ais_next_message(struct rtl_ais_context *ctx) { - ctx = ctx; //unused for now + (void)(ctx); //unused for now return aisdecoder_next_message(); } diff --git a/rtl_ais.h b/rtl_ais.h index bec41ce..71d11cc 100644 --- a/rtl_ais.h +++ b/rtl_ais.h @@ -27,7 +27,7 @@ struct rtl_ais_config int oversample, dc_filter, use_internal_aisdecoder; int seconds_for_decoder_stats; - int use_tcp_listener, tcp_keep_ais_time; + int use_tcp_listener, tcp_keep_ais_time, tcp_stream_forever; /* Aisdecoder */ int show_levels, debug_nmea; char *port, *host, *filename; diff --git a/tcp_listener/tcp_listener.c b/tcp_listener/tcp_listener.c index e1fe652..21a7efb 100644 --- a/tcp_listener/tcp_listener.c +++ b/tcp_listener/tcp_listener.c @@ -11,6 +11,7 @@ #include #include #include +#include #if defined (__WIN32__) #include @@ -30,12 +31,18 @@ typedef struct t_sockIo { char from_ip[20]; struct t_sockIo *next; + // An active listener thread has a `pipe()` allocated, whose file + // descriptors are stored here. This pipe is written to with new + // AIS messages as they are decoded if `_tcp_stream_forever` is set. + int msgpipe[2]; + } TCP_SOCK, *P_TCP_SOCK; static int sockfd; static int _debug_nmea = 0; static int _debug = 0; static int _tcp_keep_ais_time = 15; +static int _tcp_stream_forever = 0; static int portno; pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;; @@ -51,7 +58,6 @@ typedef struct t_ais_mess { int length; struct timeval timestamp; struct t_ais_mess *next; - } AIS_MESS, *P_AIS_MESS; // Linked list ais messages. @@ -73,10 +79,10 @@ void remove_old_ais_messages( ); #include "tcp_listener.h" -int initTcpSocket(const char *portnumber, int debug_nmea, int tcp_keep_ais_time) { - +int initTcpSocket(const char *portnumber, int debug_nmea, int tcp_keep_ais_time, int tcp_stream_forever) { _debug_nmea = debug_nmea; _tcp_keep_ais_time = tcp_keep_ais_time; + _tcp_stream_forever = tcp_stream_forever; struct sockaddr_in serv_addr; #if defined (__WIN32__) WSADATA wsaData; @@ -93,6 +99,12 @@ int initTcpSocket(const char *portnumber, int debug_nmea, int tcp_keep_ais_time) serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = INADDR_ANY; serv_addr.sin_port = htons(portno); + + if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof(int)) < 0) { + fprintf(stderr, "setsockopt(SO_REUSEADDR) failed! error %d\n", errno); + return 0; + } + if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { fprintf(stderr, "Failed to bind socket! error %d\n", errno); return 0; @@ -126,8 +138,8 @@ void closeTcpSocket() { // ------------------------------------------------------------ static void *tcp_listener_fn(void *arg) { int rc; - arg=arg; // not used, avoid compiling warnings P_TCP_SOCK t; + (void)(arg); // not used, avoid compiling warnings fprintf(stderr, "Tcp listen port %d\nAis message timeout with %d\n", portno, _tcp_keep_ais_time); @@ -165,23 +177,17 @@ static void *tcp_listener_fn(void *arg) { } // ------------------------------------------------------------ -// thread func for hanling client close +// thread func for handling a remote TCP client // ------------------------------------------------------------ void *handle_remote_close(void *arg) { - unsigned char buff[100]; - int rc; P_TCP_SOCK t = (P_TCP_SOCK) arg; P_AIS_MESS ais_temp; - struct timeval timeout; - timeout.tv_sec = 10; - timeout.tv_usec = 0; - setsockopt(t->sock, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(timeout)); - // get rid of old messages before send - remove_old_ais_messages(); - // send saved ais_messages to new socket + // On connect, send all saved ais_messages to new socket + pthread_mutex_lock(&ais_lock); ais_temp = ais_head; while (ais_temp != NULL) { + int rc; if (ais_temp->plmess != NULL) rc = send(t->sock, ais_temp->plmess, ais_temp->length, 0); else @@ -190,28 +196,68 @@ void *handle_remote_close(void *arg) { fprintf( stdout, "%ld: Send to %s, <%.*s>, rc=%d\n",ais_temp->timestamp.tv_sec, t->from_ip, ais_temp->length, ais_temp->message, rc); ais_temp = ais_temp->next; } + pthread_mutex_unlock(&ais_lock); - while(1){ - - rc = recv(t->sock, (char *)buff, 99, 0); - if( rc < 0) { - - // check timeout - if (errno == EAGAIN) - continue; - if( _debug) - fprintf( stdout, "Some socket error happend %d\n", errno); + while (1) { + fd_set fds; + int res; + int msgfd = t->msgpipe[0]; + const int nfds = (t->sock > msgfd ? t->sock : msgfd) + 1; + + // Listen on the client socket, and on the internal pipe which receives + // new messages. (New messages will only be sent to this pipe when the + // program is launched with the '-k' flag.) + FD_ZERO(&fds); + FD_SET(t->sock, &fds); + FD_SET(msgfd, &fds); + + res = pselect(nfds, &fds, NULL, NULL, NULL, NULL); + if (res < 0) { + if (_debug) + perror("select error"); break; } - else if( rc == 0) { - if( _debug) - fprintf( stdout, "client gracefully closed the socket\n"); - break; + + // Service remote client socket: If the client sends any data, close the + // socket (legacy behavior). + if (FD_ISSET(t->sock, &fds)) { + unsigned char buff[100]; + int rc; + rc = recv(t->sock, (char *)buff, 99, 0); + if (rc < 0) { + if (_debug && errno != EAGAIN) + perror("socket error"); + break; + } else if (rc >= 0) { + if (_debug) + fprintf(stdout, "client closed the socket\n"); + break; + } } - else { - if( _debug) - fprintf( stdout, "Something receiced from client <%.*s>\n", rc, buff ); - break; + + // If new messages became available internally, send them to the client. + if (FD_ISSET(msgfd, &fds)) { + unsigned char buff[1024]; + int rc; + rc = read(msgfd, (char *)buff, 1024); + if (rc < 0) { + if (_debug) { + perror("pipe error"); + } + break; + } else if (rc == 0) { + if (_debug) { + fprintf(stderr, "pipe closed\n"); + } + break; + } + + rc = send(t->sock, buff, rc, 0); + if (rc < 0) { + if (_debug) + perror("error writing to client"); + break; + } } } shutdown(t->sock, 2); @@ -287,7 +333,6 @@ void remove_old_ais_messages( ) { // send ais message to all clients // ------------------------------------------------------------ int add_nmea_ais_message(const char * mess, unsigned int length) { - P_AIS_MESS new_node; // remove eventually old messages @@ -312,7 +357,6 @@ int add_nmea_ais_message(const char * mess, unsigned int length) { new_node->length = length; gettimeofday(&new_node->timestamp, NULL); - if (ais_head == NULL) { ais_head = new_node; ais_end = new_node; @@ -323,6 +367,21 @@ int add_nmea_ais_message(const char * mess, unsigned int length) { pthread_mutex_unlock(&ais_lock); + // Send the message to all active clients. + if (_tcp_stream_forever) { + P_TCP_SOCK tcp_client; + + pthread_mutex_lock(&lock); + tcp_client = head; + while (tcp_client != NULL) { + int retval; + retval = write(tcp_client->msgpipe[1], mess, length); + tcp_client = tcp_client->next; + } + pthread_mutex_unlock(&lock); + } + + return 0; } @@ -358,6 +417,7 @@ void delete_ais_node(P_AIS_MESS p) { // ------------------------------------------------------------ P_TCP_SOCK init_node() { P_TCP_SOCK ptr; + int pipestatus; ptr = (P_TCP_SOCK) malloc(sizeof(TCP_SOCK)); @@ -365,9 +425,17 @@ P_TCP_SOCK init_node() { if (ptr == NULL) return (P_TCP_SOCK) NULL; - else { - return ptr; + + // Allocate pipe for publishing newly-arriving messages to + // connected clients. + pipestatus = pipe(ptr->msgpipe); + if (pipestatus < 0) { + perror("error allocating pipe"); + free(ptr); + return (P_TCP_SOCK) NULL; } + + return ptr; } // ------------------------------------------------------------ diff --git a/tcp_listener/tcp_listener.h b/tcp_listener/tcp_listener.h index 1695d9b..210efa7 100644 --- a/tcp_listener/tcp_listener.h +++ b/tcp_listener/tcp_listener.h @@ -8,7 +8,7 @@ #define MAX_TCP_CONNECTIONS 100 // Prototypes -int initTcpSocket( const char *portnumber, int debug_nmea, int tcp_keep_ais_time); +int initTcpSocket( const char *portnumber, int debug_nmea, int tcp_keep_ais_time, int tcp_stream_forever); int add_nmea_ais_message(const char * mess, unsigned int length); void closeTcpSocket();