summaryrefslogtreecommitdiff
path: root/src/lib/frct.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/frct.c')
-rw-r--r--src/lib/frct.c832
1 files changed, 682 insertions, 150 deletions
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 2322a039..fad2cf69 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -1,10 +1,10 @@
/*
- * Ouroboros - Copyright (C) 2016 - 2020
+ * Ouroboros - Copyright (C) 2016 - 2026
*
* Flow and Retransmission Control
*
- * Dimitri Staessens <dimitri.staessens@ugent.be>
- * Sander Vrijders <sander.vrijders@ugent.be>
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
@@ -20,24 +20,24 @@
* Foundation, Inc., http://www.fsf.org/about/contact/.
*/
-/* Default Delta-t parameters */
-#define DELT_MPL (60 * MILLION) /* us */
-#define DELT_A (1 * MILLION) /* us */
-#define DELT_R (20 * MILLION) /* us */
+#include <ouroboros/endian.h>
-#define RQ_SIZE 1024
+#define DELT_RDV (100 * MILLION) /* ns */
+#define MAX_RDV (1 * BILLION) /* ns */
-#define FRCT_PCILEN (sizeof(struct frct_pci))
+#define FRCT "frct"
+#define FRCT_PCILEN (sizeof(struct frct_pci))
+#define FRCT_NAME_STRLEN 32
struct frct_cr {
- uint32_t lwe;
- uint32_t rwe;
+ uint32_t lwe; /* Left window edge */
+ uint32_t rwe; /* Right window edge */
- uint8_t cflags;
- uint32_t seqno;
+ uint8_t cflags;
+ uint32_t seqno; /* SEQ to send, or last SEQ Ack'd */
- time_t act; /* s */
- time_t inact; /* s */
+ struct timespec act; /* Last seen activity */
+ time_t inact; /* Inactivity (s) */
};
struct frcti {
@@ -46,21 +46,36 @@ struct frcti {
time_t mpl;
time_t a;
time_t r;
+ time_t rdv;
- time_t srtt_us; /* smoothed rtt */
- time_t mdev_us; /* mdev */
- time_t rto; /* retransmission timeout */
+ time_t srtt; /* Smoothed rtt */
+ time_t mdev; /* Deviation */
+ time_t rto; /* Retransmission timeout */
uint32_t rttseq;
- struct timespec t_probe; /* probe time */
- bool probe; /* probe active */
-
+ struct timespec t_probe; /* Probe time */
+ bool probe; /* Probe active */
+#ifdef PROC_FLOW_STATS
+ size_t n_rtx; /* Number of rxm packets */
+ size_t n_prb; /* Number of rtt probes */
+ size_t n_rtt; /* Number of estimates */
+ size_t n_dup; /* Duplicates received */
+ size_t n_dak; /* Delayed ACKs received */
+ size_t n_rdv; /* Number of rdv packets */
+ size_t n_out; /* Packets out of window */
+ size_t n_rqo; /* Packets out of rqueue */
+#endif
struct frct_cr snd_cr;
struct frct_cr rcv_cr;
- struct rxmwheel * rw;
ssize_t rq[RQ_SIZE];
pthread_rwlock_t lock;
+
+ bool open; /* Window open/closed */
+ struct timespec t_wnd; /* Window closed time */
+ struct timespec t_rdvs; /* Last rendez-vous sent */
+ pthread_cond_t cond;
+ pthread_mutex_t mtx;
};
enum frct_flags {
@@ -74,22 +89,259 @@ enum frct_flags {
};
struct frct_pci {
- uint16_t flags;
+ uint8_t flags;
+ uint8_t pad; /* 24 bit window! */
uint16_t window;
uint32_t seqno;
uint32_t ackno;
} __attribute__((packed));
-#include <rxmwheel.c>
+#ifdef PROC_FLOW_STATS
-static struct frcti * frcti_create(int fd)
+static int frct_rib_read(const char * path,
+ char * buf,
+ size_t len)
{
- struct frcti * frcti;
- time_t delta_t;
- ssize_t idx;
struct timespec now;
+ char * entry;
+ struct flow * flow;
+ struct frcti * frcti;
+ int fd;
+
+ (void) len;
+
+ entry = strstr(path, RIB_SEPARATOR);
+ assert(entry);
+ *entry = '\0';
+
+ fd = atoi(path);
+
+ flow = &proc.flows[fd];
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ pthread_rwlock_rdlock(&proc.lock);
+
+ frcti = flow->frcti;
+
+ pthread_rwlock_rdlock(&frcti->lock);
+
+ sprintf(buf,
+ "Maximum packet lifetime (ns): %20ld\n"
+ "Max time to Ack (ns): %20ld\n"
+ "Max time to Retransmit (ns): %20ld\n"
+ "Smoothed rtt (ns): %20ld\n"
+ "RTT standard deviation (ns): %20ld\n"
+ "Retransmit timeout RTO (ns): %20ld\n"
+ "Sender left window edge: %20u\n"
+ "Sender right window edge: %20u\n"
+ "Sender inactive (ns): %20lld\n"
+ "Sender current sequence number: %20u\n"
+ "Receiver left window edge: %20u\n"
+ "Receiver right window edge: %20u\n"
+ "Receiver inactive (ns): %20lld\n"
+ "Receiver last ack: %20u\n"
+ "Number of pkt retransmissions: %20zu\n"
+ "Number of rtt probes: %20zu\n"
+ "Number of rtt estimates: %20zu\n"
+ "Number of duplicates received: %20zu\n"
+ "Number of delayed acks received: %20zu\n"
+ "Number of rendez-vous sent: %20zu\n"
+ "Number of packets out of window: %20zu\n"
+ "Number of packets out of rqueue: %20zu\n",
+ frcti->mpl,
+ frcti->a,
+ frcti->r,
+ frcti->srtt,
+ frcti->mdev,
+ frcti->rto,
+ frcti->snd_cr.lwe,
+ frcti->snd_cr.rwe,
+ ts_diff_ns(&now, &frcti->snd_cr.act),
+ frcti->snd_cr.seqno,
+ frcti->rcv_cr.lwe,
+ frcti->rcv_cr.rwe,
+ ts_diff_ns(&now, &frcti->rcv_cr.act),
+ frcti->rcv_cr.seqno,
+ frcti->n_rtx,
+ frcti->n_prb,
+ frcti->n_rtt,
+ frcti->n_dup,
+ frcti->n_dak,
+ frcti->n_rdv,
+ frcti->n_out,
+ frcti->n_rqo);
+
+ pthread_rwlock_unlock(&flow->frcti->lock);
+
+ pthread_rwlock_unlock(&proc.lock);
+
+ return strlen(buf);
+}
+
+static int frct_rib_readdir(char *** buf)
+{
+ *buf = malloc(sizeof(**buf));
+ if (*buf == NULL)
+ goto fail_malloc;
+
+ (*buf)[0] = strdup("frct");
+ if ((*buf)[0] == NULL)
+ goto fail_strdup;
+
+ return 1;
+
+ fail_strdup:
+ free(*buf);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+static int frct_rib_getattr(const char * path,
+ struct rib_attr * attr)
+{
+ (void) path;
+ (void) attr;
+
+ attr->size = 1189;
+ attr->mtime = 0;
+
+ return 0;
+}
+
+
+static struct rib_ops r_ops = {
+ .read = frct_rib_read,
+ .readdir = frct_rib_readdir,
+ .getattr = frct_rib_getattr
+};
+
+#endif /* PROC_FLOW_STATS */
+
+static bool before(uint32_t seq1,
+ uint32_t seq2)
+{
+ return (int32_t)(seq1 - seq2) < 0;
+}
+
+static bool after(uint32_t seq1,
+ uint32_t seq2)
+{
+ return (int32_t)(seq2 - seq1) < 0;
+}
+
+static void __send_frct_pkt(int fd,
+ uint8_t flags,
+ uint32_t ackno,
+ uint32_t rwe)
+{
+ struct ssm_pk_buff * spb;
+ struct frct_pci * pci;
+ ssize_t idx;
+ struct flow * f;
+
+ /* Raw calls needed to bypass frcti. */
+#ifdef RXM_BLOCKING
+ idx = ssm_pool_alloc_b(proc.pool, sizeof(*pci), NULL, &spb, NULL);
+#else
+ idx = ssm_pool_alloc(proc.pool, sizeof(*pci), NULL, &spb);
+#endif
+ if (idx < 0)
+ return;
+
+ pci = (struct frct_pci *) ssm_pk_buff_head(spb);
+ memset(pci, 0, sizeof(*pci));
+
+ *((uint32_t *) pci) = hton32(rwe);
+
+ pci->flags = flags;
+ pci->ackno = hton32(ackno);
+
+ f = &proc.flows[fd];
+
+ if (spb_encrypt(f, spb) < 0)
+ goto fail;
+
+#ifdef RXM_BLOCKING
+ if (ssm_rbuff_write_b(f->tx_rb, idx, NULL))
+#else
+ if (ssm_rbuff_write(f->tx_rb, idx))
+#endif
+ goto fail;
+
+ ssm_flow_set_notify(f->set, f->info.id, FLOW_PKT);
+
+ return;
+
+ fail:
+ ipcp_spb_release(spb);
+ return;
+}
+
+static void send_frct_pkt(struct frcti * frcti)
+{
+ struct timespec now;
+ time_t diff;
+ uint32_t ackno;
+ uint32_t rwe;
+ int fd;
+
+ assert(frcti);
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ pthread_rwlock_wrlock(&frcti->lock);
+
+ if (!after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno)) {
+ pthread_rwlock_unlock(&frcti->lock);
+ return;
+ }
+
+ fd = frcti->fd;
+ ackno = frcti->rcv_cr.lwe;
+ rwe = frcti->rcv_cr.rwe;
+
+ diff = ts_diff_ns(&now, &frcti->rcv_cr.act);
+ if (diff > frcti->a) {
+ pthread_rwlock_unlock(&frcti->lock);
+ return;
+ }
+
+ diff = ts_diff_ns(&now, &frcti->snd_cr.act);
+ if (diff < TICTIME) {
+ pthread_rwlock_unlock(&frcti->lock);
+ return;
+ }
+
+ frcti->rcv_cr.seqno = frcti->rcv_cr.lwe;
+
+ pthread_rwlock_unlock(&frcti->lock);
+
+ __send_frct_pkt(fd, FRCT_ACK | FRCT_FC, ackno, rwe);
+}
+
+static void __send_rdv(int fd)
+{
+ __send_frct_pkt(fd, FRCT_RDVS, 0, 0);
+}
+
+static struct frcti * frcti_create(int fd,
+ time_t a,
+ time_t r,
+ time_t mpl)
+{
+ struct frcti * frcti;
+ ssize_t idx;
+ struct timespec now;
+ pthread_condattr_t cattr;
+#ifdef PROC_FLOW_STATS
+ char frctstr[FRCT_NAME_STRLEN + 1];
+#endif
+ mpl *= MILLION;
+ a *= BILLION;
+ r *= BILLION;
frcti = malloc(sizeof(*frcti));
if (frcti == NULL)
@@ -100,44 +352,78 @@ static struct frcti * frcti_create(int fd)
if (pthread_rwlock_init(&frcti->lock, NULL))
goto fail_lock;
- for (idx = 0; idx < RQ_SIZE; ++idx)
- frcti->rq[idx] = -1;
+ if (pthread_mutex_init(&frcti->mtx, NULL))
+ goto fail_mutex;
- clock_gettime(CLOCK_REALTIME_COARSE, &now);
+ if (pthread_condattr_init(&cattr))
+ goto fail_cattr;
+#ifndef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+ if (pthread_cond_init(&frcti->cond, &cattr))
+ goto fail_cond;
- frcti->mpl = DELT_MPL;
- frcti->a = DELT_A;
- frcti->r = DELT_R;
- frcti->fd = fd;
+#ifdef PROC_FLOW_STATS
+ sprintf(frctstr, "%d", fd);
+ if (rib_reg(frctstr, &r_ops))
+ goto fail_rib_reg;
+#endif
+ pthread_condattr_destroy(&cattr);
- delta_t = frcti->mpl + frcti->a + frcti->r;
+ for (idx = 0; idx < RQ_SIZE; ++idx)
+ frcti->rq[idx] = -1;
- frcti->snd_cr.inact = 3 * delta_t / MILLION; /* s */
- frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
- frcti->rttseq = 0;
- frcti->probe = false;
+ frcti->mpl = mpl;
+ frcti->a = a;
+ frcti->r = r;
+ frcti->rdv = DELT_RDV;
+ frcti->fd = fd;
- frcti->srtt_us = 0; /* updated on first ACK */
- frcti->mdev_us = 10000; /* initial rxm will be after 20 ms */
- frcti->rto = 20000; /* initial rxm will be after 20 ms */
- frcti->rw = NULL;
- if (ai.flows[fd].qs.loss == 0) {
- frcti->snd_cr.cflags |= FRCTFRTX;
+ frcti->rttseq = 0;
+ frcti->probe = false;
+
+ frcti->srtt = 0; /* Updated on first ACK */
+ frcti->mdev = 10 * MILLION; /* Updated on first ACK */
+ frcti->rto = BILLION; /* Initial rxm will be after 1 s */
+#ifdef PROC_FLOW_STATS
+ frcti->n_rtx = 0;
+ frcti->n_prb = 0;
+ frcti->n_rtt = 0;
+ frcti->n_dup = 0;
+ frcti->n_dak = 0;
+ frcti->n_rdv = 0;
+ frcti->n_out = 0;
+ frcti->n_rqo = 0;
+#endif
+ if (proc.flows[fd].info.qs.loss == 0) {
+ frcti->snd_cr.cflags |= FRCTFRTX | FRCTFLINGER;
frcti->rcv_cr.cflags |= FRCTFRTX;
- frcti->rw = rxmwheel_create();
- if (frcti->rw == NULL)
- goto fail_rw;
}
- frcti->rcv_cr.inact = 2 * delta_t / MILLION; /* s */
- frcti->rcv_cr.act = now.tv_sec - (frcti->rcv_cr.inact + 1);
+ frcti->snd_cr.cflags |= FRCTFRESCNTL;
+
+ frcti->snd_cr.rwe = START_WINDOW;
+ frcti->snd_cr.inact = (3 * mpl + a + r) / BILLION + 1; /* s */
+ frcti->snd_cr.act.tv_sec = now.tv_sec - (frcti->snd_cr.inact + 1);
+
+ frcti->rcv_cr.inact = (2 * mpl + a + r) / BILLION + 1; /* s */
+ frcti->rcv_cr.act.tv_sec = now.tv_sec - (frcti->rcv_cr.inact + 1);
return frcti;
- fail_rw:
+#ifdef PROC_FLOW_STATS
+ fail_rib_reg:
+ pthread_cond_destroy(&frcti->cond);
+#endif
+ fail_cond:
+ pthread_condattr_destroy(&cattr);
+ fail_cattr:
+ pthread_mutex_destroy(&frcti->mtx);
+ fail_mutex:
pthread_rwlock_destroy(&frcti->lock);
fail_lock:
free(frcti);
@@ -147,24 +433,23 @@ static struct frcti * frcti_create(int fd)
static void frcti_destroy(struct frcti * frcti)
{
- /*
- * FIXME: In case of reliable transmission we should
- * make sure everything we sent is acked.
- */
-
- if (frcti->rw != NULL)
- rxmwheel_destroy(frcti->rw);
-
+#ifdef PROC_FLOW_STATS
+ char frctstr[FRCT_NAME_STRLEN + 1];
+ sprintf(frctstr, "%d", frcti->fd);
+ rib_unreg(frctstr);
+#endif
+ pthread_cond_destroy(&frcti->cond);
+ pthread_mutex_destroy(&frcti->mtx);
pthread_rwlock_destroy(&frcti->lock);
free(frcti);
}
-static uint16_t frcti_getconf(struct frcti * frcti)
+static uint16_t frcti_getflags(struct frcti * frcti)
{
uint16_t ret;
- assert (frcti);
+ assert(frcti);
pthread_rwlock_rdlock(&frcti->lock);
@@ -175,14 +460,147 @@ static uint16_t frcti_getconf(struct frcti * frcti)
return ret;
}
-#define frcti_queued_pdu(frcti) \
- (frcti == NULL ? -1 : __frcti_queued_pdu(frcti))
+static void frcti_setflags(struct frcti * frcti,
+ uint16_t flags)
+{
+ flags |= FRCTFRTX; /* Should not be set by command */
+
+ assert(frcti);
+
+ pthread_rwlock_wrlock(&frcti->lock);
+
+ frcti->snd_cr.cflags &= FRCTFRTX; /* Zero other flags */
+
+ frcti->snd_cr.cflags &= flags;
+
+ pthread_rwlock_unlock(&frcti->lock);
+}
+
+#define frcti_queued_pdu(frcti) \
+ (frcti == NULL ? idx : __frcti_queued_pdu(frcti))
+
+#define frcti_snd(frcti, spb) \
+ (frcti == NULL ? 0 : __frcti_snd(frcti, spb))
+
+#define frcti_rcv(frcti, spb) \
+ (frcti == NULL ? 0 : __frcti_rcv(frcti, spb))
-#define frcti_snd(frcti, sdb) \
- (frcti == NULL ? 0 : __frcti_snd(frcti, sdb))
+#define frcti_dealloc(frcti) \
+ (frcti == NULL ? 0 : __frcti_dealloc(frcti))
+
+#define frcti_is_window_open(frcti) \
+ (frcti == NULL ? true : __frcti_is_window_open(frcti))
+
+#define frcti_window_wait(frcti, abstime) \
+ (frcti == NULL ? 0 : __frcti_window_wait(frcti, abstime))
+
+
+static bool __frcti_is_window_open(struct frcti * frcti)
+{
+ struct frct_cr * snd_cr = &frcti->snd_cr;
+ bool ret = true;
+
+ pthread_rwlock_rdlock(&frcti->lock);
-#define frcti_rcv(frcti, sdb) \
- (frcti == NULL ? idx : __frcti_rcv(frcti, sdb))
+ if (snd_cr->cflags & FRCTFRESCNTL)
+ ret = before(snd_cr->seqno, snd_cr->rwe);
+
+ if (!ret) {
+ struct timespec now;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ pthread_mutex_lock(&frcti->mtx);
+ if (frcti->open) {
+ frcti->open = false;
+ frcti->t_wnd = now;
+ frcti->t_rdvs = now;
+ } else {
+ time_t diff;
+ diff = ts_diff_ns(&now, &frcti->t_wnd);
+ if (diff > MAX_RDV) {
+ pthread_mutex_unlock(&frcti->mtx);
+ pthread_rwlock_unlock(&frcti->lock);
+ return false;
+ }
+
+ diff = ts_diff_ns(&now, &frcti->t_rdvs);
+ if (diff > frcti->rdv) {
+ frcti->t_rdvs = now;
+ __send_rdv(frcti->fd);
+#ifdef PROC_FLOW_STATS
+ frcti->n_rdv++;
+#endif
+
+ }
+ }
+
+ pthread_mutex_unlock(&frcti->mtx);
+ }
+
+ pthread_rwlock_unlock(&frcti->lock);
+
+ return ret;
+}
+
+static int __frcti_window_wait(struct frcti * frcti,
+ struct timespec * abstime)
+{
+ struct frct_cr * snd_cr = &frcti->snd_cr;
+ int ret = 0;
+
+ pthread_rwlock_rdlock(&frcti->lock);
+
+ if (!(snd_cr->cflags & FRCTFRESCNTL)) {
+ pthread_rwlock_unlock(&frcti->lock);
+ return 0;
+ }
+
+ while (snd_cr->seqno == snd_cr->rwe && ret != -ETIMEDOUT) {
+ struct timespec now;
+ pthread_rwlock_unlock(&frcti->lock);
+ pthread_mutex_lock(&frcti->mtx);
+
+ if (frcti->open) {
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ frcti->t_wnd = now;
+ frcti->t_rdvs = now;
+ frcti->open = false;
+ }
+
+ pthread_cleanup_push(__cleanup_mutex_unlock, &frcti->mtx);
+
+ ret = -__timedwait(&frcti->cond, &frcti->mtx, abstime);
+
+ pthread_cleanup_pop(false);
+
+ if (ret == -ETIMEDOUT) {
+ time_t diff;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ diff = ts_diff_ns(&now, &frcti->t_wnd);
+ if (diff > MAX_RDV) {
+ pthread_mutex_unlock(&frcti->mtx);
+ return -ECONNRESET; /* write fails! */
+ }
+
+ diff = ts_diff_ns(&now, &frcti->t_rdvs);
+ if (diff > frcti->rdv) {
+ frcti->t_rdvs = now;
+ __send_rdv(frcti->fd);
+ }
+ }
+
+ pthread_mutex_unlock(&frcti->mtx);
+ pthread_rwlock_rdlock(&frcti->lock);
+ }
+
+ pthread_rwlock_unlock(&frcti->lock);
+
+ return ret;
+}
static ssize_t __frcti_queued_pdu(struct frcti * frcti)
{
@@ -199,6 +617,7 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti)
idx = frcti->rq[pos];
if (idx != -1) {
++frcti->rcv_cr.lwe;
+ ++frcti->rcv_cr.rwe;
frcti->rq[pos] = -1;
}
@@ -207,54 +626,92 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti)
return idx;
}
-static struct frct_pci * frcti_alloc_head(struct shm_du_buff * sdb)
+static ssize_t __frcti_pdu_ready(struct frcti * frcti)
{
- struct frct_pci * pci;
+ ssize_t idx;
+ size_t pos;
- pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN);
- if (pci != NULL)
- memset(pci, 0, sizeof(*pci));
+ assert(frcti);
- return pci;
-}
+ /* See if we already have the next PDU. */
+ pthread_rwlock_rdlock(&frcti->lock);
-static bool before(uint32_t seq1,
- uint32_t seq2)
-{
- return (int32_t)(seq1 - seq2) < 0;
+ pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1);
+ idx = frcti->rq[pos];
+
+ pthread_rwlock_unlock(&frcti->lock);
+
+ return idx;
}
-static bool after(uint32_t seq1,
- uint32_t seq2)
+#include <timerwheel.c>
+
+/*
+ * Send a final ACK for everything that has not been ACK'd.
+ * If the flow should be kept active for retransmission,
+ * the returned time will be negative.
+ */
+static time_t __frcti_dealloc(struct frcti * frcti)
{
- return (int32_t)(seq2 - seq1) < 0;
+ struct timespec now;
+ time_t wait;
+ int ackno;
+ int fd = -1;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ pthread_rwlock_rdlock(&frcti->lock);
+
+ ackno = frcti->rcv_cr.lwe;
+ if (frcti->rcv_cr.lwe != frcti->rcv_cr.seqno)
+ fd = frcti->fd;
+
+ wait = MAX(frcti->rcv_cr.inact - now.tv_sec + frcti->rcv_cr.act.tv_sec,
+ frcti->snd_cr.inact - now.tv_sec + frcti->snd_cr.act.tv_sec);
+ wait = MAX(wait, 0);
+
+ if (frcti->snd_cr.cflags & FRCTFLINGER
+ && before(frcti->snd_cr.lwe, frcti->snd_cr.seqno))
+ wait = -wait;
+
+ pthread_rwlock_unlock(&frcti->lock);
+
+ if (fd != -1)
+ __send_frct_pkt(fd, FRCT_ACK, ackno, 0);
+
+ return wait;
}
static int __frcti_snd(struct frcti * frcti,
- struct shm_du_buff * sdb)
+ struct ssm_pk_buff * spb)
{
struct frct_pci * pci;
struct timespec now;
struct frct_cr * snd_cr;
struct frct_cr * rcv_cr;
uint32_t seqno;
+ bool rtx;
assert(frcti);
+ assert(ssm_pk_buff_len(spb) != 0);
snd_cr = &frcti->snd_cr;
rcv_cr = &frcti->rcv_cr;
- if (frcti->rw != NULL)
- rxmwheel_move(frcti->rw);
+ timerwheel_move();
- pci = frcti_alloc_head(sdb);
+ pci = (struct frct_pci *) ssm_pk_buff_head_alloc(spb, FRCT_PCILEN);
if (pci == NULL)
- return -1;
+ return -ENOMEM;
+
+ memset(pci, 0, sizeof(*pci));
- clock_gettime(CLOCK_REALTIME, &now);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
pthread_rwlock_wrlock(&frcti->lock);
+ rtx = snd_cr->cflags & FRCTFRTX;
+
pci->flags |= FRCT_DATA;
/* Set DRF if there are no unacknowledged packets. */
@@ -262,141 +719,216 @@ static int __frcti_snd(struct frcti * frcti,
pci->flags |= FRCT_DRF;
/* Choose a new sequence number if sender inactivity expired. */
- if (now.tv_sec - snd_cr->act > snd_cr->inact) {
+ if (now.tv_sec - snd_cr->act.tv_sec > snd_cr->inact) {
/* There are no unacknowledged packets. */
assert(snd_cr->seqno == snd_cr->lwe);
random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno));
- frcti->snd_cr.lwe = snd_cr->seqno - 1;
+ snd_cr->lwe = snd_cr->seqno;
+ snd_cr->rwe = snd_cr->lwe + START_WINDOW;
}
seqno = snd_cr->seqno;
pci->seqno = hton32(seqno);
- if (!(snd_cr->cflags & FRCTFRTX)) {
+ if (now.tv_sec - rcv_cr->act.tv_sec < rcv_cr->inact) {
+ pci->flags |= FRCT_FC;
+ *((uint32_t *) pci) |= hton32(rcv_cr->rwe & 0x00FFFFFF);
+ }
+
+ if (!rtx) {
snd_cr->lwe++;
} else {
if (!frcti->probe) {
frcti->rttseq = snd_cr->seqno;
frcti->t_probe = now;
frcti->probe = true;
+#ifdef PROC_FLOW_STATS
+ frcti->n_prb++;
+#endif
}
-
- if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) {
+ if ((now.tv_sec - rcv_cr->act.tv_sec) * BILLION <= frcti->a) {
pci->flags |= FRCT_ACK;
pci->ackno = hton32(rcv_cr->lwe);
+ rcv_cr->seqno = rcv_cr->lwe;
}
}
snd_cr->seqno++;
- snd_cr->act = now.tv_sec;
+ snd_cr->act = now;
pthread_rwlock_unlock(&frcti->lock);
- if (frcti->rw != NULL)
- rxmwheel_add(frcti->rw, frcti, seqno, sdb);
+ if (rtx)
+ timerwheel_rxm(frcti, seqno, spb);
return 0;
}
static void rtt_estimator(struct frcti * frcti,
- time_t mrtt_us)
+ time_t mrtt)
{
- time_t srtt = frcti->srtt_us;
- time_t rttvar = frcti->mdev_us;
+ time_t srtt = frcti->srtt;
+ time_t rttvar = frcti->mdev;
if (srtt == 0) { /* first measurement */
- srtt = mrtt_us;
- rttvar = mrtt_us >> 1;
-
+ srtt = mrtt;
+ rttvar = mrtt >> 1;
} else {
- time_t delta = mrtt_us - srtt;
+ time_t delta = mrtt - srtt;
srtt += (delta >> 3);
- rttvar -= rttvar >> 2;
- rttvar += ABS(delta) >> 2;
+ delta = (ABS(delta) - rttvar) >> 2;
+#ifdef FRCT_LINUX_RTT_ESTIMATOR
+ if (delta < 0)
+ delta >>= 3;
+#endif
+ rttvar += delta;
}
-
- frcti->srtt_us = MAX(1U, srtt);
- frcti->mdev_us = MAX(1U, rttvar);
- frcti->rto = MAX(RTO_MIN, srtt + (rttvar >> 2));
+#ifdef PROC_FLOW_STATS
+ frcti->n_rtt++;
+#endif
+ frcti->srtt = MAX(1000L, srtt);
+ frcti->mdev = MAX(100L, rttvar);
+ frcti->rto = MAX(RTO_MIN, frcti->srtt + (frcti->mdev << MDEV_MUL));
}
-/* Always queues the packet on the RQ for the application. */
-static ssize_t __frcti_rcv(struct frcti * frcti,
- struct shm_du_buff * sdb)
+/* Always queues the next application packet on the RQ. */
+static void __frcti_rcv(struct frcti * frcti,
+ struct ssm_pk_buff * spb)
{
ssize_t idx;
+ size_t pos;
struct frct_pci * pci;
struct timespec now;
- struct frct_cr * snd_cr;
struct frct_cr * rcv_cr;
+ struct frct_cr * snd_cr;
uint32_t seqno;
+ uint32_t ackno;
+ uint32_t rwe;
+ int fd = -1;
assert(frcti);
rcv_cr = &frcti->rcv_cr;
snd_cr = &frcti->snd_cr;
- clock_gettime(CLOCK_REALTIME, &now);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
- pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN);
+ pci = (struct frct_pci *) ssm_pk_buff_head_release(spb, FRCT_PCILEN);
- idx = shm_du_buff_get_idx(sdb);
+ idx = ssm_pk_buff_get_idx(spb);
seqno = ntoh32(pci->seqno);
+ pos = seqno & (RQ_SIZE - 1);
pthread_rwlock_wrlock(&frcti->lock);
- if (now.tv_sec - rcv_cr->act > rcv_cr->inact) {
- if (pci->flags & FRCT_DRF) /* New run. */
+ if (now.tv_sec - rcv_cr->act.tv_sec > rcv_cr->inact) {
+ if (pci->flags & FRCT_DRF) { /* New run. */
rcv_cr->lwe = seqno;
- else
+ rcv_cr->rwe = seqno + RQ_SIZE;
+ rcv_cr->seqno = seqno;
+ } else if (pci->flags & FRCT_DATA) {
goto drop_packet;
+ }
}
- if (before(seqno, rcv_cr->lwe))
- goto drop_packet;
+ rcv_cr->act = now;
- if (rcv_cr->cflags & FRCTFRTX) {
- if (pci->flags & FRCT_ACK) {
- uint32_t ackno = ntoh32(pci->ackno);
- /* Check for duplicate (old) acks. */
- if (after(ackno, snd_cr->lwe))
- snd_cr->lwe = ackno;
-
- if (frcti->probe && after(ackno, frcti->rttseq)) {
- rtt_estimator(frcti, ts_diff_us(&frcti->t_probe,
- &now));
- frcti->probe = false;
- }
+ /* For now, just send an immediate window update. */
+ if (pci->flags & FRCT_RDVS) {
+ fd = frcti->fd;
+ rwe = rcv_cr->rwe;
+ pthread_rwlock_unlock(&frcti->lock);
+
+ __send_frct_pkt(fd, FRCT_FC, 0, rwe);
+
+ ssm_pool_remove(proc.pool, idx);
+ return;
+ }
+
+ if (pci->flags & FRCT_ACK) {
+ ackno = ntoh32(pci->ackno);
+ if (after(ackno, frcti->snd_cr.lwe))
+ frcti->snd_cr.lwe = ackno;
+
+ if (frcti->probe && after(ackno, frcti->rttseq)) {
+#ifdef PROC_FLOW_STATS
+ if (!(pci->flags & FRCT_DATA))
+ frcti->n_dak++;
+#endif
+ rtt_estimator(frcti, ts_diff_ns(&now, &frcti->t_probe));
+ frcti->probe = false;
}
+ }
- if (seqno == rcv_cr->lwe) {
- ++frcti->rcv_cr.lwe;
- } else {
- size_t pos = seqno & (RQ_SIZE - 1);
- if ((seqno - rcv_cr->lwe) >= RQ_SIZE)
- goto drop_packet; /* Out of rq. */
+ if (pci->flags & FRCT_FC) {
+ uint32_t rwe;
+
+ rwe = ntoh32(*((uint32_t *)pci) & hton32(0x00FFFFFF));
+ rwe |= snd_cr->rwe & 0xFF000000;
+
+ /* Rollover for 24 bit */
+ if (before(rwe, snd_cr->rwe) && snd_cr->rwe - rwe > 0x007FFFFF)
+ rwe += 0x01000000;
+
+ snd_cr->rwe = rwe;
- if (frcti->rq[pos] != -1)
- goto drop_packet; /* Duplicate in rq */
+ pthread_mutex_lock(&frcti->mtx);
+ if (!frcti->open) {
+ frcti->open = true;
+ pthread_cond_broadcast(&frcti->cond);
+ }
+ pthread_mutex_unlock(&frcti->mtx);
+ }
+
+ if (!(pci->flags & FRCT_DATA))
+ goto drop_packet;
+
+ if (before(seqno, rcv_cr->lwe)) {
+ rcv_cr->seqno = seqno; /* Ensures we send a new ACK. */
+#ifdef PROC_FLOW_STATS
+ frcti->n_dup++;
+#endif
+ goto drop_packet;
+ }
- frcti->rq[pos] = idx;
- idx = -EAGAIN;
+ if (rcv_cr->cflags & FRCTFRTX) {
+
+ if (!before(seqno, rcv_cr->rwe)) { /* Out of window. */
+#ifdef PROC_FLOW_STATS
+ frcti->n_out++;
+#endif
+ goto drop_packet;
}
+
+ if (!before(seqno, rcv_cr->lwe + RQ_SIZE)) {
+#ifdef PROC_FLOW_STATS
+ frcti->n_rqo++;
+#endif
+ goto drop_packet; /* Out of rq. */
+ }
+ if (frcti->rq[pos] != -1) {
+#ifdef PROC_FLOW_STATS
+ frcti->n_dup++;
+#endif
+ goto drop_packet; /* Duplicate in rq. */
+ }
+ fd = frcti->fd;
} else {
- rcv_cr->lwe = seqno + 1;
+ rcv_cr->lwe = seqno;
}
- rcv_cr->act = now.tv_sec;
+ frcti->rq[pos] = idx;
pthread_rwlock_unlock(&frcti->lock);
- if (frcti->rw != NULL)
- rxmwheel_move(frcti->rw);
+ if (fd != -1)
+ timerwheel_delayed_ack(fd, frcti);
- return idx;
+ return;
drop_packet:
pthread_rwlock_unlock(&frcti->lock);
- shm_rdrbuff_remove(ai.rdrb, idx);
- return -EAGAIN;
+ ssm_pool_remove(proc.pool, idx);
+ send_frct_pkt(frcti);
+ return;
}