summaryrefslogtreecommitdiff
path: root/src/lib/frct.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/frct.c')
-rw-r--r--src/lib/frct.c473
1 files changed, 298 insertions, 175 deletions
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 919e2617..fad2cf69 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -1,10 +1,10 @@
/*
- * Ouroboros - Copyright (C) 2016 - 2020
+ * Ouroboros - Copyright (C) 2016 - 2026
*
* Flow and Retransmission Control
*
- * Dimitri Staessens <dimitri.staessens@ugent.be>
- * Sander Vrijders <sander.vrijders@ugent.be>
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
@@ -20,10 +20,14 @@
* Foundation, Inc., http://www.fsf.org/about/contact/.
*/
-#define DELT_RDV (100 * MILLION) /* ns */
-#define MAX_RDV (1 * BILLION) /* ns */
+#include <ouroboros/endian.h>
-#define FRCT_PCILEN (sizeof(struct frct_pci))
+#define DELT_RDV (100 * MILLION) /* ns */
+#define MAX_RDV (1 * BILLION) /* ns */
+
+#define FRCT "frct"
+#define FRCT_PCILEN (sizeof(struct frct_pci))
+#define FRCT_NAME_STRLEN 32
struct frct_cr {
uint32_t lwe; /* Left window edge */
@@ -50,10 +54,20 @@ struct frcti {
uint32_t rttseq;
struct timespec t_probe; /* Probe time */
bool probe; /* Probe active */
-
+#ifdef PROC_FLOW_STATS
+ size_t n_rtx; /* Number of rxm packets */
+ size_t n_prb; /* Number of rtt probes */
+ size_t n_rtt; /* Number of estimates */
+ size_t n_dup; /* Duplicates received */
+ size_t n_dak; /* Delayed ACKs received */
+ size_t n_rdv; /* Number of rdv packets */
+ size_t n_out; /* Packets out of window */
+ size_t n_rqo; /* Packets out of rqueue */
+#endif
struct frct_cr snd_cr;
struct frct_cr rcv_cr;
+
ssize_t rq[RQ_SIZE];
pthread_rwlock_t lock;
@@ -84,6 +98,128 @@ struct frct_pci {
uint32_t ackno;
} __attribute__((packed));
+#ifdef PROC_FLOW_STATS
+
+static int frct_rib_read(const char * path,
+ char * buf,
+ size_t len)
+{
+ struct timespec now;
+ char * entry;
+ struct flow * flow;
+ struct frcti * frcti;
+ int fd;
+
+ (void) len;
+
+ entry = strstr(path, RIB_SEPARATOR);
+ assert(entry);
+ *entry = '\0';
+
+ fd = atoi(path);
+
+ flow = &proc.flows[fd];
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ pthread_rwlock_rdlock(&proc.lock);
+
+ frcti = flow->frcti;
+
+ pthread_rwlock_rdlock(&frcti->lock);
+
+ sprintf(buf,
+ "Maximum packet lifetime (ns): %20ld\n"
+ "Max time to Ack (ns): %20ld\n"
+ "Max time to Retransmit (ns): %20ld\n"
+ "Smoothed rtt (ns): %20ld\n"
+ "RTT standard deviation (ns): %20ld\n"
+ "Retransmit timeout RTO (ns): %20ld\n"
+ "Sender left window edge: %20u\n"
+ "Sender right window edge: %20u\n"
+ "Sender inactive (ns): %20lld\n"
+ "Sender current sequence number: %20u\n"
+ "Receiver left window edge: %20u\n"
+ "Receiver right window edge: %20u\n"
+ "Receiver inactive (ns): %20lld\n"
+ "Receiver last ack: %20u\n"
+ "Number of pkt retransmissions: %20zu\n"
+ "Number of rtt probes: %20zu\n"
+ "Number of rtt estimates: %20zu\n"
+ "Number of duplicates received: %20zu\n"
+ "Number of delayed acks received: %20zu\n"
+ "Number of rendez-vous sent: %20zu\n"
+ "Number of packets out of window: %20zu\n"
+ "Number of packets out of rqueue: %20zu\n",
+ frcti->mpl,
+ frcti->a,
+ frcti->r,
+ frcti->srtt,
+ frcti->mdev,
+ frcti->rto,
+ frcti->snd_cr.lwe,
+ frcti->snd_cr.rwe,
+ ts_diff_ns(&now, &frcti->snd_cr.act),
+ frcti->snd_cr.seqno,
+ frcti->rcv_cr.lwe,
+ frcti->rcv_cr.rwe,
+ ts_diff_ns(&now, &frcti->rcv_cr.act),
+ frcti->rcv_cr.seqno,
+ frcti->n_rtx,
+ frcti->n_prb,
+ frcti->n_rtt,
+ frcti->n_dup,
+ frcti->n_dak,
+ frcti->n_rdv,
+ frcti->n_out,
+ frcti->n_rqo);
+
+ pthread_rwlock_unlock(&flow->frcti->lock);
+
+ pthread_rwlock_unlock(&proc.lock);
+
+ return strlen(buf);
+}
+
+static int frct_rib_readdir(char *** buf)
+{
+ *buf = malloc(sizeof(**buf));
+ if (*buf == NULL)
+ goto fail_malloc;
+
+ (*buf)[0] = strdup("frct");
+ if ((*buf)[0] == NULL)
+ goto fail_strdup;
+
+ return 1;
+
+ fail_strdup:
+ free(*buf);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+static int frct_rib_getattr(const char * path,
+ struct rib_attr * attr)
+{
+ (void) path;
+ (void) attr;
+
+ attr->size = 1189;
+ attr->mtime = 0;
+
+ return 0;
+}
+
+
+static struct rib_ops r_ops = {
+ .read = frct_rib_read,
+ .readdir = frct_rib_readdir,
+ .getattr = frct_rib_getattr
+};
+
+#endif /* PROC_FLOW_STATS */
+
static bool before(uint32_t seq1,
uint32_t seq2)
{
@@ -101,21 +237,21 @@ static void __send_frct_pkt(int fd,
uint32_t ackno,
uint32_t rwe)
{
- struct shm_du_buff * sdb;
+ struct ssm_pk_buff * spb;
struct frct_pci * pci;
ssize_t idx;
struct flow * f;
/* Raw calls needed to bypass frcti. */
#ifdef RXM_BLOCKING
- idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL);
+ idx = ssm_pool_alloc_b(proc.pool, sizeof(*pci), NULL, &spb, NULL);
#else
- idx = shm_rdrbuff_alloc(ai.rdrb, sizeof(*pci), NULL, &sdb);
+ idx = ssm_pool_alloc(proc.pool, sizeof(*pci), NULL, &spb);
#endif
if (idx < 0)
return;
- pci = (struct frct_pci *) shm_du_buff_head(sdb);
+ pci = (struct frct_pci *) ssm_pk_buff_head(spb);
memset(pci, 0, sizeof(*pci));
*((uint32_t *) pci) = hton32(rwe);
@@ -123,17 +259,25 @@ static void __send_frct_pkt(int fd,
pci->flags = flags;
pci->ackno = hton32(ackno);
- f = &ai.flows[fd];
+ f = &proc.flows[fd];
+
+ if (spb_encrypt(f, spb) < 0)
+ goto fail;
+
#ifdef RXM_BLOCKING
- if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
+ if (ssm_rbuff_write_b(f->tx_rb, idx, NULL))
#else
- if (shm_rbuff_write(f->tx_rb, idx)) {
+ if (ssm_rbuff_write(f->tx_rb, idx))
#endif
- ipcp_sdb_release(sdb);
- return;
- }
+ goto fail;
+
+ ssm_flow_set_notify(f->set, f->info.id, FLOW_PKT);
+
+ return;
- shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
+ fail:
+ ipcp_spb_release(spb);
+ return;
}
static void send_frct_pkt(struct frcti * frcti)
@@ -146,9 +290,11 @@ static void send_frct_pkt(struct frcti * frcti)
assert(frcti);
- pthread_rwlock_rdlock(&frcti->lock);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ pthread_rwlock_wrlock(&frcti->lock);
- if (frcti->rcv_cr.lwe == frcti->rcv_cr.seqno) {
+ if (!after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno)) {
pthread_rwlock_unlock(&frcti->lock);
return;
}
@@ -157,61 +303,45 @@ static void send_frct_pkt(struct frcti * frcti)
ackno = frcti->rcv_cr.lwe;
rwe = frcti->rcv_cr.rwe;
- clock_gettime(PTHREAD_COND_CLOCK, &now);
-
- diff = ts_diff_ns(&frcti->rcv_cr.act, &now);
-
- pthread_rwlock_unlock(&frcti->lock);
-
- if (diff > frcti->a || diff < DELT_ACK)
+ diff = ts_diff_ns(&now, &frcti->rcv_cr.act);
+ if (diff > frcti->a) {
+ pthread_rwlock_unlock(&frcti->lock);
return;
+ }
- __send_frct_pkt(fd, FRCT_ACK | FRCT_FC, ackno, rwe);
-
- pthread_rwlock_wrlock(&frcti->lock);
+ diff = ts_diff_ns(&now, &frcti->snd_cr.act);
+ if (diff < TICTIME) {
+ pthread_rwlock_unlock(&frcti->lock);
+ return;
+ }
- if (after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno))
- frcti->rcv_cr.seqno = frcti->rcv_cr.lwe;
+ frcti->rcv_cr.seqno = frcti->rcv_cr.lwe;
pthread_rwlock_unlock(&frcti->lock);
+
+ __send_frct_pkt(fd, FRCT_ACK | FRCT_FC, ackno, rwe);
}
static void __send_rdv(int fd)
{
- struct shm_du_buff * sdb;
- struct frct_pci * pci;
- ssize_t idx;
- struct flow * f;
-
- /* Raw calls needed to bypass frcti. */
- idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL);
- if (idx < 0)
- return;
-
- pci = (struct frct_pci *) shm_du_buff_head(sdb);
- memset(pci, 0, sizeof(*pci));
-
- pci->flags = FRCT_RDVS;
-
- f = &ai.flows[fd];
-
- if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
- ipcp_sdb_release(sdb);
- return;
- }
-
- shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
+ __send_frct_pkt(fd, FRCT_RDVS, 0, 0);
}
-static struct frcti * frcti_create(int fd)
+static struct frcti * frcti_create(int fd,
+ time_t a,
+ time_t r,
+ time_t mpl)
{
- struct frcti * frcti;
- ssize_t idx;
- struct timespec now;
- time_t mpl;
- time_t a;
- time_t r;
- pthread_condattr_t cattr;
+ struct frcti * frcti;
+ ssize_t idx;
+ struct timespec now;
+ pthread_condattr_t cattr;
+#ifdef PROC_FLOW_STATS
+ char frctstr[FRCT_NAME_STRLEN + 1];
+#endif
+ mpl *= MILLION;
+ a *= BILLION;
+ r *= BILLION;
frcti = malloc(sizeof(*frcti));
if (frcti == NULL)
@@ -233,14 +363,21 @@ static struct frcti * frcti_create(int fd)
if (pthread_cond_init(&frcti->cond, &cattr))
goto fail_cond;
+#ifdef PROC_FLOW_STATS
+ sprintf(frctstr, "%d", fd);
+ if (rib_reg(frctstr, &r_ops))
+ goto fail_rib_reg;
+#endif
+ pthread_condattr_destroy(&cattr);
+
for (idx = 0; idx < RQ_SIZE; ++idx)
frcti->rq[idx] = -1;
clock_gettime(PTHREAD_COND_CLOCK, &now);
- frcti->mpl = mpl = DELT_MPL;
- frcti->a = a = DELT_A;
- frcti->r = r = DELT_R;
+ frcti->mpl = mpl;
+ frcti->a = a;
+ frcti->r = r;
frcti->rdv = DELT_RDV;
frcti->fd = fd;
@@ -249,10 +386,19 @@ static struct frcti * frcti_create(int fd)
frcti->probe = false;
frcti->srtt = 0; /* Updated on first ACK */
- frcti->mdev = 10 * MILLION; /* Initial rxm will be after 20 ms */
- frcti->rto = 20 * MILLION; /* Initial rxm will be after 20 ms */
-
- if (ai.flows[fd].qs.loss == 0) {
+ frcti->mdev = 10 * MILLION; /* Updated on first ACK */
+ frcti->rto = BILLION; /* Initial rxm will be after 1 s */
+#ifdef PROC_FLOW_STATS
+ frcti->n_rtx = 0;
+ frcti->n_prb = 0;
+ frcti->n_rtt = 0;
+ frcti->n_dup = 0;
+ frcti->n_dak = 0;
+ frcti->n_rdv = 0;
+ frcti->n_out = 0;
+ frcti->n_rqo = 0;
+#endif
+ if (proc.flows[fd].info.qs.loss == 0) {
frcti->snd_cr.cflags |= FRCTFRTX | FRCTFLINGER;
frcti->rcv_cr.cflags |= FRCTFRTX;
}
@@ -269,9 +415,13 @@ static struct frcti * frcti_create(int fd)
return frcti;
+#ifdef PROC_FLOW_STATS
+ fail_rib_reg:
+ pthread_cond_destroy(&frcti->cond);
+#endif
fail_cond:
pthread_condattr_destroy(&cattr);
-fail_cattr:
+ fail_cattr:
pthread_mutex_destroy(&frcti->mtx);
fail_mutex:
pthread_rwlock_destroy(&frcti->lock);
@@ -283,6 +433,11 @@ fail_cattr:
static void frcti_destroy(struct frcti * frcti)
{
+#ifdef PROC_FLOW_STATS
+ char frctstr[FRCT_NAME_STRLEN + 1];
+ sprintf(frctstr, "%d", frcti->fd);
+ rib_unreg(frctstr);
+#endif
pthread_cond_destroy(&frcti->cond);
pthread_mutex_destroy(&frcti->mtx);
pthread_rwlock_destroy(&frcti->lock);
@@ -324,14 +479,11 @@ static void frcti_setflags(struct frcti * frcti,
#define frcti_queued_pdu(frcti) \
(frcti == NULL ? idx : __frcti_queued_pdu(frcti))
-#define frcti_snd(frcti, sdb) \
- (frcti == NULL ? 0 : __frcti_snd(frcti, sdb))
+#define frcti_snd(frcti, spb) \
+ (frcti == NULL ? 0 : __frcti_snd(frcti, spb))
-#define frcti_rcv(frcti, sdb) \
- (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb))
-
-#define frcti_tick(frcti) \
- (frcti == NULL ? 0 : __frcti_tick())
+#define frcti_rcv(frcti, spb) \
+ (frcti == NULL ? 0 : __frcti_rcv(frcti, spb))
#define frcti_dealloc(frcti) \
(frcti == NULL ? 0 : __frcti_dealloc(frcti))
@@ -346,7 +498,7 @@ static void frcti_setflags(struct frcti * frcti,
static bool __frcti_is_window_open(struct frcti * frcti)
{
struct frct_cr * snd_cr = &frcti->snd_cr;
- int ret = true;
+ bool ret = true;
pthread_rwlock_rdlock(&frcti->lock);
@@ -354,7 +506,7 @@ static bool __frcti_is_window_open(struct frcti * frcti)
ret = before(snd_cr->seqno, snd_cr->rwe);
if (!ret) {
- struct timespec now;
+ struct timespec now;
clock_gettime(PTHREAD_COND_CLOCK, &now);
@@ -365,16 +517,21 @@ static bool __frcti_is_window_open(struct frcti * frcti)
frcti->t_rdvs = now;
} else {
time_t diff;
- diff = ts_diff_ns(&frcti->t_wnd, &now);
+ diff = ts_diff_ns(&now, &frcti->t_wnd);
if (diff > MAX_RDV) {
pthread_mutex_unlock(&frcti->mtx);
+ pthread_rwlock_unlock(&frcti->lock);
return false;
}
- diff = ts_diff_ns(&frcti->t_rdvs, &now);
+ diff = ts_diff_ns(&now, &frcti->t_rdvs);
if (diff > frcti->rdv) {
frcti->t_rdvs = now;
__send_rdv(frcti->fd);
+#ifdef PROC_FLOW_STATS
+ frcti->n_rdv++;
+#endif
+
}
}
@@ -401,7 +558,6 @@ static int __frcti_window_wait(struct frcti * frcti,
while (snd_cr->seqno == snd_cr->rwe && ret != -ETIMEDOUT) {
struct timespec now;
-
pthread_rwlock_unlock(&frcti->lock);
pthread_mutex_lock(&frcti->mtx);
@@ -413,12 +569,9 @@ static int __frcti_window_wait(struct frcti * frcti,
frcti->open = false;
}
- pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
- (void *) &frcti->mtx);
+ pthread_cleanup_push(__cleanup_mutex_unlock, &frcti->mtx);
- ret = -pthread_cond_timedwait(&frcti->cond,
- &frcti->mtx,
- abstime);
+ ret = -__timedwait(&frcti->cond, &frcti->mtx, abstime);
pthread_cleanup_pop(false);
@@ -427,13 +580,13 @@ static int __frcti_window_wait(struct frcti * frcti,
clock_gettime(PTHREAD_COND_CLOCK, &now);
- diff = ts_diff_ns(&frcti->t_wnd, &now);
+ diff = ts_diff_ns(&now, &frcti->t_wnd);
if (diff > MAX_RDV) {
pthread_mutex_unlock(&frcti->mtx);
return -ECONNRESET; /* write fails! */
}
- diff = ts_diff_ns(&frcti->t_rdvs, &now);
+ diff = ts_diff_ns(&now, &frcti->t_rdvs);
if (diff > frcti->rdv) {
frcti->t_rdvs = now;
__send_rdv(frcti->fd);
@@ -515,6 +668,7 @@ static time_t __frcti_dealloc(struct frcti * frcti)
wait = MAX(frcti->rcv_cr.inact - now.tv_sec + frcti->rcv_cr.act.tv_sec,
frcti->snd_cr.inact - now.tv_sec + frcti->snd_cr.act.tv_sec);
+ wait = MAX(wait, 0);
if (frcti->snd_cr.cflags & FRCTFLINGER
&& before(frcti->snd_cr.lwe, frcti->snd_cr.seqno))
@@ -529,7 +683,7 @@ static time_t __frcti_dealloc(struct frcti * frcti)
}
static int __frcti_snd(struct frcti * frcti,
- struct shm_du_buff * sdb)
+ struct ssm_pk_buff * spb)
{
struct frct_pci * pci;
struct timespec now;
@@ -539,13 +693,14 @@ static int __frcti_snd(struct frcti * frcti,
bool rtx;
assert(frcti);
+ assert(ssm_pk_buff_len(spb) != 0);
snd_cr = &frcti->snd_cr;
rcv_cr = &frcti->rcv_cr;
timerwheel_move();
- pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN);
+ pci = (struct frct_pci *) ssm_pk_buff_head_alloc(spb, FRCT_PCILEN);
if (pci == NULL)
return -ENOMEM;
@@ -568,7 +723,7 @@ static int __frcti_snd(struct frcti * frcti,
/* There are no unacknowledged packets. */
assert(snd_cr->seqno == snd_cr->lwe);
random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno));
- snd_cr->lwe = snd_cr->seqno - 1;
+ snd_cr->lwe = snd_cr->seqno;
snd_cr->rwe = snd_cr->lwe + START_WINDOW;
}
@@ -587,9 +742,11 @@ static int __frcti_snd(struct frcti * frcti,
frcti->rttseq = snd_cr->seqno;
frcti->t_probe = now;
frcti->probe = true;
+#ifdef PROC_FLOW_STATS
+ frcti->n_prb++;
+#endif
}
-
- if (now.tv_sec - rcv_cr->act.tv_sec <= frcti->a) {
+ if ((now.tv_sec - rcv_cr->act.tv_sec) * BILLION <= frcti->a) {
pci->flags |= FRCT_ACK;
pci->ackno = hton32(rcv_cr->lwe);
rcv_cr->seqno = rcv_cr->lwe;
@@ -602,7 +759,7 @@ static int __frcti_snd(struct frcti * frcti,
pthread_rwlock_unlock(&frcti->lock);
if (rtx)
- timerwheel_rxm(frcti, seqno, sdb);
+ timerwheel_rxm(frcti, seqno, spb);
return 0;
}
@@ -619,22 +776,24 @@ static void rtt_estimator(struct frcti * frcti,
} else {
time_t delta = mrtt - srtt;
srtt += (delta >> 3);
- rttvar += (ABS(delta) - rttvar) >> 2;
+ delta = (ABS(delta) - rttvar) >> 2;
+#ifdef FRCT_LINUX_RTT_ESTIMATOR
+ if (delta < 0)
+ delta >>= 3;
+#endif
+ rttvar += delta;
}
-
- frcti->srtt = MAX(1000U, srtt);
- frcti->mdev = MAX(100U, rttvar);
- frcti->rto = MAX(RTO_MIN, frcti->srtt + (frcti->mdev << 1));
-}
-
-static void __frcti_tick(void)
-{
- timerwheel_move();
+#ifdef PROC_FLOW_STATS
+ frcti->n_rtt++;
+#endif
+ frcti->srtt = MAX(1000L, srtt);
+ frcti->mdev = MAX(100L, rttvar);
+ frcti->rto = MAX(RTO_MIN, frcti->srtt + (frcti->mdev << MDEV_MUL));
}
/* Always queues the next application packet on the RQ. */
static void __frcti_rcv(struct frcti * frcti,
- struct shm_du_buff * sdb)
+ struct ssm_pk_buff * spb)
{
ssize_t idx;
size_t pos;
@@ -654,9 +813,9 @@ static void __frcti_rcv(struct frcti * frcti,
clock_gettime(PTHREAD_COND_CLOCK, &now);
- pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN);
+ pci = (struct frct_pci *) ssm_pk_buff_head_release(spb, FRCT_PCILEN);
- idx = shm_du_buff_get_idx(sdb);
+ idx = ssm_pk_buff_get_idx(spb);
seqno = ntoh32(pci->seqno);
pos = seqno & (RQ_SIZE - 1);
@@ -666,11 +825,14 @@ static void __frcti_rcv(struct frcti * frcti,
if (pci->flags & FRCT_DRF) { /* New run. */
rcv_cr->lwe = seqno;
rcv_cr->rwe = seqno + RQ_SIZE;
- } else {
+ rcv_cr->seqno = seqno;
+ } else if (pci->flags & FRCT_DATA) {
goto drop_packet;
}
}
+ rcv_cr->act = now;
+
/* For now, just send an immediate window update. */
if (pci->flags & FRCT_RDVS) {
fd = frcti->fd;
@@ -679,7 +841,7 @@ static void __frcti_rcv(struct frcti * frcti,
__send_frct_pkt(fd, FRCT_FC, 0, rwe);
- shm_rdrbuff_remove(ai.rdrb, idx);
+ ssm_pool_remove(proc.pool, idx);
return;
}
@@ -689,7 +851,11 @@ static void __frcti_rcv(struct frcti * frcti,
frcti->snd_cr.lwe = ackno;
if (frcti->probe && after(ackno, frcti->rttseq)) {
- rtt_estimator(frcti, ts_diff_ns(&frcti->t_probe, &now));
+#ifdef PROC_FLOW_STATS
+ if (!(pci->flags & FRCT_DATA))
+ frcti->n_dak++;
+#endif
+ rtt_estimator(frcti, ts_diff_ns(&now, &frcti->t_probe));
frcti->probe = false;
}
}
@@ -719,20 +885,33 @@ static void __frcti_rcv(struct frcti * frcti,
if (before(seqno, rcv_cr->lwe)) {
rcv_cr->seqno = seqno; /* Ensures we send a new ACK. */
+#ifdef PROC_FLOW_STATS
+ frcti->n_dup++;
+#endif
goto drop_packet;
}
if (rcv_cr->cflags & FRCTFRTX) {
- if (!before(seqno, rcv_cr->rwe)) /* Out of window. */
+ if (!before(seqno, rcv_cr->rwe)) { /* Out of window. */
+#ifdef PROC_FLOW_STATS
+ frcti->n_out++;
+#endif
goto drop_packet;
+ }
- if (!before(seqno, rcv_cr->lwe + RQ_SIZE))
+ if (!before(seqno, rcv_cr->lwe + RQ_SIZE)) {
+#ifdef PROC_FLOW_STATS
+ frcti->n_rqo++;
+#endif
goto drop_packet; /* Out of rq. */
-
- if (frcti->rq[pos] != -1)
+ }
+ if (frcti->rq[pos] != -1) {
+#ifdef PROC_FLOW_STATS
+ frcti->n_dup++;
+#endif
goto drop_packet; /* Duplicate in rq. */
-
+ }
fd = frcti->fd;
} else {
rcv_cr->lwe = seqno;
@@ -740,72 +919,16 @@ static void __frcti_rcv(struct frcti * frcti,
frcti->rq[pos] = idx;
- rcv_cr->act = now;
-
pthread_rwlock_unlock(&frcti->lock);
if (fd != -1)
- timerwheel_ack(fd, frcti);
+ timerwheel_delayed_ack(fd, frcti);
return;
drop_packet:
pthread_rwlock_unlock(&frcti->lock);
-
+ ssm_pool_remove(proc.pool, idx);
send_frct_pkt(frcti);
-
- shm_rdrbuff_remove(ai.rdrb, idx);
return;
}
-
-/* Filter fqueue events for non-data packets */
-int frcti_filter(struct fqueue * fq)
-{
- struct shm_du_buff * sdb;
- int fd;
- ssize_t idx;
- struct frcti * frcti;
- struct shm_rbuff * rb;
-
- while (fq->next < fq->fqsize) {
- if (fq->fqueue[fq->next + 1] != FLOW_PKT)
- return 1;
-
- pthread_rwlock_rdlock(&ai.lock);
-
- fd = ai.ports[fq->fqueue[fq->next]].fd;
- rb = ai.flows[fd].rx_rb;
- frcti = ai.flows[fd].frcti;
-
- if (frcti == NULL) {
- pthread_rwlock_unlock(&ai.lock);
- return 1;
- }
-
- if (__frcti_pdu_ready(frcti) >= 0) {
- pthread_rwlock_unlock(&ai.lock);
- return 1;
- }
-
- idx = shm_rbuff_read(rb);
- if (idx < 0) {
- pthread_rwlock_unlock(&ai.lock);
- return 0;
- }
-
- sdb = shm_rdrbuff_get(ai.rdrb, idx);
-
- __frcti_rcv(frcti, sdb);
-
- if (__frcti_pdu_ready(frcti) >= 0) {
- pthread_rwlock_unlock(&ai.lock);
- return 1;
- }
-
- pthread_rwlock_unlock(&ai.lock);
-
- fq->next += 2;
- }
-
- return fq->next < fq->fqsize;
-}