diff options
Diffstat (limited to 'src/lib/timerwheel.c')
| -rw-r--r-- | src/lib/timerwheel.c | 209 |
1 files changed, 97 insertions, 112 deletions
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c index a9f3c72a..2c796c96 100644 --- a/src/lib/timerwheel.c +++ b/src/lib/timerwheel.c @@ -1,10 +1,10 @@ /* - * Ouroboros - Copyright (C) 2016 - 2020 + * Ouroboros - Copyright (C) 2016 - 2026 * * Timerwheel * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License @@ -30,16 +30,12 @@ struct rxm { struct list_head next; uint32_t seqno; -#ifdef RXM_BUFFER_ON_HEAP - uint8_t * pkt; - size_t pkt_len; -#else - struct shm_du_buff * sdb; - uint8_t * head; - uint8_t * tail; +#ifndef RXM_BUFFER_ON_HEAP + struct ssm_pk_buff * spb; #endif + struct frct_pci * pkt; + size_t len; time_t t0; /* Time when original was sent (us). */ - size_t mul; /* RTO multiplier. */ struct frcti * frcti; int fd; int flow_id; /* Prevent rtx when fd reused. */ @@ -62,11 +58,9 @@ struct { struct list_head acks[ACKQ_SLOTS]; bool map[ACKQ_SLOTS][PROG_MAX_FLOWS]; - size_t prv_rxm; /* Last processed rxm slot at lvl 0. */ - size_t prv_ack; /* Last processed ack slot. */ + size_t prv_rxm[RXMQ_LVLS]; /* Last processed rxm slots. */ + size_t prv_ack; /* Last processed ack slot. */ pthread_mutex_t lock; - - bool in_use; } rw; static void timerwheel_fini(void) @@ -87,8 +81,8 @@ static void timerwheel_fini(void) #ifdef RXM_BUFFER_ON_HEAP free(rxm->pkt); #else - shm_du_buff_ack(rxm->sdb); - ipcp_sdb_release(rxm->sdb); + ssm_pk_buff_ack(rxm->spb); + ipcp_spb_release(rxm->spb); #endif free(rxm); } @@ -119,8 +113,10 @@ static int timerwheel_init(void) clock_gettime(PTHREAD_COND_CLOCK, &now); - rw.prv_rxm = (ts_to_rxm_slot(now) - 1) & (RXMQ_SLOTS - 1); for (i = 0; i < RXMQ_LVLS; ++i) { + rw.prv_rxm[i] = (ts_to_rxm_slot(now) - 1); + rw.prv_rxm[i] >>= (RXMQ_BUMP * i); + rw.prv_rxm[i] &= (RXMQ_SLOTS - 1); for (j = 0; j < RXMQ_SLOTS; ++j) list_head_init(&rw.rxms[i][j]); } @@ -142,39 +138,34 @@ static void timerwheel_move(void) size_t i; size_t j; - if (!__sync_bool_compare_and_swap(&rw.in_use, true, true)) - return; - pthread_mutex_lock(&rw.lock); - pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock, - (void *) &rw.lock); + pthread_cleanup_push(__cleanup_mutex_unlock, &rw.lock); clock_gettime(PTHREAD_COND_CLOCK, &now); - rxm_slot = ts_to_ns(now) >> RXMQ_RES; - j = rw.prv_rxm; - rw.prv_rxm = rxm_slot & (RXMQ_SLOTS - 1); + rxm_slot = ts_to_rxm_slot(now); for (i = 0; i < RXMQ_LVLS; ++i) { size_t j_max_slot = rxm_slot & (RXMQ_SLOTS - 1); + j = rw.prv_rxm[i]; if (j_max_slot < j) j_max_slot += RXMQ_SLOTS; - while (j++ < j_max_slot) { list_for_each_safe(p, h, &rw.rxms[i][j & (RXMQ_SLOTS - 1)]) { struct rxm * r; struct frct_cr * snd_cr; struct frct_cr * rcv_cr; + size_t slot; size_t rslot; ssize_t idx; - struct shm_du_buff * sdb; - uint8_t * head; + struct ssm_pk_buff * spb; + struct frct_pci * pci; struct flow * f; uint32_t snd_lwe; uint32_t rcv_lwe; - time_t rto; + size_t lvl = 0; r = list_entry(p, struct rxm, next); @@ -182,100 +173,103 @@ static void timerwheel_move(void) snd_cr = &r->frcti->snd_cr; rcv_cr = &r->frcti->rcv_cr; - f = &ai.flows[r->fd]; + f = &proc.flows[r->fd]; #ifndef RXM_BUFFER_ON_HEAP - shm_du_buff_ack(r->sdb); + ssm_pk_buff_ack(r->spb); #endif if (f->frcti == NULL - || f->flow_id != r->flow_id) + || f->info.id != r->flow_id) goto cleanup; - pthread_rwlock_wrlock(&r->frcti->lock); + pthread_rwlock_rdlock(&r->frcti->lock); snd_lwe = snd_cr->lwe; rcv_lwe = rcv_cr->lwe; - rto = r->frcti->rto; pthread_rwlock_unlock(&r->frcti->lock); /* Has been ack'd, remove. */ - if ((int) (r->seqno - snd_lwe) < 0) + if (before(r->seqno, snd_lwe)) goto cleanup; /* Check for r-timer expiry. */ if (ts_to_ns(now) - r->t0 > r->frcti->r) goto flow_down; - if (r->frcti->probe - && (r->frcti->rttseq + 1) == r->seqno) + pthread_rwlock_wrlock(&r->frcti->lock); + + if (r->seqno == r->frcti->rttseq) { + r->frcti->rto += + r->frcti->rto >> RTO_DIV; r->frcti->probe = false; + } +#ifdef PROC_FLOW_STATS + r->frcti->n_rtx++; +#endif + rslot = r->frcti->rto >> RXMQ_RES; + + pthread_rwlock_unlock(&r->frcti->lock); + + /* Schedule at least in the next time slot. */ + slot = ts_to_ns(now) >> RXMQ_RES; + + while (rslot >= RXMQ_SLOTS) { + ++lvl; + rslot >>= RXMQ_BUMP; + slot >>= RXMQ_BUMP; + } + + if (lvl >= RXMQ_LVLS) /* Can't reschedule */ + goto flow_down; + + rslot = (rslot + slot + 1) & (RXMQ_SLOTS - 1); #ifdef RXM_BLOCKING - #ifdef RXM_BUFFER_ON_HEAP - if (ipcp_sdb_reserve(&sdb, r->pkt_len)) - #else - if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) - #endif + if (ipcp_spb_reserve(&spb, r->len) < 0) #else - #ifdef RXM_BUFFER_ON_HEAP - if (shm_rdrbuff_alloc(ai.rdrb, r->pkt_len, NULL, - &sdb)) - #else - if (shm_rdrbuff_alloc(ai.rdrb, - r->tail - r->head, NULL, - &sdb)) - #endif + if (ssm_pool_alloc(proc.pool, r->len, NULL, + &spb) < 0) #endif - goto reschedule; /* rbuff full */ - idx = shm_du_buff_get_idx(sdb); + goto reschedule; /* rdrbuff full */ - head = shm_du_buff_head(sdb); -#ifdef RXM_BUFFER_ON_HEAP - memcpy(head, r->pkt, r->pkt_len); -#else - memcpy(head, r->head, r->tail - r->head); - ipcp_sdb_release(r->sdb); - r->sdb = sdb; - r->head = head; - r->tail = shm_du_buff_tail(sdb); - shm_du_buff_wait_ack(sdb); + pci = (struct frct_pci *) ssm_pk_buff_head(spb); + memcpy(pci, r->pkt, r->len); +#ifndef RXM_BUFFER_ON_HEAP + ipcp_spb_release(r->spb); + r->spb = spb; + r->pkt = pci; + ssm_pk_buff_wait_ack(spb); #endif + idx = ssm_pk_buff_get_idx(spb); + /* Retransmit the copy. */ - ((struct frct_pci *) head)->ackno = - hton32(rcv_lwe); + pci->ackno = hton32(rcv_lwe); #ifdef RXM_BLOCKING - if (shm_rbuff_write_b(f->tx_rb, idx, NULL) == 0) + if (ssm_rbuff_write_b(f->tx_rb, idx, NULL) < 0) #else - if (shm_rbuff_write(f->tx_rb, idx) == 0) + if (ssm_rbuff_write(f->tx_rb, idx) < 0) #endif - shm_flow_set_notify(f->set, f->flow_id, - FLOW_PKT); - reschedule: - r->mul++; - - /* Schedule at least in the next time slot. */ - rslot = (rxm_slot - + MAX(((rto * r->mul) >> RXMQ_RES), 1)) - & (RXMQ_SLOTS - 1); - - list_add_tail(&r->next, &rw.rxms[i][rslot]); - + goto flow_down; + ssm_flow_set_notify(f->set, f->info.id, + FLOW_PKT); + reschedule: + list_add(&r->next, &rw.rxms[lvl][rslot]); continue; - flow_down: - shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); - shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); - cleanup: + flow_down: + ssm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); + ssm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); + cleanup: #ifdef RXM_BUFFER_ON_HEAP free(r->pkt); #else - ipcp_sdb_release(r->sdb); + ipcp_spb_release(r->spb); #endif free(r); } } + rw.prv_rxm[i] = rxm_slot & (RXMQ_SLOTS - 1); /* Move up a level in the wheel. */ rxm_slot >>= RXMQ_BUMP; - j >>= RXMQ_BUMP; } ack_slot = ts_to_ack_slot(now) & (ACKQ_SLOTS - 1) ; @@ -294,15 +288,14 @@ static void timerwheel_move(void) list_del(&a->next); - f = &ai.flows[a->fd]; + f = &proc.flows[a->fd]; rw.map[j & (ACKQ_SLOTS - 1)][a->fd] = false; - if (f->flow_id == a->flow_id && f->frcti != NULL) + if (f->info.id == a->flow_id && f->frcti != NULL) send_frct_pkt(a->frcti); free(a); - } } @@ -313,7 +306,7 @@ static void timerwheel_move(void) static int timerwheel_rxm(struct frcti * frcti, uint32_t seqno, - struct shm_du_buff * sdb) + struct ssm_pk_buff * spb) { struct timespec now; struct rxm * r; @@ -328,21 +321,19 @@ static int timerwheel_rxm(struct frcti * frcti, clock_gettime(PTHREAD_COND_CLOCK, &now); r->t0 = ts_to_ns(now); - r->mul = 0; r->seqno = seqno; r->frcti = frcti; + r->len = ssm_pk_buff_len(spb); #ifdef RXM_BUFFER_ON_HEAP - r->pkt_len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); - r->pkt = malloc(r->pkt_len); + r->pkt = malloc(r->len); if (r->pkt == NULL) { free(r); return -ENOMEM; } - memcpy(r->pkt, shm_du_buff_head(sdb), r->pkt_len); + memcpy(r->pkt, ssm_pk_buff_head(spb), r->len); #else - r->sdb = sdb; - r->head = shm_du_buff_head(sdb); - r->tail = shm_du_buff_tail(sdb); + r->spb = spb; + r->pkt = (struct frct_pci *) ssm_pk_buff_head(spb); #endif pthread_rwlock_rdlock(&r->frcti->lock); @@ -350,7 +341,7 @@ static int timerwheel_rxm(struct frcti * frcti, slot = r->t0 >> RXMQ_RES; r->fd = frcti->fd; - r->flow_id = ai.flows[r->fd].flow_id; + r->flow_id = proc.flows[r->fd].info.id; pthread_rwlock_unlock(&r->frcti->lock); @@ -368,23 +359,21 @@ static int timerwheel_rxm(struct frcti * frcti, return -EPERM; } - slot = (slot + rto_slot) & (RXMQ_SLOTS - 1); + slot = (slot + rto_slot + 1) & (RXMQ_SLOTS - 1); pthread_mutex_lock(&rw.lock); list_add_tail(&r->next, &rw.rxms[lvl][slot]); #ifndef RXM_BUFFER_ON_HEAP - shm_du_buff_wait_ack(sdb); + ssm_pk_buff_wait_ack(spb); #endif pthread_mutex_unlock(&rw.lock); - __sync_bool_compare_and_swap(&rw.in_use, false, true); - return 0; } -static int timerwheel_ack(int fd, - struct frcti * frcti) +static int timerwheel_delayed_ack(int fd, + struct frcti * frcti) { struct timespec now; struct ack * a; @@ -396,18 +385,16 @@ static int timerwheel_ack(int fd, clock_gettime(PTHREAD_COND_CLOCK, &now); - slot = DELT_ACK >> ACKQ_RES; - if (slot >= ACKQ_SLOTS) { /* Out of timerwheel range. */ - free(a); - return -EPERM; - } + pthread_rwlock_rdlock(&frcti->lock); - slot = (((ts_to_ns(now) + DELT_ACK) >> ACKQ_RES) + 1) + slot = (((ts_to_ns(now) + (TICTIME << 1)) >> ACKQ_RES) + 1) & (ACKQ_SLOTS - 1); + pthread_rwlock_unlock(&frcti->lock); + a->fd = fd; a->frcti = frcti; - a->flow_id = ai.flows[fd].flow_id; + a->flow_id = proc.flows[fd].info.id; pthread_mutex_lock(&rw.lock); @@ -423,7 +410,5 @@ static int timerwheel_ack(int fd, pthread_mutex_unlock(&rw.lock); - __sync_bool_compare_and_swap(&rw.in_use, false, true); - return 0; } |
