summaryrefslogtreecommitdiff
path: root/src/lib/dev.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--src/lib/dev.c1329
1 files changed, 870 insertions, 459 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 9cfc24ee..ae0401b7 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -29,10 +29,13 @@
#include "config.h"
#include "ssm.h"
+#include <ouroboros/atomics.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/cep.h>
+#include <ouroboros/crc16.h>
#include <ouroboros/crypt.h>
#include <ouroboros/dev.h>
+#include <ouroboros/endian.h>
#include <ouroboros/errno.h>
#include <ouroboros/fccntl.h>
#include <ouroboros/flow.h>
@@ -45,32 +48,33 @@
#include <ouroboros/np1_flow.h>
#include <ouroboros/pthread.h>
#include <ouroboros/random.h>
+#ifdef PROC_FLOW_STATS
+#include <ouroboros/rib.h>
+#endif
#include <ouroboros/serdes-irm.h>
+#include <ouroboros/sockets.h>
#include <ouroboros/ssm_flow_set.h>
#include <ouroboros/ssm_pool.h>
#include <ouroboros/ssm_rbuff.h>
-#include <ouroboros/sockets.h>
+#include <ouroboros/tw.h>
#include <ouroboros/utils.h>
-#ifdef PROC_FLOW_STATS
-#include <ouroboros/rib.h>
-#endif
+#include <assert.h>
#ifdef HAVE_LIBGCRYPT
#include <gcrypt.h>
#endif
-#include <assert.h>
-#include <stdlib.h>
-#include <string.h>
-#include <stdio.h>
#include <stdarg.h>
#include <stdbool.h>
+#include <inttypes.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
#include <sys/types.h>
#ifndef CLOCK_REALTIME_COARSE
#define CLOCK_REALTIME_COARSE CLOCK_REALTIME
#endif
-/* Partial read information. */
#define NO_PART -1
#define DONE_PART -2
@@ -78,19 +82,12 @@
#define SECMEMSZ 16384
#define MSGBUFSZ 2048
-/* map flow_ids to flow descriptors; track state of the flow */
struct fmap {
int fd;
- /* TODO: use actual flow state */
enum flow_state state;
};
-#define frcti_to_flow(frcti) \
- ((struct flow *)((uint8_t *) frcti - offsetof(struct flow, frcti)))
-
struct flow {
- struct list_head next;
-
struct flow_info info;
struct ssm_rbuff * rx_rb;
@@ -135,16 +132,10 @@ struct {
struct flow * flows;
struct fmap * id_to_fd;
- struct list_head flow_list;
pthread_mutex_t mtx;
pthread_cond_t cond;
- pthread_t tx;
- pthread_t rx;
- size_t n_frcti;
- fset_t * frct_set;
-
pthread_rwlock_t lock;
} proc;
@@ -243,7 +234,7 @@ static int proc_announce(const struct proc_info * proc)
return irm__irm_result_des(&msg);
}
-/* IRMd will clean up the mess if this fails */
+/* IRMd cleans up on failure. */
static void proc_exit(void)
{
uint8_t buf[SOCK_BUF_SIZE];
@@ -264,7 +255,7 @@ static int spb_encrypt(struct flow * flow,
uint8_t * tail;
if (flow->crypt == NULL)
- return 0; /* No encryption */
+ return 0;
in.data = ssm_pk_buff_head(spb);
in.len = ssm_pk_buff_len(spb);
@@ -272,11 +263,11 @@ static int spb_encrypt(struct flow * flow,
if (crypt_encrypt(flow->crypt, in, &out) < 0)
goto fail_encrypt;
- head = ssm_pk_buff_head_alloc(spb, flow->headsz);
+ head = ssm_pk_buff_push(spb, flow->headsz);
if (head == NULL)
goto fail_alloc;
- tail = ssm_pk_buff_tail_alloc(spb, flow->tailsz);
+ tail = ssm_pk_buff_push_tail(spb, flow->tailsz);
if (tail == NULL)
goto fail_alloc;
@@ -299,7 +290,7 @@ static int spb_decrypt(struct flow * flow,
uint8_t * head;
if (flow->crypt == NULL)
- return 0; /* No decryption */
+ return 0;
in.data = ssm_pk_buff_head(spb);
in.len = ssm_pk_buff_len(spb);
@@ -308,8 +299,8 @@ static int spb_decrypt(struct flow * flow,
return -ENOMEM;
- head = ssm_pk_buff_head_release(spb, flow->headsz) + flow->headsz;
- ssm_pk_buff_tail_release(spb, flow->tailsz);
+ head = ssm_pk_buff_pop(spb, flow->headsz) + flow->headsz;
+ ssm_pk_buff_pop_tail(spb, flow->tailsz);
memcpy(head, out.data, out.len);
@@ -318,130 +309,280 @@ static int spb_decrypt(struct flow * flow,
return 0;
}
-#include "frct.c"
-
-void * flow_tx(void * o)
+/* tw_move under proc.lock rdlock; gates teardown vs in-flight fires. */
+static void tw_move_safe(void)
{
- struct timespec tic = TIMESPEC_INIT_NS(TICTIME);
-
- (void) o;
+ pthread_rwlock_rdlock(&proc.lock);
- while (true) {
- timerwheel_move();
+ pthread_cleanup_push(__cleanup_rwlock_unlock, &proc.lock);
- nanosleep(&tic, NULL);
- }
+ tw_move();
- return (void *) 0;
+ pthread_cleanup_pop(1);
}
-static void flow_send_keepalive(struct flow * flow,
- struct timespec now)
+static int crc_add(struct ssm_pk_buff * spb,
+ size_t head_skip)
{
- struct ssm_pk_buff * spb;
- ssize_t idx;
- uint8_t * ptr;
-
- idx = ssm_pool_alloc(proc.pool, 0, &ptr, &spb);
- if (idx < 0)
- return;
+ uint8_t * head;
+ uint8_t * tail;
- pthread_rwlock_wrlock(&proc.lock);
+ tail = ssm_pk_buff_push_tail(spb, CRCLEN);
+ if (tail == NULL)
+ return -ENOMEM;
- flow->snd_act = now;
+ head = ssm_pk_buff_head(spb) + head_skip;
- if (ssm_rbuff_write(flow->tx_rb, idx))
- ssm_pool_remove(proc.pool, idx);
- else
- ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
+ mem_hash(HASH_CRC32, tail, head, tail - head);
- pthread_rwlock_unlock(&proc.lock);
+ return 0;
}
-/* Needs rdlock on proc. */
-static void _flow_keepalive(struct flow * flow)
+static int crc_check(struct ssm_pk_buff * spb,
+ size_t head_skip)
{
- struct timespec now;
- struct timespec s_act;
- struct timespec r_act;
- int flow_id;
- time_t timeo;
- uint32_t acl;
+ uint32_t crc;
+ uint8_t * head = ssm_pk_buff_head(spb) + head_skip;
+ uint8_t * tail = ssm_pk_buff_pop_tail(spb, CRCLEN);
+
+ mem_hash(HASH_CRC32, &crc, head, tail - head);
- s_act = flow->snd_act;
- r_act = flow->rcv_act;
+ return !(crc == *((uint32_t *) tail));
+}
- flow_id = flow->info.id;
- timeo = flow->info.qs.timeout;
+/* FRCT included here so it can use proc and dev.c statics directly. */
+#include "frct.c"
- acl = ssm_rbuff_get_acl(flow->rx_rb);
- if (timeo == 0 || acl & (ACL_FLOWPEER | ACL_FLOWDOWN))
- return;
+/*
+ * SACK / DATA carry trailer CRC32; HCS protects the headers on every
+ * FRCT packet. Decrypt before any check so plaintext is authoritative.
+ */
+static bool invalid_pkt(struct flow * flow,
+ struct ssm_pk_buff * spb)
+{
+ const struct frct_pci * pci;
+ uint16_t flags;
+ size_t pci_total;
- clock_gettime(PTHREAD_COND_CLOCK, &now);
+ if (spb == NULL || ssm_pk_buff_len(spb) == 0)
+ return true;
- if (ts_diff_ns(&now, &r_act) > (int64_t) timeo * MILLION) {
- ssm_rbuff_set_acl(flow->rx_rb, ACL_FLOWPEER);
- ssm_flow_set_notify(proc.fqset, flow_id, FLOW_PEER);
- return;
+ if (spb_decrypt(flow, spb) < 0)
+ return true;
+
+ if (flow->frcti == NULL) {
+ if (flow->info.qs.ber == 0 && crc_check(spb, 0) != 0)
+ return true;
+ return false;
}
- if (ts_diff_ns(&now, &s_act) > (int64_t) timeo * (MILLION >> 2)) {
- pthread_rwlock_unlock(&proc.lock);
+ if (ssm_pk_buff_len(spb) < FRCT_PCILEN)
+ return true;
- flow_send_keepalive(flow, now);
+ pci = (const struct frct_pci *) ssm_pk_buff_head(spb);
+ flags = ntoh16(pci->flags);
- pthread_rwlock_rdlock(&proc.lock);
+ /* Untrusted flag read; mismatch on HCS will drop on corrupt. */
+ if (flags & FRCT_DATA)
+ pci_total = frcti_data_hdr_len(flow->frcti);
+ else
+ pci_total = frcti_ctrl_hdr_len(flow->frcti);
+
+ if (ssm_pk_buff_len(spb) < pci_total)
+ return true;
+
+ if (frct_hcs_check(pci, flow->frcti) != 0)
+ return true;
+
+ /* HCS valid: CRC32 on SACK; or on DATA if ber = 0. */
+ if (flags & FRCT_SACK) {
+ if (crc_check(spb, pci_total) != 0)
+ return true;
+
+ } else if ((flags & FRCT_DATA) && flow->info.qs.ber == 0) {
+ if (crc_check(spb, pci_total) != 0)
+ return true;
}
+
+ return false;
}
-static void handle_keepalives(void)
+static bool deadline_passed(const struct timespec * abs)
{
- struct list_head * p;
- struct list_head * h;
+ struct timespec now;
- pthread_rwlock_rdlock(&proc.lock);
+ if (abs == NULL)
+ return false;
- list_for_each_safe(p, h, &proc.flow_list) {
- struct flow * flow;
- flow = list_entry(p, struct flow, next);
- _flow_keepalive(flow);
- }
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
- pthread_rwlock_unlock(&proc.lock);
+ return ts_diff_ns(&now, abs) >= 0;
}
-static void __cleanup_fqueue_destroy(void * fq)
+/* Clamp the wait by min(dl, next tw expiry, now + TICTIME). */
+static void compute_wait_deadline(const struct timespec * dl,
+ struct timespec * out)
{
- fqueue_destroy((fqueue_t *) fq);
+ struct timespec now;
+ struct timespec cap;
+ struct timespec expiry;
+ struct timespec tic = TIMESPEC_INIT_NS(TICTIME);
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+ ts_add(&now, &tic, &cap);
+
+ tw_next_expiry(&expiry);
+
+ *out = (ts_diff_ns(&cap, &expiry) < 0) ? expiry : cap;
+ if (dl != NULL && ts_diff_ns(out, dl) > 0)
+ *out = *dl;
}
-void * flow_rx(void * o)
+/*
+ * proc.lock rdlock held across each iteration so flow_fini's wrlock
+ * waits for us to finish; FLOWDOWN already set means we exit promptly.
+ */
+static void flow_drain_rx_nb(struct flow * flow)
{
- struct timespec tic = TIMESPEC_INIT_NS(TICTIME);
- int ret;
- struct fqueue * fq;
+ ssize_t idx;
+ struct ssm_pk_buff * spb;
+ struct ssm_rbuff * rx_rb;
+ struct frcti * frcti;
+#ifdef PROC_FLOW_STATS
+ struct timespec t_a;
+ struct timespec t_b;
+#endif
+
+ if (flow->frcti != NULL)
+ STAT_BUMP(flow->frcti, drain_calls);
- (void) o;
+ while (true) {
+ pthread_rwlock_rdlock(&proc.lock);
- fq = fqueue_create();
+ rx_rb = flow->rx_rb;
+ if (rx_rb == NULL) {
+ pthread_rwlock_unlock(&proc.lock);
+ return;
+ }
- pthread_cleanup_push(__cleanup_fqueue_destroy, fq);
+ idx = ssm_rbuff_read(rx_rb);
+ if (idx < 0) {
+ pthread_rwlock_unlock(&proc.lock);
+ return;
+ }
- /* fevent will filter all FRCT packets for us */
- while ((ret = fevent(proc.frct_set, fq, &tic)) != 0) {
- if (ret == -ETIMEDOUT) {
- handle_keepalives();
+ spb = ssm_pool_get(proc.pool, idx);
+ if (invalid_pkt(flow, spb)) {
+ ssm_pool_remove(proc.pool, idx);
+ pthread_rwlock_unlock(&proc.lock);
continue;
}
- while (fqueue_next(fq) >= 0)
- ; /* no need to act */
+ frcti = flow->frcti;
+ if (frcti != NULL) {
+#ifdef PROC_FLOW_STATS
+ clock_gettime(CLOCK_MONOTONIC, &t_a);
+ FRCTI_RCV(frcti, spb);
+ clock_gettime(CLOCK_MONOTONIC, &t_b);
+ STAT_ADD(frcti, rcv_proc_ns,
+ (size_t) ts_diff_ns(&t_b, &t_a));
+#else
+ FRCTI_RCV(frcti, spb);
+#endif
+ } else {
+ ssm_pool_remove(proc.pool, idx);
+ }
+
+ pthread_rwlock_unlock(&proc.lock);
+
+ /* Per-packet so the delayed-ACK fires on time in a burst. */
+#ifdef PROC_FLOW_STATS
+ clock_gettime(CLOCK_MONOTONIC, &t_a);
+ tw_move_safe();
+ clock_gettime(CLOCK_MONOTONIC, &t_b);
+ if (frcti != NULL)
+ STAT_ADD(frcti, tw_move_ns,
+ (size_t) ts_diff_ns(&t_b, &t_a));
+#else
+ tw_move_safe();
+#endif
}
+}
- pthread_cleanup_pop(true);
+/*
+ * Wait clamped by caller deadline, next tw expiry, and TICTIME;
+ * a clamp-timeout means tw work is due, not caller-deadline.
+ */
+static int flow_rx_one(struct flow * flow,
+ struct timespec * abs)
+{
+ struct timespec wait_abs;
+ struct ssm_pk_buff * spb;
+ struct ssm_rbuff * rx_rb;
+ ssize_t idx;
+
+ while (true) {
+ compute_wait_deadline(abs, &wait_abs);
+
+ /* rdlock gates flow_fini; FLOWDOWN preempts the block. */
+ pthread_rwlock_rdlock(&proc.lock);
+
+ rx_rb = flow->rx_rb;
+ if (rx_rb == NULL) {
+ pthread_rwlock_unlock(&proc.lock);
+ return -EFLOWDOWN;
+ }
+
+ idx = ssm_rbuff_read_b(rx_rb, &wait_abs);
+ if (idx == -ETIMEDOUT) {
+ pthread_rwlock_unlock(&proc.lock);
+ if (deadline_passed(abs))
+ return -ETIMEDOUT;
+ tw_move_safe();
+ continue;
+ }
+ if (idx < 0) {
+ pthread_rwlock_unlock(&proc.lock);
+ return idx;
+ }
+
+ spb = ssm_pool_get(proc.pool, idx);
+ if (invalid_pkt(flow, spb)) {
+ ssm_pool_remove(proc.pool, idx);
+ pthread_rwlock_unlock(&proc.lock);
+ continue;
+ }
+
+ if (flow->frcti != NULL)
+ FRCTI_RCV(flow->frcti, spb);
+ else
+ ssm_pool_remove(proc.pool, idx);
+
+ pthread_rwlock_unlock(&proc.lock);
- return (void *) 0;
+ tw_move_safe();
+ return 0;
+ }
+}
+
+/* 0 = window open; -EAGAIN = !block and would block; else flow_rx_one rc. */
+static __inline__ int flow_wait_window(struct flow * flow,
+ size_t n,
+ bool block,
+ struct timespec * dl)
+{
+ int rc;
+
+ while (true) {
+ flow_drain_rx_nb(flow);
+ if (FRCTI_IS_WINDOW_OPEN_N(flow->frcti, n))
+ return 0;
+ if (!block)
+ return -EAGAIN;
+ rc = flow_rx_one(flow, dl);
+ if (rc < 0)
+ return rc;
+ }
}
static void flow_clear(int fd)
@@ -451,36 +592,40 @@ static void flow_clear(int fd)
proc.flows[fd].info.id = -1;
}
-static void __flow_fini(int fd)
+/*
+ * Set ACL_FLOWDOWN on rx/tx so any in-flight blocking reads or writes
+ * wake up and drop their proc.lock rdlock. Must run BEFORE flow_fini's
+ * wrlock, else the wrlock blocks on those rdlock holders and the
+ * in-flight calls never see the FLOWDOWN signal.
+ */
+static void flow_quiesce(int fd)
{
- assert(fd >= 0 && fd < SYS_MAX_FLOWS);
+ struct ssm_rbuff * rx_rb = proc.flows[fd].rx_rb;
+ struct ssm_rbuff * tx_rb = proc.flows[fd].tx_rb;
- if (proc.flows[fd].frcti != NULL) {
- proc.n_frcti--;
- if (proc.n_frcti == 0) {
- pthread_cancel(proc.tx);
- pthread_join(proc.tx, NULL);
- }
+ if (rx_rb != NULL)
+ ssm_rbuff_set_acl(rx_rb, ACL_FLOWDOWN);
+ if (tx_rb != NULL)
+ ssm_rbuff_set_acl(tx_rb, ACL_FLOWDOWN);
+}
- ssm_flow_set_del(proc.fqset, 0, proc.flows[fd].info.id);
+static void do_flow_fini(int fd)
+{
+ assert(fd >= 0 && fd < PROC_MAX_FLOWS);
+ if (proc.flows[fd].frcti != NULL)
frcti_destroy(proc.flows[fd].frcti);
- }
if (proc.flows[fd].info.id != -1) {
flow_destroy(&proc.id_to_fd[proc.flows[fd].info.id]);
bmp_release(proc.fds, fd);
}
- if (proc.flows[fd].rx_rb != NULL) {
- ssm_rbuff_set_acl(proc.flows[fd].rx_rb, ACL_FLOWDOWN);
+ if (proc.flows[fd].rx_rb != NULL)
ssm_rbuff_close(proc.flows[fd].rx_rb);
- }
- if (proc.flows[fd].tx_rb != NULL) {
- ssm_rbuff_set_acl(proc.flows[fd].tx_rb, ACL_FLOWDOWN);
+ if (proc.flows[fd].tx_rb != NULL)
ssm_rbuff_close(proc.flows[fd].tx_rb);
- }
if (proc.flows[fd].set != NULL) {
ssm_flow_set_notify(proc.flows[fd].set,
@@ -491,24 +636,40 @@ static void __flow_fini(int fd)
crypt_destroy_ctx(proc.flows[fd].crypt);
- list_del(&proc.flows[fd].next);
-
flow_clear(fd);
}
static void flow_fini(int fd)
{
+ flow_quiesce(fd);
+
pthread_rwlock_wrlock(&proc.lock);
- __flow_fini(fd);
+ do_flow_fini(fd);
pthread_rwlock_unlock(&proc.lock);
}
#define IS_ENCRYPTED(crypt) ((crypt)->nid != NID_undef)
-#define IS_ORDERED(flow) (flow.qs.in_order != 0)
+#define IS_ORDERED(info) ((info)->qs.service != SVC_RAW)
+#define IS_STREAM(info) ((info)->qs.service == SVC_STREAM)
+
+/* Raw MTU minus the wrapping (IV/Tag + optional CRC) dev.c adds. */
+static __inline__ size_t flow_user_mtu(const struct flow * flow,
+ size_t raw)
+{
+ size_t hdr;
+
+ hdr = flow->headsz + flow->tailsz;
+ if (flow->info.qs.ber == 0 && flow->crypt == NULL)
+ hdr += CRCLEN;
+
+ return raw > hdr ? raw - hdr : 0;
+}
+
static int flow_init(struct flow_info * info,
- struct crypt_sk * sk)
+ struct crypt_sk * sk,
+ time_t rtt_hint)
{
struct timespec now;
struct flow * flow;
@@ -550,7 +711,6 @@ static int flow_init(struct flow_info * info,
flow->tailsz = 0;
if (IS_ENCRYPTED(sk)) {
- /* Set to lower value in tests, should we make configurable? */
sk->rot_bit = KEY_ROTATION_BIT;
flow->crypt = crypt_create_ctx(sk);
if (flow->crypt == NULL)
@@ -561,22 +721,16 @@ static int flow_init(struct flow_info * info,
assert(flow->frcti == NULL);
- if (IS_ORDERED(flow->info)) {
- flow->frcti = frcti_create(fd, DELT_A, DELT_R, info->mpl);
+ if (IS_ORDERED(&flow->info)) {
+ uint32_t frct_mtu = flow_user_mtu(flow, info->mtu);
+
+ flow->frcti = frcti_create(fd, DELT_A, DELT_R,
+ info->mpl, rtt_hint,
+ info->qs, frct_mtu);
if (flow->frcti == NULL)
goto fail_frcti;
-
- if (ssm_flow_set_add(proc.fqset, 0, info->id))
- goto fail_flow_set_add;
-
- ++proc.n_frcti;
- if (proc.n_frcti == 1 &&
- pthread_create(&proc.tx, NULL, flow_tx, NULL) < 0)
- goto fail_tx_thread;
}
- list_add_tail(&flow->next, &proc.flow_list);
-
proc.id_to_fd[info->id].fd = fd;
flow_set_state(&proc.id_to_fd[info->id], FLOW_ALLOCATED);
@@ -585,10 +739,6 @@ static int flow_init(struct flow_info * info,
return fd;
- fail_tx_thread:
- ssm_flow_set_del(proc.fqset, 0, info->id);
- fail_flow_set_add:
- frcti_destroy(flow->frcti);
fail_frcti:
crypt_destroy_ctx(flow->crypt);
fail_crypt:
@@ -655,13 +805,13 @@ static void init(int argc,
gcry_control(GCRYCTL_INITIALIZATION_FINISHED, 0);
}
#endif
- proc.fds = bmp_create(PROG_MAX_FLOWS - PROG_RES_FDS, PROG_RES_FDS);
+ proc.fds = bmp_create(PROC_MAX_FLOWS - PROC_RES_FDS, PROC_RES_FDS);
if (proc.fds == NULL) {
fprintf(stderr, "FATAL: Could not create fd bitmap.\n");
goto fail_fds;
}
- proc.fqueues = bmp_create(PROG_MAX_FQUEUES, 0);
+ proc.fqueues = bmp_create(PROC_MAX_FQUEUES, 0);
if (proc.fqueues == NULL) {
fprintf(stderr, "FATAL: Could not create fqueue bitmap.\n");
goto fail_fqueues;
@@ -677,13 +827,13 @@ static void init(int argc,
goto fail_rdrb;
}
- proc.flows = malloc(sizeof(*proc.flows) * PROG_MAX_FLOWS);
+ proc.flows = malloc(sizeof(*proc.flows) * PROC_MAX_FLOWS);
if (proc.flows == NULL) {
fprintf(stderr, "FATAL: Could not malloc flows.\n");
goto fail_flows;
}
- for (i = 0; i < PROG_MAX_FLOWS; ++i)
+ for (i = 0; i < PROC_MAX_FLOWS; ++i)
flow_clear(i);
proc.id_to_fd = malloc(sizeof(*proc.id_to_fd) * SYS_MAX_FLOWS);
@@ -716,20 +866,14 @@ static void init(int argc,
goto fail_fqset;
}
- proc.frct_set = fset_create();
- if (proc.frct_set == NULL || proc.frct_set->idx != 0) {
- fprintf(stderr, "FATAL: Could not create FRCT set.\n");
- goto fail_frct_set;
- }
-
- if (timerwheel_init() < 0) {
+ if (tw_init() < 0) {
fprintf(stderr, "FATAL: Could not initialize timerwheel.\n");
goto fail_timerwheel;
}
if (crypt_secure_malloc_init(PROC_SECMEM_MAX) < 0) {
fprintf(stderr, "FATAL: Could not init secure malloc.\n");
- goto fail_timerwheel;
+ goto fail_secmem;
}
#if defined PROC_FLOW_STATS
@@ -741,24 +885,15 @@ static void init(int argc,
}
}
#endif
- if (pthread_create(&proc.rx, NULL, flow_rx, NULL) < 0) {
- fprintf(stderr, "FATAL: Could not start monitor thread.\n");
- goto fail_monitor;
- }
-
- list_head_init(&proc.flow_list);
-
return;
- fail_monitor:
#if defined PROC_FLOW_STATS
- rib_fini();
fail_rib_init:
+ crypt_secure_malloc_fini();
#endif
- timerwheel_fini();
+ fail_secmem:
+ tw_fini();
fail_timerwheel:
- fset_destroy(proc.frct_set);
- fail_frct_set:
ssm_flow_set_close(proc.fqset);
fail_fqset:
pthread_rwlock_destroy(&proc.lock);
@@ -789,19 +924,20 @@ static void fini(void)
if (proc.fds == NULL)
return;
- pthread_cancel(proc.rx);
- pthread_join(proc.rx, NULL);
+ /* Wake all in-flight readers/writers BEFORE wrlock acquire. */
+ for (i = 0; i < PROC_MAX_FLOWS; ++i)
+ if (proc.flows[i].info.id != -1)
+ flow_quiesce(i);
pthread_rwlock_wrlock(&proc.lock);
- for (i = 0; i < PROG_MAX_FLOWS; ++i) {
+ for (i = 0; i < PROC_MAX_FLOWS; ++i) {
struct flow * flow = &proc.flows[i];
if (flow->info.id != -1) {
ssize_t idx;
- ssm_rbuff_set_acl(flow->rx_rb, ACL_FLOWDOWN);
while ((idx = ssm_rbuff_read(flow->rx_rb)) >= 0)
ssm_pool_remove(proc.pool, idx);
- __flow_fini(i);
+ do_flow_fini(i);
}
}
@@ -813,9 +949,9 @@ static void fini(void)
#ifdef PROC_FLOW_STATS
rib_fini();
#endif
- timerwheel_fini();
+ crypt_secure_malloc_fini();
- fset_destroy(proc.frct_set);
+ tw_fini();
ssm_flow_set_close(proc.fqset);
@@ -860,6 +996,10 @@ int flow_accept(qosspec_t * qs,
if (qs != NULL)
qs->ber = 1;
#endif
+ /* STREAM cannot tolerate loss: drops create silent gaps. */
+ if (qs != NULL && qs->service == SVC_STREAM && qs->loss != 0)
+ return -EINVAL;
+
memset(&flow, 0, sizeof(flow));
flow.n_pid = getpid();
@@ -878,7 +1018,8 @@ int flow_accept(qosspec_t * qs,
if (err < 0)
return err;
- fd = flow_init(&flow, &crypt);
+ /* No RTT in accept; rtt_hint=0 bootstraps from first ACK. */
+ fd = flow_init(&flow, &crypt, 0);
crypt_secure_clear(key, SYMMKEYSZ);
@@ -899,11 +1040,16 @@ int flow_alloc(const char * dst,
uint8_t key[SYMMKEYSZ];
int fd;
int err;
+ struct timespec t0;
+ struct timespec t1;
#ifdef QOS_DISABLE_CRC
if (qs != NULL)
qs->ber = 1;
#endif
+ /* STREAM cannot tolerate loss: drops create silent gaps. */
+ if (qs != NULL && qs->service == SVC_STREAM && qs->loss != 0)
+ return -EINVAL;
memset(&flow, 0, sizeof(flow));
@@ -913,11 +1059,13 @@ int flow_alloc(const char * dst,
if (flow_alloc__irm_req_ser(&msg, &flow, dst, timeo))
return -ENOMEM;
+ clock_gettime(PTHREAD_COND_CLOCK, &t0);
+
err = send_recv_msg(&msg);
- if (err < 0) {
- printf("send_recv_msg error %d\n", err);
+ if (err < 0)
return err;
- }
+
+ clock_gettime(PTHREAD_COND_CLOCK, &t1);
crypt.key = key;
@@ -925,7 +1073,7 @@ int flow_alloc(const char * dst,
if (err < 0)
return err;
- fd = flow_init(&flow, &crypt);
+ fd = flow_init(&flow, &crypt, ts_diff_ns(&t1, &t0));
crypt_secure_clear(key, SYMMKEYSZ);
@@ -964,7 +1112,7 @@ int flow_join(const char * dst,
if (err < 0)
return err;
- fd = flow_init(&flow, &crypt);
+ fd = flow_init(&flow, &crypt, 0);
crypt_secure_clear(key, SYMMKEYSZ);
@@ -983,10 +1131,10 @@ int flow_dealloc(int fd)
struct flow * flow;
int err;
- if (fd < 0 || fd >= SYS_MAX_FLOWS )
+ if (fd < 0 || fd >= PROC_MAX_FLOWS )
return -EINVAL;
- memset(&info, 0, sizeof(flow));
+ memset(&info, 0, sizeof(info));
flow = &proc.flows[fd];
@@ -1008,9 +1156,8 @@ int flow_dealloc(int fd)
pthread_rwlock_rdlock(&proc.lock);
- timeo.tv_sec = frcti_dealloc(flow->frcti);
- while (timeo.tv_sec < 0) { /* keep the flow active for rtx */
- ssize_t ret;
+ while (FRCTI_LINGERING(flow->frcti)) {
+ ssize_t ret;
pthread_rwlock_unlock(&proc.lock);
@@ -1018,12 +1165,12 @@ int flow_dealloc(int fd)
pthread_rwlock_rdlock(&proc.lock);
- timeo.tv_sec = frcti_dealloc(flow->frcti);
-
- if (ret == -EFLOWDOWN && timeo.tv_sec < 0)
- timeo.tv_sec = -timeo.tv_sec;
+ if (ret == -EFLOWDOWN)
+ break;
}
+ timeo.tv_sec = FRCTI_DEALLOC(flow->frcti);
+
pthread_cleanup_push(__cleanup_rwlock_unlock, &proc.lock);
ssm_rbuff_fini(flow->tx_rb);
@@ -1033,15 +1180,18 @@ int flow_dealloc(int fd)
info.id = flow->info.id;
info.n_pid = getpid();
- if (flow_dealloc__irm_req_ser(&msg, &info, &timeo) < 0)
- return -ENOMEM;
+ if (flow_dealloc__irm_req_ser(&msg, &info, &timeo) < 0) {
+ err = -ENOMEM;
+ goto out;
+ }
err = send_recv_msg(&msg);
if (err < 0)
- return err;
+ goto out;
err = irm__irm_result_des(&msg);
+ out:
flow_fini(fd);
return err;
@@ -1055,12 +1205,12 @@ int ipcp_flow_dealloc(int fd)
struct flow * flow;
int err;
- if (fd < 0 || fd >= SYS_MAX_FLOWS )
+ if (fd < 0 || fd >= PROC_MAX_FLOWS )
return -EINVAL;
flow = &proc.flows[fd];
- memset(&info, 0, sizeof(flow));
+ memset(&info, 0, sizeof(info));
pthread_rwlock_rdlock(&proc.lock);
@@ -1074,15 +1224,18 @@ int ipcp_flow_dealloc(int fd)
pthread_rwlock_unlock(&proc.lock);
- if (ipcp_flow_dealloc__irm_req_ser(&msg, &info) < 0)
- return -ENOMEM;
+ if (ipcp_flow_dealloc__irm_req_ser(&msg, &info) < 0) {
+ err = -ENOMEM;
+ goto out;
+ }
err = send_recv_msg(&msg);
if (err < 0)
- return err;
+ goto out;
err = irm__irm_result_des(&msg);
+ out:
flow_fini(fd);
return err;
@@ -1102,8 +1255,18 @@ int fccntl(int fd,
uint32_t tx_acl;
size_t * qlen;
struct flow * flow;
-
- if (fd < 0 || fd >= SYS_MAX_FLOWS)
+ uint16_t old_acc;
+ uint16_t new_acc;
+ size_t max;
+ size_t * maxp;
+ size_t rsz;
+ size_t * rszp;
+ time_t rto;
+ time_t * rtop;
+ int rc;
+ bool emit_eos = false;
+
+ if (fd < 0 || fd >= PROC_MAX_FLOWS)
return -EBADF;
flow = &proc.flows[fd];
@@ -1167,14 +1330,27 @@ int fccntl(int fd,
qlen = va_arg(l, size_t *);
*qlen = ssm_rbuff_queued(flow->tx_rb);
break;
+ case FLOWGMTU:
+ maxp = va_arg(l, size_t *);
+ if (maxp == NULL)
+ goto einval;
+ *maxp = flow_user_mtu(flow, flow->info.mtu);
+ break;
case FLOWSFLAGS:
+ old_acc = flow->oflags & FLOWFACCMODE;
flow->oflags = va_arg(l, uint32_t);
+ new_acc = flow->oflags & FLOWFACCMODE;
+
+ /* Defer EOS emit until after proc.lock is dropped: */
+ /* frcti_fin_snd may block on shm-pool/tx-rb. */
+ if (new_acc == FLOWFRDONLY
+ && old_acc != FLOWFRDONLY
+ && flow->frcti != NULL)
+ emit_eos = true;
+
rx_acl = ssm_rbuff_get_acl(flow->rx_rb);
- tx_acl = ssm_rbuff_get_acl(flow->rx_rb);
- /*
- * Making our own flow write only means making the
- * the other side of the flow read only.
- */
+ tx_acl = ssm_rbuff_get_acl(flow->tx_rb);
+ /* Our flow write-only -> peer's read-only. */
if (flow->oflags & FLOWFWRONLY)
rx_acl |= ACL_RDONLY;
if (flow->oflags & FLOWFRDWR)
@@ -1218,6 +1394,59 @@ int fccntl(int fd,
goto eperm;
*cflags = frcti_getflags(flow->frcti);
break;
+ case FRCTSMAXSDU:
+ max = va_arg(l, size_t);
+ if (flow->frcti == NULL)
+ goto eperm;
+ if (frcti_set_max_rcv_sdu(flow->frcti, max) < 0)
+ goto einval;
+ break;
+ case FRCTGMAXSDU:
+ maxp = va_arg(l, size_t *);
+ if (maxp == NULL)
+ goto einval;
+ if (flow->frcti == NULL)
+ goto eperm;
+ *maxp = frcti_get_max_rcv_sdu(flow->frcti);
+ break;
+ case FRCTSRRINGSZ:
+ rsz = va_arg(l, size_t);
+ if (flow->frcti == NULL)
+ goto eperm;
+ rc = frcti_set_rcv_ring_sz(flow->frcti, rsz);
+ if (rc < 0) {
+ pthread_rwlock_unlock(&proc.lock);
+ va_end(l);
+ return rc;
+ }
+ break;
+ case FRCTGRRINGSZ:
+ rszp = va_arg(l, size_t *);
+ if (rszp == NULL)
+ goto einval;
+ if (flow->frcti == NULL)
+ goto eperm;
+ *rszp = frcti_get_rcv_ring_sz(flow->frcti);
+ break;
+ case FRCTSRTOMIN:
+ if (flow->frcti == NULL)
+ goto eperm;
+ rto = va_arg(l, time_t);
+ rc = frcti_set_rto_min(flow->frcti, rto);
+ if (rc < 0) {
+ pthread_rwlock_unlock(&proc.lock);
+ va_end(l);
+ return rc;
+ }
+ break;
+ case FRCTGRTOMIN:
+ if (flow->frcti == NULL)
+ goto eperm;
+ rtop = va_arg(l, time_t *);
+ if (rtop == NULL)
+ goto einval;
+ *rtop = frcti_get_rto_min(flow->frcti);
+ break;
default:
pthread_rwlock_unlock(&proc.lock);
va_end(l);
@@ -1227,6 +1456,9 @@ int fccntl(int fd,
pthread_rwlock_unlock(&proc.lock);
+ if (emit_eos)
+ frcti_fin_snd(flow->frcti);
+
va_end(l);
return 0;
@@ -1241,86 +1473,195 @@ int fccntl(int fd,
return -EPERM;
}
-static int chk_crc(struct ssm_pk_buff * spb)
-{
- uint32_t crc;
- uint8_t * head = ssm_pk_buff_head(spb);
- uint8_t * tail = ssm_pk_buff_tail_release(spb, CRCLEN);
-
- mem_hash(HASH_CRC32, &crc, head, tail - head);
-
- return !(crc == *((uint32_t *) tail));
-}
-
-static int add_crc(struct ssm_pk_buff * spb)
-{
- uint8_t * head;
- uint8_t * tail;
-
- tail = ssm_pk_buff_tail_alloc(spb, CRCLEN);
- if (tail == NULL)
- return -ENOMEM;
-
- head = ssm_pk_buff_head(spb);
- mem_hash(HASH_CRC32, tail, head, tail - head);
-
- return 0;
-}
-
static int flow_tx_spb(struct flow * flow,
struct ssm_pk_buff * spb,
+ uint16_t flags,
bool block,
struct timespec * abstime)
{
struct timespec now;
ssize_t idx;
+ size_t pci_total;
int ret;
clock_gettime(PTHREAD_COND_CLOCK, &now);
-
- pthread_rwlock_wrlock(&proc.lock);
-
flow->snd_act = now;
- pthread_rwlock_unlock(&proc.lock);
-
- idx = ssm_pk_buff_get_idx(spb);
-
- pthread_rwlock_rdlock(&proc.lock);
+ idx = ssm_pk_buff_get_off(spb);
if (ssm_pk_buff_len(spb) > 0) {
- if (frcti_snd(flow->frcti, spb) < 0)
+ if (FRCTI_SND(flow->frcti, spb, flags) < 0)
goto enomem;
- if (spb_encrypt(flow, spb) < 0)
- goto enomem;
+ if (flow->info.qs.ber == 0) {
+ pci_total = flow->frcti != NULL
+ ? frcti_data_hdr_len(flow->frcti) : 0;
+ if (crc_add(spb, pci_total) != 0)
+ goto enomem;
+ }
- if (flow->info.qs.ber == 0 && add_crc(spb) != 0)
+ if (spb_encrypt(flow, spb) < 0)
goto enomem;
}
- pthread_cleanup_push(__cleanup_rwlock_unlock, &proc.lock);
-
if (!block)
ret = ssm_rbuff_write(flow->tx_rb, idx);
else
ret = ssm_rbuff_write_b(flow->tx_rb, idx, abstime);
- if (ret < 0)
+ if (ret < 0) {
ssm_pool_remove(proc.pool, idx);
- else
- ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
-
- pthread_cleanup_pop(true);
+ return ret;
+ }
+ ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
return 0;
-enomem:
- pthread_rwlock_unlock(&proc.lock);
+ enomem:
ssm_pool_remove(proc.pool, idx);
return -ENOMEM;
}
+/* Per-fragment role for fragment i out of n; n == 1 yields SOLE. */
+static __inline__ uint16_t flow_frag_role(size_t i, size_t n)
+{
+ if (n == 1)
+ return FRCT_FR_SOLE;
+ if (i == 0)
+ return FRCT_FR_FIRST;
+ if (i + 1 == n)
+ return FRCT_FR_LAST;
+
+ return FRCT_FR_MID;
+}
+
+/*
+ * Stream-mode write: split buf into chunks of
+ * (frag_mtu - PCI - PCI_STREAM) bytes; each chunk goes through the
+ * normal tx path. frcti_snd injects the [start,end) extension and
+ * advances snd_byte_next under its wrlock. No FFGM/LFGM role bits.
+ */
+static ssize_t flow_write_stream(struct flow * flow,
+ const void * buf,
+ size_t count,
+ int oflags,
+ struct timespec * dl)
+{
+ const uint8_t * src = buf;
+ size_t payload;
+ size_t off = 0;
+ bool block = !(oflags & FLOWFWNOBLOCK);
+
+ if (!FRCTI_IS_FRTX(flow->frcti))
+ return -EMSGSIZE;
+
+ payload = FRCTI_PAYLOAD_CAP(flow->frcti);
+
+ while (off < count) {
+ struct ssm_pk_buff * spb;
+ uint8_t * ptr;
+ ssize_t idx;
+ size_t clen;
+ int ret;
+
+ ret = flow_wait_window(flow, 1, block, dl);
+ if (ret < 0)
+ return off > 0 ? (ssize_t) off : (ssize_t) ret;
+
+ clen = MIN(count - off, payload);
+
+ if (block)
+ idx = ssm_pool_alloc_b(proc.pool, clen, &ptr,
+ &spb, dl);
+ else
+ idx = ssm_pool_alloc(proc.pool, clen, &ptr, &spb);
+ if (idx < 0)
+ return off > 0 ? (ssize_t) off : idx;
+
+ memcpy(ptr, src + off, clen);
+
+ ret = flow_tx_spb(flow, spb, 0, block, dl);
+ if (ret < 0)
+ return off > 0 ? (ssize_t) off : (ssize_t) ret;
+
+ off += clen;
+ }
+
+ return (ssize_t) count;
+}
+
+/* Per-fragment flow_tx_spb loop. Raw flows refuse; FRCT splits the SDU. */
+static ssize_t flow_write_frag(struct flow * flow,
+ const void * buf,
+ size_t count,
+ int oflags,
+ struct timespec * dl)
+{
+ const uint8_t * src = buf;
+ size_t frag_payload;
+ size_t n;
+ size_t off = 0;
+ size_t i;
+ int ret;
+ bool block = !(oflags & FLOWFWNOBLOCK);
+
+ /* Raw flows carry no PCI; cannot fragment. */
+ if (flow->frcti == NULL)
+ return -EMSGSIZE;
+
+ frag_payload = FRCTI_PAYLOAD_CAP(flow->frcti);
+
+ /* Guard the ceil-divide against size_t overflow. */
+ if (count > SIZE_MAX - frag_payload + 1)
+ return -EMSGSIZE;
+ n = (count + frag_payload - 1) / frag_payload;
+
+ /* SDU larger than the FC window can ever offer would deadlock. */
+ if (n > RQ_SIZE)
+ return -EMSGSIZE;
+
+ /* SDU-atomic FC: wait for n seqnos to avoid overshoot mid-SDU. */
+ ret = flow_wait_window(flow, n, block, dl);
+ if (ret < 0)
+ return (ssize_t) ret;
+
+ STAT_BUMP(flow->frcti, sdu_snd_frag);
+
+ for (i = 0; i < n; ++i) {
+ struct ssm_pk_buff * spb;
+ uint8_t * ptr;
+ ssize_t idx;
+ size_t clen;
+
+ clen = (i + 1 == n) ? (count - off) : frag_payload;
+
+ if (block)
+ idx = ssm_pool_alloc_b(proc.pool, clen, &ptr,
+ &spb, dl);
+ else
+ idx = ssm_pool_alloc(proc.pool, clen, &ptr, &spb);
+ if (idx < 0) {
+ if (off > 0)
+ STAT_BUMP(flow->frcti, sdu_snd_alloc);
+ return off > 0 ? (ssize_t) off : idx;
+ }
+
+ memcpy(ptr, src + off, clen);
+
+ ret = flow_tx_spb(flow, spb, flow_frag_role(i, n),
+ block, dl);
+ if (ret < 0) {
+ if (off > 0)
+ STAT_BUMP(flow->frcti, sdu_snd_tx);
+ return off > 0 ? (ssize_t) off : (ssize_t) ret;
+ }
+
+ off += clen;
+ }
+
+ return (ssize_t) count;
+}
+
ssize_t flow_write(int fd,
const void * buf,
size_t count)
@@ -1330,76 +1671,75 @@ ssize_t flow_write(int fd,
int ret;
int flags;
struct timespec abs;
- struct timespec * abstime = NULL;
+ struct timespec now;
+ struct timespec * dl = NULL;
struct ssm_pk_buff * spb;
uint8_t * ptr;
if (buf == NULL && count != 0)
return -EINVAL;
- if (fd < 0 || fd >= PROG_MAX_FLOWS)
+ if (fd < 0 || fd >= PROC_MAX_FLOWS)
return -EBADF;
flow = &proc.flows[fd];
- clock_gettime(PTHREAD_COND_CLOCK, &abs);
-
- pthread_rwlock_wrlock(&proc.lock);
+ pthread_rwlock_rdlock(&proc.lock);
if (flow->info.id < 0) {
pthread_rwlock_unlock(&proc.lock);
return -ENOTALLOC;
}
+ flags = flow->oflags;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
if (flow->snd_timesout) {
- ts_add(&abs, &flow->snd_timeo, &abs);
- abstime = &abs;
+ ts_add(&now, &flow->snd_timeo, &abs);
+ dl = &abs;
}
- flags = flow->oflags;
-
pthread_rwlock_unlock(&proc.lock);
if ((flags & FLOWFACCMODE) == FLOWFRDONLY)
return -EPERM;
- if (flags & FLOWFWNOBLOCK) {
- if (!frcti_is_window_open(flow->frcti))
- return -EAGAIN;
- idx = ssm_pool_alloc(proc.pool, count, &ptr, &spb);
- } else {
- ret = frcti_window_wait(flow->frcti, abstime);
+ tw_move_safe();
+
+ if (flow->frcti != NULL) {
+ /* Pump rx_rb so a pure-writer processes ACKs. */
+ ret = flow_wait_window(flow, 1, !(flags & FLOWFWNOBLOCK), dl);
if (ret < 0)
return ret;
- idx = ssm_pool_alloc_b(proc.pool, count, &ptr, &spb, abstime);
+
+ if (count > 0 && FRCTI_IS_STREAM(flow->frcti))
+ return flow_write_stream(flow, buf, count, flags, dl);
+
+ if (FRCTI_NEEDS_FRAG(flow->frcti, count))
+ return flow_write_frag(flow, buf, count, flags, dl);
+ } else if (flow->info.mtu > 0
+ && count > flow_user_mtu(flow, flow->info.mtu)) {
+ /* Raw flows carry no PCI; refuse anything > one n-1 frame. */
+ return -EMSGSIZE;
}
+ if (flags & FLOWFWNOBLOCK)
+ idx = ssm_pool_alloc(proc.pool, count, &ptr, &spb);
+ else
+ idx = ssm_pool_alloc_b(proc.pool, count, &ptr, &spb, dl);
if (idx < 0)
return idx;
if (count > 0)
memcpy(ptr, buf, count);
- ret = flow_tx_spb(flow, spb, !(flags & FLOWFWNOBLOCK), abstime);
+ ret = flow_tx_spb(flow, spb, FRCT_FR_SOLE,
+ !(flags & FLOWFWNOBLOCK), dl);
return ret < 0 ? (ssize_t) ret : (ssize_t) count;
}
-static bool invalid_pkt(struct flow * flow,
- struct ssm_pk_buff * spb)
-{
- if (spb == NULL || ssm_pk_buff_len(spb) == 0)
- return true;
-
- if (flow->info.qs.ber == 0 && chk_crc(spb) != 0)
- return true;
-
- if (spb_decrypt(flow, spb) < 0)
- return true;
-
- return false;
-}
-
static ssize_t flow_rx_spb(struct flow * flow,
struct ssm_pk_buff ** spb,
bool block,
@@ -1408,19 +1748,14 @@ static ssize_t flow_rx_spb(struct flow * flow,
ssize_t idx;
struct timespec now;
- idx = block ? ssm_rbuff_read_b(flow->rx_rb, abstime) :
- ssm_rbuff_read(flow->rx_rb);
+ idx = block ? ssm_rbuff_read_b(flow->rx_rb, abstime)
+ : ssm_rbuff_read(flow->rx_rb);
if (idx < 0)
return idx;
clock_gettime(PTHREAD_COND_CLOCK, &now);
-
- pthread_rwlock_wrlock(&proc.lock);
-
flow->rcv_act = now;
- pthread_rwlock_unlock(&proc.lock);
-
*spb = ssm_pool_get(proc.pool, idx);
if (invalid_pkt(flow, *spb)) {
@@ -1431,28 +1766,124 @@ static ssize_t flow_rx_spb(struct flow * flow,
return idx;
}
+static ssize_t raw_flow_read_pkt(struct flow * flow,
+ bool block,
+ struct timespec * dl)
+{
+ struct ssm_pk_buff * spb;
+ struct timespec wait_abs;
+ ssize_t idx;
+
+ while (true) {
+ if (!block) {
+ idx = ssm_rbuff_read(flow->rx_rb);
+ if (idx < 0)
+ return -EAGAIN;
+ } else {
+ compute_wait_deadline(dl, &wait_abs);
+ idx = ssm_rbuff_read_b(flow->rx_rb, &wait_abs);
+ if (idx == -ETIMEDOUT) {
+ if (deadline_passed(dl))
+ return -ETIMEDOUT;
+ continue;
+ }
+ if (idx < 0)
+ return idx;
+ }
+
+ spb = ssm_pool_get(proc.pool, idx);
+ if (!invalid_pkt(flow, spb))
+ return idx;
+
+ ssm_pool_remove(proc.pool, idx);
+ if (!block)
+ return -EAGAIN;
+ }
+}
+
+static ssize_t deliver_pkt(struct flow * flow,
+ struct ssm_pk_buff * spb,
+ ssize_t idx,
+ void * buf,
+ size_t count,
+ bool partrd)
+{
+ uint8_t * packet = ssm_pk_buff_head(spb);
+ ssize_t n = ssm_pk_buff_len(spb);
+
+ assert(n >= 0);
+
+ if (n <= (ssize_t) count) {
+ memcpy(buf, packet, n);
+ ipcp_spb_release(spb);
+ if (partrd && n == (ssize_t) count)
+ flow->part_idx = DONE_PART;
+ else
+ flow->part_idx = NO_PART;
+
+ return n;
+ }
+
+ if (partrd) {
+ memcpy(buf, packet, count);
+ ssm_pk_buff_pop(spb, n);
+ flow->part_idx = idx;
+ return count;
+ }
+
+ ipcp_spb_release(spb);
+ return -EMSGSIZE;
+}
+
+/* Drive frcti_consume until it delivers or errors. */
+static ssize_t flow_read_frcti(struct flow * flow,
+ void * buf,
+ size_t count,
+ bool block,
+ struct timespec * dl)
+{
+ struct timespec now;
+ ssize_t bytes;
+ int rc;
+
+ while (true) {
+ flow_drain_rx_nb(flow);
+ bytes = FRCTI_CONSUME(flow->frcti, buf, count);
+ if (bytes >= 0)
+ break;
+ if (bytes != -EAGAIN)
+ return bytes;
+ if (!block)
+ return -EAGAIN;
+ rc = flow_rx_one(flow, dl);
+ if (rc < 0)
+ return rc;
+ }
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+ flow->rcv_act = now;
+
+ return bytes;
+}
+
ssize_t flow_read(int fd,
void * buf,
size_t count)
{
- ssize_t idx;
- ssize_t n;
- uint8_t * packet;
+ struct flow * flow;
struct ssm_pk_buff * spb;
struct timespec abs;
struct timespec now;
- struct timespec * abstime = NULL;
- struct flow * flow;
+ struct timespec * dl = NULL;
+ ssize_t idx;
bool block;
bool partrd;
- if (fd < 0 || fd >= PROG_MAX_FLOWS)
+ if (fd < 0 || fd >= PROC_MAX_FLOWS)
return -EBADF;
flow = &proc.flows[fd];
- clock_gettime(PTHREAD_COND_CLOCK, &now);
-
pthread_rwlock_rdlock(&proc.lock);
if (flow->info.id < 0) {
@@ -1461,8 +1892,8 @@ ssize_t flow_read(int fd,
}
if (flow->part_idx == DONE_PART) {
- pthread_rwlock_unlock(&proc.lock);
flow->part_idx = NO_PART;
+ pthread_rwlock_unlock(&proc.lock);
return 0;
}
@@ -1470,75 +1901,33 @@ ssize_t flow_read(int fd,
partrd = !(flow->oflags & FLOWFRNOPART);
if (flow->rcv_timesout) {
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
ts_add(&now, &flow->rcv_timeo, &abs);
- abstime = &abs;
+ dl = &abs;
}
- idx = flow->part_idx;
- if (idx < 0) {
- while ((idx = frcti_queued_pdu(flow->frcti)) < 0) {
- pthread_rwlock_unlock(&proc.lock);
-
- idx = flow_rx_spb(flow, &spb, block, abstime);
- if (idx < 0) {
- if (block && idx != -EAGAIN)
- return idx;
- if (!block)
- return idx;
+ pthread_rwlock_unlock(&proc.lock);
- pthread_rwlock_rdlock(&proc.lock);
- continue;
- }
+ tw_move_safe();
- pthread_rwlock_rdlock(&proc.lock);
+ idx = flow->part_idx;
+ if (idx < 0 && flow->frcti != NULL)
+ return flow_read_frcti(flow, buf, count, block, dl);
- frcti_rcv(flow->frcti, spb);
- }
+ if (idx < 0) {
+ idx = raw_flow_read_pkt(flow, block, dl);
+ if (idx < 0)
+ return idx;
}
spb = ssm_pool_get(proc.pool, idx);
- pthread_rwlock_unlock(&proc.lock);
-
- packet = ssm_pk_buff_head(spb);
-
- n = ssm_pk_buff_len(spb);
-
- assert(n >= 0);
-
- if (n <= (ssize_t) count) {
- memcpy(buf, packet, n);
- ipcp_spb_release(spb);
-
- pthread_rwlock_wrlock(&proc.lock);
-
- flow->part_idx = (partrd && n == (ssize_t) count) ?
- DONE_PART : NO_PART;
-
- flow->rcv_act = now;
-
- pthread_rwlock_unlock(&proc.lock);
- return n;
- } else {
- if (partrd) {
- memcpy(buf, packet, count);
- ssm_pk_buff_head_release(spb, n);
- pthread_rwlock_wrlock(&proc.lock);
- flow->part_idx = idx;
-
- flow->rcv_act = now;
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+ flow->rcv_act = now;
- pthread_rwlock_unlock(&proc.lock);
- return count;
- } else {
- ipcp_spb_release(spb);
- return -EMSGSIZE;
- }
- }
+ return deliver_pkt(flow, spb, idx, buf, count, partrd);
}
-/* fqueue functions. */
-
struct flow_set * fset_create(void)
{
struct flow_set * set;
@@ -1614,7 +2003,7 @@ int fset_add(struct flow_set * set,
struct flow * flow;
int ret;
- if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS)
+ if (set == NULL || fd < 0 || fd >= PROC_MAX_FLOWS)
return -EINVAL;
flow = &proc.flows[fd];
@@ -1650,7 +2039,7 @@ void fset_del(struct flow_set * set,
{
struct flow * flow;
- if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS)
+ if (set == NULL || fd < 0 || fd >= PROC_MAX_FLOWS)
return;
flow = &proc.flows[fd];
@@ -1661,7 +2050,7 @@ void fset_del(struct flow_set * set,
ssm_flow_set_del(proc.fqset, set->idx, flow->info.id);
if (flow->frcti != NULL)
- ssm_flow_set_add(proc.fqset, 0, proc.flows[fd].info.id);
+ ssm_flow_set_add(proc.fqset, 0, flow->info.id);
pthread_rwlock_unlock(&proc.lock);
}
@@ -1672,7 +2061,7 @@ bool fset_has(const struct flow_set * set,
struct flow * flow;
bool ret;
- if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS)
+ if (set == NULL || fd < 0 || fd >= PROC_MAX_FLOWS)
return false;
flow = &proc.flows[fd];
@@ -1691,61 +2080,59 @@ bool fset_has(const struct flow_set * set,
return ret;
}
-/* Filter fqueue events for non-data packets */
static int fqueue_filter(struct fqueue * fq)
{
struct ssm_pk_buff * spb;
int fd;
ssize_t idx;
struct frcti * frcti;
+ int ret = 0;
- while (fq->next < fq->fqsize) {
- if (fq->fqueue[fq->next].event != FLOW_PKT)
- return 1;
+ /* proc.lock rdlock gates frcti_destroy via flow_fini wrlock. */
+ pthread_rwlock_rdlock(&proc.lock);
- pthread_rwlock_rdlock(&proc.lock);
+ while (fq->next < fq->fqsize) {
+ if (fq->fqueue[fq->next].event != FLOW_PKT) {
+ ret = 1;
+ goto out;
+ }
fd = proc.id_to_fd[fq->fqueue[fq->next].flow_id].fd;
if (fd < 0) {
++fq->next;
- pthread_rwlock_unlock(&proc.lock);
continue;
}
frcti = proc.flows[fd].frcti;
if (frcti == NULL) {
- pthread_rwlock_unlock(&proc.lock);
- return 1;
+ ret = 1;
+ goto out;
}
- if (__frcti_pdu_ready(frcti) >= 0) {
- pthread_rwlock_unlock(&proc.lock);
- return 1;
+ if (FRCTI_PDU_READY(frcti)) {
+ ret = 1;
+ goto out;
}
- pthread_rwlock_unlock(&proc.lock);
-
idx = flow_rx_spb(&proc.flows[fd], &spb, false, NULL);
if (idx < 0)
- return 0;
-
- pthread_rwlock_rdlock(&proc.lock);
+ goto out;
spb = ssm_pool_get(proc.pool, idx);
- __frcti_rcv(frcti, spb);
+ FRCTI_RCV(frcti, spb);
- if (__frcti_pdu_ready(frcti) >= 0) {
- pthread_rwlock_unlock(&proc.lock);
- return 1;
+ if (FRCTI_PDU_READY(frcti)) {
+ ret = 1;
+ goto out;
}
- pthread_rwlock_unlock(&proc.lock);
-
++fq->next;
}
- return 0;
+ out:
+ pthread_rwlock_unlock(&proc.lock);
+ return ret;
}
int fqueue_next(struct fqueue * fq)
@@ -1792,7 +2179,8 @@ ssize_t fevent(struct flow_set * set,
{
ssize_t ret = 0;
struct timespec abs;
- struct timespec * t = NULL;
+ struct timespec * dl = NULL;
+ struct timespec wait_abs;
if (set == NULL || fq == NULL)
return -EINVAL;
@@ -1800,17 +2188,26 @@ ssize_t fevent(struct flow_set * set,
if (fq->fqsize > 0 && fq->next != fq->fqsize)
return 1;
- clock_gettime(PTHREAD_COND_CLOCK, &abs);
-
if (timeo != NULL) {
- ts_add(&abs, timeo, &abs);
- t = &abs;
+ struct timespec now;
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+ ts_add(&now, timeo, &abs);
+ dl = &abs;
}
while (ret == 0) {
- ret = ssm_flow_set_wait(proc.fqset, set->idx, fq->fqueue, t);
- if (ret == -ETIMEDOUT)
- return -ETIMEDOUT;
+ tw_move_safe();
+
+ compute_wait_deadline(dl, &wait_abs);
+
+ ret = ssm_flow_set_wait(proc.fqset, set->idx,
+ fq->fqueue, &wait_abs);
+ if (ret == -ETIMEDOUT) {
+ if (deadline_passed(dl))
+ return -ETIMEDOUT;
+ ret = 0;
+ continue;
+ }
fq->fqsize = ret;
fq->next = 0;
@@ -1823,8 +2220,6 @@ ssize_t fevent(struct flow_set * set,
return 1;
}
-/* ipcp-dev functions. */
-
int np1_flow_alloc(pid_t n_pid,
int flow_id)
{
@@ -1837,9 +2232,10 @@ int np1_flow_alloc(pid_t n_pid,
flow.n_pid = getpid();
flow.qs = qos_np1;
flow.mpl = 0;
- flow.n_1_pid = n_pid; /* This "flow" is upside-down! */
+ /* np1 flow: n_1_pid is the upper. */
+ flow.n_1_pid = n_pid;
- return flow_init(&flow, &crypt);
+ return flow_init(&flow, &crypt, 0);
}
int np1_flow_dealloc(int flow_id,
@@ -1847,12 +2243,7 @@ int np1_flow_dealloc(int flow_id,
{
int fd;
- /*
- * TODO: Don't pass timeo to the IPCP but wait in IRMd.
- * This will need async ops, waiting until we bootstrap
- * the IRMd over ouroboros.
- */
-
+ /* TODO: wait in IRMd, not here; needs async ops. */
sleep(timeo);
pthread_rwlock_rdlock(&proc.lock);
@@ -1900,6 +2291,7 @@ int ipcp_create_r(const struct ipcp_info * info)
int ipcp_flow_req_arr(const buffer_t * dst,
qosspec_t qs,
time_t mpl,
+ uint32_t mtu,
const buffer_t * data)
{
struct flow_info flow;
@@ -1916,6 +2308,7 @@ int ipcp_flow_req_arr(const buffer_t * dst,
flow.n_1_pid = getpid();
flow.qs = qs;
flow.mpl = mpl;
+ flow.mtu = mtu;
if (ipcp_flow_req_arr__irm_req_ser(&msg, dst, &flow, data) < 0)
return -ENOMEM;
@@ -1930,22 +2323,25 @@ int ipcp_flow_req_arr(const buffer_t * dst,
if (err < 0)
return err;
- assert(crypt.nid == NID_undef); /* np1 flows are not encrypted */
+ /* np1 flows are not encrypted. */
+ assert(crypt.nid == NID_undef);
- /* inverted for np1_flow */
+ /* Inverted for np1_flow. */
flow.n_1_pid = flow.n_pid;
flow.n_pid = getpid();
flow.mpl = 0;
+ flow.mtu = 0;
flow.qs = qos_np1;
crypt.nid = NID_undef;
- return flow_init(&flow, &crypt);
+ return flow_init(&flow, &crypt, 0);
}
int ipcp_flow_alloc_reply(int fd,
int response,
time_t mpl,
+ uint32_t mtu,
const buffer_t * data)
{
struct flow_info flow;
@@ -1953,7 +2349,7 @@ int ipcp_flow_alloc_reply(int fd,
buffer_t msg = {SOCK_BUF_SIZE, buf};
int err;
- assert(fd >= 0 && fd < SYS_MAX_FLOWS);
+ assert(fd >= 0 && fd < PROC_MAX_FLOWS);
pthread_rwlock_rdlock(&proc.lock);
@@ -1962,6 +2358,7 @@ int ipcp_flow_alloc_reply(int fd,
pthread_rwlock_unlock(&proc.lock);
flow.mpl = mpl;
+ flow.mtu = mtu;
if (ipcp_flow_alloc_reply__irm_msg_ser(&msg, &flow, response, data) < 0)
return -ENOMEM;
@@ -1979,7 +2376,7 @@ int ipcp_flow_read(int fd,
struct flow * flow;
ssize_t idx = -1;
- assert(fd >= 0 && fd < SYS_MAX_FLOWS);
+ assert(fd >= 0 && fd < PROC_MAX_FLOWS);
assert(spb);
flow = &proc.flows[fd];
@@ -1988,7 +2385,14 @@ int ipcp_flow_read(int fd,
assert(flow->info.id >= 0);
- while (frcti_queued_pdu(flow->frcti) < 0) {
+ /* Raw flow: deliver the popped pkt directly (no FRCT rq). */
+ if (flow->frcti == NULL) {
+ pthread_rwlock_unlock(&proc.lock);
+ idx = flow_rx_spb(flow, spb, false, NULL);
+ return idx < 0 ? (int) idx : 0;
+ }
+
+ while (!FRCTI_PDU_READY(flow->frcti)) {
pthread_rwlock_unlock(&proc.lock);
idx = flow_rx_spb(flow, spb, false, NULL);
@@ -1997,7 +2401,7 @@ int ipcp_flow_read(int fd,
pthread_rwlock_rdlock(&proc.lock);
- frcti_rcv(flow->frcti, *spb);
+ FRCTI_RCV(flow->frcti, *spb);
}
pthread_rwlock_unlock(&proc.lock);
@@ -2011,12 +2415,12 @@ int ipcp_flow_write(int fd,
struct flow * flow;
int ret;
- assert(fd >= 0 && fd < SYS_MAX_FLOWS);
+ assert(fd >= 0 && fd < PROC_MAX_FLOWS);
assert(spb);
flow = &proc.flows[fd];
- pthread_rwlock_wrlock(&proc.lock);
+ pthread_rwlock_rdlock(&proc.lock);
if (flow->info.id < 0) {
pthread_rwlock_unlock(&proc.lock);
@@ -2030,30 +2434,28 @@ int ipcp_flow_write(int fd,
pthread_rwlock_unlock(&proc.lock);
- ret = flow_tx_spb(flow, spb, true, NULL);
+ ret = flow_tx_spb(flow, spb, FRCT_FR_SOLE, true, NULL);
return ret;
}
-static int pool_copy_spb(struct ssm_pool * src_pool,
- ssize_t src_idx,
- struct ssm_pool * dst_pool,
- struct ssm_pk_buff ** dst_spb)
+/* Copy src into dst_pool without consuming src. Caller owns both halves. */
+static int pool_dup_spb(struct ssm_pool * src_pool,
+ size_t src_off,
+ struct ssm_pool * dst_pool,
+ struct ssm_pk_buff ** dst_spb)
{
struct ssm_pk_buff * src;
uint8_t * ptr;
size_t len;
- src = ssm_pool_get(src_pool, src_idx);
+ src = ssm_pool_get(src_pool, src_off);
len = ssm_pk_buff_len(src);
- if (ssm_pool_alloc(dst_pool, len, &ptr, dst_spb) < 0) {
- ssm_pool_remove(src_pool, src_idx);
+ if (ssm_pool_alloc(dst_pool, len, &ptr, dst_spb) < 0)
return -ENOMEM;
- }
memcpy(ptr, ssm_pk_buff_head(src), len);
- ssm_pool_remove(src_pool, src_idx);
return 0;
}
@@ -2063,9 +2465,9 @@ int np1_flow_read(int fd,
struct ssm_pool * pool)
{
struct flow * flow;
- ssize_t idx = -1;
+ ssize_t off;
- assert(fd >= 0 && fd < SYS_MAX_FLOWS);
+ assert(fd >= 0 && fd < PROC_MAX_FLOWS);
assert(spb);
flow = &proc.flows[fd];
@@ -2074,20 +2476,23 @@ int np1_flow_read(int fd,
pthread_rwlock_rdlock(&proc.lock);
- idx = ssm_rbuff_read(flow->rx_rb);
- if (idx < 0) {
+ off = ssm_rbuff_read(flow->rx_rb);
+ if (off < 0) {
pthread_rwlock_unlock(&proc.lock);
- return idx;
+ return off;
}
pthread_rwlock_unlock(&proc.lock);
if (pool == NULL) {
- *spb = ssm_pool_get(proc.pool, idx);
+ *spb = ssm_pool_get(proc.pool, off);
} else {
/* Cross-pool copy: PUP -> GSPP */
- if (pool_copy_spb(pool, idx, proc.pool, spb) < 0)
+ if (pool_dup_spb(pool, off, proc.pool, spb) < 0) {
+ ssm_pool_remove(pool, off);
return -ENOMEM;
+ }
+ ssm_pool_remove(pool, off);
}
return 0;
@@ -2100,9 +2505,10 @@ int np1_flow_write(int fd,
struct flow * flow;
struct ssm_pk_buff * dst;
int ret;
- ssize_t idx;
+ size_t off;
+ size_t dst_off;
- assert(fd >= 0 && fd < SYS_MAX_FLOWS);
+ assert(fd >= 0 && fd < PROC_MAX_FLOWS);
assert(spb);
flow = &proc.flows[fd];
@@ -2121,45 +2527,47 @@ int np1_flow_write(int fd,
pthread_rwlock_unlock(&proc.lock);
- idx = ssm_pk_buff_get_idx(spb);
+ off = ssm_pk_buff_get_off(spb);
if (pool == NULL) {
- ret = ssm_rbuff_write_b(flow->tx_rb, idx, NULL);
+ ret = ssm_rbuff_write_b(flow->tx_rb, off, NULL);
if (ret < 0)
- ssm_pool_remove(proc.pool, idx);
- else
- ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
+ return ret;
+ ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
} else {
- /* Cross-pool copy: GSPP -> PUP */
- if (pool_copy_spb(proc.pool, idx, pool, &dst) < 0)
+ /* Cross-pool copy: GSPP -> PUP. Src kept on error. */
+ if (pool_dup_spb(proc.pool, off, pool, &dst) < 0)
return -ENOMEM;
- idx = ssm_pk_buff_get_idx(dst);
- ret = ssm_rbuff_write_b(flow->tx_rb, idx, NULL);
- if (ret < 0)
- ssm_pool_remove(pool, idx);
- else
- ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
+ dst_off = ssm_pk_buff_get_off(dst);
+ ret = ssm_rbuff_write_b(flow->tx_rb, dst_off, NULL);
+ if (ret < 0) {
+ ssm_pool_remove(pool, dst_off);
+ return ret;
+ }
+ ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
+ ssm_pool_remove(proc.pool, off);
}
- return ret;
+ return 0;
}
int ipcp_spb_reserve(struct ssm_pk_buff ** spb,
size_t len)
{
- return ssm_pool_alloc_b(proc.pool, len, NULL, spb, NULL) < 0 ? -1 : 0;
+ return ssm_pool_alloc_b(proc.pool, len, NULL, spb, NULL) < 0
+ ? -1 : 0;
}
void ipcp_spb_release(struct ssm_pk_buff * spb)
{
- ssm_pool_remove(proc.pool, ssm_pk_buff_get_idx(spb));
+ ssm_pool_remove(proc.pool, ssm_pk_buff_get_off(spb));
}
int ipcp_flow_fini(int fd)
{
struct ssm_rbuff * rx_rb;
- assert(fd >= 0 && fd < SYS_MAX_FLOWS);
+ assert(fd >= 0 && fd < PROC_MAX_FLOWS);
pthread_rwlock_rdlock(&proc.lock);
@@ -2188,7 +2596,7 @@ int ipcp_flow_fini(int fd)
int ipcp_flow_get_qoscube(int fd,
qoscube_t * cube)
{
- assert(fd >= 0 && fd < SYS_MAX_FLOWS);
+ assert(fd >= 0 && fd < PROC_MAX_FLOWS);
assert(cube);
pthread_rwlock_rdlock(&proc.lock);
@@ -2227,7 +2635,7 @@ int local_flow_transfer(int src_fd,
struct ssm_pk_buff * dst_spb;
struct ssm_pool * sp;
struct ssm_pool * dp;
- ssize_t idx;
+ ssize_t off;
int ret;
assert(src_fd >= 0);
@@ -2241,15 +2649,15 @@ int local_flow_transfer(int src_fd,
pthread_rwlock_rdlock(&proc.lock);
- idx = ssm_rbuff_read(src_flow->rx_rb);
- if (idx < 0) {
+ off = ssm_rbuff_read(src_flow->rx_rb);
+ if (off < 0) {
pthread_rwlock_unlock(&proc.lock);
- return idx;
+ return off;
}
if (dst_flow->info.id < 0) {
pthread_rwlock_unlock(&proc.lock);
- ssm_pool_remove(sp, idx);
+ ssm_pool_remove(sp, off);
return -ENOTALLOC;
}
@@ -2257,21 +2665,24 @@ int local_flow_transfer(int src_fd,
if (sp == dp) {
/* Same pool: zero-copy */
- ret = ssm_rbuff_write_b(dst_flow->tx_rb, idx, NULL);
+ ret = ssm_rbuff_write_b(dst_flow->tx_rb, off, NULL);
if (ret < 0)
- ssm_pool_remove(sp, idx);
+ ssm_pool_remove(sp, off);
else
ssm_flow_set_notify(dst_flow->set,
dst_flow->info.id, FLOW_PKT);
} else {
/* Different pools: single copy */
- if (pool_copy_spb(sp, idx, dp, &dst_spb) < 0)
+ if (pool_dup_spb(sp, off, dp, &dst_spb) < 0) {
+ ssm_pool_remove(sp, off);
return -ENOMEM;
+ }
- idx = ssm_pk_buff_get_idx(dst_spb);
- ret = ssm_rbuff_write_b(dst_flow->tx_rb, idx, NULL);
+ ssm_pool_remove(sp, off);
+ off = ssm_pk_buff_get_off(dst_spb);
+ ret = ssm_rbuff_write_b(dst_flow->tx_rb, off, NULL);
if (ret < 0)
- ssm_pool_remove(dp, idx);
+ ssm_pool_remove(dp, off);
else
ssm_flow_set_notify(dst_flow->set,
dst_flow->info.id, FLOW_PKT);