summaryrefslogtreecommitdiff
path: root/src/lib/frct.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/frct.c')
-rw-r--r--src/lib/frct.c91
1 files changed, 50 insertions, 41 deletions
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 2e8955e3..cf1b60e2 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -1124,33 +1124,6 @@ struct rxm_entry {
uint8_t pkt[]; /* flexible — sized at alloc time */
};
-static struct rxm_entry * rxm_entry_create(struct frcti * frcti,
- uint32_t seqno,
- const struct ssm_pk_buff * spb)
-{
- struct rxm_entry * r;
- struct timespec now;
- size_t len = ssm_pk_buff_len(spb);
-
- r = malloc(sizeof(*r) + len);
- if (r == NULL) {
- STAT_BUMP(frcti, rxm_arm_fail);
- return NULL;
- }
-
- memcpy(r->pkt, ssm_pk_buff_head(spb), len);
- r->len = len;
- r->frcti = frcti;
- r->seqno = seqno;
-
- clock_gettime(PTHREAD_COND_CLOCK, &now);
- r->t0 = TS_TO_UINT64(now);
-
- tw_init_entry(&r->tw);
-
- return r;
-}
-
static void rxm_entry_destroy(struct rxm_entry * r)
{
free(r);
@@ -1324,18 +1297,41 @@ static void rxm_due(void * arg)
rxm_entry_destroy(r);
}
-static int rxm_arm(struct frcti * frcti,
- uint32_t seqno,
- const struct ssm_pk_buff * spb)
+/* Pre-allocate rxm entry so frcti_snd can fail before committing seqno. */
+static struct rxm_entry * rxm_alloc(struct frcti * frcti,
+ size_t pkt_len)
{
struct rxm_entry * r;
- time_t rto;
- uint8_t rto_mul;
- uint64_t deadline;
- r = rxm_entry_create(frcti, seqno, spb);
- if (r == NULL)
- return -ENOMEM;
+ r = malloc(sizeof(*r) + pkt_len);
+ if (r == NULL) {
+ STAT_BUMP(frcti, rxm_arm_fail);
+ return NULL;
+ }
+
+ r->frcti = frcti;
+ tw_init_entry(&r->tw);
+
+ return r;
+}
+
+static void rxm_arm(struct frcti * frcti,
+ uint32_t seqno,
+ struct rxm_entry * r,
+ const struct ssm_pk_buff * spb)
+{
+ struct timespec now;
+ time_t rto;
+ uint8_t rto_mul;
+ uint64_t deadline;
+ size_t len = ssm_pk_buff_len(spb);
+
+ memcpy(r->pkt, ssm_pk_buff_head(spb), len);
+ r->len = len;
+ r->seqno = seqno;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+ r->t0 = TS_TO_UINT64(now);
rto = LOAD_RELAXED(&frcti->rto);
rto_mul = LOAD_RELAXED(&frcti->rto_mul);
@@ -1343,14 +1339,14 @@ static int rxm_arm(struct frcti * frcti,
pthread_rwlock_wrlock(&frcti->lock);
+ assert(before(seqno, frcti->snd_cr.lwe + RQ_SIZE));
+
list_add_tail(&r->next, &frcti->rxm_list);
STORE_RELEASE(&frcti->snd_slots[RQ_SLOT(seqno)].rxm, r);
pthread_rwlock_unlock(&frcti->lock);
tw_post(&r->tw, deadline, rxm_due, r);
-
- return 0;
}
static void rxm_cancel_all(struct frcti * frcti)
@@ -2135,6 +2131,7 @@ static void sack_rxm_snd(struct frcti * frcti,
{
struct ssm_pk_buff * spb;
const struct frct_pci * pci;
+ struct rxm_entry * rxm;
uint32_t rcv_lwe;
uint32_t seqno;
int ret;
@@ -2148,11 +2145,12 @@ static void sack_rxm_snd(struct frcti * frcti,
pci = (const struct frct_pci *) ssm_pk_buff_head(spb);
seqno = ntoh32(pci->seqno);
- /* Register fresh rxm before send; old entry self-cleans. */
- if (rxm_arm(frcti, seqno, spb) < 0) {
+ rxm = rxm_alloc(frcti, ssm_pk_buff_len(spb));
+ if (rxm == NULL) {
frct_spb_release(spb);
return;
}
+ rxm_arm(frcti, seqno, rxm, spb);
STAT_BUMP(frcti, rxm_sack);
ret = frct_tx(frcti, spb);
@@ -3673,6 +3671,7 @@ static int frcti_snd(struct frcti * frcti,
struct timespec now;
struct frct_cr * snd_cr;
struct frct_cr * rcv_cr;
+ struct rxm_entry * rxm = NULL;
uint32_t seqno;
uint16_t pci_flags = 0;
bool rtx;
@@ -3699,6 +3698,15 @@ static int frcti_snd(struct frcti * frcti,
if (pci == NULL)
return -ENOMEM;
+ /* Pre-allocate rxm so alloc fail can't orphan a seqno. */
+ if (snd_cr->cflags & FRCTFRTX) {
+ rxm = rxm_alloc(frcti, ssm_pk_buff_len(spb));
+ if (rxm == NULL) {
+ ssm_pk_buff_pop(spb, frcti_data_hdr_len(frcti));
+ return -ENOMEM;
+ }
+ }
+
memset(pci, 0, FRCT_PCILEN);
if (frcti->stream)
@@ -3773,7 +3781,8 @@ static int frcti_snd(struct frcti * frcti,
frcti_rttp_snd(frcti, probe_id, 0, probe_nonce);
if (rtx) {
- rxm_arm(frcti, seqno, spb);
+ assert(rxm != NULL);
+ rxm_arm(frcti, seqno, rxm, spb);
tlp_arm(frcti);
}