Line data Source code
1 : #include "netio/netio.h"
2 :
3 : #include <string.h>
4 : #include <stdio.h>
5 : #include "log.h"
6 :
7 : #if defined DEBUG || defined DEBUG_MR
8 : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
9 : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
10 : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
11 : #else
12 : #define log_dbg(...)
13 : #define log_trc(...)
14 : #endif
15 :
16 :
17 : static void
18 171 : init_bufferstack(struct netio_buffer_stack* stack, unsigned num, size_t bufsize)
19 : {
20 171 : stack->buffers = malloc(num*sizeof(struct netio_buffer*));
21 171 : stack->buffer_addresses = malloc(num*sizeof(struct netio_buffer*));
22 171 : stack->num_buffers = num;
23 171 : stack->available_buffers = num;
24 40003 : for(unsigned int i=0; i<num; i++) {
25 39832 : stack->buffers[i] = malloc(sizeof(struct netio_buffer));
26 39832 : stack->buffers[i]->data = malloc(bufsize);
27 39832 : stack->buffers[i]->size = bufsize;
28 39832 : stack->buffers[i]->mr = NULL;
29 39832 : stack->buffer_addresses[i] = stack->buffers[i];
30 : }
31 :
32 171 : }
33 :
34 :
35 40287 : void netio_register_recv_buffer(struct netio_recv_socket* socket, struct netio_buffer* buf, uint64_t flags)
36 : {
37 40287 : if (socket->tcp_fi_mode != NETIO_MODE_LIBFABRIC) {
38 : return;
39 : }
40 :
41 40287 : log_trc("registering recv buffer number: %d", socket->reg_mr);
42 40287 : uint64_t access = FI_RECV;
43 40287 : int ret = 0;
44 40287 : if((ret = fi_mr_reg(socket->domain, buf->data, buf->size, access, 0, socket->req_key++, 0, &buf->mr, NULL)))
45 : {
46 0 : log_fatal("Failed to register recv buffer (key %lu) failed. Error %d: %s",socket->req_key, ret, fi_strerror(-ret));
47 : //exit(2);
48 : }
49 0 : if(ret==0){
50 40287 : if (socket->reg_mr >= NETIO_DOMAIN_MAX_MR){
51 0 : log_fatal("Trying to register recv buffer number %u. MR array size %lu.", socket->reg_mr, NETIO_DOMAIN_MAX_MR);
52 0 : exit(1);
53 : }
54 40287 : socket->mr[socket->reg_mr]=&buf->mr->fid;
55 40287 : socket->reg_mr+=1;
56 : }
57 : }
58 :
59 :
60 38388 : void netio_register_send_buffer(struct netio_send_socket* socket, struct netio_buffer* buf, uint64_t flags)
61 : {
62 38388 : if (socket->tcp_fi_mode != NETIO_MODE_LIBFABRIC) {
63 : return;
64 : }
65 :
66 38121 : uint64_t access = FI_SEND;
67 38121 : int ret = 0;
68 38121 : if((ret = fi_mr_reg(socket->domain->domain, buf->data, buf->size, access, 0, socket->domain->req_key++, 0, &buf->mr, NULL)))
69 : {
70 0 : log_fatal("Failed to register send buffer failed. Error %d: %s.", ret, fi_strerror(-ret));
71 0 : exit(2);
72 : }
73 38121 : if(ret==0){
74 38121 : if (socket->domain->reg_mr >= NETIO_DOMAIN_MAX_MR){
75 0 : log_fatal("Trying to register send buffer number %u. MR array size %lu.", socket->domain->reg_mr, NETIO_DOMAIN_MAX_MR);
76 0 : exit(1);
77 : }
78 38121 : socket->domain->mr[socket->domain->reg_mr]=&buf->mr->fid;
79 38121 : socket->domain->reg_mr+=1;
80 : }
81 : }
82 :
83 :
84 : void
85 171 : netio_bufferstack_send_init(struct netio_buffer_stack* stack, struct netio_send_socket* socket, size_t num, size_t bufsize, uint64_t flags)
86 : {
87 171 : log_dbg("Initializing memory: %d pages", num);
88 171 : init_bufferstack(stack, num, bufsize);
89 171 : if (socket->tcp_fi_mode==NETIO_MODE_LIBFABRIC) {
90 162 : socket->domain->reg_mr=0;
91 37690 : for(unsigned i=0; i<num; i++) {
92 37528 : netio_register_send_buffer(socket, stack->buffers[i], flags);
93 : }
94 : }
95 171 : }
96 :
97 : void
98 167 : netio_bufferstack_close(struct netio_buffer_stack* stack, size_t num)
99 : {
100 167 : log_dbg("Freeing memory: %d pages", num);
101 39215 : for(unsigned int i=0; i < num; i++) {
102 39048 : if(stack->buffers == NULL){log_warn("Buffer already released."); return;}
103 39048 : free(stack->buffer_addresses[i]->data);
104 39048 : free(stack->buffer_addresses[i]);
105 : }
106 167 : free(stack->buffers);
107 167 : free(stack->buffer_addresses);
108 167 : stack->buffers = NULL;
109 : }
110 :
111 :
112 : int
113 1335603 : netio_bufferstack_pop(struct netio_buffer_stack* stack, struct netio_buffer** buffer)
114 : {
115 1335603 : log_trc("pop: available %lu", stack->available_buffers);
116 1335603 : if(stack->available_buffers > 0) {
117 1116748 : stack->available_buffers--;
118 1116748 : *buffer = stack->buffers[stack->available_buffers];
119 1116748 : return 0;
120 : } else {
121 : return 1;
122 : }
123 : }
124 :
125 :
126 : int
127 1116103 : netio_bufferstack_push(struct netio_buffer_stack* stack, struct netio_buffer* buffer)
128 : {
129 1116103 : log_trc("push: available %lu", stack->available_buffers);
130 1116103 : if(stack->available_buffers < stack->num_buffers) {
131 1116103 : stack->buffers[stack->available_buffers] = buffer;
132 1116103 : stack->available_buffers++;
133 1116103 : return 0;
134 : } else {
135 : return 1;
136 : }
137 : }
|