LCOV - code coverage report
Current view: top level - netio-next/src - netio_tcp.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 232 401 57.9 %
Date: 2025-08-12 04:15:35 Functions: 9 12 75.0 %

          Line data    Source code
       1             : #include <unistd.h>
       2             : #include <stdio.h>
       3             : #include <sys/uio.h>
       4             : #include <string.h>
       5             : #include <stdlib.h>
       6             : #include <sys/types.h>
       7             : #include <sys/socket.h>
       8             : #include <netinet/in.h>
       9             : #include <arpa/inet.h>
      10             : #include <string.h>
      11             : #include <errno.h>
      12             : #include <netdb.h>
      13             : #include <arpa/inet.h>
      14             : #include <fcntl.h>
      15             : #include <ctype.h>
      16             : #include <sys/ioctl.h>
      17             : #include <signal.h>
      18             : #include <netinet/tcp.h>
      19             : 
      20             : #include "connection_event.h"
      21             : #include "log.h"
      22             : #include "netio/netio.h"
      23             : #include "netio/netio_tcp.h"
      24             : 
      25             : #if defined DEBUG || defined DEBUG_TCP
      26             : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
      27             : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
      28             : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
      29             : #else
      30             : #define log_dbg(...)
      31             : #define log_trc(...)
      32             : #endif
      33             : 
      34             : 
      35             : int sigpipe_trapped=0;
      36             : 
      37             : // STATIC FUNCTIONS
      38             : static void _connect_tcp(struct netio_send_socket* socket, const char* hostname, unsigned port, void* addr, size_t addrlen);
      39             : void make_socket_non_blocking(int sfd);
      40             : 
      41             : 
      42             : //Parameters:
      43             : //- socket is still empty
      44             : //- hostname and port have valid values
      45             : //- addr and addrlen are empty
      46             : static void
      47         117 : _connect_tcp(struct netio_send_socket* niosocket, const char* hostname, unsigned port, void* addr, size_t addrlen)
      48             : {
      49         117 :   int ret, sockfd;
      50         117 :   char ip[100];
      51         117 :   struct addrinfo hints, *servinfo, *p;
      52         117 :   struct sockaddr_in servaddr;
      53             : 
      54         117 :   log_dbg("_connect_tcp called");
      55             : 
      56         117 :   if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
      57             :   {
      58           0 :     log_fatal("Problem in creating the socket %d", errno);
      59           0 :     exit(2);
      60             :   }
      61             : 
      62         117 :   log_dbg("sockfd = %d", sockfd);
      63             : 
      64         117 :   if (isalpha(hostname[0]))
      65             :   {
      66             :      //Convert hostname to IP
      67           0 :      memset(&hints, 0, sizeof hints);
      68           0 :      hints.ai_family   = AF_UNSPEC; // use AF_INET6 to force IPv6
      69           0 :      hints.ai_socktype = SOCK_STREAM;
      70             : 
      71           0 :      int rv = getaddrinfo(hostname, "http", &hints, &servinfo);
      72           0 :      if (rv != 0)
      73             :      {
      74           0 :         log_fatal("getaddrinfo failed. getaddrinfo: %s", gai_strerror(rv));
      75           0 :         exit(2);
      76             :      }
      77             : 
      78           0 :      for(p = servinfo; p != NULL; p = p->ai_next)
      79             :      {
      80           0 :         struct sockaddr_in* h = (struct sockaddr_in *) p->ai_addr;
      81           0 :         strcpy(ip, inet_ntoa(h->sin_addr));
      82             :      }
      83             : 
      84           0 :      freeaddrinfo(servinfo);
      85             :      // End of hostname to IP conversion
      86             :   }
      87             :   else
      88             :   {
      89         117 :      strncpy(ip, hostname, sizeof(ip));
      90         117 :      ip[sizeof(ip)-1]=0;
      91             :   }
      92         117 :   log_dbg("%s resolved to %s\n" , hostname , ip);
      93             : 
      94             :   //Creation of the socket
      95         117 :   memset(&servaddr, 0, sizeof(servaddr));
      96         117 :   servaddr.sin_family = AF_INET;
      97         117 :   ret = inet_pton(AF_INET, ip, &servaddr.sin_addr);  //IPv4
      98         117 :   if (ret < 1)
      99             :   {
     100           0 :     log_error("Problem with inet_pton. ret = %d\n", ret);
     101             :   }
     102         117 :   servaddr.sin_port = htons(port);       //convert to big-endian order
     103             : 
     104         117 :   make_socket_non_blocking(sockfd);
     105             : 
     106             :   /* int opt=1; */
     107             :   /* ret = setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)); */
     108             :   /* if (ret != 0) */
     109             :   /* { */
     110             :   /*   log_error("Problem setting TCP_NODELAY, %s", strerror(errno)); */
     111             :   /* } */
     112             :   //#define CORK
     113             : #ifdef CORK
     114             :       int opt=1;
     115             :       ret = setsockopt(sockfd, IPPROTO_TCP, TCP_CORK, &opt, sizeof(opt));
     116             :       if (ret != 0)
     117             :         {
     118             :           log_error("Problem setting TCP_CORK, %s", strerror(errno));
     119             :         }
     120             : #endif
     121             : 
     122             :   //Connection of the client to the socket
     123         117 :   ret = connect(sockfd, (struct sockaddr *) &servaddr, sizeof(servaddr));
     124         117 :   if (ret == 0)
     125             :   {
     126             :     log_dbg("Already connected. Is this a problem for eopll?");
     127             :   }
     128             :   else
     129             :   {
     130         117 :     log_dbg("ret = %d, %s", ret, strerror(errno));
     131             : 
     132         117 :     if (errno == EINPROGRESS)
     133             :     {
     134             :       log_dbg("connect is EINPROGRESS. Let epoll handle it.");
     135             :     }
     136             :     else
     137             :     {
     138           0 :       log_error("Problem in connecting to the server. ret = %d", ret);
     139           0 :       log_error("TCP connect errno = %d (%s)\n", errno, strerror(errno));
     140           0 :       return;
     141             :     }
     142             :   }
     143             : 
     144         117 :   log_dbg("connect OK. Calling netio_register_write_tcp_fd");
     145         117 :   niosocket->eq_ev_ctx.fd   = sockfd;
     146         117 :   niosocket->eq_ev_ctx.data = niosocket;    //name "niosocket" used to avoid clash with the posix function "socket" (the compiler did not like it)
     147         117 :   niosocket->eq_ev_ctx.cb   = on_send_socket_tcp_cm_event;
     148         117 :   add_open_fd(&niosocket->ctx->evloop.openfds, sockfd, NETIO_TCP, USEND, niosocket);
     149         117 :   netio_register_write_tcp_fd(&niosocket->ctx->evloop, &niosocket->eq_ev_ctx);
     150         117 :   log_dbg("_connect_tcp done");
     151             : }
     152             : 
     153             : 
     154             : void
     155         317 : make_socket_non_blocking(int sfd)
     156             : {
     157         317 :   int flags, s;
     158             : 
     159         317 :   flags = fcntl(sfd, F_GETFL, 0);
     160         316 :   if (flags == -1)
     161             :   {
     162           0 :     log_error("fcntl cannot get fd");
     163             :   }
     164             : 
     165         316 :   flags |= O_NONBLOCK;
     166         316 :   s = fcntl(sfd, F_SETFL, flags);
     167         317 :   if (s == -1)
     168             :   {
     169           0 :     log_error("fcntl cannot set O_NONBLOCK");
     170             :   }
     171         317 : }
     172             : 
     173             : 
     174             : void
     175         117 : netio_init_send_tcp_socket(struct netio_send_socket *socket, struct netio_context *ctx)
     176             : {
     177         117 :   log_dbg("called with &ctx->evloop = %p", &ctx->evloop);
     178             : 
     179         117 :   memset(socket, 0, sizeof(*socket));
     180         117 :   socket->ctx                    = ctx;
     181         117 :   socket->tcp_fi_mode            = NETIO_MODE_TCP;
     182         117 :   socket->message_request_header = NULL;
     183         117 :   socket->state                  = UNCONNECTED;                         //MJ: relevant for TCP?
     184             : 
     185         117 :   netio_signal_init(&ctx->evloop, &socket->tcp_signal);
     186             : 
     187             :   //socket->tcp_signal.cb   = netio_tcp_send_on_signal_test;
     188         117 :   socket->tcp_signal.cb   = netio_tcp_send_on_signal;
     189         117 :   socket->tcp_signal.data = socket;
     190             :   /* log_debug("socket is at = %p", socket); */
     191             : 
     192         117 :   if (sigpipe_trapped==0)
     193             :   {
     194         117 :     struct sigaction action;
     195         117 :     action.sa_handler = SIG_IGN;
     196         117 :     int ret = sigaction(SIGPIPE, &action, NULL);
     197         117 :     if (ret)
     198             :     {
     199           0 :       perror ("sigaction");
     200             :     }
     201             :   }
     202         117 :   log_dbg("netio_init_send_tcp_socket done.");
     203         117 : }
     204             : 
     205             : 
     206             : void
     207           0 : netio_tcp_send_on_signal_test(void *ptr)
     208             : {
     209           0 :   log_error("testtesttesttest.");
     210           0 :   log_fatal("testtesttesttest.");
     211           0 : }
     212             : 
     213             : 
     214             : void
     215         100 : netio_init_listen_tcp_socket(struct netio_listen_socket* socket, struct netio_context* ctx, struct netio_unbuffered_socket_attr* attr)
     216             : {
     217         100 :   log_dbg("netio_init_listen_tcp_socket called");
     218         100 :   memset(socket, 0, sizeof(*socket));
     219         100 :   socket->ctx = ctx;
     220         100 :   socket->recv_sockets = NULL;
     221         100 :   socket->tcp_fi_mode = NETIO_MODE_TCP;
     222         100 :   if (attr == NULL){
     223             :     socket->attr.buffer_size = 0;
     224             :     socket->attr.num_buffers = 0;
     225             :   } else {
     226          40 :     socket->attr = *attr;
     227             :   }
     228         100 :   log_dbg("netio_init_listen_tcp_socket done");
     229         100 : }
     230             : 
     231             : 
     232             : void
     233         100 : netio_init_recv_tcp_socket(struct netio_recv_socket* socket, struct netio_listen_socket* lsocket)
     234             : {
     235         100 :   log_dbg("netio_init_recv_tcp_socket called");
     236         100 :   memset(socket, 0, sizeof(*socket));
     237         100 :   socket->ctx                    = lsocket->ctx;
     238         100 :   socket->lsocket                = lsocket;
     239         100 :   socket->message_request_header = NULL;
     240         100 :   socket->tcp_fi_mode            = NETIO_MODE_TCP;
     241             : 
     242         100 :   netio_signal_init(&socket->ctx->evloop, &socket->tcp_signal);
     243         100 :   socket->tcp_signal.cb   = netio_tcp_recv_on_signal;
     244         100 :   socket->tcp_signal.data = socket;
     245         100 :   log_trc("socket is at = %p", socket);
     246         100 :   log_dbg("netio_init_recv_tcp_socket done");
     247         100 : }
     248             : 
     249             : 
     250             : void
     251         117 : netio_connect_tcp(struct netio_send_socket* socket, const char* hostname, unsigned port)
     252             : {
     253         117 :   log_info("connect_tcp with hostname = %s and port = %d", hostname, port);
     254         117 :   _connect_tcp(socket, hostname, port, NULL, 0);
     255         117 :   log_info("netio_connect_tcp done");
     256         117 : }
     257             : 
     258             : 
     259             : void
     260         100 : netio_listen_tcp(struct netio_listen_socket* niosocket, const char* hostname, unsigned port)
     261             : {
     262         100 :   int ret, listenfd;
     263         100 :   struct sockaddr_in servaddr;
     264             : 
     265         100 :   log_info("TCP/IP listening on %s:%d", hostname, port);
     266             : 
     267         100 :   if ((listenfd = socket (AF_INET, SOCK_STREAM, 0)) < 0)    //Create a socket
     268             :   {
     269           0 :     log_fatal("Problem in creating the socket, errno = %d", errno);
     270           0 :     exit(2);
     271             :   }
     272             : 
     273         100 :   const int opt=1;
     274         100 :   ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
     275         100 :   if (ret)
     276             :   {
     277           0 :     perror("setsockopt");
     278             :   }
     279             : 
     280         100 :   char* _hostname = netio_domain_name_lookup(hostname);
     281             : 
     282             :   //preparation of the socket address
     283         100 :   servaddr.sin_family      = AF_INET;
     284         100 :   servaddr.sin_addr.s_addr = inet_addr(_hostname);
     285         100 :   servaddr.sin_port        = htons(port);
     286             : 
     287         100 :   free(_hostname);
     288         100 :   log_dbg("SERV_PORT           = %d", port);
     289         100 :   log_dbg("htons(SERV_PORT)    = %d", htons(port));
     290         100 :   log_dbg("hostname            = %s", hostname);
     291         100 :   log_dbg("inet_addr(hostname) = %d", inet_addr(hostname));
     292         100 :   log_dbg("servaddr.sin_addr.s_addr = %d", servaddr.sin_addr.s_addr);
     293             : 
     294         100 :   ret=bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr));  //bind the socket
     295         100 :   if (ret && errno != EINPROGRESS)
     296             :   {
     297           0 :     perror("bind");
     298             :   }
     299         100 :   socklen_t addrlen=sizeof(servaddr);
     300         100 :   ret=getsockname(listenfd, &servaddr, &addrlen);
     301         100 :   if (ret) {
     302           0 :      perror("getsockname");
     303             :   }
     304         100 :   niosocket->port=ntohs(servaddr.sin_port);
     305             : 
     306         100 :   make_socket_non_blocking(listenfd);                               //Make the socket non-blocking
     307             : 
     308             :   #define LISTENQ   8                                               //maximum number of client connections. MJ: review
     309         100 :   listen(listenfd, LISTENQ);                                        //listen to the socket by creating a connection queue, then wait for clients
     310         100 :   log_dbg("Server running...waiting for connections.");
     311             : 
     312         100 :   niosocket->eq_ev_ctx.fd   = listenfd;
     313         100 :   niosocket->eq_ev_ctx.data = niosocket;                            //name "niosocket" used to avoid clash with the posix function "socket" (the compiler did not like it)
     314         100 :   niosocket->eq_ev_ctx.cb   = on_listen_socket_tcp_cm_event;
     315         100 :   niosocket->tcp_fi_mode = NETIO_MODE_TCP;
     316             : 
     317         100 :   add_open_fd(&niosocket->ctx->evloop.openfds, listenfd, NETIO_TCP, ULISTEN, niosocket);
     318         100 :   netio_register_read_tcp_fd(&niosocket->ctx->evloop, &niosocket->eq_ev_ctx);
     319         100 :   log_dbg("netio_listen_tcp done");
     320         100 : }
     321             : 
     322             : 
     323             : int
     324           0 : netio_tcp_send_imm(struct netio_send_socket* socket,
     325             :                    struct netio_buffer* buf, void* addr, size_t size, uint64_t key, uint64_t imm){
     326             : 
     327           0 :   uint32_t total_size=size;
     328           0 :   struct iovec msg_iov[2];
     329           0 :   msg_iov[0].iov_base=&total_size;
     330           0 :   msg_iov[0].iov_len=sizeof(uint32_t);
     331           0 :   msg_iov[1].iov_base=addr;
     332           0 :   msg_iov[1].iov_len=size;
     333           0 :   struct msghdr hdr={0,0,&msg_iov[0],2,0,0};
     334           0 :   int status=sendmsg(socket->eq_ev_ctx.fd,&hdr,0);
     335           0 :   if (status==total_size+sizeof(uint32_t)) {
     336             :     /* int opt=1; */
     337             :     /* if (setsockopt(socket->eq_ev_ctx.fd, IPPROTO_TCP, TCP_NODELAY, */
     338             :     /*                &opt, sizeof(opt)) != 0) { */
     339             :     /*   log_error("Problem setting TCP_NODELAY, %s", strerror(errno)); */
     340             :     /* } */
     341           0 :     if (socket->cb_send_completed != 0) {
     342           0 :       socket->cb_send_completed(socket, key);
     343             :     }
     344           0 :     log_trc("netio_tcp_send_imm done");
     345             :     //printf("Sent %d bytes\n",status);
     346           0 :     return NETIO_STATUS_OK;
     347             :   }
     348           0 :   else if (status==-1) {
     349           0 :     if (errno!=EAGAIN && errno!=EWOULDBLOCK) {
     350           0 :       perror("Error from sendmsg: ");
     351           0 :       exit(1);
     352             :     }
     353             :   }
     354             :   else {
     355           0 :     printf("Error only sent %d bytes of message\n",status);
     356           0 :     exit(1);
     357             :   }
     358             : 
     359           0 :   char* mem=malloc(sizeof(struct netio_tcp_send_item)+
     360             :                    total_size+sizeof(uint32_t));
     361           0 :   if(mem == NULL)
     362             :   {
     363           0 :     log_fatal("cannot allocate memory for descriptor");
     364           0 :     exit(2);
     365             :   }
     366             : 
     367           0 :   printf("Queueing send for later\n");
     368             :   // Couldn't send immediately so add to queue for event loop
     369           0 :   struct netio_tcp_send_item* mrd = (struct netio_tcp_send_item *) mem;
     370           0 :   mrd->element_active = NETIO_TCP_NEW;
     371           0 :   mrd->socket         = socket;
     372           0 :   mrd->buffer         = buf;
     373           0 :   mrd->bytes_left     = sizeof(int);
     374           0 :   mrd->total_bytes    = total_size;
     375           0 :   mrd->key            = key;
     376           0 :   mrd->next_element   = NULL;
     377             : 
     378             :   //Append the descriptor to the list
     379           0 :   if(socket->message_request_header == NULL)
     380             :   {
     381           0 :     socket->message_request_header = (void *)mrd;
     382           0 :     log_trc("List was empty. Descriptor linked to head of list");
     383             :   }
     384             :   else
     385             :   {
     386             :     struct netio_tcp_send_item* mrdq = (struct netio_tcp_send_item *)socket->message_request_header;
     387           0 :     while (mrdq->next_element != NULL)
     388             :     {
     389             :       mrdq = (struct netio_tcp_send_item *)mrdq->next_element;
     390             :     }
     391           0 :     mrdq->next_element = (void *)mrd;
     392             :   }
     393             : 
     394           0 :   log_trc("Calling netio_signal_fire for signal at %p", &socket->tcp_signal);
     395           0 :   netio_signal_fire(&socket->tcp_signal);
     396             : 
     397           0 :   log_trc("netio_tcp_send_imm done");
     398           0 :   return(NETIO_STATUS_OK);
     399             : }
     400             : 
     401             : int
     402           0 : netio_tcp_sendv_imm(struct netio_send_socket* socket,
     403             :                     uint32_t total_size,
     404             :                     struct iovec* iov,
     405             :                     size_t count,
     406             :                     uint64_t key,
     407             :                     uint64_t imm)
     408             : {
     409             :   // static int immediates=0;
     410             :   // static int partials=0;
     411           0 :   static int totalpackets=0;
     412           0 :   totalpackets++;
     413             : 
     414             :   // if (totalpackets%10000==0) printf("total packets %d, fully sent immediately %d partialy sent %d\n",totalpackets,immediates,partials);
     415             : 
     416           0 :   int bytes_sent=0;
     417           0 :   int send_size=total_size+sizeof(uint32_t);
     418           0 :   if(socket->message_request_header == NULL)
     419             :   {
     420             :     /* No message in progress, try sending immediately */
     421           0 :     struct iovec* msg_iov=(struct iovec*)malloc((count+1)*sizeof(struct iovec));
     422           0 :     msg_iov[0].iov_base=&total_size;
     423           0 :     msg_iov[0].iov_len=sizeof(uint32_t);
     424           0 :     for (int entry=0;entry<count;entry++) {
     425           0 :       msg_iov[entry+1].iov_base=iov[entry].iov_base;
     426           0 :       msg_iov[entry+1].iov_len=iov[entry].iov_len;
     427             :     }
     428           0 :     struct msghdr hdr={0,0,&msg_iov[0],count+1,0,0};
     429           0 :     bytes_sent=sendmsg(socket->eq_ev_ctx.fd,&hdr,0);
     430           0 :     free(msg_iov);
     431           0 :     if (bytes_sent==send_size) {
     432             :       // immediates++;
     433             :       /* int opt=1; */
     434             :       /* if (setsockopt(socket->eq_ev_ctx.fd, IPPROTO_TCP, TCP_NODELAY, */
     435             :       /*                &opt, sizeof(opt)) != 0) { */
     436             :       /*   log_error("Problem setting TCP_NODELAY, %s", strerror(errno)); */
     437             :       /* } */
     438           0 :       if (socket->cb_send_completed != 0) {
     439           0 :         socket->cb_send_completed(socket, key);
     440             :       }
     441             :       //printf("Sent %d bytes\n",bytes_sent);
     442           0 :       return NETIO_STATUS_OK;
     443             :     }
     444           0 :     else if (bytes_sent==-1) {
     445           0 :       if (errno!=EAGAIN && errno!=EWOULDBLOCK) {
     446           0 :         perror("Error from sendmsg: ");
     447           0 :         exit(1);
     448             :       }
     449             :     }
     450             :     // partials++;
     451             :   }
     452             : 
     453             :   /* ------------------------------------------------------------------ */
     454             :   /*  OK, we didn't manage to send the complete message, queue it for
     455             :       later send via the event loop */
     456             :   /* ------------------------------------------------------------------ */
     457           0 :   char* mem=malloc(sizeof(struct netio_tcp_send_item)+
     458             :                    sizeof(struct netio_buffer)+sizeof(uint32_t)+total_size+sizeof(uint32_t));
     459           0 :   if(mem == NULL)
     460             :   {
     461           0 :     log_fatal("cannot allocate memory for descriptor");
     462           0 :     exit(2);
     463             :   }
     464             : 
     465           0 :   struct netio_tcp_send_item* mrd = (struct netio_tcp_send_item *) mem;
     466           0 :   mrd->bytes_left=send_size-bytes_sent;
     467             :   //printf("bytes_sent=%d, bytes_left=%d\n",bytes_sent,mrd->bytes_left);
     468           0 :   mrd->element_active = NETIO_TCP_IN_PROGRESS;
     469           0 :   mrd->socket         = socket;
     470           0 :   mrd->buffer         = (struct netio_buffer*) &mem[sizeof(struct netio_tcp_send_item)];
     471           0 :   mrd->buffer->data   = (char*) &mrd->buffer[1];
     472           0 :   mrd->buffer->size   = send_size;
     473           0 :   mrd->total_bytes    = total_size;
     474           0 :   mrd->key            = key;
     475           0 :   mrd->next_element   = NULL;
     476             : 
     477             :   // Put length in front of data to send as one
     478           0 :   uint32_t* tlen_ptr=(uint32_t*) &mrd->buffer[1];
     479           0 :   *tlen_ptr=total_size;
     480           0 :   int offset=sizeof(uint32_t);
     481           0 :   for (int seg=0; seg<count; seg++)
     482             :   {
     483           0 :     memcpy((char*)mrd->buffer->data+offset, iov[seg].iov_base, iov[seg].iov_len);
     484           0 :     offset += iov[seg].iov_len;
     485             :   }
     486             :   //    log_debug("send descriptor allocated and initialized");
     487             : 
     488             :   //Append the descriptor to the list
     489           0 :   if(socket->message_request_header == NULL)
     490             :   {
     491           0 :     socket->message_request_header = (void *)mrd;
     492             :     //log_trc("List was empty. Descriptor linked to head of list");
     493             :   }
     494             :   else
     495             :   {
     496             :     struct netio_tcp_send_item* mrdq = (struct netio_tcp_send_item *)socket->message_request_header;
     497           0 :     while (mrdq->next_element != NULL)
     498             :     {
     499             :       mrdq = (struct netio_tcp_send_item *)mrdq->next_element;
     500             :     }
     501           0 :     mrdq->next_element = (void *)mrd;
     502             :   }
     503             : 
     504             :   //log_trc("Calling netio_signal_fire for signal at %p", &socket->tcp_signal);
     505           0 :   netio_signal_fire(&socket->tcp_signal);
     506             : 
     507             :   //log_trc("netio_tcp_sendv_imm done");
     508           0 :   return(NETIO_STATUS_OK);
     509             : }
     510             : 
     511             : 
     512             : void
     513        3625 : netio_tcp_send_on_signal(void *ptr)
     514             : {
     515        3625 :   int goto_next_request, current_offset, n;
     516        3625 :   void *next_mrd;
     517        3625 :   struct netio_tcp_send_item *mrd;
     518        3625 :   struct netio_send_socket *socket;
     519             : 
     520             :   //log_trc("netio_tcp_send_on_signal called with ptr = %p", ptr);
     521        3625 :   socket = (struct netio_send_socket *) ptr;
     522        3625 :   goto_next_request = 1;
     523             : 
     524        7114 :   while(goto_next_request)
     525             :   {
     526             :     //Take the first message request descriptor from the head of the list
     527        5067 :     if (socket->message_request_header == NULL)
     528             :     {
     529             :       //log_debug("No message request pending. So, why are you here?");
     530        1578 :       int retVal;
     531        1578 :       socklen_t in_len = sizeof(retVal);
     532        1578 :       int ret = getsockopt(socket->eq_ev_ctx.fd, SOL_SOCKET, SO_ERROR,
     533             :                            &retVal, &in_len);
     534        1578 :       if (ret==0 && retVal !=0) {
     535           0 :         log_info("%s -- calling connection closed callback", strerror(retVal));
     536           0 :         if (socket->cb_connection_closed != NULL) {
     537           0 :           socket->cb_connection_closed(socket);
     538             :         }
     539             :       } else {
     540             :         // When a stream socket peer has performed an orderly shutdown, the return value will be 0
     541        1578 :         if(!recv(socket->eq_ev_ctx.fd, NULL, 1, 0)) {
     542          19 :           log_info("0 bytes read from fd %d, calling cb_connection_closed", socket->eq_ev_ctx.fd);
     543          19 :           if (socket->cb_connection_closed != NULL) {
     544          19 :             socket->cb_connection_closed(socket);
     545             :           }
     546             :         }
     547             :       }
     548        1578 :       break;
     549             :     }
     550             :     else
     551             :     {
     552             :       //Get access to the message request at the head of the list
     553        3489 :       mrd = socket->message_request_header;
     554             :       /* log_trc("element_active = %d", mrd->element_active); */
     555             :       /* log_trc("bytes_left     = %d", mrd->bytes_left); */
     556             :       /* log_trc("next_element   = %p", mrd->next_element); */
     557             :       /* log_trc("buffer         = %p", mrd->buffer); */
     558             :       /* log_trc("socket         = %p", mrd->socket); */
     559             :       /* log_trc("mrd            = %p", mrd); */
     560             :     }
     561             : 
     562        3489 :     if(mrd->element_active == NETIO_TCP_EMPTY)
     563             :     {
     564           0 :       log_fatal("netio_tcp_send_on_signal: element is not active");
     565           0 :       exit(2);
     566             :     }
     567             : 
     568        3489 :     void* data_addr  = mrd->buffer->data;
     569        3489 :     int size_total = (int)mrd->buffer->size;
     570        3489 :     log_trc("Marker = 0x%08x, size_total = %d, size_remaining = %d", *((int*)data_addr), size_total, mrd->bytes_left);
     571        3489 :     int sockfd=mrd->socket->eq_ev_ctx.fd;
     572        3489 :     if(mrd->element_active == NETIO_TCP_NEW)    //Send the message header (= message size)
     573             :     {
     574        3489 :       log_trc("start of new message of size %d", size_total);
     575        3489 :       int nsent=sizeof(int)-mrd->bytes_left;
     576        3489 :       unsigned char* s_ptr=(unsigned char*)&mrd->buffer->size;
     577        3489 :       n = send(sockfd, &s_ptr[nsent], mrd->bytes_left, 0);
     578        3489 :       if(n < 0)
     579             :       {
     580           0 :         if (errno == EWOULDBLOCK || errno == EAGAIN)
     581             :         {
     582             :           log_trc("Too busy....................................... ");
     583             :         }
     584           0 :         else if (errno==ECONNRESET || errno==EBADF || errno==EPIPE)
     585             :         {
     586             :           log_dbg("socket fd %d %s",sockfd,
     587           0 :                     errno==ECONNRESET ? "Connection reset by peer" : "no longer open.");
     588             : 
     589           0 :           log_info("Calling connection closed callback");
     590           0 :           if (mrd->socket->cb_connection_closed != NULL)
     591             :           {
     592           0 :             mrd->socket->cb_connection_closed(mrd->socket);
     593             :           }
     594           0 :           mrd->socket->state=DISCONNECTED;
     595             :           /* Free all mrd entries associated with this socket */
     596           0 :           mrd->socket->message_request_header = NULL;
     597           0 :           while (mrd)
     598             :           {
     599           0 :             next_mrd = mrd->next_element;                                               //MJ: Do we need a mutex to protect concurrent access to the next_element pointer?
     600           0 :             free(mrd);
     601           0 :             mrd=next_mrd;
     602             :           }
     603             :         }
     604             :         else
     605             :         {
     606           0 :           log_warn("Error %d from send, giving up on sending header", errno);
     607             :         }
     608           0 :         return;
     609             :       }
     610        3489 :       if(n != 4)
     611             :       {
     612        3489 :         log_dbg("only sent %d bytes of length (%d)",n,mrd->buffer->size);
     613             :       }
     614        3489 :       mrd->bytes_left-=n;
     615        3489 :       if (mrd->bytes_left==0)
     616             :       {
     617        3489 :         mrd->bytes_left=mrd->total_bytes;
     618        3489 :         mrd->element_active = NETIO_TCP_IN_PROGRESS;  //header sent
     619             :       }
     620             :       else
     621             :       {
     622             :         return;
     623             :       }
     624             :     }
     625             : 
     626        3489 :     if(mrd->element_active == NETIO_TCP_IN_PROGRESS)    //if we did not manage to send the header (EAGAIN) we must not send data!!
     627             :     {
     628        3489 :       current_offset = size_total - mrd->bytes_left;
     629        3489 :       log_trc("send size = %d, current_offset = %d, fd = %d", mrd->bytes_left, current_offset, mrd->socket->eq_ev_ctx.fd);
     630             : 
     631        3489 :       n = send(sockfd, (char*)data_addr + current_offset, mrd->bytes_left, 0);
     632        3489 :       if(n < 0)
     633             :       {
     634           0 :         if (errno == EWOULDBLOCK || errno == EAGAIN)
     635             :         {
     636             :           /* log_debug("Too busy.........................................."); */
     637             :         }
     638           0 :         else if (errno==ECONNRESET || errno==EBADF || errno==EPIPE)
     639             :         {
     640             :           log_dbg("socket fd %d %s",sockfd,
     641           0 :                     errno==ECONNRESET ? "Connection reset by peer" : "no longer open.");
     642             : 
     643           0 :           log_info("Calling connection closed callback");
     644           0 :           if (mrd->socket->cb_connection_closed != NULL)
     645             :           {
     646           0 :             mrd->socket->cb_connection_closed(mrd->socket);
     647             :           }
     648           0 :           mrd->socket->message_request_header = NULL;
     649           0 :           mrd->socket->state=DISCONNECTED;
     650             :           /* Free all mrd entries associated with this socket */
     651           0 :           while (mrd)
     652             :           {
     653           0 :             next_mrd = mrd->next_element;                                               //MJ: Do we need a mutex to protect concurrent access to the next_element pointer?
     654           0 :             free(mrd);
     655           0 :             mrd=next_mrd;
     656             :           }
     657             :         }
     658             :         else
     659             :         {
     660           0 :           log_warn("send() failed errno = %d", errno);
     661             :         }
     662           0 :         return;
     663             :       }
     664             : /* #ifdef CORK */
     665             : /*       int opt=0; */
     666             : /*       int ret = setsockopt(sockfd, IPPROTO_TCP, TCP_CORK, &opt, sizeof(opt)); */
     667             : /*       if (ret != 0) */
     668             : /*         { */
     669             : /*           log_error("Problem clearing TCP_CORK, %s", strerror(errno)); */
     670             : /*         } */
     671             : /* #endif */
     672        3489 :       mrd->bytes_left = mrd->bytes_left - n;
     673        3489 :       if(mrd->bytes_left)
     674             :       {
     675             :         log_trc("d of %d bytes sent (remaining = %d)", n, mrd->bytes_left + n, mrd->bytes_left);
     676             :         return;
     677             :       }
     678             :       else
     679             :       {
     680        3489 :         log_trc("Remaining %d bytes sent. Item done", n);
     681        3489 :         if(mrd->socket->cb_send_completed)   //Call the user callback
     682             :         {
     683             :           /* log_debug("calling user callback"); */
     684        3488 :           mrd->socket->cb_send_completed(mrd->socket, mrd->key);
     685             :         }
     686             :         else
     687             :         {
     688             :           //MJ Is this an error?
     689             :           //            log_warn("No user callback registered");
     690        3489 :         }
     691             :       }
     692             :     }
     693             : 
     694             :     //Remove the message request descriptor from the list
     695        3489 :     next_mrd = mrd->next_element;                                               //MJ: Do we need a mutex to protect concurrent access to the next_element pointer?
     696             :     /* log_debug("Processing next request at address = %p", next_mrd); */
     697             : 
     698             :     //Unlink the message request from the head of the list
     699        3489 :     if(next_mrd != NULL)
     700             :     {
     701        1442 :       mrd->socket->message_request_header = next_mrd;
     702             :       /* log_debug("Oldest request unlinked from head of list. The next younger request is at address %p", next_mrd); */
     703             :     }
     704             :     else
     705             :     {
     706        2047 :       mrd->socket->message_request_header = NULL;
     707        2047 :       goto_next_request = 0 ;
     708             :       /* log_debug("Request unlinked from head of list. List is now empty"); */
     709             :     }
     710        3489 :     mrd->buffer->to_send = 0; // set buffer free to use
     711        3489 :     free(mrd);                              //Release the memory held by the message request
     712             :     /* log_debug("Memory of mrd at %p freed", mrd); */
     713             :   }
     714             : }
     715             : 
     716             : 
     717             : void
     718    15760583 : netio_tcp_recv_on_signal(void *ptr)
     719             : {
     720    15760583 :   int goto_next_request, n, rest_size;
     721    15760583 :   void *next_mrd;
     722    15760583 :   struct netio_tcp_recv_item *mrd;
     723    15760583 :   struct netio_recv_socket *socket;
     724    15760583 :   struct netio_listen_socket *lsocket;
     725             : 
     726    15760583 :   log_trc("netio_tcp_recv_on_signal called with ptr = %p", ptr);
     727    15760583 :   socket = (struct netio_recv_socket *) ptr;
     728    15760583 :   lsocket = socket->lsocket;
     729    15760583 :   lsocket->tcp_fi_mode = NETIO_MODE_TCP;  //??? should already be set!
     730             : 
     731    15760583 :   goto_next_request = 1;
     732    15760583 :   while(goto_next_request)
     733             :   {
     734             :     //Take the first message request descriptor from the head of the list
     735    15760583 :     if (socket->message_request_header == NULL)
     736             :     {
     737           0 :       log_warn("No message request pending. So, why are you here?");
     738           0 :       return;
     739             :     }
     740             : 
     741             :     //Get access to the message request at the head of the list
     742    15760583 :     mrd = socket->message_request_header;
     743    15760583 :     log_trc("element_active = %d", mrd->element_active);
     744    15760583 :     log_trc("bytes_received = %d", mrd->bytes_received);
     745    15760583 :     log_trc("next_element   = %p", mrd->next_element);
     746    15760583 :     log_trc("buffer         = %p", mrd->buffer);
     747    15760583 :     log_trc("socket         = %p", mrd->socket);
     748    15760583 :     log_trc("mrd            = %p", mrd);
     749    15760583 :     log_trc("fd             = %d", mrd->socket->eq_ev_ctx.fd);
     750    15760583 :     log_trc("msg size       = %d", mrd->message_size);
     751             : 
     752             : 
     753    15760583 :     if(mrd->element_active == NETIO_TCP_EMPTY)
     754             :     {
     755           0 :       log_fatal("element is not active");
     756           0 :       exit(2);
     757             :     }
     758             : 
     759    15760583 :     void* data_addr = mrd->buffer->data;
     760    15760583 :     int size = (int)mrd->buffer->size;
     761             :     /*log_debug("buffer->size = %d", size);*/
     762             :     //Receive the message header (= message size)
     763    15760583 :     if(mrd->element_active == NETIO_TCP_NEW)  //header not yet received
     764             :     {
     765    15297392 :       unsigned char* s_ptr=(unsigned char*)&mrd->message_size;
     766    30594785 :       n = recv(mrd->socket->eq_ev_ctx.fd, &s_ptr[mrd->bytes_received],
     767    15297392 :                sizeof(int)-mrd->bytes_received, 0);
     768    15297393 :       if (n == 0) {
     769           2 :         log_info("0 bytes read from fd %d, closing socket",mrd->socket->eq_ev_ctx.fd);
     770           2 :         netio_close_socket(&socket->ctx->evloop,socket,URECV);
     771             :       }
     772    15297393 :       if (n == -1)
     773             :       {
     774           0 :         if (errno == EWOULDBLOCK || errno == EAGAIN)
     775             :         {
     776             :           /* log_debug("Nothing to read yet"); */
     777             :         }
     778             :         else if (errno==ECONNRESET || errno==EBADF || errno==EPIPE)
     779             :         {
     780             :           log_dbg("socket no longer open.");
     781             :         }
     782             :         else
     783             :         {
     784           0 :           log_warn("send() failed errno = %d", errno);
     785             :         }
     786           0 :         return;
     787             :       }
     788    15297393 :       mrd->bytes_received+=n;
     789    15297393 :       if (mrd->bytes_received==4)
     790             :       {
     791    15297391 :         mrd->bytes_received = 0;
     792    15297391 :         mrd->element_active = NETIO_TCP_IN_PROGRESS;
     793             :         /* log_debug("Header received. n = %d, Message size = %d bytes", n, size_total); */
     794    15297391 :         if (mrd->message_size > size)
     795             :         {
     796           0 :           log_error("Message size = %d bytes larger than buffer size = %d bytes. Truncating.", mrd->message_size, size);
     797           0 :           mrd->message_size = size;
     798             :         }
     799             :       }
     800             :       else
     801             :       {
     802             :         return;
     803             :       }
     804             :     }
     805             : 
     806    15760582 :     if(mrd->element_active == NETIO_TCP_IN_PROGRESS)
     807             :     {
     808             :       //Receive message data
     809    15760582 :       rest_size = mrd->message_size - mrd->bytes_received;
     810    15760582 :       log_trc("rest_size = %d", rest_size);
     811             : 
     812    15760582 :       while(rest_size)
     813             :       {
     814    15780648 :         n = recv(mrd->socket->eq_ev_ctx.fd, (char*)data_addr + mrd->bytes_received, rest_size, 0);
     815    15780649 :         if(n < 0)
     816             :         {
     817      463194 :           if (errno == EWOULDBLOCK || errno == EAGAIN)
     818             :           {
     819             :             log_trc("Out of data to read for now, returning to eventloop");
     820             :             return;
     821             :           }
     822           0 :           else if (errno==ECONNRESET || errno==EBADF || errno==EPIPE)
     823             :           {
     824           0 :             log_dbg("socket no longer open.");
     825             : 
     826             :             /* Clear what we've read??? */
     827           0 :             mrd->element_active = NETIO_TCP_NEW;
     828             :           }
     829             :           else
     830             :           {
     831           0 :             log_warn("recv() failed errno = %d", errno);
     832             :           }
     833           0 :           return;
     834             :         }
     835             : 
     836             :         /* iptr = (int *)(data_addr + mrd->bytes_received); */
     837             :         /* nn = n >> 2; */
     838             :         /* if (nn > 4) */
     839             :         /* { */
     840             :         /*   log_debug("word 0 = 0x%08x", iptr[0]); */
     841             :         /*   //DEBUG_LOG("netio_tcp_recv_on_signal (s=%d/m=%d): word 1 = 0x%08x", socket_loop, message_loop, iptr[1]); */
     842             :         /*   //DEBUG_LOG("netio_tcp_recv_on_signal (s=%d/m=%d): word %d = 0x%08x", socket_loop, message_loop, nn - 2, iptr[nn - 2]); */
     843             :         /*   //DEBUG_LOG("netio_tcp_recv_on_signal (s=%d/m=%d): word %d = 0x%08x", socket_loop, message_loop, nn - 1, iptr[nn - 1]); */
     844             :         /* } */
     845             : 
     846    15317455 :         rest_size = rest_size - n;
     847    15317455 :         mrd->bytes_received = mrd->bytes_received + n;
     848    15317455 :         if(rest_size)
     849             :         {
     850    15317455 :           log_dbg("%d of %d bytes received (remaining = %d)", n, mrd->message_size, rest_size);
     851             :         }
     852             :         else
     853             :         {
     854    31078037 :           log_dbg("remaining %d bytes received. Item done", n);
     855             :         }
     856             :       }
     857             : 
     858             :       //Call the user callback
     859    15297389 :       if(lsocket->cb_msg_received)
     860             :       {
     861    15297389 :         lsocket->cb_msg_received(mrd->socket, mrd->buffer, data_addr, mrd->message_size);
     862             :       } else {
     863    15297389 :         log_dbg("No user callback registered");
     864             :       }
     865             : 
     866             :       //Remove the message request descriptor from the list
     867    15297389 :       next_mrd = mrd->next_element;                                              //MJ: Do we need a mutex to protect concurrent access to the next_element pointer?
     868    15297389 :       log_trc("Processing next request at address = %p", next_mrd);
     869             : 
     870             :       //Unlink the message request from the head of the list
     871    15297389 :       if(next_mrd != NULL)
     872             :       {
     873    15297389 :         mrd->socket->message_request_header = next_mrd;
     874    15297389 :         log_trc("Oldest request unlinked from head of list. The next younger request is at address %p", next_mrd);
     875             :         // netio_signal_fire(&socket->tcp_signal);
     876    15297389 :         goto_next_request = 0 ;
     877             :       } else {
     878           0 :         mrd->socket->message_request_header = NULL;
     879           0 :         goto_next_request = 0 ;
     880    15297389 :         log_trc("Request unlinked from head of list. List is now empty");
     881             :       }
     882             : 
     883    15297389 :       free(mrd);                              //Release the memory held by the message request
     884             :     }
     885             :   }
     886             : }

Generated by: LCOV version 1.0