Line data Source code
1 : #include <unistd.h>
2 : #include <stdio.h>
3 : #include <sys/uio.h>
4 : #include <string.h>
5 : #include <stdlib.h>
6 : #include <sys/types.h>
7 : #include <sys/socket.h>
8 : #include <netinet/in.h>
9 : #include <arpa/inet.h>
10 : #include <string.h>
11 : #include <errno.h>
12 : #include <netdb.h>
13 : #include <arpa/inet.h>
14 : #include <fcntl.h>
15 : #include <ctype.h>
16 : #include <sys/ioctl.h>
17 : #include <signal.h>
18 : #include <netinet/tcp.h>
19 :
20 : #include "connection_event.h"
21 : #include "log.h"
22 : #include "netio/netio.h"
23 : #include "netio/netio_tcp.h"
24 :
25 : #if defined DEBUG || defined DEBUG_TCP
26 : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
27 : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
28 : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
29 : #else
30 : #define log_dbg(...)
31 : #define log_trc(...)
32 : #endif
33 :
34 :
35 : int sigpipe_trapped=0;
36 :
37 : // STATIC FUNCTIONS
38 : static void _connect_tcp(struct netio_send_socket* socket, const char* hostname, unsigned port, void* addr, size_t addrlen);
39 : void make_socket_non_blocking(int sfd);
40 :
41 :
42 : //Parameters:
43 : //- socket is still empty
44 : //- hostname and port have valid values
45 : //- addr and addrlen are empty
46 : static void
47 117 : _connect_tcp(struct netio_send_socket* niosocket, const char* hostname, unsigned port, void* addr, size_t addrlen)
48 : {
49 117 : int ret, sockfd;
50 117 : char ip[100];
51 117 : struct addrinfo hints, *servinfo, *p;
52 117 : struct sockaddr_in servaddr;
53 :
54 117 : log_dbg("_connect_tcp called");
55 :
56 117 : if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
57 : {
58 0 : log_fatal("Problem in creating the socket %d", errno);
59 0 : exit(2);
60 : }
61 :
62 117 : log_dbg("sockfd = %d", sockfd);
63 :
64 117 : if (isalpha(hostname[0]))
65 : {
66 : //Convert hostname to IP
67 0 : memset(&hints, 0, sizeof hints);
68 0 : hints.ai_family = AF_UNSPEC; // use AF_INET6 to force IPv6
69 0 : hints.ai_socktype = SOCK_STREAM;
70 :
71 0 : int rv = getaddrinfo(hostname, "http", &hints, &servinfo);
72 0 : if (rv != 0)
73 : {
74 0 : log_fatal("getaddrinfo failed. getaddrinfo: %s", gai_strerror(rv));
75 0 : exit(2);
76 : }
77 :
78 0 : for(p = servinfo; p != NULL; p = p->ai_next)
79 : {
80 0 : struct sockaddr_in* h = (struct sockaddr_in *) p->ai_addr;
81 0 : strcpy(ip, inet_ntoa(h->sin_addr));
82 : }
83 :
84 0 : freeaddrinfo(servinfo);
85 : // End of hostname to IP conversion
86 : }
87 : else
88 : {
89 117 : strncpy(ip, hostname, sizeof(ip));
90 117 : ip[sizeof(ip)-1]=0;
91 : }
92 117 : log_dbg("%s resolved to %s\n" , hostname , ip);
93 :
94 : //Creation of the socket
95 117 : memset(&servaddr, 0, sizeof(servaddr));
96 117 : servaddr.sin_family = AF_INET;
97 117 : ret = inet_pton(AF_INET, ip, &servaddr.sin_addr); //IPv4
98 117 : if (ret < 1)
99 : {
100 0 : log_error("Problem with inet_pton. ret = %d\n", ret);
101 : }
102 117 : servaddr.sin_port = htons(port); //convert to big-endian order
103 :
104 117 : make_socket_non_blocking(sockfd);
105 :
106 : /* int opt=1; */
107 : /* ret = setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)); */
108 : /* if (ret != 0) */
109 : /* { */
110 : /* log_error("Problem setting TCP_NODELAY, %s", strerror(errno)); */
111 : /* } */
112 : //#define CORK
113 : #ifdef CORK
114 : int opt=1;
115 : ret = setsockopt(sockfd, IPPROTO_TCP, TCP_CORK, &opt, sizeof(opt));
116 : if (ret != 0)
117 : {
118 : log_error("Problem setting TCP_CORK, %s", strerror(errno));
119 : }
120 : #endif
121 :
122 : //Connection of the client to the socket
123 117 : ret = connect(sockfd, (struct sockaddr *) &servaddr, sizeof(servaddr));
124 117 : if (ret == 0)
125 : {
126 : log_dbg("Already connected. Is this a problem for eopll?");
127 : }
128 : else
129 : {
130 117 : log_dbg("ret = %d, %s", ret, strerror(errno));
131 :
132 117 : if (errno == EINPROGRESS)
133 : {
134 : log_dbg("connect is EINPROGRESS. Let epoll handle it.");
135 : }
136 : else
137 : {
138 0 : log_error("Problem in connecting to the server. ret = %d", ret);
139 0 : log_error("TCP connect errno = %d (%s)\n", errno, strerror(errno));
140 0 : return;
141 : }
142 : }
143 :
144 117 : log_dbg("connect OK. Calling netio_register_write_tcp_fd");
145 117 : niosocket->eq_ev_ctx.fd = sockfd;
146 117 : niosocket->eq_ev_ctx.data = niosocket; //name "niosocket" used to avoid clash with the posix function "socket" (the compiler did not like it)
147 117 : niosocket->eq_ev_ctx.cb = on_send_socket_tcp_cm_event;
148 117 : add_open_fd(&niosocket->ctx->evloop.openfds, sockfd, NETIO_TCP, USEND, niosocket);
149 117 : netio_register_write_tcp_fd(&niosocket->ctx->evloop, &niosocket->eq_ev_ctx);
150 117 : log_dbg("_connect_tcp done");
151 : }
152 :
153 :
154 : void
155 317 : make_socket_non_blocking(int sfd)
156 : {
157 317 : int flags, s;
158 :
159 317 : flags = fcntl(sfd, F_GETFL, 0);
160 316 : if (flags == -1)
161 : {
162 0 : log_error("fcntl cannot get fd");
163 : }
164 :
165 316 : flags |= O_NONBLOCK;
166 316 : s = fcntl(sfd, F_SETFL, flags);
167 317 : if (s == -1)
168 : {
169 0 : log_error("fcntl cannot set O_NONBLOCK");
170 : }
171 317 : }
172 :
173 :
174 : void
175 117 : netio_init_send_tcp_socket(struct netio_send_socket *socket, struct netio_context *ctx)
176 : {
177 117 : log_dbg("called with &ctx->evloop = %p", &ctx->evloop);
178 :
179 117 : memset(socket, 0, sizeof(*socket));
180 117 : socket->ctx = ctx;
181 117 : socket->tcp_fi_mode = NETIO_MODE_TCP;
182 117 : socket->message_request_header = NULL;
183 117 : socket->state = UNCONNECTED; //MJ: relevant for TCP?
184 :
185 117 : netio_signal_init(&ctx->evloop, &socket->tcp_signal);
186 :
187 : //socket->tcp_signal.cb = netio_tcp_send_on_signal_test;
188 117 : socket->tcp_signal.cb = netio_tcp_send_on_signal;
189 117 : socket->tcp_signal.data = socket;
190 : /* log_debug("socket is at = %p", socket); */
191 :
192 117 : if (sigpipe_trapped==0)
193 : {
194 117 : struct sigaction action;
195 117 : action.sa_handler = SIG_IGN;
196 117 : int ret = sigaction(SIGPIPE, &action, NULL);
197 117 : if (ret)
198 : {
199 0 : perror ("sigaction");
200 : }
201 : }
202 117 : log_dbg("netio_init_send_tcp_socket done.");
203 117 : }
204 :
205 :
206 : void
207 0 : netio_tcp_send_on_signal_test(void *ptr)
208 : {
209 0 : log_error("testtesttesttest.");
210 0 : log_fatal("testtesttesttest.");
211 0 : }
212 :
213 :
214 : void
215 100 : netio_init_listen_tcp_socket(struct netio_listen_socket* socket, struct netio_context* ctx, struct netio_unbuffered_socket_attr* attr)
216 : {
217 100 : log_dbg("netio_init_listen_tcp_socket called");
218 100 : memset(socket, 0, sizeof(*socket));
219 100 : socket->ctx = ctx;
220 100 : socket->recv_sockets = NULL;
221 100 : socket->tcp_fi_mode = NETIO_MODE_TCP;
222 100 : if (attr == NULL){
223 : socket->attr.buffer_size = 0;
224 : socket->attr.num_buffers = 0;
225 : } else {
226 40 : socket->attr = *attr;
227 : }
228 100 : log_dbg("netio_init_listen_tcp_socket done");
229 100 : }
230 :
231 :
232 : void
233 100 : netio_init_recv_tcp_socket(struct netio_recv_socket* socket, struct netio_listen_socket* lsocket)
234 : {
235 100 : log_dbg("netio_init_recv_tcp_socket called");
236 100 : memset(socket, 0, sizeof(*socket));
237 100 : socket->ctx = lsocket->ctx;
238 100 : socket->lsocket = lsocket;
239 100 : socket->message_request_header = NULL;
240 100 : socket->tcp_fi_mode = NETIO_MODE_TCP;
241 :
242 100 : netio_signal_init(&socket->ctx->evloop, &socket->tcp_signal);
243 100 : socket->tcp_signal.cb = netio_tcp_recv_on_signal;
244 100 : socket->tcp_signal.data = socket;
245 100 : log_trc("socket is at = %p", socket);
246 100 : log_dbg("netio_init_recv_tcp_socket done");
247 100 : }
248 :
249 :
250 : void
251 117 : netio_connect_tcp(struct netio_send_socket* socket, const char* hostname, unsigned port)
252 : {
253 117 : log_info("connect_tcp with hostname = %s and port = %d", hostname, port);
254 117 : _connect_tcp(socket, hostname, port, NULL, 0);
255 117 : log_info("netio_connect_tcp done");
256 117 : }
257 :
258 :
259 : void
260 100 : netio_listen_tcp(struct netio_listen_socket* niosocket, const char* hostname, unsigned port)
261 : {
262 100 : int ret, listenfd;
263 100 : struct sockaddr_in servaddr;
264 :
265 100 : log_info("TCP/IP listening on %s:%d", hostname, port);
266 :
267 100 : if ((listenfd = socket (AF_INET, SOCK_STREAM, 0)) < 0) //Create a socket
268 : {
269 0 : log_fatal("Problem in creating the socket, errno = %d", errno);
270 0 : exit(2);
271 : }
272 :
273 100 : const int opt=1;
274 100 : ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
275 100 : if (ret)
276 : {
277 0 : perror("setsockopt");
278 : }
279 :
280 100 : char* _hostname = netio_domain_name_lookup(hostname);
281 :
282 : //preparation of the socket address
283 100 : servaddr.sin_family = AF_INET;
284 100 : servaddr.sin_addr.s_addr = inet_addr(_hostname);
285 100 : servaddr.sin_port = htons(port);
286 :
287 100 : free(_hostname);
288 100 : log_dbg("SERV_PORT = %d", port);
289 100 : log_dbg("htons(SERV_PORT) = %d", htons(port));
290 100 : log_dbg("hostname = %s", hostname);
291 100 : log_dbg("inet_addr(hostname) = %d", inet_addr(hostname));
292 100 : log_dbg("servaddr.sin_addr.s_addr = %d", servaddr.sin_addr.s_addr);
293 :
294 100 : ret=bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr)); //bind the socket
295 100 : if (ret && errno != EINPROGRESS)
296 : {
297 0 : perror("bind");
298 : }
299 100 : socklen_t addrlen=sizeof(servaddr);
300 100 : ret=getsockname(listenfd, &servaddr, &addrlen);
301 100 : if (ret) {
302 0 : perror("getsockname");
303 : }
304 100 : niosocket->port=ntohs(servaddr.sin_port);
305 :
306 100 : make_socket_non_blocking(listenfd); //Make the socket non-blocking
307 :
308 : #define LISTENQ 8 //maximum number of client connections. MJ: review
309 100 : listen(listenfd, LISTENQ); //listen to the socket by creating a connection queue, then wait for clients
310 100 : log_dbg("Server running...waiting for connections.");
311 :
312 100 : niosocket->eq_ev_ctx.fd = listenfd;
313 100 : niosocket->eq_ev_ctx.data = niosocket; //name "niosocket" used to avoid clash with the posix function "socket" (the compiler did not like it)
314 100 : niosocket->eq_ev_ctx.cb = on_listen_socket_tcp_cm_event;
315 100 : niosocket->tcp_fi_mode = NETIO_MODE_TCP;
316 :
317 100 : add_open_fd(&niosocket->ctx->evloop.openfds, listenfd, NETIO_TCP, ULISTEN, niosocket);
318 100 : netio_register_read_tcp_fd(&niosocket->ctx->evloop, &niosocket->eq_ev_ctx);
319 100 : log_dbg("netio_listen_tcp done");
320 100 : }
321 :
322 :
323 : int
324 0 : netio_tcp_send_imm(struct netio_send_socket* socket,
325 : struct netio_buffer* buf, void* addr, size_t size, uint64_t key, uint64_t imm){
326 :
327 0 : uint32_t total_size=size;
328 0 : struct iovec msg_iov[2];
329 0 : msg_iov[0].iov_base=&total_size;
330 0 : msg_iov[0].iov_len=sizeof(uint32_t);
331 0 : msg_iov[1].iov_base=addr;
332 0 : msg_iov[1].iov_len=size;
333 0 : struct msghdr hdr={0,0,&msg_iov[0],2,0,0};
334 0 : int status=sendmsg(socket->eq_ev_ctx.fd,&hdr,0);
335 0 : if (status==total_size+sizeof(uint32_t)) {
336 : /* int opt=1; */
337 : /* if (setsockopt(socket->eq_ev_ctx.fd, IPPROTO_TCP, TCP_NODELAY, */
338 : /* &opt, sizeof(opt)) != 0) { */
339 : /* log_error("Problem setting TCP_NODELAY, %s", strerror(errno)); */
340 : /* } */
341 0 : if (socket->cb_send_completed != 0) {
342 0 : socket->cb_send_completed(socket, key);
343 : }
344 0 : log_trc("netio_tcp_send_imm done");
345 : //printf("Sent %d bytes\n",status);
346 0 : return NETIO_STATUS_OK;
347 : }
348 0 : else if (status==-1) {
349 0 : if (errno!=EAGAIN && errno!=EWOULDBLOCK) {
350 0 : perror("Error from sendmsg: ");
351 0 : exit(1);
352 : }
353 : }
354 : else {
355 0 : printf("Error only sent %d bytes of message\n",status);
356 0 : exit(1);
357 : }
358 :
359 0 : char* mem=malloc(sizeof(struct netio_tcp_send_item)+
360 : total_size+sizeof(uint32_t));
361 0 : if(mem == NULL)
362 : {
363 0 : log_fatal("cannot allocate memory for descriptor");
364 0 : exit(2);
365 : }
366 :
367 0 : printf("Queueing send for later\n");
368 : // Couldn't send immediately so add to queue for event loop
369 0 : struct netio_tcp_send_item* mrd = (struct netio_tcp_send_item *) mem;
370 0 : mrd->element_active = NETIO_TCP_NEW;
371 0 : mrd->socket = socket;
372 0 : mrd->buffer = buf;
373 0 : mrd->bytes_left = sizeof(int);
374 0 : mrd->total_bytes = total_size;
375 0 : mrd->key = key;
376 0 : mrd->next_element = NULL;
377 :
378 : //Append the descriptor to the list
379 0 : if(socket->message_request_header == NULL)
380 : {
381 0 : socket->message_request_header = (void *)mrd;
382 0 : log_trc("List was empty. Descriptor linked to head of list");
383 : }
384 : else
385 : {
386 : struct netio_tcp_send_item* mrdq = (struct netio_tcp_send_item *)socket->message_request_header;
387 0 : while (mrdq->next_element != NULL)
388 : {
389 : mrdq = (struct netio_tcp_send_item *)mrdq->next_element;
390 : }
391 0 : mrdq->next_element = (void *)mrd;
392 : }
393 :
394 0 : log_trc("Calling netio_signal_fire for signal at %p", &socket->tcp_signal);
395 0 : netio_signal_fire(&socket->tcp_signal);
396 :
397 0 : log_trc("netio_tcp_send_imm done");
398 0 : return(NETIO_STATUS_OK);
399 : }
400 :
401 : int
402 0 : netio_tcp_sendv_imm(struct netio_send_socket* socket,
403 : uint32_t total_size,
404 : struct iovec* iov,
405 : size_t count,
406 : uint64_t key,
407 : uint64_t imm)
408 : {
409 : // static int immediates=0;
410 : // static int partials=0;
411 0 : static int totalpackets=0;
412 0 : totalpackets++;
413 :
414 : // if (totalpackets%10000==0) printf("total packets %d, fully sent immediately %d partialy sent %d\n",totalpackets,immediates,partials);
415 :
416 0 : int bytes_sent=0;
417 0 : int send_size=total_size+sizeof(uint32_t);
418 0 : if(socket->message_request_header == NULL)
419 : {
420 : /* No message in progress, try sending immediately */
421 0 : struct iovec* msg_iov=(struct iovec*)malloc((count+1)*sizeof(struct iovec));
422 0 : msg_iov[0].iov_base=&total_size;
423 0 : msg_iov[0].iov_len=sizeof(uint32_t);
424 0 : for (int entry=0;entry<count;entry++) {
425 0 : msg_iov[entry+1].iov_base=iov[entry].iov_base;
426 0 : msg_iov[entry+1].iov_len=iov[entry].iov_len;
427 : }
428 0 : struct msghdr hdr={0,0,&msg_iov[0],count+1,0,0};
429 0 : bytes_sent=sendmsg(socket->eq_ev_ctx.fd,&hdr,0);
430 0 : free(msg_iov);
431 0 : if (bytes_sent==send_size) {
432 : // immediates++;
433 : /* int opt=1; */
434 : /* if (setsockopt(socket->eq_ev_ctx.fd, IPPROTO_TCP, TCP_NODELAY, */
435 : /* &opt, sizeof(opt)) != 0) { */
436 : /* log_error("Problem setting TCP_NODELAY, %s", strerror(errno)); */
437 : /* } */
438 0 : if (socket->cb_send_completed != 0) {
439 0 : socket->cb_send_completed(socket, key);
440 : }
441 : //printf("Sent %d bytes\n",bytes_sent);
442 0 : return NETIO_STATUS_OK;
443 : }
444 0 : else if (bytes_sent==-1) {
445 0 : if (errno!=EAGAIN && errno!=EWOULDBLOCK) {
446 0 : perror("Error from sendmsg: ");
447 0 : exit(1);
448 : }
449 : }
450 : // partials++;
451 : }
452 :
453 : /* ------------------------------------------------------------------ */
454 : /* OK, we didn't manage to send the complete message, queue it for
455 : later send via the event loop */
456 : /* ------------------------------------------------------------------ */
457 0 : char* mem=malloc(sizeof(struct netio_tcp_send_item)+
458 : sizeof(struct netio_buffer)+sizeof(uint32_t)+total_size+sizeof(uint32_t));
459 0 : if(mem == NULL)
460 : {
461 0 : log_fatal("cannot allocate memory for descriptor");
462 0 : exit(2);
463 : }
464 :
465 0 : struct netio_tcp_send_item* mrd = (struct netio_tcp_send_item *) mem;
466 0 : mrd->bytes_left=send_size-bytes_sent;
467 : //printf("bytes_sent=%d, bytes_left=%d\n",bytes_sent,mrd->bytes_left);
468 0 : mrd->element_active = NETIO_TCP_IN_PROGRESS;
469 0 : mrd->socket = socket;
470 0 : mrd->buffer = (struct netio_buffer*) &mem[sizeof(struct netio_tcp_send_item)];
471 0 : mrd->buffer->data = (char*) &mrd->buffer[1];
472 0 : mrd->buffer->size = send_size;
473 0 : mrd->total_bytes = total_size;
474 0 : mrd->key = key;
475 0 : mrd->next_element = NULL;
476 :
477 : // Put length in front of data to send as one
478 0 : uint32_t* tlen_ptr=(uint32_t*) &mrd->buffer[1];
479 0 : *tlen_ptr=total_size;
480 0 : int offset=sizeof(uint32_t);
481 0 : for (int seg=0; seg<count; seg++)
482 : {
483 0 : memcpy((char*)mrd->buffer->data+offset, iov[seg].iov_base, iov[seg].iov_len);
484 0 : offset += iov[seg].iov_len;
485 : }
486 : // log_debug("send descriptor allocated and initialized");
487 :
488 : //Append the descriptor to the list
489 0 : if(socket->message_request_header == NULL)
490 : {
491 0 : socket->message_request_header = (void *)mrd;
492 : //log_trc("List was empty. Descriptor linked to head of list");
493 : }
494 : else
495 : {
496 : struct netio_tcp_send_item* mrdq = (struct netio_tcp_send_item *)socket->message_request_header;
497 0 : while (mrdq->next_element != NULL)
498 : {
499 : mrdq = (struct netio_tcp_send_item *)mrdq->next_element;
500 : }
501 0 : mrdq->next_element = (void *)mrd;
502 : }
503 :
504 : //log_trc("Calling netio_signal_fire for signal at %p", &socket->tcp_signal);
505 0 : netio_signal_fire(&socket->tcp_signal);
506 :
507 : //log_trc("netio_tcp_sendv_imm done");
508 0 : return(NETIO_STATUS_OK);
509 : }
510 :
511 :
512 : void
513 3625 : netio_tcp_send_on_signal(void *ptr)
514 : {
515 3625 : int goto_next_request, current_offset, n;
516 3625 : void *next_mrd;
517 3625 : struct netio_tcp_send_item *mrd;
518 3625 : struct netio_send_socket *socket;
519 :
520 : //log_trc("netio_tcp_send_on_signal called with ptr = %p", ptr);
521 3625 : socket = (struct netio_send_socket *) ptr;
522 3625 : goto_next_request = 1;
523 :
524 7114 : while(goto_next_request)
525 : {
526 : //Take the first message request descriptor from the head of the list
527 5067 : if (socket->message_request_header == NULL)
528 : {
529 : //log_debug("No message request pending. So, why are you here?");
530 1578 : int retVal;
531 1578 : socklen_t in_len = sizeof(retVal);
532 1578 : int ret = getsockopt(socket->eq_ev_ctx.fd, SOL_SOCKET, SO_ERROR,
533 : &retVal, &in_len);
534 1578 : if (ret==0 && retVal !=0) {
535 0 : log_info("%s -- calling connection closed callback", strerror(retVal));
536 0 : if (socket->cb_connection_closed != NULL) {
537 0 : socket->cb_connection_closed(socket);
538 : }
539 : } else {
540 : // When a stream socket peer has performed an orderly shutdown, the return value will be 0
541 1578 : if(!recv(socket->eq_ev_ctx.fd, NULL, 1, 0)) {
542 19 : log_info("0 bytes read from fd %d, calling cb_connection_closed", socket->eq_ev_ctx.fd);
543 19 : if (socket->cb_connection_closed != NULL) {
544 19 : socket->cb_connection_closed(socket);
545 : }
546 : }
547 : }
548 1578 : break;
549 : }
550 : else
551 : {
552 : //Get access to the message request at the head of the list
553 3489 : mrd = socket->message_request_header;
554 : /* log_trc("element_active = %d", mrd->element_active); */
555 : /* log_trc("bytes_left = %d", mrd->bytes_left); */
556 : /* log_trc("next_element = %p", mrd->next_element); */
557 : /* log_trc("buffer = %p", mrd->buffer); */
558 : /* log_trc("socket = %p", mrd->socket); */
559 : /* log_trc("mrd = %p", mrd); */
560 : }
561 :
562 3489 : if(mrd->element_active == NETIO_TCP_EMPTY)
563 : {
564 0 : log_fatal("netio_tcp_send_on_signal: element is not active");
565 0 : exit(2);
566 : }
567 :
568 3489 : void* data_addr = mrd->buffer->data;
569 3489 : int size_total = (int)mrd->buffer->size;
570 3489 : log_trc("Marker = 0x%08x, size_total = %d, size_remaining = %d", *((int*)data_addr), size_total, mrd->bytes_left);
571 3489 : int sockfd=mrd->socket->eq_ev_ctx.fd;
572 3489 : if(mrd->element_active == NETIO_TCP_NEW) //Send the message header (= message size)
573 : {
574 3489 : log_trc("start of new message of size %d", size_total);
575 3489 : int nsent=sizeof(int)-mrd->bytes_left;
576 3489 : unsigned char* s_ptr=(unsigned char*)&mrd->buffer->size;
577 3489 : n = send(sockfd, &s_ptr[nsent], mrd->bytes_left, 0);
578 3489 : if(n < 0)
579 : {
580 0 : if (errno == EWOULDBLOCK || errno == EAGAIN)
581 : {
582 : log_trc("Too busy....................................... ");
583 : }
584 0 : else if (errno==ECONNRESET || errno==EBADF || errno==EPIPE)
585 : {
586 : log_dbg("socket fd %d %s",sockfd,
587 0 : errno==ECONNRESET ? "Connection reset by peer" : "no longer open.");
588 :
589 0 : log_info("Calling connection closed callback");
590 0 : if (mrd->socket->cb_connection_closed != NULL)
591 : {
592 0 : mrd->socket->cb_connection_closed(mrd->socket);
593 : }
594 0 : mrd->socket->state=DISCONNECTED;
595 : /* Free all mrd entries associated with this socket */
596 0 : mrd->socket->message_request_header = NULL;
597 0 : while (mrd)
598 : {
599 0 : next_mrd = mrd->next_element; //MJ: Do we need a mutex to protect concurrent access to the next_element pointer?
600 0 : free(mrd);
601 0 : mrd=next_mrd;
602 : }
603 : }
604 : else
605 : {
606 0 : log_warn("Error %d from send, giving up on sending header", errno);
607 : }
608 0 : return;
609 : }
610 3489 : if(n != 4)
611 : {
612 3489 : log_dbg("only sent %d bytes of length (%d)",n,mrd->buffer->size);
613 : }
614 3489 : mrd->bytes_left-=n;
615 3489 : if (mrd->bytes_left==0)
616 : {
617 3489 : mrd->bytes_left=mrd->total_bytes;
618 3489 : mrd->element_active = NETIO_TCP_IN_PROGRESS; //header sent
619 : }
620 : else
621 : {
622 : return;
623 : }
624 : }
625 :
626 3489 : if(mrd->element_active == NETIO_TCP_IN_PROGRESS) //if we did not manage to send the header (EAGAIN) we must not send data!!
627 : {
628 3489 : current_offset = size_total - mrd->bytes_left;
629 3489 : log_trc("send size = %d, current_offset = %d, fd = %d", mrd->bytes_left, current_offset, mrd->socket->eq_ev_ctx.fd);
630 :
631 3489 : n = send(sockfd, (char*)data_addr + current_offset, mrd->bytes_left, 0);
632 3489 : if(n < 0)
633 : {
634 0 : if (errno == EWOULDBLOCK || errno == EAGAIN)
635 : {
636 : /* log_debug("Too busy.........................................."); */
637 : }
638 0 : else if (errno==ECONNRESET || errno==EBADF || errno==EPIPE)
639 : {
640 : log_dbg("socket fd %d %s",sockfd,
641 0 : errno==ECONNRESET ? "Connection reset by peer" : "no longer open.");
642 :
643 0 : log_info("Calling connection closed callback");
644 0 : if (mrd->socket->cb_connection_closed != NULL)
645 : {
646 0 : mrd->socket->cb_connection_closed(mrd->socket);
647 : }
648 0 : mrd->socket->message_request_header = NULL;
649 0 : mrd->socket->state=DISCONNECTED;
650 : /* Free all mrd entries associated with this socket */
651 0 : while (mrd)
652 : {
653 0 : next_mrd = mrd->next_element; //MJ: Do we need a mutex to protect concurrent access to the next_element pointer?
654 0 : free(mrd);
655 0 : mrd=next_mrd;
656 : }
657 : }
658 : else
659 : {
660 0 : log_warn("send() failed errno = %d", errno);
661 : }
662 0 : return;
663 : }
664 : /* #ifdef CORK */
665 : /* int opt=0; */
666 : /* int ret = setsockopt(sockfd, IPPROTO_TCP, TCP_CORK, &opt, sizeof(opt)); */
667 : /* if (ret != 0) */
668 : /* { */
669 : /* log_error("Problem clearing TCP_CORK, %s", strerror(errno)); */
670 : /* } */
671 : /* #endif */
672 3489 : mrd->bytes_left = mrd->bytes_left - n;
673 3489 : if(mrd->bytes_left)
674 : {
675 : log_trc("d of %d bytes sent (remaining = %d)", n, mrd->bytes_left + n, mrd->bytes_left);
676 : return;
677 : }
678 : else
679 : {
680 3489 : log_trc("Remaining %d bytes sent. Item done", n);
681 3489 : if(mrd->socket->cb_send_completed) //Call the user callback
682 : {
683 : /* log_debug("calling user callback"); */
684 3488 : mrd->socket->cb_send_completed(mrd->socket, mrd->key);
685 : }
686 : else
687 : {
688 : //MJ Is this an error?
689 : // log_warn("No user callback registered");
690 3489 : }
691 : }
692 : }
693 :
694 : //Remove the message request descriptor from the list
695 3489 : next_mrd = mrd->next_element; //MJ: Do we need a mutex to protect concurrent access to the next_element pointer?
696 : /* log_debug("Processing next request at address = %p", next_mrd); */
697 :
698 : //Unlink the message request from the head of the list
699 3489 : if(next_mrd != NULL)
700 : {
701 1442 : mrd->socket->message_request_header = next_mrd;
702 : /* log_debug("Oldest request unlinked from head of list. The next younger request is at address %p", next_mrd); */
703 : }
704 : else
705 : {
706 2047 : mrd->socket->message_request_header = NULL;
707 2047 : goto_next_request = 0 ;
708 : /* log_debug("Request unlinked from head of list. List is now empty"); */
709 : }
710 3489 : mrd->buffer->to_send = 0; // set buffer free to use
711 3489 : free(mrd); //Release the memory held by the message request
712 : /* log_debug("Memory of mrd at %p freed", mrd); */
713 : }
714 : }
715 :
716 :
717 : void
718 15760583 : netio_tcp_recv_on_signal(void *ptr)
719 : {
720 15760583 : int goto_next_request, n, rest_size;
721 15760583 : void *next_mrd;
722 15760583 : struct netio_tcp_recv_item *mrd;
723 15760583 : struct netio_recv_socket *socket;
724 15760583 : struct netio_listen_socket *lsocket;
725 :
726 15760583 : log_trc("netio_tcp_recv_on_signal called with ptr = %p", ptr);
727 15760583 : socket = (struct netio_recv_socket *) ptr;
728 15760583 : lsocket = socket->lsocket;
729 15760583 : lsocket->tcp_fi_mode = NETIO_MODE_TCP; //??? should already be set!
730 :
731 15760583 : goto_next_request = 1;
732 15760583 : while(goto_next_request)
733 : {
734 : //Take the first message request descriptor from the head of the list
735 15760583 : if (socket->message_request_header == NULL)
736 : {
737 0 : log_warn("No message request pending. So, why are you here?");
738 0 : return;
739 : }
740 :
741 : //Get access to the message request at the head of the list
742 15760583 : mrd = socket->message_request_header;
743 15760583 : log_trc("element_active = %d", mrd->element_active);
744 15760583 : log_trc("bytes_received = %d", mrd->bytes_received);
745 15760583 : log_trc("next_element = %p", mrd->next_element);
746 15760583 : log_trc("buffer = %p", mrd->buffer);
747 15760583 : log_trc("socket = %p", mrd->socket);
748 15760583 : log_trc("mrd = %p", mrd);
749 15760583 : log_trc("fd = %d", mrd->socket->eq_ev_ctx.fd);
750 15760583 : log_trc("msg size = %d", mrd->message_size);
751 :
752 :
753 15760583 : if(mrd->element_active == NETIO_TCP_EMPTY)
754 : {
755 0 : log_fatal("element is not active");
756 0 : exit(2);
757 : }
758 :
759 15760583 : void* data_addr = mrd->buffer->data;
760 15760583 : int size = (int)mrd->buffer->size;
761 : /*log_debug("buffer->size = %d", size);*/
762 : //Receive the message header (= message size)
763 15760583 : if(mrd->element_active == NETIO_TCP_NEW) //header not yet received
764 : {
765 15297392 : unsigned char* s_ptr=(unsigned char*)&mrd->message_size;
766 30594785 : n = recv(mrd->socket->eq_ev_ctx.fd, &s_ptr[mrd->bytes_received],
767 15297392 : sizeof(int)-mrd->bytes_received, 0);
768 15297393 : if (n == 0) {
769 2 : log_info("0 bytes read from fd %d, closing socket",mrd->socket->eq_ev_ctx.fd);
770 2 : netio_close_socket(&socket->ctx->evloop,socket,URECV);
771 : }
772 15297393 : if (n == -1)
773 : {
774 0 : if (errno == EWOULDBLOCK || errno == EAGAIN)
775 : {
776 : /* log_debug("Nothing to read yet"); */
777 : }
778 : else if (errno==ECONNRESET || errno==EBADF || errno==EPIPE)
779 : {
780 : log_dbg("socket no longer open.");
781 : }
782 : else
783 : {
784 0 : log_warn("send() failed errno = %d", errno);
785 : }
786 0 : return;
787 : }
788 15297393 : mrd->bytes_received+=n;
789 15297393 : if (mrd->bytes_received==4)
790 : {
791 15297391 : mrd->bytes_received = 0;
792 15297391 : mrd->element_active = NETIO_TCP_IN_PROGRESS;
793 : /* log_debug("Header received. n = %d, Message size = %d bytes", n, size_total); */
794 15297391 : if (mrd->message_size > size)
795 : {
796 0 : log_error("Message size = %d bytes larger than buffer size = %d bytes. Truncating.", mrd->message_size, size);
797 0 : mrd->message_size = size;
798 : }
799 : }
800 : else
801 : {
802 : return;
803 : }
804 : }
805 :
806 15760582 : if(mrd->element_active == NETIO_TCP_IN_PROGRESS)
807 : {
808 : //Receive message data
809 15760582 : rest_size = mrd->message_size - mrd->bytes_received;
810 15760582 : log_trc("rest_size = %d", rest_size);
811 :
812 15760582 : while(rest_size)
813 : {
814 15780648 : n = recv(mrd->socket->eq_ev_ctx.fd, (char*)data_addr + mrd->bytes_received, rest_size, 0);
815 15780649 : if(n < 0)
816 : {
817 463194 : if (errno == EWOULDBLOCK || errno == EAGAIN)
818 : {
819 : log_trc("Out of data to read for now, returning to eventloop");
820 : return;
821 : }
822 0 : else if (errno==ECONNRESET || errno==EBADF || errno==EPIPE)
823 : {
824 0 : log_dbg("socket no longer open.");
825 :
826 : /* Clear what we've read??? */
827 0 : mrd->element_active = NETIO_TCP_NEW;
828 : }
829 : else
830 : {
831 0 : log_warn("recv() failed errno = %d", errno);
832 : }
833 0 : return;
834 : }
835 :
836 : /* iptr = (int *)(data_addr + mrd->bytes_received); */
837 : /* nn = n >> 2; */
838 : /* if (nn > 4) */
839 : /* { */
840 : /* log_debug("word 0 = 0x%08x", iptr[0]); */
841 : /* //DEBUG_LOG("netio_tcp_recv_on_signal (s=%d/m=%d): word 1 = 0x%08x", socket_loop, message_loop, iptr[1]); */
842 : /* //DEBUG_LOG("netio_tcp_recv_on_signal (s=%d/m=%d): word %d = 0x%08x", socket_loop, message_loop, nn - 2, iptr[nn - 2]); */
843 : /* //DEBUG_LOG("netio_tcp_recv_on_signal (s=%d/m=%d): word %d = 0x%08x", socket_loop, message_loop, nn - 1, iptr[nn - 1]); */
844 : /* } */
845 :
846 15317455 : rest_size = rest_size - n;
847 15317455 : mrd->bytes_received = mrd->bytes_received + n;
848 15317455 : if(rest_size)
849 : {
850 15317455 : log_dbg("%d of %d bytes received (remaining = %d)", n, mrd->message_size, rest_size);
851 : }
852 : else
853 : {
854 31078037 : log_dbg("remaining %d bytes received. Item done", n);
855 : }
856 : }
857 :
858 : //Call the user callback
859 15297389 : if(lsocket->cb_msg_received)
860 : {
861 15297389 : lsocket->cb_msg_received(mrd->socket, mrd->buffer, data_addr, mrd->message_size);
862 : } else {
863 15297389 : log_dbg("No user callback registered");
864 : }
865 :
866 : //Remove the message request descriptor from the list
867 15297389 : next_mrd = mrd->next_element; //MJ: Do we need a mutex to protect concurrent access to the next_element pointer?
868 15297389 : log_trc("Processing next request at address = %p", next_mrd);
869 :
870 : //Unlink the message request from the head of the list
871 15297389 : if(next_mrd != NULL)
872 : {
873 15297389 : mrd->socket->message_request_header = next_mrd;
874 15297389 : log_trc("Oldest request unlinked from head of list. The next younger request is at address %p", next_mrd);
875 : // netio_signal_fire(&socket->tcp_signal);
876 15297389 : goto_next_request = 0 ;
877 : } else {
878 0 : mrd->socket->message_request_header = NULL;
879 0 : goto_next_request = 0 ;
880 15297389 : log_trc("Request unlinked from head of list. List is now empty");
881 : }
882 :
883 15297389 : free(mrd); //Release the memory held by the message request
884 : }
885 : }
886 : }
|