diff options
Diffstat (limited to 'src/lib/frct.c')
| -rw-r--r-- | src/lib/frct.c | 473 |
1 files changed, 298 insertions, 175 deletions
diff --git a/src/lib/frct.c b/src/lib/frct.c index 919e2617..fad2cf69 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -1,10 +1,10 @@ /* - * Ouroboros - Copyright (C) 2016 - 2020 + * Ouroboros - Copyright (C) 2016 - 2026 * * Flow and Retransmission Control * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License @@ -20,10 +20,14 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ -#define DELT_RDV (100 * MILLION) /* ns */ -#define MAX_RDV (1 * BILLION) /* ns */ +#include <ouroboros/endian.h> -#define FRCT_PCILEN (sizeof(struct frct_pci)) +#define DELT_RDV (100 * MILLION) /* ns */ +#define MAX_RDV (1 * BILLION) /* ns */ + +#define FRCT "frct" +#define FRCT_PCILEN (sizeof(struct frct_pci)) +#define FRCT_NAME_STRLEN 32 struct frct_cr { uint32_t lwe; /* Left window edge */ @@ -50,10 +54,20 @@ struct frcti { uint32_t rttseq; struct timespec t_probe; /* Probe time */ bool probe; /* Probe active */ - +#ifdef PROC_FLOW_STATS + size_t n_rtx; /* Number of rxm packets */ + size_t n_prb; /* Number of rtt probes */ + size_t n_rtt; /* Number of estimates */ + size_t n_dup; /* Duplicates received */ + size_t n_dak; /* Delayed ACKs received */ + size_t n_rdv; /* Number of rdv packets */ + size_t n_out; /* Packets out of window */ + size_t n_rqo; /* Packets out of rqueue */ +#endif struct frct_cr snd_cr; struct frct_cr rcv_cr; + ssize_t rq[RQ_SIZE]; pthread_rwlock_t lock; @@ -84,6 +98,128 @@ struct frct_pci { uint32_t ackno; } __attribute__((packed)); +#ifdef PROC_FLOW_STATS + +static int frct_rib_read(const char * path, + char * buf, + size_t len) +{ + struct timespec now; + char * entry; + struct flow * flow; + struct frcti * frcti; + int fd; + + (void) len; + + entry = strstr(path, RIB_SEPARATOR); + assert(entry); + *entry = '\0'; + + fd = atoi(path); + + flow = &proc.flows[fd]; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + pthread_rwlock_rdlock(&proc.lock); + + frcti = flow->frcti; + + pthread_rwlock_rdlock(&frcti->lock); + + sprintf(buf, + "Maximum packet lifetime (ns): %20ld\n" + "Max time to Ack (ns): %20ld\n" + "Max time to Retransmit (ns): %20ld\n" + "Smoothed rtt (ns): %20ld\n" + "RTT standard deviation (ns): %20ld\n" + "Retransmit timeout RTO (ns): %20ld\n" + "Sender left window edge: %20u\n" + "Sender right window edge: %20u\n" + "Sender inactive (ns): %20lld\n" + "Sender current sequence number: %20u\n" + "Receiver left window edge: %20u\n" + "Receiver right window edge: %20u\n" + "Receiver inactive (ns): %20lld\n" + "Receiver last ack: %20u\n" + "Number of pkt retransmissions: %20zu\n" + "Number of rtt probes: %20zu\n" + "Number of rtt estimates: %20zu\n" + "Number of duplicates received: %20zu\n" + "Number of delayed acks received: %20zu\n" + "Number of rendez-vous sent: %20zu\n" + "Number of packets out of window: %20zu\n" + "Number of packets out of rqueue: %20zu\n", + frcti->mpl, + frcti->a, + frcti->r, + frcti->srtt, + frcti->mdev, + frcti->rto, + frcti->snd_cr.lwe, + frcti->snd_cr.rwe, + ts_diff_ns(&now, &frcti->snd_cr.act), + frcti->snd_cr.seqno, + frcti->rcv_cr.lwe, + frcti->rcv_cr.rwe, + ts_diff_ns(&now, &frcti->rcv_cr.act), + frcti->rcv_cr.seqno, + frcti->n_rtx, + frcti->n_prb, + frcti->n_rtt, + frcti->n_dup, + frcti->n_dak, + frcti->n_rdv, + frcti->n_out, + frcti->n_rqo); + + pthread_rwlock_unlock(&flow->frcti->lock); + + pthread_rwlock_unlock(&proc.lock); + + return strlen(buf); +} + +static int frct_rib_readdir(char *** buf) +{ + *buf = malloc(sizeof(**buf)); + if (*buf == NULL) + goto fail_malloc; + + (*buf)[0] = strdup("frct"); + if ((*buf)[0] == NULL) + goto fail_strdup; + + return 1; + + fail_strdup: + free(*buf); + fail_malloc: + return -ENOMEM; +} + +static int frct_rib_getattr(const char * path, + struct rib_attr * attr) +{ + (void) path; + (void) attr; + + attr->size = 1189; + attr->mtime = 0; + + return 0; +} + + +static struct rib_ops r_ops = { + .read = frct_rib_read, + .readdir = frct_rib_readdir, + .getattr = frct_rib_getattr +}; + +#endif /* PROC_FLOW_STATS */ + static bool before(uint32_t seq1, uint32_t seq2) { @@ -101,21 +237,21 @@ static void __send_frct_pkt(int fd, uint32_t ackno, uint32_t rwe) { - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; struct frct_pci * pci; ssize_t idx; struct flow * f; /* Raw calls needed to bypass frcti. */ #ifdef RXM_BLOCKING - idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL); + idx = ssm_pool_alloc_b(proc.pool, sizeof(*pci), NULL, &spb, NULL); #else - idx = shm_rdrbuff_alloc(ai.rdrb, sizeof(*pci), NULL, &sdb); + idx = ssm_pool_alloc(proc.pool, sizeof(*pci), NULL, &spb); #endif if (idx < 0) return; - pci = (struct frct_pci *) shm_du_buff_head(sdb); + pci = (struct frct_pci *) ssm_pk_buff_head(spb); memset(pci, 0, sizeof(*pci)); *((uint32_t *) pci) = hton32(rwe); @@ -123,17 +259,25 @@ static void __send_frct_pkt(int fd, pci->flags = flags; pci->ackno = hton32(ackno); - f = &ai.flows[fd]; + f = &proc.flows[fd]; + + if (spb_encrypt(f, spb) < 0) + goto fail; + #ifdef RXM_BLOCKING - if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { + if (ssm_rbuff_write_b(f->tx_rb, idx, NULL)) #else - if (shm_rbuff_write(f->tx_rb, idx)) { + if (ssm_rbuff_write(f->tx_rb, idx)) #endif - ipcp_sdb_release(sdb); - return; - } + goto fail; + + ssm_flow_set_notify(f->set, f->info.id, FLOW_PKT); + + return; - shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); + fail: + ipcp_spb_release(spb); + return; } static void send_frct_pkt(struct frcti * frcti) @@ -146,9 +290,11 @@ static void send_frct_pkt(struct frcti * frcti) assert(frcti); - pthread_rwlock_rdlock(&frcti->lock); + clock_gettime(PTHREAD_COND_CLOCK, &now); + + pthread_rwlock_wrlock(&frcti->lock); - if (frcti->rcv_cr.lwe == frcti->rcv_cr.seqno) { + if (!after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno)) { pthread_rwlock_unlock(&frcti->lock); return; } @@ -157,61 +303,45 @@ static void send_frct_pkt(struct frcti * frcti) ackno = frcti->rcv_cr.lwe; rwe = frcti->rcv_cr.rwe; - clock_gettime(PTHREAD_COND_CLOCK, &now); - - diff = ts_diff_ns(&frcti->rcv_cr.act, &now); - - pthread_rwlock_unlock(&frcti->lock); - - if (diff > frcti->a || diff < DELT_ACK) + diff = ts_diff_ns(&now, &frcti->rcv_cr.act); + if (diff > frcti->a) { + pthread_rwlock_unlock(&frcti->lock); return; + } - __send_frct_pkt(fd, FRCT_ACK | FRCT_FC, ackno, rwe); - - pthread_rwlock_wrlock(&frcti->lock); + diff = ts_diff_ns(&now, &frcti->snd_cr.act); + if (diff < TICTIME) { + pthread_rwlock_unlock(&frcti->lock); + return; + } - if (after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno)) - frcti->rcv_cr.seqno = frcti->rcv_cr.lwe; + frcti->rcv_cr.seqno = frcti->rcv_cr.lwe; pthread_rwlock_unlock(&frcti->lock); + + __send_frct_pkt(fd, FRCT_ACK | FRCT_FC, ackno, rwe); } static void __send_rdv(int fd) { - struct shm_du_buff * sdb; - struct frct_pci * pci; - ssize_t idx; - struct flow * f; - - /* Raw calls needed to bypass frcti. */ - idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL); - if (idx < 0) - return; - - pci = (struct frct_pci *) shm_du_buff_head(sdb); - memset(pci, 0, sizeof(*pci)); - - pci->flags = FRCT_RDVS; - - f = &ai.flows[fd]; - - if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { - ipcp_sdb_release(sdb); - return; - } - - shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); + __send_frct_pkt(fd, FRCT_RDVS, 0, 0); } -static struct frcti * frcti_create(int fd) +static struct frcti * frcti_create(int fd, + time_t a, + time_t r, + time_t mpl) { - struct frcti * frcti; - ssize_t idx; - struct timespec now; - time_t mpl; - time_t a; - time_t r; - pthread_condattr_t cattr; + struct frcti * frcti; + ssize_t idx; + struct timespec now; + pthread_condattr_t cattr; +#ifdef PROC_FLOW_STATS + char frctstr[FRCT_NAME_STRLEN + 1]; +#endif + mpl *= MILLION; + a *= BILLION; + r *= BILLION; frcti = malloc(sizeof(*frcti)); if (frcti == NULL) @@ -233,14 +363,21 @@ static struct frcti * frcti_create(int fd) if (pthread_cond_init(&frcti->cond, &cattr)) goto fail_cond; +#ifdef PROC_FLOW_STATS + sprintf(frctstr, "%d", fd); + if (rib_reg(frctstr, &r_ops)) + goto fail_rib_reg; +#endif + pthread_condattr_destroy(&cattr); + for (idx = 0; idx < RQ_SIZE; ++idx) frcti->rq[idx] = -1; clock_gettime(PTHREAD_COND_CLOCK, &now); - frcti->mpl = mpl = DELT_MPL; - frcti->a = a = DELT_A; - frcti->r = r = DELT_R; + frcti->mpl = mpl; + frcti->a = a; + frcti->r = r; frcti->rdv = DELT_RDV; frcti->fd = fd; @@ -249,10 +386,19 @@ static struct frcti * frcti_create(int fd) frcti->probe = false; frcti->srtt = 0; /* Updated on first ACK */ - frcti->mdev = 10 * MILLION; /* Initial rxm will be after 20 ms */ - frcti->rto = 20 * MILLION; /* Initial rxm will be after 20 ms */ - - if (ai.flows[fd].qs.loss == 0) { + frcti->mdev = 10 * MILLION; /* Updated on first ACK */ + frcti->rto = BILLION; /* Initial rxm will be after 1 s */ +#ifdef PROC_FLOW_STATS + frcti->n_rtx = 0; + frcti->n_prb = 0; + frcti->n_rtt = 0; + frcti->n_dup = 0; + frcti->n_dak = 0; + frcti->n_rdv = 0; + frcti->n_out = 0; + frcti->n_rqo = 0; +#endif + if (proc.flows[fd].info.qs.loss == 0) { frcti->snd_cr.cflags |= FRCTFRTX | FRCTFLINGER; frcti->rcv_cr.cflags |= FRCTFRTX; } @@ -269,9 +415,13 @@ static struct frcti * frcti_create(int fd) return frcti; +#ifdef PROC_FLOW_STATS + fail_rib_reg: + pthread_cond_destroy(&frcti->cond); +#endif fail_cond: pthread_condattr_destroy(&cattr); -fail_cattr: + fail_cattr: pthread_mutex_destroy(&frcti->mtx); fail_mutex: pthread_rwlock_destroy(&frcti->lock); @@ -283,6 +433,11 @@ fail_cattr: static void frcti_destroy(struct frcti * frcti) { +#ifdef PROC_FLOW_STATS + char frctstr[FRCT_NAME_STRLEN + 1]; + sprintf(frctstr, "%d", frcti->fd); + rib_unreg(frctstr); +#endif pthread_cond_destroy(&frcti->cond); pthread_mutex_destroy(&frcti->mtx); pthread_rwlock_destroy(&frcti->lock); @@ -324,14 +479,11 @@ static void frcti_setflags(struct frcti * frcti, #define frcti_queued_pdu(frcti) \ (frcti == NULL ? idx : __frcti_queued_pdu(frcti)) -#define frcti_snd(frcti, sdb) \ - (frcti == NULL ? 0 : __frcti_snd(frcti, sdb)) +#define frcti_snd(frcti, spb) \ + (frcti == NULL ? 0 : __frcti_snd(frcti, spb)) -#define frcti_rcv(frcti, sdb) \ - (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb)) - -#define frcti_tick(frcti) \ - (frcti == NULL ? 0 : __frcti_tick()) +#define frcti_rcv(frcti, spb) \ + (frcti == NULL ? 0 : __frcti_rcv(frcti, spb)) #define frcti_dealloc(frcti) \ (frcti == NULL ? 0 : __frcti_dealloc(frcti)) @@ -346,7 +498,7 @@ static void frcti_setflags(struct frcti * frcti, static bool __frcti_is_window_open(struct frcti * frcti) { struct frct_cr * snd_cr = &frcti->snd_cr; - int ret = true; + bool ret = true; pthread_rwlock_rdlock(&frcti->lock); @@ -354,7 +506,7 @@ static bool __frcti_is_window_open(struct frcti * frcti) ret = before(snd_cr->seqno, snd_cr->rwe); if (!ret) { - struct timespec now; + struct timespec now; clock_gettime(PTHREAD_COND_CLOCK, &now); @@ -365,16 +517,21 @@ static bool __frcti_is_window_open(struct frcti * frcti) frcti->t_rdvs = now; } else { time_t diff; - diff = ts_diff_ns(&frcti->t_wnd, &now); + diff = ts_diff_ns(&now, &frcti->t_wnd); if (diff > MAX_RDV) { pthread_mutex_unlock(&frcti->mtx); + pthread_rwlock_unlock(&frcti->lock); return false; } - diff = ts_diff_ns(&frcti->t_rdvs, &now); + diff = ts_diff_ns(&now, &frcti->t_rdvs); if (diff > frcti->rdv) { frcti->t_rdvs = now; __send_rdv(frcti->fd); +#ifdef PROC_FLOW_STATS + frcti->n_rdv++; +#endif + } } @@ -401,7 +558,6 @@ static int __frcti_window_wait(struct frcti * frcti, while (snd_cr->seqno == snd_cr->rwe && ret != -ETIMEDOUT) { struct timespec now; - pthread_rwlock_unlock(&frcti->lock); pthread_mutex_lock(&frcti->mtx); @@ -413,12 +569,9 @@ static int __frcti_window_wait(struct frcti * frcti, frcti->open = false; } - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) &frcti->mtx); + pthread_cleanup_push(__cleanup_mutex_unlock, &frcti->mtx); - ret = -pthread_cond_timedwait(&frcti->cond, - &frcti->mtx, - abstime); + ret = -__timedwait(&frcti->cond, &frcti->mtx, abstime); pthread_cleanup_pop(false); @@ -427,13 +580,13 @@ static int __frcti_window_wait(struct frcti * frcti, clock_gettime(PTHREAD_COND_CLOCK, &now); - diff = ts_diff_ns(&frcti->t_wnd, &now); + diff = ts_diff_ns(&now, &frcti->t_wnd); if (diff > MAX_RDV) { pthread_mutex_unlock(&frcti->mtx); return -ECONNRESET; /* write fails! */ } - diff = ts_diff_ns(&frcti->t_rdvs, &now); + diff = ts_diff_ns(&now, &frcti->t_rdvs); if (diff > frcti->rdv) { frcti->t_rdvs = now; __send_rdv(frcti->fd); @@ -515,6 +668,7 @@ static time_t __frcti_dealloc(struct frcti * frcti) wait = MAX(frcti->rcv_cr.inact - now.tv_sec + frcti->rcv_cr.act.tv_sec, frcti->snd_cr.inact - now.tv_sec + frcti->snd_cr.act.tv_sec); + wait = MAX(wait, 0); if (frcti->snd_cr.cflags & FRCTFLINGER && before(frcti->snd_cr.lwe, frcti->snd_cr.seqno)) @@ -529,7 +683,7 @@ static time_t __frcti_dealloc(struct frcti * frcti) } static int __frcti_snd(struct frcti * frcti, - struct shm_du_buff * sdb) + struct ssm_pk_buff * spb) { struct frct_pci * pci; struct timespec now; @@ -539,13 +693,14 @@ static int __frcti_snd(struct frcti * frcti, bool rtx; assert(frcti); + assert(ssm_pk_buff_len(spb) != 0); snd_cr = &frcti->snd_cr; rcv_cr = &frcti->rcv_cr; timerwheel_move(); - pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN); + pci = (struct frct_pci *) ssm_pk_buff_head_alloc(spb, FRCT_PCILEN); if (pci == NULL) return -ENOMEM; @@ -568,7 +723,7 @@ static int __frcti_snd(struct frcti * frcti, /* There are no unacknowledged packets. */ assert(snd_cr->seqno == snd_cr->lwe); random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno)); - snd_cr->lwe = snd_cr->seqno - 1; + snd_cr->lwe = snd_cr->seqno; snd_cr->rwe = snd_cr->lwe + START_WINDOW; } @@ -587,9 +742,11 @@ static int __frcti_snd(struct frcti * frcti, frcti->rttseq = snd_cr->seqno; frcti->t_probe = now; frcti->probe = true; +#ifdef PROC_FLOW_STATS + frcti->n_prb++; +#endif } - - if (now.tv_sec - rcv_cr->act.tv_sec <= frcti->a) { + if ((now.tv_sec - rcv_cr->act.tv_sec) * BILLION <= frcti->a) { pci->flags |= FRCT_ACK; pci->ackno = hton32(rcv_cr->lwe); rcv_cr->seqno = rcv_cr->lwe; @@ -602,7 +759,7 @@ static int __frcti_snd(struct frcti * frcti, pthread_rwlock_unlock(&frcti->lock); if (rtx) - timerwheel_rxm(frcti, seqno, sdb); + timerwheel_rxm(frcti, seqno, spb); return 0; } @@ -619,22 +776,24 @@ static void rtt_estimator(struct frcti * frcti, } else { time_t delta = mrtt - srtt; srtt += (delta >> 3); - rttvar += (ABS(delta) - rttvar) >> 2; + delta = (ABS(delta) - rttvar) >> 2; +#ifdef FRCT_LINUX_RTT_ESTIMATOR + if (delta < 0) + delta >>= 3; +#endif + rttvar += delta; } - - frcti->srtt = MAX(1000U, srtt); - frcti->mdev = MAX(100U, rttvar); - frcti->rto = MAX(RTO_MIN, frcti->srtt + (frcti->mdev << 1)); -} - -static void __frcti_tick(void) -{ - timerwheel_move(); +#ifdef PROC_FLOW_STATS + frcti->n_rtt++; +#endif + frcti->srtt = MAX(1000L, srtt); + frcti->mdev = MAX(100L, rttvar); + frcti->rto = MAX(RTO_MIN, frcti->srtt + (frcti->mdev << MDEV_MUL)); } /* Always queues the next application packet on the RQ. */ static void __frcti_rcv(struct frcti * frcti, - struct shm_du_buff * sdb) + struct ssm_pk_buff * spb) { ssize_t idx; size_t pos; @@ -654,9 +813,9 @@ static void __frcti_rcv(struct frcti * frcti, clock_gettime(PTHREAD_COND_CLOCK, &now); - pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN); + pci = (struct frct_pci *) ssm_pk_buff_head_release(spb, FRCT_PCILEN); - idx = shm_du_buff_get_idx(sdb); + idx = ssm_pk_buff_get_idx(spb); seqno = ntoh32(pci->seqno); pos = seqno & (RQ_SIZE - 1); @@ -666,11 +825,14 @@ static void __frcti_rcv(struct frcti * frcti, if (pci->flags & FRCT_DRF) { /* New run. */ rcv_cr->lwe = seqno; rcv_cr->rwe = seqno + RQ_SIZE; - } else { + rcv_cr->seqno = seqno; + } else if (pci->flags & FRCT_DATA) { goto drop_packet; } } + rcv_cr->act = now; + /* For now, just send an immediate window update. */ if (pci->flags & FRCT_RDVS) { fd = frcti->fd; @@ -679,7 +841,7 @@ static void __frcti_rcv(struct frcti * frcti, __send_frct_pkt(fd, FRCT_FC, 0, rwe); - shm_rdrbuff_remove(ai.rdrb, idx); + ssm_pool_remove(proc.pool, idx); return; } @@ -689,7 +851,11 @@ static void __frcti_rcv(struct frcti * frcti, frcti->snd_cr.lwe = ackno; if (frcti->probe && after(ackno, frcti->rttseq)) { - rtt_estimator(frcti, ts_diff_ns(&frcti->t_probe, &now)); +#ifdef PROC_FLOW_STATS + if (!(pci->flags & FRCT_DATA)) + frcti->n_dak++; +#endif + rtt_estimator(frcti, ts_diff_ns(&now, &frcti->t_probe)); frcti->probe = false; } } @@ -719,20 +885,33 @@ static void __frcti_rcv(struct frcti * frcti, if (before(seqno, rcv_cr->lwe)) { rcv_cr->seqno = seqno; /* Ensures we send a new ACK. */ +#ifdef PROC_FLOW_STATS + frcti->n_dup++; +#endif goto drop_packet; } if (rcv_cr->cflags & FRCTFRTX) { - if (!before(seqno, rcv_cr->rwe)) /* Out of window. */ + if (!before(seqno, rcv_cr->rwe)) { /* Out of window. */ +#ifdef PROC_FLOW_STATS + frcti->n_out++; +#endif goto drop_packet; + } - if (!before(seqno, rcv_cr->lwe + RQ_SIZE)) + if (!before(seqno, rcv_cr->lwe + RQ_SIZE)) { +#ifdef PROC_FLOW_STATS + frcti->n_rqo++; +#endif goto drop_packet; /* Out of rq. */ - - if (frcti->rq[pos] != -1) + } + if (frcti->rq[pos] != -1) { +#ifdef PROC_FLOW_STATS + frcti->n_dup++; +#endif goto drop_packet; /* Duplicate in rq. */ - + } fd = frcti->fd; } else { rcv_cr->lwe = seqno; @@ -740,72 +919,16 @@ static void __frcti_rcv(struct frcti * frcti, frcti->rq[pos] = idx; - rcv_cr->act = now; - pthread_rwlock_unlock(&frcti->lock); if (fd != -1) - timerwheel_ack(fd, frcti); + timerwheel_delayed_ack(fd, frcti); return; drop_packet: pthread_rwlock_unlock(&frcti->lock); - + ssm_pool_remove(proc.pool, idx); send_frct_pkt(frcti); - - shm_rdrbuff_remove(ai.rdrb, idx); return; } - -/* Filter fqueue events for non-data packets */ -int frcti_filter(struct fqueue * fq) -{ - struct shm_du_buff * sdb; - int fd; - ssize_t idx; - struct frcti * frcti; - struct shm_rbuff * rb; - - while (fq->next < fq->fqsize) { - if (fq->fqueue[fq->next + 1] != FLOW_PKT) - return 1; - - pthread_rwlock_rdlock(&ai.lock); - - fd = ai.ports[fq->fqueue[fq->next]].fd; - rb = ai.flows[fd].rx_rb; - frcti = ai.flows[fd].frcti; - - if (frcti == NULL) { - pthread_rwlock_unlock(&ai.lock); - return 1; - } - - if (__frcti_pdu_ready(frcti) >= 0) { - pthread_rwlock_unlock(&ai.lock); - return 1; - } - - idx = shm_rbuff_read(rb); - if (idx < 0) { - pthread_rwlock_unlock(&ai.lock); - return 0; - } - - sdb = shm_rdrbuff_get(ai.rdrb, idx); - - __frcti_rcv(frcti, sdb); - - if (__frcti_pdu_ready(frcti) >= 0) { - pthread_rwlock_unlock(&ai.lock); - return 1; - } - - pthread_rwlock_unlock(&ai.lock); - - fq->next += 2; - } - - return fq->next < fq->fqsize; -} |
