diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/lib/frct.c | 183 |
1 files changed, 162 insertions, 21 deletions
diff --git a/src/lib/frct.c b/src/lib/frct.c index 936ca587..38cea93c 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -25,14 +25,16 @@ #define DELT_RDV (100 * MILLION) /* ns */ #define MAX_RDV (1 * BILLION) /* ns */ -#define MAX_RTO_MUL 20 /* caps the RTO backoff shift */ -#define INITIAL_RTO (1 * BILLION) /* RFC 6298 §2.1: 1 s default */ -#define RTT_BOOT_NS (10 * MILLION) /* rtt_hint floor + initial mdev */ -#define SRTT_FLOOR_NS 1000L /* 1 us; smoothed RTT floor */ -#define MDEV_FLOOR_NS 100L /* 100 ns; mdev sanity floor */ -#define RTT_CLAMP_MUL 16 /* probe sample cap = N * srtt */ -#define MIN_RTT_WIN_NS (300ULL * BILLION) /* 5 min, Linux tcp default */ -#define NACK_COOLDOWN_NS (100 * MILLION) /* pre-DRF NACK cooldown fallback */ +#define MAX_RTO_MUL 8 /* caps the RTO backoff shift */ +#define INITIAL_RTO (1 * BILLION) /* RFC 6298 §2.1: 1 s default */ +#define RTT_BOOT_NS (10 * MILLION) /* rtt_hint floor + initial mdev */ +#define SRTT_FLOOR_NS 1000L /* 1 us; smoothed RTT floor */ +#define MDEV_FLOOR_NS 100L /* 100 ns; mdev sanity floor */ +#define RTT_CLAMP_MUL 16 /* probe sample cap = N * srtt */ +#define MIN_RTT_WIN_NS (300ULL * BILLION) /* 5 min, Linux tcp default */ +#define NACK_COOLDOWN_NS (100 * MILLION) /* pre-DRF NACK cooldown */ +#define FRCT_TX_TIMEO_NS (250 * 1000) /* tx ring write deadline */ +#define ACK_DELAY_NS (2ULL * TICTIME) /* delayed-ACK fire delay */ #define FRCT "frct" #define FRCT_PCILEN (sizeof(struct frct_pci)) @@ -209,6 +211,7 @@ struct frcti_stat { size_t dsack_drop; /* DSACK blocks past MAX_DSACK_LAG */ size_t nack_snd; /* pre-DRF NACKs sent */ size_t nack_rcv; /* pre-DRF NACKs received */ + size_t tlp_snd; /* tail loss probes sent */ size_t rttp_snd; /* RTT probes sent */ size_t rttp_rcv; /* RTT probe replies rcvd */ size_t rtt_smpl; /* RTT estimator samples */ @@ -252,6 +255,7 @@ struct rxm_entry; enum snd_slot_flags { SND_RTX = 0x01, /* Any retransmit; Karn skips next RTT sample. */ SND_FAST_RXM = 0x02, /* Fast-retx one-shot gate per loss event. */ + SND_TLP = 0x04, /* Tail loss probe; ACK resets rto_mul. */ }; struct snd_slot { @@ -349,6 +353,7 @@ struct frcti { uint32_t dsack_lwe_snap; /* lwe @ last DSACK */ uint32_t dup_thresh; /* RFC 8985 */ + uint32_t tlp_high_seq; /* §7.3: 0 = none */ uint64_t t_nack; bool open; /* FC window state */ bool in_recovery; @@ -364,10 +369,12 @@ struct frcti { /* Read/written via __atomic without holding lock. */ uint64_t t_ka_rcv; /* ts_to_ns of last KA rx */ uint8_t ack_pending; /* delayed-ACK dedup */ + uint8_t tlp_pending; /* TLP arm dedup (lazy) */ /* Timer entries; ownership belongs to the tw module. */ struct tw_entry ack_tw; /* delayed-ACK timer */ struct tw_entry ka_tw; /* keepalive timer */ + struct tw_entry tlp_tw; /* tail-loss probe timer */ #ifdef PROC_FLOW_STATS /* STAT: lock-free relaxed atomic counters. */ @@ -480,6 +487,7 @@ static int frct_rib_read(const char * path, "D-SACK out-of-range dropped: %20zu\n" "Pre-DRF NACKs sent: %20zu\n" "Pre-DRF NACKs received: %20zu\n" + "Tail loss probes sent: %20zu\n" "RTT probes sent: %20zu\n" "RTT probe replies received: %20zu\n" "RTT estimator samples: %20zu\n" @@ -524,7 +532,7 @@ static int frct_rib_read(const char * path, s.stat.ooo_rcv, s.stat.sack_snd, s.stat.sack_rcv, s.stat.dsack_snd, s.stat.dsack_rcv, s.stat.dsack_drop, - s.stat.nack_snd, s.stat.nack_rcv, + s.stat.nack_snd, s.stat.nack_rcv, s.stat.tlp_snd, s.stat.rttp_snd, s.stat.rttp_rcv, s.stat.rtt_smpl, s.stat.rdv_snd, s.stat.rdv_rcv, s.stat.ka_snd, s.stat.ka_rcv, @@ -1109,8 +1117,10 @@ static void rxm_snd(struct frcti * frcti, slot = &frcti->snd_slots[pos]; slot->time = TS_TO_UINT64(now); - /* RTO clears fast-rtx gate: a fresh loss event for SACK/RACK. */ - slot->flags = (slot->flags & ~SND_FAST_RXM) | SND_RTX; + /* RTO supersedes any pending TLP/fast-rxm on this slot. */ + slot->flags = (slot->flags & ~(SND_FAST_RXM | SND_TLP)) | SND_RTX; + /* §7.3: RTO supersedes any outstanding TLP. */ + frcti->tlp_high_seq = 0; frcti->rtt_lwe = seqno + 1; @@ -1463,7 +1473,7 @@ static int ack_arm(struct frcti * frcti) return 0; clock_gettime(PTHREAD_COND_CLOCK, &now); - deadline = TS_TO_UINT64(now) + 2ULL * (uint64_t) TICTIME; + deadline = TS_TO_UINT64(now) + ACK_DELAY_NS; tw_post(&frcti->ack_tw, deadline, ack_due, frcti); @@ -1764,6 +1774,7 @@ struct frcti * frcti_create(int fd, tw_init_entry(&frcti->ack_tw); tw_init_entry(&frcti->ka_tw); + tw_init_entry(&frcti->tlp_tw); if (!frcti->lossy) { frcti->snd_cr.cflags |= FRCTFRTX | FRCTFLINGER; @@ -1814,6 +1825,7 @@ void frcti_destroy(struct frcti * frcti) rxm_cancel_all(frcti); tw_cancel(&frcti->ack_tw); tw_cancel(&frcti->ka_tw); + tw_cancel(&frcti->tlp_tw); #if defined(PROC_FLOW_STATS) && defined(FRCT_DEBUG_STDOUT) printf("[FRCT teardown] pid=%d fd=%d " @@ -2713,8 +2725,10 @@ static void frcti_nack_rcv(struct frcti * frcti) if (pkt_copy != NULL) { memcpy(pkt_copy, rxm->pkt, rxm->len); pkt_len = rxm->len; - /* Karn: suppress RTT sample for next ACK. */ - frcti->snd_slots[hp].flags |= SND_RTX | SND_FAST_RXM; + /* Karn: suppress RTT sample. NACK supersedes pending TLP. */ + frcti->snd_slots[hp].flags = + (frcti->snd_slots[hp].flags & ~SND_TLP) + | SND_RTX | SND_FAST_RXM; frcti->rtt_lwe = frcti->snd_cr.lwe + 1; } @@ -2742,6 +2756,122 @@ static void frcti_rdv_rcv(struct frcti * frcti) frcti_pkt_snd(frcti, FRCT_FC, 0, rwe); } +/* §7.2: PTO = 2*SRTT + max delayed-ACK delay; fallback when unseeded. */ +static __inline__ uint64_t tlp_pto(const struct frcti * frcti) +{ + if (frcti->srtt > 0) + return 2ULL * (uint64_t) frcti->srtt + ACK_DELAY_NS; + + return NACK_COOLDOWN_NS; +} + +/* + * RFC 8985 §7: lazy probe. Re-evaluate on fire — if sender was active + * within PTO, re-post; else probe HoL once and hand off to RTO. + */ +__attribute__((cold)) +static void tlp_due(void * arg) +{ + struct frcti * frcti = arg; + struct timespec now; + uint64_t now_ns; + uint64_t pto; + uint64_t rto_at; + size_t hp; + struct rxm_entry * rxm; + void * pkt_copy = NULL; + size_t pkt_len = 0; + bool re_post = false; + uint64_t deadline = 0; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + now_ns = TS_TO_UINT64(now); + + pthread_rwlock_wrlock(&frcti->lock); + + if (frcti->snd_cr.seqno == frcti->snd_cr.lwe) + goto unlock; + if (!before(frcti->snd_cr.seqno, frcti->snd_cr.rwe)) + goto unlock; /* FC-blocked: RDV handles it. */ + /* RFC 8985 §7.3: at most one outstanding TLP per episode. */ + if (frcti->tlp_high_seq != 0) + goto unlock; + + pto = tlp_pto(frcti); + + /* §7.2: anchor PTO on most recent send; defer if still active. */ + if (now_ns < frcti->snd_cr.act + pto) { + deadline = frcti->snd_cr.act + pto; + re_post = true; + goto unlock; + } + + hp = RQ_SLOT(frcti->snd_cr.lwe); + rxm = LOAD_ACQUIRE(&frcti->snd_slots[hp].rxm); + if (rxm == NULL || RXM_AGED_OUT(rxm->t0, now_ns, frcti->t_r)) + goto unlock; + + /* Cap: if HoL RTO is due, let rxm_due fire instead. */ + rto_at = rxm->t0 + + ((uint64_t) frcti->rto + << LOAD_RELAXED(&frcti->rto_mul)); + if (rto_at <= now_ns) + goto unlock; + + pkt_copy = malloc(rxm->len); + if (pkt_copy != NULL) { + memcpy(pkt_copy, rxm->pkt, rxm->len); + pkt_len = rxm->len; + frcti->snd_slots[hp].time = now_ns; + frcti->snd_slots[hp].flags |= SND_TLP | SND_FAST_RXM; + frcti->rtt_lwe = frcti->snd_cr.lwe + 1; + /* §7.3 outstanding-probe marker; ack_rcv/rxm_snd clear. */ + frcti->tlp_high_seq = frcti->snd_cr.seqno; + STAT_BUMP(frcti, tlp_snd); + } + + unlock: + pthread_rwlock_unlock(&frcti->lock); + + if (pkt_copy != NULL) { + fast_rxm_send(frcti, pkt_copy, pkt_len); + free(pkt_copy); + } + + if (re_post) + tw_post(&frcti->tlp_tw, deadline, tlp_due, frcti); + else + __atomic_clear(&frcti->tlp_pending, __ATOMIC_RELAXED); +} + +/* §7.2 lazy: post once per quiet period. tlp_due re-evaluates on fire. */ +static int tlp_arm(struct frcti * frcti) +{ + struct timespec now; + uint64_t now_ns; + uint64_t pto; + uint64_t deadline; + + /* §7.3: at most one outstanding TLP per recovery episode. */ + if (LOAD_RELAXED(&frcti->tlp_high_seq) != 0) + return 0; + if (__atomic_test_and_set(&frcti->tlp_pending, __ATOMIC_RELAXED)) + return 0; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + now_ns = TS_TO_UINT64(now); + + pto = tlp_pto(frcti); + + deadline = LOAD_RELAXED(&frcti->snd_cr.act) + pto; + if (deadline <= now_ns) + deadline = now_ns + pto; + + tw_post(&frcti->tlp_tw, deadline, tlp_due, frcti); + + return 0; +} + /* * FC window advert from any flag-bearing packet. Caps at lwe + RQ_SIZE, * rejects backward shrink (forged/stale FC), marks window open. @@ -2810,7 +2940,7 @@ static bool rtt_sample_eligible(struct frcti * frcti, return false; if (flags & FRCT_RXM) return false; - if (frcti->snd_slots[p].flags & SND_RTX) + if (frcti->snd_slots[p].flags & (SND_RTX | SND_TLP)) return false; if (LOAD_ACQUIRE(&frcti->snd_slots[p].rxm) == NULL) return false; @@ -2908,6 +3038,11 @@ static void frcti_ack_rcv(struct frcti * frcti, STORE_RELEASE(&frcti->snd_cr.lwe, ackno); + /* §7.3: cum-ACK past the probed seqno resolves the TLP. */ + if (frcti->tlp_high_seq != 0 + && !before(ackno, frcti->tlp_high_seq)) + frcti->tlp_high_seq = 0; + /* RFC 8985 §7.2: halve mult per REO_DECAY_PKTS fresh-ACK'd seqnos. */ fresh = ackno - frcti->dsack_lwe_snap; if (frcti->reo_wnd_mult > 1 && fresh >= REO_DECAY_PKTS) { @@ -2922,8 +3057,9 @@ static void frcti_ack_rcv(struct frcti * frcti, /* RFC 8985: SACK-above-lwe count is per-recovery-episode. */ frcti->dup_thresh = 0; - /* Karn: only collapse RTO backoff on a fresh ACK. */ - if ((frcti->snd_slots[p].flags & SND_RTX) == 0) + /* Karn-skip on retx; TLP ACK clears rto_mul (no CC backoff). */ + if ((frcti->snd_slots[p].flags & SND_RTX) == 0 + || (frcti->snd_slots[p].flags & SND_TLP) != 0) STORE_RELEASE(&frcti->rto_mul, 0); if (recovery_exit_reached(frcti, ackno)) @@ -3189,9 +3325,9 @@ enum frct_act { /* On rcv inactivity: rebase on DRF, or arm pre-DRF NACK. Caller wrlock. */ static enum frct_act rcv_inact_check(struct frcti * frcti, - uint16_t flags, - uint32_t seqno, - uint64_t now_ns) + uint16_t flags, + uint32_t seqno, + uint64_t now_ns) { struct frct_cr * rcv_cr = &frcti->rcv_cr; uint64_t cd; @@ -3473,8 +3609,10 @@ static int frcti_snd(struct frcti * frcti, if (probe) frcti_rttp_snd(frcti, probe_id, 0, probe_nonce); - if (rtx) + if (rtx) { rxm_arm(frcti, seqno, spb); + tlp_arm(frcti); + } return 0; } @@ -3757,6 +3895,9 @@ static void frcti_rcv(struct frcti * frcti, ack_arm(frcti); } + if ((flags & FRCT_ACK) && frcti->snd_cr.seqno != frcti->snd_cr.lwe) + tlp_arm(frcti); + pending_flush(frcti, &pending); frcti_rcv_probe(frcti, now_ns); |
