diff options
Diffstat (limited to 'src/ipcpd/unicast/dt.c')
| -rw-r--r-- | src/ipcpd/unicast/dt.c | 389 |
1 files changed, 206 insertions, 183 deletions
diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c index 53accba3..252477f4 100644 --- a/src/ipcpd/unicast/dt.c +++ b/src/ipcpd/unicast/dt.c @@ -1,10 +1,10 @@ /* - * Ouroboros - Copyright (C) 2016 - 2020 + * Ouroboros - Copyright (C) 2016 - 2026 * * Data Transfer Component * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License version 2 as @@ -41,14 +41,15 @@ #include <ouroboros/fccntl.h> #endif +#include "addr-auth.h" +#include "common/comp.h" +#include "common/connmgr.h" #include "ca.h" -#include "connmgr.h" #include "ipcp.h" #include "dt.h" #include "pff.h" #include "routing.h" #include "psched.h" -#include "comp.h" #include "fa.h" #include <stdlib.h> @@ -58,15 +59,16 @@ #include <inttypes.h> #include <assert.h> -#define QOS_BLOCK_LEN 672 -#define STAT_FILE_LEN (189 + QOS_BLOCK_LEN * QOS_CUBE_MAX) +#define QOS_BLOCK_LEN 672 +#define RIB_FILE_STRLEN (169 + RIB_TM_STRLEN + QOS_BLOCK_LEN * QOS_CUBE_MAX) +#define RIB_NAME_STRLEN 256 #ifndef CLOCK_REALTIME_COARSE #define CLOCK_REALTIME_COARSE CLOCK_REALTIME #endif struct comp_info { - void (* post_packet)(void * comp, struct shm_du_buff * sdb); + void (* post_packet)(void * comp, struct ssm_pk_buff * spb); void * comp; char * name; }; @@ -81,7 +83,7 @@ struct dt_pci { qoscube_t qc; uint8_t ttl; uint8_t ecn; - uint32_t eid; + uint64_t eid; }; struct { @@ -111,7 +113,7 @@ static void dt_pci_ser(uint8_t * head, memcpy(head, &dt_pci->dst_addr, dt_pci_info.addr_size); memcpy(head + dt_pci_info.qc_o, &dt_pci->qc, QOS_LEN); memcpy(head + dt_pci_info.ttl_o, &ttl, TTL_LEN); - memcpy(head + dt_pci_info.ecn_o, &dt_pci->ecn, ECN_LEN); + memcpy(head + dt_pci_info.ecn_o, &dt_pci->ecn, ECN_LEN); memcpy(head + dt_pci_info.eid_o, &dt_pci->eid, dt_pci_info.eid_size); } @@ -133,16 +135,18 @@ static void dt_pci_des(uint8_t * head, memcpy(&dt_pci->eid, head + dt_pci_info.eid_o, dt_pci_info.eid_size); } -static void dt_pci_shrink(struct shm_du_buff * sdb) +static void dt_pci_shrink(struct ssm_pk_buff * spb) { - assert(sdb); + assert(spb); - shm_du_buff_head_release(sdb, dt_pci_info.head_size); + ssm_pk_buff_head_release(spb, dt_pci_info.head_size); } struct { struct psched * psched; + uint64_t addr; + struct pff * pff[QOS_CUBE_MAX]; struct routing_i * routing[QOS_CUBE_MAX]; #ifdef IPCP_FLOW_STATS @@ -175,24 +179,28 @@ struct { pthread_t listener; } dt; -static int dt_stat_read(const char * path, - char * buf, - size_t len) +static int dt_rib_read(const char * path, + char * buf, + size_t len) { #ifdef IPCP_FLOW_STATS int fd; int i; char str[QOS_BLOCK_LEN + 1]; char addrstr[20]; - char tmstr[20]; + char * entry; + char tmstr[RIB_TM_STRLEN]; size_t rxqlen = 0; size_t txqlen = 0; struct tm * tm; /* NOTE: we may need stronger checks. */ - fd = atoi(path); + entry = strstr(path, RIB_SEPARATOR) + 1; + assert(entry); + + fd = atoi(entry); - if (len < STAT_FILE_LEN) + if (len < RIB_FILE_STRLEN) return 0; buf[0] = '\0'; @@ -204,13 +212,13 @@ static int dt_stat_read(const char * path, return 0; } - if (dt.stat[fd].addr == ipcpi.dt_addr) + if (dt.stat[fd].addr == dt.addr) sprintf(addrstr, "%s", dt.comps[fd].name); else - sprintf(addrstr, "%" PRIu64, dt.stat[fd].addr); + sprintf(addrstr, ADDR_FMT32, ADDR_VAL32(&dt.stat[fd].addr)); - tm = localtime(&dt.stat[fd].stamp); - strftime(tmstr, sizeof(tmstr), "%F %T", tm); + tm = gmtime(&dt.stat[fd].stamp); + strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); if (fd >= PROG_RES_FDS) { fccntl(fd, FLOWGRXQLEN, &rxqlen); @@ -218,11 +226,11 @@ static int dt_stat_read(const char * path, } sprintf(buf, - "Flow established at: %20s\n" + "Flow established at: %.*s\n" "Endpoint address: %20s\n" "Queued packets (rx): %20zu\n" "Queued packets (tx): %20zu\n\n", - tmstr, addrstr, rxqlen, txqlen); + RIB_TM_STRLEN - 1, tmstr, addrstr, rxqlen, txqlen); for (i = 0; i < QOS_CUBE_MAX; ++i) { sprintf(str, "Qos cube %3d:\n" @@ -261,7 +269,7 @@ static int dt_stat_read(const char * path, pthread_mutex_unlock(&dt.stat[fd].lock); - return STAT_FILE_LEN; + return RIB_FILE_STRLEN; #else (void) path; (void) buf; @@ -270,7 +278,7 @@ static int dt_stat_read(const char * path, #endif } -static int dt_stat_readdir(char *** buf) +static int dt_rib_readdir(char *** buf) { #ifdef IPCP_FLOW_STATS char entry[RIB_PATH_LEN + 1]; @@ -280,94 +288,88 @@ static int dt_stat_readdir(char *** buf) pthread_rwlock_rdlock(&dt.lock); if (dt.n_flows < 1) { - pthread_rwlock_unlock(&dt.lock); - return 0; + *buf = NULL; + goto no_flows; } *buf = malloc(sizeof(**buf) * dt.n_flows); - if (*buf == NULL) { - pthread_rwlock_unlock(&dt.lock); - return -ENOMEM; - } + if (*buf == NULL) + goto fail_entries; for (i = 0; i < PROG_MAX_FLOWS; ++i) { pthread_mutex_lock(&dt.stat[i].lock); if (dt.stat[i].stamp == 0) { pthread_mutex_unlock(&dt.stat[i].lock); - /* Optimization: skip unused res_fds. */ - if (i < PROG_RES_FDS) - i = PROG_RES_FDS; - continue; + break; } + pthread_mutex_unlock(&dt.stat[i].lock); + sprintf(entry, "%zu", i); (*buf)[idx] = malloc(strlen(entry) + 1); - if ((*buf)[idx] == NULL) { - while (idx-- > 0) - free((*buf)[idx]); - free(buf); - pthread_mutex_unlock(&dt.stat[i].lock); - pthread_rwlock_unlock(&dt.lock); - return -ENOMEM; - } + if ((*buf)[idx] == NULL) + goto fail_entry; strcpy((*buf)[idx++], entry); - pthread_mutex_unlock(&dt.stat[i].lock); } - + no_flows: pthread_rwlock_unlock(&dt.lock); - assert((size_t) idx == dt.n_flows); - return idx; + + fail_entry: + while (idx-- > 0) + free((*buf)[idx]); + free(*buf); + fail_entries: + pthread_rwlock_unlock(&dt.lock); + return -ENOMEM; #else (void) buf; return 0; #endif } -static int dt_stat_getattr(const char * path, - struct stat * st) +static int dt_rib_getattr(const char * path, + struct rib_attr * attr) { #ifdef IPCP_FLOW_STATS - int fd; + int fd; + char * entry; - fd = atoi(path); + entry = strstr(path, RIB_SEPARATOR) + 1; + assert(entry); - st->st_mode = S_IFREG | 0755; - st->st_nlink = 1; - st->st_uid = getuid(); - st->st_gid = getgid(); + fd = atoi(entry); pthread_mutex_lock(&dt.stat[fd].lock); if (dt.stat[fd].stamp != -1) { - st->st_size = STAT_FILE_LEN; - st->st_mtime = dt.stat[fd].stamp; + attr->size = RIB_FILE_STRLEN; + attr->mtime = dt.stat[fd].stamp; } else { - st->st_size = 0; - st->st_mtime = 0; + attr->size = 0; + attr->mtime = 0; } pthread_mutex_unlock(&dt.stat[fd].lock); #else (void) path; - (void) st; + (void) attr; #endif return 0; } static struct rib_ops r_ops = { - .read = dt_stat_read, - .readdir = dt_stat_readdir, - .getattr = dt_stat_getattr + .read = dt_rib_read, + .readdir = dt_rib_readdir, + .getattr = dt_rib_getattr }; #ifdef IPCP_FLOW_STATS - static void stat_used(int fd, uint64_t addr) { @@ -397,6 +399,7 @@ static void handle_event(void * self, const void * o) { struct conn * c; + int fd; (void) self; @@ -404,19 +407,20 @@ static void handle_event(void * self, switch (event) { case NOTIFY_DT_CONN_ADD: + fd = c->flow_info.fd; #ifdef IPCP_FLOW_STATS - stat_used(c->flow_info.fd, c->conn_info.addr); + stat_used(fd, c->conn_info.addr); #endif - psched_add(dt.psched, c->flow_info.fd); - log_dbg("Added fd %d to packet scheduler.", c->flow_info.fd); + psched_add(dt.psched, fd); + log_dbg("Added fd %d to packet scheduler.", fd); break; case NOTIFY_DT_CONN_DEL: + fd = c->flow_info.fd; #ifdef IPCP_FLOW_STATS - stat_used(c->flow_info.fd, INVALID_ADDR); + stat_used(fd, INVALID_ADDR); #endif - psched_del(dt.psched, c->flow_info.fd); - log_dbg("Removed fd %d from " - "packet scheduler.", c->flow_info.fd); + psched_del(dt.psched, fd); + log_dbg("Removed fd %d from packet scheduler.", fd); break; default: break; @@ -425,7 +429,7 @@ static void handle_event(void * self, static void packet_handler(int fd, qoscube_t qc, - struct shm_du_buff * sdb) + struct ssm_pk_buff * spb) { struct dt_pci dt_pci; int ret; @@ -433,7 +437,7 @@ static void packet_handler(int fd, uint8_t * head; size_t len; - len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + len = ssm_pk_buff_len(spb); #ifndef IPCP_FLOW_STATS (void) fd; @@ -447,13 +451,13 @@ static void packet_handler(int fd, #endif memset(&dt_pci, 0, sizeof(dt_pci)); - head = shm_du_buff_head(sdb); + head = ssm_pk_buff_head(spb); dt_pci_des(head, &dt_pci); - if (dt_pci.dst_addr != ipcpi.dt_addr) { + if (dt_pci.dst_addr != dt.addr) { if (dt_pci.ttl == 0) { log_dbg("TTL was zero."); - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); #ifdef IPCP_FLOW_STATS pthread_mutex_lock(&dt.stat[fd].lock); @@ -468,8 +472,9 @@ static void packet_handler(int fd, /* FIXME: Use qoscube from PCI instead of incoming flow. */ ofd = pff_nhop(dt.pff[qc], dt_pci.dst_addr); if (ofd < 0) { - log_dbg("No next hop for %" PRIu64, dt_pci.dst_addr); - ipcp_sdb_release(sdb); + log_dbg("No next hop for %" PRIu64 ".", + dt_pci.dst_addr); + ipcp_spb_release(spb); #ifdef IPCP_FLOW_STATS pthread_mutex_lock(&dt.stat[fd].lock); @@ -481,14 +486,14 @@ static void packet_handler(int fd, return; } - *(head + dt_pci_info.ecn_o) |= ca_calc_ecn(ofd, len); + (void) ca_calc_ecn(ofd, head + dt_pci_info.ecn_o, qc, len); - ret = ipcp_flow_write(ofd, sdb); + ret = ipcp_flow_write(ofd, spb); if (ret < 0) { log_dbg("Failed to write packet to fd %d.", ofd); if (ret == -EFLOWDOWN) notifier_event(NOTIFY_DT_FLOW_DOWN, &ofd); - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); #ifdef IPCP_FLOW_STATS pthread_mutex_lock(&dt.stat[ofd].lock); @@ -508,48 +513,17 @@ static void packet_handler(int fd, pthread_mutex_unlock(&dt.stat[ofd].lock); #endif } else { - dt_pci_shrink(sdb); + dt_pci_shrink(spb); if (dt_pci.eid >= PROG_RES_FDS) { uint8_t ecn = *(head + dt_pci_info.ecn_o); - fa_ecn_update(dt_pci.eid, ecn, len); - - if (ipcp_flow_write(dt_pci.eid, sdb)) { - ipcp_sdb_release(sdb); -#ifdef IPCP_FLOW_STATS - pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); - - ++dt.stat[dt_pci.eid].w_drp_pkt[qc]; - dt.stat[dt_pci.eid].w_drp_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); -#endif - - } -#ifdef IPCP_FLOW_STATS - pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); - - ++dt.stat[dt_pci.eid].rcv_pkt[qc]; - dt.stat[dt_pci.eid].rcv_bytes[qc] += len; - ++dt.stat[dt_pci.eid].lcl_r_pkt[qc]; - dt.stat[dt_pci.eid].lcl_r_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); -#endif + fa_np1_rcv(dt_pci.eid, ecn, spb); return; } if (dt.comps[dt_pci.eid].post_packet == NULL) { - log_err("No registered component on eid %d.", + log_err("No registered component on eid %" PRIu64 ".", dt_pci.eid); - ipcp_sdb_release(sdb); -#ifdef IPCP_FLOW_STATS - pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); - - ++dt.stat[dt_pci.eid].w_drp_pkt[qc]; - dt.stat[dt_pci.eid].w_drp_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); -#endif + ipcp_spb_release(spb); return; } #ifdef IPCP_FLOW_STATS @@ -567,7 +541,7 @@ static void packet_handler(int fd, pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); #endif dt.comps[dt_pci.eid].post_packet(dt.comps[dt_pci.eid].comp, - sdb); + spb); } } @@ -591,28 +565,36 @@ static void * dt_conn_handle(void * o) return 0; } -int dt_init(enum pol_routing pr, - uint8_t addr_size, - uint8_t eid_size, - uint8_t max_ttl) +int dt_init(struct dt_config cfg) { int i; int j; - char dtstr[256]; - int pp; + char dtstr[RIB_NAME_STRLEN + 1]; + enum pol_pff pp; struct conn_info info; memset(&info, 0, sizeof(info)); + dt.addr = addr_auth_address(); + if (dt.addr == INVALID_ADDR) { + log_err("Failed to get address"); + return -1; + } + strcpy(info.comp_name, DT_COMP); strcpy(info.protocol, DT_PROTO); info.pref_version = 1; info.pref_syntax = PROTO_FIXED; - info.addr = ipcpi.dt_addr; + info.addr = dt.addr; + + if (cfg.eid_size != 8) { /* only support 64 bits from now */ + log_warn("Invalid EID size. Only 64 bit is supported."); + cfg.eid_size = 8; + } - dt_pci_info.addr_size = addr_size; - dt_pci_info.eid_size = eid_size; - dt_pci_info.max_ttl = max_ttl; + dt_pci_info.addr_size = cfg.addr_size; + dt_pci_info.eid_size = cfg.eid_size; + dt_pci_info.max_ttl = cfg.max_ttl; dt_pci_info.qc_o = dt_pci_info.addr_size; dt_pci_info.ttl_o = dt_pci_info.qc_o + QOS_LEN; @@ -620,18 +602,12 @@ int dt_init(enum pol_routing pr, dt_pci_info.eid_o = dt_pci_info.ecn_o + ECN_LEN; dt_pci_info.head_size = dt_pci_info.eid_o + dt_pci_info.eid_size; - if (notifier_reg(handle_event, NULL)) { - log_err("Failed to register with notifier."); - goto fail_notifier_reg; - } - if (connmgr_comp_init(COMPID_DT, &info)) { log_err("Failed to register with connmgr."); goto fail_connmgr_comp_init; } - pp = routing_init(pr); - if (pp < 0) { + if (routing_init(&cfg.routing, &pp) < 0) { log_err("Failed to init routing."); goto fail_routing; } @@ -668,6 +644,7 @@ int dt_init(enum pol_routing pr, for (i = 0; i < PROG_MAX_FLOWS; ++i) if (pthread_mutex_init(&dt.stat[i].lock, NULL)) { + log_err("Failed to init mutex for flow %d.", i); for (j = 0; j < i; ++j) pthread_mutex_destroy(&dt.stat[j].lock); goto fail_stat_lock; @@ -675,9 +652,11 @@ int dt_init(enum pol_routing pr, dt.n_flows = 0; #endif - sprintf(dtstr, "%s.%" PRIu64, DT, ipcpi.dt_addr); - if (rib_reg(dtstr, &r_ops)) + sprintf(dtstr, "%s." ADDR_FMT32, DT, ADDR_VAL32(&dt.addr)); + if (rib_reg(dtstr, &r_ops)) { + log_err("Failed to register RIB."); goto fail_rib_reg; + } return 0; @@ -701,16 +680,16 @@ int dt_init(enum pol_routing pr, fail_routing: connmgr_comp_fini(COMPID_DT); fail_connmgr_comp_init: - notifier_unreg(&handle_event); - fail_notifier_reg: return -1; } void dt_fini(void) { + char dtstr[RIB_NAME_STRLEN + 1]; int i; - rib_unreg(DT); + sprintf(dtstr, "%s.%" PRIu64, DT, dt.addr); + rib_unreg(dtstr); #ifdef IPCP_FLOW_STATS for (i = 0; i < PROG_MAX_FLOWS; ++i) pthread_mutex_destroy(&dt.stat[i].lock); @@ -728,70 +707,109 @@ void dt_fini(void) routing_fini(); connmgr_comp_fini(COMPID_DT); - - notifier_unreg(&handle_event); } int dt_start(void) { - dt.psched = psched_create(packet_handler); + dt.psched = psched_create(packet_handler, ipcp_flow_read); if (dt.psched == NULL) { log_err("Failed to create N-1 packet scheduler."); - return -1; + goto fail_psched; + } + + if (notifier_reg(handle_event, NULL)) { + log_err("Failed to register with notifier."); + goto fail_notifier_reg; } if (pthread_create(&dt.listener, NULL, dt_conn_handle, NULL)) { log_err("Failed to create listener thread."); - psched_destroy(dt.psched); - return -1; + goto fail_listener; + } + + if (routing_start() < 0) { + log_err("Failed to start routing."); + goto fail_routing; } return 0; + + fail_routing: + pthread_cancel(dt.listener); + pthread_join(dt.listener, NULL); + fail_listener: + notifier_unreg(&handle_event); + fail_notifier_reg: + psched_destroy(dt.psched); + fail_psched: + return -1; } void dt_stop(void) { + routing_stop(); + pthread_cancel(dt.listener); pthread_join(dt.listener, NULL); + + notifier_unreg(&handle_event); + psched_destroy(dt.psched); } int dt_reg_comp(void * comp, - void (* func)(void * func, struct shm_du_buff *), + void (* func)(void * func, struct ssm_pk_buff *), char * name) { - int res_fd; + int eid; - assert(func); + assert(func != NULL); pthread_rwlock_wrlock(&dt.lock); - res_fd = bmp_allocate(dt.res_fds); - if (!bmp_is_id_valid(dt.res_fds, res_fd)) { - log_warn("Reserved fds depleted."); + eid = bmp_allocate(dt.res_fds); + if (!bmp_is_id_valid(dt.res_fds, eid)) { + log_err("Cannot allocate EID."); pthread_rwlock_unlock(&dt.lock); return -EBADF; } - assert(dt.comps[res_fd].post_packet == NULL); - assert(dt.comps[res_fd].comp == NULL); - assert(dt.comps[res_fd].name == NULL); + assert(dt.comps[eid].post_packet == NULL); + assert(dt.comps[eid].comp == NULL); + assert(dt.comps[eid].name == NULL); - dt.comps[res_fd].post_packet = func; - dt.comps[res_fd].comp = comp; - dt.comps[res_fd].name = name; + dt.comps[eid].post_packet = func; + dt.comps[eid].comp = comp; + dt.comps[eid].name = name; pthread_rwlock_unlock(&dt.lock); #ifdef IPCP_FLOW_STATS - stat_used(res_fd, ipcpi.dt_addr); + stat_used(eid, dt.addr); #endif - return res_fd; + return eid; +} + +void dt_unreg_comp(int eid) +{ + assert(eid >= 0 && eid < PROG_RES_FDS); + + pthread_rwlock_wrlock(&dt.lock); + + assert(dt.comps[eid].post_packet != NULL); + + dt.comps[eid].post_packet = NULL; + dt.comps[eid].comp = NULL; + dt.comps[eid].name = NULL; + + pthread_rwlock_unlock(&dt.lock); + + return; } int dt_write_packet(uint64_t dst_addr, qoscube_t qc, - int np1_fd, - struct shm_du_buff * sdb) + uint64_t eid, + struct ssm_pk_buff * spb) { struct dt_pci dt_pci; int fd; @@ -799,51 +817,56 @@ int dt_write_packet(uint64_t dst_addr, uint8_t * head; size_t len; - assert(sdb); - assert(dst_addr != ipcpi.dt_addr); - - len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + assert(spb); + assert(dst_addr != dt.addr); #ifdef IPCP_FLOW_STATS + len = ssm_pk_buff_len(spb); - pthread_mutex_lock(&dt.stat[np1_fd].lock); + if (eid < PROG_RES_FDS) { + pthread_mutex_lock(&dt.stat[eid].lock); - ++dt.stat[np1_fd].lcl_r_pkt[qc]; - dt.stat[np1_fd].lcl_r_bytes[qc] += len; + ++dt.stat[eid].lcl_r_pkt[qc]; + dt.stat[eid].lcl_r_bytes[qc] += len; - pthread_mutex_unlock(&dt.stat[np1_fd].lock); + pthread_mutex_unlock(&dt.stat[eid].lock); + } #endif - fd = pff_nhop(dt.pff[qc], dst_addr); if (fd < 0) { - log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr); + log_dbg("Could not get nhop for " ADDR_FMT32 ".", + ADDR_VAL32(&dst_addr)); #ifdef IPCP_FLOW_STATS - pthread_mutex_lock(&dt.stat[np1_fd].lock); + if (eid < PROG_RES_FDS) { + pthread_mutex_lock(&dt.stat[eid].lock); - ++dt.stat[np1_fd].f_nhp_pkt[qc]; - dt.stat[np1_fd].f_nhp_bytes[qc] += len; + ++dt.stat[eid].lcl_r_pkt[qc]; + dt.stat[eid].lcl_r_bytes[qc] += len; - pthread_mutex_unlock(&dt.stat[np1_fd].lock); + pthread_mutex_unlock(&dt.stat[eid].lock); + } #endif return -EPERM; } - head = shm_du_buff_head_alloc(sdb, dt_pci_info.head_size); + head = ssm_pk_buff_head_alloc(spb, dt_pci_info.head_size); if (head == NULL) { log_dbg("Failed to allocate DT header."); goto fail_write; } - len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + len = ssm_pk_buff_len(spb); dt_pci.dst_addr = dst_addr; dt_pci.qc = qc; - dt_pci.eid = np1_fd; - dt_pci.ecn = ca_calc_ecn(fd, len); + dt_pci.eid = eid; + dt_pci.ecn = 0; + + (void) ca_calc_ecn(fd, &dt_pci.ecn, qc, len); dt_pci_ser(head, &dt_pci); - ret = ipcp_flow_write(fd, sdb); + ret = ipcp_flow_write(fd, spb); if (ret < 0) { log_dbg("Failed to write packet to fd %d.", fd); if (ret == -EFLOWDOWN) @@ -868,7 +891,7 @@ int dt_write_packet(uint64_t dst_addr, #ifdef IPCP_FLOW_STATS pthread_mutex_lock(&dt.stat[fd].lock); - if (np1_fd < PROG_RES_FDS) { + if (eid < PROG_RES_FDS) { ++dt.stat[fd].lcl_w_pkt[qc]; dt.stat[fd].lcl_w_bytes[qc] += len; } |
