summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmake/config/irmd.cmake2
-rw-r--r--include/ouroboros/fqueue.h3
-rw-r--r--include/ouroboros/ssm_rbuff.h19
-rw-r--r--src/irmd/config.h.in1
-rw-r--r--src/irmd/main.c644
-rw-r--r--src/irmd/reg/flow.c4
-rw-r--r--src/irmd/reg/flow.h11
-rw-r--r--src/irmd/reg/reg.c189
-rw-r--r--src/irmd/reg/reg.h34
-rw-r--r--src/lib/dev.c229
-rw-r--r--src/lib/frct.c6
-rw-r--r--src/lib/serdes-irm.c132
-rw-r--r--src/lib/ssm/rbuff.c95
-rw-r--r--src/lib/ssm/tests/rbuff_test.c30
14 files changed, 1285 insertions, 114 deletions
diff --git a/cmake/config/irmd.cmake b/cmake/config/irmd.cmake
index b6b2dc40..79e24bae 100644
--- a/cmake/config/irmd.cmake
+++ b/cmake/config/irmd.cmake
@@ -22,6 +22,8 @@ set(OAP_REPLAY_TIMER 20 CACHE STRING
"OAP replay protection window (s)")
set(OAP_REPLAY_MAX 4096 CACHE STRING
"Maximum entries in the OAP replay cache (bounds memory/CPU under flood)")
+set(OAP_REKEY_TIMER 120 CACHE STRING
+ "Tier-2 re-key interval (s); bounds key age / PCS healing, 0 disables")
set(OAP_CLIENT_AUTH_DEFAULT TRUE CACHE BOOL
"Client requires the server to authenticate by default")
set(DEBUG_PROTO_OAP FALSE CACHE BOOL
diff --git a/include/ouroboros/fqueue.h b/include/ouroboros/fqueue.h
index 2546c79d..322da3ea 100644
--- a/include/ouroboros/fqueue.h
+++ b/include/ouroboros/fqueue.h
@@ -34,7 +34,8 @@ enum fqtype {
FLOW_UP = (1 << 2),
FLOW_ALLOC = (1 << 3),
FLOW_DEALLOC = (1 << 4),
- FLOW_PEER = (1 << 5)
+ FLOW_PEER = (1 << 5),
+ FLOW_UPD = (1 << 6)
};
struct flow_set;
diff --git a/include/ouroboros/ssm_rbuff.h b/include/ouroboros/ssm_rbuff.h
index 2443b63d..e77eec09 100644
--- a/include/ouroboros/ssm_rbuff.h
+++ b/include/ouroboros/ssm_rbuff.h
@@ -28,10 +28,12 @@
#include <stdint.h>
-#define ACL_RDWR 0000
-#define ACL_RDONLY 0001
-#define ACL_FLOWDOWN 0002
-#define ACL_FLOWPEER 0004
+#define RB_RD 0001 /* read permitted (0 = no access) */
+#define RB_WR 0002 /* write permitted (0 = no access) */
+#define RB_RDWR (RB_RD | RB_WR)
+#define RB_FLOWDOWN 0004
+#define RB_FLOWPEER 0010
+#define RB_REKEY 0020 /* re-key seed parked (out-of-band signal) */
struct ssm_rbuff;
@@ -45,10 +47,13 @@ struct ssm_rbuff * ssm_rbuff_open(pid_t pid,
void ssm_rbuff_close(struct ssm_rbuff * rb);
-void ssm_rbuff_set_acl(struct ssm_rbuff * rb,
- uint32_t flags);
+void ssm_rbuff_set_bits(struct ssm_rbuff * rb,
+ uint32_t bits);
-uint32_t ssm_rbuff_get_acl(struct ssm_rbuff * rb);
+void ssm_rbuff_clr_bits(struct ssm_rbuff * rb,
+ uint32_t bits);
+
+uint32_t ssm_rbuff_get_flags(struct ssm_rbuff * rb);
void ssm_rbuff_fini(struct ssm_rbuff * rb);
diff --git a/src/irmd/config.h.in b/src/irmd/config.h.in
index 84d58130..e14cff75 100644
--- a/src/irmd/config.h.in
+++ b/src/irmd/config.h.in
@@ -43,6 +43,7 @@
#define OAP_REPLAY_TIMER @OAP_REPLAY_TIMER@
#define OAP_REPLAY_MAX @OAP_REPLAY_MAX@
+#define OAP_REKEY_TIMER @OAP_REKEY_TIMER@
#cmakedefine01 OAP_CLIENT_AUTH_DEFAULT
#define BOOTSTRAP_TIMEOUT @BOOTSTRAP_TIMEOUT@
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 484a265a..3519e079 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -36,6 +36,7 @@
#include <ouroboros/crypt.h>
#include <ouroboros/errno.h>
#include <ouroboros/flow.h>
+#include <ouroboros/fqueue.h>
#include <ouroboros/hash.h>
#include <ouroboros/irm.h>
#include <ouroboros/list.h>
@@ -86,6 +87,8 @@
#define TIMESYNC_SLACK 100 /* ms */
#define OAP_SEEN_TIMER 20 /* s */
#define DEALLOC_TIME 300 /* s */
+#define REKEY_BATCH 64 /* flows re-keyed per timer pass */
+#define REKEY_RESP_TIMEO 20 /* s; give-up on a re-key RESPONSE */
#define DIRECT_MPL 20 /* ms */
/* bytes; in-process, bounded only by PUP/GSPP. */
#define DIRECT_MTU 65000
@@ -105,6 +108,29 @@ struct cmd {
int fd;
};
+/* In-flight Tier-2 re-key, owned solely by the re-key worker thread. */
+struct rekey_ctx {
+ struct list_head next;
+
+ int flow_id;
+ void * ctx; /* OAP client ctx (opaque) */
+ struct timespec deadline; /* reap if no RESPONSE by then */
+};
+
+enum rekey_evt_type {
+ REKEY_INIT = 0, /* start an exchange for flow_id */
+ REKEY_RESP /* a RESPONSE arrived for flow_id */
+};
+
+struct rekey_evt {
+ struct list_head next;
+
+ enum rekey_evt_type type;
+ int flow_id;
+ pid_t n_1_pid; /* INIT: flow's lower IPCP */
+ buffer_t buf; /* RESP: owned RESPONSE payload */
+};
+
struct {
bool log_stdout; /* log to stdout */
#ifdef HAVE_TOML
@@ -126,6 +152,14 @@ struct {
pthread_t irm_sanitize; /* clean up irmd resources */
pthread_t acceptor; /* accept new commands */
+
+ struct {
+ pthread_t worker; /* Tier-2 re-key orchestrator */
+ struct list_head inbox; /* re-key events for worker */
+ pthread_cond_t cond; /* inbox signal condvar */
+ pthread_mutex_t lock; /* inbox lock */
+ bool stop; /* worker shutdown flag */
+ } rk;
} irmd;
static enum irm_state irmd_get_state(void)
@@ -932,6 +966,8 @@ static int flow_accept(struct flow_info * flow,
log_err("Failed to respond to flow allocation.");
goto fail_resp;
} else {
+ if (sk->nid != NID_undef)
+ reg_flow_set_rekey(flow->id, false);
log_info("Flow %d accepted by %d for %s (uid %d).",
flow->id, flow->n_pid, name, flow->uid);
}
@@ -1358,6 +1394,9 @@ static int flow_alloc(const char * dst,
goto fail_complete;
}
+ if (sk->nid != NID_undef)
+ reg_flow_set_rekey(flow->id, true);
+
freebuf(req_hdr);
freebuf(resp_hdr);
freebuf(hash);
@@ -1432,6 +1471,512 @@ static int flow_dealloc_resp(struct flow_info * flow)
return 0;
}
+/*
+ * Inbox producers. Any thread may post; the worker drains. INIT carries
+ * the flow's lower IPCP pid; RESP transfers ownership of buf.
+ */
+static void rekey_post(enum rekey_evt_type type,
+ int flow_id,
+ pid_t n_1_pid,
+ buffer_t * buf)
+{
+ struct rekey_evt * evt;
+
+ evt = malloc(sizeof(*evt));
+ if (evt == NULL) {
+ log_err("Failed to post re-key event for flow %d.", flow_id);
+ if (buf != NULL)
+ freebuf(*buf);
+ return;
+ }
+
+ list_head_init(&evt->next);
+ evt->type = type;
+ evt->flow_id = flow_id;
+ evt->n_1_pid = n_1_pid;
+ clrbuf(evt->buf);
+ if (buf != NULL) {
+ evt->buf = *buf;
+ clrbuf(*buf);
+ }
+
+ pthread_mutex_lock(&irmd.rk.lock);
+
+ list_add_tail(&evt->next, &irmd.rk.inbox);
+ pthread_cond_signal(&irmd.rk.cond);
+
+ pthread_mutex_unlock(&irmd.rk.lock);
+}
+
+static void rekey_post_init(int flow_id,
+ pid_t n_1_pid)
+{
+ rekey_post(REKEY_INIT, flow_id, n_1_pid, NULL);
+}
+
+static void rekey_post_resp(int flow_id,
+ buffer_t * buf)
+{
+ rekey_post(REKEY_RESP, flow_id, 0, buf);
+}
+
+/* Worker-only: find an in-flight entry by flow_id. */
+static struct rekey_ctx * rekey_find(struct list_head * tbl,
+ int flow_id)
+{
+ struct list_head * p;
+
+ list_for_each(p, tbl) {
+ struct rekey_ctx * e = list_entry(p, struct rekey_ctx, next);
+ if (e->flow_id == flow_id)
+ return e;
+ }
+
+ return NULL;
+}
+
+/* Worker-only: drop an entry, freeing its OAP ctx. */
+static void rekey_drop(struct rekey_ctx * e)
+{
+ if (e->ctx != NULL)
+ oap_ctx_free(e->ctx);
+
+ list_del(&e->next);
+ free(e);
+}
+
+/* Flow-update relay payload: a 1-byte type prefix on an opaque body. */
+enum flow_upd_type {
+ FLOW_UPD_REKEY_REQ = 0,
+ FLOW_UPD_REKEY_RESP = 1,
+};
+
+/* Prepend the update type to body; caller frees out on success. */
+static int flow_upd_wrap(buffer_t * out,
+ uint8_t type,
+ const buffer_t * body)
+{
+ out->len = body->len + 1;
+ out->data = malloc(out->len);
+ if (out->data == NULL)
+ return -ENOMEM;
+
+ out->data[0] = type;
+ memcpy(out->data + 1, body->data, body->len);
+
+ return 0;
+}
+
+/*
+ * Worker-only: start a fresh OAP exchange over the live flow. Replaces
+ * any prior in-flight entry for flow_id (handles flow_id recycling).
+ */
+static void rekey_do_initiate(struct list_head * tbl,
+ int flow_id,
+ pid_t n_1_pid)
+{
+ struct rekey_ctx * e;
+ struct flow_info info;
+ struct name_info name;
+ buffer_t req = BUF_INIT;
+ buffer_t upd = BUF_INIT;
+ buffer_t data = BUF_INIT;
+ char nbuf[NAME_SIZE + 1];
+ void * ctx = NULL;
+ int ret;
+
+ e = rekey_find(tbl, flow_id);
+ if (e != NULL)
+ rekey_drop(e);
+
+ if (reg_get_name_for_flow_id(nbuf, flow_id) < 0 ||
+ reg_get_name_info(nbuf, &name) < 0) {
+ log_warn("No name info to re-key flow %d.", flow_id);
+ reg_flow_clear_in_flight(flow_id);
+ return;
+ }
+
+ if (oap_cli_prepare(&ctx, &name, &req, data) < 0) {
+ log_err("Failed to prepare re-key for flow %d.", flow_id);
+ reg_flow_clear_in_flight(flow_id);
+ return;
+ }
+
+ memset(&info, 0, sizeof(info));
+ info.id = flow_id;
+ info.n_1_pid = n_1_pid;
+
+ if (flow_upd_wrap(&upd, FLOW_UPD_REKEY_REQ, &req) < 0) {
+ log_err("Failed to wrap re-key request for flow %d.", flow_id);
+ goto fail_ctx;
+ }
+
+ ret = ipcp_flow_update(&info, upd);
+ freebuf(upd);
+ if (ret < 0) {
+ log_err("Failed to send re-key request for flow %d.", flow_id);
+ goto fail_ctx;
+ }
+
+ e = malloc(sizeof(*e));
+ if (e == NULL) {
+ log_err("Failed to track re-key for flow %d.", flow_id);
+ goto fail_ctx;
+ }
+
+ list_head_init(&e->next);
+ e->flow_id = flow_id;
+ e->ctx = ctx;
+ clock_gettime(PTHREAD_COND_CLOCK, &e->deadline);
+ e->deadline.tv_sec += REKEY_RESP_TIMEO;
+
+ list_add(&e->next, tbl);
+
+ log_dbg("Re-key request sent for flow %d.", flow_id);
+
+ freebuf(req);
+
+ return;
+
+ fail_ctx:
+ oap_ctx_free(ctx);
+ reg_flow_clear_in_flight(flow_id);
+ freebuf(req);
+}
+
+/* Worker-only: complete the exchange, install the pending seed. */
+static void rekey_do_complete(struct list_head * tbl,
+ int flow_id,
+ buffer_t buf)
+{
+ struct rekey_ctx * e;
+ struct name_info info;
+ struct crypt_sk sk;
+ uint8_t kbuf[SYMMKEYSZ];
+ buffer_t data = BUF_INIT;
+ char name[NAME_SIZE + 1];
+ uint8_t newgen;
+
+ e = rekey_find(tbl, flow_id);
+ if (e == NULL) {
+ log_dbg("Stale re-key RESPONSE for flow %d.", flow_id);
+ return;
+ }
+
+ if (reg_get_name_for_flow_id(name, flow_id) < 0 ||
+ reg_get_name_info(name, &info) < 0) {
+ log_err("No name info to re-key flow %d.", flow_id);
+ goto finish;
+ }
+
+ sk.key = kbuf;
+
+ /* oap_cli_complete frees the ctx on every path. */
+ if (oap_cli_complete(e->ctx, &info, buf, &data, &sk) < 0) {
+ log_err("Re-key completion failed for flow %d.", flow_id);
+ e->ctx = NULL;
+ goto finish;
+ }
+
+ e->ctx = NULL;
+
+ newgen = data.len == 1 ? *(uint8_t *) data.data : 0;
+
+ if (newgen >= 16) {
+ log_warn("Re-key gen %u out of range for flow %d.",
+ newgen, flow_id);
+ goto finish_clear;
+ }
+
+ if (reg_flow_store_pending(flow_id, kbuf, newgen) < 0)
+ log_warn("Flow %d gone during re-key.", flow_id);
+ else
+ reg_notify_flow(flow_id, FLOW_UPD);
+
+ log_dbg("Re-key completed for flow %d (gen %u).", flow_id, newgen);
+
+ finish_clear:
+ crypt_secure_clear(kbuf, SYMMKEYSZ);
+ freebuf(data);
+ finish:
+ rekey_drop(e);
+ reg_flow_clear_in_flight(flow_id);
+}
+
+/* Worker-only: reap entries whose RESPONSE never arrived. */
+static void rekey_reap_expired(struct list_head * tbl)
+{
+ struct list_head * p;
+ struct list_head * h;
+ struct timespec now;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ list_for_each_safe(p, h, tbl) {
+ struct rekey_ctx * e = list_entry(p, struct rekey_ctx, next);
+ if (ts_diff_ns(&e->deadline, &now) > 0)
+ continue;
+
+ log_warn("Re-key timed out for flow %d.", e->flow_id);
+ reg_flow_clear_in_flight(e->flow_id);
+ rekey_drop(e);
+ }
+}
+
+/* Responder side: process request, install pending seed, send response. */
+static int rekey_respond(struct flow_info * flow,
+ buffer_t * pk)
+{
+ struct name_info info;
+ struct crypt_sk sk;
+ uint8_t kbuf[SYMMKEYSZ];
+ buffer_t rsp = BUF_INIT;
+ buffer_t upd = BUF_INIT;
+ buffer_t data = BUF_INIT;
+ char name[NAME_SIZE + 1];
+ uint8_t newgen;
+ int epoch;
+ int err;
+
+ epoch = reg_flow_get_epoch(flow->id);
+ if (epoch < 0) {
+ log_warn("Re-key for unknown flow %d.", flow->id);
+ return -EBADF;
+ }
+
+ if (reg_get_name_for_flow_id(name, flow->id) < 0 ||
+ reg_get_name_info(name, &info) < 0) {
+ log_err("No name info to re-key flow %d.", flow->id);
+ return -ENAME;
+ }
+
+ if (reg_flow_rekey_pending(flow->id)) {
+ log_dbg("Duplicate re-key request for flow %d.", flow->id);
+ return 0;
+ }
+
+ newgen = (uint8_t) ((epoch + 1) & 0x0F);
+ data.data = &newgen;
+ data.len = 1;
+
+ sk.key = kbuf;
+
+ err = oap_srv_process(&info, *pk, &rsp, &data, &sk);
+ if (err < 0) {
+ /* data still points to stack newgen; don't free it. */
+ log_err("Re-key OAP failed for flow %d.", flow->id);
+ goto finish;
+ }
+
+ /* On success oap_srv_process repointed data to client output. */
+ freebuf(data);
+
+ if (reg_flow_store_pending(flow->id, kbuf, newgen) < 0) {
+ log_warn("Flow %d gone during re-key.", flow->id);
+ err = -EBADF;
+ goto finish;
+ }
+
+ reg_notify_flow(flow->id, FLOW_UPD);
+
+ if (flow_upd_wrap(&upd, FLOW_UPD_REKEY_RESP, &rsp) == 0) {
+ if (ipcp_flow_update(flow, upd) < 0)
+ log_err("Failed to send re-key response for flow %d.",
+ flow->id);
+ freebuf(upd);
+ }
+
+ err = 0;
+ finish:
+ crypt_secure_clear(kbuf, SYMMKEYSZ);
+ freebuf(rsp);
+
+ return err;
+}
+
+static int flow_update_arr(struct flow_info * flow,
+ buffer_t * pk)
+{
+ uint8_t type;
+
+ if (pk->len < 1)
+ return -EINVAL;
+
+ type = pk->data[0];
+
+ /* Strip the type byte, keeping the malloc base for hand-off. */
+ memmove(pk->data, pk->data + 1, pk->len - 1);
+ pk->len -= 1;
+
+ switch (type) {
+ case FLOW_UPD_REKEY_REQ:
+ return rekey_respond(flow, pk);
+ case FLOW_UPD_REKEY_RESP:
+ /* Hand the payload to the worker and take ownership. */
+ rekey_post_resp(flow->id, pk);
+ return 0;
+ default:
+ log_warn("Unknown flow update type %u.", type);
+ return -EINVAL;
+ }
+}
+
+static int flow_update(struct flow_info * flow,
+ bool rekey,
+ struct crypt_sk * sk,
+ bool * has_key)
+{
+ uint8_t seed[SYMMKEYSZ];
+ uint8_t epoch;
+
+ *has_key = false;
+
+ if (rekey) {
+ /* Watermark re-key: the app can't know its lower IPCP. */
+ pid_t n_1_pid = reg_flow_get_n_1_pid(flow->id);
+ if (n_1_pid > 0)
+ rekey_post_init(flow->id, n_1_pid);
+ return 0;
+ }
+
+ if (!reg_flow_take_pending(flow->id, seed, &epoch))
+ return 0;
+
+ memcpy(sk->key, seed, SYMMKEYSZ);
+ sk->epoch = epoch;
+ *has_key = true;
+
+ crypt_secure_clear(seed, SYMMKEYSZ);
+
+ log_dbg("Delivered re-key seed for flow %d (gen %u).",
+ flow->id, epoch);
+
+ return 0;
+}
+
+/* Free every parked OAP ctx at worker exit or cancellation. */
+static void rekey_table_cleanup(void * o)
+{
+ struct list_head * tbl = o;
+ struct list_head * p;
+ struct list_head * h;
+
+ list_for_each_safe(p, h, tbl) {
+ struct rekey_ctx * e = list_entry(p, struct rekey_ctx, next);
+ rekey_drop(e);
+ }
+}
+
+/* Pop one event, or NULL if none, draining the inbox under its lock. */
+static struct rekey_evt * rekey_inbox_wait(const struct timespec * deadline)
+{
+ struct rekey_evt * evt = NULL;
+ struct timespec now;
+
+ pthread_mutex_lock(&irmd.rk.lock);
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ while (list_is_empty(&irmd.rk.inbox) && !irmd.rk.stop &&
+ ts_diff_ns(deadline, &now) > 0) {
+ pthread_cond_timedwait(&irmd.rk.cond, &irmd.rk.lock, deadline);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+ }
+
+ if (!list_is_empty(&irmd.rk.inbox)) {
+ evt = list_first_entry(&irmd.rk.inbox, struct rekey_evt, next);
+ list_del(&evt->next);
+ }
+
+ pthread_mutex_unlock(&irmd.rk.lock);
+
+ return evt;
+}
+
+/*
+ * Single worker owning all in-flight Tier-2 re-keys. It drains the
+ * inbox, runs the periodic snapshot, and reaps timed-out exchanges.
+ * The table is touched only here, so it needs no lock.
+ */
+static void * rekey_worker(void * o)
+{
+ struct list_head table;
+ struct timespec next;
+
+ (void) o;
+
+ list_head_init(&table);
+
+ clock_gettime(PTHREAD_COND_CLOCK, &next);
+ next.tv_sec += OAP_REKEY_TIMER;
+
+ pthread_cleanup_push(rekey_table_cleanup, &table);
+
+ while (!irmd.rk.stop) {
+ struct rekey_evt * evt;
+ struct timespec now;
+ struct timespec deadline = next;
+ struct list_head * p;
+
+ /* Wake no later than the soonest in-flight deadline. */
+ list_for_each(p, &table) {
+ struct rekey_ctx * e;
+ e = list_entry(p, struct rekey_ctx, next);
+ if (ts_diff_ns(&e->deadline, &deadline) < 0)
+ deadline = e->deadline;
+ }
+
+ evt = rekey_inbox_wait(&deadline);
+
+ if (irmd.rk.stop) {
+ if (evt != NULL) {
+ freebuf(evt->buf);
+ free(evt);
+ }
+ break;
+ }
+
+ if (evt != NULL) {
+ switch (evt->type) {
+ case REKEY_INIT:
+ rekey_do_initiate(&table, evt->flow_id,
+ evt->n_1_pid);
+ break;
+ case REKEY_RESP:
+ rekey_do_complete(&table, evt->flow_id,
+ evt->buf);
+ freebuf(evt->buf);
+ break;
+ default:
+ break;
+ }
+
+ free(evt);
+ }
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+ if (ts_diff_ns(&next, &now) <= 0) {
+ struct rekey_info snap[REKEY_BATCH];
+ int n;
+ int i;
+
+ n = reg_flow_snapshot_rekey_due(snap, REKEY_BATCH);
+ for (i = 0; i < n; ++i)
+ rekey_do_initiate(&table, snap[i].flow_id,
+ snap[i].n_1_pid);
+
+ clock_gettime(PTHREAD_COND_CLOCK, &next);
+ next.tv_sec += OAP_REKEY_TIMER;
+ }
+
+ rekey_reap_expired(&table);
+ }
+
+ pthread_cleanup_pop(1);
+
+ return (void *) 0;
+}
+
static void * acceptloop(void * o)
{
int csockfd;
@@ -1502,6 +2047,7 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg,
struct timespec now;
struct timespec ts = TIMESPEC_INIT_S(0); /* static analysis */
int res;
+ bool has_key = false;
irm_msg_t * ret_msg;
buffer_t data;
@@ -1622,7 +2168,8 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg,
hbuf = malloc(SYMMKEYSZ);
if (hbuf == NULL) {
log_err("Failed to malloc key buf");
- return NULL;
+ res = -ENOMEM;
+ break;
}
memcpy(hbuf, kbuf, SYMMKEYSZ);
@@ -1652,7 +2199,8 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg,
hbuf = malloc(SYMMKEYSZ);
if (hbuf == NULL) {
log_err("Failed to malloc key buf");
- return NULL;
+ res = -ENOMEM;
+ break;
}
memcpy(hbuf, kbuf, SYMMKEYSZ);
ret_msg->sym_key.data = hbuf;
@@ -1698,6 +2246,38 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg,
flow = flow_info_msg_to_s(msg->flow_info);
res = flow_alloc_reply(&flow, msg->response, &data);
break;
+ case IRM_MSG_CODE__IPCP_FLOW_UPDATE_ARR:
+ data.len = msg->pk.len;
+ data.data = msg->pk.data;
+ msg->pk.data = NULL; /* pass data */
+ msg->pk.len = 0;
+ flow = flow_info_msg_to_s(msg->flow_info);
+ res = flow_update_arr(&flow, &data);
+ freebuf(data);
+ break;
+ case IRM_MSG_CODE__IRM_FLOW_UPDATE:
+ flow = flow_info_msg_to_s(msg->flow_info);
+ sk.key = kbuf;
+ res = flow_update(&flow, msg->rekey, &sk, &has_key);
+ if (res == 0) {
+ ret_msg->flow_info = flow_info_s_to_msg(&flow);
+ if (has_key) {
+ hbuf = malloc(SYMMKEYSZ);
+ if (hbuf == NULL) {
+ log_err("Failed to malloc key buf");
+ res = -ENOMEM;
+ break;
+ }
+
+ memcpy(hbuf, kbuf, SYMMKEYSZ);
+ ret_msg->sym_key.data = hbuf;
+ ret_msg->sym_key.len = SYMMKEYSZ;
+ ret_msg->has_sym_key = true;
+ ret_msg->has_generation = true;
+ ret_msg->generation = sk.epoch;
+ }
+ }
+ break;
default:
log_err("Don't know that message code.");
res = -1;
@@ -2060,6 +2640,29 @@ static int irm_init(void)
list_head_init(&irmd.cmds);
+ if (pthread_mutex_init(&irmd.rk.lock, NULL)) {
+ log_err("Failed to initialize mutex.");
+ goto fail_rk_lock;
+ }
+
+ if (pthread_condattr_init(&cattr)) {
+ log_err("Failed to initialize condattr.");
+ goto fail_rk_lock;
+ }
+
+#ifndef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+ if (pthread_cond_init(&irmd.rk.cond, &cattr)) {
+ log_err("Failed to initialize condvar.");
+ pthread_condattr_destroy(&cattr);
+ goto fail_rk_cond;
+ }
+
+ pthread_condattr_destroy(&cattr);
+
+ list_head_init(&irmd.rk.inbox);
+
if (stat(SOCK_PATH, &st) == -1) {
if (mkdir(SOCK_PATH, 0777)) {
log_err("Failed to create sockets directory.");
@@ -2163,6 +2766,10 @@ static int irm_init(void)
fail_sock_path:
unlink(IRM_SOCK_PATH);
fail_stat:
+ pthread_cond_destroy(&irmd.rk.cond);
+ fail_rk_cond:
+ pthread_mutex_destroy(&irmd.rk.lock);
+ fail_rk_lock:
pthread_cond_destroy(&irmd.cmd_cond);
fail_cmd_cond:
pthread_mutex_destroy(&irmd.cmd_lock);
@@ -2211,8 +2818,22 @@ static void irm_fini(void)
pthread_mutex_unlock(&irmd.cmd_lock);
+ pthread_mutex_lock(&irmd.rk.lock);
+
+ list_for_each_safe(p, h, &irmd.rk.inbox) {
+ struct rekey_evt * evt;
+ evt = list_entry(p, struct rekey_evt, next);
+ list_del(&evt->next);
+ freebuf(evt->buf);
+ free(evt);
+ }
+
+ pthread_mutex_unlock(&irmd.rk.lock);
+
pthread_mutex_destroy(&irmd.cmd_lock);
pthread_cond_destroy(&irmd.cmd_cond);
+ pthread_mutex_destroy(&irmd.rk.lock);
+ pthread_cond_destroy(&irmd.rk.cond);
pthread_rwlock_destroy(&irmd.state_lock);
#ifdef HAVE_FUSE
@@ -2250,10 +2871,18 @@ static int irm_start(void)
if (pthread_create(&irmd.acceptor, NULL, acceptloop, NULL))
goto fail_acceptor;
+ irmd.rk.stop = false;
+ if (OAP_REKEY_TIMER > 0 &&
+ pthread_create(&irmd.rk.worker, NULL, rekey_worker, NULL))
+ goto fail_rekey_worker;
+
log_info("Ouroboros IPC Resource Manager daemon started...");
return 0;
+ fail_rekey_worker:
+ pthread_cancel(irmd.acceptor);
+ pthread_join(irmd.acceptor, NULL);
fail_acceptor:
pthread_cancel(irmd.irm_sanitize);
pthread_join(irmd.irm_sanitize, NULL);
@@ -2293,6 +2922,17 @@ static void irm_sigwait(sigset_t sigset)
static void irm_stop(void)
{
+ if (OAP_REKEY_TIMER > 0) {
+ pthread_mutex_lock(&irmd.rk.lock);
+
+ irmd.rk.stop = true;
+ pthread_cond_signal(&irmd.rk.cond);
+
+ pthread_mutex_unlock(&irmd.rk.lock);
+
+ pthread_join(irmd.rk.worker, NULL);
+ }
+
pthread_cancel(irmd.acceptor);
pthread_cancel(irmd.irm_sanitize);
diff --git a/src/irmd/reg/flow.c b/src/irmd/reg/flow.c
index 5c709dea..ccb2562d 100644
--- a/src/irmd/reg/flow.c
+++ b/src/irmd/reg/flow.c
@@ -24,6 +24,7 @@
#define OUROBOROS_PREFIX "reg/flow"
+#include <ouroboros/crypt.h>
#include <ouroboros/logs.h>
#include "flow.h"
@@ -32,6 +33,7 @@
#include <errno.h>
#include <stdbool.h>
#include <stdlib.h>
+#include <string.h>
struct reg_flow * reg_flow_create(const struct flow_info * info)
{
@@ -79,6 +81,8 @@ void reg_flow_destroy(struct reg_flow * flow)
{
assert(flow != NULL);
+ crypt_secure_clear(flow->rk.pending_seed, SYMMKEYSZ);
+
switch(flow->info.state) {
case FLOW_ACCEPT_PENDING:
clrbuf(flow->req_data);
diff --git a/src/irmd/reg/flow.h b/src/irmd/reg/flow.h
index 9a4046d3..15fc7b8f 100644
--- a/src/irmd/reg/flow.h
+++ b/src/irmd/reg/flow.h
@@ -49,6 +49,17 @@ struct reg_flow {
bool direct;
+ /* Tier-2 re-key state (encrypted flows only) */
+ struct {
+ bool encrypted; /* flow carries a cipher */
+ uint8_t epoch; /* last epoch installed by app */
+ bool initiator; /* OAP initiator (role 0) */
+ bool in_flight; /* a re-key is in progress */
+ uint8_t pending_seed[SYMMKEYSZ];
+ uint8_t pending_epoch;
+ bool has_pending; /* new seed awaits app pull */
+ } rk;
+
struct ssm_rbuff * n_rb;
struct ssm_rbuff * n_1_rb;
};
diff --git a/src/irmd/reg/reg.c b/src/irmd/reg/reg.c
index 365064e5..70baf64e 100644
--- a/src/irmd/reg/reg.c
+++ b/src/irmd/reg/reg.c
@@ -25,6 +25,7 @@ The IPC Resource Manager - Registry
#define OUROBOROS_PREFIX "reg"
#include <ouroboros/bitmap.h>
+#include <ouroboros/crypt.h>
#include <ouroboros/errno.h>
#include <ouroboros/list.h>
#include <ouroboros/logs.h>
@@ -2102,6 +2103,194 @@ bool reg_flow_is_direct(int flow_id)
return ret;
}
+void reg_flow_set_rekey(int flow_id,
+ bool initiator)
+{
+ struct reg_flow * flow;
+
+ pthread_mutex_lock(&reg.mtx);
+
+ flow = __reg_get_flow(flow_id);
+ if (flow != NULL) {
+ flow->rk.encrypted = true;
+ flow->rk.initiator = initiator;
+ flow->rk.epoch = 0;
+ }
+
+ pthread_mutex_unlock(&reg.mtx);
+}
+
+int reg_flow_get_epoch(int flow_id)
+{
+ struct reg_flow * flow;
+ int epoch = -1;
+
+ pthread_mutex_lock(&reg.mtx);
+
+ flow = __reg_get_flow(flow_id);
+ if (flow != NULL && flow->rk.encrypted)
+ epoch = flow->rk.epoch;
+
+ pthread_mutex_unlock(&reg.mtx);
+
+ return epoch;
+}
+
+bool reg_flow_rekey_pending(int flow_id)
+{
+ struct reg_flow * flow;
+ bool ret = false;
+
+ pthread_mutex_lock(&reg.mtx);
+
+ flow = __reg_get_flow(flow_id);
+ if (flow != NULL)
+ ret = flow->rk.has_pending;
+
+ pthread_mutex_unlock(&reg.mtx);
+
+ return ret;
+}
+
+pid_t reg_flow_get_n_1_pid(int flow_id)
+{
+ struct reg_flow * flow;
+ pid_t pid = -1;
+
+ pthread_mutex_lock(&reg.mtx);
+
+ flow = __reg_get_flow(flow_id);
+ if (flow != NULL)
+ pid = flow->info.n_1_pid;
+
+ pthread_mutex_unlock(&reg.mtx);
+
+ return pid;
+}
+
+int reg_flow_snapshot_rekey_due(struct rekey_info * snap,
+ int max)
+{
+ struct list_head * p;
+ int n = 0;
+
+ pthread_mutex_lock(&reg.mtx);
+
+ llist_for_each(p, &reg.flows) {
+ struct reg_flow * f;
+
+ if (n == max)
+ break;
+
+ f = list_entry(p, struct reg_flow, next);
+
+ if (f->info.state != FLOW_ALLOCATED || f->direct)
+ continue;
+
+ if (!f->rk.encrypted || !f->rk.initiator)
+ continue;
+
+ if (f->rk.in_flight || f->rk.has_pending)
+ continue;
+
+ f->rk.in_flight = true;
+
+ snap[n].flow_id = f->info.id;
+ snap[n].n_pid = f->info.n_pid;
+ snap[n].n_1_pid = f->info.n_1_pid;
+ snap[n].epoch = f->rk.epoch;
+ strcpy(snap[n].name, f->name);
+ ++n;
+ }
+
+ pthread_mutex_unlock(&reg.mtx);
+
+ return n;
+}
+
+void reg_flow_clear_in_flight(int flow_id)
+{
+ struct reg_flow * flow;
+
+ pthread_mutex_lock(&reg.mtx);
+
+ flow = __reg_get_flow(flow_id);
+ if (flow != NULL)
+ flow->rk.in_flight = false;
+
+ pthread_mutex_unlock(&reg.mtx);
+}
+
+int reg_flow_store_pending(int flow_id,
+ const uint8_t * seed,
+ uint8_t epoch)
+{
+ struct reg_flow * flow;
+ int ret = -ENOENT;
+
+ pthread_mutex_lock(&reg.mtx);
+
+ flow = __reg_get_flow(flow_id);
+ if (flow != NULL) {
+ memcpy(flow->rk.pending_seed, seed, SYMMKEYSZ);
+ flow->rk.pending_epoch = epoch;
+ flow->rk.has_pending = true;
+ flow->rk.in_flight = false;
+ /* Doorbell raised only after the seed is parked. */
+ if (flow->n_rb != NULL)
+ ssm_rbuff_set_bits(flow->n_rb, RB_REKEY);
+ ret = 0;
+ }
+
+ pthread_mutex_unlock(&reg.mtx);
+
+ return ret;
+}
+
+bool reg_flow_take_pending(int flow_id,
+ uint8_t * seed,
+ uint8_t * epoch)
+{
+ struct reg_flow * flow;
+ bool ret = false;
+
+ pthread_mutex_lock(&reg.mtx);
+
+ flow = __reg_get_flow(flow_id);
+ if (flow != NULL && flow->rk.has_pending) {
+ memcpy(seed, flow->rk.pending_seed, SYMMKEYSZ);
+ *epoch = flow->rk.pending_epoch;
+ flow->rk.epoch = flow->rk.pending_epoch; /* app installed it */
+ flow->rk.has_pending = false;
+ crypt_secure_clear(flow->rk.pending_seed, SYMMKEYSZ);
+ if (flow->n_rb != NULL)
+ ssm_rbuff_clr_bits(flow->n_rb, RB_REKEY);
+ ret = true;
+ }
+
+ pthread_mutex_unlock(&reg.mtx);
+
+ return ret;
+}
+
+void reg_notify_flow(int flow_id,
+ int event)
+{
+ struct reg_flow * flow;
+ struct reg_proc * proc;
+
+ pthread_mutex_lock(&reg.mtx);
+
+ flow = __reg_get_flow(flow_id);
+ if (flow != NULL) {
+ proc = __reg_get_proc(flow->info.n_pid);
+ if (proc != NULL)
+ ssm_flow_set_notify(proc->set, flow_id, event);
+ }
+
+ pthread_mutex_unlock(&reg.mtx);
+}
+
int reg_respond_flow_direct(int flow_id,
buffer_t * pbuf)
{
diff --git a/src/irmd/reg/reg.h b/src/irmd/reg/reg.h
index 6b576471..e0c64fed 100644
--- a/src/irmd/reg/reg.h
+++ b/src/irmd/reg/reg.h
@@ -163,6 +163,40 @@ int reg_wait_flow_direct(int flow_id,
bool reg_flow_is_direct(int flow_id);
+/* Per-flow snapshot for the re-key timer */
+struct rekey_info {
+ int flow_id;
+ pid_t n_pid;
+ pid_t n_1_pid;
+ char name[NAME_SIZE + 1];
+ uint8_t epoch;
+};
+
+void reg_flow_set_rekey(int flow_id,
+ bool initiator);
+
+int reg_flow_get_epoch(int flow_id);
+
+bool reg_flow_rekey_pending(int flow_id);
+
+pid_t reg_flow_get_n_1_pid(int flow_id);
+
+int reg_flow_snapshot_rekey_due(struct rekey_info * snap,
+ int max);
+
+void reg_flow_clear_in_flight(int flow_id);
+
+int reg_flow_store_pending(int flow_id,
+ const uint8_t * seed,
+ uint8_t epoch);
+
+bool reg_flow_take_pending(int flow_id,
+ uint8_t * seed,
+ uint8_t * epoch);
+
+void reg_notify_flow(int flow_id,
+ int event);
+
void reg_dealloc_flow(struct flow_info * info);
void reg_dealloc_flow_resp(struct flow_info * info);
diff --git a/src/lib/dev.c b/src/lib/dev.c
index cff1ebf2..3064b1e2 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -100,6 +100,9 @@ struct flow {
struct crypt_ctx * crypt;
int headsz; /* selector */
int tailsz; /* Tag + CRC */
+ struct timespec rk_grace; /* TX-promote deadline (0 = none) */
+ bool rk_wm_inflight; /* re-key trigger in flight */
+ uint32_t rk_wm_ctr; /* throttles the consult */
struct timespec snd_act;
struct timespec rcv_act;
@@ -509,6 +512,66 @@ static void flow_drain_rx_nb(struct flow * flow)
}
}
+/* TX-promotion grace when the peer's install latency is unknown (raw). */
+#define REKEY_GRACE_MS 1000
+
+/*
+ * Pull a parked re-key seed from the IRMd and install it. Driven from the
+ * data path when RB_REKEY shows on rx_rb. crypt_rekey is concurrency-safe
+ * on its own; proc.lock (rd) only guards against teardown.
+ */
+static void flow_rekey(struct flow * flow)
+{
+ struct flow_info info;
+ struct crypt_sk sk;
+ struct timespec now;
+ struct timespec intv;
+ time_t ms;
+ uint8_t key[SYMMKEYSZ];
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {SOCK_BUF_SIZE, buf};
+ bool has_key;
+
+ pthread_rwlock_rdlock(&proc.lock);
+ if (flow->info.id < 0 || flow->crypt == NULL) {
+ pthread_rwlock_unlock(&proc.lock);
+ return;
+ }
+ info = flow->info;
+ pthread_rwlock_unlock(&proc.lock);
+
+ if (flow_update__irm_req_ser(&msg, &info, false) < 0)
+ return;
+
+ if (send_recv_msg(&msg) < 0)
+ return;
+
+ sk.key = key;
+ if (flow_rekey__irm_result_des(&msg, &sk, &has_key) < 0)
+ return;
+
+ if (!has_key)
+ return;
+
+ pthread_rwlock_rdlock(&proc.lock);
+ if (flow->info.id == info.id && flow->crypt != NULL) {
+ if (crypt_rekey(flow->crypt, &sk) == 0) {
+ /* Hold TX on the old epoch until the peer installs. */
+ ms = flow->info.mpl > 0 ? flow->info.mpl * 3
+ : REKEY_GRACE_MS;
+ intv.tv_sec = ms / 1000;
+ intv.tv_nsec = (ms % 1000) * MILLION;
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+ ts_add(&now, &intv, &flow->rk_grace);
+ }
+ /* Re-arm the watermark even if the install was a no-op. */
+ STORE_RELAXED(&flow->rk_wm_inflight, false);
+ }
+ pthread_rwlock_unlock(&proc.lock);
+
+ crypt_secure_clear(key, SYMMKEYSZ);
+}
+
/*
* Wait clamped by caller deadline, next tw expiry, and TICTIME;
* a clamp-timeout means tw work is due, not caller-deadline.
@@ -533,6 +596,14 @@ static int flow_rx_one(struct flow * flow,
return -EFLOWDOWN;
}
+ /* Pull a parked re-key before re-blocking (idle reader). */
+ if (flow->crypt != NULL
+ && (ssm_rbuff_get_flags(rx_rb) & RB_REKEY)) {
+ pthread_rwlock_unlock(&proc.lock);
+ flow_rekey(flow);
+ continue;
+ }
+
idx = ssm_rbuff_read_b(rx_rb, &wait_abs);
if (idx == -ETIMEDOUT) {
pthread_rwlock_unlock(&proc.lock);
@@ -593,7 +664,7 @@ static void flow_clear(int fd)
}
/*
- * Set ACL_FLOWDOWN on rx/tx so any in-flight blocking reads or writes
+ * Set RB_FLOWDOWN on rx/tx so any in-flight blocking reads or writes
* wake up and drop their proc.lock rdlock. Must run BEFORE flow_fini's
* wrlock, else the wrlock blocks on those rdlock holders and the
* in-flight calls never see the FLOWDOWN signal.
@@ -604,9 +675,9 @@ static void flow_quiesce(int fd)
struct ssm_rbuff * tx_rb = proc.flows[fd].tx_rb;
if (rx_rb != NULL)
- ssm_rbuff_set_acl(rx_rb, ACL_FLOWDOWN);
+ ssm_rbuff_set_bits(rx_rb, RB_FLOWDOWN);
if (tx_rb != NULL)
- ssm_rbuff_set_acl(tx_rb, ACL_FLOWDOWN);
+ ssm_rbuff_set_bits(tx_rb, RB_FLOWDOWN);
}
static void do_flow_fini(int fd)
@@ -1256,8 +1327,6 @@ int fccntl(int fd,
va_list l;
struct timespec * timeo;
qosspec_t * qs;
- uint32_t rx_acl;
- uint32_t tx_acl;
size_t * qlen;
struct flow * flow;
uint16_t old_acc;
@@ -1353,31 +1422,26 @@ int fccntl(int fd,
&& flow->frcti != NULL)
emit_eos = true;
- rx_acl = ssm_rbuff_get_acl(flow->rx_rb);
- tx_acl = ssm_rbuff_get_acl(flow->tx_rb);
- /* Our flow write-only -> peer's read-only. */
+ /* Our flow write-only -> peer's read-only; restore on RDWR. */
if (flow->oflags & FLOWFWRONLY)
- rx_acl |= ACL_RDONLY;
- if (flow->oflags & FLOWFRDWR)
- rx_acl |= ACL_RDWR;
+ ssm_rbuff_clr_bits(flow->rx_rb, RB_WR);
+ else
+ ssm_rbuff_set_bits(flow->rx_rb, RB_WR);
if (flow->oflags & FLOWFDOWN) {
- rx_acl |= ACL_FLOWDOWN;
- tx_acl |= ACL_FLOWDOWN;
+ ssm_rbuff_set_bits(flow->rx_rb, RB_FLOWDOWN);
+ ssm_rbuff_set_bits(flow->tx_rb, RB_FLOWDOWN);
ssm_flow_set_notify(flow->set,
flow->info.id,
FLOW_DOWN);
} else {
- rx_acl &= ~ACL_FLOWDOWN;
- tx_acl &= ~ACL_FLOWDOWN;
+ ssm_rbuff_clr_bits(flow->rx_rb, RB_FLOWDOWN);
+ ssm_rbuff_clr_bits(flow->tx_rb, RB_FLOWDOWN);
ssm_flow_set_notify(flow->set,
flow->info.id,
FLOW_UP);
}
- ssm_rbuff_set_acl(flow->rx_rb, rx_acl);
- ssm_rbuff_set_acl(flow->tx_rb, tx_acl);
-
break;
case FLOWGFLAGS:
fflags = va_arg(l, uint32_t *);
@@ -1667,6 +1731,92 @@ static ssize_t flow_write_frag(struct flow * flow,
return (ssize_t) count;
}
+/*
+ * Watermark: re-key when the TX batch is within KEY_REKEY_WATERMARK node
+ * keys of exhaustion (0 disables), ahead of the timer; consult keyrot at
+ * most once per FLOW_WM_CHECK writes.
+ */
+#define FLOW_WM_CHECK (1u << 16)
+
+/*
+ * Switch TX to the freshly installed epoch once the peer is seen on it
+ * (peer_synced) or the install grace has elapsed (breaks the symmetric
+ * wait where neither side sends the new epoch first).
+ */
+static void flow_tx_promote(struct flow * flow,
+ const struct timespec * now)
+{
+ if (flow->crypt == NULL)
+ return;
+
+ if (flow->rk_grace.tv_sec == 0 && flow->rk_grace.tv_nsec == 0)
+ return;
+
+ if (!crypt_peer_synced(flow->crypt)
+ && ts_diff_ns(now, &flow->rk_grace) < 0)
+ return;
+
+ crypt_tx_promote(flow->crypt);
+ flow->rk_grace.tv_sec = 0;
+ flow->rk_grace.tv_nsec = 0;
+}
+
+/*
+ * Ask the IRMd to start an OAP re-key for this flow. The reply carries no
+ * key; the seed arrives later over RB_REKEY. Fired from the write path as
+ * the TX batch nears exhaustion, ahead of the timer.
+ */
+static int flow_rekey_trigger(struct flow * flow)
+{
+ struct flow_info info;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {SOCK_BUF_SIZE, buf};
+
+ pthread_rwlock_rdlock(&proc.lock);
+ if (flow->info.id < 0 || flow->crypt == NULL) {
+ pthread_rwlock_unlock(&proc.lock);
+ return -1;
+ }
+ info = flow->info;
+ pthread_rwlock_unlock(&proc.lock);
+
+ if (flow_update__irm_req_ser(&msg, &info, true) < 0)
+ return -1;
+
+ if (send_recv_msg(&msg) < 0)
+ return -1;
+
+ return 0;
+}
+
+/*
+ * True when the live TX batch has run low and no re-key is in flight.
+ * Advances a throttle so the (locking) keyrot consult runs at most once
+ * per FLOW_WM_CHECK writes.
+ */
+static bool flow_wm_due(struct flow * flow)
+{
+ uint32_t tick;
+
+ if (KEY_REKEY_WATERMARK == 0)
+ return false;
+
+ if (flow->crypt == NULL)
+ return false;
+
+ if (LOAD_RELAXED(&flow->rk_wm_inflight))
+ return false;
+
+ tick = FETCH_ADD_RELAXED(&flow->rk_wm_ctr, 1);
+ if ((tick & (FLOW_WM_CHECK - 1)) != 0)
+ return false;
+
+ if (ssm_rbuff_get_flags(flow->rx_rb) & RB_REKEY)
+ return false;
+
+ return crypt_nodes_left(flow->crypt) <= KEY_REKEY_WATERMARK;
+}
+
ssize_t flow_write(int fd,
const void * buf,
size_t count)
@@ -1710,6 +1860,19 @@ ssize_t flow_write(int fd,
if ((flags & FLOWFACCMODE) == FLOWFRDONLY)
return -EPERM;
+ if (flow->crypt != NULL
+ && (ssm_rbuff_get_flags(flow->rx_rb) & RB_REKEY))
+ flow_rekey(flow);
+
+ flow_tx_promote(flow, &now);
+
+ /* Pre-empt TX key exhaustion; the timer is the backstop. */
+ if (flow_wm_due(flow)) {
+ STORE_RELAXED(&flow->rk_wm_inflight, true);
+ if (flow_rekey_trigger(flow) < 0)
+ STORE_RELAXED(&flow->rk_wm_inflight, false);
+ }
+
tw_move_safe();
if (flow->frcti != NULL) {
@@ -1784,6 +1947,10 @@ static ssize_t raw_flow_read_pkt(struct flow * flow,
ssize_t idx;
while (true) {
+ if (flow->crypt != NULL
+ && (ssm_rbuff_get_flags(flow->rx_rb) & RB_REKEY))
+ flow_rekey(flow);
+
if (!block) {
idx = ssm_rbuff_read(flow->rx_rb);
if (idx < 0)
@@ -1917,6 +2084,16 @@ ssize_t flow_read(int fd,
pthread_rwlock_unlock(&proc.lock);
+ if (flow->crypt != NULL
+ && (ssm_rbuff_get_flags(flow->rx_rb) & RB_REKEY))
+ flow_rekey(flow);
+
+ /* Advance TX off a stale epoch even on recv-mostly (ACK-only) flows. */
+ if (flow->crypt != NULL) {
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+ flow_tx_promote(flow, &now);
+ }
+
tw_move_safe();
idx = flow->part_idx;
@@ -2101,6 +2278,18 @@ static int fqueue_filter(struct fqueue * fq)
pthread_rwlock_rdlock(&proc.lock);
while (fq->next < fq->fqsize) {
+ if (fq->fqueue[fq->next].event == FLOW_UPD) {
+ /* Re-key doorbell: pull internally, never surface. */
+ fd = proc.id_to_fd[fq->fqueue[fq->next].flow_id].fd;
+ ++fq->next;
+ if (fd >= 0) {
+ pthread_rwlock_unlock(&proc.lock);
+ flow_rekey(&proc.flows[fd]);
+ pthread_rwlock_rdlock(&proc.lock);
+ }
+ continue;
+ }
+
if (fq->fqueue[fq->next].event != FLOW_PKT) {
ret = 1;
goto out;
@@ -2643,8 +2832,8 @@ int ipcp_flow_fini(int fd)
return -1;
}
- ssm_rbuff_set_acl(proc.flows[fd].rx_rb, ACL_FLOWDOWN);
- ssm_rbuff_set_acl(proc.flows[fd].tx_rb, ACL_FLOWDOWN);
+ ssm_rbuff_set_bits(proc.flows[fd].rx_rb, RB_FLOWDOWN);
+ ssm_rbuff_set_bits(proc.flows[fd].tx_rb, RB_FLOWDOWN);
ssm_flow_set_notify(proc.flows[fd].set,
proc.flows[fd].info.id,
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 3077df65..c055433d 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -878,10 +878,10 @@ static void frct_mark_flow_down(struct frcti * frcti)
struct flow * f = frcti_to_flow(frcti);
if (f->rx_rb != NULL)
- ssm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
+ ssm_rbuff_set_bits(f->rx_rb, RB_FLOWDOWN);
if (f->tx_rb != NULL)
- ssm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
+ ssm_rbuff_set_bits(f->tx_rb, RB_FLOWDOWN);
}
__attribute__((cold))
@@ -890,7 +890,7 @@ static void frct_mark_peer_dead(struct frcti * frcti)
struct flow * f = frcti_to_flow(frcti);
if (f->rx_rb != NULL)
- ssm_rbuff_set_acl(f->rx_rb, ACL_FLOWPEER);
+ ssm_rbuff_set_bits(f->rx_rb, RB_FLOWPEER);
if (proc.fqset != NULL)
ssm_flow_set_notify(proc.fqset, f->info.id, FLOW_PEER);
diff --git a/src/lib/serdes-irm.c b/src/lib/serdes-irm.c
index a896576d..24bb349f 100644
--- a/src/lib/serdes-irm.c
+++ b/src/lib/serdes-irm.c
@@ -174,6 +174,48 @@ int flow__irm_result_des(buffer_t * buf,
else
memset(sk->key, 0, SYMMKEYSZ);
+ sk->epoch = msg->has_generation ? (uint8_t) msg->generation : 0;
+
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+ fail:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_msg:
+ return err;
+}
+
+int flow_rekey__irm_result_des(buffer_t * buf,
+ struct crypt_sk * sk,
+ bool * has_key)
+{
+ irm_msg_t * msg;
+ int err;
+
+ msg = irm_msg__unpack(NULL, buf->len, buf->data);
+ if (msg == NULL) {
+ err = -EIRMD;
+ goto fail_msg;
+ }
+
+ if (!msg->has_result) {
+ err = -EIRMD;
+ goto fail;
+ }
+
+ if (msg->result < 0) {
+ err = msg->result;
+ goto fail;
+ }
+
+ *has_key = msg->has_sym_key && msg->sym_key.len == SYMMKEYSZ;
+ if (*has_key) {
+ memcpy(sk->key, msg->sym_key.data, SYMMKEYSZ);
+ sk->nid = NID_undef;
+ sk->epoch = msg->has_generation ?
+ (uint8_t) msg->generation : 0;
+ }
+
irm_msg__free_unpacked(msg, NULL);
return 0;
@@ -222,6 +264,44 @@ int flow_dealloc__irm_req_ser(buffer_t * buf,
return -ENOMEM;
}
+int flow_update__irm_req_ser(buffer_t * buf,
+ const struct flow_info * flow,
+ bool rekey)
+{
+ irm_msg_t * msg;
+ size_t len;
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ irm_msg__init(msg);
+
+ msg->code = IRM_MSG_CODE__IRM_FLOW_UPDATE;
+ msg->flow_info = flow_info_s_to_msg(flow);
+ if (msg->flow_info == NULL)
+ goto fail_msg;
+
+ msg->has_rekey = true;
+ msg->rekey = rekey;
+
+ len = irm_msg__get_packed_size(msg);
+ if (len == 0 || len > buf->len)
+ goto fail_msg;
+
+ buf->len = len;
+
+ irm_msg__pack(msg, buf->data);
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+
+ fail_msg:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return -ENOMEM;
+}
+
int ipcp_flow_dealloc__irm_req_ser(buffer_t * buf,
const struct flow_info * flow)
{
@@ -398,15 +478,19 @@ int ipcp_flow_req_arr__irm_req_ser(buffer_t * buf,
return 0;
fail_msg:
+ /* hash/pk are borrowed from the caller; detach before free. */
+ msg->hash.len = 0;
+ msg->hash.data = NULL;
+ msg->pk.len = 0;
+ msg->pk.data = NULL;
irm_msg__free_unpacked(msg, NULL);
fail_malloc:
return -ENOMEM;
}
-int ipcp_flow_alloc_reply__irm_msg_ser(buffer_t * buf,
- const struct flow_info * flow,
- int response,
- const buffer_t * data)
+int ipcp_flow_update_arr__irm_req_ser(buffer_t * buf,
+ const struct flow_info * flow,
+ const buffer_t * data)
{
irm_msg_t * msg;
size_t len;
@@ -417,16 +501,14 @@ int ipcp_flow_alloc_reply__irm_msg_ser(buffer_t * buf,
irm_msg__init(msg);
- msg->code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY;
- msg->flow_info = flow_info_s_to_msg(flow);
+ msg->code = IRM_MSG_CODE__IPCP_FLOW_UPDATE_ARR;
+ msg->flow_info = flow_info_s_to_msg(flow);
if (msg->flow_info == NULL)
goto fail_msg;
msg->has_pk = true;
msg->pk.len = data->len;
msg->pk.data = data->data;
- msg->has_response = true;
- msg->response = response;
len = irm_msg__get_packed_size(msg);
if (len == 0 || len > buf->len)
@@ -436,27 +518,25 @@ int ipcp_flow_alloc_reply__irm_msg_ser(buffer_t * buf,
irm_msg__pack(msg, buf->data);
- /* Don't free * data! */
- msg->pk.len = 0;
+ /* Don't free data! */
+ msg->pk.len = 0;
msg->pk.data = NULL;
-
irm_msg__free_unpacked(msg, NULL);
return 0;
fail_msg:
- /* hash/pk are borrowed from the caller; detach before free. */
- msg->hash.len = 0;
- msg->hash.data = NULL;
- msg->pk.len = 0;
- msg->pk.data = NULL;
+ /* pk.data is borrowed from the caller; detach before free. */
+ msg->pk.len = 0;
+ msg->pk.data = NULL;
irm_msg__free_unpacked(msg, NULL);
fail_malloc:
return -ENOMEM;
}
-int ipcp_flow_update_arr__irm_req_ser(buffer_t * buf,
- const struct flow_info * flow,
- const buffer_t * data)
+int ipcp_flow_alloc_reply__irm_msg_ser(buffer_t * buf,
+ const struct flow_info * flow,
+ int response,
+ const buffer_t * data)
{
irm_msg_t * msg;
size_t len;
@@ -467,14 +547,16 @@ int ipcp_flow_update_arr__irm_req_ser(buffer_t * buf,
irm_msg__init(msg);
- msg->code = IRM_MSG_CODE__IPCP_FLOW_UPDATE_ARR;
- msg->flow_info = flow_info_s_to_msg(flow);
+ msg->code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY;
+ msg->flow_info = flow_info_s_to_msg(flow);
if (msg->flow_info == NULL)
goto fail_msg;
msg->has_pk = true;
msg->pk.len = data->len;
msg->pk.data = data->data;
+ msg->has_response = true;
+ msg->response = response;
len = irm_msg__get_packed_size(msg);
if (len == 0 || len > buf->len)
@@ -484,16 +566,14 @@ int ipcp_flow_update_arr__irm_req_ser(buffer_t * buf,
irm_msg__pack(msg, buf->data);
- /* Don't free data! */
- msg->pk.len = 0;
+ /* Don't free * data! */
+ msg->pk.len = 0;
msg->pk.data = NULL;
+
irm_msg__free_unpacked(msg, NULL);
return 0;
fail_msg:
- /* pk.data is borrowed from the caller; detach before free. */
- msg->pk.len = 0;
- msg->pk.data = NULL;
irm_msg__free_unpacked(msg, NULL);
fail_malloc:
return -ENOMEM;
diff --git a/src/lib/ssm/rbuff.c b/src/lib/ssm/rbuff.c
index c149c306..0121af89 100644
--- a/src/lib/ssm/rbuff.c
+++ b/src/lib/ssm/rbuff.c
@@ -74,7 +74,7 @@ struct ssm_rbuff {
ssize_t * shm_base; /* start of shared memory */
size_t * head; /* start of ringbuffer */
size_t * tail;
- size_t * acl; /* access control */
+ size_t * flags; /* out-of-band flags (RB_*) */
pthread_mutex_t * mtx; /* lock for cond vars only */
pthread_cond_t * add; /* signal when new data */
pthread_cond_t * del; /* signal when data removed */
@@ -114,8 +114,8 @@ static struct ssm_rbuff * rbuff_create(pid_t pid,
rb->shm_base = shm_base;
rb->head = (size_t *) (rb->shm_base + (SSM_RBUFF_SIZE));
rb->tail = (size_t *) (rb->head + 1);
- rb->acl = (size_t *) (rb->tail + 1);
- rb->mtx = (pthread_mutex_t *) (rb->acl + 1);
+ rb->flags = (size_t *) (rb->tail + 1);
+ rb->mtx = (pthread_mutex_t *) (rb->flags + 1);
rb->add = (pthread_cond_t *) (rb->mtx + 1);
rb->del = rb->add + 1;
rb->pid = pid;
@@ -181,7 +181,7 @@ struct ssm_rbuff * ssm_rbuff_create(pid_t pid,
if (pthread_cond_init(rb->del, &cattr))
goto fail_del;
- *rb->acl = ACL_RDWR;
+ *rb->flags = RB_RDWR;
*rb->head = 0;
*rb->tail = 0;
@@ -231,7 +231,7 @@ void ssm_rbuff_close(struct ssm_rbuff * rb)
assert(rb);
/*
- * Caller must set ACL_FLOWDOWN first; if a user becomes
+ * Caller must set RB_FLOWDOWN first; if a user becomes
* cancellable, push a cleanup that decrements n_users.
*/
while (__atomic_load_n(&rb->n_users, __ATOMIC_SEQ_CST) > 0) {
@@ -245,7 +245,7 @@ void ssm_rbuff_close(struct ssm_rbuff * rb)
int ssm_rbuff_write(struct ssm_rbuff * rb,
size_t off)
{
- size_t acl;
+ size_t flags;
bool was_empty;
int ret = 0;
@@ -253,15 +253,15 @@ int ssm_rbuff_write(struct ssm_rbuff * rb,
__atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST);
- acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
- if (acl != ACL_RDWR) {
- if (acl & ACL_FLOWDOWN) {
+ flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST);
+ if (flags != RB_RDWR) {
+ if (flags & RB_FLOWDOWN) {
ret = -EFLOWDOWN;
- goto fail_acl;
+ goto fail_flags;
}
- if (acl & ACL_RDONLY) {
+ if (!(flags & RB_WR)) {
ret = -ENOTALLOC;
- goto fail_acl;
+ goto fail_flags;
}
}
@@ -287,7 +287,7 @@ int ssm_rbuff_write(struct ssm_rbuff * rb,
fail_mutex:
pthread_mutex_unlock(rb->mtx);
- fail_acl:
+ fail_flags:
__atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST);
return ret;
}
@@ -296,7 +296,7 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb,
size_t off,
const struct timespec * abstime)
{
- size_t acl;
+ size_t flags;
int ret = 0;
bool was_empty;
@@ -304,15 +304,15 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb,
__atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST);
- acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
- if (acl != ACL_RDWR) {
- if (acl & ACL_FLOWDOWN) {
+ flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST);
+ if (flags != RB_RDWR) {
+ if (flags & RB_FLOWDOWN) {
ret = -EFLOWDOWN;
- goto fail_acl;
+ goto fail_flags;
}
- if (acl & ACL_RDONLY) {
+ if (!(flags & RB_WR)) {
ret = -ENOTALLOC;
- goto fail_acl;
+ goto fail_flags;
}
}
@@ -321,8 +321,8 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb,
pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx);
while (IS_FULL(rb) && ret != -ETIMEDOUT) {
- acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
- if (acl & ACL_FLOWDOWN) {
+ flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST);
+ if (flags & RB_FLOWDOWN) {
ret = -EFLOWDOWN;
break;
}
@@ -341,25 +341,28 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb,
pthread_mutex_unlock(rb->mtx);
- fail_acl:
+ fail_flags:
__atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST);
return ret;
}
-static int check_rb_acl(struct ssm_rbuff * rb)
+static int check_rb_flags(struct ssm_rbuff * rb)
{
- size_t acl;
+ size_t flags;
assert(rb != NULL);
- acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
+ flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST);
- if (acl & ACL_FLOWDOWN)
+ if (flags & RB_FLOWDOWN)
return -EFLOWDOWN;
- if (acl & ACL_FLOWPEER)
+ if (flags & RB_FLOWPEER)
return -EFLOWPEER;
+ if (!(flags & RB_RD))
+ return -ENOTALLOC;
+
return -EAGAIN;
}
@@ -372,7 +375,7 @@ ssize_t ssm_rbuff_read(struct ssm_rbuff * rb)
__atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST);
if (IS_EMPTY(rb)) {
- ret = check_rb_acl(rb);
+ ret = check_rb_flags(rb);
goto out;
}
@@ -380,7 +383,7 @@ ssize_t ssm_rbuff_read(struct ssm_rbuff * rb)
if (IS_EMPTY(rb)) {
pthread_mutex_unlock(rb->mtx);
- ret = check_rb_acl(rb);
+ ret = check_rb_flags(rb);
goto out;
}
@@ -400,14 +403,14 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb,
const struct timespec * abstime)
{
ssize_t idx = -1;
- size_t acl;
+ size_t flags;
assert(rb != NULL);
__atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST);
- acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
- if (IS_EMPTY(rb) && (acl & ACL_FLOWDOWN)) {
+ flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST);
+ if (IS_EMPTY(rb) && (flags & RB_FLOWDOWN)) {
idx = -EFLOWDOWN;
goto out;
}
@@ -418,7 +421,7 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb,
while (IS_EMPTY(rb) &&
idx != -ETIMEDOUT &&
- check_rb_acl(rb) == -EAGAIN) {
+ check_rb_flags(rb) == -EAGAIN) {
idx = -robust_wait(rb->add, rb->mtx, abstime);
}
@@ -429,7 +432,7 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb,
ADVANCE_TAIL(rb);
pthread_cond_broadcast(rb->del);
} else if (idx != -ETIMEDOUT) {
- idx = check_rb_acl(rb);
+ idx = check_rb_flags(rb);
}
pthread_mutex_unlock(rb->mtx);
@@ -441,23 +444,35 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb,
return idx;
}
-void ssm_rbuff_set_acl(struct ssm_rbuff * rb,
- uint32_t flags)
+void ssm_rbuff_set_bits(struct ssm_rbuff * rb,
+ uint32_t bits)
+{
+ assert(rb != NULL);
+
+ robust_mutex_lock(rb->mtx);
+ __atomic_fetch_or(rb->flags, (size_t) bits, __ATOMIC_SEQ_CST);
+ pthread_cond_broadcast(rb->add);
+ pthread_cond_broadcast(rb->del);
+ pthread_mutex_unlock(rb->mtx);
+}
+
+void ssm_rbuff_clr_bits(struct ssm_rbuff * rb,
+ uint32_t bits)
{
assert(rb != NULL);
robust_mutex_lock(rb->mtx);
- __atomic_store_n(rb->acl, (size_t) flags, __ATOMIC_SEQ_CST);
+ __atomic_fetch_and(rb->flags, ~(size_t) bits, __ATOMIC_SEQ_CST);
pthread_cond_broadcast(rb->add);
pthread_cond_broadcast(rb->del);
pthread_mutex_unlock(rb->mtx);
}
-uint32_t ssm_rbuff_get_acl(struct ssm_rbuff * rb)
+uint32_t ssm_rbuff_get_flags(struct ssm_rbuff * rb)
{
assert(rb != NULL);
- return (uint32_t) __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
+ return (uint32_t) __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST);
}
void ssm_rbuff_fini(struct ssm_rbuff * rb)
diff --git a/src/lib/ssm/tests/rbuff_test.c b/src/lib/ssm/tests/rbuff_test.c
index 58cb39c3..48e5a714 100644
--- a/src/lib/ssm/tests/rbuff_test.c
+++ b/src/lib/ssm/tests/rbuff_test.c
@@ -206,10 +206,10 @@ static int test_ssm_rbuff_fill_drain(void)
return TEST_RC_FAIL;
}
-static int test_ssm_rbuff_acl(void)
+static int test_ssm_rbuff_flags(void)
{
struct ssm_rbuff * rb;
- uint32_t acl;
+ uint32_t flags;
TEST_START();
@@ -219,16 +219,16 @@ static int test_ssm_rbuff_acl(void)
goto fail;
}
- acl = ssm_rbuff_get_acl(rb);
- if (acl != ACL_RDWR) {
- printf("Expected ACL_RDWR, got %u.\n", acl);
+ flags = ssm_rbuff_get_flags(rb);
+ if (flags != RB_RDWR) {
+ printf("Expected RB_RDWR, got %u.\n", flags);
goto fail_rb;
}
- ssm_rbuff_set_acl(rb, ACL_RDONLY);
- acl = ssm_rbuff_get_acl(rb);
- if (acl != ACL_RDONLY) {
- printf("Expected ACL_RDONLY, got %u.\n", acl);
+ ssm_rbuff_clr_bits(rb, RB_WR);
+ flags = ssm_rbuff_get_flags(rb);
+ if (flags != RB_RD) {
+ printf("Expected RB_RD, got %u.\n", flags);
goto fail_rb;
}
@@ -237,7 +237,7 @@ static int test_ssm_rbuff_acl(void)
goto fail_rb;
}
- ssm_rbuff_set_acl(rb, ACL_FLOWDOWN);
+ ssm_rbuff_set_bits(rb, RB_FLOWDOWN);
if (ssm_rbuff_write(rb, 1) != -EFLOWDOWN) {
printf("Expected -EFLOWDOWN on FLOWDOWN.\n");
goto fail_rb;
@@ -553,7 +553,7 @@ static int test_ssm_rbuff_blocking_flowdown(void)
clock_gettime(PTHREAD_COND_CLOCK, &now);
ts_add(&now, &interval, &abs_timeout);
- ssm_rbuff_set_acl(rb, ACL_FLOWDOWN);
+ ssm_rbuff_set_bits(rb, RB_FLOWDOWN);
ret = ssm_rbuff_read_b(rb, &abs_timeout);
if (ret != -EFLOWDOWN) {
@@ -561,7 +561,7 @@ static int test_ssm_rbuff_blocking_flowdown(void)
goto fail_rb;
}
- ssm_rbuff_set_acl(rb, ACL_RDWR);
+ ssm_rbuff_clr_bits(rb, RB_FLOWDOWN);
for (i = 0; i < SSM_RBUFF_SIZE - 1; ++i) {
if (ssm_rbuff_write(rb, i) < 0) {
@@ -573,7 +573,7 @@ static int test_ssm_rbuff_blocking_flowdown(void)
clock_gettime(PTHREAD_COND_CLOCK, &now);
ts_add(&now, &interval, &abs_timeout);
- ssm_rbuff_set_acl(rb, ACL_FLOWDOWN);
+ ssm_rbuff_set_bits(rb, RB_FLOWDOWN);
ret = ssm_rbuff_write_b(rb, 999, &abs_timeout);
if (ret != -EFLOWDOWN) {
@@ -581,7 +581,7 @@ static int test_ssm_rbuff_blocking_flowdown(void)
goto fail_rb;
}
- ssm_rbuff_set_acl(rb, ACL_RDWR);
+ ssm_rbuff_clr_bits(rb, RB_FLOWDOWN);
while (ssm_rbuff_read(rb) >= 0)
;
@@ -664,7 +664,7 @@ int rbuff_test(int argc,
ret |= test_ssm_rbuff_write_read();
ret |= test_ssm_rbuff_read_empty();
ret |= test_ssm_rbuff_fill_drain();
- ret |= test_ssm_rbuff_acl();
+ ret |= test_ssm_rbuff_flags();
ret |= test_ssm_rbuff_open_close();
ret |= test_ssm_rbuff_threaded();
ret |= test_ssm_rbuff_blocking();