diff options
Diffstat (limited to 'src/lib/dev.c')
| -rw-r--r-- | src/lib/dev.c | 1329 |
1 files changed, 870 insertions, 459 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index 9cfc24ee..ae0401b7 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -29,10 +29,13 @@ #include "config.h" #include "ssm.h" +#include <ouroboros/atomics.h> #include <ouroboros/bitmap.h> #include <ouroboros/cep.h> +#include <ouroboros/crc16.h> #include <ouroboros/crypt.h> #include <ouroboros/dev.h> +#include <ouroboros/endian.h> #include <ouroboros/errno.h> #include <ouroboros/fccntl.h> #include <ouroboros/flow.h> @@ -45,32 +48,33 @@ #include <ouroboros/np1_flow.h> #include <ouroboros/pthread.h> #include <ouroboros/random.h> +#ifdef PROC_FLOW_STATS +#include <ouroboros/rib.h> +#endif #include <ouroboros/serdes-irm.h> +#include <ouroboros/sockets.h> #include <ouroboros/ssm_flow_set.h> #include <ouroboros/ssm_pool.h> #include <ouroboros/ssm_rbuff.h> -#include <ouroboros/sockets.h> +#include <ouroboros/tw.h> #include <ouroboros/utils.h> -#ifdef PROC_FLOW_STATS -#include <ouroboros/rib.h> -#endif +#include <assert.h> #ifdef HAVE_LIBGCRYPT #include <gcrypt.h> #endif -#include <assert.h> -#include <stdlib.h> -#include <string.h> -#include <stdio.h> #include <stdarg.h> #include <stdbool.h> +#include <inttypes.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> #include <sys/types.h> #ifndef CLOCK_REALTIME_COARSE #define CLOCK_REALTIME_COARSE CLOCK_REALTIME #endif -/* Partial read information. */ #define NO_PART -1 #define DONE_PART -2 @@ -78,19 +82,12 @@ #define SECMEMSZ 16384 #define MSGBUFSZ 2048 -/* map flow_ids to flow descriptors; track state of the flow */ struct fmap { int fd; - /* TODO: use actual flow state */ enum flow_state state; }; -#define frcti_to_flow(frcti) \ - ((struct flow *)((uint8_t *) frcti - offsetof(struct flow, frcti))) - struct flow { - struct list_head next; - struct flow_info info; struct ssm_rbuff * rx_rb; @@ -135,16 +132,10 @@ struct { struct flow * flows; struct fmap * id_to_fd; - struct list_head flow_list; pthread_mutex_t mtx; pthread_cond_t cond; - pthread_t tx; - pthread_t rx; - size_t n_frcti; - fset_t * frct_set; - pthread_rwlock_t lock; } proc; @@ -243,7 +234,7 @@ static int proc_announce(const struct proc_info * proc) return irm__irm_result_des(&msg); } -/* IRMd will clean up the mess if this fails */ +/* IRMd cleans up on failure. */ static void proc_exit(void) { uint8_t buf[SOCK_BUF_SIZE]; @@ -264,7 +255,7 @@ static int spb_encrypt(struct flow * flow, uint8_t * tail; if (flow->crypt == NULL) - return 0; /* No encryption */ + return 0; in.data = ssm_pk_buff_head(spb); in.len = ssm_pk_buff_len(spb); @@ -272,11 +263,11 @@ static int spb_encrypt(struct flow * flow, if (crypt_encrypt(flow->crypt, in, &out) < 0) goto fail_encrypt; - head = ssm_pk_buff_head_alloc(spb, flow->headsz); + head = ssm_pk_buff_push(spb, flow->headsz); if (head == NULL) goto fail_alloc; - tail = ssm_pk_buff_tail_alloc(spb, flow->tailsz); + tail = ssm_pk_buff_push_tail(spb, flow->tailsz); if (tail == NULL) goto fail_alloc; @@ -299,7 +290,7 @@ static int spb_decrypt(struct flow * flow, uint8_t * head; if (flow->crypt == NULL) - return 0; /* No decryption */ + return 0; in.data = ssm_pk_buff_head(spb); in.len = ssm_pk_buff_len(spb); @@ -308,8 +299,8 @@ static int spb_decrypt(struct flow * flow, return -ENOMEM; - head = ssm_pk_buff_head_release(spb, flow->headsz) + flow->headsz; - ssm_pk_buff_tail_release(spb, flow->tailsz); + head = ssm_pk_buff_pop(spb, flow->headsz) + flow->headsz; + ssm_pk_buff_pop_tail(spb, flow->tailsz); memcpy(head, out.data, out.len); @@ -318,130 +309,280 @@ static int spb_decrypt(struct flow * flow, return 0; } -#include "frct.c" - -void * flow_tx(void * o) +/* tw_move under proc.lock rdlock; gates teardown vs in-flight fires. */ +static void tw_move_safe(void) { - struct timespec tic = TIMESPEC_INIT_NS(TICTIME); - - (void) o; + pthread_rwlock_rdlock(&proc.lock); - while (true) { - timerwheel_move(); + pthread_cleanup_push(__cleanup_rwlock_unlock, &proc.lock); - nanosleep(&tic, NULL); - } + tw_move(); - return (void *) 0; + pthread_cleanup_pop(1); } -static void flow_send_keepalive(struct flow * flow, - struct timespec now) +static int crc_add(struct ssm_pk_buff * spb, + size_t head_skip) { - struct ssm_pk_buff * spb; - ssize_t idx; - uint8_t * ptr; - - idx = ssm_pool_alloc(proc.pool, 0, &ptr, &spb); - if (idx < 0) - return; + uint8_t * head; + uint8_t * tail; - pthread_rwlock_wrlock(&proc.lock); + tail = ssm_pk_buff_push_tail(spb, CRCLEN); + if (tail == NULL) + return -ENOMEM; - flow->snd_act = now; + head = ssm_pk_buff_head(spb) + head_skip; - if (ssm_rbuff_write(flow->tx_rb, idx)) - ssm_pool_remove(proc.pool, idx); - else - ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); + mem_hash(HASH_CRC32, tail, head, tail - head); - pthread_rwlock_unlock(&proc.lock); + return 0; } -/* Needs rdlock on proc. */ -static void _flow_keepalive(struct flow * flow) +static int crc_check(struct ssm_pk_buff * spb, + size_t head_skip) { - struct timespec now; - struct timespec s_act; - struct timespec r_act; - int flow_id; - time_t timeo; - uint32_t acl; + uint32_t crc; + uint8_t * head = ssm_pk_buff_head(spb) + head_skip; + uint8_t * tail = ssm_pk_buff_pop_tail(spb, CRCLEN); + + mem_hash(HASH_CRC32, &crc, head, tail - head); - s_act = flow->snd_act; - r_act = flow->rcv_act; + return !(crc == *((uint32_t *) tail)); +} - flow_id = flow->info.id; - timeo = flow->info.qs.timeout; +/* FRCT included here so it can use proc and dev.c statics directly. */ +#include "frct.c" - acl = ssm_rbuff_get_acl(flow->rx_rb); - if (timeo == 0 || acl & (ACL_FLOWPEER | ACL_FLOWDOWN)) - return; +/* + * SACK / DATA carry trailer CRC32; HCS protects the headers on every + * FRCT packet. Decrypt before any check so plaintext is authoritative. + */ +static bool invalid_pkt(struct flow * flow, + struct ssm_pk_buff * spb) +{ + const struct frct_pci * pci; + uint16_t flags; + size_t pci_total; - clock_gettime(PTHREAD_COND_CLOCK, &now); + if (spb == NULL || ssm_pk_buff_len(spb) == 0) + return true; - if (ts_diff_ns(&now, &r_act) > (int64_t) timeo * MILLION) { - ssm_rbuff_set_acl(flow->rx_rb, ACL_FLOWPEER); - ssm_flow_set_notify(proc.fqset, flow_id, FLOW_PEER); - return; + if (spb_decrypt(flow, spb) < 0) + return true; + + if (flow->frcti == NULL) { + if (flow->info.qs.ber == 0 && crc_check(spb, 0) != 0) + return true; + return false; } - if (ts_diff_ns(&now, &s_act) > (int64_t) timeo * (MILLION >> 2)) { - pthread_rwlock_unlock(&proc.lock); + if (ssm_pk_buff_len(spb) < FRCT_PCILEN) + return true; - flow_send_keepalive(flow, now); + pci = (const struct frct_pci *) ssm_pk_buff_head(spb); + flags = ntoh16(pci->flags); - pthread_rwlock_rdlock(&proc.lock); + /* Untrusted flag read; mismatch on HCS will drop on corrupt. */ + if (flags & FRCT_DATA) + pci_total = frcti_data_hdr_len(flow->frcti); + else + pci_total = frcti_ctrl_hdr_len(flow->frcti); + + if (ssm_pk_buff_len(spb) < pci_total) + return true; + + if (frct_hcs_check(pci, flow->frcti) != 0) + return true; + + /* HCS valid: CRC32 on SACK; or on DATA if ber = 0. */ + if (flags & FRCT_SACK) { + if (crc_check(spb, pci_total) != 0) + return true; + + } else if ((flags & FRCT_DATA) && flow->info.qs.ber == 0) { + if (crc_check(spb, pci_total) != 0) + return true; } + + return false; } -static void handle_keepalives(void) +static bool deadline_passed(const struct timespec * abs) { - struct list_head * p; - struct list_head * h; + struct timespec now; - pthread_rwlock_rdlock(&proc.lock); + if (abs == NULL) + return false; - list_for_each_safe(p, h, &proc.flow_list) { - struct flow * flow; - flow = list_entry(p, struct flow, next); - _flow_keepalive(flow); - } + clock_gettime(PTHREAD_COND_CLOCK, &now); - pthread_rwlock_unlock(&proc.lock); + return ts_diff_ns(&now, abs) >= 0; } -static void __cleanup_fqueue_destroy(void * fq) +/* Clamp the wait by min(dl, next tw expiry, now + TICTIME). */ +static void compute_wait_deadline(const struct timespec * dl, + struct timespec * out) { - fqueue_destroy((fqueue_t *) fq); + struct timespec now; + struct timespec cap; + struct timespec expiry; + struct timespec tic = TIMESPEC_INIT_NS(TICTIME); + + clock_gettime(PTHREAD_COND_CLOCK, &now); + ts_add(&now, &tic, &cap); + + tw_next_expiry(&expiry); + + *out = (ts_diff_ns(&cap, &expiry) < 0) ? expiry : cap; + if (dl != NULL && ts_diff_ns(out, dl) > 0) + *out = *dl; } -void * flow_rx(void * o) +/* + * proc.lock rdlock held across each iteration so flow_fini's wrlock + * waits for us to finish; FLOWDOWN already set means we exit promptly. + */ +static void flow_drain_rx_nb(struct flow * flow) { - struct timespec tic = TIMESPEC_INIT_NS(TICTIME); - int ret; - struct fqueue * fq; + ssize_t idx; + struct ssm_pk_buff * spb; + struct ssm_rbuff * rx_rb; + struct frcti * frcti; +#ifdef PROC_FLOW_STATS + struct timespec t_a; + struct timespec t_b; +#endif + + if (flow->frcti != NULL) + STAT_BUMP(flow->frcti, drain_calls); - (void) o; + while (true) { + pthread_rwlock_rdlock(&proc.lock); - fq = fqueue_create(); + rx_rb = flow->rx_rb; + if (rx_rb == NULL) { + pthread_rwlock_unlock(&proc.lock); + return; + } - pthread_cleanup_push(__cleanup_fqueue_destroy, fq); + idx = ssm_rbuff_read(rx_rb); + if (idx < 0) { + pthread_rwlock_unlock(&proc.lock); + return; + } - /* fevent will filter all FRCT packets for us */ - while ((ret = fevent(proc.frct_set, fq, &tic)) != 0) { - if (ret == -ETIMEDOUT) { - handle_keepalives(); + spb = ssm_pool_get(proc.pool, idx); + if (invalid_pkt(flow, spb)) { + ssm_pool_remove(proc.pool, idx); + pthread_rwlock_unlock(&proc.lock); continue; } - while (fqueue_next(fq) >= 0) - ; /* no need to act */ + frcti = flow->frcti; + if (frcti != NULL) { +#ifdef PROC_FLOW_STATS + clock_gettime(CLOCK_MONOTONIC, &t_a); + FRCTI_RCV(frcti, spb); + clock_gettime(CLOCK_MONOTONIC, &t_b); + STAT_ADD(frcti, rcv_proc_ns, + (size_t) ts_diff_ns(&t_b, &t_a)); +#else + FRCTI_RCV(frcti, spb); +#endif + } else { + ssm_pool_remove(proc.pool, idx); + } + + pthread_rwlock_unlock(&proc.lock); + + /* Per-packet so the delayed-ACK fires on time in a burst. */ +#ifdef PROC_FLOW_STATS + clock_gettime(CLOCK_MONOTONIC, &t_a); + tw_move_safe(); + clock_gettime(CLOCK_MONOTONIC, &t_b); + if (frcti != NULL) + STAT_ADD(frcti, tw_move_ns, + (size_t) ts_diff_ns(&t_b, &t_a)); +#else + tw_move_safe(); +#endif } +} - pthread_cleanup_pop(true); +/* + * Wait clamped by caller deadline, next tw expiry, and TICTIME; + * a clamp-timeout means tw work is due, not caller-deadline. + */ +static int flow_rx_one(struct flow * flow, + struct timespec * abs) +{ + struct timespec wait_abs; + struct ssm_pk_buff * spb; + struct ssm_rbuff * rx_rb; + ssize_t idx; + + while (true) { + compute_wait_deadline(abs, &wait_abs); + + /* rdlock gates flow_fini; FLOWDOWN preempts the block. */ + pthread_rwlock_rdlock(&proc.lock); + + rx_rb = flow->rx_rb; + if (rx_rb == NULL) { + pthread_rwlock_unlock(&proc.lock); + return -EFLOWDOWN; + } + + idx = ssm_rbuff_read_b(rx_rb, &wait_abs); + if (idx == -ETIMEDOUT) { + pthread_rwlock_unlock(&proc.lock); + if (deadline_passed(abs)) + return -ETIMEDOUT; + tw_move_safe(); + continue; + } + if (idx < 0) { + pthread_rwlock_unlock(&proc.lock); + return idx; + } + + spb = ssm_pool_get(proc.pool, idx); + if (invalid_pkt(flow, spb)) { + ssm_pool_remove(proc.pool, idx); + pthread_rwlock_unlock(&proc.lock); + continue; + } + + if (flow->frcti != NULL) + FRCTI_RCV(flow->frcti, spb); + else + ssm_pool_remove(proc.pool, idx); + + pthread_rwlock_unlock(&proc.lock); - return (void *) 0; + tw_move_safe(); + return 0; + } +} + +/* 0 = window open; -EAGAIN = !block and would block; else flow_rx_one rc. */ +static __inline__ int flow_wait_window(struct flow * flow, + size_t n, + bool block, + struct timespec * dl) +{ + int rc; + + while (true) { + flow_drain_rx_nb(flow); + if (FRCTI_IS_WINDOW_OPEN_N(flow->frcti, n)) + return 0; + if (!block) + return -EAGAIN; + rc = flow_rx_one(flow, dl); + if (rc < 0) + return rc; + } } static void flow_clear(int fd) @@ -451,36 +592,40 @@ static void flow_clear(int fd) proc.flows[fd].info.id = -1; } -static void __flow_fini(int fd) +/* + * Set ACL_FLOWDOWN on rx/tx so any in-flight blocking reads or writes + * wake up and drop their proc.lock rdlock. Must run BEFORE flow_fini's + * wrlock, else the wrlock blocks on those rdlock holders and the + * in-flight calls never see the FLOWDOWN signal. + */ +static void flow_quiesce(int fd) { - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + struct ssm_rbuff * rx_rb = proc.flows[fd].rx_rb; + struct ssm_rbuff * tx_rb = proc.flows[fd].tx_rb; - if (proc.flows[fd].frcti != NULL) { - proc.n_frcti--; - if (proc.n_frcti == 0) { - pthread_cancel(proc.tx); - pthread_join(proc.tx, NULL); - } + if (rx_rb != NULL) + ssm_rbuff_set_acl(rx_rb, ACL_FLOWDOWN); + if (tx_rb != NULL) + ssm_rbuff_set_acl(tx_rb, ACL_FLOWDOWN); +} - ssm_flow_set_del(proc.fqset, 0, proc.flows[fd].info.id); +static void do_flow_fini(int fd) +{ + assert(fd >= 0 && fd < PROC_MAX_FLOWS); + if (proc.flows[fd].frcti != NULL) frcti_destroy(proc.flows[fd].frcti); - } if (proc.flows[fd].info.id != -1) { flow_destroy(&proc.id_to_fd[proc.flows[fd].info.id]); bmp_release(proc.fds, fd); } - if (proc.flows[fd].rx_rb != NULL) { - ssm_rbuff_set_acl(proc.flows[fd].rx_rb, ACL_FLOWDOWN); + if (proc.flows[fd].rx_rb != NULL) ssm_rbuff_close(proc.flows[fd].rx_rb); - } - if (proc.flows[fd].tx_rb != NULL) { - ssm_rbuff_set_acl(proc.flows[fd].tx_rb, ACL_FLOWDOWN); + if (proc.flows[fd].tx_rb != NULL) ssm_rbuff_close(proc.flows[fd].tx_rb); - } if (proc.flows[fd].set != NULL) { ssm_flow_set_notify(proc.flows[fd].set, @@ -491,24 +636,40 @@ static void __flow_fini(int fd) crypt_destroy_ctx(proc.flows[fd].crypt); - list_del(&proc.flows[fd].next); - flow_clear(fd); } static void flow_fini(int fd) { + flow_quiesce(fd); + pthread_rwlock_wrlock(&proc.lock); - __flow_fini(fd); + do_flow_fini(fd); pthread_rwlock_unlock(&proc.lock); } #define IS_ENCRYPTED(crypt) ((crypt)->nid != NID_undef) -#define IS_ORDERED(flow) (flow.qs.in_order != 0) +#define IS_ORDERED(info) ((info)->qs.service != SVC_RAW) +#define IS_STREAM(info) ((info)->qs.service == SVC_STREAM) + +/* Raw MTU minus the wrapping (IV/Tag + optional CRC) dev.c adds. */ +static __inline__ size_t flow_user_mtu(const struct flow * flow, + size_t raw) +{ + size_t hdr; + + hdr = flow->headsz + flow->tailsz; + if (flow->info.qs.ber == 0 && flow->crypt == NULL) + hdr += CRCLEN; + + return raw > hdr ? raw - hdr : 0; +} + static int flow_init(struct flow_info * info, - struct crypt_sk * sk) + struct crypt_sk * sk, + time_t rtt_hint) { struct timespec now; struct flow * flow; @@ -550,7 +711,6 @@ static int flow_init(struct flow_info * info, flow->tailsz = 0; if (IS_ENCRYPTED(sk)) { - /* Set to lower value in tests, should we make configurable? */ sk->rot_bit = KEY_ROTATION_BIT; flow->crypt = crypt_create_ctx(sk); if (flow->crypt == NULL) @@ -561,22 +721,16 @@ static int flow_init(struct flow_info * info, assert(flow->frcti == NULL); - if (IS_ORDERED(flow->info)) { - flow->frcti = frcti_create(fd, DELT_A, DELT_R, info->mpl); + if (IS_ORDERED(&flow->info)) { + uint32_t frct_mtu = flow_user_mtu(flow, info->mtu); + + flow->frcti = frcti_create(fd, DELT_A, DELT_R, + info->mpl, rtt_hint, + info->qs, frct_mtu); if (flow->frcti == NULL) goto fail_frcti; - - if (ssm_flow_set_add(proc.fqset, 0, info->id)) - goto fail_flow_set_add; - - ++proc.n_frcti; - if (proc.n_frcti == 1 && - pthread_create(&proc.tx, NULL, flow_tx, NULL) < 0) - goto fail_tx_thread; } - list_add_tail(&flow->next, &proc.flow_list); - proc.id_to_fd[info->id].fd = fd; flow_set_state(&proc.id_to_fd[info->id], FLOW_ALLOCATED); @@ -585,10 +739,6 @@ static int flow_init(struct flow_info * info, return fd; - fail_tx_thread: - ssm_flow_set_del(proc.fqset, 0, info->id); - fail_flow_set_add: - frcti_destroy(flow->frcti); fail_frcti: crypt_destroy_ctx(flow->crypt); fail_crypt: @@ -655,13 +805,13 @@ static void init(int argc, gcry_control(GCRYCTL_INITIALIZATION_FINISHED, 0); } #endif - proc.fds = bmp_create(PROG_MAX_FLOWS - PROG_RES_FDS, PROG_RES_FDS); + proc.fds = bmp_create(PROC_MAX_FLOWS - PROC_RES_FDS, PROC_RES_FDS); if (proc.fds == NULL) { fprintf(stderr, "FATAL: Could not create fd bitmap.\n"); goto fail_fds; } - proc.fqueues = bmp_create(PROG_MAX_FQUEUES, 0); + proc.fqueues = bmp_create(PROC_MAX_FQUEUES, 0); if (proc.fqueues == NULL) { fprintf(stderr, "FATAL: Could not create fqueue bitmap.\n"); goto fail_fqueues; @@ -677,13 +827,13 @@ static void init(int argc, goto fail_rdrb; } - proc.flows = malloc(sizeof(*proc.flows) * PROG_MAX_FLOWS); + proc.flows = malloc(sizeof(*proc.flows) * PROC_MAX_FLOWS); if (proc.flows == NULL) { fprintf(stderr, "FATAL: Could not malloc flows.\n"); goto fail_flows; } - for (i = 0; i < PROG_MAX_FLOWS; ++i) + for (i = 0; i < PROC_MAX_FLOWS; ++i) flow_clear(i); proc.id_to_fd = malloc(sizeof(*proc.id_to_fd) * SYS_MAX_FLOWS); @@ -716,20 +866,14 @@ static void init(int argc, goto fail_fqset; } - proc.frct_set = fset_create(); - if (proc.frct_set == NULL || proc.frct_set->idx != 0) { - fprintf(stderr, "FATAL: Could not create FRCT set.\n"); - goto fail_frct_set; - } - - if (timerwheel_init() < 0) { + if (tw_init() < 0) { fprintf(stderr, "FATAL: Could not initialize timerwheel.\n"); goto fail_timerwheel; } if (crypt_secure_malloc_init(PROC_SECMEM_MAX) < 0) { fprintf(stderr, "FATAL: Could not init secure malloc.\n"); - goto fail_timerwheel; + goto fail_secmem; } #if defined PROC_FLOW_STATS @@ -741,24 +885,15 @@ static void init(int argc, } } #endif - if (pthread_create(&proc.rx, NULL, flow_rx, NULL) < 0) { - fprintf(stderr, "FATAL: Could not start monitor thread.\n"); - goto fail_monitor; - } - - list_head_init(&proc.flow_list); - return; - fail_monitor: #if defined PROC_FLOW_STATS - rib_fini(); fail_rib_init: + crypt_secure_malloc_fini(); #endif - timerwheel_fini(); + fail_secmem: + tw_fini(); fail_timerwheel: - fset_destroy(proc.frct_set); - fail_frct_set: ssm_flow_set_close(proc.fqset); fail_fqset: pthread_rwlock_destroy(&proc.lock); @@ -789,19 +924,20 @@ static void fini(void) if (proc.fds == NULL) return; - pthread_cancel(proc.rx); - pthread_join(proc.rx, NULL); + /* Wake all in-flight readers/writers BEFORE wrlock acquire. */ + for (i = 0; i < PROC_MAX_FLOWS; ++i) + if (proc.flows[i].info.id != -1) + flow_quiesce(i); pthread_rwlock_wrlock(&proc.lock); - for (i = 0; i < PROG_MAX_FLOWS; ++i) { + for (i = 0; i < PROC_MAX_FLOWS; ++i) { struct flow * flow = &proc.flows[i]; if (flow->info.id != -1) { ssize_t idx; - ssm_rbuff_set_acl(flow->rx_rb, ACL_FLOWDOWN); while ((idx = ssm_rbuff_read(flow->rx_rb)) >= 0) ssm_pool_remove(proc.pool, idx); - __flow_fini(i); + do_flow_fini(i); } } @@ -813,9 +949,9 @@ static void fini(void) #ifdef PROC_FLOW_STATS rib_fini(); #endif - timerwheel_fini(); + crypt_secure_malloc_fini(); - fset_destroy(proc.frct_set); + tw_fini(); ssm_flow_set_close(proc.fqset); @@ -860,6 +996,10 @@ int flow_accept(qosspec_t * qs, if (qs != NULL) qs->ber = 1; #endif + /* STREAM cannot tolerate loss: drops create silent gaps. */ + if (qs != NULL && qs->service == SVC_STREAM && qs->loss != 0) + return -EINVAL; + memset(&flow, 0, sizeof(flow)); flow.n_pid = getpid(); @@ -878,7 +1018,8 @@ int flow_accept(qosspec_t * qs, if (err < 0) return err; - fd = flow_init(&flow, &crypt); + /* No RTT in accept; rtt_hint=0 bootstraps from first ACK. */ + fd = flow_init(&flow, &crypt, 0); crypt_secure_clear(key, SYMMKEYSZ); @@ -899,11 +1040,16 @@ int flow_alloc(const char * dst, uint8_t key[SYMMKEYSZ]; int fd; int err; + struct timespec t0; + struct timespec t1; #ifdef QOS_DISABLE_CRC if (qs != NULL) qs->ber = 1; #endif + /* STREAM cannot tolerate loss: drops create silent gaps. */ + if (qs != NULL && qs->service == SVC_STREAM && qs->loss != 0) + return -EINVAL; memset(&flow, 0, sizeof(flow)); @@ -913,11 +1059,13 @@ int flow_alloc(const char * dst, if (flow_alloc__irm_req_ser(&msg, &flow, dst, timeo)) return -ENOMEM; + clock_gettime(PTHREAD_COND_CLOCK, &t0); + err = send_recv_msg(&msg); - if (err < 0) { - printf("send_recv_msg error %d\n", err); + if (err < 0) return err; - } + + clock_gettime(PTHREAD_COND_CLOCK, &t1); crypt.key = key; @@ -925,7 +1073,7 @@ int flow_alloc(const char * dst, if (err < 0) return err; - fd = flow_init(&flow, &crypt); + fd = flow_init(&flow, &crypt, ts_diff_ns(&t1, &t0)); crypt_secure_clear(key, SYMMKEYSZ); @@ -964,7 +1112,7 @@ int flow_join(const char * dst, if (err < 0) return err; - fd = flow_init(&flow, &crypt); + fd = flow_init(&flow, &crypt, 0); crypt_secure_clear(key, SYMMKEYSZ); @@ -983,10 +1131,10 @@ int flow_dealloc(int fd) struct flow * flow; int err; - if (fd < 0 || fd >= SYS_MAX_FLOWS ) + if (fd < 0 || fd >= PROC_MAX_FLOWS ) return -EINVAL; - memset(&info, 0, sizeof(flow)); + memset(&info, 0, sizeof(info)); flow = &proc.flows[fd]; @@ -1008,9 +1156,8 @@ int flow_dealloc(int fd) pthread_rwlock_rdlock(&proc.lock); - timeo.tv_sec = frcti_dealloc(flow->frcti); - while (timeo.tv_sec < 0) { /* keep the flow active for rtx */ - ssize_t ret; + while (FRCTI_LINGERING(flow->frcti)) { + ssize_t ret; pthread_rwlock_unlock(&proc.lock); @@ -1018,12 +1165,12 @@ int flow_dealloc(int fd) pthread_rwlock_rdlock(&proc.lock); - timeo.tv_sec = frcti_dealloc(flow->frcti); - - if (ret == -EFLOWDOWN && timeo.tv_sec < 0) - timeo.tv_sec = -timeo.tv_sec; + if (ret == -EFLOWDOWN) + break; } + timeo.tv_sec = FRCTI_DEALLOC(flow->frcti); + pthread_cleanup_push(__cleanup_rwlock_unlock, &proc.lock); ssm_rbuff_fini(flow->tx_rb); @@ -1033,15 +1180,18 @@ int flow_dealloc(int fd) info.id = flow->info.id; info.n_pid = getpid(); - if (flow_dealloc__irm_req_ser(&msg, &info, &timeo) < 0) - return -ENOMEM; + if (flow_dealloc__irm_req_ser(&msg, &info, &timeo) < 0) { + err = -ENOMEM; + goto out; + } err = send_recv_msg(&msg); if (err < 0) - return err; + goto out; err = irm__irm_result_des(&msg); + out: flow_fini(fd); return err; @@ -1055,12 +1205,12 @@ int ipcp_flow_dealloc(int fd) struct flow * flow; int err; - if (fd < 0 || fd >= SYS_MAX_FLOWS ) + if (fd < 0 || fd >= PROC_MAX_FLOWS ) return -EINVAL; flow = &proc.flows[fd]; - memset(&info, 0, sizeof(flow)); + memset(&info, 0, sizeof(info)); pthread_rwlock_rdlock(&proc.lock); @@ -1074,15 +1224,18 @@ int ipcp_flow_dealloc(int fd) pthread_rwlock_unlock(&proc.lock); - if (ipcp_flow_dealloc__irm_req_ser(&msg, &info) < 0) - return -ENOMEM; + if (ipcp_flow_dealloc__irm_req_ser(&msg, &info) < 0) { + err = -ENOMEM; + goto out; + } err = send_recv_msg(&msg); if (err < 0) - return err; + goto out; err = irm__irm_result_des(&msg); + out: flow_fini(fd); return err; @@ -1102,8 +1255,18 @@ int fccntl(int fd, uint32_t tx_acl; size_t * qlen; struct flow * flow; - - if (fd < 0 || fd >= SYS_MAX_FLOWS) + uint16_t old_acc; + uint16_t new_acc; + size_t max; + size_t * maxp; + size_t rsz; + size_t * rszp; + time_t rto; + time_t * rtop; + int rc; + bool emit_eos = false; + + if (fd < 0 || fd >= PROC_MAX_FLOWS) return -EBADF; flow = &proc.flows[fd]; @@ -1167,14 +1330,27 @@ int fccntl(int fd, qlen = va_arg(l, size_t *); *qlen = ssm_rbuff_queued(flow->tx_rb); break; + case FLOWGMTU: + maxp = va_arg(l, size_t *); + if (maxp == NULL) + goto einval; + *maxp = flow_user_mtu(flow, flow->info.mtu); + break; case FLOWSFLAGS: + old_acc = flow->oflags & FLOWFACCMODE; flow->oflags = va_arg(l, uint32_t); + new_acc = flow->oflags & FLOWFACCMODE; + + /* Defer EOS emit until after proc.lock is dropped: */ + /* frcti_fin_snd may block on shm-pool/tx-rb. */ + if (new_acc == FLOWFRDONLY + && old_acc != FLOWFRDONLY + && flow->frcti != NULL) + emit_eos = true; + rx_acl = ssm_rbuff_get_acl(flow->rx_rb); - tx_acl = ssm_rbuff_get_acl(flow->rx_rb); - /* - * Making our own flow write only means making the - * the other side of the flow read only. - */ + tx_acl = ssm_rbuff_get_acl(flow->tx_rb); + /* Our flow write-only -> peer's read-only. */ if (flow->oflags & FLOWFWRONLY) rx_acl |= ACL_RDONLY; if (flow->oflags & FLOWFRDWR) @@ -1218,6 +1394,59 @@ int fccntl(int fd, goto eperm; *cflags = frcti_getflags(flow->frcti); break; + case FRCTSMAXSDU: + max = va_arg(l, size_t); + if (flow->frcti == NULL) + goto eperm; + if (frcti_set_max_rcv_sdu(flow->frcti, max) < 0) + goto einval; + break; + case FRCTGMAXSDU: + maxp = va_arg(l, size_t *); + if (maxp == NULL) + goto einval; + if (flow->frcti == NULL) + goto eperm; + *maxp = frcti_get_max_rcv_sdu(flow->frcti); + break; + case FRCTSRRINGSZ: + rsz = va_arg(l, size_t); + if (flow->frcti == NULL) + goto eperm; + rc = frcti_set_rcv_ring_sz(flow->frcti, rsz); + if (rc < 0) { + pthread_rwlock_unlock(&proc.lock); + va_end(l); + return rc; + } + break; + case FRCTGRRINGSZ: + rszp = va_arg(l, size_t *); + if (rszp == NULL) + goto einval; + if (flow->frcti == NULL) + goto eperm; + *rszp = frcti_get_rcv_ring_sz(flow->frcti); + break; + case FRCTSRTOMIN: + if (flow->frcti == NULL) + goto eperm; + rto = va_arg(l, time_t); + rc = frcti_set_rto_min(flow->frcti, rto); + if (rc < 0) { + pthread_rwlock_unlock(&proc.lock); + va_end(l); + return rc; + } + break; + case FRCTGRTOMIN: + if (flow->frcti == NULL) + goto eperm; + rtop = va_arg(l, time_t *); + if (rtop == NULL) + goto einval; + *rtop = frcti_get_rto_min(flow->frcti); + break; default: pthread_rwlock_unlock(&proc.lock); va_end(l); @@ -1227,6 +1456,9 @@ int fccntl(int fd, pthread_rwlock_unlock(&proc.lock); + if (emit_eos) + frcti_fin_snd(flow->frcti); + va_end(l); return 0; @@ -1241,86 +1473,195 @@ int fccntl(int fd, return -EPERM; } -static int chk_crc(struct ssm_pk_buff * spb) -{ - uint32_t crc; - uint8_t * head = ssm_pk_buff_head(spb); - uint8_t * tail = ssm_pk_buff_tail_release(spb, CRCLEN); - - mem_hash(HASH_CRC32, &crc, head, tail - head); - - return !(crc == *((uint32_t *) tail)); -} - -static int add_crc(struct ssm_pk_buff * spb) -{ - uint8_t * head; - uint8_t * tail; - - tail = ssm_pk_buff_tail_alloc(spb, CRCLEN); - if (tail == NULL) - return -ENOMEM; - - head = ssm_pk_buff_head(spb); - mem_hash(HASH_CRC32, tail, head, tail - head); - - return 0; -} - static int flow_tx_spb(struct flow * flow, struct ssm_pk_buff * spb, + uint16_t flags, bool block, struct timespec * abstime) { struct timespec now; ssize_t idx; + size_t pci_total; int ret; clock_gettime(PTHREAD_COND_CLOCK, &now); - - pthread_rwlock_wrlock(&proc.lock); - flow->snd_act = now; - pthread_rwlock_unlock(&proc.lock); - - idx = ssm_pk_buff_get_idx(spb); - - pthread_rwlock_rdlock(&proc.lock); + idx = ssm_pk_buff_get_off(spb); if (ssm_pk_buff_len(spb) > 0) { - if (frcti_snd(flow->frcti, spb) < 0) + if (FRCTI_SND(flow->frcti, spb, flags) < 0) goto enomem; - if (spb_encrypt(flow, spb) < 0) - goto enomem; + if (flow->info.qs.ber == 0) { + pci_total = flow->frcti != NULL + ? frcti_data_hdr_len(flow->frcti) : 0; + if (crc_add(spb, pci_total) != 0) + goto enomem; + } - if (flow->info.qs.ber == 0 && add_crc(spb) != 0) + if (spb_encrypt(flow, spb) < 0) goto enomem; } - pthread_cleanup_push(__cleanup_rwlock_unlock, &proc.lock); - if (!block) ret = ssm_rbuff_write(flow->tx_rb, idx); else ret = ssm_rbuff_write_b(flow->tx_rb, idx, abstime); - if (ret < 0) + if (ret < 0) { ssm_pool_remove(proc.pool, idx); - else - ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); - - pthread_cleanup_pop(true); + return ret; + } + ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); return 0; -enomem: - pthread_rwlock_unlock(&proc.lock); + enomem: ssm_pool_remove(proc.pool, idx); return -ENOMEM; } +/* Per-fragment role for fragment i out of n; n == 1 yields SOLE. */ +static __inline__ uint16_t flow_frag_role(size_t i, size_t n) +{ + if (n == 1) + return FRCT_FR_SOLE; + if (i == 0) + return FRCT_FR_FIRST; + if (i + 1 == n) + return FRCT_FR_LAST; + + return FRCT_FR_MID; +} + +/* + * Stream-mode write: split buf into chunks of + * (frag_mtu - PCI - PCI_STREAM) bytes; each chunk goes through the + * normal tx path. frcti_snd injects the [start,end) extension and + * advances snd_byte_next under its wrlock. No FFGM/LFGM role bits. + */ +static ssize_t flow_write_stream(struct flow * flow, + const void * buf, + size_t count, + int oflags, + struct timespec * dl) +{ + const uint8_t * src = buf; + size_t payload; + size_t off = 0; + bool block = !(oflags & FLOWFWNOBLOCK); + + if (!FRCTI_IS_FRTX(flow->frcti)) + return -EMSGSIZE; + + payload = FRCTI_PAYLOAD_CAP(flow->frcti); + + while (off < count) { + struct ssm_pk_buff * spb; + uint8_t * ptr; + ssize_t idx; + size_t clen; + int ret; + + ret = flow_wait_window(flow, 1, block, dl); + if (ret < 0) + return off > 0 ? (ssize_t) off : (ssize_t) ret; + + clen = MIN(count - off, payload); + + if (block) + idx = ssm_pool_alloc_b(proc.pool, clen, &ptr, + &spb, dl); + else + idx = ssm_pool_alloc(proc.pool, clen, &ptr, &spb); + if (idx < 0) + return off > 0 ? (ssize_t) off : idx; + + memcpy(ptr, src + off, clen); + + ret = flow_tx_spb(flow, spb, 0, block, dl); + if (ret < 0) + return off > 0 ? (ssize_t) off : (ssize_t) ret; + + off += clen; + } + + return (ssize_t) count; +} + +/* Per-fragment flow_tx_spb loop. Raw flows refuse; FRCT splits the SDU. */ +static ssize_t flow_write_frag(struct flow * flow, + const void * buf, + size_t count, + int oflags, + struct timespec * dl) +{ + const uint8_t * src = buf; + size_t frag_payload; + size_t n; + size_t off = 0; + size_t i; + int ret; + bool block = !(oflags & FLOWFWNOBLOCK); + + /* Raw flows carry no PCI; cannot fragment. */ + if (flow->frcti == NULL) + return -EMSGSIZE; + + frag_payload = FRCTI_PAYLOAD_CAP(flow->frcti); + + /* Guard the ceil-divide against size_t overflow. */ + if (count > SIZE_MAX - frag_payload + 1) + return -EMSGSIZE; + n = (count + frag_payload - 1) / frag_payload; + + /* SDU larger than the FC window can ever offer would deadlock. */ + if (n > RQ_SIZE) + return -EMSGSIZE; + + /* SDU-atomic FC: wait for n seqnos to avoid overshoot mid-SDU. */ + ret = flow_wait_window(flow, n, block, dl); + if (ret < 0) + return (ssize_t) ret; + + STAT_BUMP(flow->frcti, sdu_snd_frag); + + for (i = 0; i < n; ++i) { + struct ssm_pk_buff * spb; + uint8_t * ptr; + ssize_t idx; + size_t clen; + + clen = (i + 1 == n) ? (count - off) : frag_payload; + + if (block) + idx = ssm_pool_alloc_b(proc.pool, clen, &ptr, + &spb, dl); + else + idx = ssm_pool_alloc(proc.pool, clen, &ptr, &spb); + if (idx < 0) { + if (off > 0) + STAT_BUMP(flow->frcti, sdu_snd_alloc); + return off > 0 ? (ssize_t) off : idx; + } + + memcpy(ptr, src + off, clen); + + ret = flow_tx_spb(flow, spb, flow_frag_role(i, n), + block, dl); + if (ret < 0) { + if (off > 0) + STAT_BUMP(flow->frcti, sdu_snd_tx); + return off > 0 ? (ssize_t) off : (ssize_t) ret; + } + + off += clen; + } + + return (ssize_t) count; +} + ssize_t flow_write(int fd, const void * buf, size_t count) @@ -1330,76 +1671,75 @@ ssize_t flow_write(int fd, int ret; int flags; struct timespec abs; - struct timespec * abstime = NULL; + struct timespec now; + struct timespec * dl = NULL; struct ssm_pk_buff * spb; uint8_t * ptr; if (buf == NULL && count != 0) return -EINVAL; - if (fd < 0 || fd >= PROG_MAX_FLOWS) + if (fd < 0 || fd >= PROC_MAX_FLOWS) return -EBADF; flow = &proc.flows[fd]; - clock_gettime(PTHREAD_COND_CLOCK, &abs); - - pthread_rwlock_wrlock(&proc.lock); + pthread_rwlock_rdlock(&proc.lock); if (flow->info.id < 0) { pthread_rwlock_unlock(&proc.lock); return -ENOTALLOC; } + flags = flow->oflags; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + if (flow->snd_timesout) { - ts_add(&abs, &flow->snd_timeo, &abs); - abstime = &abs; + ts_add(&now, &flow->snd_timeo, &abs); + dl = &abs; } - flags = flow->oflags; - pthread_rwlock_unlock(&proc.lock); if ((flags & FLOWFACCMODE) == FLOWFRDONLY) return -EPERM; - if (flags & FLOWFWNOBLOCK) { - if (!frcti_is_window_open(flow->frcti)) - return -EAGAIN; - idx = ssm_pool_alloc(proc.pool, count, &ptr, &spb); - } else { - ret = frcti_window_wait(flow->frcti, abstime); + tw_move_safe(); + + if (flow->frcti != NULL) { + /* Pump rx_rb so a pure-writer processes ACKs. */ + ret = flow_wait_window(flow, 1, !(flags & FLOWFWNOBLOCK), dl); if (ret < 0) return ret; - idx = ssm_pool_alloc_b(proc.pool, count, &ptr, &spb, abstime); + + if (count > 0 && FRCTI_IS_STREAM(flow->frcti)) + return flow_write_stream(flow, buf, count, flags, dl); + + if (FRCTI_NEEDS_FRAG(flow->frcti, count)) + return flow_write_frag(flow, buf, count, flags, dl); + } else if (flow->info.mtu > 0 + && count > flow_user_mtu(flow, flow->info.mtu)) { + /* Raw flows carry no PCI; refuse anything > one n-1 frame. */ + return -EMSGSIZE; } + if (flags & FLOWFWNOBLOCK) + idx = ssm_pool_alloc(proc.pool, count, &ptr, &spb); + else + idx = ssm_pool_alloc_b(proc.pool, count, &ptr, &spb, dl); if (idx < 0) return idx; if (count > 0) memcpy(ptr, buf, count); - ret = flow_tx_spb(flow, spb, !(flags & FLOWFWNOBLOCK), abstime); + ret = flow_tx_spb(flow, spb, FRCT_FR_SOLE, + !(flags & FLOWFWNOBLOCK), dl); return ret < 0 ? (ssize_t) ret : (ssize_t) count; } -static bool invalid_pkt(struct flow * flow, - struct ssm_pk_buff * spb) -{ - if (spb == NULL || ssm_pk_buff_len(spb) == 0) - return true; - - if (flow->info.qs.ber == 0 && chk_crc(spb) != 0) - return true; - - if (spb_decrypt(flow, spb) < 0) - return true; - - return false; -} - static ssize_t flow_rx_spb(struct flow * flow, struct ssm_pk_buff ** spb, bool block, @@ -1408,19 +1748,14 @@ static ssize_t flow_rx_spb(struct flow * flow, ssize_t idx; struct timespec now; - idx = block ? ssm_rbuff_read_b(flow->rx_rb, abstime) : - ssm_rbuff_read(flow->rx_rb); + idx = block ? ssm_rbuff_read_b(flow->rx_rb, abstime) + : ssm_rbuff_read(flow->rx_rb); if (idx < 0) return idx; clock_gettime(PTHREAD_COND_CLOCK, &now); - - pthread_rwlock_wrlock(&proc.lock); - flow->rcv_act = now; - pthread_rwlock_unlock(&proc.lock); - *spb = ssm_pool_get(proc.pool, idx); if (invalid_pkt(flow, *spb)) { @@ -1431,28 +1766,124 @@ static ssize_t flow_rx_spb(struct flow * flow, return idx; } +static ssize_t raw_flow_read_pkt(struct flow * flow, + bool block, + struct timespec * dl) +{ + struct ssm_pk_buff * spb; + struct timespec wait_abs; + ssize_t idx; + + while (true) { + if (!block) { + idx = ssm_rbuff_read(flow->rx_rb); + if (idx < 0) + return -EAGAIN; + } else { + compute_wait_deadline(dl, &wait_abs); + idx = ssm_rbuff_read_b(flow->rx_rb, &wait_abs); + if (idx == -ETIMEDOUT) { + if (deadline_passed(dl)) + return -ETIMEDOUT; + continue; + } + if (idx < 0) + return idx; + } + + spb = ssm_pool_get(proc.pool, idx); + if (!invalid_pkt(flow, spb)) + return idx; + + ssm_pool_remove(proc.pool, idx); + if (!block) + return -EAGAIN; + } +} + +static ssize_t deliver_pkt(struct flow * flow, + struct ssm_pk_buff * spb, + ssize_t idx, + void * buf, + size_t count, + bool partrd) +{ + uint8_t * packet = ssm_pk_buff_head(spb); + ssize_t n = ssm_pk_buff_len(spb); + + assert(n >= 0); + + if (n <= (ssize_t) count) { + memcpy(buf, packet, n); + ipcp_spb_release(spb); + if (partrd && n == (ssize_t) count) + flow->part_idx = DONE_PART; + else + flow->part_idx = NO_PART; + + return n; + } + + if (partrd) { + memcpy(buf, packet, count); + ssm_pk_buff_pop(spb, n); + flow->part_idx = idx; + return count; + } + + ipcp_spb_release(spb); + return -EMSGSIZE; +} + +/* Drive frcti_consume until it delivers or errors. */ +static ssize_t flow_read_frcti(struct flow * flow, + void * buf, + size_t count, + bool block, + struct timespec * dl) +{ + struct timespec now; + ssize_t bytes; + int rc; + + while (true) { + flow_drain_rx_nb(flow); + bytes = FRCTI_CONSUME(flow->frcti, buf, count); + if (bytes >= 0) + break; + if (bytes != -EAGAIN) + return bytes; + if (!block) + return -EAGAIN; + rc = flow_rx_one(flow, dl); + if (rc < 0) + return rc; + } + + clock_gettime(PTHREAD_COND_CLOCK, &now); + flow->rcv_act = now; + + return bytes; +} + ssize_t flow_read(int fd, void * buf, size_t count) { - ssize_t idx; - ssize_t n; - uint8_t * packet; + struct flow * flow; struct ssm_pk_buff * spb; struct timespec abs; struct timespec now; - struct timespec * abstime = NULL; - struct flow * flow; + struct timespec * dl = NULL; + ssize_t idx; bool block; bool partrd; - if (fd < 0 || fd >= PROG_MAX_FLOWS) + if (fd < 0 || fd >= PROC_MAX_FLOWS) return -EBADF; flow = &proc.flows[fd]; - clock_gettime(PTHREAD_COND_CLOCK, &now); - pthread_rwlock_rdlock(&proc.lock); if (flow->info.id < 0) { @@ -1461,8 +1892,8 @@ ssize_t flow_read(int fd, } if (flow->part_idx == DONE_PART) { - pthread_rwlock_unlock(&proc.lock); flow->part_idx = NO_PART; + pthread_rwlock_unlock(&proc.lock); return 0; } @@ -1470,75 +1901,33 @@ ssize_t flow_read(int fd, partrd = !(flow->oflags & FLOWFRNOPART); if (flow->rcv_timesout) { + clock_gettime(PTHREAD_COND_CLOCK, &now); ts_add(&now, &flow->rcv_timeo, &abs); - abstime = &abs; + dl = &abs; } - idx = flow->part_idx; - if (idx < 0) { - while ((idx = frcti_queued_pdu(flow->frcti)) < 0) { - pthread_rwlock_unlock(&proc.lock); - - idx = flow_rx_spb(flow, &spb, block, abstime); - if (idx < 0) { - if (block && idx != -EAGAIN) - return idx; - if (!block) - return idx; + pthread_rwlock_unlock(&proc.lock); - pthread_rwlock_rdlock(&proc.lock); - continue; - } + tw_move_safe(); - pthread_rwlock_rdlock(&proc.lock); + idx = flow->part_idx; + if (idx < 0 && flow->frcti != NULL) + return flow_read_frcti(flow, buf, count, block, dl); - frcti_rcv(flow->frcti, spb); - } + if (idx < 0) { + idx = raw_flow_read_pkt(flow, block, dl); + if (idx < 0) + return idx; } spb = ssm_pool_get(proc.pool, idx); - pthread_rwlock_unlock(&proc.lock); - - packet = ssm_pk_buff_head(spb); - - n = ssm_pk_buff_len(spb); - - assert(n >= 0); - - if (n <= (ssize_t) count) { - memcpy(buf, packet, n); - ipcp_spb_release(spb); - - pthread_rwlock_wrlock(&proc.lock); - - flow->part_idx = (partrd && n == (ssize_t) count) ? - DONE_PART : NO_PART; - - flow->rcv_act = now; - - pthread_rwlock_unlock(&proc.lock); - return n; - } else { - if (partrd) { - memcpy(buf, packet, count); - ssm_pk_buff_head_release(spb, n); - pthread_rwlock_wrlock(&proc.lock); - flow->part_idx = idx; - - flow->rcv_act = now; + clock_gettime(PTHREAD_COND_CLOCK, &now); + flow->rcv_act = now; - pthread_rwlock_unlock(&proc.lock); - return count; - } else { - ipcp_spb_release(spb); - return -EMSGSIZE; - } - } + return deliver_pkt(flow, spb, idx, buf, count, partrd); } -/* fqueue functions. */ - struct flow_set * fset_create(void) { struct flow_set * set; @@ -1614,7 +2003,7 @@ int fset_add(struct flow_set * set, struct flow * flow; int ret; - if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS) + if (set == NULL || fd < 0 || fd >= PROC_MAX_FLOWS) return -EINVAL; flow = &proc.flows[fd]; @@ -1650,7 +2039,7 @@ void fset_del(struct flow_set * set, { struct flow * flow; - if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS) + if (set == NULL || fd < 0 || fd >= PROC_MAX_FLOWS) return; flow = &proc.flows[fd]; @@ -1661,7 +2050,7 @@ void fset_del(struct flow_set * set, ssm_flow_set_del(proc.fqset, set->idx, flow->info.id); if (flow->frcti != NULL) - ssm_flow_set_add(proc.fqset, 0, proc.flows[fd].info.id); + ssm_flow_set_add(proc.fqset, 0, flow->info.id); pthread_rwlock_unlock(&proc.lock); } @@ -1672,7 +2061,7 @@ bool fset_has(const struct flow_set * set, struct flow * flow; bool ret; - if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS) + if (set == NULL || fd < 0 || fd >= PROC_MAX_FLOWS) return false; flow = &proc.flows[fd]; @@ -1691,61 +2080,59 @@ bool fset_has(const struct flow_set * set, return ret; } -/* Filter fqueue events for non-data packets */ static int fqueue_filter(struct fqueue * fq) { struct ssm_pk_buff * spb; int fd; ssize_t idx; struct frcti * frcti; + int ret = 0; - while (fq->next < fq->fqsize) { - if (fq->fqueue[fq->next].event != FLOW_PKT) - return 1; + /* proc.lock rdlock gates frcti_destroy via flow_fini wrlock. */ + pthread_rwlock_rdlock(&proc.lock); - pthread_rwlock_rdlock(&proc.lock); + while (fq->next < fq->fqsize) { + if (fq->fqueue[fq->next].event != FLOW_PKT) { + ret = 1; + goto out; + } fd = proc.id_to_fd[fq->fqueue[fq->next].flow_id].fd; if (fd < 0) { ++fq->next; - pthread_rwlock_unlock(&proc.lock); continue; } frcti = proc.flows[fd].frcti; if (frcti == NULL) { - pthread_rwlock_unlock(&proc.lock); - return 1; + ret = 1; + goto out; } - if (__frcti_pdu_ready(frcti) >= 0) { - pthread_rwlock_unlock(&proc.lock); - return 1; + if (FRCTI_PDU_READY(frcti)) { + ret = 1; + goto out; } - pthread_rwlock_unlock(&proc.lock); - idx = flow_rx_spb(&proc.flows[fd], &spb, false, NULL); if (idx < 0) - return 0; - - pthread_rwlock_rdlock(&proc.lock); + goto out; spb = ssm_pool_get(proc.pool, idx); - __frcti_rcv(frcti, spb); + FRCTI_RCV(frcti, spb); - if (__frcti_pdu_ready(frcti) >= 0) { - pthread_rwlock_unlock(&proc.lock); - return 1; + if (FRCTI_PDU_READY(frcti)) { + ret = 1; + goto out; } - pthread_rwlock_unlock(&proc.lock); - ++fq->next; } - return 0; + out: + pthread_rwlock_unlock(&proc.lock); + return ret; } int fqueue_next(struct fqueue * fq) @@ -1792,7 +2179,8 @@ ssize_t fevent(struct flow_set * set, { ssize_t ret = 0; struct timespec abs; - struct timespec * t = NULL; + struct timespec * dl = NULL; + struct timespec wait_abs; if (set == NULL || fq == NULL) return -EINVAL; @@ -1800,17 +2188,26 @@ ssize_t fevent(struct flow_set * set, if (fq->fqsize > 0 && fq->next != fq->fqsize) return 1; - clock_gettime(PTHREAD_COND_CLOCK, &abs); - if (timeo != NULL) { - ts_add(&abs, timeo, &abs); - t = &abs; + struct timespec now; + clock_gettime(PTHREAD_COND_CLOCK, &now); + ts_add(&now, timeo, &abs); + dl = &abs; } while (ret == 0) { - ret = ssm_flow_set_wait(proc.fqset, set->idx, fq->fqueue, t); - if (ret == -ETIMEDOUT) - return -ETIMEDOUT; + tw_move_safe(); + + compute_wait_deadline(dl, &wait_abs); + + ret = ssm_flow_set_wait(proc.fqset, set->idx, + fq->fqueue, &wait_abs); + if (ret == -ETIMEDOUT) { + if (deadline_passed(dl)) + return -ETIMEDOUT; + ret = 0; + continue; + } fq->fqsize = ret; fq->next = 0; @@ -1823,8 +2220,6 @@ ssize_t fevent(struct flow_set * set, return 1; } -/* ipcp-dev functions. */ - int np1_flow_alloc(pid_t n_pid, int flow_id) { @@ -1837,9 +2232,10 @@ int np1_flow_alloc(pid_t n_pid, flow.n_pid = getpid(); flow.qs = qos_np1; flow.mpl = 0; - flow.n_1_pid = n_pid; /* This "flow" is upside-down! */ + /* np1 flow: n_1_pid is the upper. */ + flow.n_1_pid = n_pid; - return flow_init(&flow, &crypt); + return flow_init(&flow, &crypt, 0); } int np1_flow_dealloc(int flow_id, @@ -1847,12 +2243,7 @@ int np1_flow_dealloc(int flow_id, { int fd; - /* - * TODO: Don't pass timeo to the IPCP but wait in IRMd. - * This will need async ops, waiting until we bootstrap - * the IRMd over ouroboros. - */ - + /* TODO: wait in IRMd, not here; needs async ops. */ sleep(timeo); pthread_rwlock_rdlock(&proc.lock); @@ -1900,6 +2291,7 @@ int ipcp_create_r(const struct ipcp_info * info) int ipcp_flow_req_arr(const buffer_t * dst, qosspec_t qs, time_t mpl, + uint32_t mtu, const buffer_t * data) { struct flow_info flow; @@ -1916,6 +2308,7 @@ int ipcp_flow_req_arr(const buffer_t * dst, flow.n_1_pid = getpid(); flow.qs = qs; flow.mpl = mpl; + flow.mtu = mtu; if (ipcp_flow_req_arr__irm_req_ser(&msg, dst, &flow, data) < 0) return -ENOMEM; @@ -1930,22 +2323,25 @@ int ipcp_flow_req_arr(const buffer_t * dst, if (err < 0) return err; - assert(crypt.nid == NID_undef); /* np1 flows are not encrypted */ + /* np1 flows are not encrypted. */ + assert(crypt.nid == NID_undef); - /* inverted for np1_flow */ + /* Inverted for np1_flow. */ flow.n_1_pid = flow.n_pid; flow.n_pid = getpid(); flow.mpl = 0; + flow.mtu = 0; flow.qs = qos_np1; crypt.nid = NID_undef; - return flow_init(&flow, &crypt); + return flow_init(&flow, &crypt, 0); } int ipcp_flow_alloc_reply(int fd, int response, time_t mpl, + uint32_t mtu, const buffer_t * data) { struct flow_info flow; @@ -1953,7 +2349,7 @@ int ipcp_flow_alloc_reply(int fd, buffer_t msg = {SOCK_BUF_SIZE, buf}; int err; - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + assert(fd >= 0 && fd < PROC_MAX_FLOWS); pthread_rwlock_rdlock(&proc.lock); @@ -1962,6 +2358,7 @@ int ipcp_flow_alloc_reply(int fd, pthread_rwlock_unlock(&proc.lock); flow.mpl = mpl; + flow.mtu = mtu; if (ipcp_flow_alloc_reply__irm_msg_ser(&msg, &flow, response, data) < 0) return -ENOMEM; @@ -1979,7 +2376,7 @@ int ipcp_flow_read(int fd, struct flow * flow; ssize_t idx = -1; - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + assert(fd >= 0 && fd < PROC_MAX_FLOWS); assert(spb); flow = &proc.flows[fd]; @@ -1988,7 +2385,14 @@ int ipcp_flow_read(int fd, assert(flow->info.id >= 0); - while (frcti_queued_pdu(flow->frcti) < 0) { + /* Raw flow: deliver the popped pkt directly (no FRCT rq). */ + if (flow->frcti == NULL) { + pthread_rwlock_unlock(&proc.lock); + idx = flow_rx_spb(flow, spb, false, NULL); + return idx < 0 ? (int) idx : 0; + } + + while (!FRCTI_PDU_READY(flow->frcti)) { pthread_rwlock_unlock(&proc.lock); idx = flow_rx_spb(flow, spb, false, NULL); @@ -1997,7 +2401,7 @@ int ipcp_flow_read(int fd, pthread_rwlock_rdlock(&proc.lock); - frcti_rcv(flow->frcti, *spb); + FRCTI_RCV(flow->frcti, *spb); } pthread_rwlock_unlock(&proc.lock); @@ -2011,12 +2415,12 @@ int ipcp_flow_write(int fd, struct flow * flow; int ret; - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + assert(fd >= 0 && fd < PROC_MAX_FLOWS); assert(spb); flow = &proc.flows[fd]; - pthread_rwlock_wrlock(&proc.lock); + pthread_rwlock_rdlock(&proc.lock); if (flow->info.id < 0) { pthread_rwlock_unlock(&proc.lock); @@ -2030,30 +2434,28 @@ int ipcp_flow_write(int fd, pthread_rwlock_unlock(&proc.lock); - ret = flow_tx_spb(flow, spb, true, NULL); + ret = flow_tx_spb(flow, spb, FRCT_FR_SOLE, true, NULL); return ret; } -static int pool_copy_spb(struct ssm_pool * src_pool, - ssize_t src_idx, - struct ssm_pool * dst_pool, - struct ssm_pk_buff ** dst_spb) +/* Copy src into dst_pool without consuming src. Caller owns both halves. */ +static int pool_dup_spb(struct ssm_pool * src_pool, + size_t src_off, + struct ssm_pool * dst_pool, + struct ssm_pk_buff ** dst_spb) { struct ssm_pk_buff * src; uint8_t * ptr; size_t len; - src = ssm_pool_get(src_pool, src_idx); + src = ssm_pool_get(src_pool, src_off); len = ssm_pk_buff_len(src); - if (ssm_pool_alloc(dst_pool, len, &ptr, dst_spb) < 0) { - ssm_pool_remove(src_pool, src_idx); + if (ssm_pool_alloc(dst_pool, len, &ptr, dst_spb) < 0) return -ENOMEM; - } memcpy(ptr, ssm_pk_buff_head(src), len); - ssm_pool_remove(src_pool, src_idx); return 0; } @@ -2063,9 +2465,9 @@ int np1_flow_read(int fd, struct ssm_pool * pool) { struct flow * flow; - ssize_t idx = -1; + ssize_t off; - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + assert(fd >= 0 && fd < PROC_MAX_FLOWS); assert(spb); flow = &proc.flows[fd]; @@ -2074,20 +2476,23 @@ int np1_flow_read(int fd, pthread_rwlock_rdlock(&proc.lock); - idx = ssm_rbuff_read(flow->rx_rb); - if (idx < 0) { + off = ssm_rbuff_read(flow->rx_rb); + if (off < 0) { pthread_rwlock_unlock(&proc.lock); - return idx; + return off; } pthread_rwlock_unlock(&proc.lock); if (pool == NULL) { - *spb = ssm_pool_get(proc.pool, idx); + *spb = ssm_pool_get(proc.pool, off); } else { /* Cross-pool copy: PUP -> GSPP */ - if (pool_copy_spb(pool, idx, proc.pool, spb) < 0) + if (pool_dup_spb(pool, off, proc.pool, spb) < 0) { + ssm_pool_remove(pool, off); return -ENOMEM; + } + ssm_pool_remove(pool, off); } return 0; @@ -2100,9 +2505,10 @@ int np1_flow_write(int fd, struct flow * flow; struct ssm_pk_buff * dst; int ret; - ssize_t idx; + size_t off; + size_t dst_off; - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + assert(fd >= 0 && fd < PROC_MAX_FLOWS); assert(spb); flow = &proc.flows[fd]; @@ -2121,45 +2527,47 @@ int np1_flow_write(int fd, pthread_rwlock_unlock(&proc.lock); - idx = ssm_pk_buff_get_idx(spb); + off = ssm_pk_buff_get_off(spb); if (pool == NULL) { - ret = ssm_rbuff_write_b(flow->tx_rb, idx, NULL); + ret = ssm_rbuff_write_b(flow->tx_rb, off, NULL); if (ret < 0) - ssm_pool_remove(proc.pool, idx); - else - ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); + return ret; + ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); } else { - /* Cross-pool copy: GSPP -> PUP */ - if (pool_copy_spb(proc.pool, idx, pool, &dst) < 0) + /* Cross-pool copy: GSPP -> PUP. Src kept on error. */ + if (pool_dup_spb(proc.pool, off, pool, &dst) < 0) return -ENOMEM; - idx = ssm_pk_buff_get_idx(dst); - ret = ssm_rbuff_write_b(flow->tx_rb, idx, NULL); - if (ret < 0) - ssm_pool_remove(pool, idx); - else - ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); + dst_off = ssm_pk_buff_get_off(dst); + ret = ssm_rbuff_write_b(flow->tx_rb, dst_off, NULL); + if (ret < 0) { + ssm_pool_remove(pool, dst_off); + return ret; + } + ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); + ssm_pool_remove(proc.pool, off); } - return ret; + return 0; } int ipcp_spb_reserve(struct ssm_pk_buff ** spb, size_t len) { - return ssm_pool_alloc_b(proc.pool, len, NULL, spb, NULL) < 0 ? -1 : 0; + return ssm_pool_alloc_b(proc.pool, len, NULL, spb, NULL) < 0 + ? -1 : 0; } void ipcp_spb_release(struct ssm_pk_buff * spb) { - ssm_pool_remove(proc.pool, ssm_pk_buff_get_idx(spb)); + ssm_pool_remove(proc.pool, ssm_pk_buff_get_off(spb)); } int ipcp_flow_fini(int fd) { struct ssm_rbuff * rx_rb; - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + assert(fd >= 0 && fd < PROC_MAX_FLOWS); pthread_rwlock_rdlock(&proc.lock); @@ -2188,7 +2596,7 @@ int ipcp_flow_fini(int fd) int ipcp_flow_get_qoscube(int fd, qoscube_t * cube) { - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + assert(fd >= 0 && fd < PROC_MAX_FLOWS); assert(cube); pthread_rwlock_rdlock(&proc.lock); @@ -2227,7 +2635,7 @@ int local_flow_transfer(int src_fd, struct ssm_pk_buff * dst_spb; struct ssm_pool * sp; struct ssm_pool * dp; - ssize_t idx; + ssize_t off; int ret; assert(src_fd >= 0); @@ -2241,15 +2649,15 @@ int local_flow_transfer(int src_fd, pthread_rwlock_rdlock(&proc.lock); - idx = ssm_rbuff_read(src_flow->rx_rb); - if (idx < 0) { + off = ssm_rbuff_read(src_flow->rx_rb); + if (off < 0) { pthread_rwlock_unlock(&proc.lock); - return idx; + return off; } if (dst_flow->info.id < 0) { pthread_rwlock_unlock(&proc.lock); - ssm_pool_remove(sp, idx); + ssm_pool_remove(sp, off); return -ENOTALLOC; } @@ -2257,21 +2665,24 @@ int local_flow_transfer(int src_fd, if (sp == dp) { /* Same pool: zero-copy */ - ret = ssm_rbuff_write_b(dst_flow->tx_rb, idx, NULL); + ret = ssm_rbuff_write_b(dst_flow->tx_rb, off, NULL); if (ret < 0) - ssm_pool_remove(sp, idx); + ssm_pool_remove(sp, off); else ssm_flow_set_notify(dst_flow->set, dst_flow->info.id, FLOW_PKT); } else { /* Different pools: single copy */ - if (pool_copy_spb(sp, idx, dp, &dst_spb) < 0) + if (pool_dup_spb(sp, off, dp, &dst_spb) < 0) { + ssm_pool_remove(sp, off); return -ENOMEM; + } - idx = ssm_pk_buff_get_idx(dst_spb); - ret = ssm_rbuff_write_b(dst_flow->tx_rb, idx, NULL); + ssm_pool_remove(sp, off); + off = ssm_pk_buff_get_off(dst_spb); + ret = ssm_rbuff_write_b(dst_flow->tx_rb, off, NULL); if (ret < 0) - ssm_pool_remove(dp, idx); + ssm_pool_remove(dp, off); else ssm_flow_set_notify(dst_flow->set, dst_flow->info.id, FLOW_PKT); |
