summaryrefslogtreecommitdiff
path: root/src/ipcpd/unicast/dt.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/unicast/dt.c')
-rw-r--r--src/ipcpd/unicast/dt.c77
1 files changed, 36 insertions, 41 deletions
diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c
index 7ce09bde..60cab486 100644
--- a/src/ipcpd/unicast/dt.c
+++ b/src/ipcpd/unicast/dt.c
@@ -139,7 +139,7 @@ static void dt_pci_shrink(struct ssm_pk_buff * spb)
{
assert(spb);
- ssm_pk_buff_head_release(spb, dt_pci_info.head_size);
+ ssm_pk_buff_pop(spb, dt_pci_info.head_size);
}
struct {
@@ -168,22 +168,22 @@ struct {
size_t f_nhp_pkt[QOS_CUBE_MAX];
size_t f_nhp_bytes[QOS_CUBE_MAX];
pthread_mutex_t lock;
- } stat[PROG_MAX_FLOWS];
+ } stat[PROC_MAX_FLOWS];
size_t n_flows;
#endif
struct bmp * res_fds;
- struct comp_info comps[PROG_RES_FDS];
+ struct comp_info comps[PROC_RES_FDS];
pthread_rwlock_t lock;
pthread_t listener;
} dt;
+#ifdef IPCP_FLOW_STATS
static int dt_rib_read(const char * path,
char * buf,
size_t len)
{
-#ifdef IPCP_FLOW_STATS
int fd;
int i;
char str[QOS_BLOCK_LEN + 1];
@@ -220,7 +220,7 @@ static int dt_rib_read(const char * path,
tm = gmtime(&dt.stat[fd].stamp);
strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm);
- if (fd >= PROG_RES_FDS) {
+ if (fd >= PROC_RES_FDS) {
fccntl(fd, FLOWGRXQLEN, &rxqlen);
fccntl(fd, FLOWGTXQLEN, &txqlen);
}
@@ -270,17 +270,10 @@ static int dt_rib_read(const char * path,
pthread_mutex_unlock(&dt.stat[fd].lock);
return RIB_FILE_STRLEN;
-#else
- (void) path;
- (void) buf;
- (void) len;
- return 0;
-#endif
}
static int dt_rib_readdir(char *** buf)
{
-#ifdef IPCP_FLOW_STATS
char entry[RIB_PATH_LEN + 1];
size_t i;
int idx = 0;
@@ -296,7 +289,7 @@ static int dt_rib_readdir(char *** buf)
if (*buf == NULL)
goto fail_entries;
- for (i = 0; i < PROG_MAX_FLOWS; ++i) {
+ for (i = 0; i < PROC_MAX_FLOWS; ++i) {
pthread_mutex_lock(&dt.stat[i].lock);
if (dt.stat[i].stamp == 0) {
@@ -327,16 +320,11 @@ static int dt_rib_readdir(char *** buf)
fail_entries:
pthread_rwlock_unlock(&dt.lock);
return -ENOMEM;
-#else
- (void) buf;
- return 0;
-#endif
}
static int dt_rib_getattr(const char * path,
struct rib_attr * attr)
{
-#ifdef IPCP_FLOW_STATS
int fd;
char * entry;
@@ -356,10 +344,7 @@ static int dt_rib_getattr(const char * path,
}
pthread_mutex_unlock(&dt.stat[fd].lock);
-#else
- (void) path;
- (void) attr;
-#endif
+
return 0;
}
@@ -368,8 +353,15 @@ static struct rib_ops r_ops = {
.readdir = dt_rib_readdir,
.getattr = dt_rib_getattr
};
+#endif /* IPCP_FLOW_STATS */
#ifdef IPCP_FLOW_STATS
+/*
+ * Hold dt.lock + per-stat together: dt_rib_readdir samples n_flows
+ * under rdlock and walks stamps under per-stat; updates must be
+ * atomic w.r.t. that snapshot or the malloc(n_flows) buffer can
+ * overflow.
+ */
static void stat_used(int fd,
uint64_t addr)
{
@@ -377,6 +369,7 @@ static void stat_used(int fd,
clock_gettime(CLOCK_REALTIME_COARSE, &now);
+ pthread_rwlock_wrlock(&dt.lock);
pthread_mutex_lock(&dt.stat[fd].lock);
memset(&dt.stat[fd], 0, sizeof(dt.stat[fd]));
@@ -384,12 +377,9 @@ static void stat_used(int fd,
dt.stat[fd].stamp = (addr != INVALID_ADDR) ? now.tv_sec : 0;
dt.stat[fd].addr = addr;
- pthread_mutex_unlock(&dt.stat[fd].lock);
-
- pthread_rwlock_wrlock(&dt.lock);
-
(addr != INVALID_ADDR) ? ++dt.n_flows : --dt.n_flows;
+ pthread_mutex_unlock(&dt.stat[fd].lock);
pthread_rwlock_unlock(&dt.lock);
}
#endif
@@ -514,7 +504,7 @@ static void packet_handler(int fd,
#endif
} else {
dt_pci_shrink(spb);
- if (dt_pci.eid >= PROG_RES_FDS) {
+ if (dt_pci.eid >= PROC_RES_FDS) {
uint8_t ecn = *(head + dt_pci_info.ecn_o);
fa_np1_rcv(dt_pci.eid, ecn, spb);
return;
@@ -569,7 +559,9 @@ int dt_init(struct dt_config cfg)
{
int i;
int j;
+#ifdef IPCP_FLOW_STATS
char dtstr[RIB_NAME_STRLEN + 1];
+#endif
enum pol_pff pp;
struct conn_info info;
@@ -636,13 +628,13 @@ int dt_init(struct dt_config cfg)
goto fail_rwlock_init;
}
- dt.res_fds = bmp_create(PROG_RES_FDS, 0);
+ dt.res_fds = bmp_create(PROC_RES_FDS, 0);
if (dt.res_fds == NULL)
goto fail_res_fds;
#ifdef IPCP_FLOW_STATS
memset(dt.stat, 0, sizeof(dt.stat));
- for (i = 0; i < PROG_MAX_FLOWS; ++i)
+ for (i = 0; i < PROC_MAX_FLOWS; ++i)
if (pthread_mutex_init(&dt.stat[i].lock, NULL)) {
log_err("Failed to init mutex for flow %d.", i);
for (j = 0; j < i; ++j)
@@ -651,18 +643,19 @@ int dt_init(struct dt_config cfg)
}
dt.n_flows = 0;
-#endif
+
sprintf(dtstr, "%s." ADDR_FMT32, DT, ADDR_VAL32(&dt.addr));
if (rib_reg(dtstr, &r_ops)) {
log_err("Failed to register RIB.");
goto fail_rib_reg;
}
+#endif
return 0;
- fail_rib_reg:
#ifdef IPCP_FLOW_STATS
- for (i = 0; i < PROG_MAX_FLOWS; ++i)
+ fail_rib_reg:
+ for (i = 0; i < PROC_MAX_FLOWS; ++i)
pthread_mutex_destroy(&dt.stat[i].lock);
fail_stat_lock:
#endif
@@ -685,13 +678,15 @@ int dt_init(struct dt_config cfg)
void dt_fini(void)
{
+#ifdef IPCP_FLOW_STATS
char dtstr[RIB_NAME_STRLEN + 1];
+#endif
int i;
+#ifdef IPCP_FLOW_STATS
sprintf(dtstr, "%s.%" PRIu64, DT, dt.addr);
rib_unreg(dtstr);
-#ifdef IPCP_FLOW_STATS
- for (i = 0; i < PROG_MAX_FLOWS; ++i)
+ for (i = 0; i < PROC_MAX_FLOWS; ++i)
pthread_mutex_destroy(&dt.stat[i].lock);
#endif
bmp_destroy(dt.res_fds);
@@ -791,7 +786,7 @@ int dt_reg_comp(void * comp,
void dt_unreg_comp(int eid)
{
- assert(eid >= 0 && eid < PROG_RES_FDS);
+ assert(eid >= 0 && eid < PROC_RES_FDS);
pthread_rwlock_wrlock(&dt.lock);
@@ -820,10 +815,10 @@ int dt_write_packet(uint64_t dst_addr,
assert(spb);
assert(dst_addr != dt.addr);
+#ifdef IPCP_FLOW_STATS
len = ssm_pk_buff_len(spb);
-#ifdef IPCP_FLOW_STATS
- if (eid < PROG_RES_FDS) {
+ if (eid < PROC_RES_FDS) {
pthread_mutex_lock(&dt.stat[eid].lock);
++dt.stat[eid].lcl_r_pkt[qc];
@@ -837,7 +832,7 @@ int dt_write_packet(uint64_t dst_addr,
log_dbg("Could not get nhop for " ADDR_FMT32 ".",
ADDR_VAL32(&dst_addr));
#ifdef IPCP_FLOW_STATS
- if (eid < PROG_RES_FDS) {
+ if (eid < PROC_RES_FDS) {
pthread_mutex_lock(&dt.stat[eid].lock);
++dt.stat[eid].lcl_r_pkt[qc];
@@ -849,7 +844,7 @@ int dt_write_packet(uint64_t dst_addr,
return -EPERM;
}
- head = ssm_pk_buff_head_alloc(spb, dt_pci_info.head_size);
+ head = ssm_pk_buff_push(spb, dt_pci_info.head_size);
if (head == NULL) {
log_dbg("Failed to allocate DT header.");
goto fail_write;
@@ -876,7 +871,7 @@ int dt_write_packet(uint64_t dst_addr,
#ifdef IPCP_FLOW_STATS
pthread_mutex_lock(&dt.stat[fd].lock);
- if (dt_pci.eid < PROG_RES_FDS) {
+ if (dt_pci.eid < PROC_RES_FDS) {
++dt.stat[fd].lcl_w_pkt[qc];
dt.stat[fd].lcl_w_bytes[qc] += len;
}
@@ -891,7 +886,7 @@ int dt_write_packet(uint64_t dst_addr,
#ifdef IPCP_FLOW_STATS
pthread_mutex_lock(&dt.stat[fd].lock);
- if (eid < PROG_RES_FDS) {
+ if (eid < PROC_RES_FDS) {
++dt.stat[fd].lcl_w_pkt[qc];
dt.stat[fd].lcl_w_bytes[qc] += len;
}