summaryrefslogtreecommitdiff
path: root/src/ipcpd/broadcast/dt.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/broadcast/dt.c')
-rw-r--r--src/ipcpd/broadcast/dt.c100
1 files changed, 49 insertions, 51 deletions
diff --git a/src/ipcpd/broadcast/dt.c b/src/ipcpd/broadcast/dt.c
index 6e8bacf1..30e89a4f 100644
--- a/src/ipcpd/broadcast/dt.c
+++ b/src/ipcpd/broadcast/dt.c
@@ -1,10 +1,10 @@
/*
- * Ouroboros - Copyright (C) 2016 - 2020
+ * Ouroboros - Copyright (C) 2016 - 2026
*
* Forward loop for broadcast
*
- * Dimitri Staessens <dimitri.staessens@ugent.be>
- * Sander Vrijders <sander.vrijders@ugent.be>
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
@@ -33,7 +33,6 @@
#define DT "dt"
#define OUROBOROS_PREFIX DT
-#include <ouroboros/endian.h>
#include <ouroboros/dev.h>
#include <ouroboros/errno.h>
#include <ouroboros/fqueue.h>
@@ -41,17 +40,16 @@
#include <ouroboros/logs.h>
#include <ouroboros/notifier.h>
#include <ouroboros/utils.h>
+#include <ouroboros/pthread.h>
-#include "comp.h"
-#include "connmgr.h"
+#include "common/comp.h"
+#include "common/connmgr.h"
#include "dt.h"
-#include "ipcp.h"
#include <assert.h>
#include <stdlib.h>
#include <inttypes.h>
#include <string.h>
-#include <pthread.h>
struct nb {
struct list_head next;
@@ -60,14 +58,13 @@ struct nb {
};
struct {
- struct list_head nbs;
- size_t nbs_len;
- pthread_rwlock_t nbs_lock;
+ struct llist nbs;
+ pthread_rwlock_t lock;
- fset_t * set;
+ fset_t * set;
- pthread_t reader;
- pthread_t listener;
+ pthread_t reader;
+ pthread_t listener;
} fwd;
static int dt_add_nb(int fd)
@@ -75,33 +72,32 @@ static int dt_add_nb(int fd)
struct list_head * p;
struct nb * nb;
- pthread_rwlock_wrlock(&fwd.nbs_lock);
+ pthread_rwlock_wrlock(&fwd.lock);
- list_for_each(p, &fwd.nbs) {
+ llist_for_each(p, &fwd.nbs) {
struct nb * el = list_entry(p, struct nb, next);
if (el->fd == fd) {
- log_dbg("Already know neighbor.");
- pthread_rwlock_unlock(&fwd.nbs_lock);
- return -EPERM;
+ pthread_rwlock_unlock(&fwd.lock);
+ log_warn("Already know neighbor on fd %d.", fd);
+ return 0;
}
}
nb = malloc(sizeof(*nb));
if (nb == NULL) {
- pthread_rwlock_unlock(&fwd.nbs_lock);
+ pthread_rwlock_unlock(&fwd.lock);
+ log_err("Failed to malloc neighbor struct.");
return -ENOMEM;
}
nb->fd = fd;
- list_add_tail(&nb->next, p);
+ llist_add_tail(&nb->next, &fwd.nbs);
- ++fwd.nbs_len;
+ pthread_rwlock_unlock(&fwd.lock);
log_dbg("Neighbor %d added.", fd);
- pthread_rwlock_unlock(&fwd.nbs_lock);
-
return 0;
}
@@ -110,21 +106,22 @@ static int dt_del_nb(int fd)
struct list_head * p;
struct list_head * h;
- pthread_rwlock_wrlock(&fwd.nbs_lock);
+ pthread_rwlock_wrlock(&fwd.lock);
- list_for_each_safe(p, h, &fwd.nbs) {
+ llist_for_each_safe(p, h, &fwd.nbs) {
struct nb * nb = list_entry(p, struct nb, next);
if (nb->fd == fd) {
- list_del(&nb->next);
- --fwd.nbs_len;
- pthread_rwlock_unlock(&fwd.nbs_lock);
+ llist_del(&nb->next, &fwd.nbs);
+ pthread_rwlock_unlock(&fwd.lock);
log_dbg("Neighbor %d deleted.", nb->fd);
free(nb);
return 0;
}
}
- pthread_rwlock_unlock(&fwd.nbs_lock);
+ pthread_rwlock_unlock(&fwd.lock);
+
+ log_err("Neighbor not found on fd %d.", fd);
return -EPERM;
}
@@ -156,12 +153,11 @@ static void dt_packet(uint8_t * buf,
{
struct list_head * p;
- pthread_rwlock_rdlock(&fwd.nbs_lock);
+ pthread_rwlock_rdlock(&fwd.lock);
- pthread_cleanup_push((void (*))(void *) pthread_rwlock_unlock,
- &fwd.nbs_lock);
+ pthread_cleanup_push(__cleanup_rwlock_unlock, &fwd.lock);
- list_for_each(p, &fwd.nbs) {
+ llist_for_each(p, &fwd.nbs) {
struct nb * nb = list_entry(p, struct nb, next);
if (nb->fd != in_fd)
flow_write(nb->fd, buf, len); /* FIXME: avoid copy. */
@@ -170,6 +166,11 @@ static void dt_packet(uint8_t * buf,
pthread_cleanup_pop(true);
}
+static void __cleanup_fqueue_destroy(void * fq)
+{
+ fqueue_destroy((fqueue_t *) fq);
+}
+
static void * dt_reader(void * o)
{
fqueue_t * fq;
@@ -184,13 +185,12 @@ static void * dt_reader(void * o)
if (fq == NULL)
return (void *) -1;
- pthread_cleanup_push((void (*) (void *)) fqueue_destroy,
- (void *) fq);
+ pthread_cleanup_push(__cleanup_fqueue_destroy, (void *) fq);
while (true) {
ret = fevent(fwd.set, fq, NULL);
if (ret < 0) {
- log_warn("Event error: %d.", ret);
+ log_warn("Event warning: %d.", ret);
continue;
}
@@ -225,13 +225,13 @@ static void handle_event(void * self,
switch (event) {
case NOTIFY_DT_CONN_ADD:
- if (dt_add_nb(c->flow_info.fd))
- log_dbg("Failed to add neighbor.");
+ if (dt_add_nb(c->flow_info.fd) < 0)
+ log_err("Failed to add neighbor.");
fset_add(fwd.set, c->flow_info.fd);
break;
case NOTIFY_DT_CONN_DEL:
- if (dt_del_nb(c->flow_info.fd))
- log_dbg("Failed to delete neighbor.");
+ if (dt_del_nb(c->flow_info.fd) < 0)
+ log_err("Failed to delete neighbor.");
fset_del(fwd.set, c->flow_info.fd);
break;
default:
@@ -248,12 +248,12 @@ int dt_init(void)
strcpy(info.comp_name, DT);
strcpy(info.comp_name, DT_COMP);
- list_head_init(&fwd.nbs);
+ llist_init(&fwd.nbs);
if (notifier_reg(handle_event, NULL))
goto fail_notifier_reg;
- if (pthread_rwlock_init(&fwd.nbs_lock, NULL))
+ if (pthread_rwlock_init(&fwd.lock, NULL))
goto fail_lock_init;
fwd.set = fset_create();
@@ -269,8 +269,6 @@ int dt_init(void)
if (connmgr_comp_init(COMPID_DT, &info))
goto fail_connmgr_comp_init;
- fwd.nbs_len = 0;
-
return 0;
fail_connmgr_comp_init:
@@ -282,7 +280,7 @@ int dt_init(void)
fail_pthread_create_reader:
fset_destroy(fwd.set);
fail_fset_create:
- pthread_rwlock_destroy(&fwd.nbs_lock);
+ pthread_rwlock_destroy(&fwd.lock);
fail_lock_init:
notifier_unreg(handle_event);
fail_notifier_reg:
@@ -304,15 +302,15 @@ void dt_fini(void)
fset_destroy(fwd.set);
- pthread_rwlock_wrlock(&fwd.nbs_lock);
+ pthread_rwlock_wrlock(&fwd.lock);
- list_for_each_safe(p, h, &fwd.nbs) {
+ llist_for_each_safe(p, h, &fwd.nbs) {
struct nb * n = list_entry(p, struct nb, next);
- list_del(&n->next);
+ llist_del(&n->next, &fwd.nbs);
free(n);
}
- pthread_rwlock_unlock(&fwd.nbs_lock);
+ pthread_rwlock_unlock(&fwd.lock);
- pthread_rwlock_destroy(&fwd.nbs_lock);
+ pthread_rwlock_destroy(&fwd.lock);
}