diff options
Diffstat (limited to 'src/ipcpd/eth/eth.c')
| -rw-r--r-- | src/ipcpd/eth/eth.c | 556 |
1 files changed, 497 insertions, 59 deletions
diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 4be7775e..103ba881 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -37,12 +37,14 @@ #include "config.h" +#include <ouroboros/atomics.h> #include <ouroboros/endian.h> #include <ouroboros/hash.h> #include <ouroboros/errno.h> #include <ouroboros/list.h> #include <ouroboros/utils.h> #include <ouroboros/bitmap.h> +#include <ouroboros/crc8.h> #include <ouroboros/dev.h> #include <ouroboros/ipcp-dev.h> #include <ouroboros/fqueue.h> @@ -50,6 +52,14 @@ #include <ouroboros/time.h> #include <ouroboros/fccntl.h> #include <ouroboros/pthread.h> +#include <ouroboros/rib.h> + +#ifndef IPCP_ETH_FLOW_STATS +#undef FETCH_ADD_RELAXED +#define FETCH_ADD_RELAXED(p, v) ((void) 0) +#undef FETCH_SUB_RELAXED +#define FETCH_SUB_RELAXED(p, v) ((void) 0) +#endif #include "ipcp.h" #include "np1.h" @@ -122,7 +132,8 @@ #define MGMT_EID 0 #define DIX_EID_SIZE sizeof(uint16_t) #define DIX_LENGTH_SIZE sizeof(uint16_t) -#define DIX_HEADER_SIZE (DIX_EID_SIZE + DIX_LENGTH_SIZE) +#define DIX_HCS_SIZE CRC8_HASH_LEN +#define DIX_HEADER_SIZE (DIX_EID_SIZE + DIX_LENGTH_SIZE + DIX_HCS_SIZE) #define ETH_HEADER_TOT_SIZE (ETH_HEADER_SIZE + DIX_HEADER_SIZE) #define MAX_EIDS (1 << (8 * DIX_EID_SIZE)) #define ETH_MAX_PACKET_SIZE (ETH_MTU - DIX_HEADER_SIZE) @@ -130,16 +141,20 @@ #elif defined(BUILD_ETH_LLC) #define THIS_TYPE IPCP_ETH_LLC #define MGMT_SAP 0x01 -#define LLC_HEADER_SIZE 3 +#define LLC_FIELDS_SIZE 3 +#define LLC_HCS_SIZE CRC8_HASH_LEN +#define LLC_HEADER_SIZE (LLC_FIELDS_SIZE + LLC_HCS_SIZE) #define ETH_HEADER_TOT_SIZE (ETH_HEADER_SIZE + LLC_HEADER_SIZE) #define MAX_SAPS 64 #define ETH_MAX_PACKET_SIZE (ETH_MTU - LLC_HEADER_SIZE) #define ETH_FRAME_SIZE (ETH_HEADER_SIZE + ETH_MTU_MAX) #endif -#define NAME_QUERY_TIMEO 2000 /* ms */ -#define MGMT_TIMEO 100 /* ms */ +#define NAME_QUERY_TIMEO 1900 /* ms total budget */ +#define NAME_QUERY_RETRIES 3 /* retransmits, 4 attempts total */ +#define MGMT_TIMEO 100 /* ms */ #define MGMT_FRAME_SIZE IPCP_ETH_MGMT_FRAME_SIZE +#define ETH_RIB_PATH "eth" #define FLOW_REQ 0 #define FLOW_REPLY 1 @@ -165,7 +180,7 @@ struct mgmt_msg { uint32_t delay; uint32_t timeout; int32_t response; - uint8_t in_order; + uint8_t service; #if defined (BUILD_ETH_DIX) uint8_t code; uint8_t availability; @@ -185,6 +200,7 @@ struct eth_frame { uint8_t ssap; uint8_t cf; #endif + uint8_t hcs; uint8_t payload; } __attribute__((packed)); @@ -196,6 +212,17 @@ struct ef { int8_t r_sap; #endif uint8_t r_addr[MAC_SIZE]; +#ifdef IPCP_ETH_FLOW_STATS + struct { + time_t stamp; + size_t p_rcv; + size_t b_rcv; + size_t p_dlv_f; + size_t p_snd; + size_t b_snd; + size_t p_snd_f; + } stat; +#endif }; struct mgmt_frame { @@ -233,6 +260,22 @@ struct { struct ef * fd_to_ef; fset_t * np1_flows; pthread_rwlock_t flows_lock; +#ifdef IPCP_ETH_FLOW_STATS + struct { + size_t n_flows; + size_t n_rcv; + size_t n_snd; + size_t n_mgmt_rcv; + size_t n_mgmt_snd; + size_t n_bad_id; + size_t n_dlv_f; + size_t n_buf_f; + size_t n_rcv_f; + size_t n_snd_f; + size_t kern_rcv; + size_t kern_drp; + } stat; +#endif pthread_t packet_writer[IPCP_ETH_WR_THR]; pthread_t packet_reader[IPCP_ETH_RD_THR]; @@ -284,7 +327,14 @@ static int eth_data_init(void) eth_data.fd_to_ef[i].r_sap = -1; #endif memset(ð_data.fd_to_ef[i].r_addr, 0, MAC_SIZE); +#ifdef IPCP_ETH_FLOW_STATS + memset(ð_data.fd_to_ef[i].stat, 0, + sizeof(eth_data.fd_to_ef[i].stat)); +#endif } +#ifdef IPCP_ETH_FLOW_STATS + memset(ð_data.stat, 0, sizeof(eth_data.stat)); +#endif eth_data.shim_data = shim_data_create(); if (eth_data.shim_data == NULL) @@ -357,6 +407,227 @@ static void eth_data_fini(void) free(eth_data.fd_to_ef); } +#ifdef IPCP_ETH_FLOW_STATS +static int eth_rib_read(const char * path, + char * buf, + size_t len) +{ + struct ef * flow; + int fd; + char tmstr[RIB_TM_STRLEN]; + struct tm * tm; + time_t stamp; + char * entry; + + entry = strstr(path, RIB_SEPARATOR) + 1; + assert(entry); + + if (len < 2048) + return 0; + + buf[0] = '\0'; + + if (strcmp(entry, "summary") == 0) { + int n; +#if defined(HAVE_RAW_SOCKETS) + int rcvbuf = 0; + int sndbuf = 0; + int queued = 0; + socklen_t optlen = sizeof(rcvbuf); +# if defined(__linux__) + struct tpacket_stats tp_stats; + socklen_t tp_len = sizeof(tp_stats); +# endif + + getsockopt(eth_data.s_fd, SOL_SOCKET, + SO_RCVBUF, &rcvbuf, &optlen); + optlen = sizeof(sndbuf); + getsockopt(eth_data.s_fd, SOL_SOCKET, + SO_SNDBUF, &sndbuf, &optlen); + ioctl(eth_data.s_fd, FIONREAD, &queued); +# if defined(__linux__) + if (getsockopt(eth_data.s_fd, SOL_PACKET, + PACKET_STATISTICS, + &tp_stats, &tp_len) == 0) { + FETCH_ADD_RELAXED(ð_data.stat.kern_rcv, + tp_stats.tp_packets); + FETCH_ADD_RELAXED(ð_data.stat.kern_drp, + tp_stats.tp_drops); + } +# endif +#endif + n = sprintf(buf, + "Active flows: %20zu\n" + "Total frames received: %20zu\n" + "Total frames sent: %20zu\n" + "Management frames received: %20zu\n" + "Management frames sent: %20zu\n" + "Bad EID/SAP frames: %20zu\n" + "Delivery (N+1) failures: %20zu\n" + "Buffer alloc failures: %20zu\n" + "Frame read failures: %20zu\n" + "Frame send failures: %20zu\n", + LOAD_RELAXED(ð_data.stat.n_flows), + LOAD_RELAXED(ð_data.stat.n_rcv), + LOAD_RELAXED(ð_data.stat.n_snd), + LOAD_RELAXED(ð_data.stat.n_mgmt_rcv), + LOAD_RELAXED(ð_data.stat.n_mgmt_snd), + LOAD_RELAXED(ð_data.stat.n_bad_id), + LOAD_RELAXED(ð_data.stat.n_dlv_f), + LOAD_RELAXED(ð_data.stat.n_buf_f), + LOAD_RELAXED(ð_data.stat.n_rcv_f), + LOAD_RELAXED(ð_data.stat.n_snd_f)); +#if defined(HAVE_RAW_SOCKETS) + n += sprintf(buf + n, + "Socket rcvbuf (bytes): %20d\n" + "Socket sndbuf (bytes): %20d\n" + "Socket queued (bytes): %20d\n", + rcvbuf, sndbuf, queued); +# if defined(__linux__) + n += sprintf(buf + n, + "Kernel frames received: %20zu\n" + "Kernel frames dropped: %20zu\n", + LOAD_RELAXED(ð_data.stat.kern_rcv), + LOAD_RELAXED(ð_data.stat.kern_drp)); +# endif +#endif + return n; + } + + fd = atoi(entry); + + if (fd < 0 || fd >= SYS_MAX_FLOWS) + return -1; + + flow = ð_data.fd_to_ef[fd]; + + pthread_rwlock_rdlock(ð_data.flows_lock); + + stamp = flow->stat.stamp; + if (stamp == 0) { + pthread_rwlock_unlock(ð_data.flows_lock); + return 0; + } + + pthread_rwlock_unlock(ð_data.flows_lock); + + tm = gmtime(&stamp); + strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); + + sprintf(buf, + "Flow established at: %20s\n" + "Sent (packets): %20zu\n" + "Sent (bytes): %20zu\n" + "Send failed (packets): %20zu\n" + "Received (packets): %20zu\n" + "Received (bytes): %20zu\n" + "Delivery (N+1) failures: %20zu\n", + tmstr, + LOAD_RELAXED(&flow->stat.p_snd), + LOAD_RELAXED(&flow->stat.b_snd), + LOAD_RELAXED(&flow->stat.p_snd_f), + LOAD_RELAXED(&flow->stat.p_rcv), + LOAD_RELAXED(&flow->stat.b_rcv), + LOAD_RELAXED(&flow->stat.p_dlv_f)); + + return strlen(buf); +} + +static int eth_rib_readdir(char *** buf) +{ + char entry[RIB_PATH_LEN + 1]; + size_t i; + int idx = 0; + int n_entries; + + pthread_rwlock_rdlock(ð_data.flows_lock); + + n_entries = (int) LOAD_RELAXED(ð_data.stat.n_flows) + 1; + + *buf = malloc(sizeof(**buf) * n_entries); + if (*buf == NULL) + goto fail_entries; + + (*buf)[idx] = malloc(strlen("summary") + 1); + if ((*buf)[idx] == NULL) + goto fail_entry; + + strcpy((*buf)[idx++], "summary"); + + for (i = 0; i < SYS_MAX_FLOWS && idx < n_entries; ++i) { + if (eth_data.fd_to_ef[i].stat.stamp == 0) + continue; + + sprintf(entry, "%zu", i); + + (*buf)[idx] = malloc(strlen(entry) + 1); + if ((*buf)[idx] == NULL) + goto fail_entry; + + strcpy((*buf)[idx++], entry); + } + + pthread_rwlock_unlock(ð_data.flows_lock); + + return idx; + + fail_entry: + while (idx-- > 0) + free((*buf)[idx]); + free(*buf); + fail_entries: + pthread_rwlock_unlock(ð_data.flows_lock); + return -ENOMEM; +} + +static int eth_rib_getattr(const char * path, + struct rib_attr * attr) +{ + int fd; + char * entry; + struct ef * flow; + + entry = strstr(path, RIB_SEPARATOR) + 1; + assert(entry); + + if (strcmp(entry, "summary") == 0) { + attr->size = 2048; + attr->mtime = 0; + return 0; + } + + fd = atoi(entry); + + if (fd < 0 || fd >= SYS_MAX_FLOWS) { + attr->size = 0; + attr->mtime = 0; + return 0; + } + + flow = ð_data.fd_to_ef[fd]; + + pthread_rwlock_rdlock(ð_data.flows_lock); + + if (flow->stat.stamp != 0) { + attr->size = 2048; + attr->mtime = flow->stat.stamp; + } else { + attr->size = 0; + attr->mtime = 0; + } + + pthread_rwlock_unlock(ð_data.flows_lock); + + return 0; +} + +static struct rib_ops eth_r_ops = { + .read = eth_rib_read, + .readdir = eth_rib_readdir, + .getattr = eth_rib_getattr +}; +#endif /* IPCP_ETH_FLOW_STATS */ + #ifdef BUILD_ETH_LLC static uint8_t reverse_bits(uint8_t b) { @@ -409,12 +680,18 @@ static int eth_ipcp_send_frame(const uint8_t * dst_addr, e_frame->ethertype = eth_data.ethertype; e_frame->eid = htons(deid); e_frame->length = htons(len); + mem_hash(HASH_CRC8, &e_frame->hcs, + (uint8_t *) &e_frame->eid, + DIX_EID_SIZE + DIX_LENGTH_SIZE); frame_len = ETH_HEADER_TOT_SIZE + len; #elif defined(BUILD_ETH_LLC) e_frame->length = htons(LLC_HEADER_SIZE + len); e_frame->dsap = dsap; e_frame->ssap = ssap; e_frame->cf = cf; + mem_hash(HASH_CRC8, &e_frame->hcs, + (uint8_t *) &e_frame->dsap, + LLC_FIELDS_SIZE); frame_len = ETH_HEADER_TOT_SIZE + len; #endif @@ -440,10 +717,7 @@ static int eth_ipcp_send_frame(const uint8_t * dst_addr, } assert(FD_ISSET(eth_data.s_fd, &fds)); - if (sendto(eth_data.s_fd, - frame, - frame_len, - 0, + if (sendto(eth_data.s_fd, frame, frame_len, 0, (struct sockaddr *) ð_data.device, sizeof(eth_data.device)) <= 0) { log_dbg("Failed to send message: %s.", strerror(errno)); @@ -451,6 +725,8 @@ static int eth_ipcp_send_frame(const uint8_t * dst_addr, } #endif /* HAVE_NETMAP */ + FETCH_ADD_RELAXED(ð_data.stat.n_snd, 1); + return 0; } @@ -490,7 +766,7 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr, msg->availability = qs.availability; msg->loss = hton32(qs.loss); msg->ber = hton32(qs.ber); - msg->in_order = qs.in_order; + msg->service = qs.service; msg->max_gap = hton32(qs.max_gap); msg->timeout = hton32(qs.timeout); @@ -508,6 +784,9 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr, buf, len + data->len); free(buf); + if (ret == 0) + FETCH_ADD_RELAXED(ð_data.stat.n_mgmt_snd, 1); + return ret; } @@ -558,6 +837,8 @@ static int eth_ipcp_alloc_resp(uint8_t * dst_addr, return -1; } + FETCH_ADD_RELAXED(ð_data.stat.n_mgmt_snd, 1); + free(buf); return 0; @@ -575,7 +856,8 @@ static int eth_ipcp_req(uint8_t * r_addr, { int fd; - fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_ETH_MPL, data); + fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_ETH_MPL, + ETH_MAX_PACKET_SIZE, data); if (fd < 0) { log_err("Could not get new flow from IRMd."); return -1; @@ -622,7 +904,7 @@ static int eth_ipcp_alloc_reply(uint8_t * r_addr, fd = eth_data.ef_to_fd[dsap]; #endif if (fd < 0) { - pthread_rwlock_unlock(& eth_data.flows_lock); + pthread_rwlock_unlock(ð_data.flows_lock); log_err("No flow found with that SAP."); return -1; /* -EFLOWNOTFOUND */ } @@ -647,7 +929,8 @@ static int eth_ipcp_alloc_reply(uint8_t * r_addr, #elif defined(BUILD_ETH_LLC) log_dbg("Flow reply, fd %d, SSAP %d, DSAP %d.", fd, ssap, dsap); #endif - if ((ret = ipcp_flow_alloc_reply(fd, response, mpl, data)) < 0) { + if ((ret = ipcp_flow_alloc_reply(fd, response, mpl, + ETH_MAX_PACKET_SIZE, data)) < 0) { log_err("Failed to reply to flow allocation."); return -1; } @@ -689,6 +972,8 @@ static int eth_ipcp_name_query_req(const uint8_t * hash, return -1; } + FETCH_ADD_RELAXED(ð_data.stat.n_mgmt_snd, 1); + free(buf); } @@ -718,20 +1003,24 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, qosspec_t qs; buffer_t data; + if (len < sizeof(*msg)) + return -1; + msg = (struct mgmt_msg *) buf; switch (msg->code) { case FLOW_REQ: msg_len = sizeof(*msg) + ipcp_dir_hash_len(); - assert(len >= msg_len); + if (len < msg_len) + return -1; qs.delay = ntoh32(msg->delay); qs.bandwidth = ntoh64(msg->bandwidth); qs.availability = msg->availability; qs.loss = ntoh32(msg->loss); qs.ber = ntoh32(msg->ber); - qs.in_order = msg->in_order; + qs.service = msg->service; qs.max_gap = ntoh32(msg->max_gap); qs.timeout = ntoh32(msg->timeout); @@ -752,8 +1041,6 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, } break; case FLOW_REPLY: - assert(len >= sizeof(*msg)); - data.data = (uint8_t *) buf + sizeof(*msg); data.len = len - sizeof(*msg); @@ -769,9 +1056,13 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, &data); break; case NAME_QUERY_REQ: + if (len < sizeof(*msg) + ipcp_dir_hash_len()) + return -1; eth_ipcp_name_query_req(buf + sizeof(*msg), r_addr); break; case NAME_QUERY_REPLY: + if (len < sizeof(*msg) + ipcp_dir_hash_len()) + return -1; eth_ipcp_name_query_reply(buf + sizeof(*msg), r_addr); break; default: @@ -844,6 +1135,12 @@ static void * eth_ipcp_packet_reader(void * o) fd_set fds; int frame_len; #endif +#if defined(HAVE_RAW_SOCKETS) + struct sockaddr_ll src; + socklen_t slen; +#endif + size_t eth_len; + uint8_t hcs; struct eth_frame * e_frame; struct mgmt_frame * frame; @@ -881,24 +1178,58 @@ static void * eth_ipcp_packet_reader(void * o) if (select(eth_data.s_fd + 1, &fds, NULL, NULL, NULL) < 0) continue; assert(FD_ISSET(eth_data.s_fd, &fds)); - if (ipcp_spb_reserve(&spb, ETH_MTU)) + if (ipcp_spb_reserve(&spb, ETH_MTU)) { + FETCH_ADD_RELAXED(ð_data.stat.n_buf_f, 1); continue; - buf = ssm_pk_buff_head_alloc(spb, ETH_HEADER_TOT_SIZE); + } + buf = ssm_pk_buff_push(spb, ETH_HEADER_TOT_SIZE); if (buf == NULL) { log_dbg("Failed to allocate header."); ipcp_spb_release(spb); + FETCH_ADD_RELAXED(ð_data.stat.n_buf_f, 1); continue; } - frame_len = recv(eth_data.s_fd, buf, - ETH_MTU + ETH_HEADER_TOT_SIZE, 0); + slen = sizeof(src); + /* MSG_DONTWAIT: RD_THR>1 race-loser bails with EAGAIN. */ + frame_len = recvfrom(eth_data.s_fd, buf, + ETH_MTU + ETH_HEADER_TOT_SIZE, + MSG_DONTWAIT, + (struct sockaddr *) &src, &slen); #endif - if (frame_len <= 0) { - log_dbg("Failed to receive frame."); + if (frame_len == 0) { + ipcp_spb_release(spb); + continue; /* Spurious */ + } + + if (frame_len < 0) { ipcp_spb_release(spb); + + if (errno == EAGAIN || errno == EWOULDBLOCK) + continue; + + log_dbg("Failed to rcv frame: %s.", strerror(errno)); + FETCH_ADD_RELAXED(ð_data.stat.n_rcv_f, 1); continue; } #endif +#if defined(HAVE_NETMAP) + eth_len = hdr.len; +#elif defined(HAVE_BPF) + eth_len = ((struct bpf_hdr *) buf)->bh_caplen; +#else + eth_len = (size_t) frame_len; +#endif + /* Defense in depth: reject before parsing dereferences. */ + if (eth_len < ETH_HEADER_TOT_SIZE) + goto fail_frame; + +#if defined(HAVE_RAW_SOCKETS) + /* Drop our own egress. */ + if (src.sll_pkttype == PACKET_OUTGOING) + goto fail_frame; +#endif + #if defined(HAVE_BPF) && !defined(HAVE_NETMAP) e_frame = (struct eth_frame *) (buf + ((struct bpf_hdr *) buf)->bh_hdrlen); @@ -916,6 +1247,8 @@ static void * eth_ipcp_packet_reader(void * o) e_frame->dst_hwaddr, MAC_SIZE) && memcmp(br_addr, e_frame->dst_hwaddr, MAC_SIZE)) { + FETCH_ADD_RELAXED(ð_data.stat.n_bad_id, 1); + goto fail_frame; } #endif length = ntohs(e_frame->length); @@ -923,17 +1256,41 @@ static void * eth_ipcp_packet_reader(void * o) if (e_frame->ethertype != eth_data.ethertype) goto fail_frame; + if (length > ETH_MTU) + goto fail_frame; + deid = ntohs(e_frame->eid); - if (deid == MGMT_EID) { #elif defined (BUILD_ETH_LLC) if (length > 0x05FF) /* DIX */ goto fail_frame; + if (length < LLC_HEADER_SIZE || length > ETH_MTU) + goto fail_frame; + length -= LLC_HEADER_SIZE; dsap = reverse_bits(e_frame->dsap); ssap = reverse_bits(e_frame->ssap); +#endif + + if (eth_len < ETH_HEADER_TOT_SIZE + (size_t) length) + goto fail_frame; +#if defined(BUILD_ETH_DIX) + mem_hash(HASH_CRC8, &hcs, + (uint8_t *) &e_frame->eid, + DIX_EID_SIZE + DIX_LENGTH_SIZE); +#elif defined(BUILD_ETH_LLC) + mem_hash(HASH_CRC8, &hcs, + (uint8_t *) &e_frame->dsap, + LLC_FIELDS_SIZE); +#endif + if (hcs != e_frame->hcs) + goto fail_frame; + +#if defined(BUILD_ETH_DIX) + if (deid == MGMT_EID) { +#elif defined (BUILD_ETH_LLC) if (ssap == MGMT_SAP && dsap == MGMT_SAP) { #endif ipcp_spb_release(spb); /* No need for the N+1 buffer. */ @@ -958,6 +1315,8 @@ static void * eth_ipcp_packet_reader(void * o) list_add(&frame->next, ð_data.mgmt_frames); pthread_cond_signal(ð_data.mgmt_cond); pthread_mutex_unlock(ð_data.mgmt_lock); + FETCH_ADD_RELAXED(ð_data.stat.n_rcv, 1); + FETCH_ADD_RELAXED(ð_data.stat.n_mgmt_rcv, 1); } else { pthread_rwlock_rdlock(ð_data.flows_lock); @@ -968,6 +1327,7 @@ static void * eth_ipcp_packet_reader(void * o) #endif if (fd < 0) { pthread_rwlock_unlock(ð_data.flows_lock); + FETCH_ADD_RELAXED(ð_data.stat.n_bad_id, 1); goto fail_frame; } @@ -976,13 +1336,18 @@ static void * eth_ipcp_packet_reader(void * o) || memcmp(eth_data.fd_to_ef[fd].r_addr, e_frame->src_hwaddr, MAC_SIZE)) { pthread_rwlock_unlock(ð_data.flows_lock); + FETCH_ADD_RELAXED(ð_data.stat.n_bad_id, 1); goto fail_frame; } #endif + FETCH_ADD_RELAXED(ð_data.fd_to_ef[fd].stat.p_rcv, 1); + FETCH_ADD_RELAXED(ð_data.fd_to_ef[fd].stat.b_rcv, + length); + FETCH_ADD_RELAXED(ð_data.stat.n_rcv, 1); pthread_rwlock_unlock(ð_data.flows_lock); #ifndef HAVE_NETMAP - ssm_pk_buff_head_release(spb, ETH_HEADER_TOT_SIZE); + ssm_pk_buff_pop(spb, ETH_HEADER_TOT_SIZE); ssm_pk_buff_truncate(spb, length); #else if (ipcp_spb_reserve(&spb, length)) @@ -991,8 +1356,13 @@ static void * eth_ipcp_packet_reader(void * o) buf = ssm_pk_buff_head(spb); memcpy(buf, &e_frame->payload, length); #endif - if (np1_flow_write(fd, spb, NP1_GET_POOL(fd)) < 0) + if (np1_flow_write(fd, spb, NP1_GET_POOL(fd)) < 0) { ipcp_spb_release(spb); + FETCH_ADD_RELAXED( + ð_data.fd_to_ef[fd].stat.p_dlv_f, + 1); + FETCH_ADD_RELAXED(ð_data.stat.n_dlv_f, 1); + } continue; fail_frame: @@ -1048,10 +1418,11 @@ static void * eth_ipcp_packet_writer(void * o) len = ssm_pk_buff_len(spb); - if (ssm_pk_buff_head_alloc(spb, ETH_HEADER_TOT_SIZE) + if (ssm_pk_buff_push(spb, ETH_HEADER_TOT_SIZE) == NULL) { log_dbg("Failed to allocate header."); ipcp_spb_release(spb); + FETCH_ADD_RELAXED(ð_data.stat.n_buf_f, 1); continue; } @@ -1075,8 +1446,20 @@ static void * eth_ipcp_packet_writer(void * o) dsap, ssap, #endif ssm_pk_buff_head(spb), - len)) + len)) { log_dbg("Failed to send frame."); + FETCH_ADD_RELAXED( + ð_data.fd_to_ef[fd].stat.p_snd_f, + 1); + FETCH_ADD_RELAXED(ð_data.stat.n_snd_f, 1); + } else { + FETCH_ADD_RELAXED( + ð_data.fd_to_ef[fd].stat.p_snd, + 1); + FETCH_ADD_RELAXED( + ð_data.fd_to_ef[fd].stat.b_snd, + len); + } ipcp_spb_release(spb); } } @@ -1424,12 +1807,14 @@ static int eth_init_bpf(struct ifreq * ifr) return -1; } #elif defined(HAVE_RAW_SOCKETS) +#define SOCKOPT() static int eth_init_raw_socket(struct ifreq * ifr) { int idx; - int flags; + int sndbuf; + int rcvbuf; #if defined(IPCP_ETH_QDISC_BYPASS) - int qdisc_bypass = 1; + int qdisc_bypass = 1; #endif /* ENABLE_QDISC_BYPASS */ idx = if_nametoindex(ifr->ifr_name); @@ -1437,6 +1822,7 @@ static int eth_init_raw_socket(struct ifreq * ifr) log_err("Failed to retrieve interface index."); return -1; } + memset(&(eth_data.device), 0, sizeof(eth_data.device)); eth_data.device.sll_ifindex = idx; eth_data.device.sll_family = AF_PACKET; @@ -1453,17 +1839,6 @@ static int eth_init_raw_socket(struct ifreq * ifr) goto fail_socket; } - flags = fcntl(eth_data.s_fd, F_GETFL, 0); - if (flags < 0) { - log_err("Failed to get flags."); - goto fail_device; - } - - if (fcntl(eth_data.s_fd, F_SETFL, flags | O_NONBLOCK)) { - log_err("Failed to set socket non-blocking."); - goto fail_device; - } - #if defined(IPCP_ETH_QDISC_BYPASS) if (setsockopt(eth_data.s_fd, SOL_PACKET, PACKET_QDISC_BYPASS, &qdisc_bypass, sizeof(qdisc_bypass))) { @@ -1471,6 +1846,18 @@ static int eth_init_raw_socket(struct ifreq * ifr) } #endif + sndbuf = IPCP_ETH_SNDBUF; + if (sndbuf > 0 && setsockopt(eth_data.s_fd, SOL_SOCKET, SO_SNDBUF, + &sndbuf, sizeof(sndbuf))) { + log_info("Failed to set SO_SNDBUF to %d.", sndbuf); + } + + rcvbuf = IPCP_ETH_RCVBUF; + if (rcvbuf > 0 && setsockopt(eth_data.s_fd, SOL_SOCKET, SO_RCVBUF, + &rcvbuf, sizeof(rcvbuf))) { + log_info("Failed to set SO_RCVBUF to %d.", rcvbuf); + } + if (bind(eth_data.s_fd, (struct sockaddr *) ð_data.device, sizeof(eth_data.device)) < 0) { log_err("Failed to bind socket to interface."); @@ -1543,6 +1930,12 @@ static int eth_ipcp_bootstrap(struct ipcp_config * conf) return -1; } #endif /* HAVE_NETMAP */ +#ifdef IPCP_ETH_FLOW_STATS + if (rib_reg(ETH_RIB_PATH, ð_r_ops)) { + log_err("Failed to register RIB."); + goto fail_rib_reg; + } +#endif #if defined(__linux__) if (pthread_create(ð_data.if_monitor, NULL, eth_ipcp_if_monitor, NULL)) { @@ -1606,6 +1999,10 @@ static int eth_ipcp_bootstrap(struct ipcp_config * conf) #if defined(__linux__) fail_monitor: #endif +#ifdef IPCP_ETH_FLOW_STATS + rib_unreg(ETH_RIB_PATH); + fail_rib_reg: +#endif #if defined(HAVE_NETMAP) nm_close(eth_data.nmd); #elif defined(HAVE_BPF) @@ -1637,12 +2034,14 @@ static int eth_ipcp_unreg(const uint8_t * hash) static int eth_ipcp_query(const uint8_t * hash) { uint8_t r_addr[MAC_SIZE]; - struct timespec timeout = TIMESPEC_INIT_MS(NAME_QUERY_TIMEO); + struct timespec timeout; struct dir_query * query; int ret; + int attempt; uint8_t * buf; struct mgmt_msg * msg; size_t len; + long per_ms; if (shim_data_dir_has(eth_data.shim_data, hash)) return 0; @@ -1662,32 +2061,46 @@ static int eth_ipcp_query(const uint8_t * hash) memset(r_addr, 0xff, MAC_SIZE); - query = shim_data_dir_query_create(eth_data.shim_data, hash); - if (query == NULL) { - free(buf); - return -1; - } + per_ms = NAME_QUERY_TIMEO / (NAME_QUERY_RETRIES + 1); + + ret = -1; + for (attempt = 0; attempt <= NAME_QUERY_RETRIES; ++attempt) { + query = shim_data_dir_query_create(eth_data.shim_data, hash); + if (query == NULL) { + ret = -1; + break; + } - if (eth_ipcp_send_frame(r_addr, + if (eth_ipcp_send_frame(r_addr, #if defined(BUILD_ETH_DIX) - MGMT_EID, + MGMT_EID, #elif defined(BUILD_ETH_LLC) - reverse_bits(MGMT_SAP), - reverse_bits(MGMT_SAP), + reverse_bits(MGMT_SAP), + reverse_bits(MGMT_SAP), #endif - buf, len)) { - log_err("Failed to send management frame."); + buf, len)) { + log_err("Failed to send management frame."); + shim_data_dir_query_destroy(eth_data.shim_data, + query); + ret = -1; + break; + } + + FETCH_ADD_RELAXED(ð_data.stat.n_mgmt_snd, 1); + + timeout.tv_sec = per_ms / 1000; + timeout.tv_nsec = (per_ms % 1000) * 1000000L; + + ret = shim_data_dir_query_wait(query, &timeout); + shim_data_dir_query_destroy(eth_data.shim_data, query); - free(buf); - return -1; + + if (ret != -ETIMEDOUT) + break; } free(buf); - ret = shim_data_dir_query_wait(query, &timeout); - - shim_data_dir_query_destroy(eth_data.shim_data, query); - return ret; } @@ -1748,6 +2161,14 @@ static int eth_ipcp_flow_alloc(int fd, } fset_add(eth_data.np1_flows, fd); +#ifdef IPCP_ETH_FLOW_STATS + pthread_rwlock_wrlock(ð_data.flows_lock); + memset(ð_data.fd_to_ef[fd].stat, 0, + sizeof(eth_data.fd_to_ef[fd].stat)); + eth_data.fd_to_ef[fd].stat.stamp = time(NULL); + FETCH_ADD_RELAXED(ð_data.stat.n_flows, 1); + pthread_rwlock_unlock(ð_data.flows_lock); +#endif #if defined(BUILD_ETH_LLC) log_dbg("Assigned SAP %d for fd %d.", ssap, fd); #endif @@ -1808,6 +2229,14 @@ static int eth_ipcp_flow_alloc_resp(int fd, } fset_add(eth_data.np1_flows, fd); +#ifdef IPCP_ETH_FLOW_STATS + pthread_rwlock_wrlock(ð_data.flows_lock); + memset(ð_data.fd_to_ef[fd].stat, 0, + sizeof(eth_data.fd_to_ef[fd].stat)); + eth_data.fd_to_ef[fd].stat.stamp = time(NULL); + FETCH_ADD_RELAXED(ð_data.stat.n_flows, 1); + pthread_rwlock_unlock(ð_data.flows_lock); +#endif #if defined(BUILD_ETH_LLC) log_dbg("Assigned SAP %d for fd %d.", ssap, fd); #endif @@ -1836,6 +2265,12 @@ static int eth_ipcp_flow_dealloc(int fd) #endif memset(ð_data.fd_to_ef[fd].r_addr, 0, MAC_SIZE); +#ifdef IPCP_ETH_FLOW_STATS + memset(ð_data.fd_to_ef[fd].stat, 0, + sizeof(eth_data.fd_to_ef[fd].stat)); + FETCH_SUB_RELAXED(ð_data.stat.n_flows, 1); +#endif + pthread_rwlock_unlock(ð_data.flows_lock); ipcp_flow_dealloc(fd); @@ -1902,6 +2337,9 @@ int main(int argc, #ifdef __linux__ pthread_join(eth_data.if_monitor, NULL); #endif +#ifdef IPCP_ETH_FLOW_STATS + rib_unreg(ETH_RIB_PATH); +#endif } ipcp_stop(); |
