Line data Source code
1 : #include <string.h>
2 : #include <stdio.h>
3 : #include <unistd.h>
4 : #include "log.h"
5 : #include "connection_event.h"
6 : #include "netio/netio.h"
7 :
/* Debug and trace logging is compiled in only for DEBUG or DEBUG_BUF builds.
   In every other build log_dbg() and log_trc() expand to nothing, so their
   arguments are never evaluated. */
#if defined(DEBUG) || defined(DEBUG_BUF)
/* Part of __FILE__ after the last '/', or all of __FILE__ if it has none. */
#define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
#define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
#define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
#else
#define log_dbg(...)
#define log_trc(...)
#endif
16 :
/* Logs a fatal error and terminates the process with exit status 2.
   msg: text printed in front of the error code.
   c:   error code; it is logged as-is and negated before being handed to
        fi_strerror() for the textual description.
   The expansion deliberately has no trailing semicolon, so `FATAL(...);`
   behaves as a single statement, also between `if` and `else`. */
#define FATAL(msg, c) \
  do { \
    log_fatal("%s %d: %s", (msg), (c), fi_strerror(-(c))); \
    exit(2); \
  } while(0)
22 :
23 :
/* Length marker stored in front of every message inside a buffer.
   A buffer is laid out as LEN0|MSG0|LEN1|MSG1|...|LEN_N|MSG_N, where each
   LEN_i is a msg_size_t holding the size of the MSG_i that follows it. */
typedef uint32_t msg_size_t;
27 :
28 :
29 : // STATIC FUNCTIONS ////////////////////////////////////////////////////////////
30 :
31 :
32 : static int
33 1087084 : flush(struct netio_buffered_send_socket* socket)
34 : {
35 1087084 : int ret = NETIO_STATUS_OK;
36 1087084 : if(socket->current_buffer)
37 : {
38 1087082 : socket->current_buffer->size = socket->pos;
39 1087082 : int send_status = netio_send_buffer(&socket->send_socket, socket->current_buffer);
40 1087082 : if (send_status == NETIO_STATUS_AGAIN){
41 0 : socket->busy = 1;
42 0 : log_dbg("netio_send_buffer returned %d, trying again", ret);
43 0 : ret = NETIO_STATUS_AGAIN;
44 : } else {
45 1087082 : socket->busy = 0;
46 1087082 : socket->current_buffer = NULL;
47 1087082 : if(socket->timeout_ms != 0){
48 286437 : netio_timer_stop(&socket->flush_timer);
49 : }
50 : }
51 : } else { //there is no current buffer. disable busy if on
52 2 : socket->busy = 0;
53 : }
54 1087084 : return ret;
55 : }
56 :
57 :
/**
 * Flush timer callback: flushes the buffered send socket passed as user data.
 *
 * @param ptr: The buffered send socket, as a void pointer
 */
static void
flush_cb(void* ptr)
{
  flush((struct netio_buffered_send_socket*)ptr);
}
64 :
65 :
66 : static void
67 1084072 : on_send_completed(struct netio_send_socket* socket, uint64_t key)
68 : {
69 1084072 : struct netio_buffer* buf = (struct netio_buffer*)key;
70 1084072 : struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(socket->usr);
71 1084072 : if(netio_bufferstack_push(&bs->buffers, buf)) {
72 0 : log_fatal("The buffer stack exceeded its limits.");
73 0 : exit(1);
74 : }
75 1084072 : if(bs->buffers.available_buffers == 1) {
76 214525 : netio_signal_fire(&bs->signal_buffer_available);
77 : }
78 1084072 : }
79 :
80 : static void
81 328 : on_connect(struct netio_send_socket* socket)
82 : {
83 328 : struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(socket->usr);
84 328 : netio_bufferstack_send_init(&bs->buffers, socket, bs->num_pages, bs->buffersize, 0);
85 328 : if(bs->cb_connection_established) {
86 328 : bs->cb_connection_established(bs);
87 : }
88 328 : }
89 :
90 : static void
91 310 : on_buf_send_socket_connection_closed(struct netio_send_socket* ssocket)
92 : {
93 310 : log_dbg("on_buf_send_socket_connection_closed callback");
94 310 : struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(ssocket->usr);
95 310 : log_dbg("removing buffer available signal fd %d from evloop %d", bs->signal_buffer_available.ev_ctx.fd, bs->signal_buffer_available.epollfd);
96 310 : netio_signal_close(&ssocket->ctx->evloop, &bs->signal_buffer_available);
97 310 : if(bs->timeout_ms != 0){
98 304 : log_dbg("removing flush timer fd %d from evloop %d", bs->flush_timer.ev_ctx.fd, bs->signal_buffer_available.epollfd);
99 304 : netio_timer_close(&ssocket->ctx->evloop, &bs->flush_timer);
100 : }
101 310 : bs->current_buffer = NULL;
102 310 : handle_send_socket_shutdown(ssocket);
103 310 : netio_bufferstack_close(&bs->buffers, bs->num_pages);
104 :
105 310 : if(bs->pub_socket) { //only remove when send socket is part of a publish socket
106 39 : if(bs->cb_connection_closed) {
107 39 : bs->cb_connection_closed(bs);
108 : }
109 39 : struct netio_publish_socket* psocket = (struct netio_publish_socket*)bs->pub_socket;
110 39 : remove_socket(&(psocket->subscription_table.socket_list), (void*)bs);
111 : } else {
112 271 : if(bs->cb_connection_closed) {
113 271 : bs->cb_connection_closed(bs);
114 : }
115 : }
116 310 : }
117 :
118 : static void
119 70 : on_error_connection_refused(struct netio_send_socket* socket) {
120 70 : struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(socket->usr);
121 70 : netio_signal_close(&(socket->ctx->evloop), &bs->signal_buffer_available);
122 70 : if(bs->timeout_ms != 0){
123 64 : netio_timer_close(&(socket->ctx->evloop), &bs->flush_timer);
124 : }
125 70 : if(bs->cb_error_connection_refused) {
126 70 : bs->cb_error_connection_refused(bs);
127 : }
128 70 : }
129 :
130 : static void
131 17250790 : on_buffered_msg_received(struct netio_recv_socket* socket, struct netio_buffer* buf, void* data, size_t len)
132 : {
133 17250790 : struct netio_buffered_recv_socket* bs = (struct netio_buffered_recv_socket*)socket->usr;
134 17250790 : if(bs->lsocket->cb_msg_received) {
135 : size_t pos = 0;
136 3423792159 : while(pos < len) {
137 3406541450 : msg_size_t* s = (msg_size_t*)((char *)buf->data + pos);
138 3406541450 : pos += sizeof(msg_size_t);
139 3406541450 : bs->lsocket->cb_msg_received(bs, (char *)buf->data + pos, *s);
140 3406541369 : pos += *s;
141 : }
142 : }
143 :
144 : //to study the L1ID pileup
145 17250709 : struct netio_subscribe_socket* ssocket = bs->lsocket->listen_socket.usr;
146 17250709 : if(ssocket && ssocket->cb_buf_received) {
147 600 : ssocket->cb_buf_received(ssocket, buf, len);
148 : }
149 :
150 17250707 : netio_post_recv(socket, buf);
151 17250735 : }
152 :
153 :
154 :
155 : // API FUNCTIONS ///////////////////////////////////////////////////////////////
156 :
157 : /**
158 : * Initializes a buffered listen socket.
159 : *
160 : * @param socket: The socket to initialize
161 : * @param ctx: The NetIO context object in which to initialize the socket
162 : * @param attr: Buffer attributes of the socket. Attributes need to match on
163 : * the sending and receiving side of a socket
164 : */
165 : void
166 311 : netio_buffered_listen_socket_init(struct netio_buffered_listen_socket* socket,
167 : struct netio_context* ctx,
168 : struct netio_buffered_socket_attr* attr)
169 : {
170 311 : memset(socket, 0, sizeof(*socket));
171 311 : if (attr->num_pages > NETIO_DOMAIN_MAX_MR){
172 0 : log_error("Number of recv pages requested %lu exceeds max value. Using %lu.", attr->num_pages, NETIO_DOMAIN_MAX_MR);
173 0 : attr->num_pages = NETIO_DOMAIN_MAX_MR;
174 : }
175 311 : socket->pagesize = attr->pagesize;
176 311 : socket->num_pages = attr->num_pages;
177 311 : netio_init_listen_socket(&socket->listen_socket, ctx, NULL);
178 311 : }
179 :
180 :
181 : /**
182 : * Bind the listen socket to an interface and port number and bring the listen socket to 'listening' state.
183 : *
184 : * @param socket: The buffered listen socket
185 : * @param hostname: A hostname, typically an IP address, which identifies the interface on which to bind
186 : * @param port: The port name to listen on
187 : */
188 : void
189 311 : netio_buffered_listen(struct netio_buffered_listen_socket* socket, const char* hostname, unsigned port)
190 : {
191 311 : int ret;
192 311 : struct fi_info* hints;
193 311 : struct fi_eq_attr eq_attr;
194 311 : eq_attr.wait_obj = FI_WAIT_FD;
195 :
196 311 : hints = fi_allocinfo();
197 311 : hints->ep_attr->type = FI_EP_MSG;
198 311 : hints->caps = FI_MSG;
199 311 : hints->mode = FI_LOCAL_MR;
200 311 : char port_addr[32];
201 311 : snprintf(port_addr, 32, "%u", port);
202 :
203 311 : log_dbg("listening on %s:%s", hostname, port_addr);
204 :
205 311 : if((ret = fi_getinfo(FI_VERSION(1, 1), hostname, port_addr, FI_SOURCE, hints,
206 : &socket->listen_socket.fi)))
207 : {
208 0 : FATAL("Buf-listen socket, fail to get interface info, error ", ret);
209 : }
210 : // printf("%s\n", fi_tostr(socket->listen_socket.fi, FI_TYPE_INFO));
211 :
212 311 : if((ret = fi_fabric(socket->listen_socket.fi->fabric_attr, &socket->listen_socket.fabric, NULL)))
213 : {
214 0 : FATAL("Buf-listen socket, cannot open fabric, error ", ret);
215 : }
216 :
217 311 : if((ret = fi_eq_open(socket->listen_socket.fabric, &eq_attr, &socket->listen_socket.eq, NULL)))
218 : {
219 0 : FATAL("Buf-listen socket, cannot open Event Queue, error ", ret);
220 : }
221 :
222 311 : if((ret = fi_passive_ep(socket->listen_socket.fabric, socket->listen_socket.fi, &socket->listen_socket.pep, NULL)))
223 : {
224 0 : FATAL("Buf-listen socket, cannot open passive endpoint, error ", ret);
225 : }
226 :
227 311 : if((ret = fi_pep_bind(socket->listen_socket.pep, &socket->listen_socket.eq->fid, 0)))
228 : {
229 0 : FATAL("Buf-listen socket, cannot bind passive endpoint, error ", ret);
230 : }
231 :
232 311 : if((ret = fi_listen(socket->listen_socket.pep)))
233 : {
234 0 : FATAL("Buf-listen socket, cannot enable, error ", ret);
235 : }
236 :
237 311 : if((ret = fi_control(&socket->listen_socket.eq->fid, FI_GETWAIT, &socket->listen_socket.eqfd)))
238 : {
239 0 : FATAL("Buf-listen socket failed to obtain Event Queue wait object", ret);
240 : }
241 :
242 311 : socket->listen_socket.eq_ev_ctx.fd = socket->listen_socket.eqfd;
243 311 : socket->listen_socket.eq_ev_ctx.data = socket;
244 311 : socket->listen_socket.eq_ev_ctx.cb = on_buffered_listen_socket_cm_event;
245 311 : struct netio_eventloop* eloop = &socket->listen_socket.ctx->evloop;
246 311 : netio_register_read_fd(eloop, &socket->listen_socket.eq_ev_ctx);
247 311 : add_open_fd(&eloop->openfds, socket->listen_socket.eqfd, NETIO_EQ, BLISTEN, socket);
248 311 : log_dbg("netio_buffered_listen_socket: registering EQ fd %d", socket->listen_socket.eqfd);
249 311 : fi_freeinfo(hints);
250 311 : }
251 :
252 : /**
253 : * Initializes a buffered send socket.
254 : *
255 : * @param socket: The socket to initialize
256 : * @param ctx: The NetIO context object in which to initialize the socket
257 : * @param attr: Buffer attributes of the socket. Attributes need to match on
258 : * the sending and receiving side of a socket
259 : */
260 : void
261 398 : netio_buffered_send_socket_init(struct netio_buffered_send_socket* socket, struct netio_context* ctx, struct netio_buffered_socket_attr* attr)
262 : {
263 398 : memset(socket, 0, sizeof(*socket));
264 398 : netio_init_send_socket(&socket->send_socket, ctx);
265 398 : socket->send_socket.cq_size = NETIO_MAX_CQ_EVENTS_BUFFERED;
266 398 : socket->send_socket.usr = socket;
267 398 : socket->send_socket.cb_send_completed = on_send_completed;
268 398 : socket->send_socket.cb_connection_established = on_connect;
269 398 : socket->send_socket.cb_connection_closed = on_buf_send_socket_connection_closed;
270 398 : socket->send_socket.cb_error_connection_refused = on_error_connection_refused;
271 398 : socket->current_buffer = NULL;
272 398 : socket->pub_socket = NULL;
273 398 : socket->pos = 0;
274 398 : socket->busy = 0;
275 398 : socket->watermark = attr->watermark;
276 398 : if (attr->num_pages > NETIO_DOMAIN_MAX_MR){
277 0 : log_error("Number of send buffers requested %lu exceeds max value. Using %lu.", attr->num_pages, NETIO_DOMAIN_MAX_MR);
278 0 : attr->num_pages = NETIO_DOMAIN_MAX_MR;
279 : }
280 398 : socket->num_pages = attr->num_pages;
281 398 : socket->buffersize = attr->pagesize;
282 398 : netio_signal_init(&ctx->evloop, &socket->signal_buffer_available);
283 398 : socket->signal_buffer_available.cb = NULL; //deactivated by default
284 398 : socket->timeout_ms = attr->timeout_ms;
285 398 : if(attr->timeout_ms != 0){
286 382 : netio_timer_init(&ctx->evloop, &socket->flush_timer);
287 382 : socket->flush_timer.cb = flush_cb;
288 382 : socket->flush_timer.data = socket;
289 : } else {
290 16 : socket->flush_timer.cb = NULL;
291 : }
292 398 : }
293 :
294 :
295 : /**
296 : * Connect a buffered send socket to a remote.
297 : *
298 : * @param socket: The buffered send socket
299 : * @param hostname: Hostname or IP address of the remote endpoint
300 : * @param port: Port number of the remote endpoint
301 : */
302 : void
303 355 : netio_buffered_connect(struct netio_buffered_send_socket* socket, const char* hostname, unsigned port)
304 : {
305 355 : netio_connect(&socket->send_socket, hostname, port);
306 355 : }
307 :
308 :
309 : void
310 43 : netio_buffered_connect_rawaddr(struct netio_buffered_send_socket* socket, void* addr, size_t addrlen)
311 : {
312 43 : netio_connect_rawaddr(&socket->send_socket, addr, addrlen);
313 43 : }
314 :
315 :
/**
 * Send a message on a buffered connection.
 *
 * The message is wrapped into a one-element scatter/gather list and passed
 * to netio_buffered_sendv, whose return value is returned unchanged.
 *
 * @param socket: The buffered send socket
 * @param data: Pointer to message
 * @param size: Size of the message
 *
 * @return
 * - `NETIO_STATUS_TOO_BIG` The message is too big to fit in the internal buffers.
 *                          Increase `pagesize` in the buffer attributes.
 * - `NETIO_STATUS_AGAIN` Socket is busy, no buffers are available. Try again later
 * - `NETIO_STATUS_OK` Message was successfully copied to internal buffers
 */
int
netio_buffered_send(struct netio_buffered_send_socket* socket, void* data, size_t size)
{
  struct iovec single = { .iov_base = data, .iov_len = size };
  return netio_buffered_sendv(socket, &single, 1);
}
336 :
337 :
338 : /**
339 : * Send a message on a buffered connection.
340 : *
341 : * @param socket: The buffered send socket
342 : * @param iov: Pointer to a scatter/gather buffer
343 : * @param num: Number of elements in the scatter/gather buffer
344 : *
345 : * @return
346 : * - `NETIO_STATUS_TOO_BIG` The message is too big to fit in the internal buffers.
347 : * Increase `pagesize` in the buffer attributes.
348 : * - `NETIO_STATUS_AGAIN` Socket is busy, no buffers are available. Try again later
349 : * - `NETIO_STATUS_OK` Message was successfully copied to internal buffers
350 : */
351 : int
352 87570933 : netio_buffered_sendv(struct netio_buffered_send_socket* socket, struct iovec* iov, size_t num)
353 : {
354 :
355 87570933 : if(socket->busy){
356 0 : int ret = flush(socket);
357 0 : if (ret == NETIO_STATUS_AGAIN){
358 : return NETIO_STATUS_AGAIN;
359 : }
360 : }
361 :
362 87570933 : size_t total_size = 0;
363 262712337 : for(unsigned int i=0; i<num; i++) {
364 175141404 : total_size += iov[i].iov_len;
365 : }
366 :
367 : //if current message is larger than the whole buffer
368 87570933 : if(total_size+sizeof(msg_size_t) > socket->buffersize) {
369 : return NETIO_STATUS_TOO_BIG;
370 : }
371 :
372 87570933 : if(socket->current_buffer == NULL) {
373 1515764 : if(netio_bufferstack_pop(&socket->buffers, &socket->current_buffer)) {
374 : return NETIO_STATUS_AGAIN;
375 : }
376 1087083 : socket->pos = 0;
377 : //Enable flush timer
378 1087083 : if(socket->timeout_ms != 0 ){
379 286438 : netio_timer_start_ms(&socket->flush_timer, socket->timeout_ms);
380 : }
381 : } else {
382 : //if current message is larger than remaining space
383 : //flush buffer and retry with a new one
384 86055169 : if(socket->pos+total_size+sizeof(msg_size_t) >= socket->buffersize){
385 0 : flush(socket);
386 0 : return NETIO_STATUS_AGAIN;
387 : }
388 : }
389 :
390 87142252 : *(msg_size_t*)((char*)socket->current_buffer->data + socket->pos) = total_size;
391 87142252 : socket->pos += sizeof(msg_size_t);
392 261426294 : for(unsigned int i=0; i<num; i++) {
393 174284042 : memcpy((char *)socket->current_buffer->data + socket->pos, iov[i].iov_base, iov[i].iov_len);
394 174284042 : socket->pos += iov[i].iov_len;
395 : }
396 :
397 87142252 : if(socket->pos > socket->watermark) {
398 285895 : flush(socket);
399 : }
400 : return NETIO_STATUS_OK;
401 : }
402 :
403 : void
404 252 : netio_buffered_recv_socket_init(struct netio_buffered_recv_socket* socket, struct netio_buffered_listen_socket* lsocket)
405 : {
406 252 : memset(socket, 0, sizeof(*socket));
407 252 : socket->lsocket = lsocket;
408 252 : netio_init_recv_socket(&socket->recv_socket, &socket->lsocket->listen_socket);
409 252 : socket->recv_socket.cq_size = NETIO_MAX_CQ_EVENTS_BUFFERED;
410 252 : socket->recv_socket.usr = socket;
411 :
412 252 : socket->num_pages = socket->lsocket->num_pages;
413 252 : socket->pages = (struct netio_buffer*)malloc(socket->num_pages * sizeof(struct netio_buffer));
414 71082 : for(unsigned int i=0; i<socket->num_pages; i++) {
415 70830 : socket->pages[i].data = malloc(socket->lsocket->pagesize);
416 70830 : socket->pages[i].size = socket->lsocket->pagesize;
417 : }
418 252 : socket->recv_socket.lsocket->cb_msg_received = on_buffered_msg_received;
419 252 : }
420 :
421 :
/**
 * Flushes the current buffer of the given buffered send socket.
 *
 * The status of the flush is not reported to the caller.
 *
 * @param socket The buffered send socket
 */
void
netio_buffered_flush(struct netio_buffered_send_socket* socket)
{
  (void)flush(socket);
}
|