/* buffered.c — buffered send/receive sockets for NETIO: messages are packed
 * into length-prefixed pages and sent/decoded in bulk (libfabric and TCP
 * backends). */
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include "log.h"
#include "connection_event.h"
#include "netio/netio.h"
#include "netio/netio_tcp.h"
/* Debug/trace logging helpers: compiled in only for DEBUG or DEBUG_BUF builds;
 * otherwise they expand to nothing so release builds pay no logging cost. */
#if defined DEBUG || defined DEBUG_BUF
/* Strip the directory part of __FILE__ so log lines show just the basename. */
#define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
#define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
#define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
#else
#define log_dbg(...)
#define log_trc(...)
#endif
/* Log a fatal libfabric error (code c, negated for fi_strerror) and terminate
 * the process with exit status 2.
 * BUG FIX: removed the trailing semicolon after while(0). With it, the
 * do/while(0) idiom is defeated: `if (x) FATAL(...); else ...` expands to two
 * statements and fails to compile. Callers supply the semicolon themselves. */
#define FATAL(msg, c) \
    do { \
        log_fatal("%s %d: %s", msg, c, fi_strerror(-c)); \
        exit(2); \
    } while(0)
/* This type is used as a length-marker in buffers for encoded messages.
   Encoding of buffers is as follows: LEN0|MSG0|LEN1|MSG1|...|LEN_N|MSG_N */
/* NOTE(review): markers are written at arbitrary byte offsets (see
   netio_buffered_sendv); both peers must agree on width and endianness. */
typedef uint32_t msg_size_t;
// STATIC FUNCTIONS ////////////////////////////////////////////////////////////
/* Hand the current partially-filled buffer to the underlying send socket.
 * Returns NETIO_STATUS_OK when the buffer was passed on (or there was nothing
 * to flush) and NETIO_STATUS_AGAIN when the transport is busy; in that case
 * the buffer is kept, `busy` is set, and the caller must retry later. */
static int
flush(struct netio_buffered_send_socket* socket)
{
    int ret = NETIO_STATUS_OK;
    if(socket->current_buffer)
    {
        socket->current_buffer->size = socket->pos;
        int send_status = netio_send_buffer(&socket->send_socket, socket->current_buffer);
        if (send_status == NETIO_STATUS_AGAIN){
            socket->busy = 1;
            /* BUG FIX: log the status actually returned by netio_send_buffer.
             * Previously this logged `ret`, which was still NETIO_STATUS_OK
             * at this point, making the message misleading. */
            log_dbg("netio_send_buffer returned %d, trying again", send_status);
            ret = NETIO_STATUS_AGAIN;
        } else {
            /* NOTE(review): any status other than AGAIN is treated as success
             * here (ownership of the buffer passes on) — confirm intended. */
            socket->busy = 0;
            socket->current_buffer = NULL;
            if(socket->timeout_ms != 0){
                netio_timer_stop(&socket->flush_timer);
            }
        }
    } else { // there is no current buffer: just clear the busy flag
        socket->busy = 0;
    }
    return ret;
}
/* Timer callback installed on flush_timer: flush the buffered send socket
 * after the configured timeout, regardless of the watermark. */
static void
flush_cb(void* ptr)
{
    flush((struct netio_buffered_send_socket*)ptr);
}
/* Send-completion callback: the buffer identified by `key` is done, so return
 * it to the free-buffer stack. When the stack goes from empty to one entry,
 * fire the buffer-available signal to wake up waiting senders. */
static void
on_send_completed(struct netio_send_socket* socket, uint64_t key)
{
    struct netio_buffered_send_socket* bsocket = (struct netio_buffered_send_socket*)(socket->usr);
    struct netio_buffer* completed = (struct netio_buffer*)key;
    if(netio_bufferstack_push(&bsocket->buffers, completed) != 0) {
        log_fatal("The buffer stack exceeded its limits.");
        exit(1);
    }
    if(bsocket->buffers.available_buffers == 1) {
        netio_signal_fire(&bsocket->signal_buffer_available);
    }
}
static void
on_connect(struct netio_send_socket* socket)
{
struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(socket->usr);
netio_bufferstack_send_init(&bs->buffers, socket, bs->num_pages, bs->buffersize, 0);
if(bs->cb_connection_established) {
bs->cb_connection_established(bs);
}
}
/* Connection-closed callback: unregister the buffer-available signal and (if
 * enabled) the flush timer from the event loop, run the transport-specific
 * shutdown, release the buffer stack, notify the user, and — when this send
 * socket belongs to a publish socket — remove it from the subscription list. */
static void
on_buf_send_socket_connection_closed(struct netio_send_socket* ssocket)
{
    log_dbg("on_buf_send_socket_connection_closed callback");
    struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(ssocket->usr);
    log_dbg("removing buffer available signal fd %d from evloop %d", bs->signal_buffer_available.ev_ctx.fd, bs->signal_buffer_available.epollfd);
    netio_signal_close(&ssocket->ctx->evloop, &bs->signal_buffer_available);
    if(bs->timeout_ms != 0){
        log_dbg("removing flush timer fd %d from evloop %d", bs->flush_timer.ev_ctx.fd, bs->signal_buffer_available.epollfd);
        netio_timer_close(&ssocket->ctx->evloop, &bs->flush_timer);
    }
    bs->current_buffer = NULL;
    if(ssocket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
        handle_send_socket_shutdown(ssocket);
    } else if (ssocket->tcp_fi_mode == NETIO_MODE_TCP){
        handle_tcp_send_socket_shutdown(ssocket);
    }
    netio_bufferstack_close(&bs->buffers, bs->num_pages);
    // This is pubsocket_on_connection_closed in pubsub.c
    // pubsocket_on_connection_closed will call table_remove_subscription.
    // table_remove_subscription can call netio_disconnect that sends a shutdown
    // For RDMA shutdown goes via CM, for TCP/IP it requires the FD.
    // CONSISTENCY FIX: the user callback was invoked identically in both
    // branches of the pub_socket check; hoisted here so it is written once.
    // (Call order is unchanged: callback first, then subscription removal.)
    if(bs->cb_connection_closed) {
        bs->cb_connection_closed(bs);
    }
    if(bs->pub_socket) { // only remove when send socket is part of a publish socket
        struct netio_publish_socket* psocket = (struct netio_publish_socket*)bs->pub_socket;
        remove_socket(&(psocket->subscription_table.socket_list), (void*)bs);
    }
}
static void
on_error_connection_refused(struct netio_send_socket* socket) {
struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(socket->usr);
netio_signal_close(&(socket->ctx->evloop), &bs->signal_buffer_available);
if(bs->timeout_ms != 0){
netio_timer_close(&(socket->ctx->evloop), &bs->flush_timer);
}
if(bs->cb_error_connection_refused) {
bs->cb_error_connection_refused(bs);
}
}
/* Page-received callback installed by the buffered recv socket init functions.
 * Decodes the length-prefixed message stream packed into the page
 * (LEN0|MSG0|LEN1|MSG1|...) and delivers each message to the user callback,
 * then reposts the page for receiving.
 * NOTE(review): messages are read from buf->data; the `data` argument is
 * unused — confirm both always refer to the same page on this path.
 * NOTE(review): the embedded length words are trusted; a corrupted length
 * would walk past the end of the page. */
static void
on_buffered_msg_received(struct netio_recv_socket* socket, struct netio_buffer* buf, void* data, size_t len)
{
    struct netio_buffered_recv_socket* bs = (struct netio_buffered_recv_socket*)socket->usr;
    if(bs->lsocket->cb_msg_received) {
        size_t pos = 0;
        while(pos < len) {
            msg_size_t* s = (msg_size_t*)((char *)buf->data + pos);  // length marker
            pos += sizeof(msg_size_t);
            bs->lsocket->cb_msg_received(bs, (char *)buf->data + pos, *s);
            pos += *s;  // advance past the message payload
        }
    }
    //to study the L1ID pileup
    struct netio_subscribe_socket* ssocket = bs->lsocket->listen_socket.usr;
    if(ssocket && ssocket->cb_buf_received) {
        ssocket->cb_buf_received(ssocket, buf, len);
    }
    netio_post_recv(socket, buf);  // hand the page back to the transport
}
// API FUNCTIONS ///////////////////////////////////////////////////////////////
/* Initialize a buffered (libfabric) listen socket from the user-supplied
 * attributes, clamping the receive page count to the per-domain
 * memory-region limit. */
void
netio_buffered_listen_socket_init(struct netio_buffered_listen_socket* socket,
                                  struct netio_context* ctx,
                                  struct netio_buffered_socket_attr* attr)
{
    memset(socket, 0, sizeof(*socket));
    if (attr->num_pages > NETIO_DOMAIN_MAX_MR){
        log_error("Number of recv pages requested %lu exceeds max value. Using %lu.", attr->num_pages, NETIO_DOMAIN_MAX_MR);
        attr->num_pages = NETIO_DOMAIN_MAX_MR;
    }
    socket->num_pages = attr->num_pages;
    socket->pagesize = attr->pagesize;
    netio_init_listen_socket(&socket->listen_socket, ctx, NULL);
}
/* TCP variant of netio_buffered_listen_socket_init: delegates to
 * netio_init_listen_tcp_socket and skips the libfabric MR clamp. */
void
netio_buffered_listen_tcp_socket_init(struct netio_buffered_listen_socket* socket,
                                      struct netio_context* ctx,
                                      struct netio_buffered_socket_attr* attr)
{
    memset(socket, 0, sizeof(*socket));
    socket->num_pages = attr->num_pages;
    socket->pagesize = attr->pagesize;
    netio_init_listen_tcp_socket(&socket->listen_socket, ctx, NULL);
}
/* Bind a buffered listen socket to hostname:port via libfabric and register
 * its connection-management event queue with the event loop.
 * Any libfabric failure is fatal: FATAL() logs the error and exits. */
void
netio_buffered_listen(struct netio_buffered_listen_socket* socket, const char* hostname, unsigned port)
{
    log_info("netio_buffered_listen %s", hostname);
    int ret;
    struct fi_info* hints;
    /* EQ sized for 10000 entries; FI_WAIT_FD makes it waitable through a
     * file descriptor so the epoll-based event loop can poll it. */
    struct fi_eq_attr eq_attr = {10000, 0, FI_WAIT_FD, 0, 0};
    hints = fi_allocinfo();
    hints->ep_attr->type = FI_EP_MSG;  // connection-oriented endpoint
    hints->caps = FI_MSG;
    hints->mode = FI_LOCAL_MR;  // provider requires local memory registration
    char port_addr[32];
    snprintf(port_addr, 32, "%u", port);
    log_dbg("listening (libfabric) on %s:%s", hostname, port_addr);
    /* FI_SOURCE: interpret hostname/port as the local address to bind to. */
    if((ret = fi_getinfo(FI_VERSION(1, 1), hostname, port_addr, FI_SOURCE, hints,
                         &socket->listen_socket.fi)))
    {
        FATAL("Buf-listen socket, fail to get interface info, error ", ret);
    }
    // printf("%s\n", fi_tostr(socket->listen_socket.fi, FI_TYPE_INFO));
    if((ret = fi_fabric(socket->listen_socket.fi->fabric_attr, &socket->listen_socket.fabric, NULL)))
    {
        FATAL("Buf-listen socket, cannot open fabric, error ", ret);
    }
    if((ret = fi_eq_open(socket->listen_socket.fabric, &eq_attr, &socket->listen_socket.eq, NULL)))
    {
        FATAL("Buf-listen socket, cannot open Event Queue, error ", ret);
    }
    if((ret = fi_passive_ep(socket->listen_socket.fabric, socket->listen_socket.fi, &socket->listen_socket.pep, NULL)))
    {
        FATAL("Buf-listen socket, cannot open passive endpoint, error ", ret);
    }
    /* Bind the passive endpoint to the EQ so CM events (connreq etc.) land there. */
    if((ret = fi_pep_bind(socket->listen_socket.pep, &socket->listen_socket.eq->fid, 0)))
    {
        FATAL("Buf-listen socket, cannot bind passive endpoint, error ", ret);
    }
    if((ret = fi_listen(socket->listen_socket.pep)))
    {
        FATAL("Buf-listen socket, cannot enable, error ", ret);
    }
    /* Obtain the EQ's wait fd so connection events can be multiplexed with
     * other sockets in the event loop. */
    if((ret = fi_control(&socket->listen_socket.eq->fid, FI_GETWAIT, &socket->listen_socket.eqfd)))
    {
        FATAL("Buf-listen socket failed to obtain Event Queue wait object", ret);
    }
    socket->listen_socket.eq_ev_ctx.fd = socket->listen_socket.eqfd;
    socket->listen_socket.eq_ev_ctx.data = socket;
    socket->listen_socket.eq_ev_ctx.cb = on_buffered_listen_socket_libfabric_cm_event;
    struct netio_eventloop* eloop = &socket->listen_socket.ctx->evloop;
    netio_register_read_fd(eloop, &socket->listen_socket.eq_ev_ctx);
    add_open_fd(&eloop->openfds, socket->listen_socket.eqfd, NETIO_EQ, BLISTEN, socket);
    log_dbg("netio_buffered_listen_socket: registering EQ fd %d", socket->listen_socket.eqfd);
    fi_freeinfo(hints);  // hints only needed for fi_getinfo
}
/* TCP version of netio_buffered_listen. Unlike the libfabric variant, all the
 * socket setup lives in netio_listen_tcp; here we only install the CM event
 * callback and register its fd with the event loop. */
void
netio_buffered_listen_tcp(struct netio_buffered_listen_socket* socket,
                          const char* hostname, unsigned port)
{
    /* BUG FIX: `port` is unsigned, so the matching conversion specifier is
     * %u; %d with an unsigned argument is a format mismatch (UB per C11). */
    log_info("Buffered TCP/IP listening on %s:%u", hostname, port);
    netio_listen_tcp(&socket->listen_socket, hostname, port);
    socket->listen_socket.eq_ev_ctx.cb = on_buffered_listen_socket_tcp_cm_event;
    netio_register_read_tcp_fd(&socket->listen_socket.ctx->evloop, &socket->listen_socket.eq_ev_ctx);
}
/* Initialize a buffered libfabric send socket: wire the completion and
 * connection callbacks, clamp the buffer-page count to the per-domain
 * memory-region limit, create the buffer-available signal, and — when
 * attr->timeout_ms is non-zero — set up the periodic flush timer. */
void
netio_buffered_send_socket_init(struct netio_buffered_send_socket* socket, struct netio_context* ctx, struct netio_buffered_socket_attr* attr)
{
    /* Zeroing the struct also establishes current_buffer = NULL,
     * pub_socket = NULL, pos = 0 and busy = 0. */
    memset(socket, 0, sizeof(*socket));
    netio_init_send_socket(&socket->send_socket, ctx);
    socket->send_socket.cq_size = NETIO_MAX_CQ_EVENTS_BUFFERED;
    socket->send_socket.usr = socket;
    socket->send_socket.cb_send_completed = on_send_completed;
    socket->send_socket.cb_connection_established = on_connect;
    socket->send_socket.cb_connection_closed = on_buf_send_socket_connection_closed;
    socket->send_socket.cb_error_connection_refused = on_error_connection_refused;
    if (attr->num_pages > NETIO_DOMAIN_MAX_MR){
        log_error("Number of send buffers requested %lu exceeds max value. Using %lu.", attr->num_pages, NETIO_DOMAIN_MAX_MR);
        attr->num_pages = NETIO_DOMAIN_MAX_MR;
    }
    socket->watermark = attr->watermark;
    socket->num_pages = attr->num_pages;
    socket->buffersize = attr->pagesize;
    netio_signal_init(&ctx->evloop, &socket->signal_buffer_available);
    socket->signal_buffer_available.cb = NULL; // deactivated by default
    socket->timeout_ms = attr->timeout_ms;
    if(attr->timeout_ms == 0){
        socket->flush_timer.cb = NULL; // no timer: make the empty slot explicit
    } else {
        netio_timer_init(&ctx->evloop, &socket->flush_timer);
        socket->flush_timer.cb = flush_cb;
        socket->flush_timer.data = socket;
    }
}
/* TCP variant of netio_buffered_send_socket_init. Differences: uses
 * netio_init_send_tcp_socket, sets no CQ size, and applies no libfabric
 * memory-region clamp to num_pages.
 * TODO(review): factor out the code shared with the libfabric variant. */
void
netio_buffered_send_tcp_socket_init(struct netio_buffered_send_socket* socket, struct netio_context* ctx, struct netio_buffered_socket_attr* attr)
{
    /* Zeroing the struct also establishes current_buffer = NULL,
     * pub_socket = NULL, pos = 0 and busy = 0. */
    memset(socket, 0, sizeof(*socket));
    netio_init_send_tcp_socket(&socket->send_socket, ctx);
    socket->send_socket.usr = socket;
    socket->send_socket.cb_send_completed = on_send_completed;
    socket->send_socket.cb_connection_established = on_connect;
    socket->send_socket.cb_connection_closed = on_buf_send_socket_connection_closed;
    socket->send_socket.cb_error_connection_refused = on_error_connection_refused;
    socket->watermark = attr->watermark;
    socket->num_pages = attr->num_pages;
    socket->buffersize = attr->pagesize;
    netio_signal_init(&ctx->evloop, &socket->signal_buffer_available);
    socket->signal_buffer_available.cb = NULL; // deactivated by default
    socket->timeout_ms = attr->timeout_ms;
    if(attr->timeout_ms == 0){
        socket->flush_timer.cb = NULL; // no timer: make the empty slot explicit
    } else {
        netio_timer_init(&ctx->evloop, &socket->flush_timer);
        socket->flush_timer.cb = flush_cb;
        socket->flush_timer.data = socket;
    }
}
/* Convenience wrapper: select the TCP or libfabric initializer based on the
 * hostname's scheme, then connect to the resolved host. */
void netio_buffered_send_socket_init_and_connect(struct netio_buffered_send_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr) {
    int use_tcp = netio_tcp_mode(hostname);
    const char* host = netio_hostname(hostname);
    if (use_tcp) {
        netio_buffered_send_tcp_socket_init(socket, ctx, attr);
    } else {
        netio_buffered_send_socket_init(socket, ctx, attr);
    }
    netio_buffered_connect(socket, host, port);
}
/* Convenience wrapper: select the TCP or libfabric init+listen pair based on
 * the hostname's scheme. */
void netio_buffered_listen_socket_init_and_listen(struct netio_buffered_listen_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr) {
    int use_tcp = netio_tcp_mode(hostname);
    const char* host = netio_hostname(hostname);
    if (use_tcp) {
        netio_buffered_listen_tcp_socket_init(socket, ctx, attr);
        netio_buffered_listen_tcp(socket, host, port);
    } else {
        netio_buffered_listen_socket_init(socket, ctx, attr);
        netio_buffered_listen(socket, host, port);
    }
}
/* Start a connection on the underlying send socket, stripping any scheme
 * prefix from the hostname first. */
void
netio_buffered_connect(struct netio_buffered_send_socket* socket, const char* hostname, unsigned port)
{
    const char* host = netio_hostname(hostname);
    netio_connect(&socket->send_socket, host, port);
}
/* Connect using a raw (pre-resolved) address blob of addrlen bytes; thin
 * pass-through to the unbuffered send socket API. */
void
netio_buffered_connect_rawaddr(struct netio_buffered_send_socket* socket, void* addr, size_t addrlen)
{
    netio_connect_rawaddr(&socket->send_socket, addr, addrlen);
}
/* Send one contiguous message: wrap it in a single-element iovec and delegate
 * to netio_buffered_sendv. Returns the same status codes as sendv. */
int
netio_buffered_send(struct netio_buffered_send_socket* socket, void* data, size_t size)
{
    struct iovec single = { .iov_base = data, .iov_len = size };
    return netio_buffered_sendv(socket, &single, 1);
}
/* Append a message (given as an iovec array) to the current send buffer,
 * prefixed with its msg_size_t length; flush when the watermark is crossed.
 * Returns NETIO_STATUS_OK on success, NETIO_STATUS_AGAIN when the caller
 * should retry (transport busy, no free buffer, or current buffer full and
 * flushed), and NETIO_STATUS_TOO_BIG when the message can never fit into a
 * single buffer. */
int
netio_buffered_sendv(struct netio_buffered_send_socket* socket, struct iovec* iov, size_t num)
{
    /* A previous flush is still pending: try to complete it before buffering
     * more data. */
    if(socket->busy){
        int ret = flush(socket);
        if (ret == NETIO_STATUS_AGAIN){
            return NETIO_STATUS_AGAIN;
        }
    }
    size_t total_size = 0;
    for(unsigned int i=0; i<num; i++) {
        total_size += iov[i].iov_len;
    }
    //if current message is larger than the whole buffer
    if(total_size+sizeof(msg_size_t) > socket->buffersize) {
        return NETIO_STATUS_TOO_BIG;
    }
    if(socket->current_buffer == NULL) {
        /* No buffer in progress: pop a free one. If none is available the
         * caller must retry after a send completion returns one (the
         * buffer-available signal fires on that transition). */
        if(netio_bufferstack_pop(&socket->buffers, &socket->current_buffer)) {
            return NETIO_STATUS_AGAIN;
        }
        socket->pos = 0;
        //Enable flush timer
        if(socket->timeout_ms != 0 ){
            netio_timer_start_ms(&socket->flush_timer, socket->timeout_ms);
        }
    } else {
        //if current message is larger than remaining space
        //flush buffer and retry with a new one
        if(socket->pos+total_size+sizeof(msg_size_t) >= socket->buffersize){
            flush(socket);
            return NETIO_STATUS_AGAIN;
        }
    }
    /* Write the length marker followed by the message fragments.
     * NOTE(review): this is an unaligned uint32_t store when pos is not
     * 4-byte aligned — fine on x86, verify on alignment-strict targets. */
    *(msg_size_t*)((char*)socket->current_buffer->data + socket->pos) = total_size;
    socket->pos += sizeof(msg_size_t);
    for(unsigned int i=0; i<num; i++) {
        memcpy((char *)socket->current_buffer->data + socket->pos, iov[i].iov_base, iov[i].iov_len);
        socket->pos += iov[i].iov_len;
    }
    /* Past the watermark: hand the buffer to the transport (best effort; if
     * it is busy the next call's busy path will retry the flush). */
    if(socket->pos > socket->watermark) {
        flush(socket);
    }
    return NETIO_STATUS_OK;
}
/* Initialize a buffered (libfabric) receive socket attached to a buffered
 * listen socket: allocate num_pages receive pages of pagesize bytes each and
 * install the length-prefixed message decoder as the raw-message callback.
 * Aborts the process if page allocation fails. */
void
netio_buffered_recv_socket_init(struct netio_buffered_recv_socket* socket, struct netio_buffered_listen_socket* lsocket)
{
    memset(socket, 0, sizeof(*socket));
    socket->lsocket = lsocket;
    netio_init_recv_socket(&socket->recv_socket, &socket->lsocket->listen_socket);
    socket->recv_socket.cq_size = NETIO_MAX_CQ_EVENTS_BUFFERED;
    socket->recv_socket.usr = socket;
    socket->num_pages = socket->lsocket->num_pages;
    // ROBUSTNESS FIX: the original left these allocations unchecked; a failed
    // malloc would be dereferenced later. Fail fast instead (CERT MEM32-C).
    socket->pages = (struct netio_buffer*)malloc(socket->num_pages * sizeof(struct netio_buffer));
    if(socket->pages == NULL) {
        log_fatal("Failed to allocate receive page table.");
        exit(1);
    }
    for(unsigned int i=0; i<socket->num_pages; i++) {
        socket->pages[i].data = malloc(socket->lsocket->pagesize);
        if(socket->pages[i].data == NULL) {
            log_fatal("Failed to allocate receive page.");
            exit(1);
        }
        socket->pages[i].size = socket->lsocket->pagesize;
    }
    socket->recv_socket.lsocket->cb_msg_received = on_buffered_msg_received;
}
/* TCP variant of netio_buffered_recv_socket_init (uses
 * netio_init_recv_tcp_socket and sets no CQ size).
 * Aborts the process if page allocation fails. */
void
netio_buffered_recv_tcp_socket_init(struct netio_buffered_recv_socket* socket, struct netio_buffered_listen_socket* lsocket)
{
    memset(socket, 0, sizeof(*socket));
    socket->lsocket = lsocket;
    netio_init_recv_tcp_socket(&socket->recv_socket, &socket->lsocket->listen_socket);
    socket->recv_socket.usr = socket;
    socket->num_pages = socket->lsocket->num_pages;
    // ROBUSTNESS FIX: the original left these allocations unchecked; a failed
    // malloc would be dereferenced later. Fail fast instead (CERT MEM32-C).
    socket->pages = (struct netio_buffer*)malloc(socket->num_pages * sizeof(struct netio_buffer));
    if(socket->pages == NULL) {
        log_fatal("Failed to allocate receive page table.");
        exit(1);
    }
    for(unsigned int i=0; i<socket->num_pages; i++) {
        socket->pages[i].data = malloc(socket->lsocket->pagesize);
        if(socket->pages[i].data == NULL) {
            log_fatal("Failed to allocate receive page.");
            exit(1);
        }
        socket->pages[i].size = socket->lsocket->pagesize;
    }
    socket->recv_socket.lsocket->cb_msg_received = on_buffered_msg_received;
}
/* Public wrapper around the internal flush(): push out any partially filled
 * buffer immediately instead of waiting for the watermark or the timer.
 * The flush status is intentionally discarded, as in the original API. */
void
netio_buffered_flush(struct netio_buffered_send_socket* socket)
{
    (void)flush(socket);
}