diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/lib/dev.c | 6 | ||||
| -rw-r--r-- | src/lib/frct.c | 84 |
2 files changed, 72 insertions, 18 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index 13c7544b..ae0401b7 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -1342,10 +1342,10 @@ int fccntl(int fd, new_acc = flow->oflags & FLOWFACCMODE; /* Defer EOS emit until after proc.lock is dropped: */ - /* frcti_stream_fin_snd may block on shm-pool/tx-rb. */ + /* frcti_fin_snd may block on shm-pool/tx-rb. */ if (new_acc == FLOWFRDONLY && old_acc != FLOWFRDONLY - && FRCTI_IS_STREAM(flow->frcti)) + && flow->frcti != NULL) emit_eos = true; rx_acl = ssm_rbuff_get_acl(flow->rx_rb); @@ -1457,7 +1457,7 @@ int fccntl(int fd, pthread_rwlock_unlock(&proc.lock); if (emit_eos) - frcti_stream_fin_snd(flow->frcti); + frcti_fin_snd(flow->frcti); va_end(l); diff --git a/src/lib/frct.c b/src/lib/frct.c index 27c333c6..2e8955e3 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -106,8 +106,8 @@ struct frct_rttp { /* * Flag values are assigned MSB-first on the wire (RFC convention): * bit 0 = 0x8000 occupies wire-position 0 of the 16-bit flags - * field, bit 11 = 0x0010 is the last assigned bit, and the four - * LSBs (0x000F) are reserved. + * field, bit 12 = 0x0008 is the last assigned bit, and the three + * LSBs (0x0007) are reserved. */ enum frct_flags { FRCT_DATA = 0x8000, /* PDU carries data */ @@ -122,7 +122,7 @@ enum frct_flags { FRCT_SACK = 0x0040, /* SACK block list follows */ FRCT_RTTP = 0x0020, /* RTT probe / echo */ FRCT_KA = 0x0010, /* Keepalive */ - FRCT_FIN = 0x0008, /* End of stream (stream) */ + FRCT_FIN = 0x0008, /* End of stream */ }; /* @@ -3780,24 +3780,39 @@ static int frcti_snd(struct frcti * frcti, return 0; } -/* 0-byte FRCT_FIN DATA so peer's flow_read returns 0 at this byte. */ -static void frcti_stream_fin_snd(struct frcti * frcti) +/* + * Stream: 0-byte FRCT_FIN DATA so peer's flow_read returns 0 at this + * byte. Msg: control packet with FRCT_FIN flag, snd_cr.seqno carried + * in pci->ackno (sender packs via frcti_pkt_snd's ackno parameter). + */ +static void frcti_fin_snd(struct frcti * frcti) { struct ssm_pk_buff * spb; bool already; + uint32_t fin_seqno; - assert(frcti->stream); + if (!(frcti->snd_cr.cflags & FRCTFLINGER)) + return; pthread_rwlock_wrlock(&frcti->lock); already = frcti->snd_fin_sent; frcti->snd_fin_sent = true; + fin_seqno = frcti->snd_cr.seqno; + + if (!already && !frcti->stream) + frcti->snd_fin_seqno = fin_seqno; pthread_rwlock_unlock(&frcti->lock); if (already) return; + if (!frcti->stream) { + frcti_pkt_snd(frcti, FRCT_FIN, fin_seqno, 0); + return; + } + if (frct_spb_reserve(frcti_data_hdr_len(frcti), &spb) < 0) return; @@ -3832,31 +3847,60 @@ static bool final_ack_due(struct frcti * frcti, return true; } -/* Drain-loop predicate: FLINGER cflag + unACK'd data below the FIN/seqno. */ +/* Snd-side has FLINGER cflag and unACK'd data below the FIN/seqno. */ +static __inline__ bool snd_drain_pending(struct frct_cr * snd_cr, + uint32_t edge) +{ + if (!(snd_cr->cflags & FRCTFLINGER)) + return false; + + return before(snd_cr->lwe, edge); +} + +/* Peer is still active and we haven't seen their FIN yet. */ +static __inline__ bool rcv_drain_pending(struct frcti * frcti, + struct frct_cr * rcv_cr, + uint64_t now_ns) +{ + if (frcti->rcv_fin_seen) + return false; + + return !ts_aged_ns(now_ns, rcv_cr->act, rcv_cr->inact); +} + +/* Drain-loop predicate: snd-side unACK'd data OR peer still active. */ static bool frcti_lingering(struct frcti * frcti) { + struct timespec now; struct frct_cr * snd_cr; + struct frct_cr * rcv_cr; uint32_t edge; - bool linger; + uint64_t now_ns; + bool snd_linger; + bool rcv_linger; - /* Idempotent; FIN must be sent before any linger check uses it. */ - if (frcti->stream) - frcti_stream_fin_snd(frcti); + /* Idempotent; emits FIN once per side, both stream and msg. */ + frcti_fin_snd(frcti); + + clock_gettime(PTHREAD_COND_CLOCK, &now); + now_ns = TS_TO_UINT64(now); pthread_rwlock_rdlock(&frcti->lock); snd_cr = &frcti->snd_cr; + rcv_cr = &frcti->rcv_cr; if (frcti->snd_fin_sent) edge = frcti->snd_fin_seqno; else edge = snd_cr->seqno; - linger = (snd_cr->cflags & FRCTFLINGER) && before(snd_cr->lwe, edge); + snd_linger = snd_drain_pending(snd_cr, edge); + rcv_linger = rcv_drain_pending(frcti, rcv_cr, now_ns); pthread_rwlock_unlock(&frcti->lock); - return linger; + return snd_linger || rcv_linger; } static time_t frcti_dealloc(struct frcti * frcti) @@ -3874,8 +3918,7 @@ static time_t frcti_dealloc(struct frcti * frcti) rcv_cr = &frcti->rcv_cr; /* Idempotent; usually already sent by frcti_lingering. */ - if (frcti->stream) - frcti_stream_fin_snd(frcti); + frcti_fin_snd(frcti); clock_gettime(PTHREAD_COND_CLOCK, &now); now_ns = TS_TO_UINT64(now); @@ -3960,6 +4003,17 @@ static void frcti_rcv(struct frcti * frcti, goto ctrl_done; } + /* Msg-mode FIN: control packet, FIN seqno carried in pci->ackno. */ + if ((flags & FRCT_FIN) && !(flags & FRCT_DATA)) { + pthread_rwlock_wrlock(&frcti->lock); + if (!frcti->rcv_fin_seen) { + frcti->rcv_fin_seen = true; + frcti->rcv_byte_fin = ntoh32(pci->ackno); + } + pthread_rwlock_unlock(&frcti->lock); + goto ctrl_done; + } + pthread_rwlock_wrlock(&frcti->lock); /* rcv_inact_check is a no-op for non-DATA non-DRF packets. */ |
