summaryrefslogtreecommitdiff
path: root/src/lib/timerwheel.c
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2026-05-10 19:06:21 +0200
committerSander Vrijders <sander@ouroboros.rocks>2026-05-20 08:17:07 +0200
commit63d3aa9ab8d8b0b6d8a10362e112a431dcb5b4e9 (patch)
tree88f0827466b40d0e83da7954123d00cbb5f6c676 /src/lib/timerwheel.c
parentf33769c818cb1f01079405f543b36aa294764112 (diff)
downloadouroboros-63d3aa9ab8d8b0b6d8a10362e112a431dcb5b4e9.tar.gz
ouroboros-63d3aa9ab8d8b0b6d8a10362e112a431dcb5b4e9.zip
lib: Update FRCP implementation
The Flow and Retransmission Control Protocol (FRCP) runs end-to-end between two peers over a flow. It provides reliability, in-order delivery, flow control, and liveness. Note that congestion avoidance is orthogonal to FRCP and handled in the IPCP. A fixed 16-octet header, network byte order, is prefixed to every FRCP packet: 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | flags | hcs | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | window | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | seqno | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | ackno | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | payload (variable) ... +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ hcs is a CRC-16-CCITT-FALSE checksum over the PCI (and the stream extension when present), verified before any flag-driven dispatch. A single packet can simultaneously carry DATA + ACK + FC + RXM by OR-ing flag bits. An optional CRC trailer covers the body on DATA when qs.ber == 0, and on every SACK packet; an optional AEAD wrap (per-flow keys) sits outermost. Flag bits (MSB-first; bits 13..15 reserved, MUST be zero): +------+--------+--------+----------------------------------------+ | Bit | Mask | Name | Meaning | +------+--------+--------+----------------------------------------+ | 0 | 0x8000 | DATA | Carries caller payload | | 1 | 0x4000 | DRF | Start of a fresh data run | | 2 | 0x2000 | ACK | ackno field valid | | 3 | 0x1000 | NACK | Pre-DRF nudge (seqno informational) | | 4 | 0x0800 | FC | window field valid (rwe advertisement) | | 5 | 0x0400 | RDVS | Rendezvous probe (window-closed) | | 6 | 0x0200 | FFGM | First Fragment of a multi-fragment SDU | | 7 | 0x0100 | LFGM | Last Fragment of a multi-fragment SDU | | 8 | 0x0080 | RXM | Retransmission | | 9 | 0x0040 | SACK | Block list follows in payload | | 10 | 0x0020 | RTTP | RTT probe / echo (payload follows) | | 11 | 0x0010 | KA | Keepalive | | 12 | 0x0008 | FIN | End of stream marker | | 13-15| -- | -- | Reserved (MUST be zero) | +------+--------+--------+----------------------------------------+ (FFGM, LFGM) encodes the fragment role of a DATA packet (SCTP-style B/E): 11=SOLE, 10=FIRST, 00=MID, 01=LAST. Each fragment carries its own seqno; Retransmission recovers fragments individually, reassembly runs at consume time. In stream mode FFGM/LFGM are unused; per-byte position is carried by the stream extension below and end-of-stream is signalled by FIN on a 0-byte DATA packet. SACK payload (FRCT_ACK | FRCT_FC | FRCT_SACK): 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | n_blocks | padding (2 octets) | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | start[0] | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | end[0] | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ ... n_blocks pairs total ... Each block describes a *present* (received) range strictly above the cumulative ACK in the PCI ackno. D-SACK (RFC 2883) is signalled in-band as block[0] - no flag bit, no extra framing - and consumed by the RACK reo_wnd_mult scaler (RFC 8985 sec. 7.2). RTTP payload (FRCT_RTTP only; 24 octets): 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | probe_id | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | echo_id | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | | + nonce (16 octets, echoed verbatim) + | | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Stream PCI extension (in_order == STREAM only; 8 octets after the base PCI on every DATA packet): 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | start | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | end | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ start, end are monotonic 32-bit byte offsets; end - start equals the on-wire payload length. Stream mode is negotiated at flow allocation; the extension is present iff stream mode is in use, never on a per-packet basis. Service modes are an orthogonal (in_order, loss, ber) vector selected at flow_alloc; the cubes above map to the axes: +----------------+---------+------+-----+-----------------------+ | Cube | in_order| loss | ber | Engaged | +----------------+---------+------+-----+-----------------------+ | qos_raw | 0 | 1 | 1 | Raw passthrough | | qos_raw_safe | 0 | 1 | 0 | Raw + CRC trailer | | qos_rt | 1 | 1 | 1 | FRCP, no FRTX, no CRC | | qos_rt_safe | 1 | 1 | 0 | FRCP, no FRTX, CRC | | qos_msg | 1 | 0 | 0 | FRCP + FRTX | | qos_stream | 2 | 0 | 0 | FRCP + FRTX, stream | +----------------+---------+------+-----+-----------------------+ in_order=0 sends raw datagrams with no PCI (UDP-equivalent); in_order=1 engages FRCP with SDU framing; in_order=2 (stream) requires loss=0 and is rejected otherwise. loss=0 engages the FRTX retransmit machinery. ber=0 appends the CRC-32 trailer; QOS_DISABLE_CRC at build time forces ber=1 for development. Encryption is a separate per-flow attribute layered as an AEAD wrap outside the FRCP packet. Heritage: delta-t (Watson 1981) supplies timer-based connection management - no SYN/FIN handshake, the DRF marker, the t_mpl / t_a / t_r timers. RINA (Day 2008) supplies the unified flow_alloc(name, qos, ...) primitive and the orthogonal QoS-cube axes. Loss detection follows TCP/QUIC practice (RFCs 2018, 2883, 6582, 6298, 8985); RTT probing is nonce-authenticated like QUIC PATH_CHALLENGE. Adds oftp, a minimal file-transfer tool over an FRCP stream flow. The client reads from stdin or --in FILE and writes through a flow_alloc(qos_stream); the server (--listen) calls flow_accept and writes to stdout or --out FILE. Both sides compute a CRC-64/NVMe over the bytes they handle and print the result. The server rejects flows whose negotiated qs.in_order != STREAM. Two FRCP knobs are exposed via env vars on either side: OFTP_FRCT_RTO_MIN fccntl FRCTSRTOMIN (ns) OFTP_FRCT_STREAM_RING_SZ fccntl FRCTSRRINGSZ (octets) The ocbr_client gains an OCBR_QOS env var to pick the cube the client uses for flow_alloc; recognised values are raw, safe, rt, rt_safe, msg, stream. Unknown values fall back to raw with a warning on stderr. Without the env set behaviour is unchanged. Removes the deprecated lib/timerwheel.c Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/lib/timerwheel.c')
-rw-r--r--src/lib/timerwheel.c414
1 files changed, 0 insertions, 414 deletions
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c
deleted file mode 100644
index d0f5c05c..00000000
--- a/src/lib/timerwheel.c
+++ /dev/null
@@ -1,414 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2026
- *
- * Timerwheel
- *
- * Dimitri Staessens <dimitri@ouroboros.rocks>
- * Sander Vrijders <sander@ouroboros.rocks>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * version 2.1 as published by the Free Software Foundation.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#include <ouroboros/list.h>
-
-/* Overflow limits range to about 6 hours. */
-#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec)
-#define ts_to_rxm_slot(ts) (ts_to_ns(ts) >> RXMQ_RES)
-#define ts_to_ack_slot(ts) (ts_to_ns(ts) >> ACKQ_RES)
-
-struct rxm {
- struct list_head next;
- uint32_t seqno;
-#ifndef RXM_BUFFER_ON_HEAP
- struct ssm_pk_buff * spb;
-#endif
- struct frct_pci * pkt;
- size_t len;
- time_t t0; /* Time when original was sent (us). */
- struct frcti * frcti;
- int fd;
- int flow_id; /* Prevent rtx when fd reused. */
-};
-
-struct ack {
- struct list_head next;
- struct frcti * frcti;
- int fd;
- int flow_id;
-};
-
-struct {
- /*
- * At a 1 ms min resolution, every level bumps the
- * resolution by a factor of 16.
- */
- struct list_head rxms[RXMQ_LVLS][RXMQ_SLOTS];
-
- struct list_head acks[ACKQ_SLOTS];
- bool map[ACKQ_SLOTS][PROC_MAX_FLOWS];
-
- size_t prv_rxm[RXMQ_LVLS]; /* Last processed rxm slots. */
- size_t prv_ack; /* Last processed ack slot. */
- pthread_mutex_t lock;
-} rw;
-
-static void timerwheel_fini(void)
-{
- size_t i;
- size_t j;
- struct list_head * p;
- struct list_head * h;
-
- pthread_mutex_lock(&rw.lock);
-
- for (i = 0; i < RXMQ_LVLS; ++i) {
- for (j = 0; j < RXMQ_SLOTS; j++) {
- list_for_each_safe(p, h, &rw.rxms[i][j]) {
- struct rxm * rxm;
- rxm = list_entry(p, struct rxm, next);
- list_del(&rxm->next);
-#ifdef RXM_BUFFER_ON_HEAP
- free(rxm->pkt);
-#else
- ssm_pk_buff_ack(rxm->spb);
- ipcp_spb_release(rxm->spb);
-#endif
- free(rxm);
- }
- }
- }
-
- for (i = 0; i < ACKQ_SLOTS; ++i) {
- list_for_each_safe(p, h, &rw.acks[i]) {
- struct ack * a = list_entry(p, struct ack, next);
- list_del(&a->next);
- free(a);
- }
- }
-
- pthread_mutex_unlock(&rw.lock);
-
- pthread_mutex_destroy(&rw.lock);
-}
-
-static int timerwheel_init(void)
-{
- struct timespec now;
- size_t i;
- size_t j;
-
- if (pthread_mutex_init(&rw.lock, NULL))
- return -1;
-
- clock_gettime(PTHREAD_COND_CLOCK, &now);
-
- for (i = 0; i < RXMQ_LVLS; ++i) {
- rw.prv_rxm[i] = (ts_to_rxm_slot(now) - 1);
- rw.prv_rxm[i] >>= (RXMQ_BUMP * i);
- rw.prv_rxm[i] &= (RXMQ_SLOTS - 1);
- for (j = 0; j < RXMQ_SLOTS; ++j)
- list_head_init(&rw.rxms[i][j]);
- }
-
- rw.prv_ack = (ts_to_ack_slot(now) - 1) & (ACKQ_SLOTS - 1);
- for (i = 0; i < ACKQ_SLOTS; ++i)
- list_head_init(&rw.acks[i]);
-
- return 0;
-}
-
-static void timerwheel_move(void)
-{
- struct timespec now;
- struct list_head * p;
- struct list_head * h;
- size_t rxm_slot;
- size_t ack_slot;
- size_t i;
- size_t j;
-
- pthread_mutex_lock(&rw.lock);
-
- pthread_cleanup_push(__cleanup_mutex_unlock, &rw.lock);
-
- clock_gettime(PTHREAD_COND_CLOCK, &now);
-
- rxm_slot = ts_to_rxm_slot(now);
-
- for (i = 0; i < RXMQ_LVLS; ++i) {
- size_t j_max_slot = rxm_slot & (RXMQ_SLOTS - 1);
- j = rw.prv_rxm[i];
- if (j_max_slot < j)
- j_max_slot += RXMQ_SLOTS;
- while (j++ < j_max_slot) {
- list_for_each_safe(p, h,
- &rw.rxms[i][j & (RXMQ_SLOTS - 1)]) {
- struct rxm * r;
- struct frct_cr * snd_cr;
- struct frct_cr * rcv_cr;
- size_t slot;
- size_t rslot;
- ssize_t idx;
- struct ssm_pk_buff * spb;
- struct frct_pci * pci;
- struct flow * f;
- uint32_t snd_lwe;
- uint32_t rcv_lwe;
- size_t lvl = 0;
-
- r = list_entry(p, struct rxm, next);
-
- list_del(&r->next);
-
- snd_cr = &r->frcti->snd_cr;
- rcv_cr = &r->frcti->rcv_cr;
- f = &proc.flows[r->fd];
-#ifndef RXM_BUFFER_ON_HEAP
- ssm_pk_buff_ack(r->spb);
-#endif
- if (f->frcti == NULL
- || f->info.id != r->flow_id)
- goto cleanup;
-
- pthread_rwlock_rdlock(&r->frcti->lock);
-
- snd_lwe = snd_cr->lwe;
- rcv_lwe = rcv_cr->lwe;
-
- pthread_rwlock_unlock(&r->frcti->lock);
-
- /* Has been ack'd, remove. */
- if (before(r->seqno, snd_lwe))
- goto cleanup;
-
- /* Check for r-timer expiry. */
- if (ts_to_ns(now) - r->t0 > r->frcti->r)
- goto flow_down;
-
- pthread_rwlock_wrlock(&r->frcti->lock);
-
- if (r->seqno == r->frcti->rttseq) {
- r->frcti->rto +=
- r->frcti->rto >> RTO_DIV;
- r->frcti->probe = false;
- }
-#ifdef PROC_FLOW_STATS
- r->frcti->n_rtx++;
-#endif
- rslot = r->frcti->rto >> RXMQ_RES;
-
- pthread_rwlock_unlock(&r->frcti->lock);
-
- /* Schedule at least in the next time slot. */
- slot = ts_to_ns(now) >> RXMQ_RES;
-
- while (rslot >= RXMQ_SLOTS) {
- ++lvl;
- rslot >>= RXMQ_BUMP;
- slot >>= RXMQ_BUMP;
- }
-
- if (lvl >= RXMQ_LVLS) /* Can't reschedule */
- goto flow_down;
-
- rslot = (rslot + slot + 1) & (RXMQ_SLOTS - 1);
-#ifdef RXM_BLOCKING
- if (ipcp_spb_reserve(&spb, r->len) < 0)
-#else
- if (ssm_pool_alloc(proc.pool, r->len, NULL,
- &spb) < 0)
-#endif
- goto reschedule; /* rdrbuff full */
-
- pci = (struct frct_pci *) ssm_pk_buff_head(spb);
- memcpy(pci, r->pkt, r->len);
-#ifndef RXM_BUFFER_ON_HEAP
- ipcp_spb_release(r->spb);
- r->spb = spb;
- r->pkt = pci;
- ssm_pk_buff_wait_ack(spb);
-#endif
- idx = ssm_pk_buff_get_off(spb);
-
- /* Retransmit the copy. */
- pci->ackno = hton32(rcv_lwe);
-#ifdef RXM_BLOCKING
- if (ssm_rbuff_write_b(f->tx_rb, idx, NULL) < 0)
-#else
- if (ssm_rbuff_write(f->tx_rb, idx) < 0)
-#endif
- goto flow_down;
- ssm_flow_set_notify(f->set, f->info.id,
- FLOW_PKT);
- reschedule:
- list_add(&r->next, &rw.rxms[lvl][rslot]);
- continue;
-
- flow_down:
- ssm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
- ssm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
- cleanup:
-#ifdef RXM_BUFFER_ON_HEAP
- free(r->pkt);
-#else
- ipcp_spb_release(r->spb);
-#endif
- free(r);
- }
- }
- rw.prv_rxm[i] = rxm_slot & (RXMQ_SLOTS - 1);
- /* Move up a level in the wheel. */
- rxm_slot >>= RXMQ_BUMP;
- }
-
- ack_slot = ts_to_ack_slot(now) & (ACKQ_SLOTS - 1) ;
-
- j = rw.prv_ack;
-
- if (ack_slot < j)
- ack_slot += ACKQ_SLOTS;
-
- while (j++ < ack_slot) {
- list_for_each_safe(p, h, &rw.acks[j & (ACKQ_SLOTS - 1)]) {
- struct ack * a;
- struct flow * f;
-
- a = list_entry(p, struct ack, next);
-
- list_del(&a->next);
-
- f = &proc.flows[a->fd];
-
- rw.map[j & (ACKQ_SLOTS - 1)][a->fd] = false;
-
- if (f->info.id == a->flow_id && f->frcti != NULL)
- send_frct_pkt(a->frcti);
-
- free(a);
- }
- }
-
- rw.prv_ack = ack_slot & (ACKQ_SLOTS - 1);
-
- pthread_cleanup_pop(true);
-}
-
-static int timerwheel_rxm(struct frcti * frcti,
- uint32_t seqno,
- struct ssm_pk_buff * spb)
-{
- struct timespec now;
- struct rxm * r;
- size_t slot;
- size_t lvl = 0;
- time_t rto_slot;
-
- r = malloc(sizeof(*r));
- if (r == NULL)
- return -ENOMEM;
-
- clock_gettime(PTHREAD_COND_CLOCK, &now);
-
- r->t0 = ts_to_ns(now);
- r->seqno = seqno;
- r->frcti = frcti;
- r->len = ssm_pk_buff_len(spb);
-#ifdef RXM_BUFFER_ON_HEAP
- r->pkt = malloc(r->len);
- if (r->pkt == NULL) {
- free(r);
- return -ENOMEM;
- }
- memcpy(r->pkt, ssm_pk_buff_head(spb), r->len);
-#else
- r->spb = spb;
- r->pkt = (struct frct_pci *) ssm_pk_buff_head(spb);
-#endif
- pthread_rwlock_rdlock(&r->frcti->lock);
-
- rto_slot = frcti->rto >> RXMQ_RES;
- slot = r->t0 >> RXMQ_RES;
-
- r->fd = frcti->fd;
- r->flow_id = proc.flows[r->fd].info.id;
-
- pthread_rwlock_unlock(&r->frcti->lock);
-
- while (rto_slot >= RXMQ_SLOTS) {
- ++lvl;
- rto_slot >>= RXMQ_BUMP;
- slot >>= RXMQ_BUMP;
- }
-
- if (lvl >= RXMQ_LVLS) { /* Out of timerwheel range. */
-#ifdef RXM_BUFFER_ON_HEAP
- free(r->pkt);
-#endif
- free(r);
- return -EPERM;
- }
-
- slot = (slot + rto_slot + 1) & (RXMQ_SLOTS - 1);
-
- pthread_mutex_lock(&rw.lock);
-
- list_add_tail(&r->next, &rw.rxms[lvl][slot]);
-#ifndef RXM_BUFFER_ON_HEAP
- ssm_pk_buff_wait_ack(spb);
-#endif
- pthread_mutex_unlock(&rw.lock);
-
- return 0;
-}
-
-static int timerwheel_delayed_ack(int fd,
- struct frcti * frcti)
-{
- struct timespec now;
- struct ack * a;
- size_t slot;
-
- a = malloc(sizeof(*a));
- if (a == NULL)
- return -ENOMEM;
-
- clock_gettime(PTHREAD_COND_CLOCK, &now);
-
- pthread_rwlock_rdlock(&frcti->lock);
-
- slot = (((ts_to_ns(now) + (TICTIME << 1)) >> ACKQ_RES) + 1)
- & (ACKQ_SLOTS - 1);
-
- pthread_rwlock_unlock(&frcti->lock);
-
- a->fd = fd;
- a->frcti = frcti;
- a->flow_id = proc.flows[fd].info.id;
-
- pthread_mutex_lock(&rw.lock);
-
- if (rw.map[slot][fd]) {
- pthread_mutex_unlock(&rw.lock);
- free(a);
- return 0;
- }
-
- rw.map[slot][fd] = true;
-
- list_add_tail(&a->next, &rw.acks[slot]);
-
- pthread_mutex_unlock(&rw.lock);
-
- return 0;
-}