summaryrefslogtreecommitdiff
path: root/src/ipcpd/unicast/dt.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/unicast/dt.c')
-rw-r--r--src/ipcpd/unicast/dt.c389
1 files changed, 206 insertions, 183 deletions
diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c
index 53accba3..252477f4 100644
--- a/src/ipcpd/unicast/dt.c
+++ b/src/ipcpd/unicast/dt.c
@@ -1,10 +1,10 @@
/*
- * Ouroboros - Copyright (C) 2016 - 2020
+ * Ouroboros - Copyright (C) 2016 - 2026
*
* Data Transfer Component
*
- * Dimitri Staessens <dimitri.staessens@ugent.be>
- * Sander Vrijders <sander.vrijders@ugent.be>
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
@@ -41,14 +41,15 @@
#include <ouroboros/fccntl.h>
#endif
+#include "addr-auth.h"
+#include "common/comp.h"
+#include "common/connmgr.h"
#include "ca.h"
-#include "connmgr.h"
#include "ipcp.h"
#include "dt.h"
#include "pff.h"
#include "routing.h"
#include "psched.h"
-#include "comp.h"
#include "fa.h"
#include <stdlib.h>
@@ -58,15 +59,16 @@
#include <inttypes.h>
#include <assert.h>
-#define QOS_BLOCK_LEN 672
-#define STAT_FILE_LEN (189 + QOS_BLOCK_LEN * QOS_CUBE_MAX)
+#define QOS_BLOCK_LEN 672
+#define RIB_FILE_STRLEN (169 + RIB_TM_STRLEN + QOS_BLOCK_LEN * QOS_CUBE_MAX)
+#define RIB_NAME_STRLEN 256
#ifndef CLOCK_REALTIME_COARSE
#define CLOCK_REALTIME_COARSE CLOCK_REALTIME
#endif
struct comp_info {
- void (* post_packet)(void * comp, struct shm_du_buff * sdb);
+ void (* post_packet)(void * comp, struct ssm_pk_buff * spb);
void * comp;
char * name;
};
@@ -81,7 +83,7 @@ struct dt_pci {
qoscube_t qc;
uint8_t ttl;
uint8_t ecn;
- uint32_t eid;
+ uint64_t eid;
};
struct {
@@ -111,7 +113,7 @@ static void dt_pci_ser(uint8_t * head,
memcpy(head, &dt_pci->dst_addr, dt_pci_info.addr_size);
memcpy(head + dt_pci_info.qc_o, &dt_pci->qc, QOS_LEN);
memcpy(head + dt_pci_info.ttl_o, &ttl, TTL_LEN);
- memcpy(head + dt_pci_info.ecn_o, &dt_pci->ecn, ECN_LEN);
+ memcpy(head + dt_pci_info.ecn_o, &dt_pci->ecn, ECN_LEN);
memcpy(head + dt_pci_info.eid_o, &dt_pci->eid, dt_pci_info.eid_size);
}
@@ -133,16 +135,18 @@ static void dt_pci_des(uint8_t * head,
memcpy(&dt_pci->eid, head + dt_pci_info.eid_o, dt_pci_info.eid_size);
}
-static void dt_pci_shrink(struct shm_du_buff * sdb)
+static void dt_pci_shrink(struct ssm_pk_buff * spb)
{
- assert(sdb);
+ assert(spb);
- shm_du_buff_head_release(sdb, dt_pci_info.head_size);
+ ssm_pk_buff_head_release(spb, dt_pci_info.head_size);
}
struct {
struct psched * psched;
+ uint64_t addr;
+
struct pff * pff[QOS_CUBE_MAX];
struct routing_i * routing[QOS_CUBE_MAX];
#ifdef IPCP_FLOW_STATS
@@ -175,24 +179,28 @@ struct {
pthread_t listener;
} dt;
-static int dt_stat_read(const char * path,
- char * buf,
- size_t len)
+static int dt_rib_read(const char * path,
+ char * buf,
+ size_t len)
{
#ifdef IPCP_FLOW_STATS
int fd;
int i;
char str[QOS_BLOCK_LEN + 1];
char addrstr[20];
- char tmstr[20];
+ char * entry;
+ char tmstr[RIB_TM_STRLEN];
size_t rxqlen = 0;
size_t txqlen = 0;
struct tm * tm;
/* NOTE: we may need stronger checks. */
- fd = atoi(path);
+ entry = strstr(path, RIB_SEPARATOR) + 1;
+ assert(entry);
+
+ fd = atoi(entry);
- if (len < STAT_FILE_LEN)
+ if (len < RIB_FILE_STRLEN)
return 0;
buf[0] = '\0';
@@ -204,13 +212,13 @@ static int dt_stat_read(const char * path,
return 0;
}
- if (dt.stat[fd].addr == ipcpi.dt_addr)
+ if (dt.stat[fd].addr == dt.addr)
sprintf(addrstr, "%s", dt.comps[fd].name);
else
- sprintf(addrstr, "%" PRIu64, dt.stat[fd].addr);
+ sprintf(addrstr, ADDR_FMT32, ADDR_VAL32(&dt.stat[fd].addr));
- tm = localtime(&dt.stat[fd].stamp);
- strftime(tmstr, sizeof(tmstr), "%F %T", tm);
+ tm = gmtime(&dt.stat[fd].stamp);
+ strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm);
if (fd >= PROG_RES_FDS) {
fccntl(fd, FLOWGRXQLEN, &rxqlen);
@@ -218,11 +226,11 @@ static int dt_stat_read(const char * path,
}
sprintf(buf,
- "Flow established at: %20s\n"
+ "Flow established at: %.*s\n"
"Endpoint address: %20s\n"
"Queued packets (rx): %20zu\n"
"Queued packets (tx): %20zu\n\n",
- tmstr, addrstr, rxqlen, txqlen);
+ RIB_TM_STRLEN - 1, tmstr, addrstr, rxqlen, txqlen);
for (i = 0; i < QOS_CUBE_MAX; ++i) {
sprintf(str,
"Qos cube %3d:\n"
@@ -261,7 +269,7 @@ static int dt_stat_read(const char * path,
pthread_mutex_unlock(&dt.stat[fd].lock);
- return STAT_FILE_LEN;
+ return RIB_FILE_STRLEN;
#else
(void) path;
(void) buf;
@@ -270,7 +278,7 @@ static int dt_stat_read(const char * path,
#endif
}
-static int dt_stat_readdir(char *** buf)
+static int dt_rib_readdir(char *** buf)
{
#ifdef IPCP_FLOW_STATS
char entry[RIB_PATH_LEN + 1];
@@ -280,94 +288,88 @@ static int dt_stat_readdir(char *** buf)
pthread_rwlock_rdlock(&dt.lock);
if (dt.n_flows < 1) {
- pthread_rwlock_unlock(&dt.lock);
- return 0;
+ *buf = NULL;
+ goto no_flows;
}
*buf = malloc(sizeof(**buf) * dt.n_flows);
- if (*buf == NULL) {
- pthread_rwlock_unlock(&dt.lock);
- return -ENOMEM;
- }
+ if (*buf == NULL)
+ goto fail_entries;
for (i = 0; i < PROG_MAX_FLOWS; ++i) {
pthread_mutex_lock(&dt.stat[i].lock);
if (dt.stat[i].stamp == 0) {
pthread_mutex_unlock(&dt.stat[i].lock);
- /* Optimization: skip unused res_fds. */
- if (i < PROG_RES_FDS)
- i = PROG_RES_FDS;
- continue;
+ break;
}
+ pthread_mutex_unlock(&dt.stat[i].lock);
+
sprintf(entry, "%zu", i);
(*buf)[idx] = malloc(strlen(entry) + 1);
- if ((*buf)[idx] == NULL) {
- while (idx-- > 0)
- free((*buf)[idx]);
- free(buf);
- pthread_mutex_unlock(&dt.stat[i].lock);
- pthread_rwlock_unlock(&dt.lock);
- return -ENOMEM;
- }
+ if ((*buf)[idx] == NULL)
+ goto fail_entry;
strcpy((*buf)[idx++], entry);
- pthread_mutex_unlock(&dt.stat[i].lock);
}
-
+ no_flows:
pthread_rwlock_unlock(&dt.lock);
- assert((size_t) idx == dt.n_flows);
-
return idx;
+
+ fail_entry:
+ while (idx-- > 0)
+ free((*buf)[idx]);
+ free(*buf);
+ fail_entries:
+ pthread_rwlock_unlock(&dt.lock);
+ return -ENOMEM;
#else
(void) buf;
return 0;
#endif
}
-static int dt_stat_getattr(const char * path,
- struct stat * st)
+static int dt_rib_getattr(const char * path,
+ struct rib_attr * attr)
{
#ifdef IPCP_FLOW_STATS
- int fd;
+ int fd;
+ char * entry;
- fd = atoi(path);
+ entry = strstr(path, RIB_SEPARATOR) + 1;
+ assert(entry);
- st->st_mode = S_IFREG | 0755;
- st->st_nlink = 1;
- st->st_uid = getuid();
- st->st_gid = getgid();
+ fd = atoi(entry);
pthread_mutex_lock(&dt.stat[fd].lock);
if (dt.stat[fd].stamp != -1) {
- st->st_size = STAT_FILE_LEN;
- st->st_mtime = dt.stat[fd].stamp;
+ attr->size = RIB_FILE_STRLEN;
+ attr->mtime = dt.stat[fd].stamp;
} else {
- st->st_size = 0;
- st->st_mtime = 0;
+ attr->size = 0;
+ attr->mtime = 0;
}
pthread_mutex_unlock(&dt.stat[fd].lock);
#else
(void) path;
- (void) st;
+ (void) attr;
#endif
return 0;
}
static struct rib_ops r_ops = {
- .read = dt_stat_read,
- .readdir = dt_stat_readdir,
- .getattr = dt_stat_getattr
+ .read = dt_rib_read,
+ .readdir = dt_rib_readdir,
+ .getattr = dt_rib_getattr
};
#ifdef IPCP_FLOW_STATS
-
static void stat_used(int fd,
uint64_t addr)
{
@@ -397,6 +399,7 @@ static void handle_event(void * self,
const void * o)
{
struct conn * c;
+ int fd;
(void) self;
@@ -404,19 +407,20 @@ static void handle_event(void * self,
switch (event) {
case NOTIFY_DT_CONN_ADD:
+ fd = c->flow_info.fd;
#ifdef IPCP_FLOW_STATS
- stat_used(c->flow_info.fd, c->conn_info.addr);
+ stat_used(fd, c->conn_info.addr);
#endif
- psched_add(dt.psched, c->flow_info.fd);
- log_dbg("Added fd %d to packet scheduler.", c->flow_info.fd);
+ psched_add(dt.psched, fd);
+ log_dbg("Added fd %d to packet scheduler.", fd);
break;
case NOTIFY_DT_CONN_DEL:
+ fd = c->flow_info.fd;
#ifdef IPCP_FLOW_STATS
- stat_used(c->flow_info.fd, INVALID_ADDR);
+ stat_used(fd, INVALID_ADDR);
#endif
- psched_del(dt.psched, c->flow_info.fd);
- log_dbg("Removed fd %d from "
- "packet scheduler.", c->flow_info.fd);
+ psched_del(dt.psched, fd);
+ log_dbg("Removed fd %d from packet scheduler.", fd);
break;
default:
break;
@@ -425,7 +429,7 @@ static void handle_event(void * self,
static void packet_handler(int fd,
qoscube_t qc,
- struct shm_du_buff * sdb)
+ struct ssm_pk_buff * spb)
{
struct dt_pci dt_pci;
int ret;
@@ -433,7 +437,7 @@ static void packet_handler(int fd,
uint8_t * head;
size_t len;
- len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+ len = ssm_pk_buff_len(spb);
#ifndef IPCP_FLOW_STATS
(void) fd;
@@ -447,13 +451,13 @@ static void packet_handler(int fd,
#endif
memset(&dt_pci, 0, sizeof(dt_pci));
- head = shm_du_buff_head(sdb);
+ head = ssm_pk_buff_head(spb);
dt_pci_des(head, &dt_pci);
- if (dt_pci.dst_addr != ipcpi.dt_addr) {
+ if (dt_pci.dst_addr != dt.addr) {
if (dt_pci.ttl == 0) {
log_dbg("TTL was zero.");
- ipcp_sdb_release(sdb);
+ ipcp_spb_release(spb);
#ifdef IPCP_FLOW_STATS
pthread_mutex_lock(&dt.stat[fd].lock);
@@ -468,8 +472,9 @@ static void packet_handler(int fd,
/* FIXME: Use qoscube from PCI instead of incoming flow. */
ofd = pff_nhop(dt.pff[qc], dt_pci.dst_addr);
if (ofd < 0) {
- log_dbg("No next hop for %" PRIu64, dt_pci.dst_addr);
- ipcp_sdb_release(sdb);
+ log_dbg("No next hop for %" PRIu64 ".",
+ dt_pci.dst_addr);
+ ipcp_spb_release(spb);
#ifdef IPCP_FLOW_STATS
pthread_mutex_lock(&dt.stat[fd].lock);
@@ -481,14 +486,14 @@ static void packet_handler(int fd,
return;
}
- *(head + dt_pci_info.ecn_o) |= ca_calc_ecn(ofd, len);
+ (void) ca_calc_ecn(ofd, head + dt_pci_info.ecn_o, qc, len);
- ret = ipcp_flow_write(ofd, sdb);
+ ret = ipcp_flow_write(ofd, spb);
if (ret < 0) {
log_dbg("Failed to write packet to fd %d.", ofd);
if (ret == -EFLOWDOWN)
notifier_event(NOTIFY_DT_FLOW_DOWN, &ofd);
- ipcp_sdb_release(sdb);
+ ipcp_spb_release(spb);
#ifdef IPCP_FLOW_STATS
pthread_mutex_lock(&dt.stat[ofd].lock);
@@ -508,48 +513,17 @@ static void packet_handler(int fd,
pthread_mutex_unlock(&dt.stat[ofd].lock);
#endif
} else {
- dt_pci_shrink(sdb);
+ dt_pci_shrink(spb);
if (dt_pci.eid >= PROG_RES_FDS) {
uint8_t ecn = *(head + dt_pci_info.ecn_o);
- fa_ecn_update(dt_pci.eid, ecn, len);
-
- if (ipcp_flow_write(dt_pci.eid, sdb)) {
- ipcp_sdb_release(sdb);
-#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
-
- ++dt.stat[dt_pci.eid].w_drp_pkt[qc];
- dt.stat[dt_pci.eid].w_drp_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
-#endif
-
- }
-#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
-
- ++dt.stat[dt_pci.eid].rcv_pkt[qc];
- dt.stat[dt_pci.eid].rcv_bytes[qc] += len;
- ++dt.stat[dt_pci.eid].lcl_r_pkt[qc];
- dt.stat[dt_pci.eid].lcl_r_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
-#endif
+ fa_np1_rcv(dt_pci.eid, ecn, spb);
return;
}
if (dt.comps[dt_pci.eid].post_packet == NULL) {
- log_err("No registered component on eid %d.",
+ log_err("No registered component on eid %" PRIu64 ".",
dt_pci.eid);
- ipcp_sdb_release(sdb);
-#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
-
- ++dt.stat[dt_pci.eid].w_drp_pkt[qc];
- dt.stat[dt_pci.eid].w_drp_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
-#endif
+ ipcp_spb_release(spb);
return;
}
#ifdef IPCP_FLOW_STATS
@@ -567,7 +541,7 @@ static void packet_handler(int fd,
pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
#endif
dt.comps[dt_pci.eid].post_packet(dt.comps[dt_pci.eid].comp,
- sdb);
+ spb);
}
}
@@ -591,28 +565,36 @@ static void * dt_conn_handle(void * o)
return 0;
}
-int dt_init(enum pol_routing pr,
- uint8_t addr_size,
- uint8_t eid_size,
- uint8_t max_ttl)
+int dt_init(struct dt_config cfg)
{
int i;
int j;
- char dtstr[256];
- int pp;
+ char dtstr[RIB_NAME_STRLEN + 1];
+ enum pol_pff pp;
struct conn_info info;
memset(&info, 0, sizeof(info));
+ dt.addr = addr_auth_address();
+ if (dt.addr == INVALID_ADDR) {
+ log_err("Failed to get address");
+ return -1;
+ }
+
strcpy(info.comp_name, DT_COMP);
strcpy(info.protocol, DT_PROTO);
info.pref_version = 1;
info.pref_syntax = PROTO_FIXED;
- info.addr = ipcpi.dt_addr;
+ info.addr = dt.addr;
+
+ if (cfg.eid_size != 8) { /* only support 64 bits from now */
+ log_warn("Invalid EID size. Only 64 bit is supported.");
+ cfg.eid_size = 8;
+ }
- dt_pci_info.addr_size = addr_size;
- dt_pci_info.eid_size = eid_size;
- dt_pci_info.max_ttl = max_ttl;
+ dt_pci_info.addr_size = cfg.addr_size;
+ dt_pci_info.eid_size = cfg.eid_size;
+ dt_pci_info.max_ttl = cfg.max_ttl;
dt_pci_info.qc_o = dt_pci_info.addr_size;
dt_pci_info.ttl_o = dt_pci_info.qc_o + QOS_LEN;
@@ -620,18 +602,12 @@ int dt_init(enum pol_routing pr,
dt_pci_info.eid_o = dt_pci_info.ecn_o + ECN_LEN;
dt_pci_info.head_size = dt_pci_info.eid_o + dt_pci_info.eid_size;
- if (notifier_reg(handle_event, NULL)) {
- log_err("Failed to register with notifier.");
- goto fail_notifier_reg;
- }
-
if (connmgr_comp_init(COMPID_DT, &info)) {
log_err("Failed to register with connmgr.");
goto fail_connmgr_comp_init;
}
- pp = routing_init(pr);
- if (pp < 0) {
+ if (routing_init(&cfg.routing, &pp) < 0) {
log_err("Failed to init routing.");
goto fail_routing;
}
@@ -668,6 +644,7 @@ int dt_init(enum pol_routing pr,
for (i = 0; i < PROG_MAX_FLOWS; ++i)
if (pthread_mutex_init(&dt.stat[i].lock, NULL)) {
+ log_err("Failed to init mutex for flow %d.", i);
for (j = 0; j < i; ++j)
pthread_mutex_destroy(&dt.stat[j].lock);
goto fail_stat_lock;
@@ -675,9 +652,11 @@ int dt_init(enum pol_routing pr,
dt.n_flows = 0;
#endif
- sprintf(dtstr, "%s.%" PRIu64, DT, ipcpi.dt_addr);
- if (rib_reg(dtstr, &r_ops))
+ sprintf(dtstr, "%s." ADDR_FMT32, DT, ADDR_VAL32(&dt.addr));
+ if (rib_reg(dtstr, &r_ops)) {
+ log_err("Failed to register RIB.");
goto fail_rib_reg;
+ }
return 0;
@@ -701,16 +680,16 @@ int dt_init(enum pol_routing pr,
fail_routing:
connmgr_comp_fini(COMPID_DT);
fail_connmgr_comp_init:
- notifier_unreg(&handle_event);
- fail_notifier_reg:
return -1;
}
void dt_fini(void)
{
+ char dtstr[RIB_NAME_STRLEN + 1];
int i;
- rib_unreg(DT);
+ sprintf(dtstr, "%s.%" PRIu64, DT, dt.addr);
+ rib_unreg(dtstr);
#ifdef IPCP_FLOW_STATS
for (i = 0; i < PROG_MAX_FLOWS; ++i)
pthread_mutex_destroy(&dt.stat[i].lock);
@@ -728,70 +707,109 @@ void dt_fini(void)
routing_fini();
connmgr_comp_fini(COMPID_DT);
-
- notifier_unreg(&handle_event);
}
int dt_start(void)
{
- dt.psched = psched_create(packet_handler);
+ dt.psched = psched_create(packet_handler, ipcp_flow_read);
if (dt.psched == NULL) {
log_err("Failed to create N-1 packet scheduler.");
- return -1;
+ goto fail_psched;
+ }
+
+ if (notifier_reg(handle_event, NULL)) {
+ log_err("Failed to register with notifier.");
+ goto fail_notifier_reg;
}
if (pthread_create(&dt.listener, NULL, dt_conn_handle, NULL)) {
log_err("Failed to create listener thread.");
- psched_destroy(dt.psched);
- return -1;
+ goto fail_listener;
+ }
+
+ if (routing_start() < 0) {
+ log_err("Failed to start routing.");
+ goto fail_routing;
}
return 0;
+
+ fail_routing:
+ pthread_cancel(dt.listener);
+ pthread_join(dt.listener, NULL);
+ fail_listener:
+ notifier_unreg(&handle_event);
+ fail_notifier_reg:
+ psched_destroy(dt.psched);
+ fail_psched:
+ return -1;
}
void dt_stop(void)
{
+ routing_stop();
+
pthread_cancel(dt.listener);
pthread_join(dt.listener, NULL);
+
+ notifier_unreg(&handle_event);
+
psched_destroy(dt.psched);
}
int dt_reg_comp(void * comp,
- void (* func)(void * func, struct shm_du_buff *),
+ void (* func)(void * func, struct ssm_pk_buff *),
char * name)
{
- int res_fd;
+ int eid;
- assert(func);
+ assert(func != NULL);
pthread_rwlock_wrlock(&dt.lock);
- res_fd = bmp_allocate(dt.res_fds);
- if (!bmp_is_id_valid(dt.res_fds, res_fd)) {
- log_warn("Reserved fds depleted.");
+ eid = bmp_allocate(dt.res_fds);
+ if (!bmp_is_id_valid(dt.res_fds, eid)) {
+ log_err("Cannot allocate EID.");
pthread_rwlock_unlock(&dt.lock);
return -EBADF;
}
- assert(dt.comps[res_fd].post_packet == NULL);
- assert(dt.comps[res_fd].comp == NULL);
- assert(dt.comps[res_fd].name == NULL);
+ assert(dt.comps[eid].post_packet == NULL);
+ assert(dt.comps[eid].comp == NULL);
+ assert(dt.comps[eid].name == NULL);
- dt.comps[res_fd].post_packet = func;
- dt.comps[res_fd].comp = comp;
- dt.comps[res_fd].name = name;
+ dt.comps[eid].post_packet = func;
+ dt.comps[eid].comp = comp;
+ dt.comps[eid].name = name;
pthread_rwlock_unlock(&dt.lock);
#ifdef IPCP_FLOW_STATS
- stat_used(res_fd, ipcpi.dt_addr);
+ stat_used(eid, dt.addr);
#endif
- return res_fd;
+ return eid;
+}
+
+void dt_unreg_comp(int eid)
+{
+ assert(eid >= 0 && eid < PROG_RES_FDS);
+
+ pthread_rwlock_wrlock(&dt.lock);
+
+ assert(dt.comps[eid].post_packet != NULL);
+
+ dt.comps[eid].post_packet = NULL;
+ dt.comps[eid].comp = NULL;
+ dt.comps[eid].name = NULL;
+
+ pthread_rwlock_unlock(&dt.lock);
+
+ return;
}
int dt_write_packet(uint64_t dst_addr,
qoscube_t qc,
- int np1_fd,
- struct shm_du_buff * sdb)
+ uint64_t eid,
+ struct ssm_pk_buff * spb)
{
struct dt_pci dt_pci;
int fd;
@@ -799,51 +817,56 @@ int dt_write_packet(uint64_t dst_addr,
uint8_t * head;
size_t len;
- assert(sdb);
- assert(dst_addr != ipcpi.dt_addr);
-
- len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+ assert(spb);
+ assert(dst_addr != dt.addr);
#ifdef IPCP_FLOW_STATS
+ len = ssm_pk_buff_len(spb);
- pthread_mutex_lock(&dt.stat[np1_fd].lock);
+ if (eid < PROG_RES_FDS) {
+ pthread_mutex_lock(&dt.stat[eid].lock);
- ++dt.stat[np1_fd].lcl_r_pkt[qc];
- dt.stat[np1_fd].lcl_r_bytes[qc] += len;
+ ++dt.stat[eid].lcl_r_pkt[qc];
+ dt.stat[eid].lcl_r_bytes[qc] += len;
- pthread_mutex_unlock(&dt.stat[np1_fd].lock);
+ pthread_mutex_unlock(&dt.stat[eid].lock);
+ }
#endif
-
fd = pff_nhop(dt.pff[qc], dst_addr);
if (fd < 0) {
- log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr);
+ log_dbg("Could not get nhop for " ADDR_FMT32 ".",
+ ADDR_VAL32(&dst_addr));
#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[np1_fd].lock);
+ if (eid < PROG_RES_FDS) {
+ pthread_mutex_lock(&dt.stat[eid].lock);
- ++dt.stat[np1_fd].f_nhp_pkt[qc];
- dt.stat[np1_fd].f_nhp_bytes[qc] += len;
+ ++dt.stat[eid].lcl_r_pkt[qc];
+ dt.stat[eid].lcl_r_bytes[qc] += len;
- pthread_mutex_unlock(&dt.stat[np1_fd].lock);
+ pthread_mutex_unlock(&dt.stat[eid].lock);
+ }
#endif
return -EPERM;
}
- head = shm_du_buff_head_alloc(sdb, dt_pci_info.head_size);
+ head = ssm_pk_buff_head_alloc(spb, dt_pci_info.head_size);
if (head == NULL) {
log_dbg("Failed to allocate DT header.");
goto fail_write;
}
- len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+ len = ssm_pk_buff_len(spb);
dt_pci.dst_addr = dst_addr;
dt_pci.qc = qc;
- dt_pci.eid = np1_fd;
- dt_pci.ecn = ca_calc_ecn(fd, len);
+ dt_pci.eid = eid;
+ dt_pci.ecn = 0;
+
+ (void) ca_calc_ecn(fd, &dt_pci.ecn, qc, len);
dt_pci_ser(head, &dt_pci);
- ret = ipcp_flow_write(fd, sdb);
+ ret = ipcp_flow_write(fd, spb);
if (ret < 0) {
log_dbg("Failed to write packet to fd %d.", fd);
if (ret == -EFLOWDOWN)
@@ -868,7 +891,7 @@ int dt_write_packet(uint64_t dst_addr,
#ifdef IPCP_FLOW_STATS
pthread_mutex_lock(&dt.stat[fd].lock);
- if (np1_fd < PROG_RES_FDS) {
+ if (eid < PROG_RES_FDS) {
++dt.stat[fd].lcl_w_pkt[qc];
dt.stat[fd].lcl_w_bytes[qc] += len;
}