diff options
Diffstat (limited to 'src/lib/frct.c')
| -rw-r--r-- | src/lib/frct.c | 832 |
1 files changed, 682 insertions, 150 deletions
diff --git a/src/lib/frct.c b/src/lib/frct.c index 2322a039..fad2cf69 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -1,10 +1,10 @@ /* - * Ouroboros - Copyright (C) 2016 - 2020 + * Ouroboros - Copyright (C) 2016 - 2026 * * Flow and Retransmission Control * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License @@ -20,24 +20,24 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ -/* Default Delta-t parameters */ -#define DELT_MPL (60 * MILLION) /* us */ -#define DELT_A (1 * MILLION) /* us */ -#define DELT_R (20 * MILLION) /* us */ +#include <ouroboros/endian.h> -#define RQ_SIZE 1024 +#define DELT_RDV (100 * MILLION) /* ns */ +#define MAX_RDV (1 * BILLION) /* ns */ -#define FRCT_PCILEN (sizeof(struct frct_pci)) +#define FRCT "frct" +#define FRCT_PCILEN (sizeof(struct frct_pci)) +#define FRCT_NAME_STRLEN 32 struct frct_cr { - uint32_t lwe; - uint32_t rwe; + uint32_t lwe; /* Left window edge */ + uint32_t rwe; /* Right window edge */ - uint8_t cflags; - uint32_t seqno; + uint8_t cflags; + uint32_t seqno; /* SEQ to send, or last SEQ Ack'd */ - time_t act; /* s */ - time_t inact; /* s */ + struct timespec act; /* Last seen activity */ + time_t inact; /* Inactivity (s) */ }; struct frcti { @@ -46,21 +46,36 @@ struct frcti { time_t mpl; time_t a; time_t r; + time_t rdv; - time_t srtt_us; /* smoothed rtt */ - time_t mdev_us; /* mdev */ - time_t rto; /* retransmission timeout */ + time_t srtt; /* Smoothed rtt */ + time_t mdev; /* Deviation */ + time_t rto; /* Retransmission timeout */ uint32_t rttseq; - struct timespec t_probe; /* probe time */ - bool probe; /* probe active */ - + struct timespec t_probe; /* Probe time */ + bool probe; /* Probe active */ +#ifdef PROC_FLOW_STATS + size_t n_rtx; /* Number of rxm packets */ + size_t n_prb; /* Number of rtt probes */ + size_t n_rtt; /* Number of estimates */ + size_t n_dup; /* Duplicates received */ + size_t n_dak; /* Delayed ACKs received */ + size_t n_rdv; /* Number of rdv packets */ + size_t n_out; /* Packets out of window */ + size_t n_rqo; /* Packets out of rqueue */ +#endif struct frct_cr snd_cr; struct frct_cr rcv_cr; - struct rxmwheel * rw; ssize_t rq[RQ_SIZE]; pthread_rwlock_t lock; + + bool open; /* Window open/closed */ + struct timespec t_wnd; /* Window closed time */ + struct timespec t_rdvs; /* Last rendez-vous sent */ + pthread_cond_t cond; + pthread_mutex_t mtx; }; enum frct_flags { @@ -74,22 +89,259 @@ enum frct_flags { }; struct frct_pci { - uint16_t flags; + uint8_t flags; + uint8_t pad; /* 24 bit window! */ uint16_t window; uint32_t seqno; uint32_t ackno; } __attribute__((packed)); -#include <rxmwheel.c> +#ifdef PROC_FLOW_STATS -static struct frcti * frcti_create(int fd) +static int frct_rib_read(const char * path, + char * buf, + size_t len) { - struct frcti * frcti; - time_t delta_t; - ssize_t idx; struct timespec now; + char * entry; + struct flow * flow; + struct frcti * frcti; + int fd; + + (void) len; + + entry = strstr(path, RIB_SEPARATOR); + assert(entry); + *entry = '\0'; + + fd = atoi(path); + + flow = &proc.flows[fd]; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + pthread_rwlock_rdlock(&proc.lock); + + frcti = flow->frcti; + + pthread_rwlock_rdlock(&frcti->lock); + + sprintf(buf, + "Maximum packet lifetime (ns): %20ld\n" + "Max time to Ack (ns): %20ld\n" + "Max time to Retransmit (ns): %20ld\n" + "Smoothed rtt (ns): %20ld\n" + "RTT standard deviation (ns): %20ld\n" + "Retransmit timeout RTO (ns): %20ld\n" + "Sender left window edge: %20u\n" + "Sender right window edge: %20u\n" + "Sender inactive (ns): %20lld\n" + "Sender current sequence number: %20u\n" + "Receiver left window edge: %20u\n" + "Receiver right window edge: %20u\n" + "Receiver inactive (ns): %20lld\n" + "Receiver last ack: %20u\n" + "Number of pkt retransmissions: %20zu\n" + "Number of rtt probes: %20zu\n" + "Number of rtt estimates: %20zu\n" + "Number of duplicates received: %20zu\n" + "Number of delayed acks received: %20zu\n" + "Number of rendez-vous sent: %20zu\n" + "Number of packets out of window: %20zu\n" + "Number of packets out of rqueue: %20zu\n", + frcti->mpl, + frcti->a, + frcti->r, + frcti->srtt, + frcti->mdev, + frcti->rto, + frcti->snd_cr.lwe, + frcti->snd_cr.rwe, + ts_diff_ns(&now, &frcti->snd_cr.act), + frcti->snd_cr.seqno, + frcti->rcv_cr.lwe, + frcti->rcv_cr.rwe, + ts_diff_ns(&now, &frcti->rcv_cr.act), + frcti->rcv_cr.seqno, + frcti->n_rtx, + frcti->n_prb, + frcti->n_rtt, + frcti->n_dup, + frcti->n_dak, + frcti->n_rdv, + frcti->n_out, + frcti->n_rqo); + + pthread_rwlock_unlock(&flow->frcti->lock); + + pthread_rwlock_unlock(&proc.lock); + + return strlen(buf); +} + +static int frct_rib_readdir(char *** buf) +{ + *buf = malloc(sizeof(**buf)); + if (*buf == NULL) + goto fail_malloc; + + (*buf)[0] = strdup("frct"); + if ((*buf)[0] == NULL) + goto fail_strdup; + + return 1; + + fail_strdup: + free(*buf); + fail_malloc: + return -ENOMEM; +} + +static int frct_rib_getattr(const char * path, + struct rib_attr * attr) +{ + (void) path; + (void) attr; + + attr->size = 1189; + attr->mtime = 0; + + return 0; +} + + +static struct rib_ops r_ops = { + .read = frct_rib_read, + .readdir = frct_rib_readdir, + .getattr = frct_rib_getattr +}; + +#endif /* PROC_FLOW_STATS */ + +static bool before(uint32_t seq1, + uint32_t seq2) +{ + return (int32_t)(seq1 - seq2) < 0; +} + +static bool after(uint32_t seq1, + uint32_t seq2) +{ + return (int32_t)(seq2 - seq1) < 0; +} + +static void __send_frct_pkt(int fd, + uint8_t flags, + uint32_t ackno, + uint32_t rwe) +{ + struct ssm_pk_buff * spb; + struct frct_pci * pci; + ssize_t idx; + struct flow * f; + + /* Raw calls needed to bypass frcti. */ +#ifdef RXM_BLOCKING + idx = ssm_pool_alloc_b(proc.pool, sizeof(*pci), NULL, &spb, NULL); +#else + idx = ssm_pool_alloc(proc.pool, sizeof(*pci), NULL, &spb); +#endif + if (idx < 0) + return; + + pci = (struct frct_pci *) ssm_pk_buff_head(spb); + memset(pci, 0, sizeof(*pci)); + + *((uint32_t *) pci) = hton32(rwe); + + pci->flags = flags; + pci->ackno = hton32(ackno); + + f = &proc.flows[fd]; + + if (spb_encrypt(f, spb) < 0) + goto fail; + +#ifdef RXM_BLOCKING + if (ssm_rbuff_write_b(f->tx_rb, idx, NULL)) +#else + if (ssm_rbuff_write(f->tx_rb, idx)) +#endif + goto fail; + + ssm_flow_set_notify(f->set, f->info.id, FLOW_PKT); + + return; + + fail: + ipcp_spb_release(spb); + return; +} + +static void send_frct_pkt(struct frcti * frcti) +{ + struct timespec now; + time_t diff; + uint32_t ackno; + uint32_t rwe; + int fd; + + assert(frcti); + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + pthread_rwlock_wrlock(&frcti->lock); + + if (!after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno)) { + pthread_rwlock_unlock(&frcti->lock); + return; + } + + fd = frcti->fd; + ackno = frcti->rcv_cr.lwe; + rwe = frcti->rcv_cr.rwe; + + diff = ts_diff_ns(&now, &frcti->rcv_cr.act); + if (diff > frcti->a) { + pthread_rwlock_unlock(&frcti->lock); + return; + } + + diff = ts_diff_ns(&now, &frcti->snd_cr.act); + if (diff < TICTIME) { + pthread_rwlock_unlock(&frcti->lock); + return; + } + + frcti->rcv_cr.seqno = frcti->rcv_cr.lwe; + + pthread_rwlock_unlock(&frcti->lock); + + __send_frct_pkt(fd, FRCT_ACK | FRCT_FC, ackno, rwe); +} + +static void __send_rdv(int fd) +{ + __send_frct_pkt(fd, FRCT_RDVS, 0, 0); +} + +static struct frcti * frcti_create(int fd, + time_t a, + time_t r, + time_t mpl) +{ + struct frcti * frcti; + ssize_t idx; + struct timespec now; + pthread_condattr_t cattr; +#ifdef PROC_FLOW_STATS + char frctstr[FRCT_NAME_STRLEN + 1]; +#endif + mpl *= MILLION; + a *= BILLION; + r *= BILLION; frcti = malloc(sizeof(*frcti)); if (frcti == NULL) @@ -100,44 +352,78 @@ static struct frcti * frcti_create(int fd) if (pthread_rwlock_init(&frcti->lock, NULL)) goto fail_lock; - for (idx = 0; idx < RQ_SIZE; ++idx) - frcti->rq[idx] = -1; + if (pthread_mutex_init(&frcti->mtx, NULL)) + goto fail_mutex; - clock_gettime(CLOCK_REALTIME_COARSE, &now); + if (pthread_condattr_init(&cattr)) + goto fail_cattr; +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&frcti->cond, &cattr)) + goto fail_cond; - frcti->mpl = DELT_MPL; - frcti->a = DELT_A; - frcti->r = DELT_R; - frcti->fd = fd; +#ifdef PROC_FLOW_STATS + sprintf(frctstr, "%d", fd); + if (rib_reg(frctstr, &r_ops)) + goto fail_rib_reg; +#endif + pthread_condattr_destroy(&cattr); - delta_t = frcti->mpl + frcti->a + frcti->r; + for (idx = 0; idx < RQ_SIZE; ++idx) + frcti->rq[idx] = -1; - frcti->snd_cr.inact = 3 * delta_t / MILLION; /* s */ - frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1); + clock_gettime(PTHREAD_COND_CLOCK, &now); - frcti->rttseq = 0; - frcti->probe = false; + frcti->mpl = mpl; + frcti->a = a; + frcti->r = r; + frcti->rdv = DELT_RDV; + frcti->fd = fd; - frcti->srtt_us = 0; /* updated on first ACK */ - frcti->mdev_us = 10000; /* initial rxm will be after 20 ms */ - frcti->rto = 20000; /* initial rxm will be after 20 ms */ - frcti->rw = NULL; - if (ai.flows[fd].qs.loss == 0) { - frcti->snd_cr.cflags |= FRCTFRTX; + frcti->rttseq = 0; + frcti->probe = false; + + frcti->srtt = 0; /* Updated on first ACK */ + frcti->mdev = 10 * MILLION; /* Updated on first ACK */ + frcti->rto = BILLION; /* Initial rxm will be after 1 s */ +#ifdef PROC_FLOW_STATS + frcti->n_rtx = 0; + frcti->n_prb = 0; + frcti->n_rtt = 0; + frcti->n_dup = 0; + frcti->n_dak = 0; + frcti->n_rdv = 0; + frcti->n_out = 0; + frcti->n_rqo = 0; +#endif + if (proc.flows[fd].info.qs.loss == 0) { + frcti->snd_cr.cflags |= FRCTFRTX | FRCTFLINGER; frcti->rcv_cr.cflags |= FRCTFRTX; - frcti->rw = rxmwheel_create(); - if (frcti->rw == NULL) - goto fail_rw; } - frcti->rcv_cr.inact = 2 * delta_t / MILLION; /* s */ - frcti->rcv_cr.act = now.tv_sec - (frcti->rcv_cr.inact + 1); + frcti->snd_cr.cflags |= FRCTFRESCNTL; + + frcti->snd_cr.rwe = START_WINDOW; + frcti->snd_cr.inact = (3 * mpl + a + r) / BILLION + 1; /* s */ + frcti->snd_cr.act.tv_sec = now.tv_sec - (frcti->snd_cr.inact + 1); + + frcti->rcv_cr.inact = (2 * mpl + a + r) / BILLION + 1; /* s */ + frcti->rcv_cr.act.tv_sec = now.tv_sec - (frcti->rcv_cr.inact + 1); return frcti; - fail_rw: +#ifdef PROC_FLOW_STATS + fail_rib_reg: + pthread_cond_destroy(&frcti->cond); +#endif + fail_cond: + pthread_condattr_destroy(&cattr); + fail_cattr: + pthread_mutex_destroy(&frcti->mtx); + fail_mutex: pthread_rwlock_destroy(&frcti->lock); fail_lock: free(frcti); @@ -147,24 +433,23 @@ static struct frcti * frcti_create(int fd) static void frcti_destroy(struct frcti * frcti) { - /* - * FIXME: In case of reliable transmission we should - * make sure everything we sent is acked. - */ - - if (frcti->rw != NULL) - rxmwheel_destroy(frcti->rw); - +#ifdef PROC_FLOW_STATS + char frctstr[FRCT_NAME_STRLEN + 1]; + sprintf(frctstr, "%d", frcti->fd); + rib_unreg(frctstr); +#endif + pthread_cond_destroy(&frcti->cond); + pthread_mutex_destroy(&frcti->mtx); pthread_rwlock_destroy(&frcti->lock); free(frcti); } -static uint16_t frcti_getconf(struct frcti * frcti) +static uint16_t frcti_getflags(struct frcti * frcti) { uint16_t ret; - assert (frcti); + assert(frcti); pthread_rwlock_rdlock(&frcti->lock); @@ -175,14 +460,147 @@ static uint16_t frcti_getconf(struct frcti * frcti) return ret; } -#define frcti_queued_pdu(frcti) \ - (frcti == NULL ? -1 : __frcti_queued_pdu(frcti)) +static void frcti_setflags(struct frcti * frcti, + uint16_t flags) +{ + flags |= FRCTFRTX; /* Should not be set by command */ + + assert(frcti); + + pthread_rwlock_wrlock(&frcti->lock); + + frcti->snd_cr.cflags &= FRCTFRTX; /* Zero other flags */ + + frcti->snd_cr.cflags &= flags; + + pthread_rwlock_unlock(&frcti->lock); +} + +#define frcti_queued_pdu(frcti) \ + (frcti == NULL ? idx : __frcti_queued_pdu(frcti)) + +#define frcti_snd(frcti, spb) \ + (frcti == NULL ? 0 : __frcti_snd(frcti, spb)) + +#define frcti_rcv(frcti, spb) \ + (frcti == NULL ? 0 : __frcti_rcv(frcti, spb)) -#define frcti_snd(frcti, sdb) \ - (frcti == NULL ? 0 : __frcti_snd(frcti, sdb)) +#define frcti_dealloc(frcti) \ + (frcti == NULL ? 0 : __frcti_dealloc(frcti)) + +#define frcti_is_window_open(frcti) \ + (frcti == NULL ? true : __frcti_is_window_open(frcti)) + +#define frcti_window_wait(frcti, abstime) \ + (frcti == NULL ? 0 : __frcti_window_wait(frcti, abstime)) + + +static bool __frcti_is_window_open(struct frcti * frcti) +{ + struct frct_cr * snd_cr = &frcti->snd_cr; + bool ret = true; + + pthread_rwlock_rdlock(&frcti->lock); -#define frcti_rcv(frcti, sdb) \ - (frcti == NULL ? idx : __frcti_rcv(frcti, sdb)) + if (snd_cr->cflags & FRCTFRESCNTL) + ret = before(snd_cr->seqno, snd_cr->rwe); + + if (!ret) { + struct timespec now; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + pthread_mutex_lock(&frcti->mtx); + if (frcti->open) { + frcti->open = false; + frcti->t_wnd = now; + frcti->t_rdvs = now; + } else { + time_t diff; + diff = ts_diff_ns(&now, &frcti->t_wnd); + if (diff > MAX_RDV) { + pthread_mutex_unlock(&frcti->mtx); + pthread_rwlock_unlock(&frcti->lock); + return false; + } + + diff = ts_diff_ns(&now, &frcti->t_rdvs); + if (diff > frcti->rdv) { + frcti->t_rdvs = now; + __send_rdv(frcti->fd); +#ifdef PROC_FLOW_STATS + frcti->n_rdv++; +#endif + + } + } + + pthread_mutex_unlock(&frcti->mtx); + } + + pthread_rwlock_unlock(&frcti->lock); + + return ret; +} + +static int __frcti_window_wait(struct frcti * frcti, + struct timespec * abstime) +{ + struct frct_cr * snd_cr = &frcti->snd_cr; + int ret = 0; + + pthread_rwlock_rdlock(&frcti->lock); + + if (!(snd_cr->cflags & FRCTFRESCNTL)) { + pthread_rwlock_unlock(&frcti->lock); + return 0; + } + + while (snd_cr->seqno == snd_cr->rwe && ret != -ETIMEDOUT) { + struct timespec now; + pthread_rwlock_unlock(&frcti->lock); + pthread_mutex_lock(&frcti->mtx); + + if (frcti->open) { + clock_gettime(PTHREAD_COND_CLOCK, &now); + + frcti->t_wnd = now; + frcti->t_rdvs = now; + frcti->open = false; + } + + pthread_cleanup_push(__cleanup_mutex_unlock, &frcti->mtx); + + ret = -__timedwait(&frcti->cond, &frcti->mtx, abstime); + + pthread_cleanup_pop(false); + + if (ret == -ETIMEDOUT) { + time_t diff; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + diff = ts_diff_ns(&now, &frcti->t_wnd); + if (diff > MAX_RDV) { + pthread_mutex_unlock(&frcti->mtx); + return -ECONNRESET; /* write fails! */ + } + + diff = ts_diff_ns(&now, &frcti->t_rdvs); + if (diff > frcti->rdv) { + frcti->t_rdvs = now; + __send_rdv(frcti->fd); + } + } + + pthread_mutex_unlock(&frcti->mtx); + pthread_rwlock_rdlock(&frcti->lock); + } + + pthread_rwlock_unlock(&frcti->lock); + + return ret; +} static ssize_t __frcti_queued_pdu(struct frcti * frcti) { @@ -199,6 +617,7 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti) idx = frcti->rq[pos]; if (idx != -1) { ++frcti->rcv_cr.lwe; + ++frcti->rcv_cr.rwe; frcti->rq[pos] = -1; } @@ -207,54 +626,92 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti) return idx; } -static struct frct_pci * frcti_alloc_head(struct shm_du_buff * sdb) +static ssize_t __frcti_pdu_ready(struct frcti * frcti) { - struct frct_pci * pci; + ssize_t idx; + size_t pos; - pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN); - if (pci != NULL) - memset(pci, 0, sizeof(*pci)); + assert(frcti); - return pci; -} + /* See if we already have the next PDU. */ + pthread_rwlock_rdlock(&frcti->lock); -static bool before(uint32_t seq1, - uint32_t seq2) -{ - return (int32_t)(seq1 - seq2) < 0; + pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1); + idx = frcti->rq[pos]; + + pthread_rwlock_unlock(&frcti->lock); + + return idx; } -static bool after(uint32_t seq1, - uint32_t seq2) +#include <timerwheel.c> + +/* + * Send a final ACK for everything that has not been ACK'd. + * If the flow should be kept active for retransmission, + * the returned time will be negative. + */ +static time_t __frcti_dealloc(struct frcti * frcti) { - return (int32_t)(seq2 - seq1) < 0; + struct timespec now; + time_t wait; + int ackno; + int fd = -1; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + pthread_rwlock_rdlock(&frcti->lock); + + ackno = frcti->rcv_cr.lwe; + if (frcti->rcv_cr.lwe != frcti->rcv_cr.seqno) + fd = frcti->fd; + + wait = MAX(frcti->rcv_cr.inact - now.tv_sec + frcti->rcv_cr.act.tv_sec, + frcti->snd_cr.inact - now.tv_sec + frcti->snd_cr.act.tv_sec); + wait = MAX(wait, 0); + + if (frcti->snd_cr.cflags & FRCTFLINGER + && before(frcti->snd_cr.lwe, frcti->snd_cr.seqno)) + wait = -wait; + + pthread_rwlock_unlock(&frcti->lock); + + if (fd != -1) + __send_frct_pkt(fd, FRCT_ACK, ackno, 0); + + return wait; } static int __frcti_snd(struct frcti * frcti, - struct shm_du_buff * sdb) + struct ssm_pk_buff * spb) { struct frct_pci * pci; struct timespec now; struct frct_cr * snd_cr; struct frct_cr * rcv_cr; uint32_t seqno; + bool rtx; assert(frcti); + assert(ssm_pk_buff_len(spb) != 0); snd_cr = &frcti->snd_cr; rcv_cr = &frcti->rcv_cr; - if (frcti->rw != NULL) - rxmwheel_move(frcti->rw); + timerwheel_move(); - pci = frcti_alloc_head(sdb); + pci = (struct frct_pci *) ssm_pk_buff_head_alloc(spb, FRCT_PCILEN); if (pci == NULL) - return -1; + return -ENOMEM; + + memset(pci, 0, sizeof(*pci)); - clock_gettime(CLOCK_REALTIME, &now); + clock_gettime(PTHREAD_COND_CLOCK, &now); pthread_rwlock_wrlock(&frcti->lock); + rtx = snd_cr->cflags & FRCTFRTX; + pci->flags |= FRCT_DATA; /* Set DRF if there are no unacknowledged packets. */ @@ -262,141 +719,216 @@ static int __frcti_snd(struct frcti * frcti, pci->flags |= FRCT_DRF; /* Choose a new sequence number if sender inactivity expired. */ - if (now.tv_sec - snd_cr->act > snd_cr->inact) { + if (now.tv_sec - snd_cr->act.tv_sec > snd_cr->inact) { /* There are no unacknowledged packets. */ assert(snd_cr->seqno == snd_cr->lwe); random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno)); - frcti->snd_cr.lwe = snd_cr->seqno - 1; + snd_cr->lwe = snd_cr->seqno; + snd_cr->rwe = snd_cr->lwe + START_WINDOW; } seqno = snd_cr->seqno; pci->seqno = hton32(seqno); - if (!(snd_cr->cflags & FRCTFRTX)) { + if (now.tv_sec - rcv_cr->act.tv_sec < rcv_cr->inact) { + pci->flags |= FRCT_FC; + *((uint32_t *) pci) |= hton32(rcv_cr->rwe & 0x00FFFFFF); + } + + if (!rtx) { snd_cr->lwe++; } else { if (!frcti->probe) { frcti->rttseq = snd_cr->seqno; frcti->t_probe = now; frcti->probe = true; +#ifdef PROC_FLOW_STATS + frcti->n_prb++; +#endif } - - if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) { + if ((now.tv_sec - rcv_cr->act.tv_sec) * BILLION <= frcti->a) { pci->flags |= FRCT_ACK; pci->ackno = hton32(rcv_cr->lwe); + rcv_cr->seqno = rcv_cr->lwe; } } snd_cr->seqno++; - snd_cr->act = now.tv_sec; + snd_cr->act = now; pthread_rwlock_unlock(&frcti->lock); - if (frcti->rw != NULL) - rxmwheel_add(frcti->rw, frcti, seqno, sdb); + if (rtx) + timerwheel_rxm(frcti, seqno, spb); return 0; } static void rtt_estimator(struct frcti * frcti, - time_t mrtt_us) + time_t mrtt) { - time_t srtt = frcti->srtt_us; - time_t rttvar = frcti->mdev_us; + time_t srtt = frcti->srtt; + time_t rttvar = frcti->mdev; if (srtt == 0) { /* first measurement */ - srtt = mrtt_us; - rttvar = mrtt_us >> 1; - + srtt = mrtt; + rttvar = mrtt >> 1; } else { - time_t delta = mrtt_us - srtt; + time_t delta = mrtt - srtt; srtt += (delta >> 3); - rttvar -= rttvar >> 2; - rttvar += ABS(delta) >> 2; + delta = (ABS(delta) - rttvar) >> 2; +#ifdef FRCT_LINUX_RTT_ESTIMATOR + if (delta < 0) + delta >>= 3; +#endif + rttvar += delta; } - - frcti->srtt_us = MAX(1U, srtt); - frcti->mdev_us = MAX(1U, rttvar); - frcti->rto = MAX(RTO_MIN, srtt + (rttvar >> 2)); +#ifdef PROC_FLOW_STATS + frcti->n_rtt++; +#endif + frcti->srtt = MAX(1000L, srtt); + frcti->mdev = MAX(100L, rttvar); + frcti->rto = MAX(RTO_MIN, frcti->srtt + (frcti->mdev << MDEV_MUL)); } -/* Always queues the packet on the RQ for the application. */ -static ssize_t __frcti_rcv(struct frcti * frcti, - struct shm_du_buff * sdb) +/* Always queues the next application packet on the RQ. */ +static void __frcti_rcv(struct frcti * frcti, + struct ssm_pk_buff * spb) { ssize_t idx; + size_t pos; struct frct_pci * pci; struct timespec now; - struct frct_cr * snd_cr; struct frct_cr * rcv_cr; + struct frct_cr * snd_cr; uint32_t seqno; + uint32_t ackno; + uint32_t rwe; + int fd = -1; assert(frcti); rcv_cr = &frcti->rcv_cr; snd_cr = &frcti->snd_cr; - clock_gettime(CLOCK_REALTIME, &now); + clock_gettime(PTHREAD_COND_CLOCK, &now); - pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN); + pci = (struct frct_pci *) ssm_pk_buff_head_release(spb, FRCT_PCILEN); - idx = shm_du_buff_get_idx(sdb); + idx = ssm_pk_buff_get_idx(spb); seqno = ntoh32(pci->seqno); + pos = seqno & (RQ_SIZE - 1); pthread_rwlock_wrlock(&frcti->lock); - if (now.tv_sec - rcv_cr->act > rcv_cr->inact) { - if (pci->flags & FRCT_DRF) /* New run. */ + if (now.tv_sec - rcv_cr->act.tv_sec > rcv_cr->inact) { + if (pci->flags & FRCT_DRF) { /* New run. */ rcv_cr->lwe = seqno; - else + rcv_cr->rwe = seqno + RQ_SIZE; + rcv_cr->seqno = seqno; + } else if (pci->flags & FRCT_DATA) { goto drop_packet; + } } - if (before(seqno, rcv_cr->lwe)) - goto drop_packet; + rcv_cr->act = now; - if (rcv_cr->cflags & FRCTFRTX) { - if (pci->flags & FRCT_ACK) { - uint32_t ackno = ntoh32(pci->ackno); - /* Check for duplicate (old) acks. */ - if (after(ackno, snd_cr->lwe)) - snd_cr->lwe = ackno; - - if (frcti->probe && after(ackno, frcti->rttseq)) { - rtt_estimator(frcti, ts_diff_us(&frcti->t_probe, - &now)); - frcti->probe = false; - } + /* For now, just send an immediate window update. */ + if (pci->flags & FRCT_RDVS) { + fd = frcti->fd; + rwe = rcv_cr->rwe; + pthread_rwlock_unlock(&frcti->lock); + + __send_frct_pkt(fd, FRCT_FC, 0, rwe); + + ssm_pool_remove(proc.pool, idx); + return; + } + + if (pci->flags & FRCT_ACK) { + ackno = ntoh32(pci->ackno); + if (after(ackno, frcti->snd_cr.lwe)) + frcti->snd_cr.lwe = ackno; + + if (frcti->probe && after(ackno, frcti->rttseq)) { +#ifdef PROC_FLOW_STATS + if (!(pci->flags & FRCT_DATA)) + frcti->n_dak++; +#endif + rtt_estimator(frcti, ts_diff_ns(&now, &frcti->t_probe)); + frcti->probe = false; } + } - if (seqno == rcv_cr->lwe) { - ++frcti->rcv_cr.lwe; - } else { - size_t pos = seqno & (RQ_SIZE - 1); - if ((seqno - rcv_cr->lwe) >= RQ_SIZE) - goto drop_packet; /* Out of rq. */ + if (pci->flags & FRCT_FC) { + uint32_t rwe; + + rwe = ntoh32(*((uint32_t *)pci) & hton32(0x00FFFFFF)); + rwe |= snd_cr->rwe & 0xFF000000; + + /* Rollover for 24 bit */ + if (before(rwe, snd_cr->rwe) && snd_cr->rwe - rwe > 0x007FFFFF) + rwe += 0x01000000; + + snd_cr->rwe = rwe; - if (frcti->rq[pos] != -1) - goto drop_packet; /* Duplicate in rq */ + pthread_mutex_lock(&frcti->mtx); + if (!frcti->open) { + frcti->open = true; + pthread_cond_broadcast(&frcti->cond); + } + pthread_mutex_unlock(&frcti->mtx); + } + + if (!(pci->flags & FRCT_DATA)) + goto drop_packet; + + if (before(seqno, rcv_cr->lwe)) { + rcv_cr->seqno = seqno; /* Ensures we send a new ACK. */ +#ifdef PROC_FLOW_STATS + frcti->n_dup++; +#endif + goto drop_packet; + } - frcti->rq[pos] = idx; - idx = -EAGAIN; + if (rcv_cr->cflags & FRCTFRTX) { + + if (!before(seqno, rcv_cr->rwe)) { /* Out of window. */ +#ifdef PROC_FLOW_STATS + frcti->n_out++; +#endif + goto drop_packet; } + + if (!before(seqno, rcv_cr->lwe + RQ_SIZE)) { +#ifdef PROC_FLOW_STATS + frcti->n_rqo++; +#endif + goto drop_packet; /* Out of rq. */ + } + if (frcti->rq[pos] != -1) { +#ifdef PROC_FLOW_STATS + frcti->n_dup++; +#endif + goto drop_packet; /* Duplicate in rq. */ + } + fd = frcti->fd; } else { - rcv_cr->lwe = seqno + 1; + rcv_cr->lwe = seqno; } - rcv_cr->act = now.tv_sec; + frcti->rq[pos] = idx; pthread_rwlock_unlock(&frcti->lock); - if (frcti->rw != NULL) - rxmwheel_move(frcti->rw); + if (fd != -1) + timerwheel_delayed_ack(fd, frcti); - return idx; + return; drop_packet: pthread_rwlock_unlock(&frcti->lock); - shm_rdrbuff_remove(ai.rdrb, idx); - return -EAGAIN; + ssm_pool_remove(proc.pool, idx); + send_frct_pkt(frcti); + return; } |
