Program Listing for File pubsub.c
↰ Return to documentation for file (pubsub.c)
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <netdb.h>
#include "log.h"
#include "netio/netio.h"
#include "netio/netio_tcp.h"
/*
 * Debug and trace logging is compiled in only when DEBUG or DEBUG_PUB is
 * defined. Otherwise log_dbg() and log_trc() expand to nothing, so their
 * arguments are never evaluated.
 */
#if defined DEBUG || defined DEBUG_PUB
/* Part of __FILE__ after the last '/', or all of __FILE__ if it has none. */
#define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
#define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
#define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
#else
#define log_dbg(...)
#define log_trc(...)
#endif
/* Returns a heap copy of `text`, or NULL when memory cannot be allocated. */
static char*
duplicate_string(const char* text)
{
  size_t size = strlen(text) + 1;
  char* copy = malloc(size);
  if(copy) {
    memcpy(copy, text, size);
  }
  return copy;
}

/*
 * Translates a host name into a dotted-quad IPv4 string.
 *
 * @param domain_name: Host name or textual IPv4 address; may be NULL
 *
 * @return A newly allocated string owned by the caller (release with free()):
 *         - a copy of domain_name when it already is an IPv4 literal,
 *         - the resolved IPv4 address when gethostbyname() succeeds,
 *         - a copy of domain_name (after logging an error) when it cannot be
 *           resolved.
 *         NULL when domain_name is NULL or memory cannot be allocated.
 */
char*
netio_domain_name_lookup(const char* domain_name)
{
  if(!domain_name) return NULL;

  const char* result_text = domain_name;
  struct in_addr parsed;
  // Only a return value of 1 means "valid IPv4 literal"; 0 and -1 both mean
  // the text has to go through name resolution.
  if(inet_pton(AF_INET, domain_name, &parsed) != 1)
  {
    struct hostent* host = gethostbyname(domain_name);
    if(host)
    {
      // inet_ntoa() returns a static buffer; it is copied right below.
      result_text = inet_ntoa(*(struct in_addr *)host->h_addr);
    }
    else
    {
      log_error("The host cannot be resolved. Domain name set to %s", domain_name);
    }
  }

  char* result = duplicate_string(result_text);
  if(!result)
  {
    log_error("Out of memory while looking up host %s", domain_name);
  }
  return result;
}
static int
cmp_subscription(const void* a, const void *b)
{
struct netio_subscription* suba = (struct netio_subscription*)a;
struct netio_subscription* subb = (struct netio_subscription*)b;
if(suba->tag == subb->tag) {
return 0;
}
return suba->tag > subb->tag ? 1 : -1;
}
/*
 * Appends a (tag, socket) pair to the subscription table and re-sorts the
 * table by tag.
 *
 * @param table: Subscription table to extend
 * @param tag: Tag being subscribed to
 * @param socket: Buffered send socket that will receive data for the tag
 *
 * @return 0 on success, 1 when the table is full and the entry was dropped
 */
static int
table_add_subscription(struct netio_subscription_table* table, netio_tag_t tag, struct netio_buffered_send_socket* socket)
{
  if(table->num_subscriptions == table->size) {
    log_error("Maximum number of subscriptions. New subscription for %lx dropped.", tag);
    return 1;
  }

  // Fill the first free slot; the qsort() below restores the tag ordering.
  struct netio_subscription* slot = &table->subscriptions[table->num_subscriptions];
  slot->tag = tag;
  slot->socket = socket;
  slot->again = 0;
  log_dbg("Adding connection in subscription table, tag=%lu, socket=%p",
          slot->tag,
          slot->socket
         );

  table->num_subscriptions += 1;
  table->ts += 1; // bump the table timestamp that lookup caches compare against
  log_info("New entry in buffered subscription table, tag=%lu in hex 0x%lx, socket=%p, total #subscription=%lu", tag, tag, socket, table->num_subscriptions);

  qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
  return 0;
}
/*
 * Removes subscriptions of a socket from the table and re-sorts the table.
 *
 * @param table: Subscription table to modify
 * @param socket: Socket whose subscriptions are removed
 * @param tag: Tag to remove; ignored when closed_connection is non-zero
 * @param closed_connection: Non-zero removes every entry of `socket`
 *                           regardless of tag
 *
 * When closed_connection is zero and the socket is left without any
 * subscription, a libfabric socket is disconnected. A TCP socket is left
 * connected (see the TODO below).
 */
static void
table_remove_subscription(struct netio_subscription_table* table, struct netio_buffered_send_socket* socket, netio_tag_t tag, uint8_t closed_connection)
{
  log_dbg("Total subscriptions: %lu", table->num_subscriptions);

  // Count the entries of this socket up front so we know whether any remain.
  unsigned remaining_subscriptions_of_socket=0;
  for(unsigned int k = 0; k < table->num_subscriptions; ++k) {
    if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;}
  }

  unsigned i=0;
  while(i<table->num_subscriptions) {
    if(table->subscriptions[i].socket == socket && (table->subscriptions[i].tag == tag || closed_connection)) {
      log_dbg("Removing entry in subscription table, tag=%lu, socket=%p index %lu becomes %u",
              table->subscriptions[i].tag,
              table->subscriptions[i].socket,
              table->num_subscriptions-1,
              i);
      // Overwrite the removed entry with the last one and shrink the table.
      // `i` is not advanced so the moved entry is examined as well.
      table->subscriptions[i].tag = table->subscriptions[table->num_subscriptions-1].tag;
      table->subscriptions[i].socket = table->subscriptions[table->num_subscriptions-1].socket;
      table->subscriptions[i].again = table->subscriptions[table->num_subscriptions-1].again;
      table->num_subscriptions--;
      remaining_subscriptions_of_socket--;
      table->ts++;
    }
    else{
      // Log the skipped entry before advancing. Incrementing first would
      // report the next entry and read past the live entries on the last
      // iteration.
      log_dbg("Skipping tag=%lu because the socket is %p", table->subscriptions[i].tag, table->subscriptions[i].socket);
      i++;
    }
  }

  qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
  log_dbg("Remaining subscriptions: %lu",table->num_subscriptions);
  log_info("Removing entry from buffered subscription table, Fid=0x%lx, socket=%p, total #subscription=%lu", tag, socket, table->num_subscriptions);

  if(closed_connection==0 && remaining_subscriptions_of_socket==0){
    log_warn("Disconnecting endpoint with zero subscriptions");
    if (socket->send_socket.tcp_fi_mode == NETIO_MODE_TCP){
      // netio_disconnect(&socket->send_socket); // TODO prevents re-subscription
    } else if (socket->send_socket.tcp_fi_mode == NETIO_MODE_LIBFABRIC){
      netio_disconnect(&socket->send_socket);
    }
  }
}
/*
 * Counts the subscription table entries that belong to a socket.
 *
 * @param table: Subscription table to scan
 * @param socket: Socket to look for
 *
 * @return Number of entries whose socket equals `socket`
 */
static int
table_get_subs_by_socket(struct netio_subscription_table* table, struct netio_buffered_send_socket* socket)
{
  log_dbg("Subscription table lookup: Total subscriptions: %lu", table->num_subscriptions);
  unsigned remaining_subscriptions_of_socket=0;
  for(unsigned int k = 0; k < table->num_subscriptions; ++k) {
    if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;}
  }
  // The counter is an `unsigned`, so it must be printed with %u (not %lu).
  log_dbg("Subscription table lookup: for socket %p: %u", socket, remaining_subscriptions_of_socket);
  return remaining_subscriptions_of_socket;
}
/*
 * Signal handler installed on each buffered send socket: forwards the
 * "buffer available" event to the publish socket's user callback, if set.
 *
 * @param ptr: The struct netio_publish_socket registered as signal data
 */
static void
on_buffer_available(void* ptr)
{
  struct netio_publish_socket* pubsocket = ptr;
  log_trc("a buffer became available, calling callback");
  if(pubsocket->cb_buffer_available == NULL) {
    return;
  }
  pubsocket->cb_buffer_available(pubsocket);
}
/*
 * Connection-established callback of a publisher's buffered send socket.
 *
 * Moves every subscription that was deferred while the socket was still
 * connecting into the subscription table, invoking the user's cb_subscribe
 * for each entry that was added, then invokes cb_connection_established.
 *
 * @param socket: The buffered send socket whose connection came up
 */
static void
pubsocket_on_connection_established(struct netio_buffered_send_socket* socket)
{
  log_dbg("publish socket established connection to remote, can publish now");
  struct netio_publish_socket* pubsocket = (struct netio_publish_socket*)socket->pub_socket;

  //add deferred subscriptions to the table
  // Pop through the socket's own list head rather than a local copy, so the
  // socket does not keep pointing at entries that were already consumed.
  // NOTE(review): assumes pop_subscription() releases the head node and
  // advances the pointer it is given - confirm against its definition.
  struct deferred_subscription* sub;
  while((sub = socket->send_socket.deferred_subs) != NULL){
    int ret = table_add_subscription(&pubsocket->subscription_table, sub->tag, socket);
    if(pubsocket->cb_subscribe && ret == 0){
      pubsocket->cb_subscribe(pubsocket, sub->tag, NULL, 0);
    }
    pop_subscription(&socket->send_socket.deferred_subs);
  }

  //user callback
  if(pubsocket->cb_connection_established) {
    pubsocket->cb_connection_established(pubsocket);
  }
}
/*
 * Connection-closed callback of a publisher's buffered send socket.
 *
 * Invokes the user's cb_connection_closed, then drops any subscriptions the
 * socket still holds in the table (the remote went away without
 * unsubscribing).
 *
 * @param socket: The buffered send socket whose connection was closed
 */
static void
pubsocket_on_connection_closed(struct netio_buffered_send_socket* socket)
{
  log_dbg("published socket with buffered send socket %p and send socket %p: connection to remote was closed", socket, &socket->send_socket);
  struct netio_publish_socket* pubsocket = (struct netio_publish_socket*)socket->pub_socket;

  if(pubsocket->cb_connection_closed) {
    pubsocket->cb_connection_closed(pubsocket);
  }

  // Nothing left in the table for this socket: it unsubscribed cleanly.
  if(table_get_subs_by_socket(&pubsocket->subscription_table, socket) == 0) {
    return;
  }

  log_dbg("Removing a send socket for which a connection was closed without unsubscribing first");
  const uint8_t connection_closed = 1;
  table_remove_subscription(&pubsocket->subscription_table, socket, 0, connection_closed);
}
static struct netio_buffered_send_socket*
socket_list_add_or_lookup(struct netio_publish_socket* pubsocket,
struct netio_socket_list** list,
void* addr, size_t addrlen,
int port,
struct netio_context* ctx,
struct netio_buffered_socket_attr* attr)
{
if(addrlen == 0) {
return NULL;
}
struct netio_socket_list* entry = find_socket_by_address(*list, addr, addrlen, port);
if ( entry == NULL ) {
entry = add_socket_with_address(list, BSEND, addr, addrlen, port);
struct netio_buffered_send_socket* bufsocket = entry->socket;
if (pubsocket->tcp_fi_mode==NETIO_MODE_LIBFABRIC) {
netio_buffered_send_socket_init(bufsocket, ctx, attr);
bufsocket->pub_socket = pubsocket;
bufsocket->cb_connection_established = pubsocket_on_connection_established;
bufsocket->cb_connection_closed = pubsocket_on_connection_closed;
netio_buffered_connect_rawaddr(bufsocket, entry->addr, entry->addrlen);
}
else if (pubsocket->tcp_fi_mode==NETIO_MODE_TCP){
netio_buffered_send_tcp_socket_init(bufsocket, ctx, attr);
bufsocket->pub_socket = pubsocket;
bufsocket->cb_connection_established = pubsocket_on_connection_established;
bufsocket->cb_connection_closed = pubsocket_on_connection_closed;
log_dbg("Calling netio_connect_tcp for addr=<%s> port=%d", (char*)entry->addr, port);
netio_connect_tcp(&bufsocket->send_socket, entry->addr, port);
}
else {
log_error("socket_list_add_or_lookup called for unkown connection mode %d", pubsocket->tcp_fi_mode);
remove_socket(list, bufsocket);
return NULL;
}
bufsocket->signal_buffer_available.data = pubsocket;
bufsocket->signal_buffer_available.cb = on_buffer_available;
}
struct netio_buffered_send_socket* ss = (struct netio_buffered_send_socket*)entry->socket;
return ss;
}
/*
 * Handles a subscription request received by the publisher.
 *
 * Looks up (or creates) the send socket for the subscriber's address. When
 * that socket is already connected the subscription is added to the table
 * and cb_subscribe is invoked; otherwise the tag is queued on the socket and
 * added once the connection is established.
 *
 * @param pubsocket: Publish socket that received the request
 * @param recv_socket: Receive socket the request arrived on
 * @param addr: Subscriber address taken from the request
 * @param addrlen: Length of addr in bytes
 * @param port: Subscriber port taken from the request
 * @param tag: Tag being subscribed to
 */
static void
subscribe(struct netio_publish_socket* pubsocket, struct netio_recv_socket* recv_socket, void* addr, size_t addrlen, int port, netio_tag_t tag)
{
  struct netio_buffered_send_socket* socket = socket_list_add_or_lookup(pubsocket, &pubsocket->subscription_table.socket_list, addr, addrlen, port, pubsocket->ctx, &pubsocket->attr);
  if(socket == NULL) {
    // socket_list_add_or_lookup() returns NULL for a zero-length address or
    // an unknown connection mode; the request cannot be served.
    log_error("Subscription for tag 0x%lx dropped: no send socket for the requested address (addrlen=%lu)", tag, (unsigned long)addrlen);
    return;
  }

  if(socket->send_socket.recv_socket == NULL){
    socket->send_socket.recv_socket = recv_socket;
  }

  if (socket->send_socket.state == CONNECTED){
    int ret = table_add_subscription(&pubsocket->subscription_table, tag, socket);
    // Report the subscription to the user only if it made it into the table,
    // matching the handling of deferred subscriptions.
    if(pubsocket->cb_subscribe && ret == 0) {
      pubsocket->cb_subscribe(pubsocket, tag, addr, addrlen);
    }
  } else {
    push_back_subscription(&socket->send_socket.deferred_subs, tag);
  }
}
/*
 * Handles an unsubscription request received by the publisher.
 *
 * Requests from addresses that are not in the socket list are ignored.
 * Otherwise the (socket, tag) entry is removed from the table and
 * cb_unsubscribe is invoked.
 *
 * @param pubsocket: Publish socket that received the request
 * @param addr: Subscriber address taken from the request
 * @param addrlen: Length of addr in bytes
 * @param port: Subscriber port taken from the request
 * @param tag: Tag being unsubscribed from
 */
static void
unsubscribe(struct netio_publish_socket* pubsocket, void* addr, size_t addrlen, int port, netio_tag_t tag)
{
  struct netio_socket_list* entry = find_socket_by_address(pubsocket->subscription_table.socket_list, addr, addrlen, port);
  if(entry != NULL) {
    struct netio_buffered_send_socket* bufsocket = (struct netio_buffered_send_socket*)entry->socket;
    const uint8_t connection_closed = 0;
    table_remove_subscription(&pubsocket->subscription_table, bufsocket, tag, connection_closed);
    pubsocket->subscription_table.ts += 1;
    if(pubsocket->cb_unsubscribe) {
      pubsocket->cb_unsubscribe(pubsocket, tag, addr, addrlen);
    }
  }
}
static void
lsocket_on_connection_established(struct netio_recv_socket* socket)
{
log_dbg("Buffered listen socket: on connection established");
if(socket->tcp_fi_mode == NETIO_MODE_TCP){
//libfabric buffers posted in on_listen_socket_cm_event
socket->sub_msg_buffers = malloc(32*sizeof(struct netio_buffer*));
for (int i = 0; i < 32; i++){
socket->sub_msg_buffers[i] = malloc(sizeof(struct netio_buffer));
socket->sub_msg_buffers[i]->size = sizeof(struct netio_subscription_message);
socket->sub_msg_buffers[i]->data = malloc(socket->sub_msg_buffers[i]->size);
netio_post_recv(socket, socket->sub_msg_buffers[i]);
}
}
}
static void
parse_subscription_message(struct netio_publish_socket* socket, struct netio_subscription_message* msg, struct netio_recv_socket* recv_socket)
{
if (msg->action){
log_dbg("subscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
subscribe(socket, recv_socket, msg->addr, msg->addrlen, msg->port, msg->tag);
}
else{
log_dbg("unsubscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
unsubscribe(socket, msg->addr, msg->addrlen, msg->port, msg->tag);
}
}
/*
 * Message callback of the publisher's listen socket.
 *
 * A message of exactly sizeof(struct netio_subscription_message) bytes is
 * parsed as a subscription message; any other size is logged as an error.
 * In both cases the buffer is posted again for receiving.
 *
 * @param socket: Receive socket the message arrived on
 * @param buf: Buffer holding the message
 * @param data: Unused here; the message is read from buf->data
 * @param len: Number of bytes received
 */
static void
lsocket_on_msg_received(struct netio_recv_socket* socket, struct netio_buffer* buf, void* data, size_t len)
{
  log_dbg("message received by recv socket %p", socket);
  struct netio_publish_socket* pubsocket = (struct netio_publish_socket*)socket->lsocket->usr;

  if(len == sizeof(struct netio_subscription_message)) {
    struct netio_subscription_message* msg = (struct netio_subscription_message*)buf->data;
    parse_subscription_message(pubsocket, msg, socket);
  } else {
    log_error("Illegal subscription message size %lu. Did you confuse tohost and from host links", len);
  }

  // Hand the buffer back regardless of whether the message was usable.
  netio_post_recv(socket, buf);
}
/*
 * Sends a subscribe or unsubscribe message for a tag to the publisher.
 *
 * @param socket: Subscribe socket whose send socket is used
 * @param tag: Tag to (un)subscribe
 * @param action: NETIO_SUBSCRIBE or NETIO_UNSUBSCRIBE
 *
 * @return The result of the underlying send call; 0 when `action` is
 *         invalid (nothing is sent); 1 when the socket's connection mode is
 *         not supported
 */
static int
send_subscription_message(struct netio_subscribe_socket* socket, netio_tag_t tag, int action)
{
  if( action == NETIO_SUBSCRIBE ){
    log_info("Sending subscription for tag 0x%lx", tag);
  } else if ( action == NETIO_UNSUBSCRIBE ){
    log_info("Sending unsubscription for tag 0x%lx", tag);
  } else {
    log_error("Invalid subscription action %d", action);
    return 0;
  }

  int ret = 0;
  if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
    socket->msg.tag = tag;
    socket->msg.action = action;
    socket->buf.data = &socket->msg;
    socket->buf.size = sizeof(struct netio_subscription_message);
    ret = netio_send_inline_buffer(&socket->socket, &socket->buf);
  } else if (socket->tcp_fi_mode == NETIO_MODE_TCP){
    // look for available buf
    // NOTE(review): this scan has no upper bound; it relies on at least one
    // entry of bufs having to_send == 0 - confirm against the array size.
    size_t id = 0;
    while (socket->bufs[id].to_send) {
      id++;
    }
    // `id` is a size_t, so it must be printed with %zu (not %d).
    log_info("tag id %zu", id);
    socket->msgs[id] = socket->msgs[0]; // initialize msg to default
    socket->msgs[id].tag = tag;
    socket->msgs[id].action = action;
    socket->bufs[id].data = &socket->msgs[id];
    socket->bufs[id].size = sizeof(struct netio_subscription_message);
    socket->bufs[id].to_send = 1;
    ret = netio_send_buffer(&socket->socket, &socket->bufs[id]);
  } else {
    log_error("Socket connection type unsupported %d", socket->tcp_fi_mode);
    ret = 1;
  }
  log_info("send_subscription_message done");
  return ret;
}
/*
 * Connection-established callback of a subscriber's send socket.
 *
 * Fills in the local address that subscription messages carry (via
 * fi_getname() in libfabric mode, getsockname()/getnameinfo() in TCP mode)
 * and then sends the queued tags, last one first, until the queue is empty
 * or a send does not return NETIO_STATUS_OK.
 *
 * Failing to determine the local address is fatal in both modes.
 *
 * @param socket: The send socket whose connection came up
 */
static void
subsocket_on_connection_established(struct netio_send_socket* socket)
{
  log_dbg("subsocket connection established");
  int ret;
  struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)socket->usr;

  if(subscribe_socket->total_tags == 0){
    log_info("Closing send connection again because there is no tag to subscribe to.");
    netio_disconnect(socket);
    // NOTE(review): execution continues after the disconnect, as in the
    // original code - confirm whether an early return is intended here.
  }

  if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
    subscribe_socket->msg.addrlen = NETIO_MAX_ADDRLEN;
    if((ret = fi_getname(&subscribe_socket->recv_socket.listen_socket.pep->fid,
                         subscribe_socket->msg.addr,
                         &subscribe_socket->msg.addrlen)) != 0) {
      log_fatal("Cannot get information on local interface %d: %s", ret, fi_strerror(-ret));
      exit(1);
    }
    subscribe_socket->buf.data = &subscribe_socket->msg;
    subscribe_socket->buf.size = sizeof(struct netio_subscription_message);
    netio_register_send_buffer(socket, &subscribe_socket->buf, 0);
  }
  else if (socket->tcp_fi_mode == NETIO_MODE_TCP) {
    // initialize msgs[0] with defaults
    // sockaddr_storage is large enough for any address family; a plain
    // struct sockaddr would truncate anything bigger than an IPv4 address.
    struct sockaddr_storage local_addr;
    socklen_t local_addrlen = sizeof(local_addr);
    if(getsockname(socket->eq_ev_ctx.fd, (struct sockaddr*)&local_addr, &local_addrlen) != 0) {
      log_fatal("Cannot get local address of the subscription send socket");
      exit(1);
    }
    if((ret = getnameinfo((struct sockaddr*)&local_addr, local_addrlen,
                          subscribe_socket->msgs[0].addr, NETIO_MAX_ADDRLEN,
                          NULL, 0, NI_NUMERICHOST)) != 0) {
      log_fatal("Cannot get information on local interface %d: %s", ret, gai_strerror(ret));
      exit(1);
    }
    // The transmitted length includes the terminating NUL written by
    // getnameinfo().
    size_t text_len = strlen(subscribe_socket->msgs[0].addr);
    subscribe_socket->msgs[0].addrlen = text_len+1;
    subscribe_socket->msgs[0].port = subscribe_socket->msg.port;
  }

  // send tags one by one
  while(subscribe_socket->total_tags > 0){
    size_t idx = subscribe_socket->total_tags - 1;
    netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx];
    log_info("subsocket_on_connection_established, sending subscription for tag 0x%lx pos %lu", tag, idx);
    ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE);
    if (ret == NETIO_STATUS_OK){
      subscribe_socket->total_tags--;
    } else {
      // The remaining tags are retried from subsocket_on_send_completed.
      log_warn("subsocket_on_connection_established: send_subscription_message returned %d", ret);
      break;
    }
  }
}
/*
 * Connection-closed callback of a subscriber's send socket: resets the
 * subscribe socket's state to NONE and runs the shutdown handler that
 * matches the socket's connection mode.
 *
 * @param socket: The send socket whose connection was closed
 */
static void
subsocket_on_send_connection_closed(struct netio_send_socket* socket)
{
  log_dbg("subsocket_on_send_connection_closed callback");
  struct netio_subscribe_socket* owner = (struct netio_subscribe_socket*)socket->usr;
  owner->state = NONE;

  if(socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
    handle_send_socket_shutdown(socket);
    return;
  }
  if(socket->tcp_fi_mode == NETIO_MODE_TCP){
    handle_tcp_send_socket_shutdown(socket);
  }
}
/*
 * Connection-refused callback of a subscriber's send socket: resets the
 * subscribe socket's state to NONE, shuts down its listen socket and then
 * notifies the user through cb_error_connection_refused, if set.
 *
 * @param ssocket: The send socket whose connection attempt was refused
 */
static void
subsocket_on_error_connection_refused(struct netio_send_socket* ssocket) {
  log_dbg("subsocket connection refused");
  struct netio_subscribe_socket* owner = (struct netio_subscribe_socket*)ssocket->usr;

  owner->state = NONE;
  handle_listen_socket_shutdown(&owner->recv_socket.listen_socket);

  if(owner->cb_error_connection_refused == NULL) {
    return;
  }
  owner->cb_error_connection_refused(owner);
}
/*
 * Send-completed callback of a subscriber's send socket: continues sending
 * the tags still queued in tags_to_subscribe, last one first, stopping at
 * the first send that does not return NETIO_STATUS_OK.
 *
 * @param socket: The send socket that completed a send
 * @param key: Unused
 */
static void
subsocket_on_send_completed(struct netio_send_socket* socket, uint64_t key)
{
  struct netio_subscribe_socket* owner = (struct netio_subscribe_socket*)socket->usr;

  //check for remaining tags from on_connection_established
  while(owner->total_tags > 0){
    const size_t last = owner->total_tags - 1;
    netio_tag_t pending_tag = owner->tags_to_subscribe[last];
    int status = send_subscription_message(owner, pending_tag, NETIO_SUBSCRIBE);
    if (status != NETIO_STATUS_OK){
      log_warn("subsocket_on_send_completed: send_subscription_message returned %d", status);
      return;
    }
    owner->total_tags--;
  }
}
/*
 * Connection-established callback of a subscriber's buffered receive
 * socket: forwards the event to the user's cb_connection_established, if
 * set.
 *
 * @param rsocket: The buffered receive socket that was connected
 */
static void
subscribe_socket_on_connection_established(struct netio_buffered_recv_socket* rsocket)
{
  log_dbg("connection to subscribe socket has been established");
  struct netio_subscribe_socket* owner = (struct netio_subscribe_socket*)rsocket->lsocket->listen_socket.usr;
  if(owner->cb_connection_established == NULL) {
    return;
  }
  owner->cb_connection_established(owner);
}
/*
 * Connection-closed callback of a subscriber's buffered receive socket:
 * forwards the event to the user's cb_connection_closed, if set.
 *
 * @param rsocket: The buffered receive socket whose connection was closed
 */
static void
subscribe_socket_on_connection_closed(struct netio_buffered_recv_socket* rsocket)
{
  log_info("connection to subscribe socket has been closed");
  struct netio_subscribe_socket* owner = (struct netio_subscribe_socket*)rsocket->lsocket->listen_socket.usr;
  if(owner->cb_connection_closed == NULL) {
    return;
  }
  owner->cb_connection_closed(owner);
}
/*
 * Message callback of a subscriber's buffered receive socket.
 *
 * A message consists of a netio_tag_t followed by at least one byte of
 * payload. The tag is split off and the payload is handed to the user's
 * cb_msg_received, if set. Shorter messages are logged and dropped.
 *
 * @param rsocket: The buffered receive socket the message arrived on
 * @param data: Start of the message (tag followed by payload)
 * @param len: Total message length in bytes
 */
static void
subscribe_socket_on_msg_received(struct netio_buffered_recv_socket* rsocket, void* data, size_t len)
{
  log_trc("buffer received");
  struct netio_subscribe_socket* socket = (struct netio_subscribe_socket*)rsocket->lsocket->listen_socket.usr;
  if(len <= sizeof(netio_tag_t)) {
    log_error("Received invalid message of size %lu. Need %lu bytes for tag, at least 1 of payload", len, sizeof(netio_tag_t));
    return;
  }

  // Copy the tag out instead of dereferencing a cast pointer: `data` is not
  // guaranteed to be suitably aligned for netio_tag_t.
  netio_tag_t tag;
  memcpy(&tag, data, sizeof(tag));

  if(socket->cb_msg_received) {
    socket->cb_msg_received(socket, tag, (char *)data + sizeof(netio_tag_t), len - sizeof(netio_tag_t));
  }
}
/*
 * Initializes an empty subscription table with room for
 * NETIO_INITIAL_SUBSCRIPTIONS entries.
 *
 * If the entry array cannot be allocated, an error is logged and the table
 * is left with a capacity of zero, so table_add_subscription() rejects every
 * new subscription instead of writing through a NULL pointer.
 *
 * @param table: The table to initialize
 */
void
netio_subscription_table_init(struct netio_subscription_table* table)
{
  table->socket_list = NULL;
  table->num_subscriptions = 0;
  table->ts = 0;

  table->subscriptions = malloc(NETIO_INITIAL_SUBSCRIPTIONS*sizeof(struct netio_subscription));
  if(table->subscriptions == NULL) {
    log_error("Cannot allocate the subscription table; no subscriptions will be accepted");
    table->size = 0;
    return;
  }
  table->size = NETIO_INITIAL_SUBSCRIPTIONS;
}
/*
 * Initializes a buffered publish socket that uses libfabric connections and
 * starts listening for subscription requests.
 *
 * @param socket: The buffered publish socket to initialize
 * @param ctx: The NetIO context in which to initialize the socket
 * @param hostname: Hostname or IP address to bind to
 * @param port: Port to bind to
 * @param attr: Buffered connection settings to be used for the underlying
 *              connections. If num_pages exceeds NETIO_DOMAIN_MAX_MR it is
 *              lowered to that value in place before being copied.
 *
 * @see `netio_buffered_send_socket_init` for a description of the connection parameters
 */
void
netio_publish_libfabric_socket_init(struct netio_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr)
{
  memset(socket, 0, sizeof(*socket));
  socket->tcp_fi_mode=NETIO_MODE_LIBFABRIC;
  socket->ctx = ctx;
  netio_subscription_table_init(&socket->subscription_table);
  netio_init_listen_socket(&socket->lsocket, ctx, NULL);

  // Clamp the requested page count before taking a copy of the settings.
  if (attr->num_pages > NETIO_DOMAIN_MAX_MR){
    log_error("Number of requested publisher netio pages %lu exceeds max value. Using %lu.", attr->num_pages, NETIO_DOMAIN_MAX_MR);
    attr->num_pages = NETIO_DOMAIN_MAX_MR;
  }
  socket->attr = *attr;

  // The listen socket receives the subscription messages.
  socket->lsocket.recv_sub_msg = 1;
  socket->lsocket.usr = socket;
  socket->lsocket.cb_msg_received = lsocket_on_msg_received;
  socket->lsocket.cb_connection_established = lsocket_on_connection_established;

  char* bind_address = netio_domain_name_lookup(hostname);
  netio_listen(&socket->lsocket, (const char*)bind_address, port);
  free(bind_address);
}
void
netio_publish_tcp_socket_init(struct netio_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr)
{
memset(socket, 0, sizeof(*socket));
socket->ctx = ctx;
socket->tcp_fi_mode = NETIO_MODE_TCP;
netio_subscription_table_init(&socket->subscription_table);
netio_init_listen_tcp_socket(&socket->lsocket, ctx, NULL);
memcpy(&socket->attr, attr, sizeof(struct netio_buffered_socket_attr));
socket->lsocket.usr = socket;
socket->lsocket.cb_connection_established = lsocket_on_connection_established;
socket->lsocket.cb_msg_received = lsocket_on_msg_received;
socket->lsocket.recv_sub_msg = 1;
char* _hostname = netio_domain_name_lookup(hostname);
netio_listen_tcp(&socket->lsocket, (const char*)_hostname, port);
free(_hostname);
}
/*
 * Initializes a buffered publish socket, selecting the TCP or the libfabric
 * implementation according to netio_tcp_mode(hostname).
 *
 * @param socket: The buffered publish socket to initialize
 * @param ctx: The NetIO context in which to initialize the socket
 * @param hostname: Hostname or IP address to bind to; it is passed through
 *                  netio_hostname() before being used
 * @param port: Port to bind to
 * @param attr: Buffered connection settings to be used for the underlying connections
 *
 * @see `netio_buffered_send_socket_init` for a description of the connection parameters
 */
void
netio_publish_socket_init(struct netio_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr)
{
  if (!netio_tcp_mode(hostname)) {
    netio_publish_libfabric_socket_init(socket, ctx, netio_hostname(hostname), port, attr);
    return;
  }
  netio_publish_tcp_socket_init(socket, ctx, netio_hostname(hostname), port, attr);
}
/*
 * Initializes a subscribe socket that uses libfabric connections and starts
 * its buffered listen socket on `hostname` with port 0.
 *
 * @param socket: The subscribe socket to initialize
 * @param ctx: The NetIO context in which to initialize the socket
 * @param attr: Buffered connection settings, copied into the socket
 * @param hostname: Local hostname or IP address to listen on
 * @param remote_host: Hostname or IP address of the publisher
 * @param remote_port: Port of the publisher
 *
 * socket->remote_hostname receives a heap-allocated string, or NULL when
 * remote_host is NULL or could not be copied.
 */
void
netio_subscribe_libfabric_socket_init(struct netio_subscribe_socket* socket,
                                      struct netio_context* ctx,
                                      struct netio_buffered_socket_attr* attr,
                                      const char* hostname,
                                      const char* remote_host,
                                      unsigned remote_port)
{
  memset(socket, 0, sizeof(*socket));
  socket->ctx = ctx;
  socket->state = NONE;
  socket->tcp_fi_mode=NETIO_MODE_LIBFABRIC;
  memcpy(&socket->attr, attr, sizeof(struct netio_buffered_socket_attr));
  netio_buffered_listen_socket_init(&socket->recv_socket, ctx, &socket->attr);
  socket->recv_socket.listen_socket.usr = socket;
  socket->recv_socket.cb_connection_established = subscribe_socket_on_connection_established;
  socket->recv_socket.cb_connection_closed = subscribe_socket_on_connection_closed;
  socket->recv_socket.cb_msg_received = subscribe_socket_on_msg_received;
  //set cb_buf_received meant only for pileup measurement to NULL
  socket->cb_buf_received = NULL;

  char* lookedup_hostname = netio_domain_name_lookup(hostname);
  char* lookedup_remote_hostname = netio_domain_name_lookup(remote_host);
  netio_buffered_listen(&socket->recv_socket, (const char*)lookedup_hostname, 0);

  // netio_domain_name_lookup() already returns a heap string (or NULL), so
  // the socket takes ownership of it directly. This avoids calling strdup()
  // on NULL when the remote host is missing.
  socket->remote_hostname = lookedup_remote_hostname;
  socket->remote_port = remote_port;
  socket->total_tags = 0;
  free(lookedup_hostname);
}
/*
 * Initializes a subscribe socket that uses TCP connections and starts its
 * buffered listen socket on `hostname` with port 0. The port the listen
 * socket reports afterwards is stored in socket->msg.port.
 *
 * @param socket: The subscribe socket to initialize
 * @param ctx: The NetIO context in which to initialize the socket
 * @param attr: Buffered connection settings, copied into the socket
 * @param hostname: Local hostname or IP address to listen on
 * @param remote_host: Hostname or IP address of the publisher
 * @param remote_port: Port of the publisher
 *
 * socket->remote_hostname receives a heap-allocated string, or NULL when
 * remote_host is NULL or could not be copied.
 */
void
netio_subscribe_tcp_socket_init(struct netio_subscribe_socket* socket,
                                struct netio_context* ctx,
                                struct netio_buffered_socket_attr* attr,
                                const char* hostname,
                                const char* remote_host,
                                unsigned remote_port)
{
  // Passing NULL for %s is undefined, so substitute a placeholder.
  log_info("subscribe_tcp from <%s> to <%s>",
           hostname ? hostname : "(null)",
           remote_host ? remote_host : "(null)");
  memset(socket, 0, sizeof(*socket));
  socket->ctx = ctx;
  socket->state = NONE;
  socket->tcp_fi_mode=NETIO_MODE_TCP;
  memcpy(&socket->attr, attr, sizeof(struct netio_buffered_socket_attr));
  log_dbg("Prepare recv socket (buffered_listen_tcp_socket)");
  netio_buffered_listen_tcp_socket_init(&socket->recv_socket, ctx, &socket->attr);
  socket->recv_socket.listen_socket.usr = socket;
  socket->recv_socket.cb_connection_established = subscribe_socket_on_connection_established;
  socket->recv_socket.cb_connection_closed = subscribe_socket_on_connection_closed;
  socket->recv_socket.cb_msg_received = subscribe_socket_on_msg_received;
  //set cb_buf_received meant only for pileup measurement to NULL
  socket->cb_buf_received = NULL;

  char* lookedup_hostname = netio_domain_name_lookup(hostname);
  char* lookedup_remote_hostname = netio_domain_name_lookup(remote_host);
  netio_buffered_listen_tcp(&socket->recv_socket, (const char*)lookedup_hostname, 0);
  socket->msg.port=socket->recv_socket.listen_socket.port;

  // netio_domain_name_lookup() already returns a heap string (or NULL), so
  // the socket takes ownership of it directly. This avoids calling strdup()
  // on NULL when the remote host is missing.
  socket->remote_hostname = lookedup_remote_hostname;
  socket->remote_port = remote_port;
  socket->total_tags = 0;
  free(lookedup_hostname);
}
/*
 * Initializes a subscribe socket, selecting the TCP or the libfabric
 * implementation according to netio_tcp_mode(hostname).
 *
 * @param socket: The subscribe socket to initialize
 * @param ctx: The NetIO context in which to initialize the socket
 * @param attr: Buffered connection settings
 * @param hostname: Local hostname; passed through netio_hostname()
 * @param remote_host: Publisher hostname; passed through netio_hostname()
 * @param remote_port: Port of the publisher
 */
void
netio_subscribe_socket_init(struct netio_subscribe_socket* socket,
                            struct netio_context* ctx,
                            struct netio_buffered_socket_attr* attr,
                            const char* hostname,
                            const char* remote_host,
                            unsigned remote_port)
{
  if (!netio_tcp_mode(hostname)) {
    netio_subscribe_libfabric_socket_init(socket, ctx, attr, netio_hostname(hostname), netio_hostname(remote_host), remote_port);
    return;
  }
  netio_subscribe_tcp_socket_init(socket, ctx, attr, netio_hostname(hostname), netio_hostname(remote_host), remote_port);
}
/*
 * Resets a subscription lookup cache: timestamp, cached count and cached
 * start index are all set to zero.
 *
 * @param cache: The cache to reset
 */
void
netio_subscription_cache_init(struct netio_subscription_cache* cache)
{
  cache->idx_start = 0;
  cache->count = 0;
  cache->ts = 0;
}
/*
 * Finds the run of table entries that carry a given tag. The table is
 * searched with bsearch(), so it must be sorted by tag.
 *
 * @param table: Subscription table to search
 * @param tag: Tag to look for
 * @param start: Receives the index of the first matching entry; left
 *               untouched when no entry matches
 *
 * @return Number of consecutive entries with the tag, 0 when there is none
 */
static unsigned
lookup_tag(struct netio_subscription_table* table, netio_tag_t tag, unsigned* start)
{
  // Only the tag of the key is read by cmp_subscription().
  struct netio_subscription key;
  key.tag = tag;

  struct netio_subscription* hit = (struct netio_subscription*)bsearch(&key,
                                   table->subscriptions,
                                   table->num_subscriptions,
                                   sizeof(struct netio_subscription),
                                   cmp_subscription);
  if(hit == NULL) {
    return 0;
  }

  // bsearch() may land anywhere inside the run: walk back to its beginning.
  unsigned first = hit - table->subscriptions;
  for(; first > 0; first--) {
    if(table->subscriptions[first-1].tag != tag) {
      break;
    }
  }

  // Then walk forward to one past its end.
  unsigned past_end = first;
  while(past_end < table->num_subscriptions && table->subscriptions[past_end].tag == tag) {
    past_end++;
  }

  *start = first;
  return past_end - first;
}
/*
 * Same as lookup_tag(), but reuses the result stored in `cache` as long as
 * the cache timestamp equals the table timestamp.
 *
 * NOTE(review): the cache does not record which tag it was filled for, so it
 * looks like each cache is meant to be used with a single tag - confirm
 * against callers.
 *
 * @param table: Subscription table to search
 * @param tag: Tag to look for
 * @param cache: Cache holding the previous lookup result
 * @param start: Receives the cached start index
 *
 * @return The cached number of entries with the tag
 */
static unsigned
lookup_tag_cached(struct netio_subscription_table* table, netio_tag_t tag, struct netio_subscription_cache* cache, unsigned* start)
{
  const int stale = (cache->ts != table->ts);
  if(stale) {
    // Table changed since the cache was filled: refresh it.
    cache->count = lookup_tag(table, tag, &cache->idx_start);
    cache->ts = table->ts;
  }
  *start = cache->idx_start;
  return cache->count;
}
/*
 * Publishes an I/O vector to every subscriber of a tag. The tag is read
 * from the first vector entry.
 *
 * @param socket: Publish socket to send on
 * @param iov: I/O vector; iov[0] must point to the netio_tag_t
 * @param len: Number of entries in iov
 * @param flags: With NETIO_REENTRY set, only subscriptions whose previous
 *               send returned NETIO_STATUS_AGAIN are retried
 * @param cache: Optional lookup cache; may be NULL
 *
 * @return NETIO_STATUS_OK_NOSUB when nobody subscribed to the tag;
 *         NETIO_STATUS_AGAIN or NETIO_STATUS_TOO_BIG when a send reported
 *         that status (the most recent of the two wins); any other send
 *         error immediately; NETIO_STATUS_OK otherwise
 */
int
netio_buffered_publishi(struct netio_publish_socket* socket, struct iovec * iov, size_t len, int flags, struct netio_subscription_cache* cache)
{
  int ret = NETIO_STATUS_OK;
  netio_tag_t tag = *(netio_tag_t*)iov[0].iov_base;

  unsigned start_idx;
  unsigned num_subscriptions = cache
      ? lookup_tag_cached(&socket->subscription_table, tag, cache, &start_idx)
      : lookup_tag(&socket->subscription_table, tag, &start_idx);
  if(num_subscriptions == 0) return NETIO_STATUS_OK_NOSUB;

  const unsigned end_idx = start_idx + num_subscriptions;
  for(unsigned i=start_idx; i<end_idx; i++) {
    struct netio_subscription* sub = &socket->subscription_table.subscriptions[i];
    if(sub->tag != tag) {
      continue;
    }
    // skip connections that were already successful if we are in reentry mode
    if((flags & NETIO_REENTRY) && sub->again == 0) {
      continue;
    }

    int result = netio_buffered_sendv(sub->socket, iov, len);
    log_dbg("Sending iov on subscription->socket, result=%d",result);

    if(result == NETIO_STATUS_OK) {
      sub->again = 0;
    } else if(result == NETIO_STATUS_AGAIN) {
      // Remember this subscriber so a reentry call retries only it.
      sub->again = 1;
      ret = NETIO_STATUS_AGAIN;
    } else if(result == NETIO_STATUS_TOO_BIG) {
      sub->again = 0;
      ret = NETIO_STATUS_TOO_BIG;
    } else {
      return result; // some error occured and we return immediately
    }
  }
  return ret;
}
/*
 * Publishes a single contiguous buffer under a tag by building a two-entry
 * I/O vector (tag, payload) and passing it to netio_buffered_publishi().
 *
 * @param socket: Publish socket to send on
 * @param tag: Tag to publish under
 * @param data: Payload
 * @param len: Payload size in bytes
 * @param flags: Passed through to netio_buffered_publishi()
 * @param cache: Optional lookup cache; may be NULL
 *
 * @return The result of netio_buffered_publishi()
 */
int
netio_buffered_publish(struct netio_publish_socket* socket, netio_tag_t tag, void* data, size_t len, int flags, struct netio_subscription_cache* cache)
{
  log_trc("netio_buffered_publish (size=%lu)", len);
  struct iovec parts[2] = {
    { .iov_base = &tag, .iov_len = sizeof(netio_tag_t) },
    { .iov_base = data, .iov_len = len },
  };
  return netio_buffered_publishi(socket, parts, 2, flags, cache);
}
/*
 * Publishes an I/O vector under a tag by prepending the tag as an extra
 * first entry and passing the result to netio_buffered_publishi().
 *
 * NOTE(review): only the first NETIO_MAX_IOV_LEN entries of `iov` are
 * forwarded; further entries are dropped without any error - confirm this is
 * acceptable for callers.
 *
 * @param socket: Publish socket to send on
 * @param tag: Tag to publish under
 * @param iov: Payload I/O vector
 * @param len: Number of entries in iov
 * @param flags: Passed through to netio_buffered_publishi()
 * @param cache: Optional lookup cache; may be NULL
 *
 * @return The result of netio_buffered_publishi()
 */
int
netio_buffered_publishv(struct netio_publish_socket* socket, netio_tag_t tag, struct iovec * iov, size_t len, int flags, struct netio_subscription_cache* cache)
{
  size_t count = len;
  if(count > NETIO_MAX_IOV_LEN) {
    count = NETIO_MAX_IOV_LEN;
  }

  //NETIO_MAX_IOV_LEN is not a limititation, as all entries will be copied in one in netio_buffered_sendv
  struct iovec with_tag[NETIO_MAX_IOV_LEN + 1];
  with_tag[0].iov_len = sizeof(netio_tag_t);
  with_tag[0].iov_base = &tag;
  for(size_t i = 0; i < count; i++) {
    with_tag[i + 1] = iov[i];
  }
  return netio_buffered_publishi(socket, with_tag, (count + 1), flags, cache);
}
/*
 * Flushes the buffered send socket of every entry in the subscription
 * table. A socket with several subscriptions is flushed once per entry.
 *
 * @param socket: Publish socket whose subscribers are flushed
 * @param tag: Unused; all subscriptions are flushed regardless of tag
 * @param cache: Unused
 */
void
netio_buffered_publish_flush(struct netio_publish_socket* socket, netio_tag_t tag, struct netio_subscription_cache* cache)
{
  struct netio_subscription_table* table = &socket->subscription_table;
  unsigned i = 0;
  while(i < table->num_subscriptions) {
    netio_buffered_flush(table->subscriptions[i].socket);
    i++;
  }
}
/*
 * Subscribes to a tag.
 *
 * On the first call (state NONE) the send socket towards the publisher is
 * created and a connection attempt is started. If the send socket is
 * already connected the subscription message is sent right away; otherwise
 * the tag is queued and sent by subsocket_on_connection_established.
 *
 * @param socket: The subscribe socket
 * @param tag: Tag to subscribe to
 *
 * @return 0 when the tag was queued or is already queued; the result of
 *         send_subscription_message() when the message was sent directly;
 *         1 when the socket's connection mode is not supported
 */
int
netio_subscribe(struct netio_subscribe_socket* socket, netio_tag_t tag)
{
  if(socket->state == NONE) {
    //A new socket is created and the on subsocket_on_connection_established
    //will trigger the actual subscriptions.
    log_dbg("Creating and connecting a new send_socket");
    const int use_fabric = (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC);
    const int use_tcp = (socket->tcp_fi_mode == NETIO_MODE_TCP);
    if(!use_fabric && !use_tcp) {
      // Do not mark the socket INITIALIZED when no send socket was set up:
      // the tag would be queued and never sent.
      log_error("Socket connection type unsupported %d", socket->tcp_fi_mode);
      return 1;
    }

    if(use_fabric) {
      netio_init_send_socket(&socket->socket, socket->ctx);
    } else {
      netio_init_send_tcp_socket(&socket->socket, socket->ctx);
    }

    // Callbacks are identical for both modes and are set before connecting.
    socket->socket.usr = socket;
    socket->socket.cb_connection_established = subsocket_on_connection_established;
    socket->socket.cb_connection_closed = subsocket_on_send_connection_closed;
    socket->socket.cb_error_connection_refused = subsocket_on_error_connection_refused;
    socket->socket.cb_send_completed = subsocket_on_send_completed;

    if(use_fabric) {
      netio_connect(&socket->socket, socket->remote_hostname, socket->remote_port);
    } else {
      netio_connect_tcp(&socket->socket, socket->remote_hostname, socket->remote_port);
    }
    socket->state = INITIALIZED;
  }

  // A tag that is already queued is not queued twice.
  for(unsigned i=0; i<socket->total_tags; i++) {
    if(socket->tags_to_subscribe[i] == tag) {
      return 0;
    }
  }

  //if send socket connected send message
  //otherwise on_connection_established will do it
  if (socket->socket.state){
    log_info("Sending subscription message for tag 0x%lx", tag);
    int ret = send_subscription_message(socket, tag, NETIO_SUBSCRIBE);
    return ret;
  } else {
    // NOTE(review): no bound on total_tags is checked here - confirm the
    // capacity of tags_to_subscribe.
    log_info("Queuing subscription message for tag 0x%lx", tag);
    socket->tags_to_subscribe[socket->total_tags] = tag;
    socket->total_tags++;
    return 0;
  }
}
/*
 * Removes the first occurrence of a tag from the queue of tags that are
 * still waiting to be subscribed, keeping the order of the other tags.
 * A warning is logged when the tag is not in the queue.
 *
 * @param socket: The subscribe socket that owns the queue
 * @param tag: Tag to remove
 *
 * @return Always NETIO_STATUS_OK
 */
static int
remove_tag_to_subscribe(struct netio_subscribe_socket* socket, netio_tag_t tag)
{
  int found = 0;
  for(unsigned int i=0; i<socket->total_tags; ++i){
    if(socket->tags_to_subscribe[i] == tag){
      // Both conversions need an argument: the tag and the socket.
      log_info("Removing tag 0x%lx from tags_to_subscribe of socket %p", tag, (void*)socket);
      // Close the gap by shifting the following tags down by one.
      for(unsigned int j = i; j < socket->total_tags-1; ++j){
        socket->tags_to_subscribe[j] = socket->tags_to_subscribe[j+1];
      }
      found = 1;
      socket->total_tags--;
      break;
    }
  }
  if(found == 0){
    log_warn("tag 0x%lx expected in tags_to_subscribe of socket %p not found", tag, (void*)socket);
  }
  return NETIO_STATUS_OK;
}
/*
 * Unsubscribes from a tag.
 *
 * When the subscribe socket is not INITIALIZED nothing is done. Otherwise
 * an unsubscription message is sent if the send socket is connected, or the
 * tag is removed from the queue of pending subscriptions if it is not.
 *
 * @param socket: The subscribe socket
 * @param tag: Tag to unsubscribe from
 *
 * @return NETIO_STATUS_OK when nothing had to be sent, otherwise the result
 *         of send_subscription_message()
 */
int
netio_unsubscribe(struct netio_subscribe_socket* socket, netio_tag_t tag)
{
  if(socket->state != INITIALIZED) {
    log_dbg("The connection has been already closed.");
    return NETIO_STATUS_OK;
  }

  log_dbg("Subscribe socket initialised, can proceed with usubscription");
  if (!socket->socket.state) {
    // Not connected yet: the subscription is still only queued locally.
    return remove_tag_to_subscribe(socket, tag);
  }

  int status = send_subscription_message(socket, tag, NETIO_UNSUBSCRIBE);
  log_dbg("Unsubscription message for tag 0x%lx sent, return code %d", tag, status);
  return status;
}
unsigned
netio_pubsocket_get_minimum_pages(struct netio_publish_socket* socket)
{
if (!socket) {
return 0;
}
size_t pages = socket->attr.num_pages;
struct netio_socket_list* itr = socket->subscription_table.socket_list;
while(itr != NULL){
struct netio_buffered_send_socket* buf_send_socket = (struct netio_buffered_send_socket*)itr->socket;
uint64_t socket_pages = buf_send_socket->buffers.available_buffers;
if(socket_pages < pages){
pages = socket_pages;
}
itr = itr->next;
}
return pages;
}