.. _program_listing_file_buffered.c:

Program Listing for File buffered.c
===================================

|exhale_lsh| :ref:`Return to documentation for file <file_buffered.c>` (``buffered.c``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   #include <stdint.h>
   #include <stdlib.h>
   #include <string.h>
   #include "log.h"
   #include "connection_event.h"
   #include "netio/netio.h"
   #include "netio/netio_tcp.h"

   #if defined DEBUG || defined DEBUG_BUF
   #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
   #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
   #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
   #else
   #define log_dbg(...)
   #define log_trc(...)
   #endif

   #define FATAL(msg, c)                                  \
     do {                                                 \
       log_fatal("%s %d: %s", msg, c, fi_strerror(-c));   \
       exit(2);                                           \
     } while(0);

   /*
   This type is used as a length-marker in buffers for encoded messages.
   Encoding of buffers is as follows:

       LEN0|MSG0|LEN1|MSG1|...|LEN_N|MSG_N
   */
   typedef uint32_t msg_size_t;

   // STATIC FUNCTIONS ////////////////////////////////////////////////////////////

   static int flush(struct netio_buffered_send_socket* socket)
   {
       int ret = NETIO_STATUS_OK;
       if(socket->current_buffer) {
           socket->current_buffer->size = socket->pos;
           int send_status = netio_send_buffer(&socket->send_socket, socket->current_buffer);
           if (send_status == NETIO_STATUS_AGAIN){
               socket->busy = 1;
               log_dbg("netio_send_buffer returned %d, trying again", ret);
               ret = NETIO_STATUS_AGAIN;
           } else {
               socket->busy = 0;
               socket->current_buffer = NULL;
               if(socket->timeout_ms != 0){
                   netio_timer_stop(&socket->flush_timer);
               }
           }
       } else {
           //there is no current buffer. disable busy if on
           socket->busy = 0;
       }
       return ret;
   }

   static void flush_cb(void* ptr)
   {
       struct netio_buffered_send_socket* socket = (struct netio_buffered_send_socket*)ptr;
       flush(socket);
   }

   static void on_send_completed(struct netio_send_socket* socket, uint64_t key)
   {
       struct netio_buffer* buf = (struct netio_buffer*)key;
       struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(socket->usr);
       if(netio_bufferstack_push(&bs->buffers, buf)) {
           log_fatal("The buffer stack exceeded its limits.");
           exit(1);
       }
       if(bs->buffers.available_buffers == 1) {
           netio_signal_fire(&bs->signal_buffer_available);
       }
   }

   static void on_connect(struct netio_send_socket* socket)
   {
       struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(socket->usr);
       netio_bufferstack_send_init(&bs->buffers, socket, bs->num_pages, bs->buffersize, 0);
       if(bs->cb_connection_established) {
           bs->cb_connection_established(bs);
       }
   }

   static void on_buf_send_socket_connection_closed(struct netio_send_socket* ssocket)
   {
       log_dbg("on_buf_send_socket_connection_closed callback");
       struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(ssocket->usr);
       log_dbg("removing buffer available signal fd %d from evloop %d", bs->signal_buffer_available.ev_ctx.fd, bs->signal_buffer_available.epollfd);
       netio_signal_close(&ssocket->ctx->evloop, &bs->signal_buffer_available);
       if(bs->timeout_ms != 0){
           log_dbg("removing flush timer fd %d from evloop %d", bs->flush_timer.ev_ctx.fd, bs->signal_buffer_available.epollfd);
           netio_timer_close(&ssocket->ctx->evloop, &bs->flush_timer);
       }
       bs->current_buffer = NULL;
       if(ssocket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
           handle_send_socket_shutdown(ssocket);
       } else if (ssocket->tcp_fi_mode == NETIO_MODE_TCP){
           handle_tcp_send_socket_shutdown(ssocket);
       }
       netio_bufferstack_close(&bs->buffers, bs->num_pages);
       // This is pubsocket_on_connection_closed in pubsub.c
       // pubsocket_on_connection_closed will call table_remove_subscription.
       // table_remove_subscription can call netio_disconnect that sends a shutdown
       // For RDMA shutdown goes via CM, for TCP/IP it requires the FD.
       if(bs->pub_socket) {
           //only remove when send socket is part of a publish socket
           if(bs->cb_connection_closed) {
               bs->cb_connection_closed(bs);
           }
           struct netio_publish_socket* psocket = (struct netio_publish_socket*)bs->pub_socket;
           remove_socket(&(psocket->subscription_table.socket_list), (void*)bs);
       } else {
           if(bs->cb_connection_closed) {
               bs->cb_connection_closed(bs);
           }
       }
   }

   static void on_error_connection_refused(struct netio_send_socket* socket)
   {
       struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(socket->usr);
       netio_signal_close(&(socket->ctx->evloop), &bs->signal_buffer_available);
       if(bs->timeout_ms != 0){
           netio_timer_close(&(socket->ctx->evloop), &bs->flush_timer);
       }
       if(bs->cb_error_connection_refused) {
           bs->cb_error_connection_refused(bs);
       }
   }

   static void on_buffered_msg_received(struct netio_recv_socket* socket, struct netio_buffer* buf, void* data, size_t len)
   {
       struct netio_buffered_recv_socket* bs = (struct netio_buffered_recv_socket*)socket->usr;
       if(bs->lsocket->cb_msg_received) {
           size_t pos = 0;
           while(pos < len) {
               msg_size_t* s = (msg_size_t*)((char *)buf->data + pos);
               pos += sizeof(msg_size_t);
               bs->lsocket->cb_msg_received(bs, (char *)buf->data + pos, *s);
               pos += *s;
           }
       }
       //to study the L1ID pileup
       struct netio_subscribe_socket* ssocket = bs->lsocket->listen_socket.usr;
       if(ssocket && ssocket->cb_buf_received) {
           ssocket->cb_buf_received(ssocket, buf, len);
       }
       netio_post_recv(socket, buf);
   }

   // API FUNCTIONS ///////////////////////////////////////////////////////////////

   void netio_buffered_listen_socket_init(struct netio_buffered_listen_socket* socket, struct netio_context* ctx, struct netio_buffered_socket_attr* attr)
   {
       memset(socket, 0, sizeof(*socket));
       if (attr->num_pages > NETIO_DOMAIN_MAX_MR){
           log_error("Number of recv pages requested %lu exceeds max value. Using %lu.", attr->num_pages, NETIO_DOMAIN_MAX_MR);
           attr->num_pages = NETIO_DOMAIN_MAX_MR;
       }
       socket->pagesize = attr->pagesize;
       socket->num_pages = attr->num_pages;
       netio_init_listen_socket(&socket->listen_socket, ctx, NULL);
   }

   /* Same as above except _tcp in netio_init_listen_socket */
   void netio_buffered_listen_tcp_socket_init(struct netio_buffered_listen_socket* socket, struct netio_context* ctx, struct netio_buffered_socket_attr* attr)
   {
       memset(socket, 0, sizeof(*socket));
       socket->pagesize = attr->pagesize;
       socket->num_pages = attr->num_pages;
       netio_init_listen_tcp_socket(&socket->listen_socket, ctx, NULL);
   }

   void netio_buffered_listen(struct netio_buffered_listen_socket* socket, const char* hostname, unsigned port)
   {
       log_info("netio_buffered_listen %s", hostname);
       int ret;
       struct fi_info* hints;
       struct fi_eq_attr eq_attr = {10000, 0, FI_WAIT_FD, 0, 0};
       hints = fi_allocinfo();
       hints->ep_attr->type = FI_EP_MSG;
       hints->caps = FI_MSG;
       hints->mode = FI_LOCAL_MR;
       char port_addr[32];
       snprintf(port_addr, 32, "%u", port);
       log_dbg("listening (libfabric) on %s:%s", hostname, port_addr);
       if((ret = fi_getinfo(FI_VERSION(1, 1), hostname, port_addr, FI_SOURCE, hints, &socket->listen_socket.fi))) {
           FATAL("Buf-listen socket, fail to get interface info, error ", ret);
       }
       // printf("%s\n", fi_tostr(socket->listen_socket.fi, FI_TYPE_INFO));
       if((ret = fi_fabric(socket->listen_socket.fi->fabric_attr, &socket->listen_socket.fabric, NULL))) {
           FATAL("Buf-listen socket, cannot open fabric, error ", ret);
       }
       if((ret = fi_eq_open(socket->listen_socket.fabric, &eq_attr, &socket->listen_socket.eq, NULL))) {
           FATAL("Buf-listen socket, cannot open Event Queue, error ", ret);
       }
       if((ret = fi_passive_ep(socket->listen_socket.fabric, socket->listen_socket.fi, &socket->listen_socket.pep, NULL))) {
           FATAL("Buf-listen socket, cannot open passive endpoint, error ", ret);
       }
       if((ret = fi_pep_bind(socket->listen_socket.pep, &socket->listen_socket.eq->fid, 0))) {
           FATAL("Buf-listen socket, cannot bind passive endpoint, error ", ret);
       }
       if((ret = fi_listen(socket->listen_socket.pep))) {
           FATAL("Buf-listen socket, cannot enable, error ", ret);
       }
       if((ret = fi_control(&socket->listen_socket.eq->fid, FI_GETWAIT, &socket->listen_socket.eqfd))) {
           FATAL("Buf-listen socket failed to obtain Event Queue wait object", ret);
       }
       socket->listen_socket.eq_ev_ctx.fd = socket->listen_socket.eqfd;
       socket->listen_socket.eq_ev_ctx.data = socket;
       socket->listen_socket.eq_ev_ctx.cb = on_buffered_listen_socket_libfabric_cm_event;
       struct netio_eventloop* eloop = &socket->listen_socket.ctx->evloop;
       netio_register_read_fd(eloop, &socket->listen_socket.eq_ev_ctx);
       add_open_fd(&eloop->openfds, socket->listen_socket.eqfd, NETIO_EQ, BLISTEN, socket);
       log_dbg("netio_buffered_listen_socket: registering EQ fd %d", socket->listen_socket.eqfd);
       fi_freeinfo(hints);
   }

   /* _tcp version of above. This time there are more differences */
   void netio_buffered_listen_tcp(struct netio_buffered_listen_socket* socket, const char* hostname, unsigned port)
   {
       log_info("Buffered TCP/IP listening on %s:%d", hostname, port);
       netio_listen_tcp(&socket->listen_socket, hostname, port);
       socket->listen_socket.eq_ev_ctx.cb = on_buffered_listen_socket_tcp_cm_event;
       netio_register_read_tcp_fd(&socket->listen_socket.ctx->evloop, &socket->listen_socket.eq_ev_ctx);
   }

   void netio_buffered_send_socket_init(struct netio_buffered_send_socket* socket, struct netio_context* ctx, struct netio_buffered_socket_attr* attr)
   {
       memset(socket, 0, sizeof(*socket));
       netio_init_send_socket(&socket->send_socket, ctx);
       socket->send_socket.cq_size = NETIO_MAX_CQ_EVENTS_BUFFERED;
       socket->send_socket.usr = socket;
       socket->send_socket.cb_send_completed = on_send_completed;
       socket->send_socket.cb_connection_established = on_connect;
       socket->send_socket.cb_connection_closed = on_buf_send_socket_connection_closed;
       socket->send_socket.cb_error_connection_refused = on_error_connection_refused;
       socket->current_buffer = NULL;
       socket->pub_socket = NULL;
       socket->pos = 0;
       socket->busy = 0;
       socket->watermark = attr->watermark;
       if (attr->num_pages > NETIO_DOMAIN_MAX_MR){
           log_error("Number of send buffers requested %lu exceeds max value. Using %lu.", attr->num_pages, NETIO_DOMAIN_MAX_MR);
           attr->num_pages = NETIO_DOMAIN_MAX_MR;
       }
       socket->num_pages = attr->num_pages;
       socket->buffersize = attr->pagesize;
       netio_signal_init(&ctx->evloop, &socket->signal_buffer_available);
       socket->signal_buffer_available.cb = NULL; //deactivated by default
       socket->timeout_ms = attr->timeout_ms;
       if(attr->timeout_ms != 0){
           netio_timer_init(&ctx->evloop, &socket->flush_timer);
           socket->flush_timer.cb = flush_cb;
           socket->flush_timer.data = socket;
       } else {
           socket->flush_timer.cb = NULL;
       }
   }

   /* Same as above except for the _tcp in netio_init_send_tcp_socket */
   /* If this works, consider factoring out common code */
   void netio_buffered_send_tcp_socket_init(struct netio_buffered_send_socket* socket, struct netio_context* ctx, struct netio_buffered_socket_attr* attr)
   {
       memset(socket, 0, sizeof(*socket));
       netio_init_send_tcp_socket(&socket->send_socket, ctx);
       socket->send_socket.usr = socket;
       socket->send_socket.cb_send_completed = on_send_completed;
       socket->send_socket.cb_connection_established = on_connect;
       socket->send_socket.cb_connection_closed = on_buf_send_socket_connection_closed;
       socket->send_socket.cb_error_connection_refused = on_error_connection_refused;
       socket->current_buffer = NULL;
       socket->pub_socket = NULL;
       socket->pos = 0;
       socket->busy = 0;
       socket->watermark = attr->watermark;
       socket->num_pages = attr->num_pages;
       socket->buffersize = attr->pagesize;
       netio_signal_init(&ctx->evloop, &socket->signal_buffer_available);
       socket->signal_buffer_available.cb = NULL; //deactivated by default
       socket->timeout_ms = attr->timeout_ms;
       if(attr->timeout_ms != 0){
           netio_timer_init(&ctx->evloop, &socket->flush_timer);
           socket->flush_timer.cb = flush_cb;
           socket->flush_timer.data = socket;
       } else {
           socket->flush_timer.cb = NULL;
       }
   }

   void netio_buffered_send_socket_init_and_connect(struct netio_buffered_send_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr)
   {
       int tcp = netio_tcp_mode(hostname);
       const char* host = netio_hostname(hostname);
       if (tcp) {
           netio_buffered_send_tcp_socket_init(socket, ctx, attr);
       } else {
           netio_buffered_send_socket_init(socket, ctx, attr);
       }
       netio_buffered_connect(socket, host, port);
   }

   void netio_buffered_listen_socket_init_and_listen(struct netio_buffered_listen_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr)
   {
       int tcp = netio_tcp_mode(hostname);
       const char* host = netio_hostname(hostname);
       if (tcp) {
           netio_buffered_listen_tcp_socket_init(socket, ctx, attr);
           netio_buffered_listen_tcp(socket, host, port);
       } else {
           netio_buffered_listen_socket_init(socket, ctx, attr);
           netio_buffered_listen(socket, host, port);
       }
   }

   void netio_buffered_connect(struct netio_buffered_send_socket* socket, const char* hostname, unsigned port)
   {
       netio_connect(&socket->send_socket, netio_hostname(hostname), port);
   }

   void netio_buffered_connect_rawaddr(struct netio_buffered_send_socket* socket, void* addr, size_t addrlen)
   {
       netio_connect_rawaddr(&socket->send_socket, addr, addrlen);
   }

   int netio_buffered_send(struct netio_buffered_send_socket* socket, void* data, size_t size)
   {
       struct iovec iov;
       iov.iov_base = data;
       iov.iov_len = size;
       return netio_buffered_sendv(socket, &iov, 1);
   }

   int netio_buffered_sendv(struct netio_buffered_send_socket* socket, struct iovec* iov, size_t num)
   {
       if(socket->busy){
           int ret = flush(socket);
           if (ret == NETIO_STATUS_AGAIN){
               return NETIO_STATUS_AGAIN;
           }
       }
       size_t total_size = 0;
       for(unsigned int i=0; i<num; i++) {
           total_size += iov[i].iov_len;
       }
       if(total_size + sizeof(msg_size_t) > socket->buffersize) {
           return NETIO_STATUS_TOO_BIG;
       }
       if(socket->current_buffer == NULL) {
           if(netio_bufferstack_pop(&socket->buffers, &socket->current_buffer)) {
               return NETIO_STATUS_AGAIN;
           }
           socket->pos = 0;
           //Enable flush timer
           if(socket->timeout_ms != 0 ){
               netio_timer_start_ms(&socket->flush_timer, socket->timeout_ms);
           }
       } else {
           //if current message is larger than remaining space
           //flush buffer and retry with a new one
           if(socket->pos+total_size+sizeof(msg_size_t) >= socket->buffersize){
               flush(socket);
               return NETIO_STATUS_AGAIN;
           }
       }
       *(msg_size_t*)((char*)socket->current_buffer->data + socket->pos) = total_size;
       socket->pos += sizeof(msg_size_t);
       for(unsigned int i=0; i<num; i++) {
           memcpy((char*)socket->current_buffer->data + socket->pos, iov[i].iov_base, iov[i].iov_len);
           socket->pos += iov[i].iov_len;
       }
       if(socket->pos > socket->watermark) {
           flush(socket);
       }
       return NETIO_STATUS_OK;
   }

   void netio_buffered_recv_socket_init(struct netio_buffered_recv_socket* socket, struct netio_buffered_listen_socket* lsocket)
   {
       memset(socket, 0, sizeof(*socket));
       socket->lsocket = lsocket;
       netio_init_recv_socket(&socket->recv_socket, &socket->lsocket->listen_socket);
       socket->recv_socket.cq_size = NETIO_MAX_CQ_EVENTS_BUFFERED;
       socket->recv_socket.usr = socket;
       socket->num_pages = socket->lsocket->num_pages;
       socket->pages = (struct netio_buffer*)malloc(socket->num_pages * sizeof(struct netio_buffer));
       for(unsigned int i=0; i<socket->num_pages; i++) {
           socket->pages[i].data = malloc(socket->lsocket->pagesize);
           socket->pages[i].size = socket->lsocket->pagesize;
       }
       socket->recv_socket.lsocket->cb_msg_received = on_buffered_msg_received;
   }

   /* Same as above except for the _tcp in netio_init_recv_tcp_socket */
   void netio_buffered_recv_tcp_socket_init(struct netio_buffered_recv_socket* socket, struct netio_buffered_listen_socket* lsocket)
   {
       memset(socket, 0, sizeof(*socket));
       socket->lsocket = lsocket;
       netio_init_recv_tcp_socket(&socket->recv_socket, &socket->lsocket->listen_socket);
       socket->recv_socket.usr = socket;
       socket->num_pages = socket->lsocket->num_pages;
       socket->pages = (struct netio_buffer*)malloc(socket->num_pages * sizeof(struct netio_buffer));
       for(unsigned int i=0; i<socket->num_pages; i++) {
           socket->pages[i].data = malloc(socket->lsocket->pagesize);
           socket->pages[i].size = socket->lsocket->pagesize;
       }
       socket->recv_socket.lsocket->cb_msg_received = on_buffered_msg_received;
   }

   void netio_buffered_flush(struct netio_buffered_send_socket* socket)
   {
       flush(socket);
   }