diff options
| author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2026-05-10 19:06:21 +0200 |
|---|---|---|
| committer | Sander Vrijders <sander@ouroboros.rocks> | 2026-05-20 08:17:07 +0200 |
| commit | 63d3aa9ab8d8b0b6d8a10362e112a431dcb5b4e9 (patch) | |
| tree | 88f0827466b40d0e83da7954123d00cbb5f6c676 /src/lib | |
| parent | f33769c818cb1f01079405f543b36aa294764112 (diff) | |
| download | ouroboros-63d3aa9ab8d8b0b6d8a10362e112a431dcb5b4e9.tar.gz ouroboros-63d3aa9ab8d8b0b6d8a10362e112a431dcb5b4e9.zip | |
lib: Update FRCP implementation
The Flow and Retransmission Control Protocol (FRCP) runs end-to-end
between two peers over a flow. It provides reliability, in-order
delivery, flow control, and liveness. Note that congestion avoidance
is orthogonal to FRCP and handled in the IPCP.
A fixed 16-octet header, network byte order, is prefixed to every FRCP
packet:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| flags | hcs |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| window |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| seqno |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| ackno |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| payload (variable) ...
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
hcs is a CRC-16-CCITT-FALSE checksum over the PCI (and the stream
extension when present), verified before any flag-driven dispatch. A
single packet can simultaneously carry DATA + ACK + FC + RXM by OR-ing
flag bits. An optional CRC trailer covers the body on DATA when qs.ber
== 0, and on every SACK packet; an optional AEAD wrap (per-flow keys)
sits outermost.
Flag bits (MSB-first; bits 13..15 reserved, MUST be zero):
+------+--------+--------+----------------------------------------+
| Bit | Mask | Name | Meaning |
+------+--------+--------+----------------------------------------+
| 0 | 0x8000 | DATA | Carries caller payload |
| 1 | 0x4000 | DRF | Start of a fresh data run |
| 2 | 0x2000 | ACK | ackno field valid |
| 3 | 0x1000 | NACK | Pre-DRF nudge (seqno informational) |
| 4 | 0x0800 | FC | window field valid (rwe advertisement) |
| 5 | 0x0400 | RDVS | Rendezvous probe (window-closed) |
| 6 | 0x0200 | FFGM | First Fragment of a multi-fragment SDU |
| 7 | 0x0100 | LFGM | Last Fragment of a multi-fragment SDU |
| 8 | 0x0080 | RXM | Retransmission |
| 9 | 0x0040 | SACK | Block list follows in payload |
| 10 | 0x0020 | RTTP | RTT probe / echo (payload follows) |
| 11 | 0x0010 | KA | Keepalive |
| 12 | 0x0008 | FIN | End of stream marker |
| 13-15| -- | -- | Reserved (MUST be zero) |
+------+--------+--------+----------------------------------------+
(FFGM, LFGM) encodes the fragment role of a DATA packet (SCTP-style
B/E): 11=SOLE, 10=FIRST, 00=MID, 01=LAST. Each fragment carries its
own seqno; Retransmission recovers fragments individually, reassembly
runs at consume time. In stream mode FFGM/LFGM are unused; per-byte
position is carried by the stream extension below and end-of-stream is
signalled by FIN on a 0-byte DATA packet.
SACK payload (FRCT_ACK | FRCT_FC | FRCT_SACK):
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| n_blocks | padding (2 octets) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| start[0] |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| end[0] |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
... n_blocks pairs total ...
Each block describes a *present* (received) range strictly above the
cumulative ACK in the PCI ackno. D-SACK (RFC 2883) is signalled
in-band as block[0] - no flag bit, no extra framing - and consumed by
the RACK reo_wnd_mult scaler (RFC 8985 sec. 7.2).
RTTP payload (FRCT_RTTP only; 24 octets):
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| probe_id |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| echo_id |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| |
+ nonce (16 octets, echoed verbatim) +
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Stream PCI extension (in_order == STREAM only; 8 octets after the base
PCI on every DATA packet):
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| start |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| end |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
start, end are monotonic 32-bit byte offsets; end - start equals the
on-wire payload length. Stream mode is negotiated at flow allocation;
the extension is present iff stream mode is in use, never on a
per-packet basis.
Service modes are an orthogonal (in_order, loss, ber) vector selected
at flow_alloc; the cubes above map to the axes:
+----------------+---------+------+-----+-----------------------+
| Cube | in_order| loss | ber | Engaged |
+----------------+---------+------+-----+-----------------------+
| qos_raw | 0 | 1 | 1 | Raw passthrough |
| qos_raw_safe | 0 | 1 | 0 | Raw + CRC trailer |
| qos_rt | 1 | 1 | 1 | FRCP, no FRTX, no CRC |
| qos_rt_safe | 1 | 1 | 0 | FRCP, no FRTX, CRC |
| qos_msg | 1 | 0 | 0 | FRCP + FRTX |
| qos_stream | 2 | 0 | 0 | FRCP + FRTX, stream |
+----------------+---------+------+-----+-----------------------+
in_order=0 sends raw datagrams with no PCI (UDP-equivalent);
in_order=1 engages FRCP with SDU framing; in_order=2 (stream) requires
loss=0 and is rejected otherwise. loss=0 engages the FRTX retransmit
machinery. ber=0 appends the CRC-32 trailer; QOS_DISABLE_CRC at build
time forces ber=1 for development. Encryption is a separate per-flow
attribute layered as an AEAD wrap outside the FRCP packet.
Heritage: delta-t (Watson 1981) supplies timer-based connection
management - no SYN/FIN handshake, the DRF marker, the t_mpl / t_a /
t_r timers. RINA (Day 2008) supplies the unified flow_alloc(name, qos,
...) primitive and the orthogonal QoS-cube axes. Loss detection
follows TCP/QUIC practice (RFCs 2018, 2883, 6582, 6298, 8985); RTT
probing is nonce-authenticated like QUIC PATH_CHALLENGE.
Adds oftp, a minimal file-transfer tool over an FRCP stream flow. The
client reads from stdin or --in FILE and writes through a
flow_alloc(qos_stream); the server (--listen) calls flow_accept and
writes to stdout or --out FILE. Both sides compute a CRC-64/NVMe over
the bytes they handle and print the result. The server rejects flows
whose negotiated qs.in_order != STREAM.
Two FRCP knobs are exposed via env vars on either side:
OFTP_FRCT_RTO_MIN fccntl FRCTSRTOMIN (ns)
OFTP_FRCT_STREAM_RING_SZ fccntl FRCTSRRINGSZ (octets)
The ocbr_client gains an OCBR_QOS env var to pick the cube the client
uses for flow_alloc; recognised values are raw, safe, rt, rt_safe,
msg, stream. Unknown values fall back to raw with a warning on
stderr. Without the env set behaviour is unchanged.
Removes the deprecated lib/timerwheel.c
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/config.h.in | 16 | ||||
| -rw-r--r-- | src/lib/dev.c | 1258 | ||||
| -rw-r--r-- | src/lib/frct.c | 3839 | ||||
| -rw-r--r-- | src/lib/pb/ipcp.proto | 2 | ||||
| -rw-r--r-- | src/lib/pb/irm.proto | 2 | ||||
| -rw-r--r-- | src/lib/pb/model.proto | 4 | ||||
| -rw-r--r-- | src/lib/protobuf.c | 4 | ||||
| -rw-r--r-- | src/lib/qoscube.c | 12 | ||||
| -rw-r--r-- | src/lib/timerwheel.c | 414 |
9 files changed, 4188 insertions, 1363 deletions
diff --git a/src/lib/config.h.in b/src/lib/config.h.in index 9142f9df..7124a974 100644 --- a/src/lib/config.h.in +++ b/src/lib/config.h.in @@ -20,6 +20,14 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ +#ifndef MILLION +#define MILLION 1000000LL +#endif + +#ifndef BILLION +#define BILLION 1000000000LL +#endif + #cmakedefine HAVE_SYS_RANDOM #cmakedefine HAVE_EXPLICIT_BZERO #cmakedefine HAVE_LIBGCRYPT @@ -62,6 +70,8 @@ #cmakedefine PROC_FLOW_STATS #endif +#cmakedefine FRCT_DEBUG_STDOUT + #define PTHREAD_COND_CLOCK @PTHREAD_COND_CLOCK@ #define PROC_MAX_FLOWS @PROC_MAX_FLOWS@ @@ -70,8 +80,8 @@ /* Default Delta-t parameters */ #cmakedefine FRCT_LINUX_RTT_ESTIMATOR -#define DELT_A (@DELTA_T_ACK@) /* ns */ -#define DELT_R (@DELTA_T_RTX@) /* ns */ +#define DELT_A (@DELTA_T_ACK@) /* ms */ +#define DELT_R (@DELTA_T_RTX@) /* ms */ #define RQ_SIZE (@FRCT_REORDER_QUEUE_SIZE@) #define START_WINDOW (@FRCT_START_WINDOW@) @@ -82,8 +92,6 @@ #define TICTIME (@FRCT_TICK_TIME@ * 1000) /* ns */ /* Retransmission tuning */ -#cmakedefine RXM_BLOCKING - #define RXMQ_RES (@RXM_MIN_RESOLUTION@) /* 2^N ns */ #define RXMQ_BUMP (@RXM_WHEEL_MULTIPLIER@) #define RXMQ_LVLS (@RXM_WHEEL_LEVELS@) diff --git a/src/lib/dev.c b/src/lib/dev.c index b202a85c..7e9b7329 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -29,10 +29,13 @@ #include "config.h" #include "ssm.h" +#include <ouroboros/atomics.h> #include <ouroboros/bitmap.h> #include <ouroboros/cep.h> +#include <ouroboros/crc16.h> #include <ouroboros/crypt.h> #include <ouroboros/dev.h> +#include <ouroboros/endian.h> #include <ouroboros/errno.h> #include <ouroboros/fccntl.h> #include <ouroboros/flow.h> @@ -45,32 +48,33 @@ #include <ouroboros/np1_flow.h> #include <ouroboros/pthread.h> #include <ouroboros/random.h> +#ifdef PROC_FLOW_STATS +#include <ouroboros/rib.h> +#endif #include <ouroboros/serdes-irm.h> +#include <ouroboros/sockets.h> #include <ouroboros/ssm_flow_set.h> #include <ouroboros/ssm_pool.h> #include <ouroboros/ssm_rbuff.h> -#include <ouroboros/sockets.h> +#include <ouroboros/tw.h> #include <ouroboros/utils.h> -#ifdef PROC_FLOW_STATS -#include <ouroboros/rib.h> -#endif +#include <assert.h> #ifdef HAVE_LIBGCRYPT #include <gcrypt.h> #endif -#include <assert.h> -#include <stdlib.h> -#include <string.h> -#include <stdio.h> #include <stdarg.h> #include <stdbool.h> +#include <inttypes.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> #include <sys/types.h> #ifndef CLOCK_REALTIME_COARSE #define CLOCK_REALTIME_COARSE CLOCK_REALTIME #endif -/* Partial read information. */ #define NO_PART -1 #define DONE_PART -2 @@ -78,19 +82,12 @@ #define SECMEMSZ 16384 #define MSGBUFSZ 2048 -/* map flow_ids to flow descriptors; track state of the flow */ struct fmap { int fd; - /* TODO: use actual flow state */ enum flow_state state; }; -#define frcti_to_flow(frcti) \ - ((struct flow *)((uint8_t *) frcti - offsetof(struct flow, frcti))) - struct flow { - struct list_head next; - struct flow_info info; struct ssm_rbuff * rx_rb; @@ -135,16 +132,10 @@ struct { struct flow * flows; struct fmap * id_to_fd; - struct list_head flow_list; pthread_mutex_t mtx; pthread_cond_t cond; - pthread_t tx; - pthread_t rx; - size_t n_frcti; - fset_t * frct_set; - pthread_rwlock_t lock; } proc; @@ -243,7 +234,7 @@ static int proc_announce(const struct proc_info * proc) return irm__irm_result_des(&msg); } -/* IRMd will clean up the mess if this fails */ +/* IRMd cleans up on failure. */ static void proc_exit(void) { uint8_t buf[SOCK_BUF_SIZE]; @@ -264,7 +255,7 @@ static int spb_encrypt(struct flow * flow, uint8_t * tail; if (flow->crypt == NULL) - return 0; /* No encryption */ + return 0; in.data = ssm_pk_buff_head(spb); in.len = ssm_pk_buff_len(spb); @@ -299,7 +290,7 @@ static int spb_decrypt(struct flow * flow, uint8_t * head; if (flow->crypt == NULL) - return 0; /* No decryption */ + return 0; in.data = ssm_pk_buff_head(spb); in.len = ssm_pk_buff_len(spb); @@ -318,130 +309,280 @@ static int spb_decrypt(struct flow * flow, return 0; } -#include "frct.c" - -void * flow_tx(void * o) +/* tw_move under proc.lock rdlock; gates teardown vs in-flight fires. */ +static void tw_move_safe(void) { - struct timespec tic = TIMESPEC_INIT_NS(TICTIME); + pthread_rwlock_rdlock(&proc.lock); - (void) o; + pthread_cleanup_push(__cleanup_rwlock_unlock, &proc.lock); - while (true) { - timerwheel_move(); + tw_move(); - nanosleep(&tic, NULL); - } - - return (void *) 0; + pthread_cleanup_pop(1); } -static void flow_send_keepalive(struct flow * flow, - struct timespec now) +static int crc_add(struct ssm_pk_buff * spb, + size_t head_skip) { - struct ssm_pk_buff * spb; - ssize_t idx; - uint8_t * ptr; - - idx = ssm_pool_alloc(proc.pool, 0, &ptr, &spb); - if (idx < 0) - return; + uint8_t * head; + uint8_t * tail; - pthread_rwlock_wrlock(&proc.lock); + tail = ssm_pk_buff_push_tail(spb, CRCLEN); + if (tail == NULL) + return -ENOMEM; - flow->snd_act = now; + head = ssm_pk_buff_head(spb) + head_skip; - if (ssm_rbuff_write(flow->tx_rb, idx)) - ssm_pool_remove(proc.pool, idx); - else - ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); + mem_hash(HASH_CRC32, tail, head, tail - head); - pthread_rwlock_unlock(&proc.lock); + return 0; } -/* Needs rdlock on proc. */ -static void _flow_keepalive(struct flow * flow) +static int crc_check(struct ssm_pk_buff * spb, + size_t head_skip) { - struct timespec now; - struct timespec s_act; - struct timespec r_act; - int flow_id; - time_t timeo; - uint32_t acl; + uint32_t crc; + uint8_t * head = ssm_pk_buff_head(spb) + head_skip; + uint8_t * tail = ssm_pk_buff_pop_tail(spb, CRCLEN); - s_act = flow->snd_act; - r_act = flow->rcv_act; + mem_hash(HASH_CRC32, &crc, head, tail - head); - flow_id = flow->info.id; - timeo = flow->info.qs.timeout; + return !(crc == *((uint32_t *) tail)); +} - acl = ssm_rbuff_get_acl(flow->rx_rb); - if (timeo == 0 || acl & (ACL_FLOWPEER | ACL_FLOWDOWN)) - return; +/* FRCT included here so it can use proc and dev.c statics directly. */ +#include "frct.c" - clock_gettime(PTHREAD_COND_CLOCK, &now); +/* + * SACK / DATA carry trailer CRC32; HCS protects the headers on every + * FRCT packet. Decrypt before any check so plaintext is authoritative. + */ +static bool invalid_pkt(struct flow * flow, + struct ssm_pk_buff * spb) +{ + const struct frct_pci * pci; + uint16_t flags; + size_t pci_total; - if (ts_diff_ns(&now, &r_act) > (int64_t) timeo * MILLION) { - ssm_rbuff_set_acl(flow->rx_rb, ACL_FLOWPEER); - ssm_flow_set_notify(proc.fqset, flow_id, FLOW_PEER); - return; + if (spb == NULL || ssm_pk_buff_len(spb) == 0) + return true; + + if (spb_decrypt(flow, spb) < 0) + return true; + + if (flow->frcti == NULL) { + if (flow->info.qs.ber == 0 && crc_check(spb, 0) != 0) + return true; + return false; } - if (ts_diff_ns(&now, &s_act) > (int64_t) timeo * (MILLION >> 2)) { - pthread_rwlock_unlock(&proc.lock); + if (ssm_pk_buff_len(spb) < FRCT_PCILEN) + return true; - flow_send_keepalive(flow, now); + pci = (const struct frct_pci *) ssm_pk_buff_head(spb); + flags = ntoh16(pci->flags); - pthread_rwlock_rdlock(&proc.lock); + /* Untrusted flag read; mismatch on HCS will drop on corrupt. */ + if (flags & FRCT_DATA) + pci_total = frcti_data_hdr_len(flow->frcti); + else + pci_total = frcti_ctrl_hdr_len(flow->frcti); + + if (ssm_pk_buff_len(spb) < pci_total) + return true; + + if (frct_hcs_check(pci, flow->frcti) != 0) + return true; + + /* HCS valid: CRC32 on SACK; or on DATA if ber = 0. */ + if (flags & FRCT_SACK) { + if (crc_check(spb, pci_total) != 0) + return true; + + } else if ((flags & FRCT_DATA) && flow->info.qs.ber == 0) { + if (crc_check(spb, pci_total) != 0) + return true; } + + return false; } -static void handle_keepalives(void) +static bool deadline_passed(const struct timespec * abs) { - struct list_head * p; - struct list_head * h; + struct timespec now; - pthread_rwlock_rdlock(&proc.lock); + if (abs == NULL) + return false; - list_for_each_safe(p, h, &proc.flow_list) { - struct flow * flow; - flow = list_entry(p, struct flow, next); - _flow_keepalive(flow); - } + clock_gettime(PTHREAD_COND_CLOCK, &now); - pthread_rwlock_unlock(&proc.lock); + return ts_diff_ns(&now, abs) >= 0; } -static void __cleanup_fqueue_destroy(void * fq) +/* Clamp the wait by min(dl, next tw expiry, now + TICTIME). */ +static void compute_wait_deadline(const struct timespec * dl, + struct timespec * out) { - fqueue_destroy((fqueue_t *) fq); + struct timespec now; + struct timespec cap; + struct timespec expiry; + struct timespec tic = TIMESPEC_INIT_NS(TICTIME); + + clock_gettime(PTHREAD_COND_CLOCK, &now); + ts_add(&now, &tic, &cap); + + tw_next_expiry(&expiry); + + *out = (ts_diff_ns(&cap, &expiry) < 0) ? expiry : cap; + if (dl != NULL && ts_diff_ns(out, dl) > 0) + *out = *dl; } -void * flow_rx(void * o) +/* + * proc.lock rdlock held across each iteration so flow_fini's wrlock + * waits for us to finish; FLOWDOWN already set means we exit promptly. + */ +static void flow_drain_rx_nb(struct flow * flow) { - struct timespec tic = TIMESPEC_INIT_NS(TICTIME); - int ret; - struct fqueue * fq; + ssize_t idx; + struct ssm_pk_buff * spb; + struct ssm_rbuff * rx_rb; + struct frcti * frcti; +#ifdef PROC_FLOW_STATS + struct timespec t_a; + struct timespec t_b; +#endif - (void) o; + if (flow->frcti != NULL) + STAT_BUMP(flow->frcti, drain_calls); - fq = fqueue_create(); + while (true) { + pthread_rwlock_rdlock(&proc.lock); - pthread_cleanup_push(__cleanup_fqueue_destroy, fq); + rx_rb = flow->rx_rb; + if (rx_rb == NULL) { + pthread_rwlock_unlock(&proc.lock); + return; + } - /* fevent will filter all FRCT packets for us */ - while ((ret = fevent(proc.frct_set, fq, &tic)) != 0) { - if (ret == -ETIMEDOUT) { - handle_keepalives(); + idx = ssm_rbuff_read(rx_rb); + if (idx < 0) { + pthread_rwlock_unlock(&proc.lock); + return; + } + + spb = ssm_pool_get(proc.pool, idx); + if (invalid_pkt(flow, spb)) { + ssm_pool_remove(proc.pool, idx); + pthread_rwlock_unlock(&proc.lock); + continue; + } + + frcti = flow->frcti; + if (frcti != NULL) { +#ifdef PROC_FLOW_STATS + clock_gettime(CLOCK_MONOTONIC, &t_a); + FRCTI_RCV(frcti, spb); + clock_gettime(CLOCK_MONOTONIC, &t_b); + STAT_ADD(frcti, rcv_proc_ns, + (size_t) ts_diff_ns(&t_b, &t_a)); +#else + FRCTI_RCV(frcti, spb); +#endif + } else { + ssm_pool_remove(proc.pool, idx); + } + + pthread_rwlock_unlock(&proc.lock); + + /* Per-packet so the delayed-ACK fires on time in a burst. */ +#ifdef PROC_FLOW_STATS + clock_gettime(CLOCK_MONOTONIC, &t_a); + tw_move_safe(); + clock_gettime(CLOCK_MONOTONIC, &t_b); + if (frcti != NULL) + STAT_ADD(frcti, tw_move_ns, + (size_t) ts_diff_ns(&t_b, &t_a)); +#else + tw_move_safe(); +#endif + } +} + +/* + * Wait clamped by caller deadline, next tw expiry, and TICTIME; + * a clamp-timeout means tw work is due, not caller-deadline. + */ +static int flow_rx_one(struct flow * flow, + struct timespec * abs) +{ + struct timespec wait_abs; + struct ssm_pk_buff * spb; + struct ssm_rbuff * rx_rb; + ssize_t idx; + + while (true) { + compute_wait_deadline(abs, &wait_abs); + + /* rdlock gates flow_fini; FLOWDOWN preempts the block. */ + pthread_rwlock_rdlock(&proc.lock); + + rx_rb = flow->rx_rb; + if (rx_rb == NULL) { + pthread_rwlock_unlock(&proc.lock); + return -EFLOWDOWN; + } + + idx = ssm_rbuff_read_b(rx_rb, &wait_abs); + if (idx == -ETIMEDOUT) { + pthread_rwlock_unlock(&proc.lock); + if (deadline_passed(abs)) + return -ETIMEDOUT; + tw_move_safe(); continue; } + if (idx < 0) { + pthread_rwlock_unlock(&proc.lock); + return idx; + } + + spb = ssm_pool_get(proc.pool, idx); + if (invalid_pkt(flow, spb)) { + ssm_pool_remove(proc.pool, idx); + pthread_rwlock_unlock(&proc.lock); + continue; + } + + if (flow->frcti != NULL) + FRCTI_RCV(flow->frcti, spb); + else + ssm_pool_remove(proc.pool, idx); - while (fqueue_next(fq) >= 0) - ; /* no need to act */ + pthread_rwlock_unlock(&proc.lock); + + tw_move_safe(); + return 0; } +} - pthread_cleanup_pop(true); +/* 0 = window open; -EAGAIN = !block and would block; else flow_rx_one rc. */ +static __inline__ int flow_wait_window(struct flow * flow, + size_t n, + bool block, + struct timespec * dl) +{ + int rc; - return (void *) 0; + while (true) { + flow_drain_rx_nb(flow); + if (FRCTI_IS_WINDOW_OPEN_N(flow->frcti, n)) + return 0; + if (!block) + return -EAGAIN; + rc = flow_rx_one(flow, dl); + if (rc < 0) + return rc; + } } static void flow_clear(int fd) @@ -451,36 +592,40 @@ static void flow_clear(int fd) proc.flows[fd].info.id = -1; } -static void __flow_fini(int fd) +/* + * Set ACL_FLOWDOWN on rx/tx so any in-flight blocking reads or writes + * wake up and drop their proc.lock rdlock. Must run BEFORE flow_fini's + * wrlock, else the wrlock blocks on those rdlock holders and the + * in-flight calls never see the FLOWDOWN signal. + */ +static void flow_quiesce(int fd) { - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + struct ssm_rbuff * rx_rb = proc.flows[fd].rx_rb; + struct ssm_rbuff * tx_rb = proc.flows[fd].tx_rb; - if (proc.flows[fd].frcti != NULL) { - proc.n_frcti--; - if (proc.n_frcti == 0) { - pthread_cancel(proc.tx); - pthread_join(proc.tx, NULL); - } + if (rx_rb != NULL) + ssm_rbuff_set_acl(rx_rb, ACL_FLOWDOWN); + if (tx_rb != NULL) + ssm_rbuff_set_acl(tx_rb, ACL_FLOWDOWN); +} - ssm_flow_set_del(proc.fqset, 0, proc.flows[fd].info.id); +static void do_flow_fini(int fd) +{ + assert(fd >= 0 && fd < PROC_MAX_FLOWS); + if (proc.flows[fd].frcti != NULL) frcti_destroy(proc.flows[fd].frcti); - } if (proc.flows[fd].info.id != -1) { flow_destroy(&proc.id_to_fd[proc.flows[fd].info.id]); bmp_release(proc.fds, fd); } - if (proc.flows[fd].rx_rb != NULL) { - ssm_rbuff_set_acl(proc.flows[fd].rx_rb, ACL_FLOWDOWN); + if (proc.flows[fd].rx_rb != NULL) ssm_rbuff_close(proc.flows[fd].rx_rb); - } - if (proc.flows[fd].tx_rb != NULL) { - ssm_rbuff_set_acl(proc.flows[fd].tx_rb, ACL_FLOWDOWN); + if (proc.flows[fd].tx_rb != NULL) ssm_rbuff_close(proc.flows[fd].tx_rb); - } if (proc.flows[fd].set != NULL) { ssm_flow_set_notify(proc.flows[fd].set, @@ -491,24 +636,40 @@ static void __flow_fini(int fd) crypt_destroy_ctx(proc.flows[fd].crypt); - list_del(&proc.flows[fd].next); - flow_clear(fd); } static void flow_fini(int fd) { + flow_quiesce(fd); + pthread_rwlock_wrlock(&proc.lock); - __flow_fini(fd); + do_flow_fini(fd); pthread_rwlock_unlock(&proc.lock); } #define IS_ENCRYPTED(crypt) ((crypt)->nid != NID_undef) -#define IS_ORDERED(flow) (flow.qs.in_order != 0) +#define IS_ORDERED(info) ((info)->qs.service != SVC_RAW) +#define IS_STREAM(info) ((info)->qs.service == SVC_STREAM) + +/* Raw MTU minus the wrapping (IV/Tag + optional CRC) dev.c adds. */ +static __inline__ size_t flow_user_mtu(const struct flow * flow, + size_t raw) +{ + size_t hdr; + + hdr = flow->headsz + flow->tailsz; + if (flow->info.qs.ber == 0 && flow->crypt == NULL) + hdr += CRCLEN; + + return raw > hdr ? raw - hdr : 0; +} + static int flow_init(struct flow_info * info, - struct crypt_sk * sk) + struct crypt_sk * sk, + time_t rtt_hint) { struct timespec now; struct flow * flow; @@ -550,7 +711,6 @@ static int flow_init(struct flow_info * info, flow->tailsz = 0; if (IS_ENCRYPTED(sk)) { - /* Set to lower value in tests, should we make configurable? */ sk->rot_bit = KEY_ROTATION_BIT; flow->crypt = crypt_create_ctx(sk); if (flow->crypt == NULL) @@ -561,22 +721,16 @@ static int flow_init(struct flow_info * info, assert(flow->frcti == NULL); - if (IS_ORDERED(flow->info)) { - flow->frcti = frcti_create(fd, DELT_A, DELT_R, info->mpl); + if (IS_ORDERED(&flow->info)) { + uint32_t frct_mtu = flow_user_mtu(flow, info->mtu); + + flow->frcti = frcti_create(fd, DELT_A, DELT_R, + info->mpl, rtt_hint, + info->qs, frct_mtu); if (flow->frcti == NULL) goto fail_frcti; - - if (ssm_flow_set_add(proc.fqset, 0, info->id)) - goto fail_flow_set_add; - - ++proc.n_frcti; - if (proc.n_frcti == 1 && - pthread_create(&proc.tx, NULL, flow_tx, NULL) < 0) - goto fail_tx_thread; } - list_add_tail(&flow->next, &proc.flow_list); - proc.id_to_fd[info->id].fd = fd; flow_set_state(&proc.id_to_fd[info->id], FLOW_ALLOCATED); @@ -585,10 +739,6 @@ static int flow_init(struct flow_info * info, return fd; - fail_tx_thread: - ssm_flow_set_del(proc.fqset, 0, info->id); - fail_flow_set_add: - frcti_destroy(flow->frcti); fail_frcti: crypt_destroy_ctx(flow->crypt); fail_crypt: @@ -716,13 +866,7 @@ static void init(int argc, goto fail_fqset; } - proc.frct_set = fset_create(); - if (proc.frct_set == NULL || proc.frct_set->idx != 0) { - fprintf(stderr, "FATAL: Could not create FRCT set.\n"); - goto fail_frct_set; - } - - if (timerwheel_init() < 0) { + if (tw_init() < 0) { fprintf(stderr, "FATAL: Could not initialize timerwheel.\n"); goto fail_timerwheel; } @@ -741,24 +885,13 @@ static void init(int argc, } } #endif - if (pthread_create(&proc.rx, NULL, flow_rx, NULL) < 0) { - fprintf(stderr, "FATAL: Could not start monitor thread.\n"); - goto fail_monitor; - } - - list_head_init(&proc.flow_list); - return; - fail_monitor: #if defined PROC_FLOW_STATS - rib_fini(); fail_rib_init: #endif - timerwheel_fini(); + tw_fini(); fail_timerwheel: - fset_destroy(proc.frct_set); - fail_frct_set: ssm_flow_set_close(proc.fqset); fail_fqset: pthread_rwlock_destroy(&proc.lock); @@ -789,8 +922,10 @@ static void fini(void) if (proc.fds == NULL) return; - pthread_cancel(proc.rx); - pthread_join(proc.rx, NULL); + /* Wake all in-flight readers/writers BEFORE wrlock acquire. */ + for (i = 0; i < PROC_MAX_FLOWS; ++i) + if (proc.flows[i].info.id != -1) + flow_quiesce(i); pthread_rwlock_wrlock(&proc.lock); @@ -798,10 +933,9 @@ static void fini(void) struct flow * flow = &proc.flows[i]; if (flow->info.id != -1) { ssize_t idx; - ssm_rbuff_set_acl(flow->rx_rb, ACL_FLOWDOWN); while ((idx = ssm_rbuff_read(flow->rx_rb)) >= 0) ssm_pool_remove(proc.pool, idx); - __flow_fini(i); + do_flow_fini(i); } } @@ -813,9 +947,7 @@ static void fini(void) #ifdef PROC_FLOW_STATS rib_fini(); #endif - timerwheel_fini(); - - fset_destroy(proc.frct_set); + tw_fini(); ssm_flow_set_close(proc.fqset); @@ -860,6 +992,10 @@ int flow_accept(qosspec_t * qs, if (qs != NULL) qs->ber = 1; #endif + /* STREAM cannot tolerate loss: drops create silent gaps. */ + if (qs != NULL && qs->service == SVC_STREAM && qs->loss != 0) + return -EINVAL; + memset(&flow, 0, sizeof(flow)); flow.n_pid = getpid(); @@ -878,7 +1014,8 @@ int flow_accept(qosspec_t * qs, if (err < 0) return err; - fd = flow_init(&flow, &crypt); + /* No RTT in accept; rtt_hint=0 bootstraps from first ACK. */ + fd = flow_init(&flow, &crypt, 0); crypt_secure_clear(key, SYMMKEYSZ); @@ -899,11 +1036,16 @@ int flow_alloc(const char * dst, uint8_t key[SYMMKEYSZ]; int fd; int err; + struct timespec t0; + struct timespec t1; #ifdef QOS_DISABLE_CRC if (qs != NULL) qs->ber = 1; #endif + /* STREAM cannot tolerate loss: drops create silent gaps. */ + if (qs != NULL && qs->service == SVC_STREAM && qs->loss != 0) + return -EINVAL; memset(&flow, 0, sizeof(flow)); @@ -913,11 +1055,13 @@ int flow_alloc(const char * dst, if (flow_alloc__irm_req_ser(&msg, &flow, dst, timeo)) return -ENOMEM; + clock_gettime(PTHREAD_COND_CLOCK, &t0); + err = send_recv_msg(&msg); - if (err < 0) { - printf("send_recv_msg error %d\n", err); + if (err < 0) return err; - } + + clock_gettime(PTHREAD_COND_CLOCK, &t1); crypt.key = key; @@ -925,7 +1069,7 @@ int flow_alloc(const char * dst, if (err < 0) return err; - fd = flow_init(&flow, &crypt); + fd = flow_init(&flow, &crypt, ts_diff_ns(&t1, &t0)); crypt_secure_clear(key, SYMMKEYSZ); @@ -964,7 +1108,7 @@ int flow_join(const char * dst, if (err < 0) return err; - fd = flow_init(&flow, &crypt); + fd = flow_init(&flow, &crypt, 0); crypt_secure_clear(key, SYMMKEYSZ); @@ -983,10 +1127,10 @@ int flow_dealloc(int fd) struct flow * flow; int err; - if (fd < 0 || fd >= SYS_MAX_FLOWS ) + if (fd < 0 || fd >= PROC_MAX_FLOWS ) return -EINVAL; - memset(&info, 0, sizeof(flow)); + memset(&info, 0, sizeof(info)); flow = &proc.flows[fd]; @@ -1008,9 +1152,8 @@ int flow_dealloc(int fd) pthread_rwlock_rdlock(&proc.lock); - timeo.tv_sec = frcti_dealloc(flow->frcti); - while (timeo.tv_sec < 0) { /* keep the flow active for rtx */ - ssize_t ret; + while (FRCTI_LINGERING(flow->frcti)) { + ssize_t ret; pthread_rwlock_unlock(&proc.lock); @@ -1018,12 +1161,12 @@ int flow_dealloc(int fd) pthread_rwlock_rdlock(&proc.lock); - timeo.tv_sec = frcti_dealloc(flow->frcti); - - if (ret == -EFLOWDOWN && timeo.tv_sec < 0) - timeo.tv_sec = -timeo.tv_sec; + if (ret == -EFLOWDOWN) + break; } + timeo.tv_sec = FRCTI_DEALLOC(flow->frcti); + pthread_cleanup_push(__cleanup_rwlock_unlock, &proc.lock); ssm_rbuff_fini(flow->tx_rb); @@ -1033,15 +1176,18 @@ int flow_dealloc(int fd) info.id = flow->info.id; info.n_pid = getpid(); - if (flow_dealloc__irm_req_ser(&msg, &info, &timeo) < 0) - return -ENOMEM; + if (flow_dealloc__irm_req_ser(&msg, &info, &timeo) < 0) { + err = -ENOMEM; + goto out; + } err = send_recv_msg(&msg); if (err < 0) - return err; + goto out; err = irm__irm_result_des(&msg); + out: flow_fini(fd); return err; @@ -1055,12 +1201,12 @@ int ipcp_flow_dealloc(int fd) struct flow * flow; int err; - if (fd < 0 || fd >= SYS_MAX_FLOWS ) + if (fd < 0 || fd >= PROC_MAX_FLOWS ) return -EINVAL; flow = &proc.flows[fd]; - memset(&info, 0, sizeof(flow)); + memset(&info, 0, sizeof(info)); pthread_rwlock_rdlock(&proc.lock); @@ -1074,15 +1220,18 @@ int ipcp_flow_dealloc(int fd) pthread_rwlock_unlock(&proc.lock); - if (ipcp_flow_dealloc__irm_req_ser(&msg, &info) < 0) - return -ENOMEM; + if (ipcp_flow_dealloc__irm_req_ser(&msg, &info) < 0) { + err = -ENOMEM; + goto out; + } err = send_recv_msg(&msg); if (err < 0) - return err; + goto out; err = irm__irm_result_des(&msg); + out: flow_fini(fd); return err; @@ -1102,8 +1251,18 @@ int fccntl(int fd, uint32_t tx_acl; size_t * qlen; struct flow * flow; + uint16_t old_acc; + uint16_t new_acc; + size_t max; + size_t * maxp; + size_t rsz; + size_t * rszp; + time_t rto; + time_t * rtop; + int rc; + bool emit_eos = false; - if (fd < 0 || fd >= SYS_MAX_FLOWS) + if (fd < 0 || fd >= PROC_MAX_FLOWS) return -EBADF; flow = &proc.flows[fd]; @@ -1167,14 +1326,27 @@ int fccntl(int fd, qlen = va_arg(l, size_t *); *qlen = ssm_rbuff_queued(flow->tx_rb); break; + case FLOWGMTU: + maxp = va_arg(l, size_t *); + if (maxp == NULL) + goto einval; + *maxp = flow_user_mtu(flow, flow->info.mtu); + break; case FLOWSFLAGS: + old_acc = flow->oflags & FLOWFACCMODE; flow->oflags = va_arg(l, uint32_t); + new_acc = flow->oflags & FLOWFACCMODE; + + /* Defer EOS emit until after proc.lock is dropped: */ + /* frcti_stream_fin_snd may block on shm-pool/tx-rb. */ + if (new_acc == FLOWFRDONLY + && old_acc != FLOWFRDONLY + && FRCTI_IS_STREAM(flow->frcti)) + emit_eos = true; + rx_acl = ssm_rbuff_get_acl(flow->rx_rb); - tx_acl = ssm_rbuff_get_acl(flow->rx_rb); - /* - * Making our own flow write only means making the - * the other side of the flow read only. - */ + tx_acl = ssm_rbuff_get_acl(flow->tx_rb); + /* Our flow write-only -> peer's read-only. */ if (flow->oflags & FLOWFWRONLY) rx_acl |= ACL_RDONLY; if (flow->oflags & FLOWFRDWR) @@ -1218,6 +1390,59 @@ int fccntl(int fd, goto eperm; *cflags = frcti_getflags(flow->frcti); break; + case FRCTSMAXSDU: + max = va_arg(l, size_t); + if (flow->frcti == NULL) + goto eperm; + if (frcti_set_max_rcv_sdu(flow->frcti, max) < 0) + goto einval; + break; + case FRCTGMAXSDU: + maxp = va_arg(l, size_t *); + if (maxp == NULL) + goto einval; + if (flow->frcti == NULL) + goto eperm; + *maxp = frcti_get_max_rcv_sdu(flow->frcti); + break; + case FRCTSRRINGSZ: + rsz = va_arg(l, size_t); + if (flow->frcti == NULL) + goto eperm; + rc = frcti_set_rcv_ring_sz(flow->frcti, rsz); + if (rc < 0) { + pthread_rwlock_unlock(&proc.lock); + va_end(l); + return rc; + } + break; + case FRCTGRRINGSZ: + rszp = va_arg(l, size_t *); + if (rszp == NULL) + goto einval; + if (flow->frcti == NULL) + goto eperm; + *rszp = frcti_get_rcv_ring_sz(flow->frcti); + break; + case FRCTSRTOMIN: + if (flow->frcti == NULL) + goto eperm; + rto = va_arg(l, time_t); + rc = frcti_set_rto_min(flow->frcti, rto); + if (rc < 0) { + pthread_rwlock_unlock(&proc.lock); + va_end(l); + return rc; + } + break; + case FRCTGRTOMIN: + if (flow->frcti == NULL) + goto eperm; + rtop = va_arg(l, time_t *); + if (rtop == NULL) + goto einval; + *rtop = frcti_get_rto_min(flow->frcti); + break; default: pthread_rwlock_unlock(&proc.lock); va_end(l); @@ -1227,6 +1452,9 @@ int fccntl(int fd, pthread_rwlock_unlock(&proc.lock); + if (emit_eos) + frcti_stream_fin_snd(flow->frcti); + va_end(l); return 0; @@ -1241,86 +1469,189 @@ int fccntl(int fd, return -EPERM; } -static int chk_crc(struct ssm_pk_buff * spb) -{ - uint32_t crc; - uint8_t * head = ssm_pk_buff_head(spb); - uint8_t * tail = ssm_pk_buff_pop_tail(spb, CRCLEN); - - mem_hash(HASH_CRC32, &crc, head, tail - head); - - return !(crc == *((uint32_t *) tail)); -} - -static int add_crc(struct ssm_pk_buff * spb) -{ - uint8_t * head; - uint8_t * tail; - - tail = ssm_pk_buff_push_tail(spb, CRCLEN); - if (tail == NULL) - return -ENOMEM; - - head = ssm_pk_buff_head(spb); - mem_hash(HASH_CRC32, tail, head, tail - head); - - return 0; -} - static int flow_tx_spb(struct flow * flow, struct ssm_pk_buff * spb, + uint16_t flags, bool block, struct timespec * abstime) { struct timespec now; ssize_t idx; + size_t pci_total; int ret; clock_gettime(PTHREAD_COND_CLOCK, &now); - - pthread_rwlock_wrlock(&proc.lock); - flow->snd_act = now; - pthread_rwlock_unlock(&proc.lock); - idx = ssm_pk_buff_get_off(spb); - pthread_rwlock_rdlock(&proc.lock); - if (ssm_pk_buff_len(spb) > 0) { - if (frcti_snd(flow->frcti, spb) < 0) + if (FRCTI_SND(flow->frcti, spb, flags) < 0) goto enomem; - if (spb_encrypt(flow, spb) < 0) - goto enomem; + if (flow->info.qs.ber == 0) { + pci_total = flow->frcti != NULL + ? frcti_data_hdr_len(flow->frcti) : 0; + if (crc_add(spb, pci_total) != 0) + goto enomem; + } - if (flow->info.qs.ber == 0 && add_crc(spb) != 0) + if (spb_encrypt(flow, spb) < 0) goto enomem; } - pthread_cleanup_push(__cleanup_rwlock_unlock, &proc.lock); - if (!block) ret = ssm_rbuff_write(flow->tx_rb, idx); else ret = ssm_rbuff_write_b(flow->tx_rb, idx, abstime); - if (ret < 0) + if (ret < 0) { ssm_pool_remove(proc.pool, idx); - else - ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); - - pthread_cleanup_pop(true); + return ret; + } + ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); return 0; -enomem: - pthread_rwlock_unlock(&proc.lock); + enomem: ssm_pool_remove(proc.pool, idx); return -ENOMEM; } +/* Per-fragment role for fragment i out of n; n == 1 yields SOLE. */ +static __inline__ uint16_t flow_frag_role(size_t i, size_t n) +{ + if (n == 1) + return FRCT_FR_SOLE; + if (i == 0) + return FRCT_FR_FIRST; + if (i + 1 == n) + return FRCT_FR_LAST; + + return FRCT_FR_MID; +} + +/* + * Stream-mode write: split buf into chunks of + * (frag_mtu - PCI - PCI_STREAM) bytes; each chunk goes through the + * normal tx path. frcti_snd injects the [start,end) extension and + * advances snd_byte_next under its wrlock. No FFGM/LFGM role bits. + */ +static ssize_t flow_write_stream(struct flow * flow, + const void * buf, + size_t count, + int oflags, + struct timespec * dl) +{ + const uint8_t * src = buf; + size_t payload; + size_t off = 0; + bool block = !(oflags & FLOWFWNOBLOCK); + + if (!FRCTI_IS_FRTX(flow->frcti)) + return -EMSGSIZE; + + payload = FRCTI_PAYLOAD_CAP(flow->frcti); + + while (off < count) { + struct ssm_pk_buff * spb; + uint8_t * ptr; + ssize_t idx; + size_t clen; + int ret; + + ret = flow_wait_window(flow, 1, block, dl); + if (ret < 0) + return off > 0 ? (ssize_t) off : (ssize_t) ret; + + clen = MIN(count - off, payload); + + if (block) + idx = ssm_pool_alloc_b(proc.pool, clen, &ptr, + &spb, dl); + else + idx = ssm_pool_alloc(proc.pool, clen, &ptr, &spb); + if (idx < 0) + return off > 0 ? (ssize_t) off : idx; + + memcpy(ptr, src + off, clen); + + ret = flow_tx_spb(flow, spb, 0, block, dl); + if (ret < 0) + return off > 0 ? (ssize_t) off : (ssize_t) ret; + + off += clen; + } + + return (ssize_t) count; +} + +/* Per-fragment flow_tx_spb loop. Raw flows refuse; FRCT splits the SDU. */ +static ssize_t flow_write_frag(struct flow * flow, + const void * buf, + size_t count, + int oflags, + struct timespec * dl) +{ + const uint8_t * src = buf; + size_t frag_payload; + size_t n; + size_t off = 0; + size_t i; + int ret; + bool block = !(oflags & FLOWFWNOBLOCK); + + /* Raw flows carry no PCI; cannot fragment. */ + if (flow->frcti == NULL) + return -EMSGSIZE; + + frag_payload = FRCTI_PAYLOAD_CAP(flow->frcti); + + /* Guard the ceil-divide against size_t overflow. */ + if (count > SIZE_MAX - frag_payload + 1) + return -EMSGSIZE; + n = (count + frag_payload - 1) / frag_payload; + + /* SDU larger than the FC window can ever offer would deadlock. */ + if (n > RQ_SIZE) + return -EMSGSIZE; + + /* SDU-atomic FC: wait for n seqnos to avoid overshoot mid-SDU. */ + ret = flow_wait_window(flow, n, block, dl); + if (ret < 0) + return (ssize_t) ret; + + STAT_BUMP(flow->frcti, sdu_snd_frag); + + for (i = 0; i < n; ++i) { + struct ssm_pk_buff * spb; + uint8_t * ptr; + ssize_t idx; + size_t clen; + + clen = (i + 1 == n) ? (count - off) : frag_payload; + + if (block) + idx = ssm_pool_alloc_b(proc.pool, clen, &ptr, + &spb, dl); + else + idx = ssm_pool_alloc(proc.pool, clen, &ptr, &spb); + if (idx < 0) + return off > 0 ? (ssize_t) off : idx; + + memcpy(ptr, src + off, clen); + + ret = flow_tx_spb(flow, spb, flow_frag_role(i, n), + block, dl); + if (ret < 0) + return off > 0 ? (ssize_t) off : (ssize_t) ret; + + off += clen; + } + + return (ssize_t) count; +} + ssize_t flow_write(int fd, const void * buf, size_t count) @@ -1330,7 +1661,8 @@ ssize_t flow_write(int fd, int ret; int flags; struct timespec abs; - struct timespec * abstime = NULL; + struct timespec now; + struct timespec * dl = NULL; struct ssm_pk_buff * spb; uint8_t * ptr; @@ -1342,64 +1674,62 @@ ssize_t flow_write(int fd, flow = &proc.flows[fd]; - clock_gettime(PTHREAD_COND_CLOCK, &abs); - - pthread_rwlock_wrlock(&proc.lock); + pthread_rwlock_rdlock(&proc.lock); if (flow->info.id < 0) { pthread_rwlock_unlock(&proc.lock); return -ENOTALLOC; } + flags = flow->oflags; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + if (flow->snd_timesout) { - ts_add(&abs, &flow->snd_timeo, &abs); - abstime = &abs; + ts_add(&now, &flow->snd_timeo, &abs); + dl = &abs; } - flags = flow->oflags; - pthread_rwlock_unlock(&proc.lock); if ((flags & FLOWFACCMODE) == FLOWFRDONLY) return -EPERM; - if (flags & FLOWFWNOBLOCK) { - if (!frcti_is_window_open(flow->frcti)) - return -EAGAIN; - idx = ssm_pool_alloc(proc.pool, count, &ptr, &spb); - } else { - ret = frcti_window_wait(flow->frcti, abstime); + tw_move_safe(); + + if (flow->frcti != NULL) { + /* Pump rx_rb so a pure-writer processes ACKs. */ + ret = flow_wait_window(flow, 1, !(flags & FLOWFWNOBLOCK), dl); if (ret < 0) return ret; - idx = ssm_pool_alloc_b(proc.pool, count, &ptr, &spb, abstime); + + if (count > 0 && FRCTI_IS_STREAM(flow->frcti)) + return flow_write_stream(flow, buf, count, flags, dl); + + if (FRCTI_NEEDS_FRAG(flow->frcti, count)) + return flow_write_frag(flow, buf, count, flags, dl); + } else if (flow->info.mtu > 0 + && count > flow_user_mtu(flow, flow->info.mtu)) { + /* Raw flows carry no PCI; refuse anything > one n-1 frame. */ + return -EMSGSIZE; } + if (flags & FLOWFWNOBLOCK) + idx = ssm_pool_alloc(proc.pool, count, &ptr, &spb); + else + idx = ssm_pool_alloc_b(proc.pool, count, &ptr, &spb, dl); if (idx < 0) return idx; if (count > 0) memcpy(ptr, buf, count); - ret = flow_tx_spb(flow, spb, !(flags & FLOWFWNOBLOCK), abstime); + ret = flow_tx_spb(flow, spb, FRCT_FR_SOLE, + !(flags & FLOWFWNOBLOCK), dl); return ret < 0 ? (ssize_t) ret : (ssize_t) count; } -static bool invalid_pkt(struct flow * flow, - struct ssm_pk_buff * spb) -{ - if (spb == NULL || ssm_pk_buff_len(spb) == 0) - return true; - - if (flow->info.qs.ber == 0 && chk_crc(spb) != 0) - return true; - - if (spb_decrypt(flow, spb) < 0) - return true; - - return false; -} - static ssize_t flow_rx_spb(struct flow * flow, struct ssm_pk_buff ** spb, bool block, @@ -1408,19 +1738,14 @@ static ssize_t flow_rx_spb(struct flow * flow, ssize_t idx; struct timespec now; - idx = block ? ssm_rbuff_read_b(flow->rx_rb, abstime) : - ssm_rbuff_read(flow->rx_rb); + idx = block ? ssm_rbuff_read_b(flow->rx_rb, abstime) + : ssm_rbuff_read(flow->rx_rb); if (idx < 0) return idx; clock_gettime(PTHREAD_COND_CLOCK, &now); - - pthread_rwlock_wrlock(&proc.lock); - flow->rcv_act = now; - pthread_rwlock_unlock(&proc.lock); - *spb = ssm_pool_get(proc.pool, idx); if (invalid_pkt(flow, *spb)) { @@ -1431,18 +1756,116 @@ static ssize_t flow_rx_spb(struct flow * flow, return idx; } +static ssize_t raw_flow_read_pkt(struct flow * flow, + bool block, + struct timespec * dl) +{ + struct ssm_pk_buff * spb; + struct timespec wait_abs; + ssize_t idx; + + while (true) { + if (!block) { + idx = ssm_rbuff_read(flow->rx_rb); + if (idx < 0) + return -EAGAIN; + } else { + compute_wait_deadline(dl, &wait_abs); + idx = ssm_rbuff_read_b(flow->rx_rb, &wait_abs); + if (idx == -ETIMEDOUT) { + if (deadline_passed(dl)) + return -ETIMEDOUT; + continue; + } + if (idx < 0) + return idx; + } + + spb = ssm_pool_get(proc.pool, idx); + if (!invalid_pkt(flow, spb)) + return idx; + + ssm_pool_remove(proc.pool, idx); + if (!block) + return -EAGAIN; + } +} + +static ssize_t deliver_pkt(struct flow * flow, + struct ssm_pk_buff * spb, + ssize_t idx, + void * buf, + size_t count, + bool partrd) +{ + uint8_t * packet = ssm_pk_buff_head(spb); + ssize_t n = ssm_pk_buff_len(spb); + + assert(n >= 0); + + if (n <= (ssize_t) count) { + memcpy(buf, packet, n); + ipcp_spb_release(spb); + if (partrd && n == (ssize_t) count) + flow->part_idx = DONE_PART; + else + flow->part_idx = NO_PART; + + return n; + } + + if (partrd) { + memcpy(buf, packet, count); + ssm_pk_buff_pop(spb, n); + flow->part_idx = idx; + return count; + } + + ipcp_spb_release(spb); + return -EMSGSIZE; +} + +/* Drive frcti_consume until it delivers or errors. */ +static ssize_t flow_read_frcti(struct flow * flow, + void * buf, + size_t count, + bool block, + struct timespec * dl) +{ + struct timespec now; + ssize_t bytes; + int rc; + + while (true) { + flow_drain_rx_nb(flow); + bytes = FRCTI_CONSUME(flow->frcti, buf, count); + if (bytes >= 0) + break; + if (bytes != -EAGAIN) + return bytes; + if (!block) + return -EAGAIN; + rc = flow_rx_one(flow, dl); + if (rc < 0) + return rc; + } + + clock_gettime(PTHREAD_COND_CLOCK, &now); + flow->rcv_act = now; + + return bytes; +} + ssize_t flow_read(int fd, void * buf, size_t count) { - ssize_t idx; - ssize_t n; - uint8_t * packet; + struct flow * flow; struct ssm_pk_buff * spb; struct timespec abs; struct timespec now; - struct timespec * abstime = NULL; - struct flow * flow; + struct timespec * dl = NULL; + ssize_t idx; bool block; bool partrd; @@ -1451,8 +1874,6 @@ ssize_t flow_read(int fd, flow = &proc.flows[fd]; - clock_gettime(PTHREAD_COND_CLOCK, &now); - pthread_rwlock_rdlock(&proc.lock); if (flow->info.id < 0) { @@ -1461,8 +1882,8 @@ ssize_t flow_read(int fd, } if (flow->part_idx == DONE_PART) { - pthread_rwlock_unlock(&proc.lock); flow->part_idx = NO_PART; + pthread_rwlock_unlock(&proc.lock); return 0; } @@ -1470,75 +1891,33 @@ ssize_t flow_read(int fd, partrd = !(flow->oflags & FLOWFRNOPART); if (flow->rcv_timesout) { + clock_gettime(PTHREAD_COND_CLOCK, &now); ts_add(&now, &flow->rcv_timeo, &abs); - abstime = &abs; + dl = &abs; } - idx = flow->part_idx; - if (idx < 0) { - while ((idx = frcti_queued_pdu(flow->frcti)) < 0) { - pthread_rwlock_unlock(&proc.lock); - - idx = flow_rx_spb(flow, &spb, block, abstime); - if (idx < 0) { - if (block && idx != -EAGAIN) - return idx; - if (!block) - return idx; + pthread_rwlock_unlock(&proc.lock); - pthread_rwlock_rdlock(&proc.lock); - continue; - } + tw_move_safe(); - pthread_rwlock_rdlock(&proc.lock); + idx = flow->part_idx; + if (idx < 0 && flow->frcti != NULL) + return flow_read_frcti(flow, buf, count, block, dl); - frcti_rcv(flow->frcti, spb); - } + if (idx < 0) { + idx = raw_flow_read_pkt(flow, block, dl); + if (idx < 0) + return idx; } spb = ssm_pool_get(proc.pool, idx); - pthread_rwlock_unlock(&proc.lock); - - packet = ssm_pk_buff_head(spb); - - n = ssm_pk_buff_len(spb); - - assert(n >= 0); - - if (n <= (ssize_t) count) { - memcpy(buf, packet, n); - ipcp_spb_release(spb); - - pthread_rwlock_wrlock(&proc.lock); - - flow->part_idx = (partrd && n == (ssize_t) count) ? - DONE_PART : NO_PART; - - flow->rcv_act = now; - - pthread_rwlock_unlock(&proc.lock); - return n; - } else { - if (partrd) { - memcpy(buf, packet, count); - ssm_pk_buff_pop(spb, n); - pthread_rwlock_wrlock(&proc.lock); - flow->part_idx = idx; - - flow->rcv_act = now; + clock_gettime(PTHREAD_COND_CLOCK, &now); + flow->rcv_act = now; - pthread_rwlock_unlock(&proc.lock); - return count; - } else { - ipcp_spb_release(spb); - return -EMSGSIZE; - } - } + return deliver_pkt(flow, spb, idx, buf, count, partrd); } -/* fqueue functions. */ - struct flow_set * fset_create(void) { struct flow_set * set; @@ -1614,7 +1993,7 @@ int fset_add(struct flow_set * set, struct flow * flow; int ret; - if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS) + if (set == NULL || fd < 0 || fd >= PROC_MAX_FLOWS) return -EINVAL; flow = &proc.flows[fd]; @@ -1650,7 +2029,7 @@ void fset_del(struct flow_set * set, { struct flow * flow; - if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS) + if (set == NULL || fd < 0 || fd >= PROC_MAX_FLOWS) return; flow = &proc.flows[fd]; @@ -1661,7 +2040,7 @@ void fset_del(struct flow_set * set, ssm_flow_set_del(proc.fqset, set->idx, flow->info.id); if (flow->frcti != NULL) - ssm_flow_set_add(proc.fqset, 0, proc.flows[fd].info.id); + ssm_flow_set_add(proc.fqset, 0, flow->info.id); pthread_rwlock_unlock(&proc.lock); } @@ -1672,7 +2051,7 @@ bool fset_has(const struct flow_set * set, struct flow * flow; bool ret; - if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS) + if (set == NULL || fd < 0 || fd >= PROC_MAX_FLOWS) return false; flow = &proc.flows[fd]; @@ -1691,61 +2070,59 @@ bool fset_has(const struct flow_set * set, return ret; } -/* Filter fqueue events for non-data packets */ static int fqueue_filter(struct fqueue * fq) { struct ssm_pk_buff * spb; int fd; ssize_t idx; struct frcti * frcti; + int ret = 0; - while (fq->next < fq->fqsize) { - if (fq->fqueue[fq->next].event != FLOW_PKT) - return 1; + /* proc.lock rdlock gates frcti_destroy via flow_fini wrlock. */ + pthread_rwlock_rdlock(&proc.lock); - pthread_rwlock_rdlock(&proc.lock); + while (fq->next < fq->fqsize) { + if (fq->fqueue[fq->next].event != FLOW_PKT) { + ret = 1; + goto out; + } fd = proc.id_to_fd[fq->fqueue[fq->next].flow_id].fd; if (fd < 0) { ++fq->next; - pthread_rwlock_unlock(&proc.lock); continue; } frcti = proc.flows[fd].frcti; if (frcti == NULL) { - pthread_rwlock_unlock(&proc.lock); - return 1; + ret = 1; + goto out; } - if (__frcti_pdu_ready(frcti) >= 0) { - pthread_rwlock_unlock(&proc.lock); - return 1; + if (FRCTI_PDU_READY(frcti)) { + ret = 1; + goto out; } - pthread_rwlock_unlock(&proc.lock); - idx = flow_rx_spb(&proc.flows[fd], &spb, false, NULL); if (idx < 0) - return 0; - - pthread_rwlock_rdlock(&proc.lock); + goto out; spb = ssm_pool_get(proc.pool, idx); - __frcti_rcv(frcti, spb); + FRCTI_RCV(frcti, spb); - if (__frcti_pdu_ready(frcti) >= 0) { - pthread_rwlock_unlock(&proc.lock); - return 1; + if (FRCTI_PDU_READY(frcti)) { + ret = 1; + goto out; } - pthread_rwlock_unlock(&proc.lock); - ++fq->next; } - return 0; + out: + pthread_rwlock_unlock(&proc.lock); + return ret; } int fqueue_next(struct fqueue * fq) @@ -1792,7 +2169,8 @@ ssize_t fevent(struct flow_set * set, { ssize_t ret = 0; struct timespec abs; - struct timespec * t = NULL; + struct timespec * dl = NULL; + struct timespec wait_abs; if (set == NULL || fq == NULL) return -EINVAL; @@ -1800,17 +2178,26 @@ ssize_t fevent(struct flow_set * set, if (fq->fqsize > 0 && fq->next != fq->fqsize) return 1; - clock_gettime(PTHREAD_COND_CLOCK, &abs); - if (timeo != NULL) { - ts_add(&abs, timeo, &abs); - t = &abs; + struct timespec now; + clock_gettime(PTHREAD_COND_CLOCK, &now); + ts_add(&now, timeo, &abs); + dl = &abs; } while (ret == 0) { - ret = ssm_flow_set_wait(proc.fqset, set->idx, fq->fqueue, t); - if (ret == -ETIMEDOUT) - return -ETIMEDOUT; + tw_move_safe(); + + compute_wait_deadline(dl, &wait_abs); + + ret = ssm_flow_set_wait(proc.fqset, set->idx, + fq->fqueue, &wait_abs); + if (ret == -ETIMEDOUT) { + if (deadline_passed(dl)) + return -ETIMEDOUT; + ret = 0; + continue; + } fq->fqsize = ret; fq->next = 0; @@ -1823,8 +2210,6 @@ ssize_t fevent(struct flow_set * set, return 1; } -/* ipcp-dev functions. */ - int np1_flow_alloc(pid_t n_pid, int flow_id) { @@ -1837,9 +2222,10 @@ int np1_flow_alloc(pid_t n_pid, flow.n_pid = getpid(); flow.qs = qos_np1; flow.mpl = 0; - flow.n_1_pid = n_pid; /* This "flow" is upside-down! */ + /* np1 flow: n_1_pid is the upper. */ + flow.n_1_pid = n_pid; - return flow_init(&flow, &crypt); + return flow_init(&flow, &crypt, 0); } int np1_flow_dealloc(int flow_id, @@ -1847,12 +2233,7 @@ int np1_flow_dealloc(int flow_id, { int fd; - /* - * TODO: Don't pass timeo to the IPCP but wait in IRMd. - * This will need async ops, waiting until we bootstrap - * the IRMd over ouroboros. - */ - + /* TODO: wait in IRMd, not here; needs async ops. */ sleep(timeo); pthread_rwlock_rdlock(&proc.lock); @@ -1900,6 +2281,7 @@ int ipcp_create_r(const struct ipcp_info * info) int ipcp_flow_req_arr(const buffer_t * dst, qosspec_t qs, time_t mpl, + uint32_t mtu, const buffer_t * data) { struct flow_info flow; @@ -1916,6 +2298,7 @@ int ipcp_flow_req_arr(const buffer_t * dst, flow.n_1_pid = getpid(); flow.qs = qs; flow.mpl = mpl; + flow.mtu = mtu; if (ipcp_flow_req_arr__irm_req_ser(&msg, dst, &flow, data) < 0) return -ENOMEM; @@ -1930,22 +2313,25 @@ int ipcp_flow_req_arr(const buffer_t * dst, if (err < 0) return err; - assert(crypt.nid == NID_undef); /* np1 flows are not encrypted */ + /* np1 flows are not encrypted. */ + assert(crypt.nid == NID_undef); - /* inverted for np1_flow */ + /* Inverted for np1_flow. */ flow.n_1_pid = flow.n_pid; flow.n_pid = getpid(); flow.mpl = 0; + flow.mtu = 0; flow.qs = qos_np1; crypt.nid = NID_undef; - return flow_init(&flow, &crypt); + return flow_init(&flow, &crypt, 0); } int ipcp_flow_alloc_reply(int fd, int response, time_t mpl, + uint32_t mtu, const buffer_t * data) { struct flow_info flow; @@ -1953,7 +2339,7 @@ int ipcp_flow_alloc_reply(int fd, buffer_t msg = {SOCK_BUF_SIZE, buf}; int err; - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + assert(fd >= 0 && fd < PROC_MAX_FLOWS); pthread_rwlock_rdlock(&proc.lock); @@ -1962,6 +2348,7 @@ int ipcp_flow_alloc_reply(int fd, pthread_rwlock_unlock(&proc.lock); flow.mpl = mpl; + flow.mtu = mtu; if (ipcp_flow_alloc_reply__irm_msg_ser(&msg, &flow, response, data) < 0) return -ENOMEM; @@ -1979,7 +2366,7 @@ int ipcp_flow_read(int fd, struct flow * flow; ssize_t idx = -1; - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + assert(fd >= 0 && fd < PROC_MAX_FLOWS); assert(spb); flow = &proc.flows[fd]; @@ -1988,7 +2375,14 @@ int ipcp_flow_read(int fd, assert(flow->info.id >= 0); - while (frcti_queued_pdu(flow->frcti) < 0) { + /* Raw flow: deliver the popped pkt directly (no FRCT rq). */ + if (flow->frcti == NULL) { + pthread_rwlock_unlock(&proc.lock); + idx = flow_rx_spb(flow, spb, false, NULL); + return idx < 0 ? (int) idx : 0; + } + + while (!FRCTI_PDU_READY(flow->frcti)) { pthread_rwlock_unlock(&proc.lock); idx = flow_rx_spb(flow, spb, false, NULL); @@ -1997,7 +2391,7 @@ int ipcp_flow_read(int fd, pthread_rwlock_rdlock(&proc.lock); - frcti_rcv(flow->frcti, *spb); + FRCTI_RCV(flow->frcti, *spb); } pthread_rwlock_unlock(&proc.lock); @@ -2011,12 +2405,12 @@ int ipcp_flow_write(int fd, struct flow * flow; int ret; - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + assert(fd >= 0 && fd < PROC_MAX_FLOWS); assert(spb); flow = &proc.flows[fd]; - pthread_rwlock_wrlock(&proc.lock); + pthread_rwlock_rdlock(&proc.lock); if (flow->info.id < 0) { pthread_rwlock_unlock(&proc.lock); @@ -2030,15 +2424,16 @@ int ipcp_flow_write(int fd, pthread_rwlock_unlock(&proc.lock); - ret = flow_tx_spb(flow, spb, true, NULL); + ret = flow_tx_spb(flow, spb, FRCT_FR_SOLE, true, NULL); return ret; } -static int pool_copy_spb(struct ssm_pool * src_pool, - ssize_t src_off, - struct ssm_pool * dst_pool, - struct ssm_pk_buff ** dst_spb) +/* Copy src into dst_pool without consuming src. Caller owns both halves. */ +static int pool_dup_spb(struct ssm_pool * src_pool, + size_t src_off, + struct ssm_pool * dst_pool, + struct ssm_pk_buff ** dst_spb) { struct ssm_pk_buff * src; uint8_t * ptr; @@ -2051,7 +2446,6 @@ static int pool_copy_spb(struct ssm_pool * src_pool, return -ENOMEM; memcpy(ptr, ssm_pk_buff_head(src), len); - ssm_pool_remove(src_pool, src_off); return 0; } @@ -2061,9 +2455,9 @@ int np1_flow_read(int fd, struct ssm_pool * pool) { struct flow * flow; - ssize_t off = -1; + ssize_t off; - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + assert(fd >= 0 && fd < PROC_MAX_FLOWS); assert(spb); flow = &proc.flows[fd]; @@ -2084,10 +2478,11 @@ int np1_flow_read(int fd, *spb = ssm_pool_get(proc.pool, off); } else { /* Cross-pool copy: PUP -> GSPP */ - if (pool_copy_spb(pool, off, proc.pool, spb) < 0) { + if (pool_dup_spb(pool, off, proc.pool, spb) < 0) { ssm_pool_remove(pool, off); return -ENOMEM; } + ssm_pool_remove(pool, off); } return 0; @@ -2100,10 +2495,10 @@ int np1_flow_write(int fd, struct flow * flow; struct ssm_pk_buff * dst; int ret; - ssize_t src_off; - ssize_t dst_off; + size_t off; + size_t dst_off; - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + assert(fd >= 0 && fd < PROC_MAX_FLOWS); assert(spb); flow = &proc.flows[fd]; @@ -2122,16 +2517,16 @@ int np1_flow_write(int fd, pthread_rwlock_unlock(&proc.lock); - src_off = ssm_pk_buff_get_off(spb); + off = ssm_pk_buff_get_off(spb); if (pool == NULL) { - ret = ssm_rbuff_write_b(flow->tx_rb, src_off, NULL); + ret = ssm_rbuff_write_b(flow->tx_rb, off, NULL); if (ret < 0) return ret; ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); } else { - /* Cross-pool copy: GSPP -> PUP */ - if (pool_copy_spb(proc.pool, src_off, pool, &dst) < 0) + /* Cross-pool copy: GSPP -> PUP. Src kept on error. */ + if (pool_dup_spb(proc.pool, off, pool, &dst) < 0) return -ENOMEM; dst_off = ssm_pk_buff_get_off(dst); ret = ssm_rbuff_write_b(flow->tx_rb, dst_off, NULL); @@ -2140,7 +2535,7 @@ int np1_flow_write(int fd, return ret; } ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); - ssm_pool_remove(proc.pool, src_off); + ssm_pool_remove(proc.pool, off); } return 0; @@ -2149,7 +2544,8 @@ int np1_flow_write(int fd, int ipcp_spb_reserve(struct ssm_pk_buff ** spb, size_t len) { - return ssm_pool_alloc_b(proc.pool, len, NULL, spb, NULL) < 0 ? -1 : 0; + return ssm_pool_alloc_b(proc.pool, len, NULL, spb, NULL) < 0 + ? -1 : 0; } void ipcp_spb_release(struct ssm_pk_buff * spb) @@ -2161,7 +2557,7 @@ int ipcp_flow_fini(int fd) { struct ssm_rbuff * rx_rb; - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + assert(fd >= 0 && fd < PROC_MAX_FLOWS); pthread_rwlock_rdlock(&proc.lock); @@ -2190,7 +2586,7 @@ int ipcp_flow_fini(int fd) int ipcp_flow_get_qoscube(int fd, qoscube_t * cube) { - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + assert(fd >= 0 && fd < PROC_MAX_FLOWS); assert(cube); pthread_rwlock_rdlock(&proc.lock); @@ -2229,8 +2625,7 @@ int local_flow_transfer(int src_fd, struct ssm_pk_buff * dst_spb; struct ssm_pool * sp; struct ssm_pool * dp; - ssize_t src_off; - ssize_t dst_off; + ssize_t off; int ret; assert(src_fd >= 0); @@ -2244,15 +2639,15 @@ int local_flow_transfer(int src_fd, pthread_rwlock_rdlock(&proc.lock); - src_off = ssm_rbuff_read(src_flow->rx_rb); - if (src_off < 0) { + off = ssm_rbuff_read(src_flow->rx_rb); + if (off < 0) { pthread_rwlock_unlock(&proc.lock); - return src_off; + return off; } if (dst_flow->info.id < 0) { pthread_rwlock_unlock(&proc.lock); - ssm_pool_remove(sp, src_off); + ssm_pool_remove(sp, off); return -ENOTALLOC; } @@ -2260,23 +2655,24 @@ int local_flow_transfer(int src_fd, if (sp == dp) { /* Same pool: zero-copy */ - ret = ssm_rbuff_write_b(dst_flow->tx_rb, src_off, NULL); + ret = ssm_rbuff_write_b(dst_flow->tx_rb, off, NULL); if (ret < 0) - ssm_pool_remove(sp, src_off); + ssm_pool_remove(sp, off); else ssm_flow_set_notify(dst_flow->set, dst_flow->info.id, FLOW_PKT); } else { /* Different pools: single copy */ - if (pool_copy_spb(sp, src_off, dp, &dst_spb) < 0) { - ssm_pool_remove(sp, src_off); + if (pool_dup_spb(sp, off, dp, &dst_spb) < 0) { + ssm_pool_remove(sp, off); return -ENOMEM; } - dst_off = ssm_pk_buff_get_off(dst_spb); - ret = ssm_rbuff_write_b(dst_flow->tx_rb, dst_off, NULL); + ssm_pool_remove(sp, off); + off = ssm_pk_buff_get_off(dst_spb); + ret = ssm_rbuff_write_b(dst_flow->tx_rb, off, NULL); if (ret < 0) - ssm_pool_remove(dp, dst_off); + ssm_pool_remove(dp, off); else ssm_flow_set_notify(dst_flow->set, dst_flow->info.id, FLOW_PKT); diff --git a/src/lib/frct.c b/src/lib/frct.c index c0fdd703..1d583162 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 - 2026 * - * Flow and Retransmission Control + * Flow and Retransmission Control Task (FRCT) * * Dimitri Staessens <dimitri@ouroboros.rocks> * Sander Vrijders <sander@ouroboros.rocks> @@ -20,97 +20,370 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ -#include <ouroboros/endian.h> +/* Included by dev.c; uses dev.c statics (proc, spb_encrypt, ...). */ #define DELT_RDV (100 * MILLION) /* ns */ -#define MAX_RDV (1 * BILLION) /* ns */ +#define MAX_RDV (1 * BILLION) /* ns */ + +#define MAX_RTO_MUL 20 /* caps the RTO backoff shift */ +#define INITIAL_RTO (1 * BILLION) /* RFC 6298 §2.1: 1 s default */ +#define RTT_BOOT_NS (10 * MILLION) /* rtt_hint floor + initial mdev */ +#define SRTT_FLOOR_NS 1000L /* 1 us; smoothed RTT floor */ +#define MDEV_FLOOR_NS 100L /* 100 ns; mdev sanity floor */ +#define RTT_CLAMP_MUL 16 /* probe sample cap = N * srtt */ +#define MIN_RTT_WIN_NS (300ULL * BILLION) /* 5 min, Linux tcp default */ +#define NACK_COOLDOWN_NS (100 * MILLION) /* pre-DRF NACK cooldown fallback */ #define FRCT "frct" #define FRCT_PCILEN (sizeof(struct frct_pci)) #define FRCT_NAME_STRLEN 32 -struct frct_cr { - uint32_t lwe; /* Left window edge */ - uint32_t rwe; /* Right window edge */ +/* Wire-protocol cap on SACK blocks per packet; binds both peers. */ +#define SACK_MAX_BLOCKS 2048 +#define SACK_BLOCK_SIZE (2 * sizeof(uint32_t)) +/* 2B count + 2B pad to 4-byte align the block list. */ +#define SACK_HDR_SIZE (sizeof(uint32_t)) +#define SACK_MIN_GAP_NS (250u * 1000u) /* 250 us SACK gap */ +#define MIN_REORDER_NS (250u * 1000u) /* 250 us RACK floor */ +#define SACK_RXM_MAX 32 /* Cap on retransmits staged from single SACK.*/ +#define DUP_THRESH 3 /* RFC 8985 §6.2 step 2.2 SACK count gate. */ + +/* RFC 8985 §7.2 RACK reorder-window scaling cap. */ +#define REO_WND_MULT_MAX 20 +/* RFC 8985 §7.2 step 5: round trips of no DSACK before halving. */ +#define REO_DECAY_PKTS 16 +/* DSACK seqno sanity: reject reports older/farther than one rcv window. */ +#define MAX_DSACK_LAG RQ_SIZE + +/* FRCT r-timer: do not retransmit packet older than t_r (from first send). */ +#define RXM_AGED_OUT(t0, now_ns, t_r) (((now_ns) - (t0)) > (uint64_t) (t_r)) + +/* FRCT a-timer: do not (re)transmit ACK after t_a from last data receive. */ +#define ACK_AGED_OUT(act, now_ns, t_a) (((now_ns) - (act)) > (uint64_t) (t_a)) + +struct sack_args { + uint16_t n; + bool dsack; /* RFC 2883: block[0] is a DSACK report */ + uint32_t ack; + uint32_t rwe; + uint32_t blocks[][2]; /* flexible — sized at alloc time */ +}; - uint8_t cflags; - uint32_t seqno; /* SEQ to send, or last SEQ Ack'd */ +/* NewReno-careful (RFC 6582) exit pad; gates RTT samples post-signal. */ +#define RTT_QUARANTINE 32 +#define RTTP_NONCE_LEN 16 + +/* RTT-probe wire payload (after the FRCT PCI). */ +struct frct_rttp { + uint32_t probe_id; /* sender counter; 0 on reply */ + uint32_t echo_id; /* peer's probe_id; 0 outbound */ + uint8_t nonce[RTTP_NONCE_LEN]; /* random; echoed verbatim */ +} __attribute__((packed)); - struct timespec act; /* Last seen activity */ - time_t inact; /* Inactivity (s) */ +#define RTTP_PAYLOAD sizeof(struct frct_rttp) +#define RTTP_POS(id) ((id) & (RTTP_RING - 1)) + +/* + * Flag values are assigned MSB-first on the wire (RFC convention): + * bit 0 = 0x8000 occupies wire-position 0 of the 16-bit flags + * field, bit 11 = 0x0010 is the last assigned bit, and the four + * LSBs (0x000F) are reserved. + */ +enum frct_flags { + FRCT_DATA = 0x8000, /* PDU carries data */ + FRCT_DRF = 0x4000, /* Data run flag */ + FRCT_ACK = 0x2000, /* ACK field valid */ + FRCT_NACK = 0x1000, /* Neg-ACK: pci->seqno is arrival_seqno - 1 */ + FRCT_FC = 0x0800, /* FC window valid */ + FRCT_RDVS = 0x0400, /* Rendez-vous */ + FRCT_FFGM = 0x0200, /* First fragment (begin) */ + FRCT_LFGM = 0x0100, /* Last fragment (end) */ + FRCT_RXM = 0x0080, /* Retransmission */ + FRCT_SACK = 0x0040, /* SACK block list follows */ + FRCT_RTTP = 0x0020, /* RTT probe / echo */ + FRCT_KA = 0x0010, /* Keepalive */ + FRCT_FIN = 0x0008, /* End of stream (stream) */ }; -struct frcti { - int fd; +/* + * DATA-packet fragment role (FFGM = begin, LFGM = end), SCTP-style: + * 1 1 = sole / un-fragmented SDU (begin AND end) + * 1 0 = first fragment of a multi-fragment SDU + * 0 0 = middle fragment + * 0 1 = last fragment + */ +#define FRCT_FR_MASK (FRCT_FFGM | FRCT_LFGM) +#define FRCT_FR_SOLE (FRCT_FFGM | FRCT_LFGM) +#define FRCT_FR_FIRST (FRCT_FFGM) +#define FRCT_FR_MID (0) +#define FRCT_FR_LAST (FRCT_LFGM) + +/* Default cap on a single reassembled SDU. App can raise via FRCTSMAXSDU */ +#define FRCT_MAX_SDU (1U << 20) + +/* Stream-mode PCI extension: [start, end) byte range on every DATA pkt. */ +struct frct_pci_stream { + uint32_t start; + uint32_t end; +} __attribute__((packed)); + +#define FRCT_PCI_STREAM_LEN (sizeof(struct frct_pci_stream)) + +/* Bytes following PCI: SACK list / RTTP nonce / control payload. */ +#define FRCT_BODY(pci) ((uint8_t *) (pci) + FRCT_PCILEN) +/* Typed access to the stream PCI extension on stream DATA packets. */ +#define FRCT_SPCI(pci) \ + ((struct frct_pci_stream *) ((uint8_t *) (pci) + FRCT_PCILEN)) + +/* Push the FRCT header onto spb's head. */ +#define FRCT_HDR_PUSH(spb, frcti) \ + ((struct frct_pci *) ssm_pk_buff_push((spb), \ + frcti_data_hdr_len(frcti))) + +/* Pop a fixed-size header off spb's head; cast to type *. */ +#define FRCT_HDR_POP(spb, type) \ + ((struct type *) ssm_pk_buff_pop((spb), sizeof(struct type))) - time_t mpl; - time_t a; - time_t r; - time_t rdv; - - time_t srtt; /* Smoothed rtt */ - time_t mdev; /* Deviation */ - time_t rto; /* Retransmission timeout */ - uint32_t rttseq; - struct timespec t_probe; /* Probe time */ - bool probe; /* Probe active */ +/* Default / max per-flow stream rx ring (pow2); min N * per_pkt. */ +#define FRCT_STREAM_RING_MIN_PKTS 4 +#define FRCT_STREAM_RING_SZ (1U << 20) /* 1 MiB default */ +#define FRCT_STREAM_RING_SZ_MAX (1U << 27) /* 128 MiB */ + +struct frct_pci { + uint16_t flags; + uint16_t hcs; + + uint32_t window; + uint32_t seqno; + uint32_t ackno; +} __attribute__((packed)); + +/* Stat counters; fold to no-ops without PROC_FLOW_STATS. */ #ifdef PROC_FLOW_STATS - size_t n_rtx; /* Number of rxm packets */ - size_t n_prb; /* Number of rtt probes */ - size_t n_rtt; /* Number of estimates */ - size_t n_dup; /* Duplicates received */ - size_t n_dak; /* Delayed ACKs received */ - size_t n_rdv; /* Number of rdv packets */ - size_t n_out; /* Packets out of window */ - size_t n_rqo; /* Packets out of rqueue */ +struct frcti_stat { + size_t rxm_snd; /* RXM packets sent */ + size_t rxm_rcv; /* RXM packets received */ + size_t rxm_fire; /* tw RXM fires */ + size_t rxm_sack; /* SACK-driven retransmits */ + size_t rxm_rack; /* RACK fast retransmits */ + size_t rxm_dupthresh; /* DupThresh-driven retransmits */ + size_t rxm_due_count; /* rxm_due entries (pre-bail) */ + size_t rxm_due_acked; /* bail: seqno < snd_lwe */ + size_t rxm_due_unowned; /* bail: slot.rxm replaced */ + size_t rxm_due_aged; /* bail: r->t0 + t_r < now */ + size_t rxm_arm_fail; /* rxm_arm: malloc failed */ + size_t rxm_cancel; /* entries cancelled at teardown */ + size_t ack_snd; /* ACK packets sent (bare + SACK) */ + size_t ack_fire; /* delayed-ACK timer fires */ + size_t ack_supp_seqno; /* fire suppressed: seqno */ + size_t ack_supp_inact; /* fire suppressed: inact */ + size_t ack_supp_rate; /* fire suppressed: rate */ + size_t ack_rcv; /* ACK packets received */ + size_t ack_rtt; /* ACKs that fed RTT estimator */ + size_t ack_dup_rcv; /* ACK packet wire dups dropped */ + size_t dup_rcv; /* duplicates received */ + size_t out_rcv; /* pkts out of window */ + size_t rqo_rcv; /* pkts out of rqueue */ + size_t ooo_rcv; /* OOO arrivals */ + size_t sack_snd; /* SACK packets sent */ + size_t sack_rcv; /* SACK packets received */ + size_t dsack_snd; /* SACK pkts carrying a DSACK */ + size_t dsack_rcv; /* DSACK blocks parsed */ + size_t dsack_drop; /* DSACK blocks past MAX_DSACK_LAG */ + size_t nack_snd; /* pre-DRF NACKs sent */ + size_t nack_rcv; /* pre-DRF NACKs received */ + size_t rttp_snd; /* RTT probes sent */ + size_t rttp_rcv; /* RTT probe replies rcvd */ + size_t rtt_smpl; /* RTT estimator samples */ + size_t rdv_snd; /* rendez-vous packets sent */ + size_t rdv_rcv; /* rendez-vous packets rcvd */ + size_t ka_snd; /* keepalives sent */ + size_t ka_rcv; /* keepalives received */ + size_t sdu_snd_frag; /* writes that fragmented */ + size_t frag_snd; /* fragments sent: FIRST/MID/LAST */ + size_t frag_rcv; /* fragments stashed in rq[] */ + size_t sdu_reasm; /* SDUs delivered reassembled */ + size_t frag_drop; /* dropped at malformed run */ + size_t strm_snd_byte; /* bytes sent on stream */ + size_t strm_rcv_byte; /* bytes copied to ring */ + size_t strm_dlv_byte; /* bytes delivered to reader */ + size_t strm_drop; /* stream rcvs dropped */ + size_t strm_fin_drop; /* stream FIN packets rejected */ + /* Profiling instrumentation. */ + size_t rcv_proc_ns; /* time inside FRCTI_RCV (ns) */ + size_t tw_move_ns; /* time inside tw_move (ns) */ + size_t drain_calls; /* flow_drain_rx_nb invocations */ +}; + +#define STAT_BUMP(frcti, field) FETCH_ADD_RELAXED(&(frcti)->stat.field, 1) +#define STAT_ADD(frcti, field, v) FETCH_ADD_RELAXED(&(frcti)->stat.field, (v)) +#define STAT_LOAD(frcti, field) LOAD_RELAXED(&(frcti)->stat.field) +#else +#define STAT_BUMP(frcti, field) ((void) (frcti)) +#define STAT_ADD(frcti, field, v) ((void) (frcti)) +#define STAT_LOAD(frcti, field) ((void) (frcti), (size_t) 0) #endif - struct frct_cr snd_cr; - struct frct_cr rcv_cr; +#define frcti_to_flow(f) (&proc.flows[(f)->fd]) + +#define RTTP_RING 8 +#define RTTP_COLD_NS (100 * MILLION) /* cold-probe cadence */ +#define RQ_SLOT(seqno) ((seqno) & (RQ_SIZE - 1)) - ssize_t rq[RQ_SIZE]; - pthread_rwlock_t lock; +struct rxm_entry; - bool open; /* Window open/closed */ - struct timespec t_wnd; /* Window closed time */ - struct timespec t_rdvs; /* Last rendez-vous sent */ - pthread_cond_t cond; - pthread_mutex_t mtx; +enum snd_slot_flags { + SND_RTX = 0x01, /* Any retransmit; Karn skips next RTT sample. */ + SND_FAST_RXM = 0x02, /* Fast-retx one-shot gate per loss event. */ }; -enum frct_flags { - FRCT_DATA = 0x01, /* PDU carries data */ - FRCT_DRF = 0x02, /* Data run flag */ - FRCT_ACK = 0x04, /* ACK field valid */ - FRCT_FC = 0x08, /* FC window valid */ - FRCT_RDVS = 0x10, /* Rendez-vous */ - FRCT_FFGM = 0x20, /* First Fragment */ - FRCT_MFGM = 0x40, /* More fragments */ +struct snd_slot { + struct rxm_entry * rxm; /* RXM entry, NULL if none. */ + uint64_t time; /* ts_to_ns of last send (any kind). */ + uint8_t flags; /* SND_* bits above. */ }; -struct frct_pci { - uint8_t flags; +/* Per-seqno reorder slot (FRTX) and stream-mode byte/FIN metadata. */ +struct rcv_slot { + ssize_t idx; /* spb idx; -1 = empty */ + uint32_t start; /* stream byte start */ + uint32_t end; /* stream byte end */ + uint8_t fin; /* stream FIN bit */ +}; - uint8_t pad; /* 24 bit window! */ - uint16_t window; +struct frct_cr { + uint32_t lwe; /* Left window edge */ + uint32_t rwe; /* Right window edge */ - uint32_t seqno; - uint32_t ackno; -} __attribute__((packed)); + uint8_t cflags; + uint32_t seqno; /* SEQ to send, or last SEQ Ack'd */ + uint32_t ackno; /* snd: ACK-pkt seqno; rcv: dedup */ + + uint64_t act; /* ts_to_ns of last activity */ + uint64_t inact; /* Inactivity threshold (ns) */ +}; + +struct frcti { + /* IMM: set once in frcti_create; read-only thereafter. */ + int fd; + uint64_t t_mpl; /* MPL (ns) */ + uint64_t t_a; /* a-timer (ns) */ + uint64_t t_r; /* r-timer (ns) */ + uint64_t t_rdv; /* RDV cooldown (ns) */ + time_t ber; /* cached qs.ber */ + bool lossy; /* qs.loss != 0 */ + time_t qs_timeout; /* cached qs.timeout (ms) */ + size_t frag_mtu; /* max FRCT pkt: PCI + payload */ + uint16_t sack_n_max; /* SACK blocks that fit MTU */ + bool stream; + + /* All fields below are protected by lock (rwlock/LOAD_ACQUIRE). */ + struct { + struct frct_cr snd_cr; + struct frct_cr rcv_cr; + + /* RTT/RACK estimator */ + time_t srtt; /* smoothed RTT */ + time_t mdev; /* mean deviation */ + time_t min_rtt; /* RACK base, ns */ + uint64_t t_min_rtt; /* min_rtt last set */ + time_t rto; /* retransmit TO */ + time_t rto_min; /* RTO floor (ns) */ + uint8_t rto_mul; /* RTO backoff bits */ + uint32_t rtt_lwe; /* RTT-sample fence */ + uint64_t t_rcv_rtt; /* last RTT feed */ + uint64_t t_snd_probe; /* last probe sent */ + uint64_t t_latest_ack; /* RACK.fack snd-ts */ + uint32_t probe_id_next; + struct { + uint32_t id; + uint64_t ts; /* ts_to_ns send */ + uint8_t nonce[RTTP_NONCE_LEN]; /* echoed back */ + } probes[RTTP_RING]; + + /* rcv reassembly */ + size_t max_rcv_sdu; /* max reasm bytes */ + uint8_t * rcv_ring; /* lazy alloc */ + size_t rcv_ring_sz; /* power of 2 */ + uint32_t ring_seq_cap; /* ring/per_pkt */ + + uint32_t snd_byte_next; + bool snd_fin_sent; + uint32_t snd_fin_seqno; + uint32_t rcv_byte_next; + uint32_t rcv_byte_high; /* contiguous high */ + uint32_t rcv_byte_fin; /* set when FIN */ + bool rcv_fin_seen; + + struct rcv_slot rcv_slots[RQ_SIZE]; + struct snd_slot snd_slots[RQ_SIZE]; /* .rxm is ATOM */ + + /* rcv SACK dedup */ + uint64_t t_snd_sack; + uint32_t sack_lwe; /* rcv lwe at SACK */ + uint16_t sack_n; /* SACK block count */ + + /* RFC 2883 D-SACK: pending report (single-slot, latest). */ + uint32_t dsack_seqno; + bool dsack_valid; + + /* RFC 8985 §7.2 RACK reorder-window scaling. */ + uint8_t reo_wnd_mult; /* 1..REO_WND_MULT_MAX */ + uint32_t dsack_lwe_snap; /* lwe @ last DSACK */ + + uint32_t dup_thresh; /* RFC 8985 */ + uint64_t t_nack; + bool open; /* FC window state */ + bool in_recovery; + uint32_t recovery_high; /* seqno @ entry */ + uint32_t rack_fired_lwe; /* lwe @ last RACK */ + struct timespec t_wnd; /* window-closed ts */ + struct timespec t_last_rdv; /* last RDV sent */ + struct list_head rxm_list; /* live rxm entries */ + + pthread_rwlock_t lock; + }; + + /* Read/written via __atomic without holding lock. */ + uint64_t t_ka_rcv; /* ts_to_ns of last KA rx */ + uint8_t ack_pending; /* delayed-ACK dedup */ + + /* Timer entries; ownership belongs to the tw module. */ + struct tw_entry ack_tw; /* delayed-ACK timer */ + struct tw_entry ka_tw; /* keepalive timer */ + +#ifdef PROC_FLOW_STATS + /* STAT: lock-free relaxed atomic counters. */ + struct frcti_stat stat; +#endif +}; #ifdef PROC_FLOW_STATS +__attribute__((cold)) static int frct_rib_read(const char * path, char * buf, size_t len) { + struct frcti * frcti; struct timespec now; + uint64_t now_ns; char * entry; - struct flow * flow; - struct frcti * frcti; int fd; - - (void) len; + int written; + /* Snapshot under the locks; format outside (pure userspace). */ + struct { + uint64_t t_mpl; + uint64_t t_a; + uint64_t t_r; + time_t srtt; + time_t mdev; + time_t rto; + time_t min_rtt; + struct frct_cr snd_cr; + struct frct_cr rcv_cr; + struct frcti_stat stat; + } s; entry = strstr(path, RIB_SEPARATOR); assert(entry); @@ -118,23 +391,45 @@ static int frct_rib_read(const char * path, fd = atoi(path); - flow = &proc.flows[fd]; - clock_gettime(PTHREAD_COND_CLOCK, &now); + now_ns = TS_TO_UINT64(now); + + if (fd < 0 || fd >= PROC_MAX_FLOWS) + return 0; pthread_rwlock_rdlock(&proc.lock); - frcti = flow->frcti; + frcti = proc.flows[fd].frcti; + if (frcti == NULL) { + pthread_rwlock_unlock(&proc.lock); + return 0; + } + + s.t_mpl = frcti->t_mpl; + s.t_a = frcti->t_a; + s.t_r = frcti->t_r; pthread_rwlock_rdlock(&frcti->lock); - sprintf(buf, - "Maximum packet lifetime (ns): %20ld\n" - "Max time to Ack (ns): %20ld\n" - "Max time to Retransmit (ns): %20ld\n" + s.srtt = frcti->srtt; + s.mdev = frcti->mdev; + s.rto = frcti->rto; + s.min_rtt = frcti->min_rtt; + s.snd_cr = frcti->snd_cr; + s.rcv_cr = frcti->rcv_cr; + s.stat = frcti->stat; + + pthread_rwlock_unlock(&frcti->lock); + pthread_rwlock_unlock(&proc.lock); + + written = snprintf(buf, len, + "Maximum packet lifetime (ns): %20" PRIu64 "\n" + "Max time to Ack (ns): %20" PRIu64 "\n" + "Max time to Retransmit (ns): %20" PRIu64 "\n" "Smoothed rtt (ns): %20ld\n" "RTT standard deviation (ns): %20ld\n" "Retransmit timeout RTO (ns): %20ld\n" + "Minimum RTT (RACK base, ns): %20ld\n" "Sender left window edge: %20u\n" "Sender right window edge: %20u\n" "Sender inactive (ns): %20lld\n" @@ -143,44 +438,101 @@ static int frct_rib_read(const char * path, "Receiver right window edge: %20u\n" "Receiver inactive (ns): %20lld\n" "Receiver last ack: %20u\n" - "Number of pkt retransmissions: %20zu\n" - "Number of rtt probes: %20zu\n" - "Number of rtt estimates: %20zu\n" - "Number of duplicates received: %20zu\n" - "Number of delayed acks received: %20zu\n" - "Number of rendez-vous sent: %20zu\n" - "Number of packets out of window: %20zu\n" - "Number of packets out of rqueue: %20zu\n", - frcti->mpl, - frcti->a, - frcti->r, - frcti->srtt, - frcti->mdev, - frcti->rto, - frcti->snd_cr.lwe, - frcti->snd_cr.rwe, - ts_diff_ns(&now, &frcti->snd_cr.act), - frcti->snd_cr.seqno, - frcti->rcv_cr.lwe, - frcti->rcv_cr.rwe, - ts_diff_ns(&now, &frcti->rcv_cr.act), - frcti->rcv_cr.seqno, - frcti->n_rtx, - frcti->n_prb, - frcti->n_rtt, - frcti->n_dup, - frcti->n_dak, - frcti->n_rdv, - frcti->n_out, - frcti->n_rqo); - - pthread_rwlock_unlock(&flow->frcti->lock); + "RXM packets sent: %20zu\n" + "RXM packets received: %20zu\n" + "RXM timer fires: %20zu\n" + "RXM (SACK-driven) sent: %20zu\n" + "RXM (RACK-driven) sent: %20zu\n" + "RXM (DupThresh-driven) sent: %20zu\n" + "ACK packets sent: %20zu\n" + "Delayed-ACK timer fires: %20zu\n" + " suppressed (seqno): %20zu\n" + " suppressed (inact): %20zu\n" + " suppressed (rate): %20zu\n" + "ACK packets received: %20zu\n" + " fed RTT estimator: %20zu\n" + " wire dups dropped: %20zu\n" + "Duplicates received: %20zu\n" + "Out-of-window pkts received: %20zu\n" + "Out-of-rqueue pkts received: %20zu\n" + "OOO arrivals: %20zu\n" + "SACKs sent: %20zu\n" + "SACKs received: %20zu\n" + "D-SACKs sent: %20zu\n" + "D-SACKs received: %20zu\n" + "D-SACK out-of-range dropped: %20zu\n" + "Pre-DRF NACKs sent: %20zu\n" + "Pre-DRF NACKs received: %20zu\n" + "RTT probes sent: %20zu\n" + "RTT probe replies received: %20zu\n" + "RTT estimator samples: %20zu\n" + "Rendez-vous packets sent: %20zu\n" + "Rendez-vous packets received: %20zu\n" + "Keepalives sent: %20zu\n" + "Keepalives received: %20zu\n" + "SDU writes fragmented: %20zu\n" + "Fragments sent: %20zu\n" + "Fragments received: %20zu\n" + "SDUs delivered reassembled: %20zu\n" + "Fragments dropped (malformed): %20zu\n" + "Stream bytes sent: %20zu\n" + "Stream bytes received: %20zu\n" + "Stream bytes delivered: %20zu\n" + "Stream packets dropped: %20zu\n" + "Stream FINs dropped: %20zu\n" + "FRCTI_RCV time (ns): %20zu\n" + "tw_move time (ns): %20zu\n" + "drain_rx_nb calls: %20zu\n" + "RXM-due entries: %20zu\n" + " bail (acked): %20zu\n" + " bail (unowned): %20zu\n" + " bail (aged): %20zu\n" + "RXM-arm malloc failures: %20zu\n" + "RXM cancels (teardown): %20zu\n", + s.t_mpl, s.t_a, s.t_r, + s.srtt, s.mdev, s.rto, s.min_rtt, + s.snd_cr.lwe, s.snd_cr.rwe, + (long long)(now_ns - s.snd_cr.act), + s.snd_cr.seqno, + s.rcv_cr.lwe, s.rcv_cr.rwe, + (long long)(now_ns - s.rcv_cr.act), + s.rcv_cr.seqno, + s.stat.rxm_snd, s.stat.rxm_rcv, s.stat.rxm_fire, + s.stat.rxm_sack, s.stat.rxm_rack, s.stat.rxm_dupthresh, + s.stat.ack_snd, s.stat.ack_fire, + s.stat.ack_supp_seqno, s.stat.ack_supp_inact, + s.stat.ack_supp_rate, + s.stat.ack_rcv, s.stat.ack_rtt, s.stat.ack_dup_rcv, + s.stat.dup_rcv, s.stat.out_rcv, s.stat.rqo_rcv, + s.stat.ooo_rcv, + s.stat.sack_snd, s.stat.sack_rcv, + s.stat.dsack_snd, s.stat.dsack_rcv, s.stat.dsack_drop, + s.stat.nack_snd, s.stat.nack_rcv, + s.stat.rttp_snd, s.stat.rttp_rcv, s.stat.rtt_smpl, + s.stat.rdv_snd, s.stat.rdv_rcv, + s.stat.ka_snd, s.stat.ka_rcv, + s.stat.sdu_snd_frag, s.stat.frag_snd, s.stat.frag_rcv, + s.stat.sdu_reasm, s.stat.frag_drop, + s.stat.strm_snd_byte, s.stat.strm_rcv_byte, + s.stat.strm_dlv_byte, + s.stat.strm_drop, s.stat.strm_fin_drop, + s.stat.rcv_proc_ns, s.stat.tw_move_ns, + s.stat.drain_calls, + s.stat.rxm_due_count, + s.stat.rxm_due_acked, s.stat.rxm_due_unowned, + s.stat.rxm_due_aged, s.stat.rxm_arm_fail, + s.stat.rxm_cancel); + + if (written < 0) + return 0; - pthread_rwlock_unlock(&proc.lock); + if ((size_t) written >= len) + return (int) (len - 1); - return strlen(buf); + return written; } +__attribute__((cold)) static int frct_rib_readdir(char *** buf) { *buf = malloc(sizeof(**buf)); @@ -199,13 +551,14 @@ static int frct_rib_readdir(char *** buf) return -ENOMEM; } +__attribute__((cold)) static int frct_rib_getattr(const char * path, struct rib_attr * attr) { (void) path; - (void) attr; - attr->size = 1189; + /* Must be >= the sprintf output in frct_rib_read. */ + attr->size = 4096; attr->mtime = 0; return 0; @@ -220,128 +573,1089 @@ static struct rib_ops r_ops = { #endif /* PROC_FLOW_STATS */ -static bool before(uint32_t seq1, - uint32_t seq2) +static __inline__ bool before(uint32_t s1, uint32_t s2) { - return (int32_t)(seq1 - seq2) < 0; + return (int32_t)(s1 - s2) < 0; } -static bool after(uint32_t seq1, - uint32_t seq2) +static __inline__ bool after(uint32_t s1, uint32_t s2) { - return (int32_t)(seq2 - seq1) < 0; + return (int32_t)(s2 - s1) < 0; } -static void __send_frct_pkt(int fd, - uint8_t flags, - uint32_t ackno, - uint32_t rwe) +static __inline__ bool within(uint32_t seq, uint32_t lo, uint32_t hi) { - struct ssm_pk_buff * spb; - struct frct_pci * pci; - ssize_t idx; - struct flow * f; + return after(seq, lo) && !after(seq, hi); +} - /* Raw calls needed to bypass frcti. */ -#ifdef RXM_BLOCKING - idx = ssm_pool_alloc_b(proc.pool, sizeof(*pci), NULL, &spb, NULL); -#else - idx = ssm_pool_alloc(proc.pool, sizeof(*pci), NULL, &spb); -#endif - if (idx < 0) - return; +/* + * RACK reorder window R (RFC 8985 §6.2): + * R = MIN(reo_wnd_mult * RACK.min_RTT / 4, SRTT) + * reo_wnd_mult scales on D-SACK evidence of under-tolerance (§7.2). + * Fall back to srtt when no min_rtt sample exists yet; MIN_REORDER_NS + * floor guards collapse below the timer-tick resolution. + */ +static __inline__ uint64_t rack_reorder_window(struct frcti * frcti) +{ + uint64_t mult = frcti->reo_wnd_mult > 0 ? frcti->reo_wnd_mult : 1; + uint64_t base = frcti->min_rtt > 0 ? (uint64_t) frcti->min_rtt + : (uint64_t) frcti->srtt; + uint64_t R = mult * (base / 4); - pci = (struct frct_pci *) ssm_pk_buff_head(spb); - memset(pci, 0, sizeof(*pci)); + R = MAX(R, (uint64_t) MIN_REORDER_NS); + R = MIN(R, (uint64_t) frcti->srtt); - *((uint32_t *) pci) = hton32(rwe); + return R; +} - pci->flags = flags; - pci->ackno = hton32(ackno); +static __inline__ int frct_spb_reserve(size_t len, + struct ssm_pk_buff ** spb) +{ + ssize_t idx = ssm_pool_alloc_b(proc.pool, len, NULL, spb, NULL); + + return idx < 0 ? (int) idx : 0; +} + +static __inline__ void frct_spb_release(struct ssm_pk_buff * spb) +{ + ssm_pool_remove(proc.pool, ssm_pk_buff_get_off(spb)); +} + +static __inline__ void frct_spb_release_idx(size_t idx) +{ + ssm_pool_remove(proc.pool, idx); +} - f = &proc.flows[fd]; +/* Fetch the spb stashed at the rq slot for seqno. */ +static __inline__ struct ssm_pk_buff * rq_frag(const struct frcti * frcti, + uint32_t seqno) +{ + return ssm_pool_get(proc.pool, frcti->rcv_slots[RQ_SLOT(seqno)].idx); +} + +static __inline__ size_t frcti_data_hdr_len(const struct frcti * frcti) +{ + return FRCT_PCILEN + (frcti->stream ? FRCT_PCI_STREAM_LEN : 0); +} + +static __inline__ size_t frcti_ctrl_hdr_len(const struct frcti * frcti) +{ + (void) frcti; + + return FRCT_PCILEN; +} + +/* + * HCS at offset 2 inside PCI. Covers flags (bytes 0..1) and + * window/seqno/ackno (bytes 4..15), plus SPCI for stream DATA. + */ +static void frct_hcs_set(struct frct_pci * pci, + bool stream) +{ + uint16_t hcs = 0; + size_t tail; + + tail = sizeof(*pci) - sizeof(pci->flags) - sizeof(pci->hcs); + if (stream) + tail += FRCT_PCI_STREAM_LEN; + + crc16_ccitt_false(&hcs, pci, sizeof(pci->flags)); + crc16_ccitt_false(&hcs, &pci->window, tail); + + pci->hcs = hton16(hcs); +} + +static int frct_hcs_check(const struct frct_pci * pci, + const struct frcti * frcti) +{ + uint16_t hcs = 0; + uint16_t flags; + size_t tail; + + /* Untrusted flag read; mismatch on HCS will drop on corrupt. */ + flags = ntoh16(pci->flags); + + tail = sizeof(*pci) - sizeof(pci->flags) - sizeof(pci->hcs); + if (frcti->stream && (flags & FRCT_DATA)) + tail += FRCT_PCI_STREAM_LEN; + + crc16_ccitt_false(&hcs, pci, sizeof(pci->flags)); + crc16_ccitt_false(&hcs, &pci->window, tail); + + return hcs != ntoh16(pci->hcs); +} + +static int frct_tx(struct frcti * frcti, struct ssm_pk_buff * spb) +{ + struct flow * f = frcti_to_flow(frcti); + const struct frct_pci * pci; + uint16_t flags; + ssize_t idx; + int ret; + + pci = (const struct frct_pci *) ssm_pk_buff_head(spb); + flags = ntoh16(pci->flags); + + /* CRC32 covers plaintext body; PCI is in HCS. Pre-encrypt. */ + if (flags & FRCT_SACK) { + if (crc_add(spb, frcti_ctrl_hdr_len(frcti)) != 0) + goto fail; + } else if ((flags & FRCT_DATA) && f->info.qs.ber == 0) { + if (crc_add(spb, frcti_data_hdr_len(frcti)) != 0) + goto fail; + } if (spb_encrypt(f, spb) < 0) goto fail; -#ifdef RXM_BLOCKING - if (ssm_rbuff_write_b(f->tx_rb, idx, NULL)) -#else - if (ssm_rbuff_write(f->tx_rb, idx)) -#endif + idx = ssm_pk_buff_get_off(spb); + + ret = ssm_rbuff_write_b(f->tx_rb, idx, NULL); + if (ret < 0) goto fail; ssm_flow_set_notify(f->set, f->info.id, FLOW_PKT); - return; + return 0; fail: - ipcp_spb_release(spb); - return; + ssm_pool_remove(proc.pool, ssm_pk_buff_get_off(spb)); + return -ENOMEM; +} + +__attribute__((cold)) +static void frct_mark_flow_down(struct frcti * frcti) +{ + struct flow * f = frcti_to_flow(frcti); + + if (f->rx_rb != NULL) + ssm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); + + if (f->tx_rb != NULL) + ssm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); +} + +__attribute__((cold)) +static void frct_mark_peer_dead(struct frcti * frcti) +{ + struct flow * f = frcti_to_flow(frcti); + + if (f->rx_rb != NULL) + ssm_rbuff_set_acl(f->rx_rb, ACL_FLOWPEER); + + if (proc.fqset != NULL) + ssm_flow_set_notify(proc.fqset, f->info.id, FLOW_PEER); +} + +static __inline__ int frct_ctrl_alloc(struct ssm_pk_buff ** spb, + struct frct_pci ** pci, + size_t payload_len) +{ + if (frct_spb_reserve(FRCT_PCILEN + payload_len, spb) < 0) + return -1; + + *pci = (struct frct_pci *) ssm_pk_buff_head(*spb); + memset(*pci, 0, FRCT_PCILEN); + + return 0; +} + +/* + * Advertised rwe. Stream mode clamps to lwe + ring_seq_cap so the + * byte-equivalent fits the rx ring. Caller holds at least the rdlock. + */ +static __inline__ uint32_t frcti_advert_rwe(struct frcti * frcti) +{ + uint32_t rwe; + uint32_t cap; + + rwe = frcti->rcv_cr.rwe; + + if (!frcti->stream) + return rwe; + + cap = frcti->rcv_cr.lwe + frcti->ring_seq_cap; + + return before(cap, rwe) ? cap : rwe; +} + +static void frcti_pkt_snd(struct frcti * frcti, + uint16_t flags, + uint32_t ackno, + uint32_t rwe) +{ + struct ssm_pk_buff * spb; + struct frct_pci * pci; + + if (frct_ctrl_alloc(&spb, &pci, 0) < 0) + return; + + pci->flags = hton16(flags); + pci->window = hton32(rwe); + pci->ackno = hton32(ackno); + if (flags & FRCT_ACK) { + /* reuse ackno for the sequence number of delayed ACK */ + ackno = FETCH_ADD_RELAXED(&frcti->snd_cr.ackno, 1); + pci->seqno = hton32(ackno + 1); + } + + frct_hcs_set(pci, false); + + frct_tx(frcti, spb); +} + +/* RTO floor scales with srtt; hard floor rto_min guards sub-ms RTT. */ +static void rtt_init(struct frcti * frcti, + time_t rtt_hint) +{ + time_t floor; + + if (rtt_hint > 0) { + rtt_hint = MAX(rtt_hint, (time_t) RTT_BOOT_NS); + frcti->srtt = rtt_hint; + frcti->mdev = rtt_hint >> 3; + floor = MAX(frcti->rto_min, 2 * frcti->srtt); + frcti->rto = MAX(floor, rtt_hint + (frcti->mdev << MDEV_MUL)); + frcti->min_rtt = rtt_hint; + } else { + /* Boot from first ACK. */ + frcti->srtt = 0; + frcti->mdev = RTT_BOOT_NS; + frcti->rto = MAX((time_t) INITIAL_RTO, frcti->rto_min); + frcti->min_rtt = 0; + } + + frcti->rto_mul = 0; +} + +/* RFC 8985 §6.2: replace min_RTT on unset, smaller sample, or expiry. */ +static __inline__ bool min_rtt_stale(struct frcti * frcti, + time_t mrtt, + uint64_t now_ns) +{ + if (frcti->min_rtt == 0) + return true; + + if (mrtt < frcti->min_rtt) + return true; + + return (now_ns - frcti->t_min_rtt) > MIN_RTT_WIN_NS; +} + +/* Linux-style windowed-min refresh of RACK.min_RTT. */ +static __inline__ void min_rtt_update(struct frcti * frcti, + time_t mrtt, + uint64_t now_ns) +{ + if (!min_rtt_stale(frcti, mrtt, now_ns)) + return; + + frcti->min_rtt = mrtt; + frcti->t_min_rtt = now_ns; +} + +static void rtt_update(struct frcti * frcti, + time_t mrtt, + uint64_t now_ns) +{ + time_t srtt = frcti->srtt; + time_t rttvar = frcti->mdev; + time_t floor; + time_t rto; + + if (srtt == 0) { + srtt = mrtt; + rttvar = mrtt >> 1; + } else { + /* RFC 6298 symmetric EWMA. */ + time_t delta = mrtt - srtt; + srtt += (delta >> 3); + delta = (ABS(delta) - rttvar) >> 2; +#ifdef FRCT_LINUX_RTT_ESTIMATOR + if (delta < 0) + delta >>= 3; +#endif + rttvar += delta; + } + STAT_BUMP(frcti, rtt_smpl); + frcti->srtt = MAX(SRTT_FLOOR_NS, srtt); + frcti->mdev = MAX(MDEV_FLOOR_NS, rttvar); + + min_rtt_update(frcti, mrtt, now_ns); + + floor = MAX(frcti->rto_min, 2 * frcti->srtt); + rto = MAX(floor, frcti->srtt + (frcti->mdev << MDEV_MUL)); + + STORE_RELEASE(&frcti->rto, rto); + STORE_RELEASE(&frcti->rto_mul, 0); } -static void send_frct_pkt(struct frcti * frcti) +/* Fill probes[pos], return new probe_id; 0 on entropy failure. Wrlock. */ +static uint32_t rttp_alloc_probe(struct frcti * frcti, + uint64_t now_ns, + uint8_t nonce[RTTP_NONCE_LEN]) { + uint32_t probe_id; + size_t pos; + + if (random_buffer(nonce, RTTP_NONCE_LEN) < 0) + return 0; + + probe_id = frcti->probe_id_next++; + if (probe_id == 0) + probe_id = frcti->probe_id_next++; + + pos = RTTP_POS(probe_id); + frcti->probes[pos].id = probe_id; + frcti->probes[pos].ts = now_ns; + memcpy(frcti->probes[pos].nonce, nonce, RTTP_NONCE_LEN); + frcti->t_snd_probe = now_ns; + + STAT_BUMP(frcti, rttp_snd); + + return probe_id; +} + +/* Caller wrlock; out args valid on true (caller emits post-unlock). */ +static bool rtt_probe_arm(struct frcti * frcti, + uint64_t now_ns, + uint32_t * probe_id, + uint8_t nonce[RTTP_NONCE_LEN]) +{ + if (frcti->srtt == 0) + return false; + + if (!after(frcti->snd_cr.seqno, frcti->snd_cr.lwe)) + return false; + + if (now_ns - frcti->t_rcv_rtt <= 2u * (uint64_t) frcti->srtt) + return false; + + if (now_ns - frcti->t_snd_probe <= (uint64_t) frcti->srtt) + return false; + + *probe_id = rttp_alloc_probe(frcti, now_ns, nonce); + + return *probe_id != 0; +} + +static void frcti_rttp_snd(struct frcti * frcti, + uint32_t probe_id, + uint32_t echo_id, + const uint8_t * nonce) +{ + struct ssm_pk_buff * spb; + struct frct_pci * pci; + struct frct_rttp * rttp; + + if (frct_ctrl_alloc(&spb, &pci, RTTP_PAYLOAD) < 0) + return; + + pci->flags = hton16(FRCT_RTTP); + + frct_hcs_set(pci, false); + + rttp = (struct frct_rttp *) FRCT_BODY(pci); + rttp->probe_id = hton32(probe_id); + rttp->echo_id = hton32(echo_id); + memcpy(rttp->nonce, nonce, sizeof(rttp->nonce)); + + frct_tx(frcti, spb); +} + +struct rxm_entry { + struct tw_entry tw; + struct list_head next; /* in frcti->rxm_list */ + struct frcti * frcti; + uint32_t seqno; + uint64_t t0; + size_t len; + uint8_t pkt[]; /* flexible — sized at alloc time */ +}; + +static struct rxm_entry * rxm_entry_create(struct frcti * frcti, + uint32_t seqno, + const struct ssm_pk_buff * spb) +{ + struct rxm_entry * r; + struct timespec now; + size_t len = ssm_pk_buff_len(spb); + + r = malloc(sizeof(*r) + len); + if (r == NULL) { + STAT_BUMP(frcti, rxm_arm_fail); + return NULL; + } + + memcpy(r->pkt, ssm_pk_buff_head(spb), len); + r->len = len; + r->frcti = frcti; + r->seqno = seqno; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + r->t0 = TS_TO_UINT64(now); + + tw_init_entry(&r->tw); + + return r; +} + +static void rxm_entry_destroy(struct rxm_entry * r) +{ + free(r); +} + +static bool rxm_still_owned(struct frcti * frcti, + size_t pos, + struct rxm_entry * r) +{ + return LOAD_ACQUIRE(&frcti->snd_slots[pos].rxm) == r; +} + +/* + * All in-flight slots share the HoL backoff; otherwise non-HoL timers + * cycle at base RTO and storm the wire while HoL is still backing off. + */ +static uint64_t rxm_next_deadline(struct frcti * frcti, + uint64_t now_ns) +{ + time_t rto = LOAD_RELAXED(&frcti->rto); + uint8_t rto_mul = LOAD_RELAXED(&frcti->rto_mul); + + return now_ns + ((uint64_t) rto << rto_mul); +} + +/* Copy pkt, set FRCT_RXM, refresh ackno, re-seal HCS. */ +static struct ssm_pk_buff * rxm_pkt_prepare(const void * pkt, + size_t len, + uint32_t rcv_lwe, + bool stream) +{ + struct ssm_pk_buff * spb; + struct frct_pci * pci; + uint16_t flags; + + if (frct_spb_reserve(len, &spb) < 0) + return NULL; + + pci = (struct frct_pci *) ssm_pk_buff_head(spb); + memcpy(pci, pkt, len); + + flags = ntoh16(pci->flags) | FRCT_RXM; + pci->flags = hton16(flags); + pci->ackno = hton32(rcv_lwe); + + frct_hcs_set(pci, stream); + + return spb; +} + +/* Caller must NOT hold frcti->lock. */ +static void rxm_snd(struct frcti * frcti, + uint32_t seqno, + const void * pkt, + size_t len) +{ + struct ssm_pk_buff * spb; struct timespec now; + struct snd_slot * slot; + uint32_t snd_lwe; + uint32_t rcv_lwe; + size_t pos; + + snd_lwe = LOAD_RELAXED(&frcti->snd_cr.lwe); + rcv_lwe = LOAD_RELAXED(&frcti->rcv_cr.lwe); + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + pthread_rwlock_wrlock(&frcti->lock); + + pos = RQ_SLOT(seqno); + slot = &frcti->snd_slots[pos]; + + slot->time = TS_TO_UINT64(now); + /* RTO clears fast-rtx gate: a fresh loss event for SACK/RACK. */ + slot->flags = (slot->flags & ~SND_FAST_RXM) | SND_RTX; + + frcti->rtt_lwe = seqno + 1; + + /* Only the HoL retransmit bumps the global RTO backoff. */ + if (seqno == snd_lwe && frcti->rto_mul < MAX_RTO_MUL) + STORE_RELEASE(&frcti->rto_mul, frcti->rto_mul + 1); + + /* RFC 8985 §7.2 step 4: RTO on HoL resets RACK reo scaling. */ + if (seqno == snd_lwe) + frcti->reo_wnd_mult = 1; + + pthread_rwlock_unlock(&frcti->lock); + + STAT_BUMP(frcti, rxm_snd); + STAT_BUMP(frcti, rxm_fire); + + spb = rxm_pkt_prepare(pkt, len, rcv_lwe, frcti->stream); + if (spb == NULL) + return; + + if (frct_tx(frcti, spb) < 0) + frct_mark_flow_down(frcti); +} + +static void rxm_due(void * arg) +{ + struct rxm_entry * r = arg; + struct frcti * frcti = r->frcti; + struct timespec now; + uint64_t now_ns; + uint32_t snd_lwe; + size_t pos = RQ_SLOT(r->seqno); + + STAT_BUMP(frcti, rxm_due_count); + + snd_lwe = LOAD_RELAXED(&frcti->snd_cr.lwe); + + /* Already ACK'd: expected for the steady-state majority. */ + if (before(r->seqno, snd_lwe)) { + STAT_BUMP(frcti, rxm_due_acked); + goto cleanup; + } + + /* SACK/RACK-cleared the slot (caller NULL'd snd_slots[pos].rxm). */ + if (!rxm_still_owned(frcti, pos, r)) { + STAT_BUMP(frcti, rxm_due_unowned); + goto cleanup; + } + + clock_gettime(PTHREAD_COND_CLOCK, &now); + now_ns = TS_TO_UINT64(now); + + /* R-timer expired: peer unreachable. */ + if (RXM_AGED_OUT(r->t0, now_ns, frcti->t_r)) { + STAT_BUMP(frcti, rxm_due_aged); + frct_mark_flow_down(frcti); + goto cleanup; + } + + rxm_snd(frcti, r->seqno, r->pkt, r->len); + + /* Re-check ownership: fire path may have replaced our entry. */ + if (rxm_still_owned(frcti, pos, r)) { + uint64_t anchor; + + /* Per-slot anchor breaks co-fire re-bin. */ + anchor = frcti->snd_slots[pos].time; + tw_post(&r->tw, rxm_next_deadline(frcti, anchor), rxm_due, r); + return; + } + + cleanup: + pthread_rwlock_wrlock(&frcti->lock); + + if (rxm_still_owned(frcti, pos, r)) + STORE_RELEASE(&frcti->snd_slots[pos].rxm, NULL); + + list_del(&r->next); + + pthread_rwlock_unlock(&frcti->lock); + + rxm_entry_destroy(r); +} + +static int rxm_arm(struct frcti * frcti, + uint32_t seqno, + const struct ssm_pk_buff * spb) +{ + struct rxm_entry * r; + time_t rto; + uint8_t rto_mul; + uint64_t deadline; + + r = rxm_entry_create(frcti, seqno, spb); + if (r == NULL) + return -ENOMEM; + + rto = LOAD_RELAXED(&frcti->rto); + rto_mul = LOAD_RELAXED(&frcti->rto_mul); + deadline = r->t0 + ((uint64_t) rto << rto_mul); + + pthread_rwlock_wrlock(&frcti->lock); + + list_add_tail(&r->next, &frcti->rxm_list); + STORE_RELEASE(&frcti->snd_slots[RQ_SLOT(seqno)].rxm, r); + + pthread_rwlock_unlock(&frcti->lock); + + tw_post(&r->tw, deadline, rxm_due, r); + + return 0; +} + +static void rxm_cancel_all(struct frcti * frcti) +{ + struct list_head * p; + struct list_head * t; + + list_for_each_safe(p, t, &frcti->rxm_list) { + struct rxm_entry * r = list_entry(p, struct rxm_entry, next); + list_del(&r->next); + tw_cancel(&r->tw); + rxm_entry_destroy(r); + STAT_BUMP(frcti, rxm_cancel); + } +} + +static __inline__ void sack_block_put(uint8_t * payload, + uint16_t i, + uint32_t s, + uint32_t e) +{ + uint32_t * blk = (uint32_t *) + (payload + SACK_HDR_SIZE + i * SACK_BLOCK_SIZE); + + blk[0] = hton32(s); + blk[1] = hton32(e); +} + +static __inline__ void sack_block_get(const uint8_t * payload, + uint16_t i, + uint32_t * s, + uint32_t * e) +{ + const uint32_t * blk = (const uint32_t *) + (payload + SACK_HDR_SIZE + i * SACK_BLOCK_SIZE); + + *s = ntoh32(blk[0]); + *e = ntoh32(blk[1]); +} + +/* + * Build SACK blocks for ranges *above* rcv_cr.lwe. Wire invariant + * (see doc/frct.txt §1.3): every block produced here satisfies + * blocks[i].start > rcv_cr.lwe = ackno, which makes the "first block + * below ackno" convention used to mark a D-SACK (RFC 2883 §4 case 1) + * unambiguous. Caller holds frcti->lock. + */ +static uint16_t sack_blocks_build(struct frcti * frcti, + uint32_t blocks[][2], + uint16_t max_n) +{ + const struct rcv_slot * slots = frcti->rcv_slots; + uint32_t s; + uint32_t end; + uint16_t n = 0; + + s = frcti->rcv_cr.lwe + 1; + end = frcti->rcv_cr.lwe + RQ_SIZE; + if (after(end, frcti->rcv_cr.rwe)) + end = frcti->rcv_cr.rwe; + + while (before(s, end) && n < max_n) { + while (before(s, end) && slots[RQ_SLOT(s)].idx == -1) + ++s; + + if (!before(s, end)) + break; + + blocks[n][0] = s; + while (before(s, end) && slots[RQ_SLOT(s)].idx != -1) + ++s; + blocks[n][1] = s; + ++n; + } + + return n; +} + +/* + * Prepend the pending D-SACK report (if any) as block[0]; clear flag. + * Returns the number of slots consumed at the head (0 or 1). Caller + * holds wrlock. + */ +static __inline__ uint16_t dsack_consume(struct frcti * frcti, + uint32_t blocks[][2]) +{ + if (!frcti->dsack_valid || frcti->sack_n_max == 0) + return 0; + + blocks[0][0] = frcti->dsack_seqno; + blocks[0][1] = frcti->dsack_seqno + 1; + frcti->dsack_valid = false; + return 1; +} + +/* Caller must NOT hold frcti->lock. */ +static void frcti_sack_snd(struct frcti * frcti, + const struct sack_args * sa) +{ + struct ssm_pk_buff * spb; + struct frct_pci * pci; + buffer_t buf; + uint16_t i; + + assert(sa->n <= SACK_MAX_BLOCKS); + + buf.len = SACK_HDR_SIZE + sa->n * SACK_BLOCK_SIZE; + + if (frct_ctrl_alloc(&spb, &pci, buf.len) < 0) + return; + + pci->flags = hton16(FRCT_ACK | FRCT_FC | FRCT_SACK); + pci->window = hton32(sa->rwe); + pci->ackno = hton32(sa->ack); + pci->seqno = hton32(FETCH_ADD_RELAXED(&frcti->snd_cr.ackno, 1) + 1); + + frct_hcs_set(pci, false); + + buf.data = FRCT_BODY(pci); + memset(buf.data, 0, SACK_HDR_SIZE); + *(uint16_t *) buf.data = hton16(sa->n); + for (i = 0; i < sa->n; ++i) + sack_block_put(buf.data, i, sa->blocks[i][0], sa->blocks[i][1]); + + frct_tx(frcti, spb); +} + +static void ack_snd(struct frcti * frcti, + bool with_sack) +{ + struct timespec now; + uint64_t now_ns; time_t diff; uint32_t ackno; uint32_t rwe; - int fd; + struct sack_args * sa = NULL; + size_t sa_sz; + bool sacking = false; assert(frcti); + STAT_BUMP(frcti, ack_fire); + clock_gettime(PTHREAD_COND_CLOCK, &now); + now_ns = TS_TO_UINT64(now); + + if (with_sack && frcti->sack_n_max > 0) { + sa_sz = sizeof(*sa) + frcti->sack_n_max * sizeof(sa->blocks[0]); + sa = malloc(sa_sz); + /* If alloc fails, fall through and send a bare cum-ACK. */ + } pthread_rwlock_wrlock(&frcti->lock); - if (!after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno)) { + /* D-SACK rides through cum-ACK freshness; signal is the duplicate. */ + if (!after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno) + && !frcti->dsack_valid) { pthread_rwlock_unlock(&frcti->lock); - return; + STAT_BUMP(frcti, ack_supp_seqno); + goto out; } - fd = frcti->fd; ackno = frcti->rcv_cr.lwe; - rwe = frcti->rcv_cr.rwe; + rwe = frcti_advert_rwe(frcti); - diff = ts_diff_ns(&now, &frcti->rcv_cr.act); - if (diff > frcti->a) { + if (ACK_AGED_OUT(frcti->rcv_cr.act, now_ns, frcti->t_a)) { pthread_rwlock_unlock(&frcti->lock); - return; + STAT_BUMP(frcti, ack_supp_inact); + goto out; } - diff = ts_diff_ns(&now, &frcti->snd_cr.act); - if (diff < TICTIME) { + diff = (time_t)(now_ns - frcti->snd_cr.act); + if (diff < TICTIME && !frcti->dsack_valid) { pthread_rwlock_unlock(&frcti->lock); - return; + STAT_BUMP(frcti, ack_supp_rate); + goto out; } + /* RFC 2018: piggyback SACK on timer ACK; dedup unchanged board. */ + if (sa == NULL || (frcti->sack_n == 0 && !frcti->dsack_valid)) + goto no_sack; + + sa->dsack = false; + sa->n = dsack_consume(frcti, sa->blocks); + if (sa->n == 1) + sa->dsack = true; + + sa->n += sack_blocks_build(frcti, sa->blocks + sa->n, + frcti->sack_n_max - sa->n); + if (sa->n == 0) + goto no_sack; + + if (!sa->dsack && ackno == frcti->sack_lwe && sa->n == frcti->sack_n) + goto no_sack; + + sa->ack = ackno; + sa->rwe = rwe; + frcti->sack_lwe = ackno; + frcti->sack_n = sa->n; + frcti->t_snd_sack = now_ns; + sacking = true; + + no_sack: frcti->rcv_cr.seqno = frcti->rcv_cr.lwe; pthread_rwlock_unlock(&frcti->lock); - __send_frct_pkt(fd, FRCT_ACK | FRCT_FC, ackno, rwe); + STAT_BUMP(frcti, ack_snd); + + if (sacking) { + STAT_BUMP(frcti, sack_snd); + if (sa->dsack) + STAT_BUMP(frcti, dsack_snd); + frcti_sack_snd(frcti, sa); + } else { + frcti_pkt_snd(frcti, FRCT_ACK | FRCT_FC, ackno, rwe); + } + + out: + free(sa); } -static void __send_rdv(int fd) +/* Delayed-ACK timer: per-flow, dedup'd via atomic test-and-set. */ +static void ack_due(void * arg) { - __send_frct_pkt(fd, FRCT_RDVS, 0, 0); + struct frcti * frcti = arg; + + __atomic_clear(&frcti->ack_pending, __ATOMIC_RELAXED); + + ack_snd(frcti, true); } -static struct frcti * frcti_create(int fd, - time_t a, - time_t r, - time_t mpl) +static int ack_arm(struct frcti * frcti) { - struct frcti * frcti; - ssize_t idx; - struct timespec now; - pthread_condattr_t cattr; + struct timespec now; + uint64_t deadline; + + if (__atomic_test_and_set(&frcti->ack_pending, __ATOMIC_RELAXED)) + return 0; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + deadline = TS_TO_UINT64(now) + 2ULL * (uint64_t) TICTIME; + + tw_post(&frcti->ack_tw, deadline, ack_due, frcti); + + return 0; +} + +/* Forward decl breaks the keepalive cycle: ka_arm <-> ka_due. */ +static void ka_due(void * arg); + +static int ka_arm(struct frcti * frcti) +{ + struct timespec now; + uint64_t now_ns; + uint64_t timeo_ns; + uint64_t snd_ns; + uint64_t rcv_ns; + uint64_t deadline; + + timeo_ns = (uint64_t) frcti->qs_timeout * MILLION; /* IMM */ + snd_ns = LOAD_RELAXED(&frcti->snd_cr.act) + timeo_ns / 4; + rcv_ns = LOAD_RELAXED(&frcti->rcv_cr.act) + timeo_ns; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + now_ns = TS_TO_UINT64(now); + deadline = MIN(snd_ns, rcv_ns); + if (deadline <= now_ns) + deadline = now_ns + timeo_ns / 4; + + tw_post(&frcti->ka_tw, deadline, ka_due, frcti); + + return 0; +} + +__attribute__((cold)) +static void ka_snd(struct frcti * frcti) +{ + struct ssm_pk_buff * spb; + struct frct_pci * pci; + struct timespec now; + uint64_t now_ns; + time_t timeo_ns; + uint64_t rcv_act; + uint64_t ka_rcv; + int64_t rcv_idle; + int64_t snd_idle; + uint32_t ackno; + + assert(frcti); + + clock_gettime(PTHREAD_COND_CLOCK, &now); + now_ns = TS_TO_UINT64(now); + + timeo_ns = (time_t)(frcti->qs_timeout) * MILLION; /* IMM */ + rcv_act = LOAD_RELAXED(&frcti->rcv_cr.act); + ka_rcv = LOAD_RELAXED(&frcti->t_ka_rcv); + rcv_idle = (int64_t)(now_ns - (rcv_act > ka_rcv ? rcv_act : ka_rcv)); + snd_idle = (int64_t)(now_ns - LOAD_RELAXED(&frcti->snd_cr.act)); + + if (rcv_idle > timeo_ns) { + frct_mark_peer_dead(frcti); + return; + } + + if (snd_idle <= timeo_ns / 4) { + ka_arm(frcti); + return; + } + + if (frct_ctrl_alloc(&spb, &pci, 0) < 0) { + ka_arm(frcti); + return; + } + + ackno = LOAD_RELAXED(&frcti->rcv_cr.lwe); + + pci->flags = hton16(FRCT_KA | FRCT_ACK); + pci->ackno = hton32(ackno); + + frct_hcs_set(pci, false); + + STAT_BUMP(frcti, ka_snd); + frct_tx(frcti, spb); + + ka_arm(frcti); +} + +/* Keepalive timer: re-posted by the fire callback itself. */ +static void ka_due(void * arg) +{ + ka_snd((struct frcti *) arg); +} + +static void frcti_rdv_snd(struct frcti * frcti) +{ + frcti_pkt_snd(frcti, FRCT_RDVS, 0, 0); +} + +#define HAS_RESCNTL(cr) ((cr)->cflags & FRCTFRESCNTL) +static bool frcti_is_window_open(struct frcti * frcti) +{ + struct frct_cr * snd_cr = &frcti->snd_cr; + struct timespec now; + time_t diff; + bool ret = false; + + if (!HAS_RESCNTL(snd_cr)) + return true; + + if (before(snd_cr->seqno, LOAD_RELAXED(&snd_cr->rwe))) + return true; + + /* Window may be closed; wrlock for RDV state mutations. */ + pthread_rwlock_wrlock(&frcti->lock); + + if (before(snd_cr->seqno, snd_cr->rwe)) { + ret = true; + goto unlock; + } + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + if (frcti->open) { + frcti->open = false; + frcti->t_wnd = now; + frcti->t_last_rdv = now; + goto unlock; + } + + diff = ts_diff_ns(&now, &frcti->t_wnd); + if (diff > MAX_RDV) + goto unlock; + + diff = ts_diff_ns(&now, &frcti->t_last_rdv); + if (diff > (time_t) frcti->t_rdv) { + frcti->t_last_rdv = now; + frcti_rdv_snd(frcti); + STAT_BUMP(frcti, rdv_snd); + } + unlock: + pthread_rwlock_unlock(&frcti->lock); + + return ret; +} + +/* n contiguous seqnos free? No RDV: the n=1 path drives it. */ +static bool frcti_is_window_open_n(struct frcti * frcti, + size_t n) +{ + struct frct_cr * snd_cr = &frcti->snd_cr; + + if (!HAS_RESCNTL(snd_cr)) + return true; + + if (n <= 1) + return frcti_is_window_open(frcti); + + return before(snd_cr->seqno + (uint32_t)(n - 1), + LOAD_RELAXED(&snd_cr->rwe)); +} + +static void release_rq(struct frcti * frcti) +{ + size_t i; + + for (i = 0; i < RQ_SIZE; ++i) { + if (frcti->rcv_slots[i].idx == -1) + continue; + + /* Stream rq entries are sentinels (no spb owned). */ + if (!frcti->stream) + frct_spb_release_idx(frcti->rcv_slots[i].idx); + + frcti->rcv_slots[i].idx = -1; + } +} + +static __inline__ bool stream_ring_sz_ok(struct frcti * frcti, + size_t n) +{ + size_t per_pkt; + + if (n > FRCT_STREAM_RING_SZ_MAX) + return false; + + if ((n & (n - 1)) != 0) + return false; + + per_pkt = frcti->frag_mtu - frcti_data_hdr_len(frcti); + + return n >= FRCT_STREAM_RING_MIN_PKTS * per_pkt; +} + +/* Default ring sized for full RQ_SIZE seqno window; pow2, capped. */ +static size_t default_stream_ring_sz(size_t per_pkt) +{ + size_t need; + size_t sz; + + need = (size_t) RQ_SIZE * per_pkt; + sz = FRCT_STREAM_RING_SZ; + + while (sz < need && sz < FRCT_STREAM_RING_SZ_MAX) + sz <<= 1; + + return sz; +} + +struct frcti * frcti_create(int fd, + uint64_t a, + uint64_t r, + uint64_t mpl, + time_t rtt_hint, + qosspec_t qs, + uint32_t mtu) +{ + struct frcti * frcti; + ssize_t idx; + struct timespec now; + uint64_t now_ns; + size_t bb; + size_t per_pkt; #ifdef PROC_FLOW_STATS - char frctstr[FRCT_NAME_STRLEN + 1]; + char frctstr[FRCT_NAME_STRLEN + 1]; #endif - mpl *= MILLION; - a *= BILLION; - r *= BILLION; + mpl *= MILLION; /* ms -> ns */ + a *= MILLION; /* ms -> ns */ + r *= MILLION; /* ms -> ns */ frcti = malloc(sizeof(*frcti)); if (frcti == NULL) @@ -349,56 +1663,74 @@ static struct frcti * frcti_create(int fd, memset(frcti, 0, sizeof(*frcti)); + list_head_init(&frcti->rxm_list); + if (pthread_rwlock_init(&frcti->lock, NULL)) goto fail_lock; - if (pthread_mutex_init(&frcti->mtx, NULL)) - goto fail_mutex; - - if (pthread_condattr_init(&cattr)) - goto fail_cattr; -#ifndef __APPLE__ - pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -#endif - if (pthread_cond_init(&frcti->cond, &cattr)) - goto fail_cond; - #ifdef PROC_FLOW_STATS sprintf(frctstr, "%d", fd); if (rib_reg(frctstr, &r_ops)) goto fail_rib_reg; #endif - pthread_condattr_destroy(&cattr); for (idx = 0; idx < RQ_SIZE; ++idx) - frcti->rq[idx] = -1; + frcti->rcv_slots[idx].idx = -1; clock_gettime(PTHREAD_COND_CLOCK, &now); + now_ns = TS_TO_UINT64(now); + + frcti->t_mpl = mpl; + frcti->t_a = a; + frcti->t_r = r; + frcti->t_rdv = DELT_RDV; + frcti->fd = fd; + frcti->ber = (time_t) qs.ber; + frcti->lossy = (qs.loss != 0); + frcti->qs_timeout = (time_t) qs.timeout; + + frcti->frag_mtu = (size_t) mtu; + + /* Cap blocks per SACK at what fits in the per-flow frag_mtu. */ + bb = (frcti->frag_mtu - FRCT_PCILEN - SACK_HDR_SIZE) + / SACK_BLOCK_SIZE; + if (bb > SACK_MAX_BLOCKS) + bb = SACK_MAX_BLOCKS; + frcti->sack_n_max = (uint16_t) bb; + + frcti->max_rcv_sdu = FRCT_MAX_SDU; + + frcti->stream = (qs.service == SVC_STREAM); + if (frcti->stream) { + per_pkt = frcti->frag_mtu - frcti_data_hdr_len(frcti); + frcti->rcv_ring_sz = default_stream_ring_sz(per_pkt); + frcti->ring_seq_cap = + (uint32_t) (frcti->rcv_ring_sz / per_pkt); + } - frcti->mpl = mpl; - frcti->a = a; - frcti->r = r; - frcti->rdv = DELT_RDV; - frcti->fd = fd; - - - frcti->rttseq = 0; - frcti->probe = false; - - frcti->srtt = 0; /* Updated on first ACK */ - frcti->mdev = 10 * MILLION; /* Updated on first ACK */ - frcti->rto = BILLION; /* Initial rxm will be after 1 s */ -#ifdef PROC_FLOW_STATS - frcti->n_rtx = 0; - frcti->n_prb = 0; - frcti->n_rtt = 0; - frcti->n_dup = 0; - frcti->n_dak = 0; - frcti->n_rdv = 0; - frcti->n_out = 0; - frcti->n_rqo = 0; -#endif - if (proc.flows[fd].info.qs.loss == 0) { + frcti->rto_min = (time_t) MAX(RTO_MIN, 1ULL << RXMQ_RES); + rtt_init(frcti, rtt_hint); + frcti->t_min_rtt = now_ns; + frcti->probe_id_next = 1; + frcti->t_rcv_rtt = now_ns; + frcti->t_snd_probe = now_ns; + frcti->t_snd_sack = 0; + frcti->sack_lwe = 0; + frcti->sack_n = 0; + frcti->dsack_seqno = 0; + frcti->dsack_valid = false; + frcti->reo_wnd_mult = 1; + frcti->dsack_lwe_snap = 0; + /* So the first pre-DRF NACK fires without waiting cooldown. */ + frcti->t_nack = now_ns - BILLION; + frcti->in_recovery = false; + frcti->recovery_high = 0; + frcti->rack_fired_lwe = 0; + + tw_init_entry(&frcti->ack_tw); + tw_init_entry(&frcti->ka_tw); + + if (!frcti->lossy) { frcti->snd_cr.cflags |= FRCTFRTX | FRCTFLINGER; frcti->rcv_cr.cflags |= FRCTFRTX; } @@ -406,24 +1738,31 @@ static struct frcti * frcti_create(int fd, frcti->snd_cr.cflags |= FRCTFRESCNTL; frcti->snd_cr.rwe = START_WINDOW; + if (frcti->lossy) + frcti->snd_cr.rwe = RQ_SIZE; + + frcti->snd_cr.inact = 3 * mpl + a + r + BILLION; /* ns */ + frcti->snd_cr.act = now_ns - frcti->snd_cr.inact - BILLION; - frcti->snd_cr.inact = (3 * mpl + a + r) / BILLION + 1; /* s */ - frcti->snd_cr.act.tv_sec = now.tv_sec - (frcti->snd_cr.inact + 1); + frcti->rcv_cr.inact = 2 * mpl + a + r + BILLION; /* ns */ + frcti->rcv_cr.act = now_ns - frcti->rcv_cr.inact - BILLION; - frcti->rcv_cr.inact = (2 * mpl + a + r) / BILLION + 1; /* s */ - frcti->rcv_cr.act.tv_sec = now.tv_sec - (frcti->rcv_cr.inact + 1); + frcti->t_ka_rcv = now_ns; + + /* qs_timeout == 0: no KA, silent peer crash goes undetected. */ + if (frcti->qs_timeout > 0) { + if (ka_arm(frcti) < 0) + goto fail_ka_arm; + } return frcti; + fail_ka_arm: #ifdef PROC_FLOW_STATS + sprintf(frctstr, "%d", fd); + rib_unreg(frctstr); fail_rib_reg: - pthread_cond_destroy(&frcti->cond); #endif - fail_cond: - pthread_condattr_destroy(&cattr); - fail_cattr: - pthread_mutex_destroy(&frcti->mtx); - fail_mutex: pthread_rwlock_destroy(&frcti->lock); fail_lock: free(frcti); @@ -431,21 +1770,43 @@ static struct frcti * frcti_create(int fd, return NULL; } -static void frcti_destroy(struct frcti * frcti) +void frcti_destroy(struct frcti * frcti) { #ifdef PROC_FLOW_STATS char frctstr[FRCT_NAME_STRLEN + 1]; +#endif + /* Drop every wheel entry referencing frcti before freeing it. */ + rxm_cancel_all(frcti); + tw_cancel(&frcti->ack_tw); + tw_cancel(&frcti->ka_tw); + +#if defined(PROC_FLOW_STATS) && defined(FRCT_DEBUG_STDOUT) + printf("[FRCT teardown] pid=%d fd=%d " + "frag_snd=%zu rxm_sack=%zu rxm_dup=%zu rxm_snd=%zu " + "rxm_due=%zu acked=%zu unowned=%zu aged=%zu " + "cancel=%zu arm_fail=%zu inflight=%u\n", + (int) getpid(), frcti->fd, + frcti->stat.frag_snd, frcti->stat.rxm_sack, + frcti->stat.rxm_dupthresh, + frcti->stat.rxm_snd, + frcti->stat.rxm_due_count, frcti->stat.rxm_due_acked, + frcti->stat.rxm_due_unowned, frcti->stat.rxm_due_aged, + frcti->stat.rxm_cancel, frcti->stat.rxm_arm_fail, + frcti->snd_cr.seqno - frcti->snd_cr.lwe); +#endif + + release_rq(frcti); + free(frcti->rcv_ring); +#ifdef PROC_FLOW_STATS sprintf(frctstr, "%d", frcti->fd); rib_unreg(frctstr); #endif - pthread_cond_destroy(&frcti->cond); - pthread_mutex_destroy(&frcti->mtx); pthread_rwlock_destroy(&frcti->lock); free(frcti); } -static uint16_t frcti_getflags(struct frcti * frcti) +uint16_t frcti_getflags(struct frcti * frcti) { uint16_t ret; @@ -453,89 +1814,91 @@ static uint16_t frcti_getflags(struct frcti * frcti) pthread_rwlock_rdlock(&frcti->lock); - ret = frcti->snd_cr.cflags; + ret = frcti->snd_cr.cflags & FRCTFMASK; pthread_rwlock_unlock(&frcti->lock); return ret; } -static void frcti_setflags(struct frcti * frcti, - uint16_t flags) +void frcti_setflags(struct frcti * frcti, + uint16_t flags) { - flags |= FRCTFRTX; /* Should not be set by command */ - assert(frcti); - pthread_rwlock_wrlock(&frcti->lock); + flags &= FRCTFSETMASK; - frcti->snd_cr.cflags &= FRCTFRTX; /* Zero other flags */ + pthread_rwlock_wrlock(&frcti->lock); - frcti->snd_cr.cflags &= flags; + frcti->snd_cr.cflags = (frcti->snd_cr.cflags & ~FRCTFSETMASK) | flags; pthread_rwlock_unlock(&frcti->lock); } -#define frcti_queued_pdu(frcti) \ - (frcti == NULL ? idx : __frcti_queued_pdu(frcti)) +size_t frcti_get_max_rcv_sdu(struct frcti * frcti) +{ + size_t ret; -#define frcti_snd(frcti, spb) \ - (frcti == NULL ? 0 : __frcti_snd(frcti, spb)) + assert(frcti); -#define frcti_rcv(frcti, spb) \ - (frcti == NULL ? 0 : __frcti_rcv(frcti, spb)) + pthread_rwlock_rdlock(&frcti->lock); + ret = frcti->max_rcv_sdu; + pthread_rwlock_unlock(&frcti->lock); -#define frcti_dealloc(frcti) \ - (frcti == NULL ? 0 : __frcti_dealloc(frcti)) + return ret; +} -#define frcti_is_window_open(frcti) \ - (frcti == NULL ? true : __frcti_is_window_open(frcti)) +int frcti_set_max_rcv_sdu(struct frcti * frcti, + size_t max) +{ + assert(frcti); -#define frcti_window_wait(frcti, abstime) \ - (frcti == NULL ? 0 : __frcti_window_wait(frcti, abstime)) + if (max == 0) + return -EINVAL; + pthread_rwlock_wrlock(&frcti->lock); + frcti->max_rcv_sdu = max; + pthread_rwlock_unlock(&frcti->lock); -static bool __frcti_is_window_open(struct frcti * frcti) + return 0; +} + +size_t frcti_get_rcv_ring_sz(struct frcti * frcti) { - struct frct_cr * snd_cr = &frcti->snd_cr; - bool ret = true; + size_t ret; + + assert(frcti); pthread_rwlock_rdlock(&frcti->lock); + ret = frcti->rcv_ring_sz; + pthread_rwlock_unlock(&frcti->lock); - if (snd_cr->cflags & FRCTFRESCNTL) - ret = before(snd_cr->seqno, snd_cr->rwe); + return ret; +} - if (!ret) { - struct timespec now; +/* Set before any stream byte has been delivered; -EBUSY otherwise. */ +int frcti_set_rcv_ring_sz(struct frcti * frcti, + size_t n) +{ + int ret = 0; + size_t per_pkt; - clock_gettime(PTHREAD_COND_CLOCK, &now); + assert(frcti); - pthread_mutex_lock(&frcti->mtx); - if (frcti->open) { - frcti->open = false; - frcti->t_wnd = now; - frcti->t_rdvs = now; - } else { - time_t diff; - diff = ts_diff_ns(&now, &frcti->t_wnd); - if (diff > MAX_RDV) { - pthread_mutex_unlock(&frcti->mtx); - pthread_rwlock_unlock(&frcti->lock); - return false; - } - - diff = ts_diff_ns(&now, &frcti->t_rdvs); - if (diff > frcti->rdv) { - frcti->t_rdvs = now; - __send_rdv(frcti->fd); -#ifdef PROC_FLOW_STATS - frcti->n_rdv++; -#endif + if (!frcti->stream) + return -ENOTSUP; + if (!stream_ring_sz_ok(frcti, n)) + return -EINVAL; - } - } + per_pkt = frcti->frag_mtu - frcti_data_hdr_len(frcti); + + pthread_rwlock_wrlock(&frcti->lock); - pthread_mutex_unlock(&frcti->mtx); + if (frcti->rcv_ring != NULL) { + ret = -EBUSY; + } else { + frcti->rcv_ring_sz = n; + frcti->ring_seq_cap = (uint32_t) (n / per_pkt); } pthread_rwlock_unlock(&frcti->lock); @@ -543,392 +1906,1868 @@ static bool __frcti_is_window_open(struct frcti * frcti) return ret; } -static int __frcti_window_wait(struct frcti * frcti, - struct timespec * abstime) +time_t frcti_get_rto_min(struct frcti * frcti) { - struct frct_cr * snd_cr = &frcti->snd_cr; - int ret = 0; + time_t v; + + assert(frcti); pthread_rwlock_rdlock(&frcti->lock); + v = frcti->rto_min; + pthread_rwlock_unlock(&frcti->lock); - if (!(snd_cr->cflags & FRCTFRESCNTL)) { - pthread_rwlock_unlock(&frcti->lock); - return 0; + return v; +} + +/* Floor at the timer-wheel resolution; finer granularity is unrepresentable. */ +int frcti_set_rto_min(struct frcti * frcti, + time_t rto_min) +{ + time_t floor = (time_t) (1ULL << RXMQ_RES); + time_t rto_floor; + time_t rto; + + assert(frcti); + + if (rto_min < floor) + return -EINVAL; + + pthread_rwlock_wrlock(&frcti->lock); + + frcti->rto_min = rto_min; + if (frcti->srtt > 0) { + rto_floor = MAX(rto_min, 2 * frcti->srtt); + rto = MAX(rto_floor, + frcti->srtt + (frcti->mdev << MDEV_MUL)); + STORE_RELEASE(&frcti->rto, rto); + } else if (frcti->rto < rto_min) { + STORE_RELEASE(&frcti->rto, rto_min); } - while (snd_cr->seqno == snd_cr->rwe && ret != -ETIMEDOUT) { - struct timespec now; - pthread_rwlock_unlock(&frcti->lock); - pthread_mutex_lock(&frcti->mtx); + pthread_rwlock_unlock(&frcti->lock); + + return 0; +} + +/* Re-arm a fresh rxm so a lost fast-retx still recovers via RTO. */ +static void sack_rxm_snd(struct frcti * frcti, + void * pkt, + size_t len) +{ + struct ssm_pk_buff * spb; + const struct frct_pci * pci; + uint32_t rcv_lwe; + uint32_t seqno; + + rcv_lwe = LOAD_RELAXED(&frcti->rcv_cr.lwe); + + spb = rxm_pkt_prepare(pkt, len, rcv_lwe, frcti->stream); + if (spb == NULL) + return; + + pci = (const struct frct_pci *) ssm_pk_buff_head(spb); + seqno = ntoh32(pci->seqno); + + /* Register fresh rxm before send; old entry self-cleans. */ + if (rxm_arm(frcti, seqno, spb) < 0) { + frct_spb_release(spb); + return; + } - if (frcti->open) { - clock_gettime(PTHREAD_COND_CLOCK, &now); + STAT_BUMP(frcti, rxm_sack); + frct_tx(frcti, spb); +} + +/* Additive HoL emit; original snd_slots[hp].rxm stays armed (NewReno). */ +static void fast_rxm_send(struct frcti * frcti, + void * pkt, + size_t len) +{ + struct ssm_pk_buff * spb; + uint32_t rcv_lwe; + + rcv_lwe = LOAD_RELAXED(&frcti->rcv_cr.lwe); + + spb = rxm_pkt_prepare(pkt, len, rcv_lwe, frcti->stream); + if (spb == NULL) + return; + + frct_tx(frcti, spb); +} + +/* PCI bytes survive head_release at receive; just rewind the pointer. */ +static __inline__ uint16_t frag_role_peek(struct ssm_pk_buff * spb) +{ + const struct frct_pci * pci; + + assert(ssm_pk_buff_head(spb) != NULL); + + pci = (const struct frct_pci *) (ssm_pk_buff_head(spb) - FRCT_PCILEN); + + return ntoh16(pci->flags) & FRCT_FR_MASK; +} + +enum frag_state { + FRAG_NOT_READY, /* head missing / FIRST..LAST run incomplete */ + FRAG_DELIVER, /* *count fragments form a deliverable SDU */ + FRAG_DROP, /* *count fragments at lwe are malformed */ +}; + +/* + * On a gap in the run: FRTX waits (NOT_READY); best-effort scans forward + * for the next FIRST/SOLE and returns DROP for the broken prefix. *count + * gets the offset from the trailing edge. NOT_READY if no later run is + * in window. Caller rdlock. + */ +static enum frag_state frag_inspect_gap(struct frcti * frcti, + size_t start, + size_t * count) +{ + const struct rcv_slot * slots = frcti->rcv_slots; + struct ssm_pk_buff * spb; + uint32_t k; + uint16_t role; + size_t m; + + if (frcti->rcv_cr.cflags & FRCTFRTX) + return FRAG_NOT_READY; + + k = frcti->rcv_cr.rwe - RQ_SIZE; + + for (m = start; m < RQ_SIZE; ++m) { + if (slots[RQ_SLOT(k + m)].idx == -1) + continue; + + spb = rq_frag(frcti, k + m); + role = frag_role_peek(spb); + + if (role == FRCT_FR_SOLE || role == FRCT_FR_FIRST) { + if (m == 0) + return FRAG_NOT_READY; + + *count = m; + return FRAG_DROP; + } + } + + return FRAG_NOT_READY; +} + +/* + * Inspect rq[lwe..]; set *count and return DELIVER/DROP/NOT_READY. DROP + * covers broken prefixes (mid/last at HoL, FIRST..[non-LAST]..new-FIRST). + * Non-FRTX flows skip past gaps to the next FIRST/SOLE. Caller rdlock. + */ +static enum frag_state frag_run_inspect(struct frcti * frcti, + size_t * count) +{ + const struct rcv_slot * slots = frcti->rcv_slots; + struct ssm_pk_buff * spb; + uint32_t k = frcti->rcv_cr.rwe - RQ_SIZE; + uint16_t role; + size_t n = 0; - frcti->t_wnd = now; - frcti->t_rdvs = now; - frcti->open = false; + if (slots[RQ_SLOT(k)].idx == -1) + return frag_inspect_gap(frcti, 0, count); + + spb = rq_frag(frcti, k); + role = frag_role_peek(spb); + + if (role == FRCT_FR_SOLE) { + *count = 1; + return FRAG_DELIVER; + } + + if (role != FRCT_FR_FIRST) { + *count = 1; + return FRAG_DROP; + } + + while (true) { + if (n == RQ_SIZE || slots[RQ_SLOT(k + n)].idx == -1) + return frag_inspect_gap(frcti, n, count); + + spb = rq_frag(frcti, k + n); + role = frag_role_peek(spb); + ++n; + + if (role == FRCT_FR_LAST) { + *count = n; + return FRAG_DELIVER; + } + + if (n > 1 && role != FRCT_FR_MID) { + /* SOLE or new FIRST mid-run: drop the prefix. */ + *count = n - 1; + return FRAG_DROP; } + } +} - pthread_cleanup_push(__cleanup_mutex_unlock, &frcti->mtx); +/* Caller wrlock. Delivery edge is implicit: rwe - RQ_SIZE. */ +static void frag_drop(struct frcti * frcti, + size_t count) +{ + uint32_t k = frcti->rcv_cr.rwe - RQ_SIZE; + uint32_t edge; + size_t i; - ret = -__timedwait(&frcti->cond, &frcti->mtx, abstime); + for (i = 0; i < count; ++i) { + size_t pos = RQ_SLOT(k + i); - pthread_cleanup_pop(false); + if (frcti->rcv_slots[pos].idx == -1) + continue; - if (ret == -ETIMEDOUT) { - time_t diff; + frct_spb_release_idx(frcti->rcv_slots[pos].idx); + frcti->rcv_slots[pos].idx = -1; + } - clock_gettime(PTHREAD_COND_CLOCK, &now); + frcti->rcv_cr.rwe += count; - diff = ts_diff_ns(&now, &frcti->t_wnd); - if (diff > MAX_RDV) { - pthread_mutex_unlock(&frcti->mtx); - return -ECONNRESET; /* write fails! */ - } + /* Drop may span a gap; pull lwe up to preserve rwe - RQ_SIZE <= lwe. */ + edge = frcti->rcv_cr.rwe - RQ_SIZE; + if (before(frcti->rcv_cr.lwe, edge)) + STORE_RELEASE(&frcti->rcv_cr.lwe, edge); +} + +/* Copy `count` fragments at rq[lwe..] into buf; release + advance lwe. */ +static size_t frag_gather(struct frcti * frcti, + size_t count, + uint8_t * buf) +{ + struct ssm_pk_buff * frag; + size_t off = 0; + size_t i; + uint32_t k = frcti->rcv_cr.rwe - RQ_SIZE; + + for (i = 0; i < count; ++i) { + size_t pos = RQ_SLOT(k + i); + size_t flen; + + frag = rq_frag(frcti, k + i); + flen = ssm_pk_buff_len(frag); + memcpy(buf + off, ssm_pk_buff_head(frag), flen); + off += flen; + frct_spb_release_idx(frcti->rcv_slots[pos].idx); + frcti->rcv_slots[pos].idx = -1; + } - diff = ts_diff_ns(&now, &frcti->t_rdvs); - if (diff > frcti->rdv) { - frcti->t_rdvs = now; - __send_rdv(frcti->fd); - } + frcti->rcv_cr.rwe += count; + + return off; +} + +/* Caller holds lock. */ +static size_t frag_total_len(struct frcti * frcti, + size_t count, + bool * overflow) +{ + struct ssm_pk_buff * frag; + size_t total = 0; + size_t i; + uint32_t k = frcti->rcv_cr.rwe - RQ_SIZE; + + *overflow = false; + + for (i = 0; i < count; ++i) { + size_t flen; + + frag = rq_frag(frcti, k + i); + flen = ssm_pk_buff_len(frag); + if (total + flen < total) { + *overflow = true; + return 0; } + total += flen; + } - pthread_mutex_unlock(&frcti->mtx); - pthread_rwlock_rdlock(&frcti->lock); + return total; +} + +/* + * Process a delivered slot at lwe: latch FIN if acceptable, + * advance byte_high (clamped to byte_fin once latched). + */ +static __inline__ void stream_deliver_slot(struct frcti * frcti, + size_t lp) +{ + uint32_t end; + + end = frcti->rcv_slots[lp].end; + + if (frcti->rcv_slots[lp].fin) { + if (end == frcti->rcv_byte_high && !frcti->rcv_fin_seen) { + frcti->rcv_fin_seen = true; + frcti->rcv_byte_fin = end; + } else { + STAT_BUMP(frcti, strm_fin_drop); + } } + if (frcti->rcv_fin_seen && after(end, frcti->rcv_byte_fin)) + end = frcti->rcv_byte_fin; + + frcti->rcv_byte_high = end; +} + +/* Two-segment memcpy from buf into the rx ring at byte offset start. */ +static void stream_ring_write(struct frcti * frcti, + uint32_t start, + buffer_t buf) +{ + size_t mask = frcti->rcv_ring_sz - 1; + size_t off = start & mask; + + if (off + buf.len <= frcti->rcv_ring_sz) { + memcpy(frcti->rcv_ring + off, buf.data, buf.len); + } else { + size_t first = frcti->rcv_ring_sz - off; + memcpy(frcti->rcv_ring + off, buf.data, first); + memcpy(frcti->rcv_ring, buf.data + first, buf.len - first); + } +} + +/* Two-segment memcpy from the rx ring at byte offset start into buf. */ +static void stream_ring_read(struct frcti * frcti, + uint32_t start, + buffer_t buf) +{ + size_t mask = frcti->rcv_ring_sz - 1; + size_t off = start & mask; + + if (off + buf.len <= frcti->rcv_ring_sz) { + memcpy(buf.data, frcti->rcv_ring + off, buf.len); + } else { + size_t first = frcti->rcv_ring_sz - off; + memcpy(buf.data, frcti->rcv_ring + off, first); + memcpy(buf.data + first, frcti->rcv_ring, buf.len - first); + } +} + +/* Deliver-or-drop one stashed slot at lwe; advance lwe/rwe. Caller wrlock. */ +static void stream_advance_lwe(struct frcti * frcti) +{ + size_t lp; + + lp = RQ_SLOT(frcti->rcv_cr.lwe); + + if (frcti->rcv_slots[lp].start != frcti->rcv_byte_high) + STAT_BUMP(frcti, strm_drop); + else + stream_deliver_slot(frcti, lp); + + frcti->rcv_slots[lp].fin = 0; + frcti->rcv_slots[lp].idx = -1; + STORE_RELEASE(&frcti->rcv_cr.lwe, frcti->rcv_cr.lwe + 1); + frcti->rcv_cr.rwe++; +} + +/* + * Validate a stream DATA packet before stashing. Returns 0 if the + * packet may be written into rcv_ring + rq[], -1 otherwise. + */ +static __inline__ int stream_stash_check(struct frcti * frcti, + uint32_t start, + uint32_t end, + size_t plen, + uint16_t flags) +{ + if (end - start != (uint32_t) plen) + return -1; + + /* FIN MUST be 0-byte. */ + if ((flags & FRCT_FIN) && plen != 0) + return -1; + + /* Post-EOS: no further FIN once latched. */ + if (frcti->rcv_fin_seen && (flags & FRCT_FIN)) + return -1; + + /* Post-EOS: reject data at or past byte_fin. */ + if (frcti->rcv_fin_seen && !before(start, frcti->rcv_byte_fin)) + return -1; + + /* Stale: peer is behind the delivered edge. */ + if (before(end, frcti->rcv_byte_next)) + return -1; + + /* Exact-edge: only an empty-stream FIN is meaningful. */ + if (end == frcti->rcv_byte_next && !(flags & FRCT_FIN)) + return -1; + + if (end - frcti->rcv_byte_next > frcti->rcv_ring_sz) + return -1; + + return 0; +} + +/* + * Stream-mode DATA receive: validate, stash payload in rcv_ring, mark + * rq[pos], advance lwe through any newly-contiguous run. Returns 0 + * (spb released) or -1 (caller releases). Caller wrlock. + */ +static int frcti_stream_data_rcv(struct frcti * frcti, + struct ssm_pk_buff * spb, + size_t pos, + uint16_t flags) +{ + struct frct_pci_stream * spci; + uint32_t start; + uint32_t end; + buffer_t buf; + size_t skip; + + if (ssm_pk_buff_len(spb) < FRCT_PCI_STREAM_LEN) + return -1; + + if (frcti->rcv_ring == NULL) { + frcti->rcv_ring = calloc(1, frcti->rcv_ring_sz); + if (frcti->rcv_ring == NULL) + return -ENOMEM; + } + + spci = FRCT_HDR_POP(spb, frct_pci_stream); + start = ntoh32(spci->start); + end = ntoh32(spci->end); + + buf.data = ssm_pk_buff_head(spb); + buf.len = ssm_pk_buff_len(spb); + + if (stream_stash_check(frcti, start, end, buf.len, flags) < 0) + return -1; + + /* Trim front-overlap with already-delivered region. */ + if (before(start, frcti->rcv_byte_next)) { + skip = frcti->rcv_byte_next - start; + buf.data += skip; + buf.len -= skip; + start = frcti->rcv_byte_next; + } + + stream_ring_write(frcti, start, buf); + STAT_ADD(frcti, strm_rcv_byte, buf.len); + + frcti->rcv_slots[pos].idx = 1; + frcti->rcv_slots[pos].start = start; + frcti->rcv_slots[pos].end = end; + frcti->rcv_slots[pos].fin = (flags & FRCT_FIN) ? 1 : 0; + + while (frcti->rcv_slots[RQ_SLOT(frcti->rcv_cr.lwe)].idx != -1) + stream_advance_lwe(frcti); + + frct_spb_release(spb); + + return 0; +} + +/* + * DATA receive: stash idx at rq[pos], advance lwe through any + * contiguous run. Caller wrlock. + */ +static void frcti_data_stash(struct frcti * frcti, + ssize_t idx, + size_t pos, + uint16_t flags) +{ + frcti->rcv_slots[pos].idx = idx; + + if ((flags & FRCT_FR_MASK) != FRCT_FR_SOLE) + STAT_BUMP(frcti, frag_rcv); + + /* lwe = cum-ACK edge; advance per fragment through contiguous run. */ + while (before(frcti->rcv_cr.lwe, frcti->rcv_cr.rwe) + && frcti->rcv_slots[RQ_SLOT(frcti->rcv_cr.lwe)].idx != -1) + STORE_RELEASE(&frcti->rcv_cr.lwe, frcti->rcv_cr.lwe + 1); +} + +/* Stream consume: copy up to `count` contiguous bytes from ring into buf. */ +static ssize_t frcti_consume_stream(struct frcti * frcti, + uint8_t * buf, + size_t count) +{ + size_t avail; + size_t copy; + ssize_t ret; + buffer_t dst; + + assert(frcti); + + pthread_rwlock_wrlock(&frcti->lock); + + avail = (size_t) (frcti->rcv_byte_high - frcti->rcv_byte_next); + if (avail == 0) { + /* EOS drained: signal EOF to the reader. */ + if (frcti->rcv_fin_seen + && frcti->rcv_byte_next == frcti->rcv_byte_fin) + ret = 0; + else + ret = -EAGAIN; + goto unlock; + } + + copy = MIN(avail, count); + + dst.data = buf; + dst.len = copy; + stream_ring_read(frcti, frcti->rcv_byte_next, dst); + + frcti->rcv_byte_next += (uint32_t) copy; + STAT_ADD(frcti, strm_dlv_byte, copy); + + ret = (ssize_t) copy; + + unlock: pthread_rwlock_unlock(&frcti->lock); return ret; } -static ssize_t __frcti_queued_pdu(struct frcti * frcti) +/* + * FRTX consume: copy next ready PDU (full SDU or nothing). Returns bytes, + * -EAGAIN (no PDU), or -EMSGSIZE (oversize: run dropped to unblock flow). + */ +static ssize_t frcti_consume(struct frcti * frcti, + uint8_t * buf, + size_t count) { - ssize_t idx; - size_t pos; + size_t n; + size_t total; + bool overflow; + enum frag_state st; + ssize_t ret; assert(frcti); - /* See if we already have the next PDU. */ pthread_rwlock_wrlock(&frcti->lock); - pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1); - - idx = frcti->rq[pos]; - if (idx != -1) { - ++frcti->rcv_cr.lwe; - ++frcti->rcv_cr.rwe; - frcti->rq[pos] = -1; + while (true) { + st = frag_run_inspect(frcti, &n); + if (st == FRAG_NOT_READY) { + ret = -EAGAIN; + goto unlock; + } + if (st == FRAG_DROP) { + STAT_ADD(frcti, frag_drop, n); + frag_drop(frcti, n); + continue; + } + /* FRAG_DELIVER */ + total = frag_total_len(frcti, n, &overflow); + if (overflow || total > frcti->max_rcv_sdu || total > count) { + STAT_ADD(frcti, frag_drop, n); + frag_drop(frcti, n); + ret = -EMSGSIZE; + goto unlock; + } + ret = (ssize_t) frag_gather(frcti, n, buf); + if (n > 1) + STAT_BUMP(frcti, sdu_reasm); + goto unlock; } + unlock: pthread_rwlock_unlock(&frcti->lock); - return idx; + return ret; } -static ssize_t __frcti_pdu_ready(struct frcti * frcti) +static bool frcti_pdu_ready(struct frcti * frcti) { - ssize_t idx; - size_t pos; + size_t pos; + size_t count; + bool ready; assert(frcti); - /* See if we already have the next PDU. */ pthread_rwlock_rdlock(&frcti->lock); - pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1); - idx = frcti->rq[pos]; + if (frcti->stream) { + ready = frcti->rcv_byte_high != frcti->rcv_byte_next; + pthread_rwlock_unlock(&frcti->lock); + return ready; + } + + if (frag_run_inspect(frcti, &count) != FRAG_DELIVER) { + /* Drop case: frcti_consume will handle it; not ready. */ + pthread_rwlock_unlock(&frcti->lock); + return false; + } + + pos = RQ_SLOT(frcti->rcv_cr.rwe - RQ_SIZE); + ready = frcti->rcv_slots[pos].idx != -1; pthread_rwlock_unlock(&frcti->lock); - return idx; + return ready; } -#include <timerwheel.c> +/* No srtt yet: probe at the cold-probe cadence to seed it. */ +#define PROBE_DUE_COLD(frcti, now_ns) \ + ((now_ns) - (frcti)->t_snd_probe > (uint64_t) RTTP_COLD_NS) + +/* Have srtt: probe when peer quiet for > 2*srtt and last probe > srtt. */ +#define PROBE_DUE_WARM(frcti, now_ns) \ + ((now_ns) - (frcti)->t_rcv_rtt > 2u * (uint64_t)(frcti)->srtt \ + && (now_ns) - (frcti)->t_snd_probe > (uint64_t)(frcti)->srtt) + +/* Seeds srtt for receive-only sides so they don't fall back to 1 s RTO. */ +__attribute__((cold)) +static void frcti_rcv_probe(struct frcti * frcti, + uint64_t now_ns) +{ + uint32_t probe_id; + uint8_t nonce[RTTP_NONCE_LEN] = { 0 }; + + pthread_rwlock_wrlock(&frcti->lock); + + if (frcti->srtt == 0 && !PROBE_DUE_COLD(frcti, now_ns)) { + pthread_rwlock_unlock(&frcti->lock); + return; + } + + if (frcti->srtt != 0 && !PROBE_DUE_WARM(frcti, now_ns)) { + pthread_rwlock_unlock(&frcti->lock); + return; + } + + probe_id = rttp_alloc_probe(frcti, now_ns, nonce); + + pthread_rwlock_unlock(&frcti->lock); + + if (probe_id != 0) + frcti_rttp_snd(frcti, probe_id, 0, nonce); +} + +/* Echo at slot `pos` matches our probe: id, slot, nonce all intact. */ +static __inline__ bool probe_echo_matches(struct frcti * frcti, + size_t pos, + uint32_t echo_id, + const uint8_t nonce[RTTP_NONCE_LEN]) +{ + if (frcti->probes[pos].id != echo_id) + return false; + + if (frcti->probes[pos].ts == 0) + return false; + + return memcmp(frcti->probes[pos].nonce, nonce, RTTP_NONCE_LEN) == 0; +} /* - * Send a final ACK for everything that has not been ACK'd. - * If the flow should be kept active for retransmission, - * the returned time will be negative. + * RTT probe (echo_id == 0): bounce the nonce back to peer. + * RTT echo (echo_id != 0): verify nonce + feed sample. */ -static time_t __frcti_dealloc(struct frcti * frcti) +static void frcti_rttp_rcv(struct frcti * frcti, + buffer_t pkt, + uint64_t now_ns) { - struct timespec now; - time_t wait; - int ackno; - int fd = -1; + const struct frct_rttp * rttp; + uint32_t probe_id; + uint32_t echo_id; + uint8_t nonce[RTTP_NONCE_LEN]; + size_t ring_pos; + uint64_t sample; + + if (pkt.len < RTTP_PAYLOAD) + return; + + rttp = (const struct frct_rttp *) pkt.data; + probe_id = ntoh32(rttp->probe_id); + echo_id = ntoh32(rttp->echo_id); + + /* Forged/malformed: bouncing this would loop on echo_id == 0. */ + if (probe_id == 0 && echo_id == 0) + return; + + memcpy(nonce, rttp->nonce, sizeof(nonce)); + + if (echo_id == 0) { + /* Probe: echo back with same nonce so peer can verify. */ + STAT_BUMP(frcti, rttp_rcv); + frcti_rttp_snd(frcti, 0, probe_id, nonce); + return; + } + + ring_pos = RTTP_POS(echo_id); + + pthread_rwlock_wrlock(&frcti->lock); + + if (!probe_echo_matches(frcti, ring_pos, echo_id, nonce)) { + pthread_rwlock_unlock(&frcti->lock); + return; + } + + sample = now_ns - frcti->probes[ring_pos].ts; + frcti->probes[ring_pos].ts = 0; + frcti->t_rcv_rtt = now_ns; + + /* Clamp probe sample to RTT_CLAMP_MUL * srtt to avoid poisoning. */ + if (frcti->srtt > 0) + sample = MIN(sample, (uint64_t) frcti->srtt * RTT_CLAMP_MUL); + + rtt_update(frcti, sample, now_ns); + + pthread_rwlock_unlock(&frcti->lock); +} + +/* Honours piggybacked ACK on the KA. */ +static void frcti_ka_rcv(struct frcti * frcti, + const struct frct_pci * pci, + uint64_t now_ns, + uint16_t flags) +{ + uint32_t ka_ackno; + + STORE_RELEASE(&frcti->t_ka_rcv, now_ns); + STAT_BUMP(frcti, ka_rcv); + + if (!(flags & FRCT_ACK)) + return; + + ka_ackno = ntoh32(pci->ackno); + + pthread_rwlock_wrlock(&frcti->lock); + + if (within(ka_ackno, frcti->snd_cr.lwe, frcti->snd_cr.seqno)) + STORE_RELEASE(&frcti->snd_cr.lwe, ka_ackno); + + pthread_rwlock_unlock(&frcti->lock); +} + +/* + * Additive HoL re-emit (carries DRF); runs before rcv_cr->act + * refresh so it doesn't pre-empt peer's first DRF. + */ +__attribute__((cold)) +static void frcti_nack_rcv(struct frcti * frcti) +{ + struct timespec now; + uint64_t now_ns; + size_t hp; + struct rxm_entry * rxm; + void * pkt_copy = NULL; + size_t pkt_len = 0; clock_gettime(PTHREAD_COND_CLOCK, &now); + now_ns = TS_TO_UINT64(now); - pthread_rwlock_rdlock(&frcti->lock); + pthread_rwlock_wrlock(&frcti->lock); - ackno = frcti->rcv_cr.lwe; - if (frcti->rcv_cr.lwe != frcti->rcv_cr.seqno) - fd = frcti->fd; + STAT_BUMP(frcti, nack_rcv); + + if (frcti->snd_cr.seqno == frcti->snd_cr.lwe) { + pthread_rwlock_unlock(&frcti->lock); + return; + } - wait = MAX(frcti->rcv_cr.inact - now.tv_sec + frcti->rcv_cr.act.tv_sec, - frcti->snd_cr.inact - now.tv_sec + frcti->snd_cr.act.tv_sec); - wait = MAX(wait, 0); + hp = RQ_SLOT(frcti->snd_cr.lwe); + rxm = LOAD_ACQUIRE(&frcti->snd_slots[hp].rxm); + if (rxm == NULL || RXM_AGED_OUT(rxm->t0, now_ns, frcti->t_r)) { + pthread_rwlock_unlock(&frcti->lock); + return; + } - if (frcti->snd_cr.cflags & FRCTFLINGER - && before(frcti->snd_cr.lwe, frcti->snd_cr.seqno)) - wait = -wait; + pkt_copy = malloc(rxm->len); + if (pkt_copy != NULL) { + memcpy(pkt_copy, rxm->pkt, rxm->len); + pkt_len = rxm->len; + /* Karn: suppress RTT sample for next ACK. */ + frcti->snd_slots[hp].flags |= SND_RTX | SND_FAST_RXM; + frcti->rtt_lwe = frcti->snd_cr.lwe + 1; + } pthread_rwlock_unlock(&frcti->lock); - if (fd != -1) - __send_frct_pkt(fd, FRCT_ACK, ackno, 0); + if (pkt_copy != NULL) { + fast_rxm_send(frcti, pkt_copy, pkt_len); + free(pkt_copy); + } +} - return wait; +__attribute__((cold)) +static void frcti_rdv_rcv(struct frcti * frcti) +{ + uint32_t rwe; + + pthread_rwlock_rdlock(&frcti->lock); + + rwe = frcti_advert_rwe(frcti); + + pthread_rwlock_unlock(&frcti->lock); + + STAT_BUMP(frcti, rdv_rcv); + + frcti_pkt_snd(frcti, FRCT_FC, 0, rwe); } -static int __frcti_snd(struct frcti * frcti, - struct ssm_pk_buff * spb) +/* + * FC window advert from any flag-bearing packet. Caps at lwe + RQ_SIZE, + * rejects backward shrink (forged/stale FC), marks window open. + * Caller wrlock. + */ +static __inline__ void frcti_fc_rcv(struct frcti * frcti, + const struct frct_pci * pci) { - struct frct_pci * pci; - struct timespec now; - struct frct_cr * snd_cr; - struct frct_cr * rcv_cr; - uint32_t seqno; - bool rtx; + struct frct_cr * snd_cr; + uint32_t rwe; + uint32_t rwe_max; + + snd_cr = &frcti->snd_cr; + rwe = ntoh32(pci->window); + rwe_max = snd_cr->lwe + RQ_SIZE; + + if (after(rwe, rwe_max)) + rwe = rwe_max; + + /* Reject backward shrink (forged/stale FC). */ + if (before(rwe, snd_cr->rwe)) + rwe = snd_cr->rwe; + + STORE_RELAXED(&snd_cr->rwe, rwe); + frcti->open = true; +} + +/* Packet copies captured under frcti->lock; emitted after release. */ +struct pending { + buffer_t fast_rxm; + buffer_t sack_rxm[SACK_RXM_MAX]; + size_t sack_rxm_cnt; +}; + +/* Idempotent; only extends when snd_cr.seqno advances past recovery_high. */ +static void recovery_enter(struct frcti * frcti) +{ + uint32_t hi = frcti->snd_cr.seqno + RTT_QUARANTINE; + + if (!frcti->in_recovery || after(hi, frcti->recovery_high)) { + frcti->in_recovery = true; + frcti->recovery_high = hi; + } +} + +/* True when cum-ACK clears recovery_high or all in-flight ACKed. */ +static bool recovery_exit_reached(struct frcti * frcti, + uint32_t ackno) +{ + if (!frcti->in_recovery) + return false; + + if (!before(ackno, frcti->recovery_high)) + return true; + + return ackno == frcti->snd_cr.seqno; +} + +/* RTT sample gate: Karn + SACK-consume + 4x clamp + don't-seed. */ +static bool rtt_sample_eligible(struct frcti * frcti, + size_t p, + uint16_t flags, + uint32_t lwe) +{ + if (frcti->in_recovery) + return false; + if (flags & FRCT_RXM) + return false; + if (frcti->snd_slots[p].flags & SND_RTX) + return false; + if (LOAD_ACQUIRE(&frcti->snd_slots[p].rxm) == NULL) + return false; + if (before(lwe, frcti->rtt_lwe)) + return false; + /* Don't seed srtt from a cum-ACK; let probes seed. */ + if (frcti->srtt == 0) + return false; + return true; +} + +#define RXM_SLOT_EMPTY(rxm) ((rxm) == NULL) +#define FAST_RXM_STAGED(pending) ((pending)->fast_rxm.data != NULL) +#define RXM_FAST_DONE(flags) (((flags) & SND_FAST_RXM) != 0) + +/* RACK fast retransmit on cum-ACK: HoL aged past R, not yet retransmitted. */ +static void fast_rxm_consider(struct frcti * frcti, + uint64_t now_ns, + struct pending * pending) +{ + struct rxm_entry * rxm; + struct snd_slot * slot; + size_t hp; + uint64_t R; + bool rack_ok; + + hp = RQ_SLOT(frcti->snd_cr.lwe); + slot = &frcti->snd_slots[hp]; + rxm = LOAD_ACQUIRE(&slot->rxm); + R = rack_reorder_window(frcti); + + if (RXM_SLOT_EMPTY(rxm)) + return; + + /* RFC 8985 §6.2: time-based RACK OR DupThresh count. */ + rack_ok = (int64_t)(frcti->t_latest_ack - slot->time) > (int64_t) R; + if (!rack_ok && frcti->dup_thresh < DUP_THRESH) + return; + + /* HoL aged past t_r; let rxm_due tear the flow down. */ + if (RXM_AGED_OUT(rxm->t0, now_ns, frcti->t_r)) + return; + + /* Already on it. */ + if (FAST_RXM_STAGED(pending) || RXM_FAST_DONE(slot->flags)) + return; + + recovery_enter(frcti); + + pending->fast_rxm.data = malloc(rxm->len); + if (pending->fast_rxm.data == NULL) + return; + + pending->fast_rxm.len = rxm->len; + memcpy(pending->fast_rxm.data, rxm->pkt, rxm->len); + slot->flags |= SND_RTX | SND_FAST_RXM; + frcti->rtt_lwe = frcti->snd_cr.lwe + 1; + if (rack_ok) + STAT_BUMP(frcti, rxm_rack); + else + STAT_BUMP(frcti, rxm_dupthresh); +} + +/* Caller holds wrlock; RACK fast retransmit queued in pending. */ +__attribute__((hot)) +static void frcti_ack_rcv(struct frcti * frcti, + const struct frct_pci * pci, + uint16_t flags, + uint64_t now_ns, + struct pending * pending) +{ + uint32_t ackno; + uint32_t lwe; + size_t p; + size_t fresh; + + if (!(flags & FRCT_DATA)) + STAT_BUMP(frcti, ack_rcv); + + ackno = ntoh32(pci->ackno); + if (ackno == frcti->snd_cr.lwe) { + /* RFC 8985 §6.2: only on scoreboard change. */ + if (frcti->snd_cr.lwe != frcti->rack_fired_lwe) { + fast_rxm_consider(frcti, now_ns, pending); + frcti->rack_fired_lwe = frcti->snd_cr.lwe; + } + return; + } + + if (!within(ackno, frcti->snd_cr.lwe, frcti->snd_cr.seqno)) + return; + + lwe = frcti->snd_cr.lwe; + p = RQ_SLOT(lwe); + + STORE_RELEASE(&frcti->snd_cr.lwe, ackno); + + /* RFC 8985 §7.2: halve mult per REO_DECAY_PKTS fresh-ACK'd seqnos. */ + fresh = ackno - frcti->dsack_lwe_snap; + if (frcti->reo_wnd_mult > 1 && fresh >= REO_DECAY_PKTS) { + uint8_t half = frcti->reo_wnd_mult >> 1; + frcti->reo_wnd_mult = half < 1 ? 1 : half; + frcti->dsack_lwe_snap = ackno; + } + + /* RFC 8985: latest cum-ACKed send-time (slot of ackno-1). */ + frcti->t_latest_ack = frcti->snd_slots[RQ_SLOT(ackno - 1)].time; + + /* RFC 8985: SACK-above-lwe count is per-recovery-episode. */ + frcti->dup_thresh = 0; + + /* Karn: only collapse RTO backoff on a fresh ACK. */ + if ((frcti->snd_slots[p].flags & SND_RTX) == 0) + STORE_RELEASE(&frcti->rto_mul, 0); + + if (recovery_exit_reached(frcti, ackno)) + frcti->in_recovery = false; + + if (rtt_sample_eligible(frcti, p, flags, lwe)) { + uint64_t mrtt = now_ns - frcti->snd_slots[p].time; + if (!(flags & FRCT_DATA)) + STAT_BUMP(frcti, ack_rtt); + rtt_update(frcti, mrtt, now_ns); + frcti->t_rcv_rtt = now_ns; + } +} + +/* Skip k == lwe under clamp: NULLing HoL from a stale SACK wedges it. */ +static uint32_t sack_mark_blocks(struct frcti * frcti, + const uint8_t * payload, + uint16_t n, + uint32_t * newly_marked) +{ + uint32_t hi_sacked = frcti->snd_cr.lwe; + uint32_t marked = 0; + uint16_t i; + + for (i = 0; i < n; ++i) { + uint32_t s; + uint32_t e; + uint32_t k; + bool clamped; + + sack_block_get(payload, i, &s, &e); + + if (!before(s, e)) + continue; + + clamped = before(s, frcti->snd_cr.lwe); + if (clamped) + s = frcti->snd_cr.lwe; + if (after(e, frcti->snd_cr.seqno)) + e = frcti->snd_cr.seqno; + + for (k = s; before(k, e); ++k) { + size_t kp = RQ_SLOT(k); + uint64_t t_k; + if (clamped && k == frcti->snd_cr.lwe) + continue; + if (LOAD_ACQUIRE(&frcti->snd_slots[kp].rxm) == NULL) + continue; + STORE_RELEASE(&frcti->snd_slots[kp].rxm, NULL); + frcti->snd_slots[kp].flags = 0; + marked++; + /* RACK.fack: latest SACK-confirmed send-time. */ + t_k = frcti->snd_slots[kp].time; + if (t_k > frcti->t_latest_ack) + frcti->t_latest_ack = t_k; + } + + if (after(e, hi_sacked)) + hi_sacked = e; + } + + *newly_marked = marked; + return hi_sacked; +} + +/* Queue once per loss event (SND_FAST_RXM gates). Emit after unlock. */ +static void sack_queue_rxm(struct frcti * frcti, + uint32_t hi_sacked, + uint64_t now_ns, + struct pending * pending) +{ + uint64_t R = rack_reorder_window(frcti); + uint32_t k; + bool rack_ok; + + for (k = frcti->snd_cr.lwe; before(k, hi_sacked); ++k) { + struct rxm_entry * rxm; + size_t kp = RQ_SLOT(k); + size_t cnt = pending->sack_rxm_cnt; + size_t rack_age; + + rxm = LOAD_ACQUIRE(&frcti->snd_slots[kp].rxm); + + if (cnt >= SACK_RXM_MAX) + break; + + if (rxm == NULL) + continue; + + if (frcti->snd_slots[kp].flags & SND_FAST_RXM) + continue; + + if (RXM_AGED_OUT(rxm->t0, now_ns, frcti->t_r)) + continue; + + rack_age = frcti->t_latest_ack - frcti->snd_slots[kp].time; + /* RFC 8985 §6.2: time-based RACK OR DupThresh count. */ + rack_ok = (int64_t) rack_age > (int64_t) R; + if (!rack_ok && frcti->dup_thresh < DUP_THRESH) + continue; + + if (!rack_ok) + STAT_BUMP(frcti, rxm_dupthresh); + + pending->sack_rxm[cnt].data = malloc(rxm->len); + if (pending->sack_rxm[cnt].data == NULL) + break; + + pending->sack_rxm[cnt].len = rxm->len; + memcpy(pending->sack_rxm[cnt].data, rxm->pkt, rxm->len); + pending->sack_rxm_cnt++; + /* NULL slot so the original timer self-cleans. */ + STORE_RELEASE(&frcti->snd_slots[kp].rxm, NULL); + frcti->snd_slots[kp].time = now_ns; + frcti->snd_slots[kp].flags |= SND_RTX | SND_FAST_RXM; + frcti->rtt_lwe = k + 1; + } +} + +/* + * RFC 2883 D-SACK detector. Returns true iff block[0] is a D-SACK + * report: + * case 1: blocks[0].start < pkt_ackno (strictly below cum-ACK). + * case 2: blocks[0] is a strict sub-range of some blocks[i>0]. + * MAX_DSACK_LAG bounds case-1 distance to one rcv window (sanity). + */ +static bool sack_is_dsack(struct frcti * frcti, + const uint8_t * payload, + uint16_t n, + uint32_t pkt_ackno) +{ + uint32_t s0; + uint32_t e0; + uint16_t i; + + if (n == 0) + return false; + + sack_block_get(payload, 0, &s0, &e0); + if (!before(s0, e0)) + return false; + + if (before(s0, pkt_ackno)) { + if ((pkt_ackno - s0) <= (uint32_t) MAX_DSACK_LAG) + return true; + STAT_BUMP(frcti, dsack_drop); + return false; + } + + for (i = 1; i < n; ++i) { + uint32_t si; + uint32_t ei; + + sack_block_get(payload, i, &si, &ei); + if (!before(si, ei)) + continue; + if (!before(s0, si) && !after(e0, ei) + && (s0 != si || e0 != ei)) + return true; + } + + return false; +} + +/* RFC 8985 §7.2: grow reo_wnd_mult on DSACK evidence. Caller wrlock. */ +static __inline__ void reo_wnd_on_dsack(struct frcti * frcti) +{ + if (frcti->reo_wnd_mult < REO_WND_MULT_MAX) + frcti->reo_wnd_mult++; + + frcti->dsack_lwe_snap = frcti->snd_cr.lwe; +} + +/* Caller holds wrlock; retransmits queued for post-unlock emission. */ +static void frcti_sack_rcv(struct frcti * frcti, + buffer_t pkt, + uint32_t pkt_ackno, + uint64_t now_ns, + struct pending * pending) +{ + uint32_t hi_sacked; + uint32_t marked; + uint16_t n; + bool dsack; + uint16_t n_real; + + if (pkt.len < SACK_HDR_SIZE) + return; + + n = ntoh16(*(const uint16_t *) pkt.data); + if (n > SACK_MAX_BLOCKS) + return; + + if (pkt.len < SACK_HDR_SIZE + (size_t) n * SACK_BLOCK_SIZE) + return; + + STAT_BUMP(frcti, sack_rcv); + + dsack = sack_is_dsack(frcti, pkt.data, n, pkt_ackno); + n_real = n - (dsack ? 1 : 0); + + if (dsack) { + STAT_BUMP(frcti, dsack_rcv); + reo_wnd_on_dsack(frcti); + } + + /* DSACK-only carries no new gap; don't enter recovery. */ + if (n_real > 0) + recovery_enter(frcti); + + marked = 0; + hi_sacked = sack_mark_blocks(frcti, pkt.data, n, &marked); + frcti->dup_thresh += marked; + + if (after(hi_sacked, frcti->snd_cr.lwe)) + sack_queue_rxm(frcti, hi_sacked, now_ns, pending); +} + +/* Emit and free queued packet copies. */ +static void pending_flush(struct frcti * frcti, + struct pending * pending) +{ + size_t i; + + for (i = 0; i < pending->sack_rxm_cnt; ++i) { + sack_rxm_snd(frcti, pending->sack_rxm[i].data, + pending->sack_rxm[i].len); + free(pending->sack_rxm[i].data); + } + + if (pending->fast_rxm.data != NULL) { + fast_rxm_send(frcti, pending->fast_rxm.data, + pending->fast_rxm.len); + free(pending->fast_rxm.data); + } +} + +/* Pre-DRF NACK: ask peer to retransmit HoL; seqno is informational. */ +static void frcti_nack_snd(struct frcti * frcti, + uint32_t seqno_unseen) +{ + struct ssm_pk_buff * spb; + struct frct_pci * pci; + + if (frct_ctrl_alloc(&spb, &pci, 0) < 0) + return; + + pci->flags = hton16(FRCT_NACK); + pci->seqno = hton32(seqno_unseen); + + frct_hcs_set(pci, false); + + frct_tx(frcti, spb); +} + +enum frct_act { + FRCT_ACTIVE, + FRCT_INACT_NEED_NACK, + FRCT_INACT_DROP, +}; + +/* On rcv inactivity: rebase on DRF, or arm pre-DRF NACK. Caller wrlock. */ +static enum frct_act rcv_inact_check(struct frcti * frcti, + uint16_t flags, + uint32_t seqno, + uint64_t now_ns) +{ + struct frct_cr * rcv_cr = &frcti->rcv_cr; + uint64_t cd; + + if (now_ns - rcv_cr->act <= rcv_cr->inact) + return FRCT_ACTIVE; + + if (flags & FRCT_DRF) { + /* Release stale rq[] slots before rebasing. */ + release_rq(frcti); + STORE_RELEASE(&rcv_cr->lwe, seqno); + rcv_cr->rwe = seqno + RQ_SIZE; + rcv_cr->seqno = seqno; + return FRCT_ACTIVE; + } + + if (!(flags & FRCT_DATA)) + return FRCT_ACTIVE; + + /* Pre-DRF: nudge sender with NACK (rate-limited). */ + cd = frcti->srtt > 0 ? (uint64_t) frcti->srtt : NACK_COOLDOWN_NS; + if (now_ns - frcti->t_nack < cd) + return FRCT_INACT_DROP; + + frcti->t_nack = now_ns; + STAT_BUMP(frcti, nack_snd); + + return FRCT_INACT_NEED_NACK; +} + +/* Both modes: bounded accept into rq[seqno]. Caller wrlock. */ +__attribute__((hot)) +static bool rq_accept(struct frcti * frcti, + uint32_t seqno, + size_t pos, + uint16_t flags) +{ + struct frct_cr * rcv_cr = &frcti->rcv_cr; + + if (!before(seqno, rcv_cr->rwe)) { + STAT_BUMP(frcti, out_rcv); + return false; + } + + if (!before(seqno, rcv_cr->lwe + RQ_SIZE)) { + STAT_BUMP(frcti, rqo_rcv); + return false; + } + + if (frcti->rcv_slots[pos].idx != -1) { + if (flags & FRCT_RXM) + STAT_BUMP(frcti, rxm_rcv); + else + STAT_BUMP(frcti, dup_rcv); + /* RFC 2883 §4 case 2: in-window dup; sub-range marker. */ + frcti->dsack_seqno = seqno; + frcti->dsack_valid = true; + return false; + } + + return true; +} + +/* OOO arrival; throttle by min_gap + scoreboard dedup. */ +static bool sack_check(struct frcti * frcti, + uint32_t seqno, + uint64_t now_ns, + struct sack_args * out) +{ + struct frct_cr * rcv_cr = &frcti->rcv_cr; + uint64_t min_gap; + uint16_t n; + + if (!after(seqno, rcv_cr->lwe)) + return false; + + STAT_BUMP(frcti, ooo_rcv); + + /* SACK carries cum-ACK; bound by t_a like any other ACK. */ + if (ACK_AGED_OUT(rcv_cr->act, now_ns, frcti->t_a)) + return false; + + /* srtt/8 gate starved recovery under burst loss; floor to save CPU. */ + min_gap = (uint64_t) SACK_MIN_GAP_NS; + + if (now_ns - frcti->t_snd_sack < min_gap) + return false; + + out->dsack = false; + n = dsack_consume(frcti, out->blocks); + if (n == 1) + out->dsack = true; + n += sack_blocks_build(frcti, out->blocks + n, + frcti->sack_n_max - n); + + if (!out->dsack + && rcv_cr->lwe == frcti->sack_lwe && n == frcti->sack_n) + return false; + + out->n = n; + out->ack = rcv_cr->lwe; + out->rwe = frcti_advert_rwe(frcti); + frcti->t_snd_sack = now_ns; + frcti->sack_lwe = rcv_cr->lwe; + frcti->sack_n = n; + + return true; +} + +/* Wire-dup of fresh DATA at an already-ACKed seqno. */ +static __inline__ bool is_dup_data(uint16_t flags, + uint32_t seqno, + uint32_t lwe) +{ + if (!(flags & FRCT_DATA)) + return false; + + if (flags & FRCT_RXM) + return false; + + return before(seqno, lwe); +} + +/* + * Wire-dup ACK packet: same seqno as the previous emission. Updates + * the dedup ackno on a fresh ACK; caller drops on true. + */ +static __inline__ bool is_dup_ack(struct frcti * frcti, + uint16_t flags, + uint32_t seqno) +{ + if (flags & FRCT_DATA) + return false; + + if (!(flags & FRCT_ACK)) + return false; + + if (seqno == frcti->rcv_cr.ackno) + return true; + + frcti->rcv_cr.ackno = seqno; + + return false; +} + +/* Caller wrlock. */ +__attribute__((cold)) +static void seqno_rotate(struct frcti * frcti, + uint64_t now_ns) +{ + struct frct_cr * snd_cr = &frcti->snd_cr; + + if (now_ns - snd_cr->act <= snd_cr->inact) + return; + /* Idle-on-wire ≠ idle e2e: don't orphan in-flight rxm. */ + if (snd_cr->seqno != snd_cr->lwe) + return; + + random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno)); + STORE_RELEASE(&snd_cr->lwe, snd_cr->seqno); + STORE_RELAXED(&snd_cr->rwe, snd_cr->lwe + START_WINDOW); + frcti->rtt_lwe = snd_cr->seqno; + frcti->in_recovery = false; + frcti->recovery_high = snd_cr->seqno; +} + +__attribute__((hot)) +static int frcti_snd(struct frcti * frcti, + struct ssm_pk_buff * spb, + uint16_t flags) +{ + struct frct_pci * pci; + struct frct_pci_stream * spci = NULL; + struct timespec now; + struct frct_cr * snd_cr; + struct frct_cr * rcv_cr; + uint32_t seqno; + uint16_t pci_flags = 0; + bool rtx; + uint64_t now_ns; + uint64_t rcv_idle; + uint32_t probe_id = 0; + uint8_t probe_nonce[RTTP_NONCE_LEN] = { 0 }; + bool probe; + size_t payload_len = 0; assert(frcti); - assert(ssm_pk_buff_len(spb) != 0); + /* Stream mode permits 0-byte sends for the EOS marker. */ + assert(ssm_pk_buff_len(spb) != 0 || frcti->stream); snd_cr = &frcti->snd_cr; rcv_cr = &frcti->rcv_cr; - timerwheel_move(); + tw_move_safe(); - pci = (struct frct_pci *) ssm_pk_buff_push(spb, FRCT_PCILEN); + if (frcti->stream) + payload_len = ssm_pk_buff_len(spb); + + pci = FRCT_HDR_PUSH(spb, frcti); if (pci == NULL) return -ENOMEM; - memset(pci, 0, sizeof(*pci)); + memset(pci, 0, FRCT_PCILEN); + + if (frcti->stream) + spci = FRCT_SPCI(pci); clock_gettime(PTHREAD_COND_CLOCK, &now); + now_ns = TS_TO_UINT64(now); pthread_rwlock_wrlock(&frcti->lock); rtx = snd_cr->cflags & FRCTFRTX; - pci->flags |= FRCT_DATA; + pci_flags |= FRCT_DATA; + if (!frcti->stream) + pci_flags |= (flags & FRCT_FR_MASK); - /* Set DRF if there are no unacknowledged packets. */ - if (snd_cr->seqno == snd_cr->lwe) - pci->flags |= FRCT_DRF; + if (!frcti->stream && (flags & FRCT_FR_MASK) != FRCT_FR_SOLE) + STAT_BUMP(frcti, frag_snd); + + if (frcti->stream) { + if (flags & FRCT_FIN) + pci_flags |= FRCT_FIN; - /* Choose a new sequence number if sender inactivity expired. */ - if (now.tv_sec - snd_cr->act.tv_sec > snd_cr->inact) { - /* There are no unacknowledged packets. */ - assert(snd_cr->seqno == snd_cr->lwe); - random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno)); - snd_cr->lwe = snd_cr->seqno; - snd_cr->rwe = snd_cr->lwe + START_WINDOW; + spci->start = hton32(frcti->snd_byte_next); + frcti->snd_byte_next += (uint32_t) payload_len; + spci->end = hton32(frcti->snd_byte_next); + STAT_ADD(frcti, strm_snd_byte, payload_len); } + if (snd_cr->seqno == snd_cr->lwe) + pci_flags |= FRCT_DRF; + + seqno_rotate(frcti, now_ns); + seqno = snd_cr->seqno; pci->seqno = hton32(seqno); - if (now.tv_sec - rcv_cr->act.tv_sec < rcv_cr->inact) { - pci->flags |= FRCT_FC; - *((uint32_t *) pci) |= hton32(rcv_cr->rwe & 0x00FFFFFF); + rcv_idle = now_ns - rcv_cr->act; + + if (rcv_idle < rcv_cr->inact) { + pci_flags |= FRCT_FC; + pci->window = hton32(frcti_advert_rwe(frcti)); } if (!rtx) { - snd_cr->lwe++; + STORE_RELEASE(&snd_cr->lwe, snd_cr->lwe + 1); + STORE_RELEASE(&snd_cr->rwe, snd_cr->lwe + RQ_SIZE); } else { - if (!frcti->probe) { - frcti->rttseq = snd_cr->seqno; - frcti->t_probe = now; - frcti->probe = true; -#ifdef PROC_FLOW_STATS - frcti->n_prb++; -#endif - } - if ((now.tv_sec - rcv_cr->act.tv_sec) * BILLION <= frcti->a) { - pci->flags |= FRCT_ACK; + size_t p = RQ_SLOT(seqno); + frcti->snd_slots[p].time = now_ns; + /* Fresh send clears RTX bits. */ + frcti->snd_slots[p].flags = 0; + if (rcv_idle <= (uint64_t) frcti->t_a) { + pci_flags |= FRCT_ACK; pci->ackno = hton32(rcv_cr->lwe); rcv_cr->seqno = rcv_cr->lwe; } } + pci->flags = hton16(pci_flags); + + frct_hcs_set(pci, frcti->stream); + snd_cr->seqno++; - snd_cr->act = now; + STORE_RELEASE(&snd_cr->act, now_ns); + + probe = rtt_probe_arm(frcti, now_ns, &probe_id, probe_nonce); pthread_rwlock_unlock(&frcti->lock); + if (probe) + frcti_rttp_snd(frcti, probe_id, 0, probe_nonce); + if (rtx) - timerwheel_rxm(frcti, seqno, spb); + rxm_arm(frcti, seqno, spb); return 0; } -static void rtt_estimator(struct frcti * frcti, - time_t mrtt) +/* 0-byte FRCT_FIN DATA so peer's flow_read returns 0 at this byte. */ +static void frcti_stream_fin_snd(struct frcti * frcti) { - time_t srtt = frcti->srtt; - time_t rttvar = frcti->mdev; + struct ssm_pk_buff * spb; + bool already; - if (srtt == 0) { /* first measurement */ - srtt = mrtt; - rttvar = mrtt >> 1; - } else { - time_t delta = mrtt - srtt; - srtt += (delta >> 3); - delta = (ABS(delta) - rttvar) >> 2; -#ifdef FRCT_LINUX_RTT_ESTIMATOR - if (delta < 0) - delta >>= 3; -#endif - rttvar += delta; + assert(frcti->stream); + + pthread_rwlock_wrlock(&frcti->lock); + + already = frcti->snd_fin_sent; + frcti->snd_fin_sent = true; + + pthread_rwlock_unlock(&frcti->lock); + + if (already) + return; + + if (frct_spb_reserve(frcti_data_hdr_len(frcti), &spb) < 0) + return; + + /* Reset spb to 0-len so frcti_snd's head_alloc populates PCI. */ + ssm_pk_buff_truncate(spb, 0); + + if (frcti_snd(frcti, spb, FRCT_FIN) < 0) { + frct_spb_release(spb); + return; } -#ifdef PROC_FLOW_STATS - frcti->n_rtt++; -#endif - frcti->srtt = MAX(1000L, srtt); - frcti->mdev = MAX(100L, rttvar); - frcti->rto = MAX(RTO_MIN, frcti->srtt + (frcti->mdev << MDEV_MUL)); -} - -/* Always queues the next application packet on the RQ. */ -static void __frcti_rcv(struct frcti * frcti, - struct ssm_pk_buff * spb) -{ - ssize_t idx; - size_t pos; - struct frct_pci * pci; - struct timespec now; - struct frct_cr * rcv_cr; - struct frct_cr * snd_cr; - uint32_t seqno; - uint32_t ackno; - uint32_t rwe; - int fd = -1; - assert(frcti); + if (frct_tx(frcti, spb) < 0) + return; + + pthread_rwlock_wrlock(&frcti->lock); + + frcti->snd_fin_seqno = frcti->snd_cr.seqno - 1; + + pthread_rwlock_unlock(&frcti->lock); +} + +static bool final_ack_due(struct frcti * frcti, + struct frct_cr * rcv_cr, + uint64_t now_ns) +{ + if (rcv_cr->lwe == rcv_cr->seqno) + return false; + + if (ACK_AGED_OUT(rcv_cr->act, now_ns, frcti->t_a)) + return false; + + return true; +} + +/* Drain-loop predicate: FLINGER cflag + unACK'd data below the FIN/seqno. */ +static bool frcti_lingering(struct frcti * frcti) +{ + struct frct_cr * snd_cr; + uint32_t edge; + bool linger; + + /* Idempotent; FIN must be sent before any linger check uses it. */ + if (frcti->stream) + frcti_stream_fin_snd(frcti); + + pthread_rwlock_rdlock(&frcti->lock); + + snd_cr = &frcti->snd_cr; + + if (frcti->snd_fin_sent) + edge = frcti->snd_fin_seqno; + else + edge = snd_cr->seqno; + + linger = (snd_cr->cflags & FRCTFLINGER) && before(snd_cr->lwe, edge); + + pthread_rwlock_unlock(&frcti->lock); + + return linger; +} + +static time_t frcti_dealloc(struct frcti * frcti) +{ + struct timespec now; + struct frct_cr * snd_cr; + struct frct_cr * rcv_cr; + int ackno; + bool due; + int64_t now_ns; + int64_t rcv; + int64_t snd; - rcv_cr = &frcti->rcv_cr; snd_cr = &frcti->snd_cr; + rcv_cr = &frcti->rcv_cr; + + /* Idempotent; usually already sent by frcti_lingering. */ + if (frcti->stream) + frcti_stream_fin_snd(frcti); clock_gettime(PTHREAD_COND_CLOCK, &now); + now_ns = TS_TO_UINT64(now); - pci = (struct frct_pci *) ssm_pk_buff_pop(spb, FRCT_PCILEN); + pthread_rwlock_rdlock(&frcti->lock); - idx = ssm_pk_buff_get_off(spb); - seqno = ntoh32(pci->seqno); - pos = seqno & (RQ_SIZE - 1); + ackno = rcv_cr->lwe; + rcv = (int64_t)(rcv_cr->act + rcv_cr->inact) - now_ns; + snd = (int64_t)(snd_cr->act + snd_cr->inact) - now_ns; + due = final_ack_due(frcti, rcv_cr, now_ns); - pthread_rwlock_wrlock(&frcti->lock); + pthread_rwlock_unlock(&frcti->lock); - if (now.tv_sec - rcv_cr->act.tv_sec > rcv_cr->inact) { - if (pci->flags & FRCT_DRF) { /* New run. */ - rcv_cr->lwe = seqno; - rcv_cr->rwe = seqno + RQ_SIZE; - rcv_cr->seqno = seqno; - } else if (pci->flags & FRCT_DATA) { - goto drop_packet; - } - } + if (due) + frcti_pkt_snd(frcti, FRCT_ACK, ackno, 0); - rcv_cr->act = now; + return (time_t) MAX((MAX(rcv, snd) / BILLION), 0); +} - /* For now, just send an immediate window update. */ - if (pci->flags & FRCT_RDVS) { - fd = frcti->fd; - rwe = rcv_cr->rwe; - pthread_rwlock_unlock(&frcti->lock); +__attribute__((hot)) +static void frcti_rcv(struct frcti * frcti, + struct ssm_pk_buff * spb) +{ + ssize_t idx; + size_t pos; + struct frct_pci * pci; + struct timespec now; + uint64_t now_ns; + struct frct_cr * rcv_cr; + uint32_t seqno; + uint16_t flags; + buffer_t pkt; + struct pending pending = { 0 }; + bool in_order; + struct sack_args * sa = NULL; + bool send_sack = false; + + assert(frcti); + + rcv_cr = &frcti->rcv_cr; - __send_frct_pkt(fd, FRCT_FC, 0, rwe); + clock_gettime(PTHREAD_COND_CLOCK, &now); + now_ns = TS_TO_UINT64(now); - ssm_pool_remove(proc.pool, idx); + if (ssm_pk_buff_len(spb) < FRCT_PCILEN) { + frct_spb_release(spb); return; } - if (pci->flags & FRCT_ACK) { - ackno = ntoh32(pci->ackno); - if (after(ackno, frcti->snd_cr.lwe)) - frcti->snd_cr.lwe = ackno; + pci = FRCT_HDR_POP(spb, frct_pci); - if (frcti->probe && after(ackno, frcti->rttseq)) { -#ifdef PROC_FLOW_STATS - if (!(pci->flags & FRCT_DATA)) - frcti->n_dak++; -#endif - rtt_estimator(frcti, ts_diff_ns(&now, &frcti->t_probe)); - frcti->probe = false; - } + idx = ssm_pk_buff_get_off(spb); + seqno = ntoh32(pci->seqno); + pos = RQ_SLOT(seqno); + + flags = ntoh16(pci->flags); + + pkt.data = ssm_pk_buff_head(spb); + pkt.len = ssm_pk_buff_len(spb); + + /* Stateless / lock-free dispatches. spb released via ctrl_done. */ + if (flags & FRCT_KA) { + frcti_ka_rcv(frcti, pci, now_ns, flags); + goto ctrl_done; } - if (pci->flags & FRCT_FC) { - uint32_t rwe; + if (flags & FRCT_RTTP) { + frcti_rttp_rcv(frcti, pkt, now_ns); + goto ctrl_done; + } - rwe = ntoh32(*((uint32_t *)pci) & hton32(0x00FFFFFF)); - rwe |= snd_cr->rwe & 0xFF000000; + if (flags & FRCT_NACK) { + frcti_nack_rcv(frcti); + goto ctrl_done; + } - /* Rollover for 24 bit */ - if (before(rwe, snd_cr->rwe) && snd_cr->rwe - rwe > 0x007FFFFF) - rwe += 0x01000000; + if (flags & FRCT_RDVS) { + frcti_rdv_rcv(frcti); + goto ctrl_done; + } - snd_cr->rwe = rwe; + pthread_rwlock_wrlock(&frcti->lock); - pthread_mutex_lock(&frcti->mtx); - if (!frcti->open) { - frcti->open = true; - pthread_cond_broadcast(&frcti->cond); + /* rcv_inact_check is a no-op for non-DATA non-DRF packets. */ + if (flags & (FRCT_DATA | FRCT_DRF)) { + switch (rcv_inact_check(frcti, flags, seqno, now_ns)) { + case FRCT_INACT_NEED_NACK: + pthread_rwlock_unlock(&frcti->lock); + frcti_nack_snd(frcti, seqno - 1); + frct_spb_release(spb); + return; + case FRCT_INACT_DROP: + goto drop_packet; + case FRCT_ACTIVE: + /* FALLTHRU */ + default: + break; } - pthread_mutex_unlock(&frcti->mtx); } - if (!(pci->flags & FRCT_DATA)) + /* DATA-only act refresh: non-DATA would lock out DRF rebase. */ + if (flags & FRCT_DATA) + STORE_RELEASE(&rcv_cr->act, now_ns); + + /* Wire-dup ACK packet: same seqno as the previous emission. */ + if (is_dup_ack(frcti, flags, seqno)) { + STAT_BUMP(frcti, ack_dup_rcv); + goto drop_packet; + } + + /* Wire-dup of DATA: piggybacked ACK info already processed. */ + if (is_dup_data(flags, seqno, rcv_cr->lwe)) { + rcv_cr->seqno = seqno; + STAT_BUMP(frcti, dup_rcv); + /* RFC 2883 §4 case 1: dup below cum-ACK. */ + frcti->dsack_seqno = seqno; + frcti->dsack_valid = true; + goto drop_packet; + } + + if (flags & FRCT_ACK) + frcti_ack_rcv(frcti, pci, flags, now_ns, &pending); + + if (flags & FRCT_SACK) + frcti_sack_rcv(frcti, pkt, ntoh32(pci->ackno), + now_ns, &pending); + + if (flags & FRCT_FC) + frcti_fc_rcv(frcti, pci); + + if (!(flags & FRCT_DATA)) goto drop_packet; if (before(seqno, rcv_cr->lwe)) { - rcv_cr->seqno = seqno; /* Ensures we send a new ACK. */ -#ifdef PROC_FLOW_STATS - frcti->n_dup++; -#endif + /* Bump rcv_cr.seqno to force ack_snd to fire on the dup. */ + rcv_cr->seqno = seqno; + if (flags & FRCT_RXM) + STAT_BUMP(frcti, rxm_rcv); + else + STAT_BUMP(frcti, dup_rcv); + /* RFC 2883 §4 case 1: dup below cum-ACK. */ + frcti->dsack_seqno = seqno; + frcti->dsack_valid = true; goto drop_packet; } - if (rcv_cr->cflags & FRCTFRTX) { + if (!rq_accept(frcti, seqno, pos, flags)) + goto drop_packet; - if (!before(seqno, rcv_cr->rwe)) { /* Out of window. */ -#ifdef PROC_FLOW_STATS - frcti->n_out++; -#endif + if (frcti->stream) { + if (frcti_stream_data_rcv(frcti, spb, pos, flags) < 0) { + STAT_BUMP(frcti, strm_drop); goto drop_packet; } - - if (!before(seqno, rcv_cr->lwe + RQ_SIZE)) { -#ifdef PROC_FLOW_STATS - frcti->n_rqo++; -#endif - goto drop_packet; /* Out of rq. */ - } - if (frcti->rq[pos] != -1) { -#ifdef PROC_FLOW_STATS - frcti->n_dup++; -#endif - goto drop_packet; /* Duplicate in rq. */ - } - fd = frcti->fd; + /* spb consumed by stash; do not release in drop path. */ + spb = NULL; } else { - rcv_cr->lwe = seqno; + frcti_data_stash(frcti, idx, pos, flags); + } + + /* Lazy alloc: only OOO arrivals can trigger a SACK send. */ + if (after(seqno, rcv_cr->lwe) && frcti->sack_n_max > 0) { + size_t sa_sz = sizeof(*sa) + + frcti->sack_n_max * sizeof(sa->blocks[0]); + sa = malloc(sa_sz); + /* If alloc fails, sack_check sees NULL and we skip SACK. */ } - frcti->rq[pos] = idx; + send_sack = sa != NULL && sack_check(frcti, seqno, now_ns, sa); + in_order = !after(seqno, rcv_cr->lwe); pthread_rwlock_unlock(&frcti->lock); - if (fd != -1) - timerwheel_delayed_ack(fd, frcti); + if (send_sack) { + STAT_BUMP(frcti, sack_snd); + if (sa->dsack) + STAT_BUMP(frcti, dsack_snd); + frcti_sack_snd(frcti, sa); + } else if (in_order) { + ack_arm(frcti); + } + + pending_flush(frcti, &pending); + + frcti_rcv_probe(frcti, now_ns); + + free(sa); + return; + ctrl_done: + frct_spb_release(spb); return; drop_packet: pthread_rwlock_unlock(&frcti->lock); - ssm_pool_remove(proc.pool, idx); - send_frct_pkt(frcti); - return; + frct_spb_release(spb); + /* with_sack=true: ack_snd no-ops if neither dsack nor SACK is due. */ + ack_snd(frcti, true); + + pending_flush(frcti, &pending); + free(sa); } + +/* NULL-shim macros for the no-FRCT case. */ + +#define FRCTI_SND(frcti, spb, flags) \ + ((frcti) == NULL ? 0 : frcti_snd((frcti), (spb), (flags))) + +#define FRCTI_RCV(frcti, spb) \ + do { \ + if ((frcti) != NULL) \ + frcti_rcv((frcti), (spb)); \ + } while (0) + +#define FRCTI_PDU_READY(frcti) \ + ((frcti) != NULL && frcti_pdu_ready(frcti)) + +#define FRCTI_CONSUME(frcti, buf, count) \ + ((frcti) == NULL ? (ssize_t) -EAGAIN \ + : (frcti)->stream \ + ? frcti_consume_stream((frcti), (buf), (count)) \ + : frcti_consume((frcti), (buf), (count))) + +#define FRCTI_IS_FRTX(frcti) \ + ((frcti) != NULL && ((frcti)->rcv_cr.cflags & FRCTFRTX)) + +#define FRCTI_IS_STREAM(frcti) ((frcti) != NULL && (frcti)->stream) + +#define FRCTI_PAYLOAD_CAP(frcti) \ + ((frcti)->frag_mtu - frcti_data_hdr_len(frcti)) + +#define FRCTI_NEEDS_FRAG(frcti, count) \ + ((frcti) != NULL && (count) > FRCTI_PAYLOAD_CAP(frcti)) + +#define FRCTI_IS_WINDOW_OPEN(frcti) \ + ((frcti) == NULL ? true : frcti_is_window_open(frcti)) + +#define FRCTI_IS_WINDOW_OPEN_N(frcti, n) \ + ((frcti) == NULL ? true : frcti_is_window_open_n((frcti), (n))) + +#define FRCTI_LINGERING(frcti) \ + ((frcti) == NULL ? false : frcti_lingering(frcti)) + +#define FRCTI_DEALLOC(frcti) \ + ((frcti) == NULL ? (time_t) 0 : frcti_dealloc(frcti)) + diff --git a/src/lib/pb/ipcp.proto b/src/lib/pb/ipcp.proto index 9dc402f5..406b8d9c 100644 --- a/src/lib/pb/ipcp.proto +++ b/src/lib/pb/ipcp.proto @@ -54,7 +54,7 @@ message ipcp_msg { optional int32 response = 10; optional string comp = 11; optional uint32 timeo_sec = 12; - optional sint32 mpl = 13; + optional sint32 mpl = 13; /* MPL in ms. */ optional int32 result = 14; optional uint32 uid = 15; /* 0 = GSPP, >0 = PUP uid */ } diff --git a/src/lib/pb/irm.proto b/src/lib/pb/irm.proto index 579fd388..5de860a5 100644 --- a/src/lib/pb/irm.proto +++ b/src/lib/pb/irm.proto @@ -88,7 +88,7 @@ message irm_msg { repeated ipcp_list_msg ipcps = 17; repeated name_info_msg names = 18; optional timespec_msg timeo = 19; - optional sint32 mpl = 20; + optional sint32 mpl = 20; /* MPL in ms. */ optional string comp = 21; optional bytes pk = 22; /* piggyback */ optional uint32 timeo_sec = 23; diff --git a/src/lib/pb/model.proto b/src/lib/pb/model.proto index 51bea760..4c1564a5 100644 --- a/src/lib/pb/model.proto +++ b/src/lib/pb/model.proto @@ -28,7 +28,7 @@ message qosspec_msg { required uint32 availability = 3; /* Class of 9s. */ required uint32 loss = 4; /* Packet loss. */ required uint32 ber = 5; /* Bit error rate, ppb. */ - required uint32 in_order = 6; /* In-order delivery. */ + required uint32 service = 6; /* enum qos_service. */ required uint32 max_gap = 7; /* In ms. */ required uint32 timeout = 8; /* Timeout in ms. */ } @@ -37,7 +37,7 @@ message flow_info_msg { required uint32 id = 1; required uint32 n_pid = 2; required uint32 n_1_pid = 3; - required uint32 mpl = 4; + required uint32 mpl = 4; /* MPL in ms. */ required uint32 state = 5; required qosspec_msg qos = 6; required uint32 uid = 7; diff --git a/src/lib/protobuf.c b/src/lib/protobuf.c index 75c71dfd..a824d357 100644 --- a/src/lib/protobuf.c +++ b/src/lib/protobuf.c @@ -759,7 +759,7 @@ qosspec_msg_t * qos_spec_s_to_msg(const struct qos_spec * s) msg->availability = s->availability; msg->loss = s->loss; msg->ber = s->ber; - msg->in_order = s->in_order; + msg->service = s->service; msg->max_gap = s->max_gap; msg->timeout = s->timeout; @@ -777,7 +777,7 @@ struct qos_spec qos_spec_msg_to_s(const qosspec_msg_t * msg) s.availability = msg->availability; s.loss = msg->loss; s.ber = msg->ber; - s.in_order = msg->in_order; + s.service = msg->service; s.max_gap = msg->max_gap; s.timeout = msg->timeout; diff --git a/src/lib/qoscube.c b/src/lib/qoscube.c index 1eaa0d7c..5d7ae17d 100644 --- a/src/lib/qoscube.c +++ b/src/lib/qoscube.c @@ -29,15 +29,11 @@ qoscube_t qos_spec_to_cube(qosspec_t qs) { - if (qs.delay <= qos_voice.delay && - qs.bandwidth <= qos_voice.bandwidth && - qs.availability >= qos_voice.availability && - qs.max_gap <= qos_voice.max_gap) + if (qs.delay <= 50 && qs.bandwidth <= 100000 + && qs.availability >= 5 && qs.max_gap <= 50) return QOS_CUBE_VOICE; - else if (qs.delay <= qos_video.delay && - qs.bandwidth <= qos_video.bandwidth && - qs.availability >= qos_video.availability && - qs.max_gap <= qos_video.max_gap) + else if (qs.delay <= 100 && qs.availability >= 3 + && qs.max_gap <= 100) return QOS_CUBE_VIDEO; else return QOS_CUBE_BE; diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c deleted file mode 100644 index d0f5c05c..00000000 --- a/src/lib/timerwheel.c +++ /dev/null @@ -1,414 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2026 - * - * Timerwheel - * - * Dimitri Staessens <dimitri@ouroboros.rocks> - * Sander Vrijders <sander@ouroboros.rocks> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * version 2.1 as published by the Free Software Foundation. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#include <ouroboros/list.h> - -/* Overflow limits range to about 6 hours. */ -#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec) -#define ts_to_rxm_slot(ts) (ts_to_ns(ts) >> RXMQ_RES) -#define ts_to_ack_slot(ts) (ts_to_ns(ts) >> ACKQ_RES) - -struct rxm { - struct list_head next; - uint32_t seqno; -#ifndef RXM_BUFFER_ON_HEAP - struct ssm_pk_buff * spb; -#endif - struct frct_pci * pkt; - size_t len; - time_t t0; /* Time when original was sent (us). */ - struct frcti * frcti; - int fd; - int flow_id; /* Prevent rtx when fd reused. */ -}; - -struct ack { - struct list_head next; - struct frcti * frcti; - int fd; - int flow_id; -}; - -struct { - /* - * At a 1 ms min resolution, every level bumps the - * resolution by a factor of 16. - */ - struct list_head rxms[RXMQ_LVLS][RXMQ_SLOTS]; - - struct list_head acks[ACKQ_SLOTS]; - bool map[ACKQ_SLOTS][PROC_MAX_FLOWS]; - - size_t prv_rxm[RXMQ_LVLS]; /* Last processed rxm slots. */ - size_t prv_ack; /* Last processed ack slot. */ - pthread_mutex_t lock; -} rw; - -static void timerwheel_fini(void) -{ - size_t i; - size_t j; - struct list_head * p; - struct list_head * h; - - pthread_mutex_lock(&rw.lock); - - for (i = 0; i < RXMQ_LVLS; ++i) { - for (j = 0; j < RXMQ_SLOTS; j++) { - list_for_each_safe(p, h, &rw.rxms[i][j]) { - struct rxm * rxm; - rxm = list_entry(p, struct rxm, next); - list_del(&rxm->next); -#ifdef RXM_BUFFER_ON_HEAP - free(rxm->pkt); -#else - ssm_pk_buff_ack(rxm->spb); - ipcp_spb_release(rxm->spb); -#endif - free(rxm); - } - } - } - - for (i = 0; i < ACKQ_SLOTS; ++i) { - list_for_each_safe(p, h, &rw.acks[i]) { - struct ack * a = list_entry(p, struct ack, next); - list_del(&a->next); - free(a); - } - } - - pthread_mutex_unlock(&rw.lock); - - pthread_mutex_destroy(&rw.lock); -} - -static int timerwheel_init(void) -{ - struct timespec now; - size_t i; - size_t j; - - if (pthread_mutex_init(&rw.lock, NULL)) - return -1; - - clock_gettime(PTHREAD_COND_CLOCK, &now); - - for (i = 0; i < RXMQ_LVLS; ++i) { - rw.prv_rxm[i] = (ts_to_rxm_slot(now) - 1); - rw.prv_rxm[i] >>= (RXMQ_BUMP * i); - rw.prv_rxm[i] &= (RXMQ_SLOTS - 1); - for (j = 0; j < RXMQ_SLOTS; ++j) - list_head_init(&rw.rxms[i][j]); - } - - rw.prv_ack = (ts_to_ack_slot(now) - 1) & (ACKQ_SLOTS - 1); - for (i = 0; i < ACKQ_SLOTS; ++i) - list_head_init(&rw.acks[i]); - - return 0; -} - -static void timerwheel_move(void) -{ - struct timespec now; - struct list_head * p; - struct list_head * h; - size_t rxm_slot; - size_t ack_slot; - size_t i; - size_t j; - - pthread_mutex_lock(&rw.lock); - - pthread_cleanup_push(__cleanup_mutex_unlock, &rw.lock); - - clock_gettime(PTHREAD_COND_CLOCK, &now); - - rxm_slot = ts_to_rxm_slot(now); - - for (i = 0; i < RXMQ_LVLS; ++i) { - size_t j_max_slot = rxm_slot & (RXMQ_SLOTS - 1); - j = rw.prv_rxm[i]; - if (j_max_slot < j) - j_max_slot += RXMQ_SLOTS; - while (j++ < j_max_slot) { - list_for_each_safe(p, h, - &rw.rxms[i][j & (RXMQ_SLOTS - 1)]) { - struct rxm * r; - struct frct_cr * snd_cr; - struct frct_cr * rcv_cr; - size_t slot; - size_t rslot; - ssize_t idx; - struct ssm_pk_buff * spb; - struct frct_pci * pci; - struct flow * f; - uint32_t snd_lwe; - uint32_t rcv_lwe; - size_t lvl = 0; - - r = list_entry(p, struct rxm, next); - - list_del(&r->next); - - snd_cr = &r->frcti->snd_cr; - rcv_cr = &r->frcti->rcv_cr; - f = &proc.flows[r->fd]; -#ifndef RXM_BUFFER_ON_HEAP - ssm_pk_buff_ack(r->spb); -#endif - if (f->frcti == NULL - || f->info.id != r->flow_id) - goto cleanup; - - pthread_rwlock_rdlock(&r->frcti->lock); - - snd_lwe = snd_cr->lwe; - rcv_lwe = rcv_cr->lwe; - - pthread_rwlock_unlock(&r->frcti->lock); - - /* Has been ack'd, remove. */ - if (before(r->seqno, snd_lwe)) - goto cleanup; - - /* Check for r-timer expiry. */ - if (ts_to_ns(now) - r->t0 > r->frcti->r) - goto flow_down; - - pthread_rwlock_wrlock(&r->frcti->lock); - - if (r->seqno == r->frcti->rttseq) { - r->frcti->rto += - r->frcti->rto >> RTO_DIV; - r->frcti->probe = false; - } -#ifdef PROC_FLOW_STATS - r->frcti->n_rtx++; -#endif - rslot = r->frcti->rto >> RXMQ_RES; - - pthread_rwlock_unlock(&r->frcti->lock); - - /* Schedule at least in the next time slot. */ - slot = ts_to_ns(now) >> RXMQ_RES; - - while (rslot >= RXMQ_SLOTS) { - ++lvl; - rslot >>= RXMQ_BUMP; - slot >>= RXMQ_BUMP; - } - - if (lvl >= RXMQ_LVLS) /* Can't reschedule */ - goto flow_down; - - rslot = (rslot + slot + 1) & (RXMQ_SLOTS - 1); -#ifdef RXM_BLOCKING - if (ipcp_spb_reserve(&spb, r->len) < 0) -#else - if (ssm_pool_alloc(proc.pool, r->len, NULL, - &spb) < 0) -#endif - goto reschedule; /* rdrbuff full */ - - pci = (struct frct_pci *) ssm_pk_buff_head(spb); - memcpy(pci, r->pkt, r->len); -#ifndef RXM_BUFFER_ON_HEAP - ipcp_spb_release(r->spb); - r->spb = spb; - r->pkt = pci; - ssm_pk_buff_wait_ack(spb); -#endif - idx = ssm_pk_buff_get_off(spb); - - /* Retransmit the copy. */ - pci->ackno = hton32(rcv_lwe); -#ifdef RXM_BLOCKING - if (ssm_rbuff_write_b(f->tx_rb, idx, NULL) < 0) -#else - if (ssm_rbuff_write(f->tx_rb, idx) < 0) -#endif - goto flow_down; - ssm_flow_set_notify(f->set, f->info.id, - FLOW_PKT); - reschedule: - list_add(&r->next, &rw.rxms[lvl][rslot]); - continue; - - flow_down: - ssm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); - ssm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); - cleanup: -#ifdef RXM_BUFFER_ON_HEAP - free(r->pkt); -#else - ipcp_spb_release(r->spb); -#endif - free(r); - } - } - rw.prv_rxm[i] = rxm_slot & (RXMQ_SLOTS - 1); - /* Move up a level in the wheel. */ - rxm_slot >>= RXMQ_BUMP; - } - - ack_slot = ts_to_ack_slot(now) & (ACKQ_SLOTS - 1) ; - - j = rw.prv_ack; - - if (ack_slot < j) - ack_slot += ACKQ_SLOTS; - - while (j++ < ack_slot) { - list_for_each_safe(p, h, &rw.acks[j & (ACKQ_SLOTS - 1)]) { - struct ack * a; - struct flow * f; - - a = list_entry(p, struct ack, next); - - list_del(&a->next); - - f = &proc.flows[a->fd]; - - rw.map[j & (ACKQ_SLOTS - 1)][a->fd] = false; - - if (f->info.id == a->flow_id && f->frcti != NULL) - send_frct_pkt(a->frcti); - - free(a); - } - } - - rw.prv_ack = ack_slot & (ACKQ_SLOTS - 1); - - pthread_cleanup_pop(true); -} - -static int timerwheel_rxm(struct frcti * frcti, - uint32_t seqno, - struct ssm_pk_buff * spb) -{ - struct timespec now; - struct rxm * r; - size_t slot; - size_t lvl = 0; - time_t rto_slot; - - r = malloc(sizeof(*r)); - if (r == NULL) - return -ENOMEM; - - clock_gettime(PTHREAD_COND_CLOCK, &now); - - r->t0 = ts_to_ns(now); - r->seqno = seqno; - r->frcti = frcti; - r->len = ssm_pk_buff_len(spb); -#ifdef RXM_BUFFER_ON_HEAP - r->pkt = malloc(r->len); - if (r->pkt == NULL) { - free(r); - return -ENOMEM; - } - memcpy(r->pkt, ssm_pk_buff_head(spb), r->len); -#else - r->spb = spb; - r->pkt = (struct frct_pci *) ssm_pk_buff_head(spb); -#endif - pthread_rwlock_rdlock(&r->frcti->lock); - - rto_slot = frcti->rto >> RXMQ_RES; - slot = r->t0 >> RXMQ_RES; - - r->fd = frcti->fd; - r->flow_id = proc.flows[r->fd].info.id; - - pthread_rwlock_unlock(&r->frcti->lock); - - while (rto_slot >= RXMQ_SLOTS) { - ++lvl; - rto_slot >>= RXMQ_BUMP; - slot >>= RXMQ_BUMP; - } - - if (lvl >= RXMQ_LVLS) { /* Out of timerwheel range. */ -#ifdef RXM_BUFFER_ON_HEAP - free(r->pkt); -#endif - free(r); - return -EPERM; - } - - slot = (slot + rto_slot + 1) & (RXMQ_SLOTS - 1); - - pthread_mutex_lock(&rw.lock); - - list_add_tail(&r->next, &rw.rxms[lvl][slot]); -#ifndef RXM_BUFFER_ON_HEAP - ssm_pk_buff_wait_ack(spb); -#endif - pthread_mutex_unlock(&rw.lock); - - return 0; -} - -static int timerwheel_delayed_ack(int fd, - struct frcti * frcti) -{ - struct timespec now; - struct ack * a; - size_t slot; - - a = malloc(sizeof(*a)); - if (a == NULL) - return -ENOMEM; - - clock_gettime(PTHREAD_COND_CLOCK, &now); - - pthread_rwlock_rdlock(&frcti->lock); - - slot = (((ts_to_ns(now) + (TICTIME << 1)) >> ACKQ_RES) + 1) - & (ACKQ_SLOTS - 1); - - pthread_rwlock_unlock(&frcti->lock); - - a->fd = fd; - a->frcti = frcti; - a->flow_id = proc.flows[fd].info.id; - - pthread_mutex_lock(&rw.lock); - - if (rw.map[slot][fd]) { - pthread_mutex_unlock(&rw.lock); - free(a); - return 0; - } - - rw.map[slot][fd] = true; - - list_add_tail(&a->next, &rw.acks[slot]); - - pthread_mutex_unlock(&rw.lock); - - return 0; -} |
