diff options
Diffstat (limited to 'src/ipcpd/unicast/fa.c')
| -rw-r--r-- | src/ipcpd/unicast/fa.c | 770 |
1 files changed, 557 insertions, 213 deletions
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index b2eed7e5..ddf78e22 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -1,10 +1,10 @@ /* - * Ouroboros - Copyright (C) 2016 - 2020 + * Ouroboros - Copyright (C) 2016 - 2026 * * Flow allocator of the IPC Process * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License version 2 as @@ -31,64 +31,91 @@ #define FA "flow-allocator" #define OUROBOROS_PREFIX FA +#include <ouroboros/endian.h> #include <ouroboros/logs.h> #include <ouroboros/fqueue.h> #include <ouroboros/errno.h> #include <ouroboros/dev.h> #include <ouroboros/ipcp-dev.h> +#include <ouroboros/rib.h> +#include <ouroboros/random.h> +#include <ouroboros/pthread.h> +#include "addr-auth.h" #include "dir.h" #include "fa.h" #include "psched.h" #include "ipcp.h" #include "dt.h" #include "ca.h" +#include "np1.h" -#include <pthread.h> +#include <inttypes.h> #include <stdlib.h> #include <string.h> -#define TIMEOUT 10000 /* nanoseconds */ +#if defined (IPCP_FLOW_STATS) && !defined(CLOCK_REALTIME_COARSE) +#define CLOCK_REALTIME_COARSE CLOCK_REALTIME +#endif + +#define TIMEOUT 10 * MILLION /* nanoseconds */ #define FLOW_REQ 0 #define FLOW_REPLY 1 #define FLOW_UPDATE 2 #define MSGBUFSZ 2048 +#define STAT_FILE_LEN 0 + struct fa_msg { uint64_t s_addr; - uint32_t r_eid; - uint32_t s_eid; - uint8_t code; - int8_t response; - uint16_t ece; - /* QoS parameters from spec, aligned */ - uint8_t availability; - uint8_t in_order; - uint32_t delay; + uint64_t r_eid; + uint64_t s_eid; uint64_t bandwidth; + int32_t response; + uint32_t delay; uint32_t loss; uint32_t ber; uint32_t max_gap; - uint16_t cypher_s; + uint32_t timeout; + uint16_t ece; + uint8_t code; + uint8_t availability; + uint8_t in_order; } __attribute__((packed)); struct cmd { struct list_head next; - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; }; struct fa_flow { - int r_eid; /* remote endpoint id */ - uint64_t r_addr; /* remote address */ - void * ctx; /* congestion avoidance context */ +#ifdef IPCP_FLOW_STATS + time_t stamp; /* Flow creation */ + size_t p_snd; /* Packets sent */ + size_t p_snd_f; /* Packets sent fail */ + size_t b_snd; /* Bytes sent */ + size_t b_snd_f; /* Bytes sent fail */ + size_t p_rcv; /* Packets received */ + size_t p_rcv_f; /* Packets received fail */ + size_t b_rcv; /* Bytes received */ + size_t b_rcv_f; /* Bytes received fail */ + size_t u_snd; /* Flow updates sent */ + size_t u_rcv; /* Flow updates received */ +#endif + uint64_t s_eid; /* Local endpoint id */ + uint64_t r_eid; /* Remote endpoint id */ + uint64_t r_addr; /* Remote address */ + void * ctx; /* Congestion avoidance context */ }; struct { pthread_rwlock_t flows_lock; struct fa_flow flows[PROG_MAX_FLOWS]; - - int fd; +#ifdef IPCP_FLOW_STATS + size_t n_flows; +#endif + uint32_t eid; struct list_head cmds; pthread_cond_t cond; @@ -98,13 +125,217 @@ struct { struct psched * psched; } fa; +static int fa_rib_read(const char * path, + char * buf, + size_t len) +{ +#ifdef IPCP_FLOW_STATS + struct fa_flow * flow; + int fd; + char r_addrstr[21]; + char s_eidstr[21]; + char r_eidstr[21]; + char tmstr[RIB_TM_STRLEN]; + char castr[1024]; + char * entry; + struct tm * tm; + + entry = strstr(path, RIB_SEPARATOR) + 1; + assert(entry); + + fd = atoi(entry); + + if (fd < 0 || fd >= PROG_MAX_FLOWS) + return -1; + + if (len < 1536) + return 0; + + flow = &fa.flows[fd]; + + buf[0] = '\0'; + + pthread_rwlock_rdlock(&fa.flows_lock); + + if (flow->stamp ==0) { + pthread_rwlock_unlock(&fa.flows_lock); + return 0; + } + + sprintf(r_addrstr, "%" PRIu64, flow->r_addr); + sprintf(s_eidstr, "%" PRIu64, flow->s_eid); + sprintf(r_eidstr, "%" PRIu64, flow->r_eid); + + tm = gmtime(&flow->stamp); + strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); + + ca_print_stats(flow->ctx, castr, 1024); + + sprintf(buf, + "Flow established at: %20s\n" + "Remote address: %20s\n" + "Local endpoint ID: %20s\n" + "Remote endpoint ID: %20s\n" + "Sent (packets): %20zu\n" + "Sent (bytes): %20zu\n" + "Send failed (packets): %20zu\n" + "Send failed (bytes): %20zu\n" + "Received (packets): %20zu\n" + "Received (bytes): %20zu\n" + "Receive failed (packets): %20zu\n" + "Receive failed (bytes): %20zu\n" + "Sent flow updates (packets): %20zu\n" + "Received flow updates (packets): %20zu\n" + "%s", + tmstr, r_addrstr, + s_eidstr, r_eidstr, + flow->p_snd, flow->b_snd, + flow->p_snd_f, flow->b_snd_f, + flow->p_rcv, flow->b_rcv, + flow->b_rcv_f, flow->b_rcv_f, + flow->u_snd, flow->u_rcv, + castr); + + pthread_rwlock_unlock(&fa.flows_lock); + + return strlen(buf); +#else + (void) path; + (void) buf; + (void) len; + return 0; +#endif +} + +static int fa_rib_readdir(char *** buf) +{ +#ifdef IPCP_FLOW_STATS + char entry[RIB_PATH_LEN + 1]; + size_t i; + int idx = 0; + + pthread_rwlock_rdlock(&fa.flows_lock); + + if (fa.n_flows < 1) { + *buf = NULL; + goto no_flows; + } + + *buf = malloc(sizeof(**buf) * fa.n_flows); + if (*buf == NULL) + goto fail_entries; + + for (i = 0; i < PROG_MAX_FLOWS; ++i) { + struct fa_flow * flow; + + flow = &fa.flows[i]; + if (flow->stamp == 0) + continue; + + sprintf(entry, "%zu", i); + + (*buf)[idx] = malloc(strlen(entry) + 1); + if ((*buf)[idx] == NULL) + goto fail_entry; + + strcpy((*buf)[idx++], entry); + } + + assert((size_t) idx == fa.n_flows); + no_flows: + pthread_rwlock_unlock(&fa.flows_lock); + + return idx; + + fail_entry: + while (idx-- > 0) + free((*buf)[idx]); + free(*buf); + fail_entries: + pthread_rwlock_unlock(&fa.flows_lock); + return -ENOMEM; +#else + (void) buf; + return 0; +#endif +} + +static int fa_rib_getattr(const char * path, + struct rib_attr * attr) +{ +#ifdef IPCP_FLOW_STATS + int fd; + char * entry; + struct fa_flow * flow; + + entry = strstr(path, RIB_SEPARATOR) + 1; + assert(entry); + + fd = atoi(entry); + + flow = &fa.flows[fd]; + + pthread_rwlock_rdlock(&fa.flows_lock); + + if (flow->stamp != 0) { + attr->size = 1536; + attr->mtime = flow->stamp; + } else { + attr->size = 0; + attr->mtime = 0; + } + + pthread_rwlock_unlock(&fa.flows_lock); +#else + (void) path; + (void) attr; +#endif + return 0; +} + +static struct rib_ops r_ops = { + .read = fa_rib_read, + .readdir = fa_rib_readdir, + .getattr = fa_rib_getattr +}; + +static int eid_to_fd(uint64_t eid) +{ + struct fa_flow * flow; + int fd; + + fd = eid & 0xFFFFFFFF; + + if (fd < 0 || fd >= PROG_MAX_FLOWS) + return -1; + + flow = &fa.flows[fd]; + + if (flow->s_eid == eid) + return fd; + + return -1; +} + +static uint64_t gen_eid(int fd) +{ + uint32_t rnd; + + if (random_buffer(&rnd, sizeof(rnd)) < 0) + return fa.eid; /* INVALID */ + + fd &= 0xFFFFFFFF; + + return ((uint64_t) rnd << 32) + fd; +} + static void packet_handler(int fd, qoscube_t qc, - struct shm_du_buff * sdb) + struct ssm_pk_buff * spb) { struct fa_flow * flow; uint64_t r_addr; - uint32_t r_eid; + uint64_t r_eid; ca_wnd_t wnd; size_t len; @@ -112,8 +343,12 @@ static void packet_handler(int fd, pthread_rwlock_wrlock(&fa.flows_lock); - len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + len = ssm_pk_buff_len(spb); +#ifdef IPCP_FLOW_STATS + ++flow->p_snd; + flow->b_snd += len; +#endif wnd = ca_ctx_update_snd(flow->ctx, len); r_addr = flow->r_addr; @@ -123,24 +358,41 @@ static void packet_handler(int fd, ca_wnd_wait(wnd); - if (dt_write_packet(r_addr, qc, r_eid, sdb)) { - ipcp_sdb_release(sdb); - log_warn("Failed to forward packet."); + if (dt_write_packet(r_addr, qc, r_eid, spb)) { + ipcp_spb_release(spb); + log_dbg("Failed to forward packet."); +#ifdef IPCP_FLOW_STATS + pthread_rwlock_wrlock(&fa.flows_lock); + ++flow->p_snd_f; + flow->b_snd_f += len; + pthread_rwlock_unlock(&fa.flows_lock); +#endif return; } } static int fa_flow_init(struct fa_flow * flow) { +#ifdef IPCP_FLOW_STATS + struct timespec now; +#endif memset(flow, 0, sizeof(*flow)); flow->r_eid = -1; + flow->s_eid = -1; flow->r_addr = INVALID_ADDR; flow->ctx = ca_ctx_create(); if (flow->ctx == NULL) return -1; +#ifdef IPCP_FLOW_STATS + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + flow->stamp = now.tv_sec; + + ++fa.n_flows; +#endif return 0; } @@ -151,11 +403,16 @@ static void fa_flow_fini(struct fa_flow * flow) memset(flow, 0, sizeof(*flow)); flow->r_eid = -1; + flow->s_eid = -1; flow->r_addr = INVALID_ADDR; + +#ifdef IPCP_FLOW_STATS + --fa.n_flows; +#endif } static void fa_post_packet(void * comp, - struct shm_du_buff * sdb) + struct ssm_pk_buff * spb) { struct cmd * cmd; @@ -166,11 +423,11 @@ static void fa_post_packet(void * comp, cmd = malloc(sizeof(*cmd)); if (cmd == NULL) { log_err("Command failed. Out of memory."); - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); return; } - cmd->sdb = sdb; + cmd->spb = spb; pthread_mutex_lock(&fa.mtx); @@ -181,156 +438,199 @@ static void fa_post_packet(void * comp, pthread_mutex_unlock(&fa.mtx); } -static void * fa_handle_packet(void * o) +static size_t fa_wait_for_fa_msg(struct fa_msg * msg) { - struct timespec ts = {0, TIMEOUT * 1000}; + struct cmd * cmd; + size_t len; - (void) o; + pthread_mutex_lock(&fa.mtx); - while (true) { - struct timespec abstime; - int fd; - uint8_t buf[MSGBUFSZ]; - struct fa_msg * msg; - qosspec_t qs; - struct cmd * cmd; - size_t len; - size_t msg_len; - struct fa_flow * flow; + pthread_cleanup_push(__cleanup_mutex_unlock, &fa.mtx); - pthread_mutex_lock(&fa.mtx); + while (list_is_empty(&fa.cmds)) + pthread_cond_wait(&fa.cond, &fa.mtx); - pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, - &fa.mtx); + cmd = list_last_entry(&fa.cmds, struct cmd, next); + list_del(&cmd->next); - while (list_is_empty(&fa.cmds)) - pthread_cond_wait(&fa.cond, &fa.mtx); + pthread_cleanup_pop(true); - cmd = list_last_entry(&fa.cmds, struct cmd, next); - list_del(&cmd->next); + len = ssm_pk_buff_len(cmd->spb); + if (len > MSGBUFSZ || len < sizeof(*msg)) { + log_warn("Invalid flow allocation message (len: %zd).", len); + free(cmd); + return 0; /* No valid message */ + } - pthread_cleanup_pop(true); + memcpy(msg, ssm_pk_buff_head(cmd->spb), len); - len = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb); + ipcp_spb_release(cmd->spb); - if (len > MSGBUFSZ) { - log_err("Message over buffer size."); - free(cmd); - continue; - } + free(cmd); - msg = (struct fa_msg *) buf; + return len; +} - /* Depending on the message call the function in ipcp-dev.h */ +static int fa_handle_flow_req(struct fa_msg * msg, + size_t len) +{ + size_t msg_len; + int fd; + qosspec_t qs; + struct fa_flow * flow; + uint8_t * dst; + buffer_t data; /* Piggbacked data on flow alloc request. */ - memcpy(msg, shm_du_buff_head(cmd->sdb), len); + msg_len = sizeof(*msg) + ipcp_dir_hash_len(); + if (len < msg_len) { + log_err("Invalid flow allocation request"); + return -EPERM; + } - ipcp_sdb_release(cmd->sdb); + dst = (uint8_t *)(msg + 1); + data.data = (uint8_t *) msg + msg_len; + data.len = len - msg_len; - free(cmd); + qs.delay = ntoh32(msg->delay); + qs.bandwidth = ntoh64(msg->bandwidth); + qs.availability = msg->availability; + qs.loss = ntoh32(msg->loss); + qs.ber = ntoh32(msg->ber); + qs.in_order = msg->in_order; + qs.max_gap = ntoh32(msg->max_gap); + qs.timeout = ntoh32(msg->timeout); - switch (msg->code) { - case FLOW_REQ: - msg_len = sizeof(*msg) + ipcp_dir_hash_len(); + fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UNICAST_MPL, &data); + if (fd < 0) + return fd; - assert(len >= msg_len); + flow = &fa.flows[fd]; - clock_gettime(PTHREAD_COND_CLOCK, &abstime); + pthread_rwlock_wrlock(&fa.flows_lock); - pthread_mutex_lock(&ipcpi.alloc_lock); + fa_flow_init(flow); - while (ipcpi.alloc_id != -1 && - ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); - } + flow->s_eid = gen_eid(fd); + flow->r_eid = ntoh64(msg->s_eid); + flow->r_addr = ntoh64(msg->s_addr); - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - log_dbg("Won't allocate over non-operational" - "IPCP."); - continue; - } + pthread_rwlock_unlock(&fa.flows_lock); - assert(ipcpi.alloc_id == -1); + return fd; +} - qs.delay = ntoh32(msg->delay); - qs.bandwidth = ntoh64(msg->bandwidth); - qs.availability = msg->availability; - qs.loss = ntoh32(msg->loss); - qs.ber = ntoh32(msg->ber); - qs.in_order = msg->in_order; - qs.max_gap = ntoh32(msg->max_gap); - qs.cypher_s = ntoh16(msg->cypher_s); +static int fa_handle_flow_reply(struct fa_msg * msg, + size_t len) +{ + int fd; + struct fa_flow * flow; + buffer_t data; /* Piggbacked data on flow alloc request. */ + time_t mpl = IPCP_UNICAST_MPL; + int response; - fd = ipcp_flow_req_arr((uint8_t *) (msg + 1), - ipcp_dir_hash_len(), - qs, - buf + msg_len, - len - msg_len); - if (fd < 0) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - log_err("Failed to get fd for flow."); - continue; - } + assert(len >= sizeof(*msg)); - flow = &fa.flows[fd]; + data.data = (uint8_t *) msg + sizeof(*msg); + data.len = len - sizeof(*msg); - pthread_rwlock_wrlock(&fa.flows_lock); + pthread_rwlock_wrlock(&fa.flows_lock); - fa_flow_init(flow); + fd = eid_to_fd(ntoh64(msg->r_eid)); + if (fd < 0) { + pthread_rwlock_unlock(&fa.flows_lock); + log_err("Flow reply for unknown EID %" PRIu64 ".", + ntoh64(msg->r_eid)); + return -ENOTALLOC; + } - flow->r_eid = ntoh32(msg->s_eid); - flow->r_addr = ntoh64(msg->s_addr); + flow = &fa.flows[fd]; - pthread_rwlock_unlock(&fa.flows_lock); + flow->r_eid = ntoh64(msg->s_eid); + response = ntoh32(msg->response); - ipcpi.alloc_id = fd; - pthread_cond_broadcast(&ipcpi.alloc_cond); + log_dbg("IPCP received msg response %d for flow on fd %d.", + response, fd); - pthread_mutex_unlock(&ipcpi.alloc_lock); + if (response < 0) + fa_flow_fini(flow); + else + psched_add(fa.psched, fd); - break; - case FLOW_REPLY: - assert(len >= sizeof(*msg)); + pthread_rwlock_unlock(&fa.flows_lock); - flow = &fa.flows[ntoh32(msg->r_eid)]; + if (ipcp_flow_alloc_reply(fd, response, mpl, &data) < 0) { + log_err("Failed to reply for flow allocation on fd %d.", fd); + return -EIRMD; + } - pthread_rwlock_wrlock(&fa.flows_lock); + return 0; +} - flow->r_eid = ntoh32(msg->s_eid); +static int fa_handle_flow_update(struct fa_msg * msg, + size_t len) +{ + struct fa_flow * flow; + int fd; - if (msg->response < 0) - fa_flow_fini(flow); - else - psched_add(fa.psched, ntoh32(msg->r_eid)); + (void) len; + assert(len >= sizeof(*msg)); - pthread_rwlock_unlock(&fa.flows_lock); + pthread_rwlock_wrlock(&fa.flows_lock); - ipcp_flow_alloc_reply(ntoh32(msg->r_eid), - msg->response, - buf + sizeof(*msg), - len - sizeof(*msg)); - break; - case FLOW_UPDATE: - assert(len >= sizeof(*msg)); + fd = eid_to_fd(ntoh64(msg->r_eid)); + if (fd < 0) { + pthread_rwlock_unlock(&fa.flows_lock); + log_err("Flow update for unknown EID %" PRIu64 ".", + ntoh64(msg->r_eid)); + return -EPERM; + } - flow = &fa.flows[ntoh32(msg->r_eid)]; + flow = &fa.flows[fd]; +#ifdef IPCP_FLOW_STATS + flow->u_rcv++; +#endif + ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece)); - pthread_rwlock_wrlock(&fa.flows_lock); + pthread_rwlock_unlock(&fa.flows_lock); - ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece)); + return 0; +} - pthread_rwlock_unlock(&fa.flows_lock); +static void * fa_handle_packet(void * o) +{ + (void) o; + while (true) { + uint8_t buf[MSGBUFSZ]; + struct fa_msg * msg; + size_t len; + + msg = (struct fa_msg *) buf; + + len = fa_wait_for_fa_msg(msg); + if (len == 0) + continue; + + switch (msg->code) { + case FLOW_REQ: + if (fa_handle_flow_req(msg, len) < 0) + log_err("Error handling flow alloc request."); + break; + case FLOW_REPLY: + if (fa_handle_flow_reply(msg, len) < 0) + log_err("Error handling flow reply."); + break; + case FLOW_UPDATE: + if (fa_handle_flow_update(msg, len) < 0) + log_err("Error handling flow update."); break; default: - log_err("Got an unknown flow allocation message."); + log_warn("Recieved unknown flow allocation message."); break; } } + + return (void *) 0; } int fa_init(void) @@ -352,14 +652,23 @@ int fa_init(void) if (pthread_cond_init(&fa.cond, &cattr)) goto fail_cond; - pthread_condattr_destroy(&cattr); + if (rib_reg(FA, &r_ops)) + goto fail_rib_reg; + + fa.eid = dt_reg_comp(&fa, &fa_post_packet, FA); + if ((int) fa.eid < 0) + goto fail_dt_reg; list_head_init(&fa.cmds); - fa.fd = dt_reg_comp(&fa, &fa_post_packet, FA); + pthread_condattr_destroy(&cattr); return 0; + fail_dt_reg: + rib_unreg(FA); + fail_rib_reg: + pthread_cond_destroy(&fa.cond); fail_cond: pthread_condattr_destroy(&cattr); fail_cattr: @@ -367,24 +676,33 @@ int fa_init(void) fail_mtx: pthread_rwlock_destroy(&fa.flows_lock); fail_rwlock: - log_err("Failed to initialize flow allocator."); return -1; } void fa_fini(void) { + rib_unreg(FA); + pthread_cond_destroy(&fa.cond);; pthread_mutex_destroy(&fa.mtx); pthread_rwlock_destroy(&fa.flows_lock); } +static int np1_flow_read_fa(int fd, + struct ssm_pk_buff ** spb) +{ + return np1_flow_read(fd, spb, NP1_GET_POOL(fd)); +} + int fa_start(void) { +#ifndef BUILD_CONTAINER struct sched_param par; int pol; int max; +#endif - fa.psched = psched_create(packet_handler); + fa.psched = psched_create(packet_handler, np1_flow_read_fa); if (fa.psched == NULL) { log_err("Failed to start packet scheduler."); goto fail_psched; @@ -395,6 +713,7 @@ int fa_start(void) goto fail_thread; } +#ifndef BUILD_CONTAINER if (pthread_getschedparam(fa.worker, &pol, &par)) { log_err("Failed to get worker thread scheduling parameters."); goto fail_sched; @@ -412,16 +731,18 @@ int fa_start(void) log_err("Failed to set scheduler priority to maximum."); goto fail_sched; } +#endif return 0; +#ifndef BUILD_CONTAINER fail_sched: pthread_cancel(fa.worker); pthread_join(fa.worker, NULL); +#endif fail_thread: psched_destroy(fa.psched); fail_psched: - log_err("Failed to start flow allocator."); return -1; } @@ -433,18 +754,18 @@ void fa_stop(void) psched_destroy(fa.psched); } -int fa_alloc(int fd, - const uint8_t * dst, - qosspec_t qs, - const void * data, - size_t dlen) +int fa_alloc(int fd, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data) { struct fa_msg * msg; - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; struct fa_flow * flow; uint64_t addr; qoscube_t qc = QOS_CUBE_BE; size_t len; + uint64_t eid; addr = dir_query(dst); if (addr == 0) @@ -452,15 +773,17 @@ int fa_alloc(int fd, len = sizeof(*msg) + ipcp_dir_hash_len(); - if (ipcp_sdb_reserve(&sdb, len + dlen)) + if (ipcp_spb_reserve(&spb, len + data->len)) return -1; - msg = (struct fa_msg *) shm_du_buff_head(sdb); + msg = (struct fa_msg *) ssm_pk_buff_head(spb); memset(msg, 0, sizeof(*msg)); + eid = gen_eid(fd); + msg->code = FLOW_REQ; - msg->s_eid = hton32(fd); - msg->s_addr = hton64(ipcpi.dt_addr); + msg->s_eid = hton64(eid); + msg->s_addr = hton64(addr_auth_address()); msg->delay = hton32(qs.delay); msg->bandwidth = hton64(qs.bandwidth); msg->availability = qs.availability; @@ -468,13 +791,15 @@ int fa_alloc(int fd, msg->ber = hton32(qs.ber); msg->in_order = qs.in_order; msg->max_gap = hton32(qs.max_gap); - msg->cypher_s = hton16(qs.cypher_s); + msg->timeout = hton32(qs.timeout); memcpy(msg + 1, dst, ipcp_dir_hash_len()); - memcpy(shm_du_buff_head(sdb) + len, data, dlen); + if (data->len > 0) + memcpy(ssm_pk_buff_head(spb) + len, data->data, data->len); - if (dt_write_packet(addr, qc, fa.fd, sdb)) { - ipcp_sdb_release(sdb); + if (dt_write_packet(addr, qc, fa.eid, spb)) { + log_err("Failed to send flow allocation request packet."); + ipcp_spb_release(spb); return -1; } @@ -484,81 +809,73 @@ int fa_alloc(int fd, fa_flow_init(flow); flow->r_addr = addr; + flow->s_eid = eid; pthread_rwlock_unlock(&fa.flows_lock); return 0; } -int fa_alloc_resp(int fd, - int response, - const void * data, - size_t len) +int fa_alloc_resp(int fd, + int response, + const buffer_t * data) { - struct timespec ts = {0, TIMEOUT * 1000}; - struct timespec abstime; struct fa_msg * msg; - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; struct fa_flow * flow; qoscube_t qc = QOS_CUBE_BE; - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - flow = &fa.flows[fd]; - pthread_mutex_lock(&ipcpi.alloc_lock); - - while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); + if (ipcp_wait_flow_resp(fd) < 0) { + log_err("Failed to wait for flow response."); + goto fail_alloc_resp; } - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - return -1; + if (ipcp_spb_reserve(&spb, sizeof(*msg) + data->len)) { + log_err("Failed to reserve spb (%zu bytes).", + sizeof(*msg) + data->len); + goto fail_reserve; } - ipcpi.alloc_id = -1; - pthread_cond_broadcast(&ipcpi.alloc_cond); - - pthread_mutex_unlock(&ipcpi.alloc_lock); + msg = (struct fa_msg *) ssm_pk_buff_head(spb); + memset(msg, 0, sizeof(*msg)); - if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + len)) { - fa_flow_fini(flow); - return -1; - } + msg->code = FLOW_REPLY; + msg->response = hton32(response); + if (data->len > 0) + memcpy(msg + 1, data->data, data->len); - msg = (struct fa_msg *) shm_du_buff_head(sdb); - memset(msg, 0, sizeof(*msg)); + pthread_rwlock_rdlock(&fa.flows_lock); - pthread_rwlock_wrlock(&fa.flows_lock); + msg->r_eid = hton64(flow->r_eid); + msg->s_eid = hton64(flow->s_eid); - msg->code = FLOW_REPLY; - msg->r_eid = hton32(flow->r_eid); - msg->s_eid = hton32(fd); - msg->response = response; + pthread_rwlock_unlock(&fa.flows_lock); - memcpy(msg + 1, data, len); + if (dt_write_packet(flow->r_addr, qc, fa.eid, spb)) { + log_err("Failed to send flow allocation response packet."); + goto fail_packet; + } if (response < 0) { + pthread_rwlock_wrlock(&fa.flows_lock); fa_flow_fini(flow); - ipcp_sdb_release(sdb); + pthread_rwlock_unlock(&fa.flows_lock); } else { psched_add(fa.psched, fd); } - if (dt_write_packet(flow->r_addr, qc, fa.fd, sdb)) { - fa_flow_fini(flow); - pthread_rwlock_unlock(&fa.flows_lock); - ipcp_sdb_release(sdb); - return -1; - } + return 0; + fail_packet: + ipcp_spb_release(spb); + fail_reserve: + pthread_rwlock_wrlock(&fa.flows_lock); + fa_flow_fini(flow); pthread_rwlock_unlock(&fa.flows_lock); - - return 0; + fail_alloc_resp: + return -1; } int fa_dealloc(int fd) @@ -574,7 +891,7 @@ int fa_dealloc(int fd) pthread_rwlock_unlock(&fa.flows_lock); - flow_dealloc(fd); + ipcp_flow_dealloc(fd); return 0; } @@ -583,60 +900,87 @@ static int fa_update_remote(int fd, uint16_t ece) { struct fa_msg * msg; - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; qoscube_t qc = QOS_CUBE_BE; struct fa_flow * flow; + uint64_t r_addr; - if (ipcp_sdb_reserve(&sdb, sizeof(*msg))) { + if (ipcp_spb_reserve(&spb, sizeof(*msg))) { + log_err("Failed to reserve spb (%zu bytes).", sizeof(*msg)); return -1; } - msg = (struct fa_msg *) shm_du_buff_head(sdb); + msg = (struct fa_msg *) ssm_pk_buff_head(spb); memset(msg, 0, sizeof(*msg)); flow = &fa.flows[fd]; - pthread_rwlock_rdlock(&fa.flows_lock); + pthread_rwlock_wrlock(&fa.flows_lock); msg->code = FLOW_UPDATE; - msg->r_eid = hton32(flow->r_eid); + msg->r_eid = hton64(flow->r_eid); msg->ece = hton16(ece); - if (dt_write_packet(flow->r_addr, qc, fa.fd, sdb)) { - pthread_rwlock_unlock(&fa.flows_lock); - ipcp_sdb_release(sdb); - return -1; - } - + r_addr = flow->r_addr; +#ifdef IPCP_FLOW_STATS + flow->u_snd++; +#endif pthread_rwlock_unlock(&fa.flows_lock); + if (dt_write_packet(r_addr, qc, fa.eid, spb)) { + log_err("Failed to send flow update packet."); + ipcp_spb_release(spb); + return -1; + } + return 0; } -void fa_ecn_update(int eid, - uint8_t ecn, - size_t len) +void fa_np1_rcv(uint64_t eid, + uint8_t ecn, + struct ssm_pk_buff * spb) { struct fa_flow * flow; bool update; uint16_t ece; + int fd; + size_t len; - flow = &fa.flows[eid]; + len = ssm_pk_buff_len(spb); pthread_rwlock_wrlock(&fa.flows_lock); - if (flow->r_eid == -1) { + fd = eid_to_fd(eid); + if (fd < 0) { pthread_rwlock_unlock(&fa.flows_lock); + log_dbg("Received packet for unknown EID %" PRIu64 ".", eid); + ipcp_spb_release(spb); return; } + flow = &fa.flows[fd]; + +#ifdef IPCP_FLOW_STATS + ++flow->p_rcv; + flow->b_rcv += len; +#endif update = ca_ctx_update_rcv(flow->ctx, len, ecn, &ece); pthread_rwlock_unlock(&fa.flows_lock); + if (np1_flow_write(fd, spb, NP1_GET_POOL(fd)) < 0) { + log_dbg("Failed to write to flow %d.", fd); + ipcp_spb_release(spb); +#ifdef IPCP_FLOW_STATS + pthread_rwlock_wrlock(&fa.flows_lock); + ++flow->p_rcv_f; + flow->b_rcv_f += len; + pthread_rwlock_unlock(&fa.flows_lock); +#endif + } + if (update) fa_update_remote(eid, ece); - } |
