Line data Source code
1 : #include <string.h>
2 : #include <stdio.h>
3 : #include <unistd.h>
4 : #include "log.h"
5 : #include "connection_event.h"
6 : #include "netio/netio.h"
7 : #include "netio/netio_tcp.h"
8 :
/* Debug and trace logging are compiled in only when DEBUG or DEBUG_BUF is
   defined; otherwise log_dbg() and log_trc() expand to nothing. */
#if defined(DEBUG) || defined(DEBUG_BUF)
/* Part of __FILE__ after its last '/', or all of __FILE__ if it has none. */
#define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
#define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
#define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
#else
#define log_dbg(...)
#define log_trc(...)
#endif
17 :
/* Log a fatal error and terminate the process with exit status 2.
 *
 *   msg: human-readable description of the failed operation
 *   c:   error code; it is logged as-is and its negation is passed to
 *        fi_strerror() to obtain the error text
 *
 * The expansion deliberately has no trailing semicolon so that
 * `FATAL(...);` is a single statement and stays valid inside un-braced
 * if/else bodies. */
#define FATAL(msg, c) \
  do { \
    log_fatal("%s %d: %s", (msg), (c), fi_strerror(-(c))); \
    exit(2); \
  } while(0)
23 :
24 :
/* Length prefix stored in front of every message packed into a buffer.
   A buffer is laid out as: LEN0|MSG0|LEN1|MSG1|...|LEN_N|MSG_N */
typedef uint32_t msg_size_t;
28 :
29 :
30 : // STATIC FUNCTIONS ////////////////////////////////////////////////////////////
31 :
32 :
33 : static int
34 1120641 : flush(struct netio_buffered_send_socket* socket)
35 : {
36 1120641 : int ret = NETIO_STATUS_OK;
37 1120641 : if(socket->current_buffer)
38 : {
39 1116749 : socket->current_buffer->size = socket->pos;
40 1116749 : int send_status = netio_send_buffer(&socket->send_socket, socket->current_buffer);
41 1116748 : if (send_status == NETIO_STATUS_AGAIN){
42 0 : socket->busy = 1;
43 0 : log_dbg("netio_send_buffer returned %d, trying again", ret);
44 0 : ret = NETIO_STATUS_AGAIN;
45 : } else {
46 1116748 : socket->busy = 0;
47 1116748 : socket->current_buffer = NULL;
48 1116748 : if(socket->timeout_ms != 0){
49 65999 : netio_timer_stop(&socket->flush_timer);
50 : }
51 : }
52 : } else { //there is no current buffer. disable busy if on
53 3892 : socket->busy = 0;
54 : }
55 1120640 : return ret;
56 : }
57 :
58 :
/* Flush-timer callback: `ptr` is the buffered send socket registered as the
   timer's data; its current buffer is flushed and the status is ignored. */
static void
flush_cb(void* ptr)
{
  (void)flush((struct netio_buffered_send_socket*)ptr);
}
65 :
66 :
67 : static void
68 1116103 : on_send_completed(struct netio_send_socket* socket, uint64_t key)
69 : {
70 1116103 : struct netio_buffer* buf = (struct netio_buffer*)key;
71 1116103 : struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(socket->usr);
72 1116103 : if(netio_bufferstack_push(&bs->buffers, buf)) {
73 0 : log_fatal("The buffer stack exceeded its limits.");
74 0 : exit(1);
75 : }
76 1116103 : if(bs->buffers.available_buffers == 1) {
77 212274 : netio_signal_fire(&bs->signal_buffer_available);
78 : }
79 1116103 : }
80 :
81 : static void
82 171 : on_connect(struct netio_send_socket* socket)
83 : {
84 171 : struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(socket->usr);
85 171 : netio_bufferstack_send_init(&bs->buffers, socket, bs->num_pages, bs->buffersize, 0);
86 171 : if(bs->cb_connection_established) {
87 171 : bs->cb_connection_established(bs);
88 : }
89 171 : }
90 :
91 : static void
92 167 : on_buf_send_socket_connection_closed(struct netio_send_socket* ssocket)
93 : {
94 167 : log_dbg("on_buf_send_socket_connection_closed callback");
95 167 : struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(ssocket->usr);
96 167 : log_dbg("removing buffer available signal fd %d from evloop %d", bs->signal_buffer_available.ev_ctx.fd, bs->signal_buffer_available.epollfd);
97 167 : netio_signal_close(&ssocket->ctx->evloop, &bs->signal_buffer_available);
98 167 : if(bs->timeout_ms != 0){
99 162 : log_dbg("removing flush timer fd %d from evloop %d", bs->flush_timer.ev_ctx.fd, bs->signal_buffer_available.epollfd);
100 162 : netio_timer_close(&ssocket->ctx->evloop, &bs->flush_timer);
101 : }
102 167 : bs->current_buffer = NULL;
103 167 : if(ssocket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
104 159 : handle_send_socket_shutdown(ssocket);
105 8 : } else if (ssocket->tcp_fi_mode == NETIO_MODE_TCP){
106 8 : handle_tcp_send_socket_shutdown(ssocket);
107 : }
108 :
109 167 : netio_bufferstack_close(&bs->buffers, bs->num_pages);
110 :
111 : // This is pubsocket_on_connection_closed in pubsub.c
112 : // pubsocket_on_connection_closed will call table_remove_subscription.
113 : // table_remove_subscription can call netio_disconnect that sends a shutdown
114 : // For RDMA shutdown goes via CM, for TCP/IP it requires the FD.
115 167 : if(bs->pub_socket) { //only remove when send socket is part of a publish socket
116 23 : if(bs->cb_connection_closed) {
117 23 : bs->cb_connection_closed(bs);
118 : }
119 23 : struct netio_publish_socket* psocket = (struct netio_publish_socket*)bs->pub_socket;
120 23 : remove_socket(&(psocket->subscription_table.socket_list), (void*)bs);
121 : } else {
122 144 : if(bs->cb_connection_closed) {
123 144 : bs->cb_connection_closed(bs);
124 : }
125 : }
126 167 : }
127 :
128 : static void
129 26 : on_error_connection_refused(struct netio_send_socket* socket) {
130 26 : struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(socket->usr);
131 26 : netio_signal_close(&(socket->ctx->evloop), &bs->signal_buffer_available);
132 26 : if(bs->timeout_ms != 0){
133 23 : netio_timer_close(&(socket->ctx->evloop), &bs->flush_timer);
134 : }
135 26 : if(bs->cb_error_connection_refused) {
136 26 : bs->cb_error_connection_refused(bs);
137 : }
138 26 : }
139 :
140 : static void
141 7327586 : on_buffered_msg_received(struct netio_recv_socket* socket, struct netio_buffer* buf, void* data, size_t len)
142 : {
143 7327586 : struct netio_buffered_recv_socket* bs = (struct netio_buffered_recv_socket*)socket->usr;
144 7327586 : if(bs->lsocket->cb_msg_received) {
145 : size_t pos = 0;
146 1670207131 : while(pos < len) {
147 1662879745 : msg_size_t* s = (msg_size_t*)((char *)buf->data + pos);
148 1662879745 : pos += sizeof(msg_size_t);
149 1662879745 : bs->lsocket->cb_msg_received(bs, (char *)buf->data + pos, *s);
150 1662879545 : pos += *s;
151 : }
152 : }
153 :
154 : //to study the L1ID pileup
155 7327386 : struct netio_subscribe_socket* ssocket = bs->lsocket->listen_socket.usr;
156 7327386 : if(ssocket && ssocket->cb_buf_received) {
157 600 : ssocket->cb_buf_received(ssocket, buf, len);
158 : }
159 :
160 7327384 : netio_post_recv(socket, buf);
161 7327574 : }
162 :
163 :
164 :
165 : // API FUNCTIONS ///////////////////////////////////////////////////////////////
166 :
167 : /**
168 : * Initializes a buffered listen socket.
169 : *
170 : * @param socket: The socket to initialize
171 : * @param ctx: The NetIO context object in which to initialize the socket
172 : * @param attr: Buffer attributes of the socket. Attributes need to match on
173 : * the sending and receiving side of a socket
174 : */
175 : void
176 137 : netio_buffered_listen_socket_init(struct netio_buffered_listen_socket* socket,
177 : struct netio_context* ctx,
178 : struct netio_buffered_socket_attr* attr)
179 : {
180 137 : memset(socket, 0, sizeof(*socket));
181 137 : if (attr->num_pages > NETIO_DOMAIN_MAX_MR){
182 0 : log_error("Number of recv pages requested %lu exceeds max value. Using %lu.", attr->num_pages, NETIO_DOMAIN_MAX_MR);
183 0 : attr->num_pages = NETIO_DOMAIN_MAX_MR;
184 : }
185 137 : socket->pagesize = attr->pagesize;
186 137 : socket->num_pages = attr->num_pages;
187 137 : netio_init_listen_socket(&socket->listen_socket, ctx, NULL);
188 137 : }
189 :
190 :
191 : /* Same as above except _tcp in netio_init_listen_socket */
192 : void
193 58 : netio_buffered_listen_tcp_socket_init(struct netio_buffered_listen_socket* socket,
194 : struct netio_context* ctx,
195 : struct netio_buffered_socket_attr* attr)
196 : {
197 58 : memset(socket, 0, sizeof(*socket));
198 58 : socket->pagesize = attr->pagesize;
199 58 : socket->num_pages = attr->num_pages;
200 58 : netio_init_listen_tcp_socket(&socket->listen_socket, ctx, NULL);
201 58 : }
202 :
203 :
204 : /**
205 : * Bind the listen socket to an interface and port number and bring the listen socket to 'listening' state.
206 : *
207 : * @param socket: The buffered listen socket
208 : * @param hostname: A hostname, typically an IP address, which identifies the interface on which to bind
209 : * @param port: The port name to listen on
210 : */
211 : void
212 137 : netio_buffered_listen(struct netio_buffered_listen_socket* socket, const char* hostname, unsigned port)
213 : {
214 137 : log_info("netio_buffered_listen %s", hostname);
215 137 : int ret;
216 137 : struct fi_info* hints;
217 137 : struct fi_eq_attr eq_attr = {10000, 0, FI_WAIT_FD, 0, 0};
218 :
219 137 : hints = fi_allocinfo();
220 137 : hints->ep_attr->type = FI_EP_MSG;
221 137 : hints->caps = FI_MSG;
222 137 : hints->mode = FI_LOCAL_MR;
223 137 : char port_addr[32];
224 137 : snprintf(port_addr, 32, "%u", port);
225 :
226 137 : log_dbg("listening (libfabric) on %s:%s", hostname, port_addr);
227 :
228 137 : if((ret = fi_getinfo(FI_VERSION(1, 1), hostname, port_addr, FI_SOURCE, hints,
229 : &socket->listen_socket.fi)))
230 : {
231 0 : FATAL("Buf-listen socket, fail to get interface info, error ", ret);
232 : }
233 : // printf("%s\n", fi_tostr(socket->listen_socket.fi, FI_TYPE_INFO));
234 :
235 137 : if((ret = fi_fabric(socket->listen_socket.fi->fabric_attr, &socket->listen_socket.fabric, NULL)))
236 : {
237 0 : FATAL("Buf-listen socket, cannot open fabric, error ", ret);
238 : }
239 :
240 137 : if((ret = fi_eq_open(socket->listen_socket.fabric, &eq_attr, &socket->listen_socket.eq, NULL)))
241 : {
242 0 : FATAL("Buf-listen socket, cannot open Event Queue, error ", ret);
243 : }
244 :
245 137 : if((ret = fi_passive_ep(socket->listen_socket.fabric, socket->listen_socket.fi, &socket->listen_socket.pep, NULL)))
246 : {
247 0 : FATAL("Buf-listen socket, cannot open passive endpoint, error ", ret);
248 : }
249 :
250 137 : if((ret = fi_pep_bind(socket->listen_socket.pep, &socket->listen_socket.eq->fid, 0)))
251 : {
252 0 : FATAL("Buf-listen socket, cannot bind passive endpoint, error ", ret);
253 : }
254 :
255 137 : if((ret = fi_listen(socket->listen_socket.pep)))
256 : {
257 0 : FATAL("Buf-listen socket, cannot enable, error ", ret);
258 : }
259 :
260 137 : if((ret = fi_control(&socket->listen_socket.eq->fid, FI_GETWAIT, &socket->listen_socket.eqfd)))
261 : {
262 0 : FATAL("Buf-listen socket failed to obtain Event Queue wait object", ret);
263 : }
264 :
265 137 : socket->listen_socket.eq_ev_ctx.fd = socket->listen_socket.eqfd;
266 137 : socket->listen_socket.eq_ev_ctx.data = socket;
267 137 : socket->listen_socket.eq_ev_ctx.cb = on_buffered_listen_socket_libfabric_cm_event;
268 137 : struct netio_eventloop* eloop = &socket->listen_socket.ctx->evloop;
269 137 : netio_register_read_fd(eloop, &socket->listen_socket.eq_ev_ctx);
270 137 : add_open_fd(&eloop->openfds, socket->listen_socket.eqfd, NETIO_EQ, BLISTEN, socket);
271 137 : log_dbg("netio_buffered_listen_socket: registering EQ fd %d", socket->listen_socket.eqfd);
272 137 : fi_freeinfo(hints);
273 137 : }
274 :
275 :
276 : /* _tcp version of above. This time there are more differences */
277 : void
278 58 : netio_buffered_listen_tcp(struct netio_buffered_listen_socket* socket,
279 : const char* hostname, unsigned port)
280 : {
281 58 : log_info("Buffered TCP/IP listening on %s:%d", hostname, port);
282 :
283 58 : netio_listen_tcp(&socket->listen_socket, hostname, port);
284 :
285 58 : socket->listen_socket.eq_ev_ctx.cb = on_buffered_listen_socket_tcp_cm_event;
286 58 : netio_register_read_tcp_fd(&socket->listen_socket.ctx->evloop, &socket->listen_socket.eq_ev_ctx);
287 58 : }
288 :
289 :
290 :
291 : /**
292 : * Initializes a buffered send socket.
293 : *
294 : * @param socket: The socket to initialize
295 : * @param ctx: The NetIO context object in which to initialize the socket
296 : * @param attr: Buffer attributes of the socket. Attributes need to match on
297 : * the sending and receiving side of a socket
298 : */
299 : void
300 188 : netio_buffered_send_socket_init(struct netio_buffered_send_socket* socket, struct netio_context* ctx, struct netio_buffered_socket_attr* attr)
301 : {
302 188 : memset(socket, 0, sizeof(*socket));
303 188 : netio_init_send_socket(&socket->send_socket, ctx);
304 188 : socket->send_socket.cq_size = NETIO_MAX_CQ_EVENTS_BUFFERED;
305 188 : socket->send_socket.usr = socket;
306 188 : socket->send_socket.cb_send_completed = on_send_completed;
307 188 : socket->send_socket.cb_connection_established = on_connect;
308 188 : socket->send_socket.cb_connection_closed = on_buf_send_socket_connection_closed;
309 188 : socket->send_socket.cb_error_connection_refused = on_error_connection_refused;
310 188 : socket->current_buffer = NULL;
311 188 : socket->pub_socket = NULL;
312 188 : socket->pos = 0;
313 188 : socket->busy = 0;
314 188 : socket->watermark = attr->watermark;
315 188 : if (attr->num_pages > NETIO_DOMAIN_MAX_MR){
316 0 : log_error("Number of send buffers requested %lu exceeds max value. Using %lu.", attr->num_pages, NETIO_DOMAIN_MAX_MR);
317 0 : attr->num_pages = NETIO_DOMAIN_MAX_MR;
318 : }
319 188 : socket->num_pages = attr->num_pages;
320 188 : socket->buffersize = attr->pagesize;
321 188 : netio_signal_init(&ctx->evloop, &socket->signal_buffer_available);
322 188 : socket->signal_buffer_available.cb = NULL; //deactivated by default
323 188 : socket->timeout_ms = attr->timeout_ms;
324 188 : if(attr->timeout_ms != 0){
325 178 : netio_timer_init(&ctx->evloop, &socket->flush_timer);
326 178 : socket->flush_timer.cb = flush_cb;
327 178 : socket->flush_timer.data = socket;
328 : } else {
329 10 : socket->flush_timer.cb = NULL;
330 : }
331 188 : }
332 :
333 : /* Same as above except for the _tcp in netio_init_send_tcp_socket */
334 : /* If this works, consider factoring out common code */
335 : void
336 9 : netio_buffered_send_tcp_socket_init(struct netio_buffered_send_socket* socket, struct netio_context* ctx, struct netio_buffered_socket_attr* attr)
337 : {
338 9 : memset(socket, 0, sizeof(*socket));
339 9 : netio_init_send_tcp_socket(&socket->send_socket, ctx);
340 9 : socket->send_socket.usr = socket;
341 9 : socket->send_socket.cb_send_completed = on_send_completed;
342 9 : socket->send_socket.cb_connection_established = on_connect;
343 9 : socket->send_socket.cb_connection_closed = on_buf_send_socket_connection_closed;
344 9 : socket->send_socket.cb_error_connection_refused = on_error_connection_refused;
345 9 : socket->current_buffer = NULL;
346 9 : socket->pub_socket = NULL;
347 9 : socket->pos = 0;
348 9 : socket->busy = 0;
349 9 : socket->watermark = attr->watermark;
350 9 : socket->num_pages = attr->num_pages;
351 9 : socket->buffersize = attr->pagesize;
352 9 : netio_signal_init(&ctx->evloop, &socket->signal_buffer_available);
353 9 : socket->signal_buffer_available.cb = NULL; //deactivated by default
354 9 : socket->timeout_ms = attr->timeout_ms;
355 9 : if(attr->timeout_ms != 0){
356 9 : netio_timer_init(&ctx->evloop, &socket->flush_timer);
357 9 : socket->flush_timer.cb = flush_cb;
358 9 : socket->flush_timer.data = socket;
359 : } else {
360 0 : socket->flush_timer.cb = NULL;
361 : }
362 9 : }
363 :
364 4 : void netio_buffered_send_socket_init_and_connect(struct netio_buffered_send_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr) {
365 4 : int tcp = netio_tcp_mode(hostname);
366 4 : const char* host = netio_hostname(hostname);
367 4 : if (tcp) {
368 2 : netio_buffered_send_tcp_socket_init(socket, ctx, attr);
369 : } else {
370 2 : netio_buffered_send_socket_init(socket, ctx, attr);
371 : }
372 4 : netio_buffered_connect(socket, host, port);
373 4 : }
374 :
/* Convenience wrapper: initializes the listen socket and starts listening,
   using the TCP functions when netio_tcp_mode(hostname) is non-zero and the
   libfabric functions otherwise. The host passed on is the one returned by
   netio_hostname(hostname). */
void netio_buffered_listen_socket_init_and_listen(struct netio_buffered_listen_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr) {
  const int use_tcp = netio_tcp_mode(hostname);
  const char* bind_host = netio_hostname(hostname);

  if(!use_tcp) {
    netio_buffered_listen_socket_init(socket, ctx, attr);
    netio_buffered_listen(socket, bind_host, port);
    return;
  }

  netio_buffered_listen_tcp_socket_init(socket, ctx, attr);
  netio_buffered_listen_tcp(socket, bind_host, port);
}
386 :
387 : /**
388 : * Connect a buffered send socket to a remote.
389 : *
390 : * @param socket: The buffered send socket
391 : * @param hostname: Hostname or IP address of the remote endpoint
392 : * @param port: Port number of the remote endpoint
393 : */
394 : void
395 173 : netio_buffered_connect(struct netio_buffered_send_socket* socket, const char* hostname, unsigned port)
396 : {
397 173 : netio_connect(&socket->send_socket, netio_hostname(hostname), port);
398 173 : }
399 :
400 :
401 : void
402 22 : netio_buffered_connect_rawaddr(struct netio_buffered_send_socket* socket, void* addr, size_t addrlen)
403 : {
404 22 : netio_connect_rawaddr(&socket->send_socket, addr, addrlen);
405 22 : }
406 :
407 :
/**
 * Send a single contiguous message on a buffered connection.
 *
 * The message is wrapped in a one-element scatter/gather list and passed to
 * netio_buffered_sendv(), whose status is returned unchanged.
 *
 * @param socket: The buffered send socket
 * @param data: Pointer to message
 * @param size: Size of the message
 *
 * @return
 * - `NETIO_STATUS_TOO_BIG` The message is too big to fit in the internal buffers.
 *                          Increase `pagesize` in the buffer attributes.
 * - `NETIO_STATUS_AGAIN` Socket is busy, no buffers are available. Try again later
 * - `NETIO_STATUS_OK` Message was successfully copied to internal buffers */
int
netio_buffered_send(struct netio_buffered_send_socket* socket, void* data, size_t size)
{
  struct iovec single = { .iov_base = data, .iov_len = size };
  return netio_buffered_sendv(socket, &single, 1);
}
428 :
429 :
430 : /**
431 : * Send a message on a buffered connection.
432 : *
433 : * @param socket: The buffered send socket
434 : * @param iov: Pointer to a scatter/gather buffer
435 : * @param num: Number of elements in the scatter/gather buffer
436 : *
437 : * @return
438 : * - `NETIO_STATUS_TOO_BIG` The message is too big to fit in the internal buffers.
439 : * Increase `pagesize` in the buffer attributes.
440 : * - `NETIO_STATUS_AGAIN` Socket is busy, no buffers are available. Try again later
441 : * - `NETIO_STATUS_OK` Message was successfully copied to internal buffers
442 : */
443 : int
444 62633908 : netio_buffered_sendv(struct netio_buffered_send_socket* socket, struct iovec* iov, size_t num)
445 : {
446 62633908 : if(socket->busy){
447 0 : int ret = flush(socket);
448 0 : if (ret == NETIO_STATUS_AGAIN){
449 : return NETIO_STATUS_AGAIN;
450 : }
451 : }
452 :
453 62633908 : size_t total_size = 0;
454 251045586 : for(unsigned int i=0; i<num; i++) {
455 188411678 : total_size += iov[i].iov_len;
456 : }
457 :
458 : //if current message is larger than the whole buffer
459 62633908 : if(total_size+sizeof(msg_size_t) > socket->buffersize) {
460 : return NETIO_STATUS_TOO_BIG;
461 : }
462 :
463 62633908 : if(socket->current_buffer == NULL) {
464 1335603 : if(netio_bufferstack_pop(&socket->buffers, &socket->current_buffer)) {
465 : return NETIO_STATUS_AGAIN;
466 : }
467 1116748 : socket->pos = 0;
468 : //Enable flush timer
469 1116748 : if(socket->timeout_ms != 0 ){
470 65999 : netio_timer_start_ms(&socket->flush_timer, socket->timeout_ms);
471 : }
472 : } else {
473 : //if current message is larger than remaining space
474 : //flush buffer and retry with a new one
475 61298305 : if(socket->pos+total_size+sizeof(msg_size_t) >= socket->buffersize){
476 4000 : flush(socket);
477 4000 : return NETIO_STATUS_AGAIN;
478 : }
479 : }
480 :
481 62411053 : *(msg_size_t*)((char*)socket->current_buffer->data + socket->pos) = total_size;
482 62411053 : socket->pos += sizeof(msg_size_t);
483 250154252 : for(unsigned int i=0; i<num; i++) {
484 187743199 : memcpy((char *)socket->current_buffer->data + socket->pos, iov[i].iov_base, iov[i].iov_len);
485 187743199 : socket->pos += iov[i].iov_len;
486 : }
487 :
488 62411053 : if(socket->pos > socket->watermark) {
489 51793 : flush(socket);
490 : }
491 : return NETIO_STATUS_OK;
492 : }
493 :
494 : void
495 113 : netio_buffered_recv_socket_init(struct netio_buffered_recv_socket* socket, struct netio_buffered_listen_socket* lsocket)
496 : {
497 113 : memset(socket, 0, sizeof(*socket));
498 113 : socket->lsocket = lsocket;
499 113 : netio_init_recv_socket(&socket->recv_socket, &socket->lsocket->listen_socket);
500 113 : socket->recv_socket.cq_size = NETIO_MAX_CQ_EVENTS_BUFFERED;
501 113 : socket->recv_socket.usr = socket;
502 :
503 113 : socket->num_pages = socket->lsocket->num_pages;
504 113 : socket->pages = (struct netio_buffer*)malloc(socket->num_pages * sizeof(struct netio_buffer));
505 24649 : for(unsigned int i=0; i<socket->num_pages; i++) {
506 24536 : socket->pages[i].data = malloc(socket->lsocket->pagesize);
507 24536 : socket->pages[i].size = socket->lsocket->pagesize;
508 : }
509 113 : socket->recv_socket.lsocket->cb_msg_received = on_buffered_msg_received;
510 113 : }
511 :
512 : /* Same as above except for the _tcp in netio_init_recv_tcp_socket */
513 : void
514 58 : netio_buffered_recv_tcp_socket_init(struct netio_buffered_recv_socket* socket, struct netio_buffered_listen_socket* lsocket)
515 : {
516 58 : memset(socket, 0, sizeof(*socket));
517 58 : socket->lsocket = lsocket;
518 58 : netio_init_recv_tcp_socket(&socket->recv_socket, &socket->lsocket->listen_socket);
519 58 : socket->recv_socket.usr = socket;
520 :
521 58 : socket->num_pages = socket->lsocket->num_pages;
522 58 : socket->pages = (struct netio_buffer*)malloc(socket->num_pages * sizeof(struct netio_buffer));
523 7959 : for(unsigned int i=0; i<socket->num_pages; i++) {
524 7901 : socket->pages[i].data = malloc(socket->lsocket->pagesize);
525 7901 : socket->pages[i].size = socket->lsocket->pagesize;
526 : }
527 58 : socket->recv_socket.lsocket->cb_msg_received = on_buffered_msg_received;
528 58 : }
529 :
530 :
/**
 * Flushes the current buffer of the given buffered send socket.
 *
 * Public wrapper around the internal flush routine; the flush status is
 * discarded.
 *
 * @param socket The buffered send socket
 */
void
netio_buffered_flush(struct netio_buffered_send_socket* socket)
{
  (void)flush(socket);
}
|