From 40cc98c427186a54ddf27fbd10763d7457fffb30 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Wed, 20 May 2026 18:15:10 +0200 Subject: lib: Add tail loss probe (TLP) to FRCP MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous bugfixes that fixed a false-positive inactivity timer check unmasked the need for the tail loss probe (TLP) to quickly recover from loss of the HoL. When the sender finishes its app data with packets still inflight, RACK fast-rxm cannot fire (no ACK arrives), pre-DRF NACK is gated on rcv_cr.inact, and rto_mul climbs into seconds before the next RXM fire — recv-side deadline expires first. This adds the RFC 8985 §7.2 TLP: PTO = 2*SRTT + max_ack_delay (t_a), capped by the HoL slot's current RTO deadline. Probe fires from frcti_snd and after frcti_ack_rcv; per §7.3 at most one TLP per recovery episode, tracked via tlp_high_seq. On fire, re-emit HoL via fast_rxm_send and hand off to RTO — no PTO self re-arm. A new SND_TLP slot flag is Karn-skipped for RTT sampling, cleared by RTO retransmit or NACK-driven fast-rxm, and clears rto_mul on its TLP-ACK (rto_mul carries no CC meaning in FRCP). Also caps MAX_RTO_MUL at 8 instead of 20. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/lib/frct.c | 183 ++++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 162 insertions(+), 21 deletions(-) (limited to 'src/lib/frct.c') diff --git a/src/lib/frct.c b/src/lib/frct.c index 936ca587..38cea93c 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -25,14 +25,16 @@ #define DELT_RDV (100 * MILLION) /* ns */ #define MAX_RDV (1 * BILLION) /* ns */ -#define MAX_RTO_MUL 20 /* caps the RTO backoff shift */ -#define INITIAL_RTO (1 * BILLION) /* RFC 6298 §2.1: 1 s default */ -#define RTT_BOOT_NS (10 * MILLION) /* rtt_hint floor + initial mdev */ -#define SRTT_FLOOR_NS 1000L /* 1 us; smoothed RTT floor */ -#define MDEV_FLOOR_NS 100L /* 100 ns; mdev sanity floor */ -#define RTT_CLAMP_MUL 16 /* probe sample cap = N * srtt */ -#define MIN_RTT_WIN_NS (300ULL * BILLION) /* 5 min, Linux tcp default */ -#define NACK_COOLDOWN_NS (100 * MILLION) /* pre-DRF NACK cooldown fallback */ +#define MAX_RTO_MUL 8 /* caps the RTO backoff shift */ +#define INITIAL_RTO (1 * BILLION) /* RFC 6298 §2.1: 1 s default */ +#define RTT_BOOT_NS (10 * MILLION) /* rtt_hint floor + initial mdev */ +#define SRTT_FLOOR_NS 1000L /* 1 us; smoothed RTT floor */ +#define MDEV_FLOOR_NS 100L /* 100 ns; mdev sanity floor */ +#define RTT_CLAMP_MUL 16 /* probe sample cap = N * srtt */ +#define MIN_RTT_WIN_NS (300ULL * BILLION) /* 5 min, Linux tcp default */ +#define NACK_COOLDOWN_NS (100 * MILLION) /* pre-DRF NACK cooldown */ +#define FRCT_TX_TIMEO_NS (250 * 1000) /* tx ring write deadline */ +#define ACK_DELAY_NS (2ULL * TICTIME) /* delayed-ACK fire delay */ #define FRCT "frct" #define FRCT_PCILEN (sizeof(struct frct_pci)) @@ -209,6 +211,7 @@ struct frcti_stat { size_t dsack_drop; /* DSACK blocks past MAX_DSACK_LAG */ size_t nack_snd; /* pre-DRF NACKs sent */ size_t nack_rcv; /* pre-DRF NACKs received */ + size_t tlp_snd; /* tail loss probes sent */ size_t rttp_snd; /* RTT probes sent */ size_t rttp_rcv; /* RTT probe replies rcvd */ size_t rtt_smpl; /* RTT estimator samples */ @@ -252,6 +255,7 @@ struct rxm_entry; enum snd_slot_flags { SND_RTX = 0x01, /* Any retransmit; Karn skips next RTT sample. */ SND_FAST_RXM = 0x02, /* Fast-retx one-shot gate per loss event. */ + SND_TLP = 0x04, /* Tail loss probe; ACK resets rto_mul. */ }; struct snd_slot { @@ -349,6 +353,7 @@ struct frcti { uint32_t dsack_lwe_snap; /* lwe @ last DSACK */ uint32_t dup_thresh; /* RFC 8985 */ + uint32_t tlp_high_seq; /* §7.3: 0 = none */ uint64_t t_nack; bool open; /* FC window state */ bool in_recovery; @@ -364,10 +369,12 @@ struct frcti { /* Read/written via __atomic without holding lock. */ uint64_t t_ka_rcv; /* ts_to_ns of last KA rx */ uint8_t ack_pending; /* delayed-ACK dedup */ + uint8_t tlp_pending; /* TLP arm dedup (lazy) */ /* Timer entries; ownership belongs to the tw module. */ struct tw_entry ack_tw; /* delayed-ACK timer */ struct tw_entry ka_tw; /* keepalive timer */ + struct tw_entry tlp_tw; /* tail-loss probe timer */ #ifdef PROC_FLOW_STATS /* STAT: lock-free relaxed atomic counters. */ @@ -480,6 +487,7 @@ static int frct_rib_read(const char * path, "D-SACK out-of-range dropped: %20zu\n" "Pre-DRF NACKs sent: %20zu\n" "Pre-DRF NACKs received: %20zu\n" + "Tail loss probes sent: %20zu\n" "RTT probes sent: %20zu\n" "RTT probe replies received: %20zu\n" "RTT estimator samples: %20zu\n" @@ -524,7 +532,7 @@ static int frct_rib_read(const char * path, s.stat.ooo_rcv, s.stat.sack_snd, s.stat.sack_rcv, s.stat.dsack_snd, s.stat.dsack_rcv, s.stat.dsack_drop, - s.stat.nack_snd, s.stat.nack_rcv, + s.stat.nack_snd, s.stat.nack_rcv, s.stat.tlp_snd, s.stat.rttp_snd, s.stat.rttp_rcv, s.stat.rtt_smpl, s.stat.rdv_snd, s.stat.rdv_rcv, s.stat.ka_snd, s.stat.ka_rcv, @@ -1109,8 +1117,10 @@ static void rxm_snd(struct frcti * frcti, slot = &frcti->snd_slots[pos]; slot->time = TS_TO_UINT64(now); - /* RTO clears fast-rtx gate: a fresh loss event for SACK/RACK. */ - slot->flags = (slot->flags & ~SND_FAST_RXM) | SND_RTX; + /* RTO supersedes any pending TLP/fast-rxm on this slot. */ + slot->flags = (slot->flags & ~(SND_FAST_RXM | SND_TLP)) | SND_RTX; + /* §7.3: RTO supersedes any outstanding TLP. */ + frcti->tlp_high_seq = 0; frcti->rtt_lwe = seqno + 1; @@ -1463,7 +1473,7 @@ static int ack_arm(struct frcti * frcti) return 0; clock_gettime(PTHREAD_COND_CLOCK, &now); - deadline = TS_TO_UINT64(now) + 2ULL * (uint64_t) TICTIME; + deadline = TS_TO_UINT64(now) + ACK_DELAY_NS; tw_post(&frcti->ack_tw, deadline, ack_due, frcti); @@ -1764,6 +1774,7 @@ struct frcti * frcti_create(int fd, tw_init_entry(&frcti->ack_tw); tw_init_entry(&frcti->ka_tw); + tw_init_entry(&frcti->tlp_tw); if (!frcti->lossy) { frcti->snd_cr.cflags |= FRCTFRTX | FRCTFLINGER; @@ -1814,6 +1825,7 @@ void frcti_destroy(struct frcti * frcti) rxm_cancel_all(frcti); tw_cancel(&frcti->ack_tw); tw_cancel(&frcti->ka_tw); + tw_cancel(&frcti->tlp_tw); #if defined(PROC_FLOW_STATS) && defined(FRCT_DEBUG_STDOUT) printf("[FRCT teardown] pid=%d fd=%d " @@ -2713,8 +2725,10 @@ static void frcti_nack_rcv(struct frcti * frcti) if (pkt_copy != NULL) { memcpy(pkt_copy, rxm->pkt, rxm->len); pkt_len = rxm->len; - /* Karn: suppress RTT sample for next ACK. */ - frcti->snd_slots[hp].flags |= SND_RTX | SND_FAST_RXM; + /* Karn: suppress RTT sample. NACK supersedes pending TLP. */ + frcti->snd_slots[hp].flags = + (frcti->snd_slots[hp].flags & ~SND_TLP) + | SND_RTX | SND_FAST_RXM; frcti->rtt_lwe = frcti->snd_cr.lwe + 1; } @@ -2742,6 +2756,122 @@ static void frcti_rdv_rcv(struct frcti * frcti) frcti_pkt_snd(frcti, FRCT_FC, 0, rwe); } +/* §7.2: PTO = 2*SRTT + max delayed-ACK delay; fallback when unseeded. */ +static __inline__ uint64_t tlp_pto(const struct frcti * frcti) +{ + if (frcti->srtt > 0) + return 2ULL * (uint64_t) frcti->srtt + ACK_DELAY_NS; + + return NACK_COOLDOWN_NS; +} + +/* + * RFC 8985 §7: lazy probe. Re-evaluate on fire — if sender was active + * within PTO, re-post; else probe HoL once and hand off to RTO. + */ +__attribute__((cold)) +static void tlp_due(void * arg) +{ + struct frcti * frcti = arg; + struct timespec now; + uint64_t now_ns; + uint64_t pto; + uint64_t rto_at; + size_t hp; + struct rxm_entry * rxm; + void * pkt_copy = NULL; + size_t pkt_len = 0; + bool re_post = false; + uint64_t deadline = 0; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + now_ns = TS_TO_UINT64(now); + + pthread_rwlock_wrlock(&frcti->lock); + + if (frcti->snd_cr.seqno == frcti->snd_cr.lwe) + goto unlock; + if (!before(frcti->snd_cr.seqno, frcti->snd_cr.rwe)) + goto unlock; /* FC-blocked: RDV handles it. */ + /* RFC 8985 §7.3: at most one outstanding TLP per episode. */ + if (frcti->tlp_high_seq != 0) + goto unlock; + + pto = tlp_pto(frcti); + + /* §7.2: anchor PTO on most recent send; defer if still active. */ + if (now_ns < frcti->snd_cr.act + pto) { + deadline = frcti->snd_cr.act + pto; + re_post = true; + goto unlock; + } + + hp = RQ_SLOT(frcti->snd_cr.lwe); + rxm = LOAD_ACQUIRE(&frcti->snd_slots[hp].rxm); + if (rxm == NULL || RXM_AGED_OUT(rxm->t0, now_ns, frcti->t_r)) + goto unlock; + + /* Cap: if HoL RTO is due, let rxm_due fire instead. */ + rto_at = rxm->t0 + + ((uint64_t) frcti->rto + << LOAD_RELAXED(&frcti->rto_mul)); + if (rto_at <= now_ns) + goto unlock; + + pkt_copy = malloc(rxm->len); + if (pkt_copy != NULL) { + memcpy(pkt_copy, rxm->pkt, rxm->len); + pkt_len = rxm->len; + frcti->snd_slots[hp].time = now_ns; + frcti->snd_slots[hp].flags |= SND_TLP | SND_FAST_RXM; + frcti->rtt_lwe = frcti->snd_cr.lwe + 1; + /* §7.3 outstanding-probe marker; ack_rcv/rxm_snd clear. */ + frcti->tlp_high_seq = frcti->snd_cr.seqno; + STAT_BUMP(frcti, tlp_snd); + } + + unlock: + pthread_rwlock_unlock(&frcti->lock); + + if (pkt_copy != NULL) { + fast_rxm_send(frcti, pkt_copy, pkt_len); + free(pkt_copy); + } + + if (re_post) + tw_post(&frcti->tlp_tw, deadline, tlp_due, frcti); + else + __atomic_clear(&frcti->tlp_pending, __ATOMIC_RELAXED); +} + +/* §7.2 lazy: post once per quiet period. tlp_due re-evaluates on fire. */ +static int tlp_arm(struct frcti * frcti) +{ + struct timespec now; + uint64_t now_ns; + uint64_t pto; + uint64_t deadline; + + /* §7.3: at most one outstanding TLP per recovery episode. */ + if (LOAD_RELAXED(&frcti->tlp_high_seq) != 0) + return 0; + if (__atomic_test_and_set(&frcti->tlp_pending, __ATOMIC_RELAXED)) + return 0; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + now_ns = TS_TO_UINT64(now); + + pto = tlp_pto(frcti); + + deadline = LOAD_RELAXED(&frcti->snd_cr.act) + pto; + if (deadline <= now_ns) + deadline = now_ns + pto; + + tw_post(&frcti->tlp_tw, deadline, tlp_due, frcti); + + return 0; +} + /* * FC window advert from any flag-bearing packet. Caps at lwe + RQ_SIZE, * rejects backward shrink (forged/stale FC), marks window open. @@ -2810,7 +2940,7 @@ static bool rtt_sample_eligible(struct frcti * frcti, return false; if (flags & FRCT_RXM) return false; - if (frcti->snd_slots[p].flags & SND_RTX) + if (frcti->snd_slots[p].flags & (SND_RTX | SND_TLP)) return false; if (LOAD_ACQUIRE(&frcti->snd_slots[p].rxm) == NULL) return false; @@ -2908,6 +3038,11 @@ static void frcti_ack_rcv(struct frcti * frcti, STORE_RELEASE(&frcti->snd_cr.lwe, ackno); + /* §7.3: cum-ACK past the probed seqno resolves the TLP. */ + if (frcti->tlp_high_seq != 0 + && !before(ackno, frcti->tlp_high_seq)) + frcti->tlp_high_seq = 0; + /* RFC 8985 §7.2: halve mult per REO_DECAY_PKTS fresh-ACK'd seqnos. */ fresh = ackno - frcti->dsack_lwe_snap; if (frcti->reo_wnd_mult > 1 && fresh >= REO_DECAY_PKTS) { @@ -2922,8 +3057,9 @@ static void frcti_ack_rcv(struct frcti * frcti, /* RFC 8985: SACK-above-lwe count is per-recovery-episode. */ frcti->dup_thresh = 0; - /* Karn: only collapse RTO backoff on a fresh ACK. */ - if ((frcti->snd_slots[p].flags & SND_RTX) == 0) + /* Karn-skip on retx; TLP ACK clears rto_mul (no CC backoff). */ + if ((frcti->snd_slots[p].flags & SND_RTX) == 0 + || (frcti->snd_slots[p].flags & SND_TLP) != 0) STORE_RELEASE(&frcti->rto_mul, 0); if (recovery_exit_reached(frcti, ackno)) @@ -3189,9 +3325,9 @@ enum frct_act { /* On rcv inactivity: rebase on DRF, or arm pre-DRF NACK. Caller wrlock. */ static enum frct_act rcv_inact_check(struct frcti * frcti, - uint16_t flags, - uint32_t seqno, - uint64_t now_ns) + uint16_t flags, + uint32_t seqno, + uint64_t now_ns) { struct frct_cr * rcv_cr = &frcti->rcv_cr; uint64_t cd; @@ -3473,8 +3609,10 @@ static int frcti_snd(struct frcti * frcti, if (probe) frcti_rttp_snd(frcti, probe_id, 0, probe_nonce); - if (rtx) + if (rtx) { rxm_arm(frcti, seqno, spb); + tlp_arm(frcti); + } return 0; } @@ -3757,6 +3895,9 @@ static void frcti_rcv(struct frcti * frcti, ack_arm(frcti); } + if ((flags & FRCT_ACK) && frcti->snd_cr.seqno != frcti->snd_cr.lwe) + tlp_arm(frcti); + pending_flush(frcti, &pending); frcti_rcv_probe(frcti, now_ns); -- cgit v1.2.3