diff options
Diffstat (limited to 'src/lib/dev.c')
| -rw-r--r-- | src/lib/dev.c | 1258 |
1 files changed, 827 insertions, 431 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index b202a85c..7e9b7329 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -29,10 +29,13 @@ #include "config.h" #include "ssm.h" +#include <ouroboros/atomics.h> #include <ouroboros/bitmap.h> #include <ouroboros/cep.h> +#include <ouroboros/crc16.h> #include <ouroboros/crypt.h> #include <ouroboros/dev.h> +#include <ouroboros/endian.h> #include <ouroboros/errno.h> #include <ouroboros/fccntl.h> #include <ouroboros/flow.h> @@ -45,32 +48,33 @@ #include <ouroboros/np1_flow.h> #include <ouroboros/pthread.h> #include <ouroboros/random.h> +#ifdef PROC_FLOW_STATS +#include <ouroboros/rib.h> +#endif #include <ouroboros/serdes-irm.h> +#include <ouroboros/sockets.h> #include <ouroboros/ssm_flow_set.h> #include <ouroboros/ssm_pool.h> #include <ouroboros/ssm_rbuff.h> -#include <ouroboros/sockets.h> +#include <ouroboros/tw.h> #include <ouroboros/utils.h> -#ifdef PROC_FLOW_STATS -#include <ouroboros/rib.h> -#endif +#include <assert.h> #ifdef HAVE_LIBGCRYPT #include <gcrypt.h> #endif -#include <assert.h> -#include <stdlib.h> -#include <string.h> -#include <stdio.h> #include <stdarg.h> #include <stdbool.h> +#include <inttypes.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> #include <sys/types.h> #ifndef CLOCK_REALTIME_COARSE #define CLOCK_REALTIME_COARSE CLOCK_REALTIME #endif -/* Partial read information. */ #define NO_PART -1 #define DONE_PART -2 @@ -78,19 +82,12 @@ #define SECMEMSZ 16384 #define MSGBUFSZ 2048 -/* map flow_ids to flow descriptors; track state of the flow */ struct fmap { int fd; - /* TODO: use actual flow state */ enum flow_state state; }; -#define frcti_to_flow(frcti) \ - ((struct flow *)((uint8_t *) frcti - offsetof(struct flow, frcti))) - struct flow { - struct list_head next; - struct flow_info info; struct ssm_rbuff * rx_rb; @@ -135,16 +132,10 @@ struct { struct flow * flows; struct fmap * id_to_fd; - struct list_head flow_list; pthread_mutex_t mtx; pthread_cond_t cond; - pthread_t tx; - pthread_t rx; - size_t n_frcti; - fset_t * frct_set; - pthread_rwlock_t lock; } proc; @@ -243,7 +234,7 @@ static int proc_announce(const struct proc_info * proc) return irm__irm_result_des(&msg); } -/* IRMd will clean up the mess if this fails */ +/* IRMd cleans up on failure. */ static void proc_exit(void) { uint8_t buf[SOCK_BUF_SIZE]; @@ -264,7 +255,7 @@ static int spb_encrypt(struct flow * flow, uint8_t * tail; if (flow->crypt == NULL) - return 0; /* No encryption */ + return 0; in.data = ssm_pk_buff_head(spb); in.len = ssm_pk_buff_len(spb); @@ -299,7 +290,7 @@ static int spb_decrypt(struct flow * flow, uint8_t * head; if (flow->crypt == NULL) - return 0; /* No decryption */ + return 0; in.data = ssm_pk_buff_head(spb); in.len = ssm_pk_buff_len(spb); @@ -318,130 +309,280 @@ static int spb_decrypt(struct flow * flow, return 0; } -#include "frct.c" - -void * flow_tx(void * o) +/* tw_move under proc.lock rdlock; gates teardown vs in-flight fires. */ +static void tw_move_safe(void) { - struct timespec tic = TIMESPEC_INIT_NS(TICTIME); + pthread_rwlock_rdlock(&proc.lock); - (void) o; + pthread_cleanup_push(__cleanup_rwlock_unlock, &proc.lock); - while (true) { - timerwheel_move(); + tw_move(); - nanosleep(&tic, NULL); - } - - return (void *) 0; + pthread_cleanup_pop(1); } -static void flow_send_keepalive(struct flow * flow, - struct timespec now) +static int crc_add(struct ssm_pk_buff * spb, + size_t head_skip) { - struct ssm_pk_buff * spb; - ssize_t idx; - uint8_t * ptr; - - idx = ssm_pool_alloc(proc.pool, 0, &ptr, &spb); - if (idx < 0) - return; + uint8_t * head; + uint8_t * tail; - pthread_rwlock_wrlock(&proc.lock); + tail = ssm_pk_buff_push_tail(spb, CRCLEN); + if (tail == NULL) + return -ENOMEM; - flow->snd_act = now; + head = ssm_pk_buff_head(spb) + head_skip; - if (ssm_rbuff_write(flow->tx_rb, idx)) - ssm_pool_remove(proc.pool, idx); - else - ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); + mem_hash(HASH_CRC32, tail, head, tail - head); - pthread_rwlock_unlock(&proc.lock); + return 0; } -/* Needs rdlock on proc. */ -static void _flow_keepalive(struct flow * flow) +static int crc_check(struct ssm_pk_buff * spb, + size_t head_skip) { - struct timespec now; - struct timespec s_act; - struct timespec r_act; - int flow_id; - time_t timeo; - uint32_t acl; + uint32_t crc; + uint8_t * head = ssm_pk_buff_head(spb) + head_skip; + uint8_t * tail = ssm_pk_buff_pop_tail(spb, CRCLEN); - s_act = flow->snd_act; - r_act = flow->rcv_act; + mem_hash(HASH_CRC32, &crc, head, tail - head); - flow_id = flow->info.id; - timeo = flow->info.qs.timeout; + return !(crc == *((uint32_t *) tail)); +} - acl = ssm_rbuff_get_acl(flow->rx_rb); - if (timeo == 0 || acl & (ACL_FLOWPEER | ACL_FLOWDOWN)) - return; +/* FRCT included here so it can use proc and dev.c statics directly. */ +#include "frct.c" - clock_gettime(PTHREAD_COND_CLOCK, &now); +/* + * SACK / DATA carry trailer CRC32; HCS protects the headers on every + * FRCT packet. Decrypt before any check so plaintext is authoritative. + */ +static bool invalid_pkt(struct flow * flow, + struct ssm_pk_buff * spb) +{ + const struct frct_pci * pci; + uint16_t flags; + size_t pci_total; - if (ts_diff_ns(&now, &r_act) > (int64_t) timeo * MILLION) { - ssm_rbuff_set_acl(flow->rx_rb, ACL_FLOWPEER); - ssm_flow_set_notify(proc.fqset, flow_id, FLOW_PEER); - return; + if (spb == NULL || ssm_pk_buff_len(spb) == 0) + return true; + + if (spb_decrypt(flow, spb) < 0) + return true; + + if (flow->frcti == NULL) { + if (flow->info.qs.ber == 0 && crc_check(spb, 0) != 0) + return true; + return false; } - if (ts_diff_ns(&now, &s_act) > (int64_t) timeo * (MILLION >> 2)) { - pthread_rwlock_unlock(&proc.lock); + if (ssm_pk_buff_len(spb) < FRCT_PCILEN) + return true; - flow_send_keepalive(flow, now); + pci = (const struct frct_pci *) ssm_pk_buff_head(spb); + flags = ntoh16(pci->flags); - pthread_rwlock_rdlock(&proc.lock); + /* Untrusted flag read; mismatch on HCS will drop on corrupt. */ + if (flags & FRCT_DATA) + pci_total = frcti_data_hdr_len(flow->frcti); + else + pci_total = frcti_ctrl_hdr_len(flow->frcti); + + if (ssm_pk_buff_len(spb) < pci_total) + return true; + + if (frct_hcs_check(pci, flow->frcti) != 0) + return true; + + /* HCS valid: CRC32 on SACK; or on DATA if ber = 0. */ + if (flags & FRCT_SACK) { + if (crc_check(spb, pci_total) != 0) + return true; + + } else if ((flags & FRCT_DATA) && flow->info.qs.ber == 0) { + if (crc_check(spb, pci_total) != 0) + return true; } + + return false; } -static void handle_keepalives(void) +static bool deadline_passed(const struct timespec * abs) { - struct list_head * p; - struct list_head * h; + struct timespec now; - pthread_rwlock_rdlock(&proc.lock); + if (abs == NULL) + return false; - list_for_each_safe(p, h, &proc.flow_list) { - struct flow * flow; - flow = list_entry(p, struct flow, next); - _flow_keepalive(flow); - } + clock_gettime(PTHREAD_COND_CLOCK, &now); - pthread_rwlock_unlock(&proc.lock); + return ts_diff_ns(&now, abs) >= 0; } -static void __cleanup_fqueue_destroy(void * fq) +/* Clamp the wait by min(dl, next tw expiry, now + TICTIME). */ +static void compute_wait_deadline(const struct timespec * dl, + struct timespec * out) { - fqueue_destroy((fqueue_t *) fq); + struct timespec now; + struct timespec cap; + struct timespec expiry; + struct timespec tic = TIMESPEC_INIT_NS(TICTIME); + + clock_gettime(PTHREAD_COND_CLOCK, &now); + ts_add(&now, &tic, &cap); + + tw_next_expiry(&expiry); + + *out = (ts_diff_ns(&cap, &expiry) < 0) ? expiry : cap; + if (dl != NULL && ts_diff_ns(out, dl) > 0) + *out = *dl; } -void * flow_rx(void * o) +/* + * proc.lock rdlock held across each iteration so flow_fini's wrlock + * waits for us to finish; FLOWDOWN already set means we exit promptly. + */ +static void flow_drain_rx_nb(struct flow * flow) { - struct timespec tic = TIMESPEC_INIT_NS(TICTIME); - int ret; - struct fqueue * fq; + ssize_t idx; + struct ssm_pk_buff * spb; + struct ssm_rbuff * rx_rb; + struct frcti * frcti; +#ifdef PROC_FLOW_STATS + struct timespec t_a; + struct timespec t_b; +#endif - (void) o; + if (flow->frcti != NULL) + STAT_BUMP(flow->frcti, drain_calls); - fq = fqueue_create(); + while (true) { + pthread_rwlock_rdlock(&proc.lock); - pthread_cleanup_push(__cleanup_fqueue_destroy, fq); + rx_rb = flow->rx_rb; + if (rx_rb == NULL) { + pthread_rwlock_unlock(&proc.lock); + return; + } - /* fevent will filter all FRCT packets for us */ - while ((ret = fevent(proc.frct_set, fq, &tic)) != 0) { - if (ret == -ETIMEDOUT) { - handle_keepalives(); + idx = ssm_rbuff_read(rx_rb); + if (idx < 0) { + pthread_rwlock_unlock(&proc.lock); + return; + } + + spb = ssm_pool_get(proc.pool, idx); + if (invalid_pkt(flow, spb)) { + ssm_pool_remove(proc.pool, idx); + pthread_rwlock_unlock(&proc.lock); + continue; + } + + frcti = flow->frcti; + if (frcti != NULL) { +#ifdef PROC_FLOW_STATS + clock_gettime(CLOCK_MONOTONIC, &t_a); + FRCTI_RCV(frcti, spb); + clock_gettime(CLOCK_MONOTONIC, &t_b); + STAT_ADD(frcti, rcv_proc_ns, + (size_t) ts_diff_ns(&t_b, &t_a)); +#else + FRCTI_RCV(frcti, spb); +#endif + } else { + ssm_pool_remove(proc.pool, idx); + } + + pthread_rwlock_unlock(&proc.lock); + + /* Per-packet so the delayed-ACK fires on time in a burst. */ +#ifdef PROC_FLOW_STATS + clock_gettime(CLOCK_MONOTONIC, &t_a); + tw_move_safe(); + clock_gettime(CLOCK_MONOTONIC, &t_b); + if (frcti != NULL) + STAT_ADD(frcti, tw_move_ns, + (size_t) ts_diff_ns(&t_b, &t_a)); +#else + tw_move_safe(); +#endif + } +} + +/* + * Wait clamped by caller deadline, next tw expiry, and TICTIME; + * a clamp-timeout means tw work is due, not caller-deadline. + */ +static int flow_rx_one(struct flow * flow, + struct timespec * abs) +{ + struct timespec wait_abs; + struct ssm_pk_buff * spb; + struct ssm_rbuff * rx_rb; + ssize_t idx; + + while (true) { + compute_wait_deadline(abs, &wait_abs); + + /* rdlock gates flow_fini; FLOWDOWN preempts the block. */ + pthread_rwlock_rdlock(&proc.lock); + + rx_rb = flow->rx_rb; + if (rx_rb == NULL) { + pthread_rwlock_unlock(&proc.lock); + return -EFLOWDOWN; + } + + idx = ssm_rbuff_read_b(rx_rb, &wait_abs); + if (idx == -ETIMEDOUT) { + pthread_rwlock_unlock(&proc.lock); + if (deadline_passed(abs)) + return -ETIMEDOUT; + tw_move_safe(); continue; } + if (idx < 0) { + pthread_rwlock_unlock(&proc.lock); + return idx; + } + + spb = ssm_pool_get(proc.pool, idx); + if (invalid_pkt(flow, spb)) { + ssm_pool_remove(proc.pool, idx); + pthread_rwlock_unlock(&proc.lock); + continue; + } + + if (flow->frcti != NULL) + FRCTI_RCV(flow->frcti, spb); + else + ssm_pool_remove(proc.pool, idx); - while (fqueue_next(fq) >= 0) - ; /* no need to act */ + pthread_rwlock_unlock(&proc.lock); + + tw_move_safe(); + return 0; } +} - pthread_cleanup_pop(true); +/* 0 = window open; -EAGAIN = !block and would block; else flow_rx_one rc. */ +static __inline__ int flow_wait_window(struct flow * flow, + size_t n, + bool block, + struct timespec * dl) +{ + int rc; - return (void *) 0; + while (true) { + flow_drain_rx_nb(flow); + if (FRCTI_IS_WINDOW_OPEN_N(flow->frcti, n)) + return 0; + if (!block) + return -EAGAIN; + rc = flow_rx_one(flow, dl); + if (rc < 0) + return rc; + } } static void flow_clear(int fd) @@ -451,36 +592,40 @@ static void flow_clear(int fd) proc.flows[fd].info.id = -1; } -static void __flow_fini(int fd) +/* + * Set ACL_FLOWDOWN on rx/tx so any in-flight blocking reads or writes + * wake up and drop their proc.lock rdlock. Must run BEFORE flow_fini's + * wrlock, else the wrlock blocks on those rdlock holders and the + * in-flight calls never see the FLOWDOWN signal. + */ +static void flow_quiesce(int fd) { - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + struct ssm_rbuff * rx_rb = proc.flows[fd].rx_rb; + struct ssm_rbuff * tx_rb = proc.flows[fd].tx_rb; - if (proc.flows[fd].frcti != NULL) { - proc.n_frcti--; - if (proc.n_frcti == 0) { - pthread_cancel(proc.tx); - pthread_join(proc.tx, NULL); - } + if (rx_rb != NULL) + ssm_rbuff_set_acl(rx_rb, ACL_FLOWDOWN); + if (tx_rb != NULL) + ssm_rbuff_set_acl(tx_rb, ACL_FLOWDOWN); +} - ssm_flow_set_del(proc.fqset, 0, proc.flows[fd].info.id); +static void do_flow_fini(int fd) +{ + assert(fd >= 0 && fd < PROC_MAX_FLOWS); + if (proc.flows[fd].frcti != NULL) frcti_destroy(proc.flows[fd].frcti); - } if (proc.flows[fd].info.id != -1) { flow_destroy(&proc.id_to_fd[proc.flows[fd].info.id]); bmp_release(proc.fds, fd); } - if (proc.flows[fd].rx_rb != NULL) { - ssm_rbuff_set_acl(proc.flows[fd].rx_rb, ACL_FLOWDOWN); + if (proc.flows[fd].rx_rb != NULL) ssm_rbuff_close(proc.flows[fd].rx_rb); - } - if (proc.flows[fd].tx_rb != NULL) { - ssm_rbuff_set_acl(proc.flows[fd].tx_rb, ACL_FLOWDOWN); + if (proc.flows[fd].tx_rb != NULL) ssm_rbuff_close(proc.flows[fd].tx_rb); - } if (proc.flows[fd].set != NULL) { ssm_flow_set_notify(proc.flows[fd].set, @@ -491,24 +636,40 @@ static void __flow_fini(int fd) crypt_destroy_ctx(proc.flows[fd].crypt); - list_del(&proc.flows[fd].next); - flow_clear(fd); } static void flow_fini(int fd) { + flow_quiesce(fd); + pthread_rwlock_wrlock(&proc.lock); - __flow_fini(fd); + do_flow_fini(fd); pthread_rwlock_unlock(&proc.lock); } #define IS_ENCRYPTED(crypt) ((crypt)->nid != NID_undef) -#define IS_ORDERED(flow) (flow.qs.in_order != 0) +#define IS_ORDERED(info) ((info)->qs.service != SVC_RAW) +#define IS_STREAM(info) ((info)->qs.service == SVC_STREAM) + +/* Raw MTU minus the wrapping (IV/Tag + optional CRC) dev.c adds. */ +static __inline__ size_t flow_user_mtu(const struct flow * flow, + size_t raw) +{ + size_t hdr; + + hdr = flow->headsz + flow->tailsz; + if (flow->info.qs.ber == 0 && flow->crypt == NULL) + hdr += CRCLEN; + + return raw > hdr ? raw - hdr : 0; +} + static int flow_init(struct flow_info * info, - struct crypt_sk * sk) + struct crypt_sk * sk, + time_t rtt_hint) { struct timespec now; struct flow * flow; @@ -550,7 +711,6 @@ static int flow_init(struct flow_info * info, flow->tailsz = 0; if (IS_ENCRYPTED(sk)) { - /* Set to lower value in tests, should we make configurable? */ sk->rot_bit = KEY_ROTATION_BIT; flow->crypt = crypt_create_ctx(sk); if (flow->crypt == NULL) @@ -561,22 +721,16 @@ static int flow_init(struct flow_info * info, assert(flow->frcti == NULL); - if (IS_ORDERED(flow->info)) { - flow->frcti = frcti_create(fd, DELT_A, DELT_R, info->mpl); + if (IS_ORDERED(&flow->info)) { + uint32_t frct_mtu = flow_user_mtu(flow, info->mtu); + + flow->frcti = frcti_create(fd, DELT_A, DELT_R, + info->mpl, rtt_hint, + info->qs, frct_mtu); if (flow->frcti == NULL) goto fail_frcti; - - if (ssm_flow_set_add(proc.fqset, 0, info->id)) - goto fail_flow_set_add; - - ++proc.n_frcti; - if (proc.n_frcti == 1 && - pthread_create(&proc.tx, NULL, flow_tx, NULL) < 0) - goto fail_tx_thread; } - list_add_tail(&flow->next, &proc.flow_list); - proc.id_to_fd[info->id].fd = fd; flow_set_state(&proc.id_to_fd[info->id], FLOW_ALLOCATED); @@ -585,10 +739,6 @@ static int flow_init(struct flow_info * info, return fd; - fail_tx_thread: - ssm_flow_set_del(proc.fqset, 0, info->id); - fail_flow_set_add: - frcti_destroy(flow->frcti); fail_frcti: crypt_destroy_ctx(flow->crypt); fail_crypt: @@ -716,13 +866,7 @@ static void init(int argc, goto fail_fqset; } - proc.frct_set = fset_create(); - if (proc.frct_set == NULL || proc.frct_set->idx != 0) { - fprintf(stderr, "FATAL: Could not create FRCT set.\n"); - goto fail_frct_set; - } - - if (timerwheel_init() < 0) { + if (tw_init() < 0) { fprintf(stderr, "FATAL: Could not initialize timerwheel.\n"); goto fail_timerwheel; } @@ -741,24 +885,13 @@ static void init(int argc, } } #endif - if (pthread_create(&proc.rx, NULL, flow_rx, NULL) < 0) { - fprintf(stderr, "FATAL: Could not start monitor thread.\n"); - goto fail_monitor; - } - - list_head_init(&proc.flow_list); - return; - fail_monitor: #if defined PROC_FLOW_STATS - rib_fini(); fail_rib_init: #endif - timerwheel_fini(); + tw_fini(); fail_timerwheel: - fset_destroy(proc.frct_set); - fail_frct_set: ssm_flow_set_close(proc.fqset); fail_fqset: pthread_rwlock_destroy(&proc.lock); @@ -789,8 +922,10 @@ static void fini(void) if (proc.fds == NULL) return; - pthread_cancel(proc.rx); - pthread_join(proc.rx, NULL); + /* Wake all in-flight readers/writers BEFORE wrlock acquire. */ + for (i = 0; i < PROC_MAX_FLOWS; ++i) + if (proc.flows[i].info.id != -1) + flow_quiesce(i); pthread_rwlock_wrlock(&proc.lock); @@ -798,10 +933,9 @@ static void fini(void) struct flow * flow = &proc.flows[i]; if (flow->info.id != -1) { ssize_t idx; - ssm_rbuff_set_acl(flow->rx_rb, ACL_FLOWDOWN); while ((idx = ssm_rbuff_read(flow->rx_rb)) >= 0) ssm_pool_remove(proc.pool, idx); - __flow_fini(i); + do_flow_fini(i); } } @@ -813,9 +947,7 @@ static void fini(void) #ifdef PROC_FLOW_STATS rib_fini(); #endif - timerwheel_fini(); - - fset_destroy(proc.frct_set); + tw_fini(); ssm_flow_set_close(proc.fqset); @@ -860,6 +992,10 @@ int flow_accept(qosspec_t * qs, if (qs != NULL) qs->ber = 1; #endif + /* STREAM cannot tolerate loss: drops create silent gaps. */ + if (qs != NULL && qs->service == SVC_STREAM && qs->loss != 0) + return -EINVAL; + memset(&flow, 0, sizeof(flow)); flow.n_pid = getpid(); @@ -878,7 +1014,8 @@ int flow_accept(qosspec_t * qs, if (err < 0) return err; - fd = flow_init(&flow, &crypt); + /* No RTT in accept; rtt_hint=0 bootstraps from first ACK. */ + fd = flow_init(&flow, &crypt, 0); crypt_secure_clear(key, SYMMKEYSZ); @@ -899,11 +1036,16 @@ int flow_alloc(const char * dst, uint8_t key[SYMMKEYSZ]; int fd; int err; + struct timespec t0; + struct timespec t1; #ifdef QOS_DISABLE_CRC if (qs != NULL) qs->ber = 1; #endif + /* STREAM cannot tolerate loss: drops create silent gaps. */ + if (qs != NULL && qs->service == SVC_STREAM && qs->loss != 0) + return -EINVAL; memset(&flow, 0, sizeof(flow)); @@ -913,11 +1055,13 @@ int flow_alloc(const char * dst, if (flow_alloc__irm_req_ser(&msg, &flow, dst, timeo)) return -ENOMEM; + clock_gettime(PTHREAD_COND_CLOCK, &t0); + err = send_recv_msg(&msg); - if (err < 0) { - printf("send_recv_msg error %d\n", err); + if (err < 0) return err; - } + + clock_gettime(PTHREAD_COND_CLOCK, &t1); crypt.key = key; @@ -925,7 +1069,7 @@ int flow_alloc(const char * dst, if (err < 0) return err; - fd = flow_init(&flow, &crypt); + fd = flow_init(&flow, &crypt, ts_diff_ns(&t1, &t0)); crypt_secure_clear(key, SYMMKEYSZ); @@ -964,7 +1108,7 @@ int flow_join(const char * dst, if (err < 0) return err; - fd = flow_init(&flow, &crypt); + fd = flow_init(&flow, &crypt, 0); crypt_secure_clear(key, SYMMKEYSZ); @@ -983,10 +1127,10 @@ int flow_dealloc(int fd) struct flow * flow; int err; - if (fd < 0 || fd >= SYS_MAX_FLOWS ) + if (fd < 0 || fd >= PROC_MAX_FLOWS ) return -EINVAL; - memset(&info, 0, sizeof(flow)); + memset(&info, 0, sizeof(info)); flow = &proc.flows[fd]; @@ -1008,9 +1152,8 @@ int flow_dealloc(int fd) pthread_rwlock_rdlock(&proc.lock); - timeo.tv_sec = frcti_dealloc(flow->frcti); - while (timeo.tv_sec < 0) { /* keep the flow active for rtx */ - ssize_t ret; + while (FRCTI_LINGERING(flow->frcti)) { + ssize_t ret; pthread_rwlock_unlock(&proc.lock); @@ -1018,12 +1161,12 @@ int flow_dealloc(int fd) pthread_rwlock_rdlock(&proc.lock); - timeo.tv_sec = frcti_dealloc(flow->frcti); - - if (ret == -EFLOWDOWN && timeo.tv_sec < 0) - timeo.tv_sec = -timeo.tv_sec; + if (ret == -EFLOWDOWN) + break; } + timeo.tv_sec = FRCTI_DEALLOC(flow->frcti); + pthread_cleanup_push(__cleanup_rwlock_unlock, &proc.lock); ssm_rbuff_fini(flow->tx_rb); @@ -1033,15 +1176,18 @@ int flow_dealloc(int fd) info.id = flow->info.id; info.n_pid = getpid(); - if (flow_dealloc__irm_req_ser(&msg, &info, &timeo) < 0) - return -ENOMEM; + if (flow_dealloc__irm_req_ser(&msg, &info, &timeo) < 0) { + err = -ENOMEM; + goto out; + } err = send_recv_msg(&msg); if (err < 0) - return err; + goto out; err = irm__irm_result_des(&msg); + out: flow_fini(fd); return err; @@ -1055,12 +1201,12 @@ int ipcp_flow_dealloc(int fd) struct flow * flow; int err; - if (fd < 0 || fd >= SYS_MAX_FLOWS ) + if (fd < 0 || fd >= PROC_MAX_FLOWS ) return -EINVAL; flow = &proc.flows[fd]; - memset(&info, 0, sizeof(flow)); + memset(&info, 0, sizeof(info)); pthread_rwlock_rdlock(&proc.lock); @@ -1074,15 +1220,18 @@ int ipcp_flow_dealloc(int fd) pthread_rwlock_unlock(&proc.lock); - if (ipcp_flow_dealloc__irm_req_ser(&msg, &info) < 0) - return -ENOMEM; + if (ipcp_flow_dealloc__irm_req_ser(&msg, &info) < 0) { + err = -ENOMEM; + goto out; + } err = send_recv_msg(&msg); if (err < 0) - return err; + goto out; err = irm__irm_result_des(&msg); + out: flow_fini(fd); return err; @@ -1102,8 +1251,18 @@ int fccntl(int fd, uint32_t tx_acl; size_t * qlen; struct flow * flow; + uint16_t old_acc; + uint16_t new_acc; + size_t max; + size_t * maxp; + size_t rsz; + size_t * rszp; + time_t rto; + time_t * rtop; + int rc; + bool emit_eos = false; - if (fd < 0 || fd >= SYS_MAX_FLOWS) + if (fd < 0 || fd >= PROC_MAX_FLOWS) return -EBADF; flow = &proc.flows[fd]; @@ -1167,14 +1326,27 @@ int fccntl(int fd, qlen = va_arg(l, size_t *); *qlen = ssm_rbuff_queued(flow->tx_rb); break; + case FLOWGMTU: + maxp = va_arg(l, size_t *); + if (maxp == NULL) + goto einval; + *maxp = flow_user_mtu(flow, flow->info.mtu); + break; case FLOWSFLAGS: + old_acc = flow->oflags & FLOWFACCMODE; flow->oflags = va_arg(l, uint32_t); + new_acc = flow->oflags & FLOWFACCMODE; + + /* Defer EOS emit until after proc.lock is dropped: */ + /* frcti_stream_fin_snd may block on shm-pool/tx-rb. */ + if (new_acc == FLOWFRDONLY + && old_acc != FLOWFRDONLY + && FRCTI_IS_STREAM(flow->frcti)) + emit_eos = true; + rx_acl = ssm_rbuff_get_acl(flow->rx_rb); - tx_acl = ssm_rbuff_get_acl(flow->rx_rb); - /* - * Making our own flow write only means making the - * the other side of the flow read only. - */ + tx_acl = ssm_rbuff_get_acl(flow->tx_rb); + /* Our flow write-only -> peer's read-only. */ if (flow->oflags & FLOWFWRONLY) rx_acl |= ACL_RDONLY; if (flow->oflags & FLOWFRDWR) @@ -1218,6 +1390,59 @@ int fccntl(int fd, goto eperm; *cflags = frcti_getflags(flow->frcti); break; + case FRCTSMAXSDU: + max = va_arg(l, size_t); + if (flow->frcti == NULL) + goto eperm; + if (frcti_set_max_rcv_sdu(flow->frcti, max) < 0) + goto einval; + break; + case FRCTGMAXSDU: + maxp = va_arg(l, size_t *); + if (maxp == NULL) + goto einval; + if (flow->frcti == NULL) + goto eperm; + *maxp = frcti_get_max_rcv_sdu(flow->frcti); + break; + case FRCTSRRINGSZ: + rsz = va_arg(l, size_t); + if (flow->frcti == NULL) + goto eperm; + rc = frcti_set_rcv_ring_sz(flow->frcti, rsz); + if (rc < 0) { + pthread_rwlock_unlock(&proc.lock); + va_end(l); + return rc; + } + break; + case FRCTGRRINGSZ: + rszp = va_arg(l, size_t *); + if (rszp == NULL) + goto einval; + if (flow->frcti == NULL) + goto eperm; + *rszp = frcti_get_rcv_ring_sz(flow->frcti); + break; + case FRCTSRTOMIN: + if (flow->frcti == NULL) + goto eperm; + rto = va_arg(l, time_t); + rc = frcti_set_rto_min(flow->frcti, rto); + if (rc < 0) { + pthread_rwlock_unlock(&proc.lock); + va_end(l); + return rc; + } + break; + case FRCTGRTOMIN: + if (flow->frcti == NULL) + goto eperm; + rtop = va_arg(l, time_t *); + if (rtop == NULL) + goto einval; + *rtop = frcti_get_rto_min(flow->frcti); + break; default: pthread_rwlock_unlock(&proc.lock); va_end(l); @@ -1227,6 +1452,9 @@ int fccntl(int fd, pthread_rwlock_unlock(&proc.lock); + if (emit_eos) + frcti_stream_fin_snd(flow->frcti); + va_end(l); return 0; @@ -1241,86 +1469,189 @@ int fccntl(int fd, return -EPERM; } -static int chk_crc(struct ssm_pk_buff * spb) -{ - uint32_t crc; - uint8_t * head = ssm_pk_buff_head(spb); - uint8_t * tail = ssm_pk_buff_pop_tail(spb, CRCLEN); - - mem_hash(HASH_CRC32, &crc, head, tail - head); - - return !(crc == *((uint32_t *) tail)); -} - -static int add_crc(struct ssm_pk_buff * spb) -{ - uint8_t * head; - uint8_t * tail; - - tail = ssm_pk_buff_push_tail(spb, CRCLEN); - if (tail == NULL) - return -ENOMEM; - - head = ssm_pk_buff_head(spb); - mem_hash(HASH_CRC32, tail, head, tail - head); - - return 0; -} - static int flow_tx_spb(struct flow * flow, struct ssm_pk_buff * spb, + uint16_t flags, bool block, struct timespec * abstime) { struct timespec now; ssize_t idx; + size_t pci_total; int ret; clock_gettime(PTHREAD_COND_CLOCK, &now); - - pthread_rwlock_wrlock(&proc.lock); - flow->snd_act = now; - pthread_rwlock_unlock(&proc.lock); - idx = ssm_pk_buff_get_off(spb); - pthread_rwlock_rdlock(&proc.lock); - if (ssm_pk_buff_len(spb) > 0) { - if (frcti_snd(flow->frcti, spb) < 0) + if (FRCTI_SND(flow->frcti, spb, flags) < 0) goto enomem; - if (spb_encrypt(flow, spb) < 0) - goto enomem; + if (flow->info.qs.ber == 0) { + pci_total = flow->frcti != NULL + ? frcti_data_hdr_len(flow->frcti) : 0; + if (crc_add(spb, pci_total) != 0) + goto enomem; + } - if (flow->info.qs.ber == 0 && add_crc(spb) != 0) + if (spb_encrypt(flow, spb) < 0) goto enomem; } - pthread_cleanup_push(__cleanup_rwlock_unlock, &proc.lock); - if (!block) ret = ssm_rbuff_write(flow->tx_rb, idx); else ret = ssm_rbuff_write_b(flow->tx_rb, idx, abstime); - if (ret < 0) + if (ret < 0) { ssm_pool_remove(proc.pool, idx); - else - ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); - - pthread_cleanup_pop(true); + return ret; + } + ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); return 0; -enomem: - pthread_rwlock_unlock(&proc.lock); + enomem: ssm_pool_remove(proc.pool, idx); return -ENOMEM; } +/* Per-fragment role for fragment i out of n; n == 1 yields SOLE. */ +static __inline__ uint16_t flow_frag_role(size_t i, size_t n) +{ + if (n == 1) + return FRCT_FR_SOLE; + if (i == 0) + return FRCT_FR_FIRST; + if (i + 1 == n) + return FRCT_FR_LAST; + + return FRCT_FR_MID; +} + +/* + * Stream-mode write: split buf into chunks of + * (frag_mtu - PCI - PCI_STREAM) bytes; each chunk goes through the + * normal tx path. frcti_snd injects the [start,end) extension and + * advances snd_byte_next under its wrlock. No FFGM/LFGM role bits. + */ +static ssize_t flow_write_stream(struct flow * flow, + const void * buf, + size_t count, + int oflags, + struct timespec * dl) +{ + const uint8_t * src = buf; + size_t payload; + size_t off = 0; + bool block = !(oflags & FLOWFWNOBLOCK); + + if (!FRCTI_IS_FRTX(flow->frcti)) + return -EMSGSIZE; + + payload = FRCTI_PAYLOAD_CAP(flow->frcti); + + while (off < count) { + struct ssm_pk_buff * spb; + uint8_t * ptr; + ssize_t idx; + size_t clen; + int ret; + + ret = flow_wait_window(flow, 1, block, dl); + if (ret < 0) + return off > 0 ? (ssize_t) off : (ssize_t) ret; + + clen = MIN(count - off, payload); + + if (block) + idx = ssm_pool_alloc_b(proc.pool, clen, &ptr, + &spb, dl); + else + idx = ssm_pool_alloc(proc.pool, clen, &ptr, &spb); + if (idx < 0) + return off > 0 ? (ssize_t) off : idx; + + memcpy(ptr, src + off, clen); + + ret = flow_tx_spb(flow, spb, 0, block, dl); + if (ret < 0) + return off > 0 ? (ssize_t) off : (ssize_t) ret; + + off += clen; + } + + return (ssize_t) count; +} + +/* Per-fragment flow_tx_spb loop. Raw flows refuse; FRCT splits the SDU. */ +static ssize_t flow_write_frag(struct flow * flow, + const void * buf, + size_t count, + int oflags, + struct timespec * dl) +{ + const uint8_t * src = buf; + size_t frag_payload; + size_t n; + size_t off = 0; + size_t i; + int ret; + bool block = !(oflags & FLOWFWNOBLOCK); + + /* Raw flows carry no PCI; cannot fragment. */ + if (flow->frcti == NULL) + return -EMSGSIZE; + + frag_payload = FRCTI_PAYLOAD_CAP(flow->frcti); + + /* Guard the ceil-divide against size_t overflow. */ + if (count > SIZE_MAX - frag_payload + 1) + return -EMSGSIZE; + n = (count + frag_payload - 1) / frag_payload; + + /* SDU larger than the FC window can ever offer would deadlock. */ + if (n > RQ_SIZE) + return -EMSGSIZE; + + /* SDU-atomic FC: wait for n seqnos to avoid overshoot mid-SDU. */ + ret = flow_wait_window(flow, n, block, dl); + if (ret < 0) + return (ssize_t) ret; + + STAT_BUMP(flow->frcti, sdu_snd_frag); + + for (i = 0; i < n; ++i) { + struct ssm_pk_buff * spb; + uint8_t * ptr; + ssize_t idx; + size_t clen; + + clen = (i + 1 == n) ? (count - off) : frag_payload; + + if (block) + idx = ssm_pool_alloc_b(proc.pool, clen, &ptr, + &spb, dl); + else + idx = ssm_pool_alloc(proc.pool, clen, &ptr, &spb); + if (idx < 0) + return off > 0 ? (ssize_t) off : idx; + + memcpy(ptr, src + off, clen); + + ret = flow_tx_spb(flow, spb, flow_frag_role(i, n), + block, dl); + if (ret < 0) + return off > 0 ? (ssize_t) off : (ssize_t) ret; + + off += clen; + } + + return (ssize_t) count; +} + ssize_t flow_write(int fd, const void * buf, size_t count) @@ -1330,7 +1661,8 @@ ssize_t flow_write(int fd, int ret; int flags; struct timespec abs; - struct timespec * abstime = NULL; + struct timespec now; + struct timespec * dl = NULL; struct ssm_pk_buff * spb; uint8_t * ptr; @@ -1342,64 +1674,62 @@ ssize_t flow_write(int fd, flow = &proc.flows[fd]; - clock_gettime(PTHREAD_COND_CLOCK, &abs); - - pthread_rwlock_wrlock(&proc.lock); + pthread_rwlock_rdlock(&proc.lock); if (flow->info.id < 0) { pthread_rwlock_unlock(&proc.lock); return -ENOTALLOC; } + flags = flow->oflags; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + if (flow->snd_timesout) { - ts_add(&abs, &flow->snd_timeo, &abs); - abstime = &abs; + ts_add(&now, &flow->snd_timeo, &abs); + dl = &abs; } - flags = flow->oflags; - pthread_rwlock_unlock(&proc.lock); if ((flags & FLOWFACCMODE) == FLOWFRDONLY) return -EPERM; - if (flags & FLOWFWNOBLOCK) { - if (!frcti_is_window_open(flow->frcti)) - return -EAGAIN; - idx = ssm_pool_alloc(proc.pool, count, &ptr, &spb); - } else { - ret = frcti_window_wait(flow->frcti, abstime); + tw_move_safe(); + + if (flow->frcti != NULL) { + /* Pump rx_rb so a pure-writer processes ACKs. */ + ret = flow_wait_window(flow, 1, !(flags & FLOWFWNOBLOCK), dl); if (ret < 0) return ret; - idx = ssm_pool_alloc_b(proc.pool, count, &ptr, &spb, abstime); + + if (count > 0 && FRCTI_IS_STREAM(flow->frcti)) + return flow_write_stream(flow, buf, count, flags, dl); + + if (FRCTI_NEEDS_FRAG(flow->frcti, count)) + return flow_write_frag(flow, buf, count, flags, dl); + } else if (flow->info.mtu > 0 + && count > flow_user_mtu(flow, flow->info.mtu)) { + /* Raw flows carry no PCI; refuse anything > one n-1 frame. */ + return -EMSGSIZE; } + if (flags & FLOWFWNOBLOCK) + idx = ssm_pool_alloc(proc.pool, count, &ptr, &spb); + else + idx = ssm_pool_alloc_b(proc.pool, count, &ptr, &spb, dl); if (idx < 0) return idx; if (count > 0) memcpy(ptr, buf, count); - ret = flow_tx_spb(flow, spb, !(flags & FLOWFWNOBLOCK), abstime); + ret = flow_tx_spb(flow, spb, FRCT_FR_SOLE, + !(flags & FLOWFWNOBLOCK), dl); return ret < 0 ? (ssize_t) ret : (ssize_t) count; } -static bool invalid_pkt(struct flow * flow, - struct ssm_pk_buff * spb) -{ - if (spb == NULL || ssm_pk_buff_len(spb) == 0) - return true; - - if (flow->info.qs.ber == 0 && chk_crc(spb) != 0) - return true; - - if (spb_decrypt(flow, spb) < 0) - return true; - - return false; -} - static ssize_t flow_rx_spb(struct flow * flow, struct ssm_pk_buff ** spb, bool block, @@ -1408,19 +1738,14 @@ static ssize_t flow_rx_spb(struct flow * flow, ssize_t idx; struct timespec now; - idx = block ? ssm_rbuff_read_b(flow->rx_rb, abstime) : - ssm_rbuff_read(flow->rx_rb); + idx = block ? ssm_rbuff_read_b(flow->rx_rb, abstime) + : ssm_rbuff_read(flow->rx_rb); if (idx < 0) return idx; clock_gettime(PTHREAD_COND_CLOCK, &now); - - pthread_rwlock_wrlock(&proc.lock); - flow->rcv_act = now; - pthread_rwlock_unlock(&proc.lock); - *spb = ssm_pool_get(proc.pool, idx); if (invalid_pkt(flow, *spb)) { @@ -1431,18 +1756,116 @@ static ssize_t flow_rx_spb(struct flow * flow, return idx; } +static ssize_t raw_flow_read_pkt(struct flow * flow, + bool block, + struct timespec * dl) +{ + struct ssm_pk_buff * spb; + struct timespec wait_abs; + ssize_t idx; + + while (true) { + if (!block) { + idx = ssm_rbuff_read(flow->rx_rb); + if (idx < 0) + return -EAGAIN; + } else { + compute_wait_deadline(dl, &wait_abs); + idx = ssm_rbuff_read_b(flow->rx_rb, &wait_abs); + if (idx == -ETIMEDOUT) { + if (deadline_passed(dl)) + return -ETIMEDOUT; + continue; + } + if (idx < 0) + return idx; + } + + spb = ssm_pool_get(proc.pool, idx); + if (!invalid_pkt(flow, spb)) + return idx; + + ssm_pool_remove(proc.pool, idx); + if (!block) + return -EAGAIN; + } +} + +static ssize_t deliver_pkt(struct flow * flow, + struct ssm_pk_buff * spb, + ssize_t idx, + void * buf, + size_t count, + bool partrd) +{ + uint8_t * packet = ssm_pk_buff_head(spb); + ssize_t n = ssm_pk_buff_len(spb); + + assert(n >= 0); + + if (n <= (ssize_t) count) { + memcpy(buf, packet, n); + ipcp_spb_release(spb); + if (partrd && n == (ssize_t) count) + flow->part_idx = DONE_PART; + else + flow->part_idx = NO_PART; + + return n; + } + + if (partrd) { + memcpy(buf, packet, count); + ssm_pk_buff_pop(spb, n); + flow->part_idx = idx; + return count; + } + + ipcp_spb_release(spb); + return -EMSGSIZE; +} + +/* Drive frcti_consume until it delivers or errors. */ +static ssize_t flow_read_frcti(struct flow * flow, + void * buf, + size_t count, + bool block, + struct timespec * dl) +{ + struct timespec now; + ssize_t bytes; + int rc; + + while (true) { + flow_drain_rx_nb(flow); + bytes = FRCTI_CONSUME(flow->frcti, buf, count); + if (bytes >= 0) + break; + if (bytes != -EAGAIN) + return bytes; + if (!block) + return -EAGAIN; + rc = flow_rx_one(flow, dl); + if (rc < 0) + return rc; + } + + clock_gettime(PTHREAD_COND_CLOCK, &now); + flow->rcv_act = now; + + return bytes; +} + ssize_t flow_read(int fd, void * buf, size_t count) { - ssize_t idx; - ssize_t n; - uint8_t * packet; + struct flow * flow; struct ssm_pk_buff * spb; struct timespec abs; struct timespec now; - struct timespec * abstime = NULL; - struct flow * flow; + struct timespec * dl = NULL; + ssize_t idx; bool block; bool partrd; @@ -1451,8 +1874,6 @@ ssize_t flow_read(int fd, flow = &proc.flows[fd]; - clock_gettime(PTHREAD_COND_CLOCK, &now); - pthread_rwlock_rdlock(&proc.lock); if (flow->info.id < 0) { @@ -1461,8 +1882,8 @@ ssize_t flow_read(int fd, } if (flow->part_idx == DONE_PART) { - pthread_rwlock_unlock(&proc.lock); flow->part_idx = NO_PART; + pthread_rwlock_unlock(&proc.lock); return 0; } @@ -1470,75 +1891,33 @@ ssize_t flow_read(int fd, partrd = !(flow->oflags & FLOWFRNOPART); if (flow->rcv_timesout) { + clock_gettime(PTHREAD_COND_CLOCK, &now); ts_add(&now, &flow->rcv_timeo, &abs); - abstime = &abs; + dl = &abs; } - idx = flow->part_idx; - if (idx < 0) { - while ((idx = frcti_queued_pdu(flow->frcti)) < 0) { - pthread_rwlock_unlock(&proc.lock); - - idx = flow_rx_spb(flow, &spb, block, abstime); - if (idx < 0) { - if (block && idx != -EAGAIN) - return idx; - if (!block) - return idx; + pthread_rwlock_unlock(&proc.lock); - pthread_rwlock_rdlock(&proc.lock); - continue; - } + tw_move_safe(); - pthread_rwlock_rdlock(&proc.lock); + idx = flow->part_idx; + if (idx < 0 && flow->frcti != NULL) + return flow_read_frcti(flow, buf, count, block, dl); - frcti_rcv(flow->frcti, spb); - } + if (idx < 0) { + idx = raw_flow_read_pkt(flow, block, dl); + if (idx < 0) + return idx; } spb = ssm_pool_get(proc.pool, idx); - pthread_rwlock_unlock(&proc.lock); - - packet = ssm_pk_buff_head(spb); - - n = ssm_pk_buff_len(spb); - - assert(n >= 0); - - if (n <= (ssize_t) count) { - memcpy(buf, packet, n); - ipcp_spb_release(spb); - - pthread_rwlock_wrlock(&proc.lock); - - flow->part_idx = (partrd && n == (ssize_t) count) ? - DONE_PART : NO_PART; - - flow->rcv_act = now; - - pthread_rwlock_unlock(&proc.lock); - return n; - } else { - if (partrd) { - memcpy(buf, packet, count); - ssm_pk_buff_pop(spb, n); - pthread_rwlock_wrlock(&proc.lock); - flow->part_idx = idx; - - flow->rcv_act = now; + clock_gettime(PTHREAD_COND_CLOCK, &now); + flow->rcv_act = now; - pthread_rwlock_unlock(&proc.lock); - return count; - } else { - ipcp_spb_release(spb); - return -EMSGSIZE; - } - } + return deliver_pkt(flow, spb, idx, buf, count, partrd); } -/* fqueue functions. */ - struct flow_set * fset_create(void) { struct flow_set * set; @@ -1614,7 +1993,7 @@ int fset_add(struct flow_set * set, struct flow * flow; int ret; - if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS) + if (set == NULL || fd < 0 || fd >= PROC_MAX_FLOWS) return -EINVAL; flow = &proc.flows[fd]; @@ -1650,7 +2029,7 @@ void fset_del(struct flow_set * set, { struct flow * flow; - if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS) + if (set == NULL || fd < 0 || fd >= PROC_MAX_FLOWS) return; flow = &proc.flows[fd]; @@ -1661,7 +2040,7 @@ void fset_del(struct flow_set * set, ssm_flow_set_del(proc.fqset, set->idx, flow->info.id); if (flow->frcti != NULL) - ssm_flow_set_add(proc.fqset, 0, proc.flows[fd].info.id); + ssm_flow_set_add(proc.fqset, 0, flow->info.id); pthread_rwlock_unlock(&proc.lock); } @@ -1672,7 +2051,7 @@ bool fset_has(const struct flow_set * set, struct flow * flow; bool ret; - if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS) + if (set == NULL || fd < 0 || fd >= PROC_MAX_FLOWS) return false; flow = &proc.flows[fd]; @@ -1691,61 +2070,59 @@ bool fset_has(const struct flow_set * set, return ret; } -/* Filter fqueue events for non-data packets */ static int fqueue_filter(struct fqueue * fq) { struct ssm_pk_buff * spb; int fd; ssize_t idx; struct frcti * frcti; + int ret = 0; - while (fq->next < fq->fqsize) { - if (fq->fqueue[fq->next].event != FLOW_PKT) - return 1; + /* proc.lock rdlock gates frcti_destroy via flow_fini wrlock. */ + pthread_rwlock_rdlock(&proc.lock); - pthread_rwlock_rdlock(&proc.lock); + while (fq->next < fq->fqsize) { + if (fq->fqueue[fq->next].event != FLOW_PKT) { + ret = 1; + goto out; + } fd = proc.id_to_fd[fq->fqueue[fq->next].flow_id].fd; if (fd < 0) { ++fq->next; - pthread_rwlock_unlock(&proc.lock); continue; } frcti = proc.flows[fd].frcti; if (frcti == NULL) { - pthread_rwlock_unlock(&proc.lock); - return 1; + ret = 1; + goto out; } - if (__frcti_pdu_ready(frcti) >= 0) { - pthread_rwlock_unlock(&proc.lock); - return 1; + if (FRCTI_PDU_READY(frcti)) { + ret = 1; + goto out; } - pthread_rwlock_unlock(&proc.lock); - idx = flow_rx_spb(&proc.flows[fd], &spb, false, NULL); if (idx < 0) - return 0; - - pthread_rwlock_rdlock(&proc.lock); + goto out; spb = ssm_pool_get(proc.pool, idx); - __frcti_rcv(frcti, spb); + FRCTI_RCV(frcti, spb); - if (__frcti_pdu_ready(frcti) >= 0) { - pthread_rwlock_unlock(&proc.lock); - return 1; + if (FRCTI_PDU_READY(frcti)) { + ret = 1; + goto out; } - pthread_rwlock_unlock(&proc.lock); - ++fq->next; } - return 0; + out: + pthread_rwlock_unlock(&proc.lock); + return ret; } int fqueue_next(struct fqueue * fq) @@ -1792,7 +2169,8 @@ ssize_t fevent(struct flow_set * set, { ssize_t ret = 0; struct timespec abs; - struct timespec * t = NULL; + struct timespec * dl = NULL; + struct timespec wait_abs; if (set == NULL || fq == NULL) return -EINVAL; @@ -1800,17 +2178,26 @@ ssize_t fevent(struct flow_set * set, if (fq->fqsize > 0 && fq->next != fq->fqsize) return 1; - clock_gettime(PTHREAD_COND_CLOCK, &abs); - if (timeo != NULL) { - ts_add(&abs, timeo, &abs); - t = &abs; + struct timespec now; + clock_gettime(PTHREAD_COND_CLOCK, &now); + ts_add(&now, timeo, &abs); + dl = &abs; } while (ret == 0) { - ret = ssm_flow_set_wait(proc.fqset, set->idx, fq->fqueue, t); - if (ret == -ETIMEDOUT) - return -ETIMEDOUT; + tw_move_safe(); + + compute_wait_deadline(dl, &wait_abs); + + ret = ssm_flow_set_wait(proc.fqset, set->idx, + fq->fqueue, &wait_abs); + if (ret == -ETIMEDOUT) { + if (deadline_passed(dl)) + return -ETIMEDOUT; + ret = 0; + continue; + } fq->fqsize = ret; fq->next = 0; @@ -1823,8 +2210,6 @@ ssize_t fevent(struct flow_set * set, return 1; } -/* ipcp-dev functions. */ - int np1_flow_alloc(pid_t n_pid, int flow_id) { @@ -1837,9 +2222,10 @@ int np1_flow_alloc(pid_t n_pid, flow.n_pid = getpid(); flow.qs = qos_np1; flow.mpl = 0; - flow.n_1_pid = n_pid; /* This "flow" is upside-down! */ + /* np1 flow: n_1_pid is the upper. */ + flow.n_1_pid = n_pid; - return flow_init(&flow, &crypt); + return flow_init(&flow, &crypt, 0); } int np1_flow_dealloc(int flow_id, @@ -1847,12 +2233,7 @@ int np1_flow_dealloc(int flow_id, { int fd; - /* - * TODO: Don't pass timeo to the IPCP but wait in IRMd. - * This will need async ops, waiting until we bootstrap - * the IRMd over ouroboros. - */ - + /* TODO: wait in IRMd, not here; needs async ops. */ sleep(timeo); pthread_rwlock_rdlock(&proc.lock); @@ -1900,6 +2281,7 @@ int ipcp_create_r(const struct ipcp_info * info) int ipcp_flow_req_arr(const buffer_t * dst, qosspec_t qs, time_t mpl, + uint32_t mtu, const buffer_t * data) { struct flow_info flow; @@ -1916,6 +2298,7 @@ int ipcp_flow_req_arr(const buffer_t * dst, flow.n_1_pid = getpid(); flow.qs = qs; flow.mpl = mpl; + flow.mtu = mtu; if (ipcp_flow_req_arr__irm_req_ser(&msg, dst, &flow, data) < 0) return -ENOMEM; @@ -1930,22 +2313,25 @@ int ipcp_flow_req_arr(const buffer_t * dst, if (err < 0) return err; - assert(crypt.nid == NID_undef); /* np1 flows are not encrypted */ + /* np1 flows are not encrypted. */ + assert(crypt.nid == NID_undef); - /* inverted for np1_flow */ + /* Inverted for np1_flow. */ flow.n_1_pid = flow.n_pid; flow.n_pid = getpid(); flow.mpl = 0; + flow.mtu = 0; flow.qs = qos_np1; crypt.nid = NID_undef; - return flow_init(&flow, &crypt); + return flow_init(&flow, &crypt, 0); } int ipcp_flow_alloc_reply(int fd, int response, time_t mpl, + uint32_t mtu, const buffer_t * data) { struct flow_info flow; @@ -1953,7 +2339,7 @@ int ipcp_flow_alloc_reply(int fd, buffer_t msg = {SOCK_BUF_SIZE, buf}; int err; - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + assert(fd >= 0 && fd < PROC_MAX_FLOWS); pthread_rwlock_rdlock(&proc.lock); @@ -1962,6 +2348,7 @@ int ipcp_flow_alloc_reply(int fd, pthread_rwlock_unlock(&proc.lock); flow.mpl = mpl; + flow.mtu = mtu; if (ipcp_flow_alloc_reply__irm_msg_ser(&msg, &flow, response, data) < 0) return -ENOMEM; @@ -1979,7 +2366,7 @@ int ipcp_flow_read(int fd, struct flow * flow; ssize_t idx = -1; - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + assert(fd >= 0 && fd < PROC_MAX_FLOWS); assert(spb); flow = &proc.flows[fd]; @@ -1988,7 +2375,14 @@ int ipcp_flow_read(int fd, assert(flow->info.id >= 0); - while (frcti_queued_pdu(flow->frcti) < 0) { + /* Raw flow: deliver the popped pkt directly (no FRCT rq). */ + if (flow->frcti == NULL) { + pthread_rwlock_unlock(&proc.lock); + idx = flow_rx_spb(flow, spb, false, NULL); + return idx < 0 ? (int) idx : 0; + } + + while (!FRCTI_PDU_READY(flow->frcti)) { pthread_rwlock_unlock(&proc.lock); idx = flow_rx_spb(flow, spb, false, NULL); @@ -1997,7 +2391,7 @@ int ipcp_flow_read(int fd, pthread_rwlock_rdlock(&proc.lock); - frcti_rcv(flow->frcti, *spb); + FRCTI_RCV(flow->frcti, *spb); } pthread_rwlock_unlock(&proc.lock); @@ -2011,12 +2405,12 @@ int ipcp_flow_write(int fd, struct flow * flow; int ret; - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + assert(fd >= 0 && fd < PROC_MAX_FLOWS); assert(spb); flow = &proc.flows[fd]; - pthread_rwlock_wrlock(&proc.lock); + pthread_rwlock_rdlock(&proc.lock); if (flow->info.id < 0) { pthread_rwlock_unlock(&proc.lock); @@ -2030,15 +2424,16 @@ int ipcp_flow_write(int fd, pthread_rwlock_unlock(&proc.lock); - ret = flow_tx_spb(flow, spb, true, NULL); + ret = flow_tx_spb(flow, spb, FRCT_FR_SOLE, true, NULL); return ret; } -static int pool_copy_spb(struct ssm_pool * src_pool, - ssize_t src_off, - struct ssm_pool * dst_pool, - struct ssm_pk_buff ** dst_spb) +/* Copy src into dst_pool without consuming src. Caller owns both halves. */ +static int pool_dup_spb(struct ssm_pool * src_pool, + size_t src_off, + struct ssm_pool * dst_pool, + struct ssm_pk_buff ** dst_spb) { struct ssm_pk_buff * src; uint8_t * ptr; @@ -2051,7 +2446,6 @@ static int pool_copy_spb(struct ssm_pool * src_pool, return -ENOMEM; memcpy(ptr, ssm_pk_buff_head(src), len); - ssm_pool_remove(src_pool, src_off); return 0; } @@ -2061,9 +2455,9 @@ int np1_flow_read(int fd, struct ssm_pool * pool) { struct flow * flow; - ssize_t off = -1; + ssize_t off; - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + assert(fd >= 0 && fd < PROC_MAX_FLOWS); assert(spb); flow = &proc.flows[fd]; @@ -2084,10 +2478,11 @@ int np1_flow_read(int fd, *spb = ssm_pool_get(proc.pool, off); } else { /* Cross-pool copy: PUP -> GSPP */ - if (pool_copy_spb(pool, off, proc.pool, spb) < 0) { + if (pool_dup_spb(pool, off, proc.pool, spb) < 0) { ssm_pool_remove(pool, off); return -ENOMEM; } + ssm_pool_remove(pool, off); } return 0; @@ -2100,10 +2495,10 @@ int np1_flow_write(int fd, struct flow * flow; struct ssm_pk_buff * dst; int ret; - ssize_t src_off; - ssize_t dst_off; + size_t off; + size_t dst_off; - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + assert(fd >= 0 && fd < PROC_MAX_FLOWS); assert(spb); flow = &proc.flows[fd]; @@ -2122,16 +2517,16 @@ int np1_flow_write(int fd, pthread_rwlock_unlock(&proc.lock); - src_off = ssm_pk_buff_get_off(spb); + off = ssm_pk_buff_get_off(spb); if (pool == NULL) { - ret = ssm_rbuff_write_b(flow->tx_rb, src_off, NULL); + ret = ssm_rbuff_write_b(flow->tx_rb, off, NULL); if (ret < 0) return ret; ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); } else { - /* Cross-pool copy: GSPP -> PUP */ - if (pool_copy_spb(proc.pool, src_off, pool, &dst) < 0) + /* Cross-pool copy: GSPP -> PUP. Src kept on error. */ + if (pool_dup_spb(proc.pool, off, pool, &dst) < 0) return -ENOMEM; dst_off = ssm_pk_buff_get_off(dst); ret = ssm_rbuff_write_b(flow->tx_rb, dst_off, NULL); @@ -2140,7 +2535,7 @@ int np1_flow_write(int fd, return ret; } ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); - ssm_pool_remove(proc.pool, src_off); + ssm_pool_remove(proc.pool, off); } return 0; @@ -2149,7 +2544,8 @@ int np1_flow_write(int fd, int ipcp_spb_reserve(struct ssm_pk_buff ** spb, size_t len) { - return ssm_pool_alloc_b(proc.pool, len, NULL, spb, NULL) < 0 ? -1 : 0; + return ssm_pool_alloc_b(proc.pool, len, NULL, spb, NULL) < 0 + ? -1 : 0; } void ipcp_spb_release(struct ssm_pk_buff * spb) @@ -2161,7 +2557,7 @@ int ipcp_flow_fini(int fd) { struct ssm_rbuff * rx_rb; - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + assert(fd >= 0 && fd < PROC_MAX_FLOWS); pthread_rwlock_rdlock(&proc.lock); @@ -2190,7 +2586,7 @@ int ipcp_flow_fini(int fd) int ipcp_flow_get_qoscube(int fd, qoscube_t * cube) { - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + assert(fd >= 0 && fd < PROC_MAX_FLOWS); assert(cube); pthread_rwlock_rdlock(&proc.lock); @@ -2229,8 +2625,7 @@ int local_flow_transfer(int src_fd, struct ssm_pk_buff * dst_spb; struct ssm_pool * sp; struct ssm_pool * dp; - ssize_t src_off; - ssize_t dst_off; + ssize_t off; int ret; assert(src_fd >= 0); @@ -2244,15 +2639,15 @@ int local_flow_transfer(int src_fd, pthread_rwlock_rdlock(&proc.lock); - src_off = ssm_rbuff_read(src_flow->rx_rb); - if (src_off < 0) { + off = ssm_rbuff_read(src_flow->rx_rb); + if (off < 0) { pthread_rwlock_unlock(&proc.lock); - return src_off; + return off; } if (dst_flow->info.id < 0) { pthread_rwlock_unlock(&proc.lock); - ssm_pool_remove(sp, src_off); + ssm_pool_remove(sp, off); return -ENOTALLOC; } @@ -2260,23 +2655,24 @@ int local_flow_transfer(int src_fd, if (sp == dp) { /* Same pool: zero-copy */ - ret = ssm_rbuff_write_b(dst_flow->tx_rb, src_off, NULL); + ret = ssm_rbuff_write_b(dst_flow->tx_rb, off, NULL); if (ret < 0) - ssm_pool_remove(sp, src_off); + ssm_pool_remove(sp, off); else ssm_flow_set_notify(dst_flow->set, dst_flow->info.id, FLOW_PKT); } else { /* Different pools: single copy */ - if (pool_copy_spb(sp, src_off, dp, &dst_spb) < 0) { - ssm_pool_remove(sp, src_off); + if (pool_dup_spb(sp, off, dp, &dst_spb) < 0) { + ssm_pool_remove(sp, off); return -ENOMEM; } - dst_off = ssm_pk_buff_get_off(dst_spb); - ret = ssm_rbuff_write_b(dst_flow->tx_rb, dst_off, NULL); + ssm_pool_remove(sp, off); + off = ssm_pk_buff_get_off(dst_spb); + ret = ssm_rbuff_write_b(dst_flow->tx_rb, off, NULL); if (ret < 0) - ssm_pool_remove(dp, dst_off); + ssm_pool_remove(dp, off); else ssm_flow_set_notify(dst_flow->set, dst_flow->info.id, FLOW_PKT); |
