.. _program_listing_file_netio.c: Program Listing for File netio.c ================================ |exhale_lsh| :ref:`Return to documentation for file ` (``netio.c``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp #include #include #include #include #include "netio/netio.h" #include "netio/netio_tcp.h" #include "connection_event.h" #include "completion_event.h" #include #include #include #include #include #include #include #include #include "log.h" #if defined DEBUG || defined DEBUG_IO #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__) #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__) #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__) #else #define log_dbg(...) #define log_trc(...) #endif # define ERROR_LOG( ... ) do { log_fatal(__VA_ARGS__); fflush(stdout); exit(-2);} while(0) #define ON_ERROR_CONNECTION_REFUSED(s, msg, c) \ do { \ s->fi_errno = -c; \ s->fi_message = strdup(msg); \ netio_error_connection_refused_fire(s); \ } while(0); #define ON_ERROR_BIND_REFUSED(s, msg, c) \ do { \ s->fi_errno = -c; \ s->fi_message = strdup(msg); \ netio_error_bind_refused_fire(s); \ } while(0); //Globals // STATIC FUNCTIONS //////////////////////////////////////////////////////////// static int _socket_init_info(struct netio_send_socket* socket, const char* hostname, unsigned port, void* addr, size_t addrlen) { log_dbg("Going trough _socket_init_info"); int ret=0; struct fi_info* hints; hints = fi_allocinfo(); hints->addr_format = FI_FORMAT_UNSPEC; hints->ep_attr->type = FI_EP_MSG; hints->caps = FI_MSG; hints->mode = FI_LOCAL_MR; // As of libfabric 1.10, the tcp provider only support FI_PROGRESS_MANUAL // So the following will not allow the tcp provider to be used hints->domain_attr->data_progress = FI_PROGRESS_AUTO; hints->domain_attr->resource_mgmt = FI_RM_ENABLED; char port_addr[32]; snprintf(port_addr, 32, "%u", port); log_dbg("connecting to endpoint %s:%u", hostname, port); uint64_t flags = 0; if(hostname == NULL) { hostname = "127.0.0.1"; flags = FI_SOURCE; } if(addr) { // TODO we have to convert the sockaddr_in into hostname port - why can't we directly supply the addr+addrlen in hints->dest_addr? struct sockaddr_in* sockaddr = (struct sockaddr_in*)addr; char* str_addr = inet_ntoa(sockaddr->sin_addr); log_dbg("sockaddr: %s:%d", str_addr, ntohs(sockaddr->sin_port)); hostname = str_addr; snprintf(port_addr, 32, "%d", ntohs(sockaddr->sin_port)); flags = 0; } if((ret = fi_getinfo(FI_VERSION(1, 1), hostname, port_addr, flags, hints, &socket->fi))) { fi_freeinfo(hints); log_error("Failed to initialise socket, error %d: %s", ret, fi_strerror(-ret)); ON_ERROR_CONNECTION_REFUSED(socket, "fi_getinfo failed", ret); return -1; } log_dbg("addr format: %x", socket->fi->addr_format); log_dbg("fi_freeinfo"); fi_freeinfo(hints); return 0; } static int _socket_init_domain(struct netio_send_socket* socket) { int ret=0; struct netio_domain *domain = malloc(sizeof(struct netio_domain)); domain->reg_mr = 0; domain->mr = malloc(NETIO_DOMAIN_MAX_MR*sizeof(struct fid*)); domain->nb_sockets = 1; socket->domain = domain; if((ret = fi_fabric(socket->fi->fabric_attr, &domain->fabric, NULL))) { log_error("Failed to initialise fabric, error %d: %s", ret, fi_strerror(-ret)); ON_ERROR_CONNECTION_REFUSED(socket, "fi_fabric failed", ret); return -1; } if((ret = fi_domain(domain->fabric, socket->fi, &domain->domain, NULL))) { log_error("Failed to initialise domain, error %d: %s", ret, fi_strerror(-ret)); ON_ERROR_CONNECTION_REFUSED(socket, "fi_domain failed", ret); return -1; } return 0; } static int _socket_connect(struct netio_send_socket* socket) { int ret=0; struct fi_eq_attr eq_attr = {10000, 0, FI_WAIT_FD, 0, 0}; //Resources initialisation socket->eqfd = -1; socket->cqfd = -1; socket->ep = NULL; socket->eq = NULL; socket->cq = NULL; if((ret = fi_eq_open(socket->domain->fabric, &eq_attr, &socket->eq, NULL))) { log_error("Failed to open Event Queue for send socket, error %d: %s", ret, fi_strerror(-ret)); ON_ERROR_CONNECTION_REFUSED(socket, "fi_eq_open failed", ret); return -1; } if((ret = fi_endpoint(socket->domain->domain, socket->fi, &socket->ep, NULL))) { log_error("Failed to open Endpoint for send socket, error %d: %s", ret, fi_strerror(-ret)); ON_ERROR_CONNECTION_REFUSED(socket, "fi_endpoint failed", ret); return -1; } if((ret = fi_ep_bind((socket->ep), &socket->eq->fid, 0))) { log_error("Failed to bind endpoint, error %d: %s", ret, fi_strerror(-ret)); ON_ERROR_CONNECTION_REFUSED(socket, "fi_ep_bind failed", ret); return -1; } struct fi_cq_attr cq_attr; cq_attr.size = NETIO_MAX_CQ_ENTRIES; /* # entries for CQ */ cq_attr.flags = 0; /* operation flags */ cq_attr.format = FI_CQ_FORMAT_DATA; //FI_CQ_FORMAT_CONTEXT; /* completion format */ cq_attr.wait_obj= FI_WAIT_FD; /* requested wait object */ cq_attr.signaling_vector = 0; /* interrupt affinity */ cq_attr.wait_cond = FI_CQ_COND_NONE; /* wait condition format */ // The threshold indicates the number of entries that are to be queued before at the CQ before the wait is satisfied. cq_attr.wait_set = NULL; /* optional wait set */ //FI_TRANSMIT CQ if((ret = fi_cq_open(socket->domain->domain, &cq_attr, &socket->cq, NULL)) != 0) { log_error("Failed to open Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret)); ON_ERROR_CONNECTION_REFUSED(socket, "fi_cq_open failed", ret); return -1; } if((ret = fi_ep_bind((socket->ep), &socket->cq->fid, FI_TRANSMIT)) != 0) { log_error("Failed to open Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret)); ON_ERROR_CONNECTION_REFUSED(socket, "fi_ep_bind failed", ret); return -1; } //FI_RECV CQ - also necessary cq_attr.format = FI_CQ_FORMAT_UNSPEC; cq_attr.wait_obj= FI_WAIT_NONE; if((ret = fi_cq_open(socket->domain->domain, &cq_attr, &socket->rcq, NULL)) != 0) { log_error("Failed to open Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret)); ON_ERROR_CONNECTION_REFUSED(socket, "fi_cq_open failed", ret); return -1; } if((ret = fi_ep_bind((socket->ep), &socket->rcq->fid, FI_RECV)) != 0) { log_error("Failed to open Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret)); ON_ERROR_CONNECTION_REFUSED(socket, "fi_ep_bind failed", ret); return -1; } if((ret = fi_enable(socket->ep)) != 0) { log_error("Failed to enable endpoint for send socket, error %d: %s", ret, fi_strerror(-ret)); ON_ERROR_CONNECTION_REFUSED(socket, "fi_enable failed", ret); return -1; } /* Connect to server */ if((ret = fi_connect(socket->ep, socket->fi->dest_addr, NULL, 0)) != 0) { log_warn("Connection to remote failed, error %d: %s", ret, fi_strerror(-ret)); ON_ERROR_CONNECTION_REFUSED(socket, "fi_connect failed", ret); return -1; } if((ret = fi_control(&socket->eq->fid, FI_GETWAIT, &socket->eqfd)) != 0) { log_error("Cannot retrieve the Event Queue wait object of send socket, error %d: %s", ret, fi_strerror(-ret)); ON_ERROR_CONNECTION_REFUSED(socket, "fi_control failed", ret); return -1; } socket->eq_ev_ctx.fd = socket->eqfd; socket->eq_ev_ctx.data = socket; socket->eq_ev_ctx.cb = on_send_socket_libfabric_cm_event; log_dbg("Adding SEND EQ polled fid %d %p \n", socket->eqfd, &socket->eq->fid); add_polled_fid(&socket->ctx->evloop.pfids, socket->domain->fabric, &socket->eq->fid, socket->eqfd, socket, on_send_socket_libfabric_cm_event); add_open_fd(&socket->ctx->evloop.openfds, socket->eqfd, NETIO_EQ, USEND, socket); netio_register_read_fd(&socket->ctx->evloop, &socket->eq_ev_ctx); log_dbg("send_socket: EQ fd %d waiting for connection", socket->eqfd); return 0; } // API FUNCTIONS /////////////////////////////////////////////////////////////// void netio_set_debug_level(int level) { log_set_level(level); } int netio_tcp_mode(const char* hostname) { return (strncmp( hostname, "tcp:", 4) == 0); } const char* netio_protocol(const char* hostname) { return netio_tcp_mode(hostname) ? "tcp" : "libfabric"; } const char* netio_hostname(const char* hostname) { const char* split = strchr(hostname, ':'); return split ? &split[1] : hostname; } void netio_init(struct netio_context* ctx) { log_set_level(DEFAULT_DEBUG_LEVEL); memset(ctx, 0, sizeof(*ctx)); netio_eventloop_init(&ctx->evloop); } void netio_init_send_socket(struct netio_send_socket* socket, struct netio_context* ctx) { memset(socket, 0, sizeof(*socket)); socket->ctx = ctx; socket->epollfd = socket->ctx->evloop.epollfd; socket->state = UNCONNECTED; socket->tcp_fi_mode = NETIO_MODE_LIBFABRIC; socket->cq_size = NETIO_MAX_CQ_EVENTS; socket->unbuf_pub_socket = NULL; socket->cb_internal_connection_closed = NULL; socket->deferred_subs = NULL; socket->recv_socket = NULL; } void netio_init_listen_socket(struct netio_listen_socket* socket, struct netio_context* ctx, struct netio_unbuffered_socket_attr* attr) { memset(socket, 0, sizeof(*socket)); socket->ctx = ctx; socket->recv_sockets = NULL; socket->tcp_fi_mode = NETIO_MODE_LIBFABRIC; if (attr == NULL){ socket->attr.buffer_size = 0; socket->attr.num_buffers = 0; } else { if (attr->num_buffers > NETIO_DOMAIN_MAX_MR){ log_error("Number of recv buffers requested %lu exceeds max value. Using %lu.", attr->num_buffers, NETIO_DOMAIN_MAX_MR); attr->num_buffers = NETIO_DOMAIN_MAX_MR; } socket->attr = *attr; } } void netio_init_recv_socket(struct netio_recv_socket* socket, struct netio_listen_socket* lsocket) { memset(socket, 0, sizeof(*socket)); socket->ctx = lsocket->ctx; socket->lsocket = lsocket; socket->reg_mr = 0; socket->cq_size = NETIO_MAX_CQ_EVENTS; socket->sub_msg_buffers = NULL; socket->mr = malloc(NETIO_DOMAIN_MAX_MR*sizeof(struct fid*)); socket->tcp_fi_mode = NETIO_MODE_LIBFABRIC; } void netio_send_socket_init_and_connect(struct netio_send_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port) { int tcp = netio_tcp_mode(hostname); const char* host = netio_hostname(hostname); if (tcp) { netio_init_send_tcp_socket(socket, ctx); netio_connect_tcp(socket, host, port); } else { netio_init_send_socket(socket, ctx); netio_connect_domain(socket, host, port, NULL); } } void netio_listen_socket_init_and_listen(struct netio_listen_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_unbuffered_socket_attr* attr) { int tcp = netio_tcp_mode(hostname); const char* host = netio_hostname(hostname); if (tcp) { netio_init_listen_tcp_socket(socket, ctx, attr); netio_listen_tcp(socket, host, port); } else { netio_init_listen_socket(socket, ctx, attr); netio_listen(socket, host, port); } } void netio_connect(struct netio_send_socket* socket, const char* hostname, unsigned port) { if (socket->tcp_fi_mode == NETIO_MODE_TCP){ netio_connect_tcp(socket, netio_hostname(hostname), port); } else{ netio_connect_domain(socket, netio_hostname(hostname), port, NULL); } } void netio_connect_domain(struct netio_send_socket* socket, const char* hostname, unsigned port, struct netio_domain* domain) { log_dbg("_socket_init_info"); if ((_socket_init_info(socket, hostname, port, NULL, 0))) return; if (domain == NULL) { log_dbg("_socket_init_domain in netio_connect_domain"); if ( _socket_init_domain(socket) ) return; } else { domain->nb_sockets += 1; socket->domain = domain; } _socket_connect(socket); } void netio_connect_rawaddr(struct netio_send_socket* socket, void* addr, size_t addrlen) { netio_connect_rawaddr_domain(socket, addr, addrlen, NULL); } void netio_connect_rawaddr_domain(struct netio_send_socket* socket, void* addr, size_t addrlen, struct netio_domain* domain) { log_dbg("_socket_init_info"); if ((_socket_init_info(socket, NULL, 0, addr, addrlen))) return; if (domain == NULL) { log_dbg("_socket_init_domain in netio_connect_rawaddr_domain"); if ( _socket_init_domain(socket) ) return; } else { log_dbg("socket %p keeping domain %p with %d sockets", socket, domain, domain->nb_sockets); domain->nb_sockets += 1; socket->domain = domain; } _socket_connect(socket); } void netio_disconnect(struct netio_send_socket* socket) { if(socket->tcp_fi_mode == NETIO_MODE_TCP){ shutdown(socket->cq_ev_ctx.fd, SHUT_RDWR); } else { if(!socket->ep) { log_dbg("Send socket %p EP not initialised, not calling fi_shutdown.", socket); return; } int ret=0; if((ret = fi_shutdown(socket->ep, 0))){ log_error("Failed to shutdown send_socket, error %d: %s", ret, fi_strerror(-ret)); ON_ERROR_CONNECTION_REFUSED(socket, "fi_shutdown failed", ret); return; } } } void netio_connection_shutdown(void* ptr) { log_dbg("Handle_connection_shutdown."); struct signal_data* sd = (struct signal_data*)ptr; struct netio_send_socket* socket = (struct netio_send_socket*)sd->ptr; int ret=0; if(!socket->ep){ log_dbg("Send socket %p EP not initialised, not calling fi_shutdown.", socket); return; } if((ret = fi_shutdown(socket->ep, 0))) { ON_ERROR_CONNECTION_REFUSED(socket, "fi_shutdown failed", ret); return; } //clean up signal netio_signal_close(sd->evloop, sd->signal); free(sd->signal); free(sd); } void netio_listen(struct netio_listen_socket* socket, const char* hostname, unsigned port) { int ret=0; struct fi_info* hints; struct fi_eq_attr eq_attr = {10000, 0, FI_WAIT_FD, 0, 0}; hints = fi_allocinfo(); hints->addr_format = FI_FORMAT_UNSPEC; hints->ep_attr->type = FI_EP_MSG; hints->caps = FI_MSG; hints->mode = FI_LOCAL_MR; char port_addr[32]; snprintf(port_addr, 32, "%u", port); //Resource initialisation socket->eqfd = -1; socket->pep = NULL; socket->eq = NULL; socket->fi = NULL; if((ret = fi_getinfo(FI_VERSION(1, 1), hostname, port_addr, FI_SOURCE, hints, &socket->fi))) { log_error("Failed to get info on local interface, error %d: %s", ret, fi_strerror(-ret)); ON_ERROR_BIND_REFUSED(socket, "fi_getinfo failed", ret); return; } log_dbg("addr format: %x", socket->fi->addr_format); if((ret = fi_fabric(socket->fi->fabric_attr, &socket->fabric, NULL))) { log_error("Failed to open fabric for listen socket, error %d: %s", ret, fi_strerror(-ret)); ON_ERROR_BIND_REFUSED(socket, "fi_fabric failed", ret); return; } if((ret = fi_eq_open(socket->fabric, &eq_attr, &socket->eq, NULL))) { log_error("Failed to open Event Queue for listen socket, error %d: %s", ret, fi_strerror(-ret)); ON_ERROR_BIND_REFUSED(socket, "fi_eq_open failed", ret); return; } if((ret = fi_passive_ep(socket->fabric, socket->fi, &socket->pep, NULL))) { log_error("Failed to open passive endpoint for listen socket, error %d: %s", ret, fi_strerror(-ret)); ON_ERROR_BIND_REFUSED(socket, "fi_passive_ep failed", ret); return; } if((ret = fi_pep_bind(socket->pep, &socket->eq->fid, 0))) { log_error("Failed to bind passive endpoint for listen socket, error %d: %s", ret, fi_strerror(-ret)); ON_ERROR_BIND_REFUSED(socket, "fi_pep_bind failed", ret); return; } if((ret = fi_listen(socket->pep))) { log_error("Failed to enable listen socket, error %d: %s", ret, fi_strerror(-ret)); ON_ERROR_BIND_REFUSED(socket, "fi_listen failed", ret); return; } if((ret = fi_control(&socket->eq->fid, FI_GETWAIT, &socket->eqfd))) { log_error("Failed to retrive listen socket Event Queue wait object, error %d: %s", ret, fi_strerror(-ret)); ON_ERROR_BIND_REFUSED(socket, "fi_control failed", ret); return; } socket->eq_ev_ctx.fd = socket->eqfd; socket->eq_ev_ctx.data = socket; socket->eq_ev_ctx.cb = on_listen_socket_libfabric_cm_event; //TODO: The listen EQ should be added too, but I don't know how to remove it to avoid a trywait SEGFAULT. //printf("Adding LISTEN EQ polled fid %d %p \n", socket->eqfd, &socket->eq->fid); //add_polled_fid(&socket->ctx->evloop.pfids, socket->fabric, &socket->eq->fid, socket->eqfd, socket, on_listen_socket_cm_event); add_open_fd(&socket->ctx->evloop.openfds, socket->eqfd, NETIO_EQ, ULISTEN, socket); netio_register_read_fd(&socket->ctx->evloop, &socket->eq_ev_ctx); log_dbg("netio_listen_socket: registering EQ fd %d", socket->eqfd); fi_freeinfo(hints); } size_t netio_listen_socket_endpoint(struct netio_listen_socket* socket, struct sockaddr_storage *sa) { size_t addrlen; //memset(sa, 0, sizeof(*sa)); addrlen = sizeof(struct sockaddr_storage); int ret=0; if((ret = fi_getname(&socket->pep->fid, sa, &addrlen))) { log_error("Failed to retrieve the local endpoint address, error %d: %s", ret, fi_strerror(-ret)); ON_ERROR_BIND_REFUSED(socket, "fi_getname failed", ret); return 0; } return addrlen; } void netio_post_recv(struct netio_recv_socket* socket, struct netio_buffer* buf) { struct iovec iov; void* desc; struct netio_tcp_recv_item *mrdn; if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC) { iov.iov_base = buf->data; iov.iov_len = buf->size; desc = fi_mr_desc(buf->mr); struct fi_msg msg; msg.msg_iov = &iov; /* scatter-gather array */ msg.desc = &desc; msg.iov_count = 1; msg.addr = 0; msg.context = buf; msg.data = 0; uint64_t flags; flags = FI_REMOTE_CQ_DATA;//FI_MULTI_RECV; int ret=0; if((ret = fi_recvmsg(socket->ep, &msg, flags)) != 0) { log_error("Failed to post a buffer to receive inbound messages, error %d: %s", ret, fi_strerror(-ret)); } } else { //Allocate memory for a message request descriptor struct netio_tcp_recv_item *mrd; mrd = (struct netio_tcp_recv_item *) malloc(sizeof(struct netio_tcp_recv_item)); if(mrd == NULL) { ERROR_LOG("cannot allocate memory for descriptor"); } /* log_debug("mrd is at %p", (void *)mrd); */ mrd->element_active = 1; //MJ do we need this variable? mrd->socket = socket; //this is a netio_recv_socket mrd->buffer = buf; mrd->next_element = NULL; mrd->bytes_received = 0; mrd->message_size = 0; /* log_debug("receive descriptor allocated and initialized"); */ //Append the descriptor to the list if(socket->message_request_header == NULL) { socket->message_request_header = (void *)mrd; /* log_debug("descriptor linked to head of queue"); */ } else { int free_item = 1; struct netio_tcp_recv_item *mrdq; mrdq = (struct netio_tcp_recv_item *)socket->message_request_header; /* log_debug("Head of list points at = %p", mrdq); */ int mrd_linked = 0; do { if (mrdq->next_element == NULL) { mrdq->next_element = (void *)mrd; mrd_linked = 1; // log_error("End of list found at entry %d. Appending mrd to the desriptor at %p", free_item, mrdq); } else { mrdn = (struct netio_tcp_recv_item *)mrdq->next_element; // log_error("item = %d. %p points at %p", free_item, mrdq, mrdn); free_item++; mrdq = mrdn; } } while(!mrd_linked); } /* log_debug("Calling netio_signal_fire"); log_debug("&socket->tcp_signal = %p", &socket->tcp_signal); netio_signal_fire(&socket->tcp_signal); */ return; } } void netio_remove_recv_socket(struct netio_recv_socket* socket){ struct netio_listen_socket* lsocket = socket->lsocket; int ret = remove_socket(&lsocket->recv_sockets, (void*)socket); if (ret == 0){ log_info("Unbuffered connection closed, recv socket deleted."); } else { log_warn("Unbuffered connection closed, could not delete recv socket."); } } void netio_buffered_remove_recv_socket(struct netio_buffered_recv_socket* socket){ struct netio_buffered_listen_socket* lsocket = socket->lsocket; int ret = remove_socket(&(lsocket->listen_socket.recv_sockets), (void*)socket); if (ret == 0){ log_info("Buffered connection closed, buffered recv socket deleted."); } else { log_warn("Buffered connection closed, could not delete recv socket."); } } static int generic_sendmsg(struct netio_send_socket* socket, struct iovec* iov, void** desc, size_t count, uint64_t key, uint64_t add_flags, uint64_t imm) { int ret=0; uint64_t flags; struct fi_msg msg; msg.msg_iov = iov; /* scatter-gather array */ msg.desc = desc; msg.iov_count = count; msg.addr = 0; msg.context = (void*)key; msg.data = imm; log_trc("sending iov message with immediate value 0x%lx", msg.data); flags = FI_INJECT_COMPLETE | add_flags; if(!socket->ep || !socket->ep->msg){ log_error("Failed sending message because of null message or null endpoint."); return NETIO_STATUS_ERROR; } if((ret = fi_sendmsg(socket->ep, &msg, flags)) != 0) { if(ret == -FI_EAGAIN) { return NETIO_STATUS_AGAIN; } log_error("Failed to send message error (IOV count %lu, key %lu) %d: %s.", count, key, ret, fi_strerror(-ret)); return NETIO_STATUS_ERROR; } return NETIO_STATUS_OK; } int netio_send_buffer(struct netio_send_socket* socket, struct netio_buffer* buf) { return netio_send(socket, buf, buf->data, buf->size, (uint64_t)buf); } int netio_send_inline_buffer(struct netio_send_socket* socket, struct netio_buffer* buf) { struct iovec iov; void* desc; iov.iov_base = buf->data; iov.iov_len = buf->size; desc = fi_mr_desc(buf->mr); uint64_t key = (uint64_t)buf; uint64_t flags = FI_INJECT; return generic_sendmsg(socket, /* struct netio_send_socket* socket */ &iov, /* struct iovec* iov */ &desc, /* void** desc */ 1, /* size_t count */ key, /* uint64_t key */ flags, /* uint64_t add_flags */ 0 /* uint64_t imm */ ); } int netio_send(struct netio_send_socket* socket, struct netio_buffer* buf, void* addr, size_t size, uint64_t key) { struct iovec iov; void* desc; struct netio_tcp_send_item *mrdn; if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC) { iov.iov_base = addr; iov.iov_len = size; desc = fi_mr_desc(buf->mr); return generic_sendmsg(socket, /* struct netio_send_socket* socket */ &iov, /* struct iovec* iov */ &desc, /* void** desc */ 1, /* size_t count */ key, /* uint64_t key */ 0, /* uint64_t add_flags */ 0 /* uint64_t imm */ ); } else { if (socket->state!=CONNECTED) { log_warn("socket not connected (state=%d)",socket->state); } //Allocate memory for a message request descriptor struct netio_tcp_send_item *mrd; mrd = (struct netio_tcp_send_item *) malloc(sizeof(struct netio_tcp_send_item)); if(mrd == NULL) { ERROR_LOG("cannot allocate memory for descriptor"); } mrd->element_active = NETIO_TCP_NEW; mrd->socket = socket; mrd->buffer = buf; log_dbg("netio-tcp: setting buffer size to msg size and buffer data to addr"); mrd->buffer->size = size; mrd->buffer->data = addr; mrd->total_bytes = size; mrd->bytes_left = sizeof(int); mrd->next_element = NULL; mrd->key = (uint64_t) buf; // log_debug("send descriptor allocated and initialized"); //Append the descriptor to the list if(socket->message_request_header == NULL) { socket->message_request_header = (void *)mrd; // log_debug("List was empty. Descriptor linked to head of list"); } else { int free_item = 1; struct netio_tcp_send_item *mrdq; mrdq = (struct netio_tcp_send_item *)socket->message_request_header; // log_debug("Head of list points at = %p", mrdq); int mrd_linked = 0; do { if (mrdq->next_element == NULL) { mrdq->next_element = (void *)mrd; mrd_linked = 1; // log_debug("End of list found at entry %d. Appending mrd to the desriptor at %p", free_item, mrdq); } else { mrdn = (struct netio_tcp_send_item *)mrdq->next_element; // log_debug("Item = %d. %p points at %p", free_item, mrdq, mrdn); free_item++; mrdq = mrdn; } } while(!mrd_linked); } // log_debug("Calling netio_signal_fire for signal at %p", &socket->tcp_signal); netio_signal_fire(&socket->tcp_signal); // log_info("done"); return(NETIO_STATUS_OK); } } int netio_send_imm(struct netio_send_socket* socket, struct netio_buffer* buf, void* addr, size_t size, uint64_t key, uint64_t imm) { struct iovec iov; void* desc; iov.iov_base = addr; iov.iov_len = size; desc = fi_mr_desc(buf->mr); return generic_sendmsg(socket, /* struct netio_send_socket* socket */ &iov, /* struct iovec* iov */ &desc, /* void** desc */ 1, /* size_t count */ key, /* uint64_t key */ FI_REMOTE_CQ_DATA, /* uint64_t add_flags */ imm /* uint64_t imm */ ); } int netio_sendv(struct netio_send_socket* socket, struct netio_buffer** buf, struct iovec* iov, size_t count, uint64_t key) { void* descarray[NETIO_MAX_IOV_LEN]; for(unsigned i=0; imr); } return generic_sendmsg(socket, /* struct netio_send_socket* socket */ iov, /* struct iovec* iov */ descarray, /* void** desc */ count, /* size_t count */ key, /* uint64_t key */ 0, /* uint64_t add_flags */ 0 /* uint64_t imm */ ); } int netio_sendv_imm(struct netio_send_socket* socket, struct netio_buffer** buf, struct iovec* iov, size_t count, uint64_t key, uint64_t imm) { void* descarray[NETIO_MAX_IOV_LEN]; for(unsigned i=0; imr); } return generic_sendmsg(socket, /* struct netio_send_socket* socket */ iov, /* struct iovec* iov */ descarray, /* void** desc */ count, /* size_t count */ key, /* uint64_t key */ FI_REMOTE_CQ_DATA, /* uint64_t add_flags */ imm /* uint64_t imm */ ); }