Line data Source code
1 : #include <string.h>
2 : #include <stdio.h>
3 : #include <unistd.h>
4 : #include "log.h"
5 : #include "connection_event.h"
6 : #include "netio/netio.h"
7 : #include "netio/netio_tcp.h"
8 :
/* Debug and trace logging is only compiled in for DEBUG or DEBUG_BUF builds.
   In every other build both macros expand to nothing, so their arguments
   are never evaluated. */
#if defined(DEBUG) || defined(DEBUG_BUF)
/* Part of __FILE__ that follows the last '/', or all of it if there is none. */
#  define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
#  define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
#  define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
#else
#  define log_dbg(...)
#  define log_trc(...)
#endif
17 :
/* Log a fatal error and terminate the process with exit code 2.
   `msg` is a C string naming the step that failed; `c` is the return code
   of the failed call, which is negated before being handed to
   fi_strerror(). The do { } while(0) wrapper deliberately has no trailing
   semicolon, so that `FATAL(...);` is exactly one statement and can be used
   as the body of an if/else. */
#define FATAL(msg, c) \
do { \
    log_fatal("%s %d: %s", (msg), (c), fi_strerror(-(c))); \
    exit(2); \
} while(0)
23 :
24 :
25 : /* This type is used as a length-marker in buffers for encoded messages.
26 : Encoding of buffers is as follows: LEN0|MSG0|LEN1|MSG1|...|LEN_N|MSG_N */
27 : typedef uint32_t msg_size_t;
28 :
29 :
30 : // STATIC FUNCTIONS ////////////////////////////////////////////////////////////
31 :
32 :
33 : static int
34 26 : flush(struct netio_buffered_send_socket* socket)
35 : {
36 : int ret = NETIO_STATUS_OK;
37 26 : if(socket->current_buffer)
38 : {
39 26 : socket->current_buffer->size = socket->pos;
40 26 : int send_status = netio_send_buffer(&socket->send_socket, socket->current_buffer);
41 26 : if (send_status == NETIO_STATUS_AGAIN){
42 0 : socket->busy = 1;
43 : log_dbg("netio_send_buffer returned %d, trying again", ret);
44 : ret = NETIO_STATUS_AGAIN;
45 : } else {
46 26 : socket->busy = 0;
47 26 : socket->current_buffer = NULL;
48 26 : if(socket->timeout_ms != 0){
49 24 : netio_timer_stop(&socket->flush_timer);
50 : }
51 : }
52 : } else { //there is no current buffer. disable busy if on
53 0 : socket->busy = 0;
54 : }
55 26 : return ret;
56 : }
57 :
58 :
/* Flush-timer callback: `ptr` is the buffered send socket that was stored
   as the timer's data; its current buffer is flushed and the status of the
   flush is ignored. */
static void
flush_cb(void* ptr)
{
  struct netio_buffered_send_socket* bsocket = ptr;
  (void)flush(bsocket);
}
65 :
66 :
67 : static void
68 26 : on_send_completed(struct netio_send_socket* socket, uint64_t key)
69 : {
70 26 : struct netio_buffer* buf = (struct netio_buffer*)key;
71 26 : struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(socket->usr);
72 26 : if(netio_bufferstack_push(&bs->buffers, buf)) {
73 0 : log_fatal("The buffer stack exceeded its limits.");
74 0 : exit(1);
75 : }
76 26 : if(bs->buffers.available_buffers == 1) {
77 0 : netio_signal_fire(&bs->signal_buffer_available);
78 : }
79 26 : }
80 :
81 : static void
82 11 : on_connect(struct netio_send_socket* socket)
83 : {
84 11 : struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(socket->usr);
85 11 : netio_bufferstack_send_init(&bs->buffers, socket, bs->num_pages, bs->buffersize, 0);
86 11 : if(bs->cb_connection_established) {
87 11 : bs->cb_connection_established(bs);
88 : }
89 11 : }
90 :
91 : static void
92 7 : on_buf_send_socket_connection_closed(struct netio_send_socket* ssocket)
93 : {
94 : log_dbg("on_buf_send_socket_connection_closed callback");
95 7 : struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(ssocket->usr);
96 : log_dbg("removing buffer available signal fd %d from evloop %d", bs->signal_buffer_available.ev_ctx.fd, bs->signal_buffer_available.epollfd);
97 7 : netio_signal_close(&ssocket->ctx->evloop, &bs->signal_buffer_available);
98 7 : if(bs->timeout_ms != 0){
99 : log_dbg("removing flush timer fd %d from evloop %d", bs->flush_timer.ev_ctx.fd, bs->signal_buffer_available.epollfd);
100 6 : netio_timer_close(&ssocket->ctx->evloop, &bs->flush_timer);
101 : }
102 7 : bs->current_buffer = NULL;
103 7 : if(ssocket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
104 4 : handle_send_socket_shutdown(ssocket);
105 3 : } else if (ssocket->tcp_fi_mode == NETIO_MODE_TCP){
106 3 : handle_tcp_send_socket_shutdown(ssocket);
107 : }
108 :
109 7 : netio_bufferstack_close(&bs->buffers, bs->num_pages);
110 :
111 : // This is pubsocket_on_connection_closed in pubsub.c
112 : // pubsocket_on_connection_closed will call table_remove_subscription.
113 : // table_remove_subscription can call netio_disconnect that sends a shutdown
114 : // For RDMA shutdown goes via CM, for TCP/IP it requires the FD.
115 7 : if(bs->pub_socket) { //only remove when send socket is part of a publish socket
116 5 : if(bs->cb_connection_closed) {
117 5 : bs->cb_connection_closed(bs);
118 : }
119 5 : struct netio_publish_socket* psocket = (struct netio_publish_socket*)bs->pub_socket;
120 5 : remove_socket(&(psocket->subscription_table.socket_list), (void*)bs);
121 : } else {
122 2 : if(bs->cb_connection_closed) {
123 2 : bs->cb_connection_closed(bs);
124 : }
125 : }
126 7 : }
127 :
128 : static void
129 3 : on_error_connection_refused(struct netio_send_socket* socket) {
130 3 : struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(socket->usr);
131 3 : netio_signal_close(&(socket->ctx->evloop), &bs->signal_buffer_available);
132 3 : if(bs->timeout_ms != 0){
133 0 : netio_timer_close(&(socket->ctx->evloop), &bs->flush_timer);
134 : }
135 3 : if(bs->cb_error_connection_refused) {
136 3 : bs->cb_error_connection_refused(bs);
137 : }
138 3 : }
139 :
140 : static void
141 6624749 : on_buffered_msg_received(struct netio_recv_socket* socket, struct netio_buffer* buf, void* data, size_t len)
142 : {
143 6624749 : struct netio_buffered_recv_socket* bs = (struct netio_buffered_recv_socket*)socket->usr;
144 6624749 : if(bs->lsocket->cb_msg_received) {
145 : size_t pos = 0;
146 1721332241 : while(pos < len) {
147 1714707573 : msg_size_t* s = (msg_size_t*)((char *)buf->data + pos);
148 1714707573 : pos += sizeof(msg_size_t);
149 1714707573 : bs->lsocket->cb_msg_received(bs, (char *)buf->data + pos, *s);
150 1714707492 : pos += *s;
151 : }
152 : }
153 :
154 : //to study the L1ID pileup
155 6624668 : struct netio_subscribe_socket* ssocket = bs->lsocket->listen_socket.usr;
156 6624668 : if(ssocket && ssocket->cb_buf_received) {
157 0 : ssocket->cb_buf_received(ssocket, buf, len);
158 : }
159 :
160 6624668 : netio_post_recv(socket, buf);
161 6624759 : }
162 :
163 :
164 :
165 : // API FUNCTIONS ///////////////////////////////////////////////////////////////
166 :
167 : /**
168 : * Initializes a buffered listen socket.
169 : *
170 : * @param socket: The socket to initialize
171 : * @param ctx: The NetIO context object in which to initialize the socket
172 : * @param attr: Buffer attributes of the socket. Attributes need to match on
173 : * the sending and receiving side of a socket
174 : */
175 : void
176 33 : netio_buffered_listen_socket_init(struct netio_buffered_listen_socket* socket,
177 : struct netio_context* ctx,
178 : struct netio_buffered_socket_attr* attr)
179 : {
180 33 : memset(socket, 0, sizeof(*socket));
181 33 : if (attr->num_pages > NETIO_DOMAIN_MAX_MR){
182 0 : log_error("Number of recv pages requested %lu exceeds max value. Using %lu.", attr->num_pages, NETIO_DOMAIN_MAX_MR);
183 0 : attr->num_pages = NETIO_DOMAIN_MAX_MR;
184 : }
185 33 : socket->pagesize = attr->pagesize;
186 33 : socket->num_pages = attr->num_pages;
187 33 : netio_init_listen_socket(&socket->listen_socket, ctx, NULL);
188 33 : }
189 :
190 :
191 : /* Same as above except _tcp in netio_init_listen_socket */
192 : void
193 27 : netio_buffered_listen_tcp_socket_init(struct netio_buffered_listen_socket* socket,
194 : struct netio_context* ctx,
195 : struct netio_buffered_socket_attr* attr)
196 : {
197 27 : memset(socket, 0, sizeof(*socket));
198 27 : socket->pagesize = attr->pagesize;
199 27 : socket->num_pages = attr->num_pages;
200 27 : netio_init_listen_tcp_socket(&socket->listen_socket, ctx, NULL);
201 27 : }
202 :
203 :
204 : /**
205 : * Bind the listen socket to an interface and port number and bring the listen socket to 'listening' state.
206 : *
207 : * @param socket: The buffered listen socket
208 : * @param hostname: A hostname, typically an IP address, which identifies the interface on which to bind
209 : * @param port: The port name to listen on
210 : */
211 : void
212 33 : netio_buffered_listen(struct netio_buffered_listen_socket* socket, const char* hostname, unsigned port)
213 : {
214 33 : log_info("netio_buffered_listen %s", hostname);
215 : int ret;
216 : struct fi_info* hints;
217 33 : struct fi_eq_attr eq_attr = {10000, 0, FI_WAIT_FD, 0, 0};
218 :
219 : hints = fi_allocinfo();
220 33 : hints->ep_attr->type = FI_EP_MSG;
221 33 : hints->caps = FI_MSG;
222 33 : hints->mode = FI_LOCAL_MR;
223 : char port_addr[32];
224 33 : snprintf(port_addr, 32, "%u", port);
225 :
226 : log_dbg("listening (libfabric) on %s:%s", hostname, port_addr);
227 :
228 33 : if((ret = fi_getinfo(FI_VERSION(1, 1), hostname, port_addr, FI_SOURCE, hints,
229 : &socket->listen_socket.fi)))
230 : {
231 0 : FATAL("Buf-listen socket, fail to get interface info, error ", ret);
232 : }
233 : // printf("%s\n", fi_tostr(socket->listen_socket.fi, FI_TYPE_INFO));
234 :
235 33 : if((ret = fi_fabric(socket->listen_socket.fi->fabric_attr, &socket->listen_socket.fabric, NULL)))
236 : {
237 0 : FATAL("Buf-listen socket, cannot open fabric, error ", ret);
238 : }
239 :
240 33 : if((ret = fi_eq_open(socket->listen_socket.fabric, &eq_attr, &socket->listen_socket.eq, NULL)))
241 : {
242 0 : FATAL("Buf-listen socket, cannot open Event Queue, error ", ret);
243 : }
244 :
245 33 : if((ret = fi_passive_ep(socket->listen_socket.fabric, socket->listen_socket.fi, &socket->listen_socket.pep, NULL)))
246 : {
247 0 : FATAL("Buf-listen socket, cannot open passive endpoint, error ", ret);
248 : }
249 :
250 33 : if((ret = fi_pep_bind(socket->listen_socket.pep, &socket->listen_socket.eq->fid, 0)))
251 : {
252 0 : FATAL("Buf-listen socket, cannot bind passive endpoint, error ", ret);
253 : }
254 :
255 33 : if((ret = fi_listen(socket->listen_socket.pep)))
256 : {
257 0 : FATAL("Buf-listen socket, cannot enable, error ", ret);
258 : }
259 :
260 33 : if((ret = fi_control(&socket->listen_socket.eq->fid, FI_GETWAIT, &socket->listen_socket.eqfd)))
261 : {
262 0 : FATAL("Buf-listen socket failed to obtain Event Queue wait object", ret);
263 : }
264 :
265 33 : socket->listen_socket.eq_ev_ctx.fd = socket->listen_socket.eqfd;
266 33 : socket->listen_socket.eq_ev_ctx.data = socket;
267 33 : socket->listen_socket.eq_ev_ctx.cb = on_buffered_listen_socket_libfabric_cm_event;
268 33 : struct netio_eventloop* eloop = &socket->listen_socket.ctx->evloop;
269 33 : netio_register_read_fd(eloop, &socket->listen_socket.eq_ev_ctx);
270 33 : add_open_fd(&eloop->openfds, socket->listen_socket.eqfd, NETIO_EQ, BLISTEN, socket);
271 : log_dbg("netio_buffered_listen_socket: registering EQ fd %d", socket->listen_socket.eqfd);
272 33 : fi_freeinfo(hints);
273 33 : }
274 :
275 :
276 : /* _tcp version of above. This time there are more differences */
277 : void
278 27 : netio_buffered_listen_tcp(struct netio_buffered_listen_socket* socket,
279 : const char* hostname, unsigned port)
280 : {
281 27 : log_info("Buffered TCP/IP listening on %s:%d", hostname, port);
282 :
283 27 : netio_listen_tcp(&socket->listen_socket, hostname, port);
284 :
285 27 : socket->listen_socket.eq_ev_ctx.cb = on_buffered_listen_socket_tcp_cm_event;
286 27 : netio_register_read_tcp_fd(&socket->listen_socket.ctx->evloop, &socket->listen_socket.eq_ev_ctx);
287 27 : }
288 :
289 :
290 :
291 : /**
292 : * Initializes a buffered send socket.
293 : *
294 : * @param socket: The socket to initialize
295 : * @param ctx: The NetIO context object in which to initialize the socket
296 : * @param attr: Buffer attributes of the socket. Attributes need to match on
297 : * the sending and receiving side of a socket
298 : */
299 : void
300 10 : netio_buffered_send_socket_init(struct netio_buffered_send_socket* socket, struct netio_context* ctx, struct netio_buffered_socket_attr* attr)
301 : {
302 10 : memset(socket, 0, sizeof(*socket));
303 10 : netio_init_send_socket(&socket->send_socket, ctx);
304 10 : socket->send_socket.cq_size = NETIO_MAX_CQ_EVENTS_BUFFERED;
305 10 : socket->send_socket.usr = socket;
306 10 : socket->send_socket.cb_send_completed = on_send_completed;
307 10 : socket->send_socket.cb_connection_established = on_connect;
308 10 : socket->send_socket.cb_connection_closed = on_buf_send_socket_connection_closed;
309 10 : socket->send_socket.cb_error_connection_refused = on_error_connection_refused;
310 10 : socket->current_buffer = NULL;
311 10 : socket->pub_socket = NULL;
312 10 : socket->pos = 0;
313 10 : socket->busy = 0;
314 10 : socket->watermark = attr->watermark;
315 10 : if (attr->num_pages > NETIO_DOMAIN_MAX_MR){
316 0 : log_error("Number of send buffers requested %lu exceeds max value. Using %lu.", attr->num_pages, NETIO_DOMAIN_MAX_MR);
317 0 : attr->num_pages = NETIO_DOMAIN_MAX_MR;
318 : }
319 10 : socket->num_pages = attr->num_pages;
320 10 : socket->buffersize = attr->pagesize;
321 10 : netio_signal_init(&ctx->evloop, &socket->signal_buffer_available);
322 10 : socket->signal_buffer_available.cb = NULL; //deactivated by default
323 10 : socket->timeout_ms = attr->timeout_ms;
324 10 : if(attr->timeout_ms != 0){
325 4 : netio_timer_init(&ctx->evloop, &socket->flush_timer);
326 4 : socket->flush_timer.cb = flush_cb;
327 4 : socket->flush_timer.data = socket;
328 : } else {
329 6 : socket->flush_timer.cb = NULL;
330 : }
331 10 : }
332 :
333 : /* Same as above except for the _tcp in netio_init_send_tcp_socket */
334 : /* If this works, consider factoring out common code */
335 : void
336 4 : netio_buffered_send_tcp_socket_init(struct netio_buffered_send_socket* socket, struct netio_context* ctx, struct netio_buffered_socket_attr* attr)
337 : {
338 4 : memset(socket, 0, sizeof(*socket));
339 4 : netio_init_send_tcp_socket(&socket->send_socket, ctx);
340 4 : socket->send_socket.usr = socket;
341 4 : socket->send_socket.cb_send_completed = on_send_completed;
342 4 : socket->send_socket.cb_connection_established = on_connect;
343 4 : socket->send_socket.cb_connection_closed = on_buf_send_socket_connection_closed;
344 4 : socket->send_socket.cb_error_connection_refused = on_error_connection_refused;
345 4 : socket->current_buffer = NULL;
346 4 : socket->pub_socket = NULL;
347 4 : socket->pos = 0;
348 4 : socket->busy = 0;
349 4 : socket->watermark = attr->watermark;
350 4 : socket->num_pages = attr->num_pages;
351 4 : socket->buffersize = attr->pagesize;
352 4 : netio_signal_init(&ctx->evloop, &socket->signal_buffer_available);
353 4 : socket->signal_buffer_available.cb = NULL; //deactivated by default
354 4 : socket->timeout_ms = attr->timeout_ms;
355 4 : if(attr->timeout_ms != 0){
356 4 : netio_timer_init(&ctx->evloop, &socket->flush_timer);
357 4 : socket->flush_timer.cb = flush_cb;
358 4 : socket->flush_timer.data = socket;
359 : } else {
360 0 : socket->flush_timer.cb = NULL;
361 : }
362 4 : }
363 :
364 4 : void netio_buffered_send_socket_init_and_connect(struct netio_buffered_send_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr) {
365 4 : int tcp = netio_tcp_mode(hostname);
366 4 : const char* host = netio_hostname(hostname);
367 4 : if (tcp) {
368 2 : netio_buffered_send_tcp_socket_init(socket, ctx, attr);
369 : } else {
370 2 : netio_buffered_send_socket_init(socket, ctx, attr);
371 : }
372 4 : netio_buffered_connect(socket, host, port);
373 4 : }
374 :
/**
 * Initializes a buffered listen socket and starts listening in one call.
 *
 * netio_tcp_mode(hostname) selects between the TCP and the libfabric
 * init/listen pair; both are given netio_hostname(hostname) as the
 * interface to bind to.
 *
 * @param socket: The socket to initialize
 * @param ctx: The NetIO context object in which to initialize the socket
 * @param hostname: Local interface as understood by netio_tcp_mode / netio_hostname
 * @param port: Port to listen on
 * @param attr: Buffer attributes of the socket
 */
void netio_buffered_listen_socket_init_and_listen(struct netio_buffered_listen_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr) {
  const int use_tcp = netio_tcp_mode(hostname);
  const char* iface = netio_hostname(hostname);

  if (!use_tcp) {
    netio_buffered_listen_socket_init(socket, ctx, attr);
    netio_buffered_listen(socket, iface, port);
    return;
  }

  netio_buffered_listen_tcp_socket_init(socket, ctx, attr);
  netio_buffered_listen_tcp(socket, iface, port);
}
386 :
387 : /**
388 : * Connect a buffered send socket to a remote.
389 : *
390 : * @param socket: The buffered send socket
391 : * @param hostname: Hostname or IP address of the remote endpoint
392 : * @param port: Port number of the remote endpoint
393 : */
394 : void
395 8 : netio_buffered_connect(struct netio_buffered_send_socket* socket, const char* hostname, unsigned port)
396 : {
397 8 : netio_connect(&socket->send_socket, netio_hostname(hostname), port);
398 8 : }
399 :
400 :
401 : void
402 4 : netio_buffered_connect_rawaddr(struct netio_buffered_send_socket* socket, void* addr, size_t addrlen)
403 : {
404 4 : netio_connect_rawaddr(&socket->send_socket, addr, addrlen);
405 4 : }
406 :
407 :
/**
 * Send a single contiguous message on a buffered connection.
 *
 * Wraps the message in a one-element scatter/gather list and forwards it to
 * netio_buffered_sendv, whose status is returned unchanged.
 *
 * @param socket: The buffered send socket
 * @param data: Pointer to message
 * @param size: Size of the message
 *
 * @return
 *  - `NETIO_STATUS_TOO_BIG` The message is too big to fit in the internal buffers.
 *                           Increase `pagesize` in the buffer attributes.
 *  - `NETIO_STATUS_AGAIN` Socket is busy, no buffers are available. Try again later
 *  - `NETIO_STATUS_OK` Message was successfully copied to internal buffers
 */
int
netio_buffered_send(struct netio_buffered_send_socket* socket, void* data, size_t size)
{
  struct iovec single = { .iov_base = data, .iov_len = size };
  return netio_buffered_sendv(socket, &single, 1);
}
428 :
429 :
430 : /**
431 : * Send a message on a buffered connection.
432 : *
433 : * @param socket: The buffered send socket
434 : * @param iov: Pointer to a scatter/gather buffer
435 : * @param num: Number of elements in the scatter/gather buffer
436 : *
437 : * @return
438 : * - `NETIO_STATUS_TOO_BIG` The message is too big to fit in the internal buffers.
439 : * Increase `pagesize` in the buffer attributes.
440 : * - `NETIO_STATUS_AGAIN` Socket is busy, no buffers are available. Try again later
441 : * - `NETIO_STATUS_OK` Message was successfully copied to internal buffers
442 : */
443 : int
444 12104 : netio_buffered_sendv(struct netio_buffered_send_socket* socket, struct iovec* iov, size_t num)
445 : {
446 12104 : if(socket->busy){
447 0 : int ret = flush(socket);
448 0 : if (ret == NETIO_STATUS_AGAIN){
449 : return NETIO_STATUS_AGAIN;
450 : }
451 : }
452 :
453 : size_t total_size = 0;
454 34310 : for(unsigned int i=0; i<num; i++) {
455 22206 : total_size += iov[i].iov_len;
456 : }
457 :
458 : //if current message is larger than the whole buffer
459 12104 : if(total_size+sizeof(msg_size_t) > socket->buffersize) {
460 : return NETIO_STATUS_TOO_BIG;
461 : }
462 :
463 12104 : if(socket->current_buffer == NULL) {
464 26 : if(netio_bufferstack_pop(&socket->buffers, &socket->current_buffer)) {
465 : return NETIO_STATUS_AGAIN;
466 : }
467 26 : socket->pos = 0;
468 : //Enable flush timer
469 26 : if(socket->timeout_ms != 0 ){
470 24 : netio_timer_start_ms(&socket->flush_timer, socket->timeout_ms);
471 : }
472 : } else {
473 : //if current message is larger than remaining space
474 : //flush buffer and retry with a new one
475 12078 : if(socket->pos+total_size+sizeof(msg_size_t) >= socket->buffersize){
476 0 : flush(socket);
477 0 : return NETIO_STATUS_AGAIN;
478 : }
479 : }
480 :
481 12104 : *(msg_size_t*)((char*)socket->current_buffer->data + socket->pos) = total_size;
482 12104 : socket->pos += sizeof(msg_size_t);
483 34310 : for(unsigned int i=0; i<num; i++) {
484 22206 : memcpy((char *)socket->current_buffer->data + socket->pos, iov[i].iov_base, iov[i].iov_len);
485 22206 : socket->pos += iov[i].iov_len;
486 : }
487 :
488 12104 : if(socket->pos > socket->watermark) {
489 0 : flush(socket);
490 : }
491 : return NETIO_STATUS_OK;
492 : }
493 :
494 : void
495 31 : netio_buffered_recv_socket_init(struct netio_buffered_recv_socket* socket, struct netio_buffered_listen_socket* lsocket)
496 : {
497 31 : memset(socket, 0, sizeof(*socket));
498 31 : socket->lsocket = lsocket;
499 31 : netio_init_recv_socket(&socket->recv_socket, &socket->lsocket->listen_socket);
500 31 : socket->recv_socket.cq_size = NETIO_MAX_CQ_EVENTS_BUFFERED;
501 31 : socket->recv_socket.usr = socket;
502 :
503 31 : socket->num_pages = socket->lsocket->num_pages;
504 31 : socket->pages = (struct netio_buffer*)malloc(socket->num_pages * sizeof(struct netio_buffer));
505 7479 : for(unsigned int i=0; i<socket->num_pages; i++) {
506 7448 : socket->pages[i].data = malloc(socket->lsocket->pagesize);
507 7448 : socket->pages[i].size = socket->lsocket->pagesize;
508 : }
509 31 : socket->recv_socket.lsocket->cb_msg_received = on_buffered_msg_received;
510 31 : }
511 :
512 : /* Same as above except for the _tcp in netio_init_recv_tcp_socket */
513 : void
514 27 : netio_buffered_recv_tcp_socket_init(struct netio_buffered_recv_socket* socket, struct netio_buffered_listen_socket* lsocket)
515 : {
516 27 : memset(socket, 0, sizeof(*socket));
517 27 : socket->lsocket = lsocket;
518 27 : netio_init_recv_tcp_socket(&socket->recv_socket, &socket->lsocket->listen_socket);
519 27 : socket->recv_socket.usr = socket;
520 :
521 27 : socket->num_pages = socket->lsocket->num_pages;
522 27 : socket->pages = (struct netio_buffer*)malloc(socket->num_pages * sizeof(struct netio_buffer));
523 6937 : for(unsigned int i=0; i<socket->num_pages; i++) {
524 6910 : socket->pages[i].data = malloc(socket->lsocket->pagesize);
525 6910 : socket->pages[i].size = socket->lsocket->pagesize;
526 : }
527 27 : socket->recv_socket.lsocket->cb_msg_received = on_buffered_msg_received;
528 27 : }
529 :
530 :
/**
 * Flushes the current buffer of the given buffered send socket.
 *
 * The status of the flush is not reported to the caller.
 *
 * @param socket The buffered send socket
 */
void
netio_buffered_flush(struct netio_buffered_send_socket* socket)
{
  (void)flush(socket);
}
|