Program Listing for File netio.c
↰ Return to documentation for file (netio.c)
#include <unistd.h>
#include <stdio.h>
#include <sys/uio.h>
#include <string.h>
#include "netio/netio.h"
#include "netio/netio_tcp.h"
#include "connection_event.h"
#include "completion_event.h"
#include <stdlib.h>
#include <sys/types.h>
#include <errno.h>
#include <netdb.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "log.h"
/*
 * Debug and trace logging helpers.
 * Unless the file is compiled with DEBUG or DEBUG_IO defined, both macros
 * expand to nothing; otherwise they forward to log_log() together with the
 * base name of the current source file and the line number.
 */
#if !defined(DEBUG) && !defined(DEBUG_IO)
#define log_dbg(...)
#define log_trc(...)
#else
/* Text after the last '/' of __FILE__, or __FILE__ itself when it has none. */
#define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
#define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
#define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
#endif
/* Log a fatal message, flush stdout and terminate the process with exit(-2). */
#define ERROR_LOG(...)       \
  do {                       \
    log_fatal(__VA_ARGS__);  \
    fflush(stdout);          \
    exit(-2);                \
  } while(0)
/*
 * Record an error on socket `s` and fire its connection-refused handler.
 *
 *   s   - pointer to a socket structure with fi_errno / fi_message fields
 *   msg - short description; a strdup()'ed copy is stored in s->fi_message
 *         (who frees that copy is not visible here - TODO confirm)
 *   c   - error code; it is negated before being stored in s->fi_errno
 *
 * The expansion deliberately has no trailing semicolon so that
 * `if (x) ON_ERROR_CONNECTION_REFUSED(...); else ...` parses correctly.
 */
#define ON_ERROR_CONNECTION_REFUSED(s, msg, c) \
  do { \
    (s)->fi_errno = -(c); \
    (s)->fi_message = strdup(msg); \
    netio_error_connection_refused_fire(s); \
  } while(0)
/*
 * Record an error on socket `s` and fire its bind-refused handler.
 *
 *   s   - pointer to a socket structure with fi_errno / fi_message fields
 *   msg - short description; a strdup()'ed copy is stored in s->fi_message
 *         (who frees that copy is not visible here - TODO confirm)
 *   c   - error code; it is negated before being stored in s->fi_errno
 *
 * The expansion deliberately has no trailing semicolon so that
 * `if (x) ON_ERROR_BIND_REFUSED(...); else ...` parses correctly.
 */
#define ON_ERROR_BIND_REFUSED(s, msg, c) \
  do { \
    (s)->fi_errno = -(c); \
    (s)->fi_message = strdup(msg); \
    netio_error_bind_refused_fire(s); \
  } while(0)
//Globals
// STATIC FUNCTIONS ////////////////////////////////////////////////////////////
/*
 * Query libfabric for the provider information of a send socket.
 *
 * The result of fi_getinfo() is stored in socket->fi.
 *
 *   socket   - send socket to initialise
 *   hostname - remote node; when NULL, "127.0.0.1" is used and FI_SOURCE is set
 *   port     - remote port, passed to fi_getinfo() as a decimal string
 *   addr     - optional raw address; when non-NULL it is read as a
 *              struct sockaddr_in and overrides hostname, port and flags
 *   addrlen  - length of addr (currently not used)
 *
 * Returns 0 on success. On failure the error is logged, reported through
 * ON_ERROR_CONNECTION_REFUSED and -1 is returned.
 */
static int
_socket_init_info(struct netio_send_socket* socket, const char* hostname, unsigned port, void* addr, size_t addrlen)
{
  log_dbg("Going trough _socket_init_info");
  (void)addrlen; /* addr is always interpreted as a struct sockaddr_in */
  int ret=0;
  struct fi_info* hints = fi_allocinfo();
  if(hints == NULL)
  {
    ret = -ENOMEM;
    log_error("Failed to allocate fi_info hints for send socket");
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_allocinfo failed", ret);
    return -1;
  }
  hints->addr_format = FI_FORMAT_UNSPEC;
  hints->ep_attr->type = FI_EP_MSG;
  hints->caps = FI_MSG;
  hints->mode = FI_LOCAL_MR;
  // As of libfabric 1.10, the tcp provider only support FI_PROGRESS_MANUAL
  // So the following will not allow the tcp provider to be used
  hints->domain_attr->data_progress = FI_PROGRESS_AUTO;
  hints->domain_attr->resource_mgmt = FI_RM_ENABLED;

  char port_addr[32];
  char host_addr[INET_ADDRSTRLEN]; /* local buffer instead of inet_ntoa()'s shared static one */
  uint64_t flags = 0;
  snprintf(port_addr, sizeof port_addr, "%u", port);
  if(hostname == NULL) {
    hostname = "127.0.0.1";
    flags = FI_SOURCE;
  }
  if(addr) {
    // TODO we have to convert the sockaddr_in into hostname port - why can't we directly supply the addr+addrlen in hints->dest_addr?
    const struct sockaddr_in* sockaddr = (const struct sockaddr_in*)addr;
    if(inet_ntop(AF_INET, &sockaddr->sin_addr, host_addr, sizeof host_addr) == NULL)
    {
      ret = -errno;
      fi_freeinfo(hints);
      log_error("Failed to convert raw address to text, error %d", ret);
      ON_ERROR_CONNECTION_REFUSED(socket, "inet_ntop failed", ret);
      return -1;
    }
    log_dbg("sockaddr: %s:%d", host_addr, ntohs(sockaddr->sin_port));
    hostname = host_addr;
    snprintf(port_addr, sizeof port_addr, "%d", ntohs(sockaddr->sin_port));
    flags = 0;
  }
  /* Logged only after hostname has been resolved so that NULL is never passed to %s. */
  log_dbg("connecting to endpoint %s:%s", hostname, port_addr);

  ret = fi_getinfo(FI_VERSION(1, 1), hostname, port_addr, flags, hints, &socket->fi);
  /* The hints are only an input of fi_getinfo(); release them on every path. */
  log_dbg("fi_freeinfo");
  fi_freeinfo(hints);
  if(ret)
  {
    log_error("Failed to initialise socket, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_getinfo failed", ret);
    return -1;
  }
  log_dbg("addr format: %x", socket->fi->addr_format);
  return 0;
}
static int
_socket_init_domain(struct netio_send_socket* socket)
{
int ret=0;
struct netio_domain *domain = malloc(sizeof(struct netio_domain));
domain->reg_mr = 0;
domain->mr = malloc(NETIO_DOMAIN_MAX_MR*sizeof(struct fid*));
domain->nb_sockets = 1;
socket->domain = domain;
if((ret = fi_fabric(socket->fi->fabric_attr, &domain->fabric, NULL)))
{
log_error("Failed to initialise fabric, error %d: %s", ret, fi_strerror(-ret));
ON_ERROR_CONNECTION_REFUSED(socket, "fi_fabric failed", ret);
return -1;
}
if((ret = fi_domain(domain->fabric, socket->fi, &domain->domain, NULL)))
{
log_error("Failed to initialise domain, error %d: %s", ret, fi_strerror(-ret));
ON_ERROR_CONNECTION_REFUSED(socket, "fi_domain failed", ret);
return -1;
}
return 0;
}
/*
 * Create the libfabric resources of a send socket and start connecting.
 *
 * In order: opens an event queue (EQ) on the domain's fabric, creates the
 * endpoint and binds the EQ to it, opens the transmit completion queue
 * (socket->cq, FD wait object) and the receive completion queue
 * (socket->rcq, no wait object) and binds both to the endpoint, enables the
 * endpoint, calls fi_connect() on socket->fi->dest_addr, retrieves the EQ
 * wait file descriptor and registers it with the context's event loop with
 * on_send_socket_libfabric_cm_event as callback.
 *
 * Every failure is logged and reported through ON_ERROR_CONNECTION_REFUSED.
 * Resources opened before the failing step are not released here.
 *
 * Returns 0 on success, -1 on failure.
 */
static int
_socket_connect(struct netio_send_socket* socket)
{
  int ret=0;
  struct fi_eq_attr eq_attr = {10000, 0, FI_WAIT_FD, 0, 0};
  //Resources initialisation
  socket->eqfd = -1;
  socket->cqfd = -1;
  socket->ep = NULL;
  socket->eq = NULL;
  socket->cq = NULL;
  socket->rcq = NULL;
  if((ret = fi_eq_open(socket->domain->fabric, &eq_attr, &socket->eq, NULL)))
  {
    log_error("Failed to open Event Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_eq_open failed", ret);
    return -1;
  }
  if((ret = fi_endpoint(socket->domain->domain, socket->fi, &socket->ep, NULL)))
  {
    log_error("Failed to open Endpoint for send socket, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_endpoint failed", ret);
    return -1;
  }
  if((ret = fi_ep_bind((socket->ep), &socket->eq->fid, 0)))
  {
    log_error("Failed to bind endpoint, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_ep_bind failed", ret);
    return -1;
  }
  struct fi_cq_attr cq_attr;
  cq_attr.size = NETIO_MAX_CQ_ENTRIES; /* # entries for CQ */
  cq_attr.flags = 0; /* operation flags */
  cq_attr.format = FI_CQ_FORMAT_DATA; //FI_CQ_FORMAT_CONTEXT; /* completion format */
  cq_attr.wait_obj= FI_WAIT_FD; /* requested wait object */
  cq_attr.signaling_vector = 0; /* interrupt affinity */
  cq_attr.wait_cond = FI_CQ_COND_NONE; /* wait condition format */ // The threshold indicates the number of entries that are to be queued before at the CQ before the wait is satisfied.
  cq_attr.wait_set = NULL; /* optional wait set */
  //FI_TRANSMIT CQ
  if((ret = fi_cq_open(socket->domain->domain, &cq_attr, &socket->cq, NULL)) != 0)
  {
    log_error("Failed to open Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_cq_open failed", ret);
    return -1;
  }
  if((ret = fi_ep_bind((socket->ep), &socket->cq->fid, FI_TRANSMIT)) != 0)
  {
    log_error("Failed to bind transmit Completion Queue to send socket endpoint, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_ep_bind failed", ret);
    return -1;
  }
  //FI_RECV CQ - also necessary
  cq_attr.format = FI_CQ_FORMAT_UNSPEC;
  cq_attr.wait_obj= FI_WAIT_NONE;
  if((ret = fi_cq_open(socket->domain->domain, &cq_attr, &socket->rcq, NULL)) != 0)
  {
    log_error("Failed to open receive Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_cq_open failed", ret);
    return -1;
  }
  if((ret = fi_ep_bind((socket->ep), &socket->rcq->fid, FI_RECV)) != 0)
  {
    log_error("Failed to bind receive Completion Queue to send socket endpoint, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_ep_bind failed", ret);
    return -1;
  }
  if((ret = fi_enable(socket->ep)) != 0)
  {
    log_error("Failed to enable endpoint for send socket, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_enable failed", ret);
    return -1;
  }
  /* Connect to server */
  if((ret = fi_connect(socket->ep, socket->fi->dest_addr, NULL, 0)) != 0)
  {
    log_warn("Connection to remote failed, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_connect failed", ret);
    return -1;
  }
  if((ret = fi_control(&socket->eq->fid, FI_GETWAIT, &socket->eqfd)) != 0)
  {
    log_error("Cannot retrieve the Event Queue wait object of send socket, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_control failed", ret);
    return -1;
  }
  /* Hand the EQ file descriptor to the event loop. */
  socket->eq_ev_ctx.fd = socket->eqfd;
  socket->eq_ev_ctx.data = socket;
  socket->eq_ev_ctx.cb = on_send_socket_libfabric_cm_event;
  log_dbg("Adding SEND EQ polled fid %d %p \n", socket->eqfd, &socket->eq->fid);
  add_polled_fid(&socket->ctx->evloop.pfids, socket->domain->fabric, &socket->eq->fid, socket->eqfd, socket, on_send_socket_libfabric_cm_event);
  add_open_fd(&socket->ctx->evloop.openfds, socket->eqfd, NETIO_EQ, USEND, socket);
  netio_register_read_fd(&socket->ctx->evloop, &socket->eq_ev_ctx);
  log_dbg("send_socket: EQ fd %d waiting for connection", socket->eqfd);
  return 0;
}
// API FUNCTIONS ///////////////////////////////////////////////////////////////
/* Forward the requested verbosity `level` to the logging backend. */
void
netio_set_debug_level(int level)
{
  log_set_level(level);
}
/*
 * Tell whether `hostname` selects the TCP backend.
 *
 * Returns 1 when hostname starts with the prefix "tcp:", 0 otherwise.
 * A NULL hostname is treated as "no prefix" and yields 0.
 */
int netio_tcp_mode(const char* hostname) {
  static const char prefix[] = "tcp:";
  if (hostname == NULL) {
    return 0;
  }
  return (strncmp(hostname, prefix, sizeof prefix - 1) == 0);
}
/*
 * Name of the backend selected by `hostname`: "tcp" when netio_tcp_mode()
 * reports the TCP prefix, "libfabric" otherwise. The returned string is a
 * literal and must not be freed.
 */
const char* netio_protocol(const char* hostname) {
  if (netio_tcp_mode(hostname)) {
    return "tcp";
  }
  return "libfabric";
}
/*
 * Strip the protocol prefix from `hostname`.
 *
 * Returns a pointer to the character after the first ':' in hostname, or
 * hostname itself when it contains no ':'. The result points into the
 * caller's string; nothing is allocated. A NULL hostname is returned as NULL.
 *
 * NOTE(review): every string containing ':' is cut at its first colon, so an
 * un-prefixed IPv6 literal would be truncated as well - confirm whether such
 * input can reach this function.
 */
const char* netio_hostname(const char* hostname) {
  if (hostname == NULL) {
    return NULL;
  }
  const char* split = strchr(hostname, ':');
  return split ? split + 1 : hostname;
}
/*
 * Initialise a netio context: set the log level to DEFAULT_DEBUG_LEVEL,
 * zero the whole structure and initialise its event loop.
 */
void
netio_init(struct netio_context* ctx)
{
  log_set_level(DEFAULT_DEBUG_LEVEL);

  memset(ctx, 0, sizeof *ctx);
  netio_eventloop_init(&ctx->evloop);
}
void
netio_init_send_socket(struct netio_send_socket* socket, struct netio_context* ctx)
{
memset(socket, 0, sizeof(*socket));
socket->ctx = ctx;
socket->epollfd = socket->ctx->evloop.epollfd;
socket->state = UNCONNECTED;
socket->tcp_fi_mode = NETIO_MODE_LIBFABRIC;
socket->cq_size = NETIO_MAX_CQ_EVENTS;
socket->unbuf_pub_socket = NULL;
socket->cb_internal_connection_closed = NULL;
socket->deferred_subs = NULL;
socket->recv_socket = NULL;
}
/*
 * Reset a listen socket to its initial libfabric-mode state.
 *
 *   socket - listen socket to initialise (zeroed first)
 *   ctx    - context the socket belongs to
 *   attr   - optional buffer attributes; when NULL, buffer_size and
 *            num_buffers are set to 0
 *
 * When attr->num_buffers exceeds NETIO_DOMAIN_MAX_MR an error is logged and
 * the value is clamped. The clamp is written back into the caller's attr
 * before it is copied into the socket.
 */
void
netio_init_listen_socket(struct netio_listen_socket* socket, struct netio_context* ctx, struct netio_unbuffered_socket_attr* attr)
{
  memset(socket, 0, sizeof *socket);
  socket->tcp_fi_mode = NETIO_MODE_LIBFABRIC;
  socket->recv_sockets = NULL;
  socket->ctx = ctx;

  if (attr != NULL) {
    if (attr->num_buffers > NETIO_DOMAIN_MAX_MR) {
      log_error("Number of recv buffers requested %lu exceeds max value. Using %lu.", attr->num_buffers, NETIO_DOMAIN_MAX_MR);
      attr->num_buffers = NETIO_DOMAIN_MAX_MR;
    }
    socket->attr = *attr;
    return;
  }

  /* No attributes supplied: no receive buffers. */
  socket->attr.buffer_size = 0;
  socket->attr.num_buffers = 0;
}
/*
 * Reset a receive socket and attach it to the listen socket that accepted it.
 *
 * The structure is zeroed, inherits the context of `lsocket`, gets the
 * default completion-queue size NETIO_MAX_CQ_EVENTS and a freshly allocated
 * table of NETIO_DOMAIN_MAX_MR memory-region slots in socket->mr.
 *
 * An allocation failure is fatal and handled with ERROR_LOG, the same policy
 * this file applies to its other descriptor allocations.
 */
void
netio_init_recv_socket(struct netio_recv_socket* socket, struct netio_listen_socket* lsocket)
{
  memset(socket, 0, sizeof(*socket));
  socket->ctx = lsocket->ctx;
  socket->lsocket = lsocket;
  socket->reg_mr = 0;
  socket->cq_size = NETIO_MAX_CQ_EVENTS;
  socket->sub_msg_buffers = NULL;
  socket->mr = malloc(NETIO_DOMAIN_MAX_MR*sizeof(struct fid*));
  if(socket->mr == NULL)
  {
    ERROR_LOG("cannot allocate memory for recv socket memory regions");
  }
  socket->tcp_fi_mode = NETIO_MODE_LIBFABRIC;
}
/*
 * Initialise a send socket and connect it in one call.
 *
 * The backend is chosen from `hostname`: with a "tcp:" prefix the TCP
 * initialiser and connect routine are used, otherwise the libfabric ones
 * (with a newly created domain). The prefix is stripped before connecting.
 */
void
netio_send_socket_init_and_connect(struct netio_send_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port) {
  const char* host = netio_hostname(hostname);

  if (!netio_tcp_mode(hostname)) {
    netio_init_send_socket(socket, ctx);
    netio_connect_domain(socket, host, port, NULL);
    return;
  }

  netio_init_send_tcp_socket(socket, ctx);
  netio_connect_tcp(socket, host, port);
}
/*
 * Initialise a listen socket and start listening in one call.
 *
 * The backend is chosen from `hostname`: with a "tcp:" prefix the TCP
 * initialiser and listen routine are used, otherwise the libfabric ones.
 * The prefix is stripped before the address is used.
 */
void
netio_listen_socket_init_and_listen(struct netio_listen_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_unbuffered_socket_attr* attr) {
  const char* host = netio_hostname(hostname);

  if (!netio_tcp_mode(hostname)) {
    netio_init_listen_socket(socket, ctx, attr);
    netio_listen(socket, host, port);
    return;
  }

  netio_init_listen_tcp_socket(socket, ctx, attr);
  netio_listen_tcp(socket, host, port);
}
/*
 * Connect an already initialised send socket to hostname:port.
 *
 * The backend is taken from socket->tcp_fi_mode, not from the hostname; any
 * prefix up to the first ':' is stripped from hostname before connecting.
 */
void
netio_connect(struct netio_send_socket* socket, const char* hostname, unsigned port)
{
  const char* host = netio_hostname(hostname);

  if (socket->tcp_fi_mode != NETIO_MODE_TCP) {
    netio_connect_domain(socket, host, port, NULL);
    return;
  }
  netio_connect_tcp(socket, host, port);
}
/*
 * Connect a libfabric send socket to hostname:port.
 *
 *   domain - existing domain to share; its socket count is incremented.
 *            When NULL a new domain is created for this socket.
 *
 * The function returns silently when one of the initialisation steps fails;
 * those steps report their own errors.
 */
void
netio_connect_domain(struct netio_send_socket* socket, const char* hostname, unsigned port, struct netio_domain* domain)
{
  log_dbg("_socket_init_info");
  if (_socket_init_info(socket, hostname, port, NULL, 0) != 0) {
    return;
  }

  if (domain != NULL) {
    domain->nb_sockets += 1;
    socket->domain = domain;
  } else {
    log_dbg("_socket_init_domain in netio_connect_domain");
    if (_socket_init_domain(socket) != 0) {
      return;
    }
  }

  _socket_connect(socket);
}
/*
 * Connect a libfabric send socket to a raw address, creating a new domain
 * for it (equivalent to netio_connect_rawaddr_domain() with a NULL domain).
 */
void netio_connect_rawaddr(struct netio_send_socket* socket, void* addr, size_t addrlen)
{
  struct netio_domain* const no_shared_domain = NULL;
  netio_connect_rawaddr_domain(socket, addr, addrlen, no_shared_domain);
}
/*
 * Connect a libfabric send socket to the raw address addr/addrlen.
 *
 *   domain - existing domain to share; its socket count is incremented.
 *            When NULL a new domain is created for this socket.
 *
 * The function returns silently when one of the initialisation steps fails;
 * those steps report their own errors.
 */
void netio_connect_rawaddr_domain(struct netio_send_socket* socket, void* addr, size_t addrlen, struct netio_domain* domain)
{
  log_dbg("_socket_init_info");
  if (_socket_init_info(socket, NULL, 0, addr, addrlen) != 0) {
    return;
  }

  if (domain != NULL) {
    log_dbg("socket %p keeping domain %p with %d sockets", socket, domain, domain->nb_sockets);
    domain->nb_sockets += 1;
    socket->domain = domain;
  } else {
    log_dbg("_socket_init_domain in netio_connect_rawaddr_domain");
    if (_socket_init_domain(socket) != 0) {
      return;
    }
  }

  _socket_connect(socket);
}
void
netio_disconnect(struct netio_send_socket* socket)
{
if(socket->tcp_fi_mode == NETIO_MODE_TCP){
shutdown(socket->cq_ev_ctx.fd, SHUT_RDWR);
} else {
if(!socket->ep) {
log_dbg("Send socket %p EP not initialised, not calling fi_shutdown.", socket);
return;
}
int ret=0;
if((ret = fi_shutdown(socket->ep, 0))){
log_error("Failed to shutdown send_socket, error %d: %s", ret, fi_strerror(-ret));
ON_ERROR_CONNECTION_REFUSED(socket, "fi_shutdown failed", ret);
return;
}
}
}
/*
 * Signal handler that shuts down the endpoint of a send socket.
 *
 *   ptr - a struct signal_data whose `ptr` member is the send socket and
 *         whose `signal` / `evloop` members identify the signal that
 *         triggered this handler
 *
 * fi_shutdown() is skipped when the endpoint was never created; a failing
 * fi_shutdown() is reported through ON_ERROR_CONNECTION_REFUSED.
 *
 * The signal is closed and both the signal and the signal_data are freed on
 * every path, including the early-exit ones, so that they are not leaked.
 */
void
netio_connection_shutdown(void* ptr)
{
  log_dbg("Handle_connection_shutdown.");
  struct signal_data* sd = (struct signal_data*)ptr;
  struct netio_send_socket* socket = (struct netio_send_socket*)sd->ptr;
  int ret=0;
  if(!socket->ep){
    log_dbg("Send socket %p EP not initialised, not calling fi_shutdown.", socket);
  }
  else if((ret = fi_shutdown(socket->ep, 0)))
  {
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_shutdown failed", ret);
  }
  //clean up signal
  netio_signal_close(sd->evloop, sd->signal);
  free(sd->signal);
  free(sd);
}
void
netio_listen(struct netio_listen_socket* socket, const char* hostname, unsigned port)
{
int ret=0;
struct fi_info* hints;
struct fi_eq_attr eq_attr = {10000, 0, FI_WAIT_FD, 0, 0};
hints = fi_allocinfo();
hints->addr_format = FI_FORMAT_UNSPEC;
hints->ep_attr->type = FI_EP_MSG;
hints->caps = FI_MSG;
hints->mode = FI_LOCAL_MR;
char port_addr[32];
snprintf(port_addr, 32, "%u", port);
//Resource initialisation
socket->eqfd = -1;
socket->pep = NULL;
socket->eq = NULL;
socket->fi = NULL;
if((ret = fi_getinfo(FI_VERSION(1, 1), hostname, port_addr, FI_SOURCE, hints,
&socket->fi)))
{
log_error("Failed to get info on local interface, error %d: %s", ret, fi_strerror(-ret));
ON_ERROR_BIND_REFUSED(socket, "fi_getinfo failed", ret);
return;
}
log_dbg("addr format: %x", socket->fi->addr_format);
if((ret = fi_fabric(socket->fi->fabric_attr, &socket->fabric, NULL)))
{
log_error("Failed to open fabric for listen socket, error %d: %s", ret, fi_strerror(-ret));
ON_ERROR_BIND_REFUSED(socket, "fi_fabric failed", ret);
return;
}
if((ret = fi_eq_open(socket->fabric, &eq_attr, &socket->eq, NULL)))
{
log_error("Failed to open Event Queue for listen socket, error %d: %s", ret, fi_strerror(-ret));
ON_ERROR_BIND_REFUSED(socket, "fi_eq_open failed", ret);
return;
}
if((ret = fi_passive_ep(socket->fabric, socket->fi, &socket->pep, NULL)))
{
log_error("Failed to open passive endpoint for listen socket, error %d: %s", ret, fi_strerror(-ret));
ON_ERROR_BIND_REFUSED(socket, "fi_passive_ep failed", ret);
return;
}
if((ret = fi_pep_bind(socket->pep, &socket->eq->fid, 0)))
{
log_error("Failed to bind passive endpoint for listen socket, error %d: %s", ret, fi_strerror(-ret));
ON_ERROR_BIND_REFUSED(socket, "fi_pep_bind failed", ret);
return;
}
if((ret = fi_listen(socket->pep)))
{
log_error("Failed to enable listen socket, error %d: %s", ret, fi_strerror(-ret));
ON_ERROR_BIND_REFUSED(socket, "fi_listen failed", ret);
return;
}
if((ret = fi_control(&socket->eq->fid, FI_GETWAIT, &socket->eqfd)))
{
log_error("Failed to retrive listen socket Event Queue wait object, error %d: %s", ret, fi_strerror(-ret));
ON_ERROR_BIND_REFUSED(socket, "fi_control failed", ret);
return;
}
socket->eq_ev_ctx.fd = socket->eqfd;
socket->eq_ev_ctx.data = socket;
socket->eq_ev_ctx.cb = on_listen_socket_libfabric_cm_event;
//TODO: The listen EQ should be added too, but I don't know how to remove it to avoid a trywait SEGFAULT.
//printf("Adding LISTEN EQ polled fid %d %p \n", socket->eqfd, &socket->eq->fid);
//add_polled_fid(&socket->ctx->evloop.pfids, socket->fabric, &socket->eq->fid, socket->eqfd, socket, on_listen_socket_cm_event);
add_open_fd(&socket->ctx->evloop.openfds, socket->eqfd, NETIO_EQ, ULISTEN, socket);
netio_register_read_fd(&socket->ctx->evloop, &socket->eq_ev_ctx);
log_dbg("netio_listen_socket: registering EQ fd %d", socket->eqfd);
fi_freeinfo(hints);
}
size_t netio_listen_socket_endpoint(struct netio_listen_socket* socket, struct sockaddr_storage *sa)
{
size_t addrlen;
//memset(sa, 0, sizeof(*sa));
addrlen = sizeof(struct sockaddr_storage);
int ret=0;
if((ret = fi_getname(&socket->pep->fid, sa, &addrlen)))
{
log_error("Failed to retrieve the local endpoint address, error %d: %s", ret, fi_strerror(-ret));
ON_ERROR_BIND_REFUSED(socket, "fi_getname failed", ret);
return 0;
}
return addrlen;
}
void
netio_post_recv(struct netio_recv_socket* socket, struct netio_buffer* buf)
{
struct iovec iov;
void* desc;
struct netio_tcp_recv_item *mrdn;
if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC)
{
iov.iov_base = buf->data;
iov.iov_len = buf->size;
desc = fi_mr_desc(buf->mr);
struct fi_msg msg;
msg.msg_iov = &iov; /* scatter-gather array */
msg.desc = &desc;
msg.iov_count = 1;
msg.addr = 0;
msg.context = buf;
msg.data = 0;
uint64_t flags;
flags = FI_REMOTE_CQ_DATA;//FI_MULTI_RECV;
int ret=0;
if((ret = fi_recvmsg(socket->ep, &msg, flags)) != 0)
{
log_error("Failed to post a buffer to receive inbound messages, error %d: %s", ret, fi_strerror(-ret));
}
}
else
{
//Allocate memory for a message request descriptor
struct netio_tcp_recv_item *mrd;
mrd = (struct netio_tcp_recv_item *) malloc(sizeof(struct netio_tcp_recv_item));
if(mrd == NULL)
{
ERROR_LOG("cannot allocate memory for descriptor");
}
/* log_debug("mrd is at %p", (void *)mrd); */
mrd->element_active = 1; //MJ do we need this variable?
mrd->socket = socket; //this is a netio_recv_socket
mrd->buffer = buf;
mrd->next_element = NULL;
mrd->bytes_received = 0;
mrd->message_size = 0;
/* log_debug("receive descriptor allocated and initialized"); */
//Append the descriptor to the list
if(socket->message_request_header == NULL)
{
socket->message_request_header = (void *)mrd;
/* log_debug("descriptor linked to head of queue"); */
}
else
{
int free_item = 1;
struct netio_tcp_recv_item *mrdq;
mrdq = (struct netio_tcp_recv_item *)socket->message_request_header;
/* log_debug("Head of list points at = %p", mrdq); */
int mrd_linked = 0;
do
{
if (mrdq->next_element == NULL)
{
mrdq->next_element = (void *)mrd;
mrd_linked = 1;
// log_error("End of list found at entry %d. Appending mrd to the desriptor at %p", free_item, mrdq);
}
else
{
mrdn = (struct netio_tcp_recv_item *)mrdq->next_element;
// log_error("item = %d. %p points at %p", free_item, mrdq, mrdn);
free_item++;
mrdq = mrdn;
}
}
while(!mrd_linked);
}
/*
log_debug("Calling netio_signal_fire");
log_debug("&socket->tcp_signal = %p", &socket->tcp_signal);
netio_signal_fire(&socket->tcp_signal);
*/
return;
}
}
/*
 * Remove an unbuffered receive socket from the recv_sockets list of its
 * listen socket and log whether remove_socket() reported success (0).
 */
void
netio_remove_recv_socket(struct netio_recv_socket* socket){
  struct netio_listen_socket* owner = socket->lsocket;

  if (remove_socket(&owner->recv_sockets, (void*)socket) != 0) {
    log_warn("Unbuffered connection closed, could not delete recv socket.");
    return;
  }
  log_info("Unbuffered connection closed, recv socket deleted.");
}
/*
 * Remove a buffered receive socket from the recv_sockets list of the listen
 * socket embedded in its buffered listen socket, and log whether
 * remove_socket() reported success (0).
 */
void
netio_buffered_remove_recv_socket(struct netio_buffered_recv_socket* socket){
  struct netio_buffered_listen_socket* owner = socket->lsocket;

  if (remove_socket(&(owner->listen_socket.recv_sockets), (void*)socket) != 0) {
    log_warn("Buffered connection closed, could not delete recv socket.");
    return;
  }
  log_info("Buffered connection closed, buffered recv socket deleted.");
}
static int
generic_sendmsg(struct netio_send_socket* socket,
struct iovec* iov,
void** desc,
size_t count,
uint64_t key,
uint64_t add_flags,
uint64_t imm)
{
int ret=0;
uint64_t flags;
struct fi_msg msg;
msg.msg_iov = iov; /* scatter-gather array */
msg.desc = desc;
msg.iov_count = count;
msg.addr = 0;
msg.context = (void*)key;
msg.data = imm;
log_trc("sending iov message with immediate value 0x%lx", msg.data);
flags = FI_INJECT_COMPLETE | add_flags;
if(!socket->ep || !socket->ep->msg){
log_error("Failed sending message because of null message or null endpoint.");
return NETIO_STATUS_ERROR;
}
if((ret = fi_sendmsg(socket->ep, &msg, flags)) != 0)
{
if(ret == -FI_EAGAIN) {
return NETIO_STATUS_AGAIN;
}
log_error("Failed to send message error (IOV count %lu, key %lu) %d: %s.", count, key, ret, fi_strerror(-ret));
return NETIO_STATUS_ERROR;
}
return NETIO_STATUS_OK;
}
/*
 * Send the whole buffer (buf->data, buf->size) through netio_send(), using
 * the buffer's own address as completion key. Returns netio_send()'s status.
 */
int
netio_send_buffer(struct netio_send_socket* socket, struct netio_buffer* buf)
{
  uint64_t key = (uint64_t)buf;
  return netio_send(socket, buf, buf->data, buf->size, key);
}
/*
 * Send the whole buffer through libfabric with the FI_INJECT flag added,
 * using the buffer's own address as completion key and no immediate data.
 * Returns the status of generic_sendmsg().
 */
int
netio_send_inline_buffer(struct netio_send_socket* socket, struct netio_buffer* buf)
{
  struct iovec iov = { .iov_base = buf->data, .iov_len = buf->size };
  void* desc = fi_mr_desc(buf->mr);

  return generic_sendmsg(socket, &iov, &desc, 1, (uint64_t)buf, FI_INJECT, 0);
}
/*
 * Send `size` bytes starting at `addr`, which belong to buffer `buf`.
 *
 * libfabric mode: the message is handed to generic_sendmsg() with `key` as
 * completion key; its status is returned.
 * TCP mode: buf->size and buf->data are overwritten with size and addr, a
 * send descriptor is appended to the tail of the list rooted at
 * socket->message_request_header, socket->tcp_signal is fired and
 * NETIO_STATUS_OK is returned. A socket that is not CONNECTED only triggers
 * a warning. Failing to allocate the descriptor is fatal (ERROR_LOG).
 *
 * NOTE(review): the TCP path stores (uint64_t)buf as the descriptor key and
 * does not use the `key` argument - confirm that this is intended.
 */
int
netio_send(struct netio_send_socket* socket, struct netio_buffer* buf, void* addr, size_t size, uint64_t key)
{
  if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC)
  {
    struct iovec iov = { .iov_base = addr, .iov_len = size };
    void* desc = fi_mr_desc(buf->mr);
    return generic_sendmsg(socket, &iov, &desc, 1, key, 0, 0);
  }

  if (socket->state != CONNECTED)
  {
    log_warn("socket not connected (state=%d)",socket->state);
  }

  //Allocate memory for a message request descriptor
  struct netio_tcp_send_item *item = (struct netio_tcp_send_item *) malloc(sizeof(struct netio_tcp_send_item));
  if (item == NULL)
  {
    ERROR_LOG("cannot allocate memory for descriptor");
  }
  item->element_active = NETIO_TCP_NEW;
  item->socket = socket;
  item->buffer = buf;
  log_dbg("netio-tcp: setting buffer size to msg size and buffer data to addr");
  buf->size = size;
  buf->data = addr;
  item->total_bytes = size;
  item->bytes_left = sizeof(int);
  item->next_element = NULL;
  item->key = (uint64_t) buf;

  //Append the descriptor to the list
  struct netio_tcp_send_item *tail = (struct netio_tcp_send_item *)socket->message_request_header;
  if (tail == NULL)
  {
    /* Empty list: the new descriptor becomes the head. */
    socket->message_request_header = (void *)item;
  }
  else
  {
    while (tail->next_element != NULL)
    {
      tail = (struct netio_tcp_send_item *)tail->next_element;
    }
    tail->next_element = (void *)item;
  }

  netio_signal_fire(&socket->tcp_signal);
  return(NETIO_STATUS_OK);
}
/*
 * Send `size` bytes at `addr` (inside buffer `buf`) through libfabric
 * together with the immediate value `imm`; FI_REMOTE_CQ_DATA is added to the
 * send flags. Returns the status of generic_sendmsg().
 */
int
netio_send_imm(struct netio_send_socket* socket, struct netio_buffer* buf, void* addr, size_t size, uint64_t key, uint64_t imm)
{
  struct iovec iov = { .iov_base = addr, .iov_len = size };
  void* desc = fi_mr_desc(buf->mr);

  return generic_sendmsg(socket, &iov, &desc, 1, key, FI_REMOTE_CQ_DATA, imm);
}
int
netio_sendv(struct netio_send_socket* socket, struct netio_buffer** buf, struct iovec* iov, size_t count, uint64_t key)
{
void* descarray[NETIO_MAX_IOV_LEN];
for(unsigned i=0; i<count; i++) {
descarray[i] = fi_mr_desc(buf[i]->mr);
}
return generic_sendmsg(socket, /* struct netio_send_socket* socket */
iov, /* struct iovec* iov */
descarray, /* void** desc */
count, /* size_t count */
key, /* uint64_t key */
0, /* uint64_t add_flags */
0 /* uint64_t imm */
);
}
int
netio_sendv_imm(struct netio_send_socket* socket, struct netio_buffer** buf, struct iovec* iov, size_t count, uint64_t key, uint64_t imm)
{
void* descarray[NETIO_MAX_IOV_LEN];
for(unsigned i=0; i<count; i++) {
descarray[i] = fi_mr_desc(buf[i]->mr);
}
return generic_sendmsg(socket, /* struct netio_send_socket* socket */
iov, /* struct iovec* iov */
descarray, /* void** desc */
count, /* size_t count */
key, /* uint64_t key */
FI_REMOTE_CQ_DATA, /* uint64_t add_flags */
imm /* uint64_t imm */
);
}