summaryrefslogtreecommitdiff
path: root/src/ipcpd/unicast/fa.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/unicast/fa.c')
-rw-r--r--src/ipcpd/unicast/fa.c85
1 files changed, 49 insertions, 36 deletions
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index ac168bd9..ddf78e22 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -1,5 +1,5 @@
/*
- * Ouroboros - Copyright (C) 2016 - 2024
+ * Ouroboros - Copyright (C) 2016 - 2026
*
* Flow allocator of the IPC Process
*
@@ -48,6 +48,7 @@
#include "ipcp.h"
#include "dt.h"
#include "ca.h"
+#include "np1.h"
#include <inttypes.h>
#include <stdlib.h>
@@ -85,7 +86,7 @@ struct fa_msg {
struct cmd {
struct list_head next;
- struct shm_du_buff * sdb;
+ struct ssm_pk_buff * spb;
};
struct fa_flow {
@@ -330,7 +331,7 @@ static uint64_t gen_eid(int fd)
static void packet_handler(int fd,
qoscube_t qc,
- struct shm_du_buff * sdb)
+ struct ssm_pk_buff * spb)
{
struct fa_flow * flow;
uint64_t r_addr;
@@ -342,7 +343,7 @@ static void packet_handler(int fd,
pthread_rwlock_wrlock(&fa.flows_lock);
- len = shm_du_buff_len(sdb);
+ len = ssm_pk_buff_len(spb);
#ifdef IPCP_FLOW_STATS
++flow->p_snd;
@@ -357,8 +358,8 @@ static void packet_handler(int fd,
ca_wnd_wait(wnd);
- if (dt_write_packet(r_addr, qc, r_eid, sdb)) {
- ipcp_sdb_release(sdb);
+ if (dt_write_packet(r_addr, qc, r_eid, spb)) {
+ ipcp_spb_release(spb);
log_dbg("Failed to forward packet.");
#ifdef IPCP_FLOW_STATS
pthread_rwlock_wrlock(&fa.flows_lock);
@@ -411,7 +412,7 @@ static void fa_flow_fini(struct fa_flow * flow)
}
static void fa_post_packet(void * comp,
- struct shm_du_buff * sdb)
+ struct ssm_pk_buff * spb)
{
struct cmd * cmd;
@@ -422,11 +423,11 @@ static void fa_post_packet(void * comp,
cmd = malloc(sizeof(*cmd));
if (cmd == NULL) {
log_err("Command failed. Out of memory.");
- ipcp_sdb_release(sdb);
+ ipcp_spb_release(spb);
return;
}
- cmd->sdb = sdb;
+ cmd->spb = spb;
pthread_mutex_lock(&fa.mtx);
@@ -454,16 +455,16 @@ static size_t fa_wait_for_fa_msg(struct fa_msg * msg)
pthread_cleanup_pop(true);
- len = shm_du_buff_len(cmd->sdb);
+ len = ssm_pk_buff_len(cmd->spb);
if (len > MSGBUFSZ || len < sizeof(*msg)) {
log_warn("Invalid flow allocation message (len: %zd).", len);
free(cmd);
return 0; /* No valid message */
}
- memcpy(msg, shm_du_buff_head(cmd->sdb), len);
+ memcpy(msg, ssm_pk_buff_head(cmd->spb), len);
- ipcp_sdb_release(cmd->sdb);
+ ipcp_spb_release(cmd->spb);
free(cmd);
@@ -687,13 +688,21 @@ void fa_fini(void)
pthread_rwlock_destroy(&fa.flows_lock);
}
+static int np1_flow_read_fa(int fd,
+ struct ssm_pk_buff ** spb)
+{
+ return np1_flow_read(fd, spb, NP1_GET_POOL(fd));
+}
+
int fa_start(void)
{
+#ifndef BUILD_CONTAINER
struct sched_param par;
int pol;
int max;
+#endif
- fa.psched = psched_create(packet_handler, np1_flow_read);
+ fa.psched = psched_create(packet_handler, np1_flow_read_fa);
if (fa.psched == NULL) {
log_err("Failed to start packet scheduler.");
goto fail_psched;
@@ -704,6 +713,7 @@ int fa_start(void)
goto fail_thread;
}
+#ifndef BUILD_CONTAINER
if (pthread_getschedparam(fa.worker, &pol, &par)) {
log_err("Failed to get worker thread scheduling parameters.");
goto fail_sched;
@@ -721,12 +731,15 @@ int fa_start(void)
log_err("Failed to set scheduler priority to maximum.");
goto fail_sched;
}
+#endif
return 0;
+#ifndef BUILD_CONTAINER
fail_sched:
pthread_cancel(fa.worker);
pthread_join(fa.worker, NULL);
+#endif
fail_thread:
psched_destroy(fa.psched);
fail_psched:
@@ -747,7 +760,7 @@ int fa_alloc(int fd,
const buffer_t * data)
{
struct fa_msg * msg;
- struct shm_du_buff * sdb;
+ struct ssm_pk_buff * spb;
struct fa_flow * flow;
uint64_t addr;
qoscube_t qc = QOS_CUBE_BE;
@@ -760,10 +773,10 @@ int fa_alloc(int fd,
len = sizeof(*msg) + ipcp_dir_hash_len();
- if (ipcp_sdb_reserve(&sdb, len + data->len))
+ if (ipcp_spb_reserve(&spb, len + data->len))
return -1;
- msg = (struct fa_msg *) shm_du_buff_head(sdb);
+ msg = (struct fa_msg *) ssm_pk_buff_head(spb);
memset(msg, 0, sizeof(*msg));
eid = gen_eid(fd);
@@ -782,11 +795,11 @@ int fa_alloc(int fd,
memcpy(msg + 1, dst, ipcp_dir_hash_len());
if (data->len > 0)
- memcpy(shm_du_buff_head(sdb) + len, data->data, data->len);
+ memcpy(ssm_pk_buff_head(spb) + len, data->data, data->len);
- if (dt_write_packet(addr, qc, fa.eid, sdb)) {
+ if (dt_write_packet(addr, qc, fa.eid, spb)) {
log_err("Failed to send flow allocation request packet.");
- ipcp_sdb_release(sdb);
+ ipcp_spb_release(spb);
return -1;
}
@@ -808,7 +821,7 @@ int fa_alloc_resp(int fd,
const buffer_t * data)
{
struct fa_msg * msg;
- struct shm_du_buff * sdb;
+ struct ssm_pk_buff * spb;
struct fa_flow * flow;
qoscube_t qc = QOS_CUBE_BE;
@@ -819,13 +832,13 @@ int fa_alloc_resp(int fd,
goto fail_alloc_resp;
}
- if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + data->len)) {
- log_err("Failed to reserve sdb (%zu bytes).",
+ if (ipcp_spb_reserve(&spb, sizeof(*msg) + data->len)) {
+ log_err("Failed to reserve spb (%zu bytes).",
sizeof(*msg) + data->len);
goto fail_reserve;
}
- msg = (struct fa_msg *) shm_du_buff_head(sdb);
+ msg = (struct fa_msg *) ssm_pk_buff_head(spb);
memset(msg, 0, sizeof(*msg));
msg->code = FLOW_REPLY;
@@ -840,7 +853,7 @@ int fa_alloc_resp(int fd,
pthread_rwlock_unlock(&fa.flows_lock);
- if (dt_write_packet(flow->r_addr, qc, fa.eid, sdb)) {
+ if (dt_write_packet(flow->r_addr, qc, fa.eid, spb)) {
log_err("Failed to send flow allocation response packet.");
goto fail_packet;
}
@@ -856,7 +869,7 @@ int fa_alloc_resp(int fd,
return 0;
fail_packet:
- ipcp_sdb_release(sdb);
+ ipcp_spb_release(spb);
fail_reserve:
pthread_rwlock_wrlock(&fa.flows_lock);
fa_flow_fini(flow);
@@ -887,17 +900,17 @@ static int fa_update_remote(int fd,
uint16_t ece)
{
struct fa_msg * msg;
- struct shm_du_buff * sdb;
+ struct ssm_pk_buff * spb;
qoscube_t qc = QOS_CUBE_BE;
struct fa_flow * flow;
uint64_t r_addr;
- if (ipcp_sdb_reserve(&sdb, sizeof(*msg))) {
- log_err("Failed to reserve sdb (%zu bytes).", sizeof(*msg));
+ if (ipcp_spb_reserve(&spb, sizeof(*msg))) {
+ log_err("Failed to reserve spb (%zu bytes).", sizeof(*msg));
return -1;
}
- msg = (struct fa_msg *) shm_du_buff_head(sdb);
+ msg = (struct fa_msg *) ssm_pk_buff_head(spb);
memset(msg, 0, sizeof(*msg));
@@ -916,9 +929,9 @@ static int fa_update_remote(int fd,
pthread_rwlock_unlock(&fa.flows_lock);
- if (dt_write_packet(r_addr, qc, fa.eid, sdb)) {
+ if (dt_write_packet(r_addr, qc, fa.eid, spb)) {
log_err("Failed to send flow update packet.");
- ipcp_sdb_release(sdb);
+ ipcp_spb_release(spb);
return -1;
}
@@ -927,7 +940,7 @@ static int fa_update_remote(int fd,
void fa_np1_rcv(uint64_t eid,
uint8_t ecn,
- struct shm_du_buff * sdb)
+ struct ssm_pk_buff * spb)
{
struct fa_flow * flow;
bool update;
@@ -935,7 +948,7 @@ void fa_np1_rcv(uint64_t eid,
int fd;
size_t len;
- len = shm_du_buff_len(sdb);
+ len = ssm_pk_buff_len(spb);
pthread_rwlock_wrlock(&fa.flows_lock);
@@ -943,7 +956,7 @@ void fa_np1_rcv(uint64_t eid,
if (fd < 0) {
pthread_rwlock_unlock(&fa.flows_lock);
log_dbg("Received packet for unknown EID %" PRIu64 ".", eid);
- ipcp_sdb_release(sdb);
+ ipcp_spb_release(spb);
return;
}
@@ -957,9 +970,9 @@ void fa_np1_rcv(uint64_t eid,
pthread_rwlock_unlock(&fa.flows_lock);
- if (ipcp_flow_write(fd, sdb) < 0) {
+ if (np1_flow_write(fd, spb, NP1_GET_POOL(fd)) < 0) {
log_dbg("Failed to write to flow %d.", fd);
- ipcp_sdb_release(sdb);
+ ipcp_spb_release(spb);
#ifdef IPCP_FLOW_STATS
pthread_rwlock_wrlock(&fa.flows_lock);
++flow->p_rcv_f;