summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- include/ouroboros/serdes-irm.h 3
-rw-r--r-- src/irmd/main.c 682
-rw-r--r-- src/irmd/reg/flow.c 7
-rw-r--r-- src/irmd/reg/flow.h 7
-rw-r--r-- src/irmd/reg/reg.c 389
-rw-r--r-- src/irmd/reg/reg.h 43
-rw-r--r-- src/irmd/reg/tests/reg_test.c 165
-rw-r--r-- src/lib/dev.c 74
-rw-r--r-- src/lib/pb/irm.proto 1
-rw-r--r-- src/lib/serdes-irm.c 4
10 files changed, 1113 insertions, 262 deletions
diff --git a/include/ouroboros/serdes-irm.h b/include/ouroboros/serdes-irm.h
index 01c4153c..a5854d5b 100644
--- a/include/ouroboros/serdes-irm.h
+++ b/include/ouroboros/serdes-irm.h
@@ -75,7 +75,8 @@ int flow_update__irm_req_ser(buffer_t * buf,
int flow_rekey__irm_result_des(buffer_t * buf,
struct crypt_sk * sk,
- bool * has_key);
+ bool * has_key,
+ bool * initiator);
int ipcp_flow_dealloc__irm_req_ser(buffer_t * buf,
const struct flow_info * info);
diff --git a/src/irmd/main.c b/src/irmd/main.c
index c77355e7..19be4ab9 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -119,7 +119,9 @@ struct rekey_ctx {
enum rekey_evt_type {
REKEY_INIT = 0, /* start an exchange for flow_id */
- REKEY_RESP /* a RESPONSE arrived for flow_id */
+ REKEY_REQ, /* a REQUEST arrived for flow_id */
+ REKEY_RESP, /* a RESPONSE arrived for flow_id */
+ REKEY_DIRECT /* in-process re-key, direct flow */
};
struct rekey_evt {
@@ -137,7 +139,7 @@ struct {
char * cfg_file; /* configuration file path */
#endif
struct lockfile * lf; /* single irmd per system */
- struct ssm_pool * gspp; /* pool for packets */
+ struct ssm_pool * gspp; /* pool for packets */
int sockfd; /* UNIX socket */
@@ -154,11 +156,10 @@ struct {
pthread_t acceptor; /* accept new commands */
struct {
- pthread_t worker; /* Tier-2 re-key orchestrator */
- struct list_head inbox; /* re-key events for worker */
- pthread_cond_t cond; /* inbox signal condvar */
- pthread_mutex_t lock; /* inbox lock */
- bool stop; /* worker shutdown flag */
+ pthread_t worker; /* Tier-2 re-key orchestrator */
+ struct list_head inbox; /* re-key events for worker */
+ pthread_cond_t cond; /* inbox signal condvar */
+ pthread_mutex_t mtx; /* inbox lock */
} rk;
} irmd;
@@ -818,7 +819,8 @@ static int name_unreg(const char * name,
static int get_peer_ids(int fd,
uid_t * uid,
- gid_t * gid)
+ gid_t * gid,
+ pid_t * pid)
{
#if defined(__linux__)
struct ucred ucred;
@@ -831,9 +833,14 @@ static int get_peer_ids(int fd,
*uid = ucred.uid;
*gid = ucred.gid;
+ if (pid != NULL)
+ *pid = ucred.pid;
#else
if (getpeereid(fd, uid, gid) < 0)
goto fail;
+
+ if (pid != NULL)
+ *pid = -1; /* no portable SO_PEERCRED.pid equivalent */
#endif
return 0;
fail:
@@ -882,6 +889,7 @@ static int flow_accept(struct flow_info * flow,
{
buffer_t req_hdr;
buffer_t resp_hdr;
+ buffer_t peer_crt = BUF_INIT;
char name[NAME_SIZE + 1];
struct name_info info;
int err;
@@ -945,7 +953,8 @@ static int flow_accept(struct flow_info * flow,
flow->uid = reg_get_proc_uid(flow->n_pid);
- err = oap_srv_process(&info, req_hdr, &resp_hdr, data, sk);
+ err = oap_srv_process(&info, req_hdr, &resp_hdr, data, sk,
+ false, NULL, &peer_crt);
if (err == -EREPLAY) {
log_warn("Dropping replayed alloc request for %s.", name);
goto fail_replay;
@@ -960,6 +969,8 @@ static int flow_accept(struct flow_info * flow,
log_err("Failed to respond to direct flow.");
goto fail_resp;
}
+ if (sk->nid != NID_undef)
+ reg_flow_set_rekey(flow->id, false, peer_crt);
log_info("Flow %d accepted (direct) by %d for %s.",
flow->id, flow->n_pid, name);
} else if (ipcp_flow_alloc_resp(flow, 0, resp_hdr) < 0) {
@@ -967,11 +978,12 @@ static int flow_accept(struct flow_info * flow,
goto fail_resp;
} else {
if (sk->nid != NID_undef)
- reg_flow_set_rekey(flow->id, false);
+ reg_flow_set_rekey(flow->id, false, peer_crt);
log_info("Flow %d accepted by %d for %s (uid %d).",
flow->id, flow->n_pid, name, flow->uid);
}
+ freebuf(peer_crt);
freebuf(req_hdr);
freebuf(resp_hdr);
@@ -981,6 +993,7 @@ static int flow_accept(struct flow_info * flow,
if (!reg_flow_is_direct(flow->id))
ipcp_flow_alloc_resp(flow, err, resp_hdr);
fail_replay:
+ freebuf(peer_crt);
freebuf(req_hdr);
freebuf(resp_hdr);
fail_wait:
@@ -990,6 +1003,7 @@ static int flow_accept(struct flow_info * flow,
fail_resp:
flow->state = FLOW_NULL;
+ freebuf(peer_crt);
freebuf(req_hdr);
freebuf(resp_hdr);
reg_destroy_flow(flow->id);
@@ -1238,6 +1252,7 @@ static int flow_alloc_direct(const char * dst,
struct flow_info acc; /* server side flow */
buffer_t req_hdr = BUF_INIT;
buffer_t resp_hdr = BUF_INIT;
+ buffer_t no_crt = BUF_INIT;
void * ctx;
int err;
@@ -1247,7 +1262,7 @@ static int flow_alloc_direct(const char * dst,
return -EAGAIN;
}
- if (oap_cli_prepare(&ctx, info, &req_hdr, *data) < 0) {
+ if (oap_cli_prepare(&ctx, info, &req_hdr, *data, false) < 0) {
log_err("Failed to prepare OAP for %s.", dst);
return -EBADF;
}
@@ -1280,7 +1295,7 @@ static int flow_alloc_direct(const char * dst,
return -ETIMEDOUT;
}
- err = oap_cli_complete(ctx, info, resp_hdr, data, sk);
+ err = oap_cli_complete(ctx, info, resp_hdr, data, sk, NULL, NULL);
if (err < 0) {
log_err("OAP completion failed for %s.", dst);
freebuf(resp_hdr);
@@ -1293,6 +1308,10 @@ static int flow_alloc_direct(const char * dst,
flow->mtu = DIRECT_MTU;
flow->state = FLOW_ALLOCATED;
+ /* Mark encrypted for re-key; the acceptor caches the cert. */
+ if (sk->nid != NID_undef)
+ reg_flow_set_rekey(acc.id, true, no_crt);
+
log_info("Flow %d allocated (direct) for %d to %s.",
flow->id, flow->n_pid, dst);
@@ -1311,6 +1330,7 @@ static int flow_alloc(const char * dst,
buffer_t req_hdr = BUF_INIT;
buffer_t resp_hdr = BUF_INIT;
buffer_t hash = BUF_INIT;
+ buffer_t peer_crt = BUF_INIT;
struct name_info info;
void * ctx;
int err;
@@ -1344,6 +1364,8 @@ static int flow_alloc(const char * dst,
goto fail_flow;
}
+ reg_set_name_for_flow_id(dst, flow->id);
+
if (get_ipcp_by_dst(dst, &flow->n_1_pid, &hash) < 0) {
log_err("Failed to find IPCP for %s.", dst);
err = -EIPCP;
@@ -1356,7 +1378,7 @@ static int flow_alloc(const char * dst,
goto fail_prepare;
}
- if (oap_cli_prepare(&ctx, &info, &req_hdr, *data) < 0) {
+ if (oap_cli_prepare(&ctx, &info, &req_hdr, *data, false) < 0) {
log_err("Failed to prepare OAP request for %s.", dst);
err = -EBADF;
goto fail_prepare;
@@ -1388,15 +1410,16 @@ static int flow_alloc(const char * dst,
goto fail_peer;
}
- err = oap_cli_complete(ctx, &info, resp_hdr, data, sk);
+ err = oap_cli_complete(ctx, &info, resp_hdr, data, sk, NULL, &peer_crt);
if (err < 0) {
log_err("OAP completion failed for %s.", dst);
goto fail_complete;
}
if (sk->nid != NID_undef)
- reg_flow_set_rekey(flow->id, true);
+ reg_flow_set_rekey(flow->id, true, peer_crt);
+ freebuf(peer_crt);
freebuf(req_hdr);
freebuf(resp_hdr);
freebuf(hash);
@@ -1404,7 +1427,8 @@ static int flow_alloc(const char * dst,
return 0;
fail_complete:
- ctx = NULL; /* freee'd on complete */
+ freebuf(peer_crt);
+ ctx = NULL; /* free'd on complete */
fail_peer:
flow->state = FLOW_DEALLOCATED;
fail_wait:
@@ -1484,9 +1508,15 @@ static void rekey_post(enum rekey_evt_type type,
evt = malloc(sizeof(*evt));
if (evt == NULL) {
- log_err("Failed to post re-key event for flow %d.", flow_id);
+ log_err("Failed to malloc re-key event for flow %d.", flow_id);
+ if (type == REKEY_INIT || type == REKEY_DIRECT)
+ reg_flow_clear_in_flight(flow_id);
+ else
+ reg_flow_rekey_arr_done(flow_id, type == REKEY_REQ);
+
if (buf != NULL)
freebuf(*buf);
+
return;
}
@@ -1500,12 +1530,12 @@ static void rekey_post(enum rekey_evt_type type,
clrbuf(*buf);
}
- pthread_mutex_lock(&irmd.rk.lock);
+ pthread_mutex_lock(&irmd.rk.mtx);
list_add_tail(&evt->next, &irmd.rk.inbox);
pthread_cond_signal(&irmd.rk.cond);
- pthread_mutex_unlock(&irmd.rk.lock);
+ pthread_mutex_unlock(&irmd.rk.mtx);
}
static void rekey_post_init(int flow_id,
@@ -1520,6 +1550,18 @@ static void rekey_post_resp(int flow_id,
rekey_post(REKEY_RESP, flow_id, 0, buf);
}
+static void rekey_post_req(int flow_id,
+ pid_t n_1_pid,
+ buffer_t * buf)
+{
+ rekey_post(REKEY_REQ, flow_id, n_1_pid, buf);
+}
+
+static void rekey_post_direct(int flow_id)
+{
+ rekey_post(REKEY_DIRECT, flow_id, 0, NULL);
+}
+
/* Worker-only: find an in-flight entry by flow_id. */
static struct rekey_ctx * rekey_find(struct list_head * tbl,
int flow_id)
@@ -1545,6 +1587,18 @@ static void rekey_drop(struct rekey_ctx * e)
free(e);
}
+/* Resolve a flow's registered name info; < 0 if the flow or name is gone. */
+static int rekey_name_info(int flow_id,
+ struct name_info * info)
+{
+ char name[NAME_SIZE + 1];
+
+ if (reg_get_name_for_flow_id(name, flow_id) < 0)
+ return -1;
+
+ return reg_get_name_info(name, info);
+}
+
/* Flow-update relay payload: a 1-byte type prefix on an opaque body. */
enum flow_upd_type {
FLOW_UPD_REKEY_REQ = 0,
@@ -1567,10 +1621,30 @@ static int flow_upd_wrap(buffer_t * out,
return 0;
}
-/*
- * Worker-only: start a fresh OAP exchange over the live flow. Replaces
- * any prior in-flight entry for flow_id (handles flow_id recycling).
- */
+/* Cleanup handlers — the re-key worker is cancelled at shutdown. */
+static void rk_free_evt(void * o)
+{
+ struct rekey_evt * evt = o;
+
+ freebuf(evt->buf);
+ free(evt);
+}
+
+static void rk_freebuf(void * o)
+{
+ freebuf(*(buffer_t *) o);
+}
+
+static void rk_clear_in_flight(void * o)
+{
+ reg_flow_clear_in_flight(*(int *) o);
+}
+
+static void rk_clear_key(void * o)
+{
+ crypt_secure_clear(o, SYMMKEYSZ);
+}
+
static void rekey_do_initiate(struct list_head * tbl,
int flow_id,
pid_t n_1_pid)
@@ -1581,25 +1655,21 @@ static void rekey_do_initiate(struct list_head * tbl,
buffer_t req = BUF_INIT;
buffer_t upd = BUF_INIT;
buffer_t data = BUF_INIT;
- char nbuf[NAME_SIZE + 1];
void * ctx = NULL;
int ret;
e = rekey_find(tbl, flow_id);
if (e != NULL)
- rekey_drop(e);
+ rekey_drop(e); /* Replace in-flight entries */
- if (reg_get_name_for_flow_id(nbuf, flow_id) < 0 ||
- reg_get_name_info(nbuf, &name) < 0) {
- log_warn("No name info to re-key flow %d.", flow_id);
- reg_flow_clear_in_flight(flow_id);
- return;
+ if (rekey_name_info(flow_id, &name) < 0) {
+ log_err("Failed to get name info to re-key flow %d.", flow_id);
+ goto fail;
}
- if (oap_cli_prepare(&ctx, &name, &req, data) < 0) {
+ if (oap_cli_prepare(&ctx, &name, &req, data, true) < 0) {
log_err("Failed to prepare re-key for flow %d.", flow_id);
- reg_flow_clear_in_flight(flow_id);
- return;
+ goto fail;
}
memset(&info, 0, sizeof(info));
@@ -1611,7 +1681,15 @@ static void rekey_do_initiate(struct list_head * tbl,
goto fail_ctx;
}
+ pthread_cleanup_push(rk_clear_in_flight, &flow_id);
+ pthread_cleanup_push(oap_ctx_free, ctx);
+ pthread_cleanup_push(rk_freebuf, &req);
+ pthread_cleanup_push(rk_freebuf, &upd);
ret = ipcp_flow_update(&info, upd);
+ pthread_cleanup_pop(false);
+ pthread_cleanup_pop(false);
+ pthread_cleanup_pop(false);
+ pthread_cleanup_pop(false);
freebuf(upd);
if (ret < 0) {
log_err("Failed to send re-key request for flow %d.", flow_id);
@@ -1620,7 +1698,7 @@ static void rekey_do_initiate(struct list_head * tbl,
e = malloc(sizeof(*e));
if (e == NULL) {
- log_err("Failed to track re-key for flow %d.", flow_id);
+ log_err("Failed to malloc re-key ctx for flow %d.", flow_id);
goto fail_ctx;
}
@@ -1640,8 +1718,9 @@ static void rekey_do_initiate(struct list_head * tbl,
fail_ctx:
oap_ctx_free(ctx);
- reg_flow_clear_in_flight(flow_id);
freebuf(req);
+ fail:
+ reg_flow_clear_in_flight(flow_id);
}
/* Worker-only: complete the exchange, install the pending seed. */
@@ -1654,7 +1733,7 @@ static void rekey_do_complete(struct list_head * tbl,
struct crypt_sk sk;
uint8_t kbuf[SYMMKEYSZ];
buffer_t data = BUF_INIT;
- char name[NAME_SIZE + 1];
+ buffer_t crt = BUF_INIT;
uint8_t newgen;
e = rekey_find(tbl, flow_id);
@@ -1663,24 +1742,36 @@ static void rekey_do_complete(struct list_head * tbl,
return;
}
- if (reg_get_name_for_flow_id(name, flow_id) < 0 ||
- reg_get_name_info(name, &info) < 0) {
- log_err("No name info to re-key flow %d.", flow_id);
+ /* A concurrent responder already parked a seed; don't overwrite. */
+ if (reg_flow_rekey_pending(flow_id)) {
+ log_dbg("Re-key already pending for flow %d.", flow_id);
+ goto finish;
+ }
+
+ if (rekey_name_info(flow_id, &info) < 0) {
+ log_err("Failed to get name info to re-key flow %d.", flow_id);
goto finish;
}
sk.key = kbuf;
+ reg_flow_get_peer_crt(flow_id, &crt);
+
/* oap_cli_complete frees the ctx on every path. */
- if (oap_cli_complete(e->ctx, &info, buf, &data, &sk) < 0) {
- log_err("Re-key completion failed for flow %d.", flow_id);
+ if (oap_cli_complete(e->ctx, &info, buf, &data, &sk, &crt, NULL) < 0) {
+ log_warn("Failed to complete re-key for flow %d.", flow_id);
e->ctx = NULL;
- goto finish;
+ goto finish_clear;
}
e->ctx = NULL;
- newgen = data.len == 1 ? *(uint8_t *) data.data : 0;
+ if (data.len != 1) {
+ log_warn("Re-key reply malformed for flow %d.", flow_id);
+ goto finish_clear;
+ }
+
+ newgen = *(uint8_t *) data.data;
if (newgen >= 16) {
log_warn("Re-key gen %u out of range for flow %d.",
@@ -1688,7 +1779,7 @@ static void rekey_do_complete(struct list_head * tbl,
goto finish_clear;
}
- if (reg_flow_store_pending(flow_id, kbuf, newgen) < 0)
+ if (reg_flow_store_pending(flow_id, kbuf, newgen, true) < 0)
log_warn("Flow %d gone during re-key.", flow_id);
else
reg_notify_flow(flow_id, FLOW_UPD);
@@ -1699,6 +1790,7 @@ static void rekey_do_complete(struct list_head * tbl,
crypt_secure_clear(kbuf, SYMMKEYSZ);
freebuf(data);
finish:
+ freebuf(crt);
rekey_drop(e);
reg_flow_clear_in_flight(flow_id);
}
@@ -1733,7 +1825,7 @@ static int rekey_respond(struct flow_info * flow,
buffer_t rsp = BUF_INIT;
buffer_t upd = BUF_INIT;
buffer_t data = BUF_INIT;
- char name[NAME_SIZE + 1];
+ buffer_t crt = BUF_INIT;
uint8_t newgen;
int epoch;
int err;
@@ -1744,9 +1836,14 @@ static int rekey_respond(struct flow_info * flow,
return -EBADF;
}
- if (reg_get_name_for_flow_id(name, flow->id) < 0 ||
- reg_get_name_info(name, &info) < 0) {
- log_err("No name info to re-key flow %d.", flow->id);
+ /* Collision: we are driving our own exchange; let it win. */
+ if (reg_flow_rekey_should_yield(flow->id)) {
+ log_dbg("Yielding to own re-key for flow %d.", flow->id);
+ return 0;
+ }
+
+ if (rekey_name_info(flow->id, &info) < 0) {
+ log_err("Failed to get name info to re-key flow %d.", flow->id);
return -ENAME;
}
@@ -1761,17 +1858,19 @@ static int rekey_respond(struct flow_info * flow,
sk.key = kbuf;
- err = oap_srv_process(&info, *pk, &rsp, &data, &sk);
+ reg_flow_get_peer_crt(flow->id, &crt);
+
+ err = oap_srv_process(&info, *pk, &rsp, &data, &sk, true, &crt, NULL);
if (err < 0) {
/* data still points to stack newgen; don't free it. */
- log_err("Re-key OAP failed for flow %d.", flow->id);
+ log_err("Failed to process re-key OAP for flow %d.", flow->id);
goto finish;
}
/* On success oap_srv_process repointed data to client output. */
freebuf(data);
- if (reg_flow_store_pending(flow->id, kbuf, newgen) < 0) {
+ if (reg_flow_store_pending(flow->id, kbuf, newgen, false) < 0) {
log_warn("Flow %d gone during re-key.", flow->id);
err = -EBADF;
goto finish;
@@ -1780,9 +1879,17 @@ static int rekey_respond(struct flow_info * flow,
reg_notify_flow(flow->id, FLOW_UPD);
if (flow_upd_wrap(&upd, FLOW_UPD_REKEY_RESP, &rsp) == 0) {
+ pthread_cleanup_push(rk_clear_key, kbuf);
+ pthread_cleanup_push(rk_freebuf, &rsp);
+ pthread_cleanup_push(rk_freebuf, &crt);
+ pthread_cleanup_push(rk_freebuf, &upd);
if (ipcp_flow_update(flow, upd) < 0)
log_err("Failed to send re-key response for flow %d.",
flow->id);
+ pthread_cleanup_pop(false);
+ pthread_cleanup_pop(false);
+ pthread_cleanup_pop(false);
+ pthread_cleanup_pop(false);
freebuf(upd);
}
@@ -1790,56 +1897,182 @@ static int rekey_respond(struct flow_info * flow,
finish:
crypt_secure_clear(kbuf, SYMMKEYSZ);
freebuf(rsp);
+ freebuf(crt);
return err;
}
+/*
+ * Worker-only: re-key a direct (loopback) flow, the exchange runs in-process:
+ * build a client request, then derive the shared seed, and hand the one seed
+ * to both apps with RB_REKEY.
+ */
+static void rekey_do_direct(int flow_id)
+{
+ struct name_info info;
+ struct crypt_sk sk;
+ uint8_t kbuf[SYMMKEYSZ];
+ buffer_t req = BUF_INIT;
+ buffer_t rsp = BUF_INIT;
+ buffer_t data = BUF_INIT;
+ buffer_t crt = BUF_INIT;
+ void * ctx = NULL;
+ uint8_t newgen;
+ int epoch;
+
+ epoch = reg_flow_get_epoch(flow_id);
+ if (epoch < 0) {
+ log_warn("Re-key for unknown flow %d.", flow_id);
+ reg_flow_clear_in_flight(flow_id);
+ return;
+ }
+
+ if (rekey_name_info(flow_id, &info) < 0) {
+ log_err("Failed to get name info to re-key flow %d.", flow_id);
+ reg_flow_clear_in_flight(flow_id);
+ return;
+ }
+
+ if (oap_cli_prepare(&ctx, &info, &req, data, true) < 0) {
+ log_err("Failed to prepare re-key for flow %d.", flow_id);
+ reg_flow_clear_in_flight(flow_id);
+ return;
+ }
+
+ newgen = (uint8_t) ((epoch + 1) & 0x0F);
+ data.data = &newgen;
+ data.len = 1;
+
+ sk.key = kbuf;
+
+ reg_flow_get_peer_crt(flow_id, &crt);
+
+ if (oap_srv_process(&info, req, &rsp, &data, &sk, true,
+ &crt, NULL) < 0) {
+ /* data still points to stack newgen; don't free it. */
+ log_err("Failed to process re-key OAP for flow %d.", flow_id);
+ reg_flow_clear_in_flight(flow_id);
+ goto out;
+ }
+
+ /* On success oap_srv_process repointed data to its output. */
+ freebuf(data);
+
+ if (reg_flow_store_pending_direct(flow_id, kbuf, newgen) < 0) {
+ log_warn("Flow %d gone during re-key.", flow_id);
+ reg_flow_clear_in_flight(flow_id);
+ goto out;
+ }
+
+ reg_notify_flow_peers(flow_id, FLOW_UPD);
+
+ log_dbg("Re-key completed (direct) for flow %d (gen %u).",
+ flow_id, newgen);
+ out:
+ crypt_secure_clear(kbuf, SYMMKEYSZ);
+ oap_ctx_free(ctx);
+ freebuf(req);
+ freebuf(rsp);
+ freebuf(crt);
+}
+
+/* Route one snapshot entry to the wire or in-process re-key path. */
+static void rekey_dispatch(struct list_head * tbl,
+ const struct rekey_info * ri)
+{
+ if (ri->direct)
+ rekey_do_direct(ri->flow_id);
+ else
+ rekey_do_initiate(tbl, ri->flow_id, ri->n_1_pid);
+}
+
static int flow_update_arr(struct flow_info * flow,
buffer_t * pk)
{
uint8_t type;
+ bool is_req;
if (pk->len < 1)
return -EINVAL;
type = pk->data[0];
- /* Strip the type byte, keeping the malloc base for hand-off. */
- memmove(pk->data, pk->data + 1, pk->len - 1);
- pk->len -= 1;
-
switch (type) {
case FLOW_UPD_REKEY_REQ:
- return rekey_respond(flow, pk);
+ is_req = true;
+ break;
case FLOW_UPD_REKEY_RESP:
- /* Hand the payload to the worker and take ownership. */
- rekey_post_resp(flow->id, pk);
- return 0;
+ is_req = false;
+ break;
default:
log_warn("Unknown flow update type %u.", type);
return -EINVAL;
}
+
+ /* Drop floods/spoofs before allocating a worker event. */
+ if (!reg_flow_rekey_arr_admit(flow->id, flow->n_1_pid, is_req))
+ return 0;
+
+ /* Strip the type byte, keeping the malloc base for hand-off. */
+ memmove(pk->data, pk->data + 1, pk->len - 1);
+ pk->len -= 1;
+
+ /* Defer to worker; an inline RESP send deadlocks loopback. */
+ if (is_req)
+ rekey_post_req(flow->id, flow->n_1_pid, pk);
+ else
+ rekey_post_resp(flow->id, pk);
+
+ return 0;
}
static int flow_update(struct flow_info * flow,
+ uid_t uid,
+ pid_t cpid,
bool rekey,
struct crypt_sk * sk,
- bool * has_key)
+ bool * has_key,
+ bool * initiator)
{
uint8_t seed[SYMMKEYSZ];
uint8_t epoch;
+ int rc;
- *has_key = false;
+ *has_key = false;
+ *initiator = false;
if (rekey) {
+ pid_t n_1_pid;
+
+ if (!reg_flow_owned_by(flow->id, uid))
+ return -EPERM;
+
+ /* Direct flows re-key in-process; no lower IPCP carrier. */
+ if (reg_flow_is_direct(flow->id)) {
+ if (reg_flow_rekey_begin(flow->id))
+ rekey_post_direct(flow->id);
+
+ return 0;
+ }
+
/* Watermark re-key: the app can't know its lower IPCP. */
- pid_t n_1_pid = reg_flow_get_n_1_pid(flow->id);
- if (n_1_pid > 0)
+ n_1_pid = reg_flow_get_n_1_pid(flow->id);
+ if (n_1_pid <= 0)
+ return 0;
+
+ /* One exchange per flow; the latch arbitrates collisions. */
+ if (reg_flow_rekey_begin(flow->id))
rekey_post_init(flow->id, n_1_pid);
+
return 0;
}
- if (!reg_flow_take_pending(flow->id, seed, &epoch))
+ rc = reg_flow_take_pending(flow->id, uid, cpid, seed, &epoch,
+ initiator);
+ if (rc == -EPERM)
+ return -EPERM;
+
+ if (rc != 0)
return 0;
memcpy(sk->key, seed, SYMMKEYSZ);
@@ -1854,7 +2087,6 @@ static int flow_update(struct flow_info * flow,
return 0;
}
-/* Free every parked OAP ctx at worker exit or cancellation. */
static void rekey_table_cleanup(void * o)
{
struct list_head * tbl = o;
@@ -1867,32 +2099,97 @@ static void rekey_table_cleanup(void * o)
}
}
-/* Pop one event, or NULL if none, draining the inbox under its lock. */
-static struct rekey_evt * rekey_inbox_wait(const struct timespec * deadline)
+static struct rekey_evt * rekey_event_wait(const struct timespec * dl)
{
struct rekey_evt * evt = NULL;
- struct timespec now;
+ int ret = 0;
- pthread_mutex_lock(&irmd.rk.lock);
+ pthread_mutex_lock(&irmd.rk.mtx);
+ pthread_cleanup_push(__cleanup_mutex_unlock, &irmd.rk.mtx);
- clock_gettime(PTHREAD_COND_CLOCK, &now);
-
- while (list_is_empty(&irmd.rk.inbox) && !irmd.rk.stop &&
- ts_diff_ns(deadline, &now) > 0) {
- pthread_cond_timedwait(&irmd.rk.cond, &irmd.rk.lock, deadline);
- clock_gettime(PTHREAD_COND_CLOCK, &now);
- }
+ while (list_is_empty(&irmd.rk.inbox) && ret != -ETIMEDOUT)
+ ret = -pthread_cond_timedwait(&irmd.rk.cond, &irmd.rk.mtx, dl);
if (!list_is_empty(&irmd.rk.inbox)) {
evt = list_first_entry(&irmd.rk.inbox, struct rekey_evt, next);
list_del(&evt->next);
}
- pthread_mutex_unlock(&irmd.rk.lock);
+ pthread_cleanup_pop(true);
return evt;
}
+static struct timespec rekey_deadline(struct list_head * tbl,
+ struct timespec next)
+{
+ struct timespec deadline = next;
+ struct list_head * p;
+
+ list_for_each(p, tbl) {
+ struct rekey_ctx * e;
+ e = list_entry(p, struct rekey_ctx, next);
+ if (ts_diff_ns(&e->deadline, &deadline) < 0)
+ deadline = e->deadline;
+ }
+
+ return deadline;
+}
+
+static void rekey_handle_evt(struct list_head * tbl,
+ struct rekey_evt * evt)
+{
+ struct flow_info rinfo;
+
+ pthread_cleanup_push(rk_free_evt, evt);
+
+ switch (evt->type) {
+ case REKEY_INIT:
+ rekey_do_initiate(tbl, evt->flow_id, evt->n_1_pid);
+ break;
+ case REKEY_REQ:
+ memset(&rinfo, 0, sizeof(rinfo));
+ rinfo.id = evt->flow_id;
+ rinfo.n_1_pid = evt->n_1_pid;
+ rekey_respond(&rinfo, &evt->buf);
+ reg_flow_rekey_arr_done(evt->flow_id, true);
+ break;
+ case REKEY_RESP:
+ rekey_do_complete(tbl, evt->flow_id, evt->buf);
+ reg_flow_rekey_arr_done(evt->flow_id, false);
+ break;
+ case REKEY_DIRECT:
+ rekey_do_direct(evt->flow_id);
+ break;
+ default:
+ break;
+ }
+
+ pthread_cleanup_pop(true);
+}
+
+/* On the periodic tick, dispatch all flows due for re-keying. */
+static void rekey_run_periodic(struct list_head * tbl,
+ struct timespec * next)
+{
+ struct rekey_info snap[REKEY_BATCH];
+ struct timespec now;
+ int n;
+ int i;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ if (ts_diff_ns(next, &now) > 0)
+ return;
+
+ n = reg_flow_snapshot_rekey_due(snap, REKEY_BATCH);
+ for (i = 0; i < n; ++i)
+ rekey_dispatch(tbl, &snap[i]);
+
+ clock_gettime(PTHREAD_COND_CLOCK, next);
+ next->tv_sec += OAP_REKEY_TIMER;
+}
+
/*
* Single worker owning all in-flight Tier-2 re-keys. It drains the
* inbox, runs the periodic snapshot, and reaps timed-out exchanges.
@@ -1912,67 +2209,23 @@ static void * rekey_worker(void * o)
pthread_cleanup_push(rekey_table_cleanup, &table);
- while (!irmd.rk.stop) {
+ while (true) {
struct rekey_evt * evt;
- struct timespec now;
- struct timespec deadline = next;
- struct list_head * p;
-
- /* Wake no later than the soonest in-flight deadline. */
- list_for_each(p, &table) {
- struct rekey_ctx * e;
- e = list_entry(p, struct rekey_ctx, next);
- if (ts_diff_ns(&e->deadline, &deadline) < 0)
- deadline = e->deadline;
- }
+ struct timespec deadline;
- evt = rekey_inbox_wait(&deadline);
+ deadline = rekey_deadline(&table, next);
- if (irmd.rk.stop) {
- if (evt != NULL) {
- freebuf(evt->buf);
- free(evt);
- }
- break;
- }
-
- if (evt != NULL) {
- switch (evt->type) {
- case REKEY_INIT:
- rekey_do_initiate(&table, evt->flow_id,
- evt->n_1_pid);
- break;
- case REKEY_RESP:
- rekey_do_complete(&table, evt->flow_id,
- evt->buf);
- freebuf(evt->buf);
- break;
- default:
- break;
- }
-
- free(evt);
- }
+ evt = rekey_event_wait(&deadline);
- clock_gettime(PTHREAD_COND_CLOCK, &now);
- if (ts_diff_ns(&next, &now) <= 0) {
- struct rekey_info snap[REKEY_BATCH];
- int n;
- int i;
+ if (evt != NULL)
+ rekey_handle_evt(&table, evt);
- n = reg_flow_snapshot_rekey_due(snap, REKEY_BATCH);
- for (i = 0; i < n; ++i)
- rekey_do_initiate(&table, snap[i].flow_id,
- snap[i].n_1_pid);
-
- clock_gettime(PTHREAD_COND_CLOCK, &next);
- next.tv_sec += OAP_REKEY_TIMER;
- }
+ rekey_run_periodic(&table, &next);
rekey_reap_expired(&table);
}
- pthread_cleanup_pop(1);
+ pthread_cleanup_pop(true);
return (void *) 0;
}
@@ -2048,6 +2301,10 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg,
struct timespec ts = TIMESPEC_INIT_S(0); /* static analysis */
int res;
bool has_key = false;
+ bool initiator = false;
+ uid_t uid;
+ gid_t gid;
+ pid_t cpid;
irm_msg_t * ret_msg;
buffer_t data;
@@ -2114,7 +2371,7 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg,
case IRM_MSG_CODE__IRM_PROC_ANNOUNCE:
proc.pid = msg->pid;
strcpy(proc.prog, msg->prog);
- res = get_peer_ids(fd, &proc.uid, &proc.gid);
+ res = get_peer_ids(fd, &proc.uid, &proc.gid, NULL);
if (res < 0)
log_err("Failed to get UID/GID for pid %d.", msg->pid);
else
@@ -2157,27 +2414,29 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg,
flow = flow_info_msg_to_s(msg->flow_info);
sk.key = kbuf;
res = flow_accept(&flow, &data, abstime, &sk);
- if (res == 0) {
- ret_msg->flow_info = flow_info_s_to_msg(&flow);
- ret_msg->has_pk = data.len != 0;
- ret_msg->pk.data = data.data;
- ret_msg->pk.len = data.len;
- ret_msg->has_cipher_nid = true;
- ret_msg->cipher_nid = sk.nid;
- if (sk.nid != NID_undef) {
- hbuf = malloc(SYMMKEYSZ);
- if (hbuf == NULL) {
- log_err("Failed to malloc key buf");
- res = -ENOMEM;
- break;
- }
-
- memcpy(hbuf, kbuf, SYMMKEYSZ);
- ret_msg->sym_key.data = hbuf;
- ret_msg->sym_key.len = SYMMKEYSZ;
- ret_msg->has_sym_key = true;
- }
+ if (res != 0)
+ break;
+
+ ret_msg->flow_info = flow_info_s_to_msg(&flow);
+ ret_msg->has_pk = data.len != 0;
+ ret_msg->pk.data = data.data;
+ ret_msg->pk.len = data.len;
+ ret_msg->has_cipher_nid = true;
+ ret_msg->cipher_nid = sk.nid;
+ if (sk.nid == NID_undef)
+ break;
+
+ hbuf = malloc(SYMMKEYSZ);
+ if (hbuf == NULL) {
+ log_err("Failed to malloc key buf");
+ res = -ENOMEM;
+ break;
}
+
+ memcpy(hbuf, kbuf, SYMMKEYSZ);
+ ret_msg->sym_key.data = hbuf;
+ ret_msg->sym_key.len = SYMMKEYSZ;
+ ret_msg->has_sym_key = true;
break;
case IRM_MSG_CODE__IRM_FLOW_ALLOC:
data.len = msg->pk.len;
@@ -2188,26 +2447,29 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg,
abstime = abstime == NULL ? &max : abstime;
sk.key = kbuf;
res = flow_alloc(msg->dst, &flow, &data, abstime, &sk);
- if (res == 0) {
- ret_msg->flow_info = flow_info_s_to_msg(&flow);
- ret_msg->has_pk = data.len != 0;
- ret_msg->pk.data = data.data;
- ret_msg->pk.len = data.len;
- ret_msg->has_cipher_nid = true;
- ret_msg->cipher_nid = sk.nid;
- if (sk.nid != NID_undef) {
- hbuf = malloc(SYMMKEYSZ);
- if (hbuf == NULL) {
- log_err("Failed to malloc key buf");
- res = -ENOMEM;
- break;
- }
- memcpy(hbuf, kbuf, SYMMKEYSZ);
- ret_msg->sym_key.data = hbuf;
- ret_msg->sym_key.len = SYMMKEYSZ;
- ret_msg->has_sym_key = true;
- }
+ if (res != 0)
+ break;
+
+ ret_msg->flow_info = flow_info_s_to_msg(&flow);
+ ret_msg->has_pk = data.len != 0;
+ ret_msg->pk.data = data.data;
+ ret_msg->pk.len = data.len;
+ ret_msg->has_cipher_nid = true;
+ ret_msg->cipher_nid = sk.nid;
+ if (sk.nid == NID_undef)
+ break;
+
+ hbuf = malloc(SYMMKEYSZ);
+ if (hbuf == NULL) {
+ log_err("Failed to malloc key buf");
+ res = -ENOMEM;
+ break;
}
+
+ memcpy(hbuf, kbuf, SYMMKEYSZ);
+ ret_msg->sym_key.data = hbuf;
+ ret_msg->sym_key.len = SYMMKEYSZ;
+ ret_msg->has_sym_key = true;
break;
case IRM_MSG_CODE__IRM_FLOW_JOIN:
assert(msg->pk.len == 0 && msg->pk.data == NULL);
@@ -2257,26 +2519,39 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg,
break;
case IRM_MSG_CODE__IRM_FLOW_UPDATE:
flow = flow_info_msg_to_s(msg->flow_info);
+ if (get_peer_ids(fd, &uid, &gid, &cpid) < 0) {
+ res = -EPERM;
+ break;
+ }
+
+ if (cpid <= 0) /* non-Linux: fall back to asserted pid */
+ cpid = flow.n_pid;
+
sk.key = kbuf;
- res = flow_update(&flow, msg->rekey, &sk, &has_key);
- if (res == 0) {
- ret_msg->flow_info = flow_info_s_to_msg(&flow);
- if (has_key) {
- hbuf = malloc(SYMMKEYSZ);
- if (hbuf == NULL) {
- log_err("Failed to malloc key buf");
- res = -ENOMEM;
- break;
- }
-
- memcpy(hbuf, kbuf, SYMMKEYSZ);
- ret_msg->sym_key.data = hbuf;
- ret_msg->sym_key.len = SYMMKEYSZ;
- ret_msg->has_sym_key = true;
- ret_msg->has_generation = true;
- ret_msg->generation = sk.epoch;
- }
+ res = flow_update(&flow, uid, cpid, msg->rekey, &sk, &has_key,
+ &initiator);
+ if (res != 0)
+ break;
+
+ ret_msg->flow_info = flow_info_s_to_msg(&flow);
+ if (!has_key)
+ break;
+
+ hbuf = malloc(SYMMKEYSZ);
+ if (hbuf == NULL) {
+ log_err("Failed to malloc key buf");
+ res = -ENOMEM;
+ break;
}
+
+ memcpy(hbuf, kbuf, SYMMKEYSZ);
+ ret_msg->sym_key.data = hbuf;
+ ret_msg->sym_key.len = SYMMKEYSZ;
+ ret_msg->has_sym_key = true;
+ ret_msg->has_generation = true;
+ ret_msg->generation = sk.epoch;
+ ret_msg->has_rk_initiator = true;
+ ret_msg->rk_initiator = initiator;
break;
default:
log_err("Don't know that message code.");
@@ -2640,14 +2915,14 @@ static int irm_init(void)
list_head_init(&irmd.cmds);
- if (pthread_mutex_init(&irmd.rk.lock, NULL)) {
+ if (pthread_mutex_init(&irmd.rk.mtx, NULL)) {
log_err("Failed to initialize mutex.");
- goto fail_rk_lock;
+ goto fail_rk_mtx;
}
if (pthread_condattr_init(&cattr)) {
log_err("Failed to initialize condattr.");
- goto fail_rk_lock;
+ goto fail_rk_mtx;
}
#ifndef __APPLE__
@@ -2768,8 +3043,8 @@ static int irm_init(void)
fail_stat:
pthread_cond_destroy(&irmd.rk.cond);
fail_rk_cond:
- pthread_mutex_destroy(&irmd.rk.lock);
- fail_rk_lock:
+ pthread_mutex_destroy(&irmd.rk.mtx);
+ fail_rk_mtx:
pthread_cond_destroy(&irmd.cmd_cond);
fail_cmd_cond:
pthread_mutex_destroy(&irmd.cmd_lock);
@@ -2818,7 +3093,7 @@ static void irm_fini(void)
pthread_mutex_unlock(&irmd.cmd_lock);
- pthread_mutex_lock(&irmd.rk.lock);
+ pthread_mutex_lock(&irmd.rk.mtx);
list_for_each_safe(p, h, &irmd.rk.inbox) {
struct rekey_evt * evt;
@@ -2828,17 +3103,18 @@ static void irm_fini(void)
free(evt);
}
- pthread_mutex_unlock(&irmd.rk.lock);
+ pthread_mutex_unlock(&irmd.rk.mtx);
pthread_mutex_destroy(&irmd.cmd_lock);
pthread_cond_destroy(&irmd.cmd_cond);
- pthread_mutex_destroy(&irmd.rk.lock);
+ pthread_mutex_destroy(&irmd.rk.mtx);
pthread_cond_destroy(&irmd.rk.cond);
pthread_rwlock_destroy(&irmd.state_lock);
#ifdef HAVE_FUSE
while (rmdir(FUSE_PREFIX) < 0 && retries-- > 0)
nanosleep(&wait, NULL);
+
if (retries < 0)
log_err("Failed to remove " FUSE_PREFIX);
#endif
@@ -2871,10 +3147,10 @@ static int irm_start(void)
if (pthread_create(&irmd.acceptor, NULL, acceptloop, NULL))
goto fail_acceptor;
- irmd.rk.stop = false;
- if (OAP_REKEY_TIMER > 0 &&
- pthread_create(&irmd.rk.worker, NULL, rekey_worker, NULL))
- goto fail_rekey_worker;
+ if (OAP_REKEY_TIMER > 0) {
+ if (pthread_create(&irmd.rk.worker, NULL, rekey_worker, NULL))
+ goto fail_rekey_worker;
+ }
log_info("Ouroboros IPC Resource Manager daemon started...");
@@ -2923,13 +3199,7 @@ static void irm_sigwait(sigset_t sigset)
static void irm_stop(void)
{
if (OAP_REKEY_TIMER > 0) {
- pthread_mutex_lock(&irmd.rk.lock);
-
- irmd.rk.stop = true;
- pthread_cond_signal(&irmd.rk.cond);
-
- pthread_mutex_unlock(&irmd.rk.lock);
-
+ pthread_cancel(irmd.rk.worker);
pthread_join(irmd.rk.worker, NULL);
}
diff --git a/src/irmd/reg/flow.c b/src/irmd/reg/flow.c
index ccb2562d..8be2dfc7 100644
--- a/src/irmd/reg/flow.c
+++ b/src/irmd/reg/flow.c
@@ -70,10 +70,12 @@ static void destroy_rbuffs(struct reg_flow * flow)
{
if (flow->n_rb != NULL)
ssm_rbuff_destroy(flow->n_rb);
+
flow->n_rb = NULL;
if (flow->n_1_rb != NULL)
ssm_rbuff_destroy(flow->n_1_rb);
+
flow->n_1_rb = NULL;
}
@@ -81,7 +83,10 @@ void reg_flow_destroy(struct reg_flow * flow)
{
assert(flow != NULL);
- crypt_secure_clear(flow->rk.pending_seed, SYMMKEYSZ);
+ if (flow->rk.pending_seed != NULL)
+ crypt_secure_free(flow->rk.pending_seed, SYMMKEYSZ);
+
+ freebuf(flow->rk.peer_crt);
switch(flow->info.state) {
case FLOW_ACCEPT_PENDING:
diff --git a/src/irmd/reg/flow.h b/src/irmd/reg/flow.h
index 15fc7b8f..166bed61 100644
--- a/src/irmd/reg/flow.h
+++ b/src/irmd/reg/flow.h
@@ -55,9 +55,14 @@ struct reg_flow {
uint8_t epoch; /* last epoch installed by app */
bool initiator; /* OAP initiator (role 0) */
bool in_flight; /* a re-key is in progress */
- uint8_t pending_seed[SYMMKEYSZ];
+ bool req_queued; /* a peer REQ is in the inbox */
+ bool resp_queued; /* a peer RESP is in the inbox */
+ uint8_t * pending_seed; /* secure heap; NULL until set */
uint8_t pending_epoch;
+ bool pending_initiator; /* pending seed: oap_cli side */
bool has_pending; /* new seed awaits app pull */
+ uint8_t pulled; /* direct: per-app pull mask */
+ buffer_t peer_crt; /* peer cert DER, cached at HS */
} rk;
struct ssm_rbuff * n_rb;
diff --git a/src/irmd/reg/reg.c b/src/irmd/reg/reg.c
index 70baf64e..ebf3959d 100644
--- a/src/irmd/reg/reg.c
+++ b/src/irmd/reg/reg.c
@@ -872,6 +872,7 @@ int reg_list_ipcps(ipcp_list_msg_t *** ipcps)
fail:
while (i-- > 0)
ipcp_list_msg__free_unpacked((*ipcps)[i], NULL);
+
free(*ipcps);
fail_malloc:
pthread_mutex_unlock(&reg.mtx);
@@ -1033,6 +1034,20 @@ int reg_get_name_for_flow_id(char * buf,
return f == NULL ? -ENOENT : 0;
}
+/*
+ * Overwrite the name recorded on flow flow_id. An unregistered flow_id
+ * is silently ignored. The copy is bounded to NAME_SIZE characters and
+ * always NUL-terminated, so an oversized name is truncated instead of
+ * overrunning the flow's name buffer.
+ * NOTE(review): assumes f->name holds NAME_SIZE + 1 bytes, like
+ * rekey_info.name, which is strcpy'd from it; confirm in flow.h.
+ */
+void reg_set_name_for_flow_id(const char * name,
+                              int          flow_id)
+{
+        struct reg_flow * f;
+
+        assert(name != NULL);
+
+        pthread_mutex_lock(&reg.mtx);
+
+        f = __reg_get_flow(flow_id);
+        if (f != NULL) {
+                /* strncpy does not terminate on truncation; do it here. */
+                strncpy(f->name, name, NAME_SIZE);
+                f->name[NAME_SIZE] = '\0';
+        }
+
+        pthread_mutex_unlock(&reg.mtx);
+}
+
int reg_list_names(name_info_msg_t *** names)
{
struct list_head * p;
@@ -1077,6 +1092,7 @@ int reg_list_names(name_info_msg_t *** names)
fail:
while (i-- > 0)
name_info_msg__free_unpacked((*names)[i], NULL);
+
free(*names);
fail_malloc:
pthread_mutex_unlock(&reg.mtx);
@@ -2103,10 +2119,21 @@ bool reg_flow_is_direct(int flow_id)
return ret;
}
-void reg_flow_set_rekey(int flow_id,
- bool initiator)
+void reg_flow_set_rekey(int flow_id,
+ bool initiator,
+ buffer_t peer_crt)
{
struct reg_flow * flow;
+ uint8_t * crt = NULL;
+
+ /* Copy the cert outside the lock; publish it with rk.encrypted. */
+ if (peer_crt.len > 0) {
+ crt = malloc(peer_crt.len);
+ if (crt != NULL)
+ memcpy(crt, peer_crt.data, peer_crt.len);
+ else
+ log_warn("Failed to cache peer cert for re-key.");
+ }
pthread_mutex_lock(&reg.mtx);
@@ -2115,9 +2142,47 @@ void reg_flow_set_rekey(int flow_id,
flow->rk.encrypted = true;
flow->rk.initiator = initiator;
flow->rk.epoch = 0;
+ if (crt != NULL) {
+ freebuf(flow->rk.peer_crt);
+ flow->rk.peer_crt.data = crt;
+ flow->rk.peer_crt.len = peer_crt.len;
+ crt = NULL;
+ }
}
pthread_mutex_unlock(&reg.mtx);
+
+ free(crt);
+}
+
+/*
+ * Give the caller a private heap copy of the peer certificate cached on
+ * flow flow_id. crt is reset with clrbuf() before anything else.
+ * Returns 0 with crt filled in, -ENOENT when the flow is unknown or has
+ * no cached certificate, -ENOMEM when the copy cannot be allocated.
+ */
+int reg_flow_get_peer_crt(int        flow_id,
+                          buffer_t * crt)
+{
+        struct reg_flow * flow;
+        int               ret = -ENOENT;
+
+        assert(crt != NULL);
+
+        clrbuf(*crt);
+
+        pthread_mutex_lock(&reg.mtx);
+
+        flow = __reg_get_flow(flow_id);
+        if (flow == NULL || !(flow->rk.peer_crt.len > 0))
+                goto out;
+
+        crt->data = malloc(flow->rk.peer_crt.len);
+        if (crt->data == NULL) {
+                ret = -ENOMEM;
+                goto out;
+        }
+
+        memcpy(crt->data, flow->rk.peer_crt.data, flow->rk.peer_crt.len);
+        crt->len = flow->rk.peer_crt.len;
+        ret = 0;
+ out:
+        pthread_mutex_unlock(&reg.mtx);
+
+        return ret;
 }
int reg_flow_get_epoch(int flow_id)
@@ -2184,10 +2249,14 @@ int reg_flow_snapshot_rekey_due(struct rekey_info * snap,
f = list_entry(p, struct reg_flow, next);
- if (f->info.state != FLOW_ALLOCATED || f->direct)
+ if (f->info.state != FLOW_ALLOCATED)
+ continue;
+
+ if (!f->rk.encrypted)
continue;
- if (!f->rk.encrypted || !f->rk.initiator)
+ /* Direct flows have no IPCP initiator; either side drives. */
+ if (!f->direct && !f->rk.initiator)
continue;
if (f->rk.in_flight || f->rk.has_pending)
@@ -2199,6 +2268,7 @@ int reg_flow_snapshot_rekey_due(struct rekey_info * snap,
snap[n].n_pid = f->info.n_pid;
snap[n].n_1_pid = f->info.n_1_pid;
snap[n].epoch = f->rk.epoch;
+ snap[n].direct = f->direct;
strcpy(snap[n].name, f->name);
++n;
}
@@ -2221,9 +2291,48 @@ void reg_flow_clear_in_flight(int flow_id)
pthread_mutex_unlock(&reg.mtx);
}
+/*
+ * Claim the re-key latch for flow_id under reg.mtx. Returns true and
+ * sets rk.in_flight only when the flow exists, is encrypted, and has
+ * neither an exchange in flight nor a parked seed; false otherwise.
+ */
+bool reg_flow_rekey_begin(int flow_id)
+{
+        struct reg_flow * flow;
+        bool              claimed;
+
+        pthread_mutex_lock(&reg.mtx);
+
+        flow = __reg_get_flow(flow_id);
+
+        claimed = flow != NULL && flow->rk.encrypted
+                && !flow->rk.in_flight && !flow->rk.has_pending;
+        if (claimed)
+                flow->rk.in_flight = true;
+
+        pthread_mutex_unlock(&reg.mtx);
+
+        return claimed;
+}
+
+/*
+ * True when flow_id exists, this side is the initiator and one of its
+ * own exchanges is in flight; false for an unknown flow.
+ */
+bool reg_flow_rekey_should_yield(int flow_id)
+{
+        struct reg_flow * flow;
+        bool              yield;
+
+        pthread_mutex_lock(&reg.mtx);
+
+        flow = __reg_get_flow(flow_id);
+        if (flow == NULL)
+                yield = false;
+        else
+                yield = flow->rk.initiator && flow->rk.in_flight;
+
+        pthread_mutex_unlock(&reg.mtx);
+
+        return yield;
+}
+
int reg_flow_store_pending(int flow_id,
const uint8_t * seed,
- uint8_t epoch)
+ uint8_t epoch,
+ bool initiator)
{
struct reg_flow * flow;
int ret = -ENOENT;
@@ -2232,14 +2341,24 @@ int reg_flow_store_pending(int flow_id,
flow = __reg_get_flow(flow_id);
if (flow != NULL) {
- memcpy(flow->rk.pending_seed, seed, SYMMKEYSZ);
- flow->rk.pending_epoch = epoch;
- flow->rk.has_pending = true;
- flow->rk.in_flight = false;
- /* Doorbell raised only after the seed is parked. */
- if (flow->n_rb != NULL)
- ssm_rbuff_set_bits(flow->n_rb, RB_REKEY);
- ret = 0;
+ /* Exchange done: release the latch regardless of parking. */
+ flow->rk.in_flight = false;
+
+ if (flow->rk.pending_seed == NULL)
+ flow->rk.pending_seed = crypt_secure_malloc(SYMMKEYSZ);
+
+ if (flow->rk.pending_seed != NULL) {
+ memcpy(flow->rk.pending_seed, seed, SYMMKEYSZ);
+ flow->rk.pending_epoch = epoch;
+ flow->rk.pending_initiator = initiator;
+ flow->rk.has_pending = true;
+ /* Doorbell raised only after the seed is parked. */
+ if (flow->n_rb != NULL)
+ ssm_rbuff_set_bits(flow->n_rb, RB_REKEY);
+ ret = 0;
+ } else {
+ ret = -ENOMEM;
+ }
}
pthread_mutex_unlock(&reg.mtx);
@@ -2247,45 +2366,259 @@ int reg_flow_store_pending(int flow_id,
return ret;
}
-bool reg_flow_take_pending(int flow_id,
- uint8_t * seed,
- uint8_t * epoch)
+/* Direct re-key: which of the two local apps has pulled the seed. */
+#define RK_N_PID 0x1 /* acceptor (n_pid) pulled the seed */
+#define RK_N_1_PID 0x2 /* allocator (n_1_pid) pulled the seed */
+#define RK_PID_MASK (RK_N_PID | RK_N_1_PID)
+
+/*
+ * Park a single re-key seed for a direct flow and ring BOTH apps'
+ * doorbells. The seed is the one shared secret; each app pulls it once
+ * (reg_flow_take_pending), so it is held until both have taken it.
+ *
+ * seed must point to SYMMKEYSZ bytes; they are copied into the flow's
+ * pending_seed buffer, which is allocated on first use.
+ *
+ * Returns 0 when the seed is parked, -ENOENT when flow_id is unknown,
+ * -ENOMEM when the seed buffer cannot be allocated. The in-flight latch
+ * is released for any existing flow, also on -ENOMEM.
+ */
+int reg_flow_store_pending_direct(int flow_id,
+ const uint8_t * seed,
+ uint8_t epoch)
{
struct reg_flow * flow;
- bool ret = false;
+ int ret = -ENOENT;
pthread_mutex_lock(&reg.mtx);
flow = __reg_get_flow(flow_id);
- if (flow != NULL && flow->rk.has_pending) {
- memcpy(seed, flow->rk.pending_seed, SYMMKEYSZ);
- *epoch = flow->rk.pending_epoch;
- flow->rk.epoch = flow->rk.pending_epoch; /* app installed it */
- flow->rk.has_pending = false;
- crypt_secure_clear(flow->rk.pending_seed, SYMMKEYSZ);
+ if (flow == NULL)
+ goto out;
+
+ /* Exchange done: release the latch regardless of parking. */
+ flow->rk.in_flight = false;
+
+ /* The seed buffer is allocated once and reused for later seeds. */
+ if (flow->rk.pending_seed == NULL)
+ flow->rk.pending_seed = crypt_secure_malloc(SYMMKEYSZ);
+
+ if (flow->rk.pending_seed == NULL) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ memcpy(flow->rk.pending_seed, seed, SYMMKEYSZ);
+ flow->rk.pending_epoch = epoch;
+ flow->rk.has_pending = true;
+ /* Fresh seed: forget who pulled the previous one. */
+ flow->rk.pulled = 0;
+
+ /* A departed peer never pulls; treat its side as already done. */
+ if (flow->info.n_pid <= 0)
+ flow->rk.pulled |= RK_N_PID;
+
+ if (flow->info.n_1_pid <= 0)
+ flow->rk.pulled |= RK_N_1_PID;
+
+ /* Ring only the doorbells of sides that still have to pull. */
+ if (flow->n_rb != NULL && !(flow->rk.pulled & RK_N_PID))
+ ssm_rbuff_set_bits(flow->n_rb, RB_REKEY);
+
+ if (flow->n_1_rb != NULL && !(flow->rk.pulled & RK_N_1_PID))
+ ssm_rbuff_set_bits(flow->n_1_rb, RB_REKEY);
+
+ ret = 0;
+ out:
+ pthread_mutex_unlock(&reg.mtx);
+
+ return ret;
+}
+
+/*
+ * Access check for flow operations: granted when the caller's uid is an
+ * ouroboros member (per is_ouroboros_member_uid) or equals the owner's.
+ */
+static bool uid_may_access(uid_t caller,
+                           uid_t owner)
+{
+        if (is_ouroboros_member_uid(caller))
+                return true;
+
+        return caller == owner;
+}
+
+/*
+ * Caller holds reg.mtx. The direct seed is shared by both apps, so the
+ * per-app initiator role is resolved from the verified caller pid (the
+ * allocator is n_1_pid), and the seed is held until both have pulled.
+ *
+ * Copies SYMMKEYSZ seed bytes and the pending epoch to the caller, sets
+ * *initiator to whether cpid is the allocator, and records the epoch on
+ * the flow. The caller must have checked that a seed is pending.
+ *
+ * NOTE(review): any cpid other than n_1_pid is counted as the acceptor,
+ * so a third process passing the uid check would consume the acceptor's
+ * pull; confirm the caller restricts cpid to the two endpoints.
+ */
+static void __take_pending_direct(struct reg_flow * flow,
+ pid_t cpid,
+ uint8_t * seed,
+ uint8_t * epoch,
+ bool * initiator)
+{
+ bool allocator;
+
+ allocator = cpid == flow->info.n_1_pid;
+
+ memcpy(seed, flow->rk.pending_seed, SYMMKEYSZ);
+ *epoch = flow->rk.pending_epoch;
+ *initiator = allocator;
+ flow->rk.epoch = flow->rk.pending_epoch;
+
+ /* Mark this side as served and lower its own doorbell only. */
+ if (allocator) {
+ flow->rk.pulled |= RK_N_1_PID;
+ if (flow->n_1_rb != NULL)
+ ssm_rbuff_clr_bits(flow->n_1_rb, RB_REKEY);
+ } else {
+ flow->rk.pulled |= RK_N_PID;
if (flow->n_rb != NULL)
ssm_rbuff_clr_bits(flow->n_rb, RB_REKEY);
- ret = true;
}
+ /* Keep the seed parked until both sides have pulled it. */
+ if ((flow->rk.pulled & RK_PID_MASK) != RK_PID_MASK)
+ return;
+
+ /* Both served: retire the seed and wipe it from the buffer. */
+ flow->rk.has_pending = false;
+ flow->rk.pulled = 0;
+ crypt_secure_clear(flow->rk.pending_seed, SYMMKEYSZ);
+}
+
+/*
+ * Pull the parked re-key seed of flow_id on behalf of caller uid/cpid.
+ * On success, SYMMKEYSZ bytes are written to seed, and epoch and
+ * initiator are filled in.
+ *
+ * Returns 0 on success, -ENOENT when the flow is unknown or no seed is
+ * pending, -EPERM when uid may not access the flow. Outputs are left
+ * untouched on error.
+ */
+int reg_flow_take_pending(int flow_id,
+ uid_t uid,
+ pid_t cpid,
+ uint8_t * seed,
+ uint8_t * epoch,
+ bool * initiator)
+{
+ struct reg_flow * flow;
+ int ret = -ENOENT;
+
+ pthread_mutex_lock(&reg.mtx);
+
+ flow = __reg_get_flow(flow_id);
+ if (flow == NULL || !flow->rk.has_pending)
+ goto out;
+
+ if (!uid_may_access(uid, flow->info.uid)) {
+ ret = -EPERM;
+ goto out;
+ }
+
+ /* Direct flows share one seed between two local apps. */
+ if (flow->direct) {
+ __take_pending_direct(flow, cpid, seed, epoch, initiator);
+ ret = 0;
+ goto out;
+ }
+
+ /* Single consumer: hand over the seed, then wipe the parked copy. */
+ memcpy(seed, flow->rk.pending_seed, SYMMKEYSZ);
+ *epoch = flow->rk.pending_epoch;
+ *initiator = flow->rk.pending_initiator;
+ flow->rk.epoch = flow->rk.pending_epoch;
+ flow->rk.has_pending = false;
+ crypt_secure_clear(flow->rk.pending_seed, SYMMKEYSZ);
+ if (flow->n_rb != NULL)
+ ssm_rbuff_clr_bits(flow->n_rb, RB_REKEY);
+
+ ret = 0;
+ out:
pthread_mutex_unlock(&reg.mtx);
return ret;
}
+/*
+ * Gate a peer-driven re-key arrival before a worker event is allocated.
+ * The flow must exist, be encrypted, and have n_1_pid as its lower IPCP.
+ * At most one REQ and one RESP are admitted per flow until the matching
+ * reg_flow_rekey_arr_done(), so a flooding peer cannot grow the inbox
+ * without bound. A RESP is only admitted while an exchange is in flight.
+ * Returns true when the arrival is admitted and marked as queued.
+ */
+bool reg_flow_rekey_arr_admit(int   flow_id,
+                              pid_t n_1_pid,
+                              bool  is_req)
+{
+        struct reg_flow * flow;
+        bool              admit = false;
+
+        pthread_mutex_lock(&reg.mtx);
+
+        flow = __reg_get_flow(flow_id);
+        if (flow == NULL || !flow->rk.encrypted)
+                goto out;
+
+        if (flow->info.n_1_pid != n_1_pid)
+                goto out;
+
+        if (is_req) {
+                admit = !flow->rk.req_queued;
+                if (admit)
+                        flow->rk.req_queued = true;
+        } else {
+                admit = flow->rk.in_flight && !flow->rk.resp_queued;
+                if (admit)
+                        flow->rk.resp_queued = true;
+        }
+ out:
+        pthread_mutex_unlock(&reg.mtx);
+
+        return admit;
+}
+
+/*
+ * Clear the queued marker of flow_id for a peer REQ (is_req) or RESP
+ * (!is_req). An unknown flow_id is ignored.
+ */
+void reg_flow_rekey_arr_done(int  flow_id,
+                             bool is_req)
+{
+        struct reg_flow * flow;
+
+        pthread_mutex_lock(&reg.mtx);
+
+        flow = __reg_get_flow(flow_id);
+        if (flow == NULL)
+                goto out;
+
+        if (is_req)
+                flow->rk.req_queued = false;
+        else
+                flow->rk.resp_queued = false;
+ out:
+        pthread_mutex_unlock(&reg.mtx);
+}
+
+/*
+ * True when flow_id exists and uid passes uid_may_access() against the
+ * flow's owner uid; false for an unknown flow.
+ */
+bool reg_flow_owned_by(int   flow_id,
+                       uid_t uid)
+{
+        struct reg_flow * flow;
+        bool              allowed;
+
+        pthread_mutex_lock(&reg.mtx);
+
+        flow = __reg_get_flow(flow_id);
+
+        allowed = flow != NULL && uid_may_access(uid, flow->info.uid);
+
+        pthread_mutex_unlock(&reg.mtx);
+
+        return allowed;
+}
+
+/*
+ * Caller holds reg.mtx. Post event for flow_id on the flow set of
+ * process pid; does nothing when pid is not a registered process.
+ */
+static void __notify_proc(pid_t pid,
+                          int   flow_id,
+                          int   event)
+{
+        struct reg_proc * proc = __reg_get_proc(pid);
+
+        if (proc == NULL)
+                return;
+
+        ssm_flow_set_notify(proc->set, flow_id, event);
+}
+
+/*
+ * Post event for flow_id to the process recorded as the flow's n_pid.
+ * Nothing happens when the flow or that process is not registered.
+ */
void reg_notify_flow(int flow_id,
int event)
{
struct reg_flow * flow;
- struct reg_proc * proc;
+
+ pthread_mutex_lock(&reg.mtx);
+
+ flow = __reg_get_flow(flow_id);
+ if (flow != NULL)
+ __notify_proc(flow->info.n_pid, flow_id, event);
+
+ pthread_mutex_unlock(&reg.mtx);
+}
+
+/* Wake both endpoints of a direct flow (acceptor and allocator). */
+void reg_notify_flow_peers(int flow_id,
+ int event)
+{
+ struct reg_flow * flow;
pthread_mutex_lock(&reg.mtx);
flow = __reg_get_flow(flow_id);
if (flow != NULL) {
- proc = __reg_get_proc(flow->info.n_pid);
- if (proc != NULL)
- ssm_flow_set_notify(proc->set, flow_id, event);
+ __notify_proc(flow->info.n_pid, flow_id, event);
+ __notify_proc(flow->info.n_1_pid, flow_id, event);
}
pthread_mutex_unlock(&reg.mtx);
diff --git a/src/irmd/reg/reg.h b/src/irmd/reg/reg.h
index e0c64fed..8a313d46 100644
--- a/src/irmd/reg/reg.h
+++ b/src/irmd/reg/reg.h
@@ -109,6 +109,9 @@ int reg_get_name_for_hash(char * buf,
int reg_get_name_for_flow_id(char * buf,
int flow_id);
+void reg_set_name_for_flow_id(const char * name,
+ int flow_id);
+
/* TODO don't rely on protobuf here */
int reg_list_names(name_info_msg_t *** names);
@@ -170,10 +173,15 @@ struct rekey_info {
pid_t n_1_pid;
char name[NAME_SIZE + 1];
uint8_t epoch;
+ bool direct;
};
-void reg_flow_set_rekey(int flow_id,
- bool initiator);
+void reg_flow_set_rekey(int flow_id,
+ bool initiator,
+ buffer_t peer_crt);
+
+int reg_flow_get_peer_crt(int flow_id,
+ buffer_t * crt);
int reg_flow_get_epoch(int flow_id);
@@ -186,17 +194,42 @@ int reg_flow_snapshot_rekey_due(struct rekey_info * snap,
void reg_flow_clear_in_flight(int flow_id);
+bool reg_flow_rekey_begin(int flow_id);
+
+bool reg_flow_rekey_should_yield(int flow_id);
+
int reg_flow_store_pending(int flow_id,
const uint8_t * seed,
- uint8_t epoch);
+ uint8_t epoch,
+ bool initiator);
+
+int reg_flow_store_pending_direct(int flow_id,
+ const uint8_t * seed,
+ uint8_t epoch);
-bool reg_flow_take_pending(int flow_id,
+int reg_flow_take_pending(int flow_id,
+ uid_t uid,
+ pid_t cpid,
uint8_t * seed,
- uint8_t * epoch);
+ uint8_t * epoch,
+ bool * initiator);
+
+bool reg_flow_rekey_arr_admit(int flow_id,
+ pid_t n_1_pid,
+ bool is_req);
+
+void reg_flow_rekey_arr_done(int flow_id,
+ bool is_req);
+
+bool reg_flow_owned_by(int flow_id,
+ uid_t uid);
void reg_notify_flow(int flow_id,
int event);
+void reg_notify_flow_peers(int flow_id,
+ int event);
+
void reg_dealloc_flow(struct flow_info * info);
void reg_dealloc_flow_resp(struct flow_info * info);
diff --git a/src/irmd/reg/tests/reg_test.c b/src/irmd/reg/tests/reg_test.c
index 0b1014f9..a8c1b1fa 100644
--- a/src/irmd/reg/tests/reg_test.c
+++ b/src/irmd/reg/tests/reg_test.c
@@ -771,6 +771,167 @@ static int test_reg_direct_flow_success(void)
return TEST_RC_FAIL;
}
+/*
+ * Direct-flow re-key: one shared seed is parked for both local apps. The
+ * per-app initiator role is resolved from the verified caller pid (the
+ * allocator is n_1_pid), and the seed is held until both have pulled it.
+ */
+static int test_reg_direct_flow_rekey(void)
+{
+ pthread_t thr;
+ struct timespec abstime;
+ struct timespec timeo = TIMESPEC_INIT_S(1);
+ buffer_t rbuf = BUF_INIT;
+ buffer_t rsp;
+ buffer_t no_crt = BUF_INIT;
+ struct direct_alloc_info dai;
+ uint8_t seed[SYMMKEYSZ];
+ uint8_t out[SYMMKEYSZ];
+ uint8_t epoch;
+ bool initiator;
+ size_t i;
+
+ struct flow_info info = {
+ .n_pid = TEST_PID,
+ .qs = qos_raw
+ };
+
+ TEST_START();
+
+ for (i = 0; i < SYMMKEYSZ; ++i)
+ seed[i] = (uint8_t) i;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+
+ ts_add(&abstime, &timeo, &abstime);
+
+ if (reg_init() < 0) {
+ printf("Failed to init registry.\n");
+ goto fail;
+ }
+
+ if (reg_create_flow(&info) < 0) {
+ printf("Failed to add flow.\n");
+ goto fail;
+ }
+
+ if (reg_prepare_flow_accept(&info) < 0) {
+ printf("Failed to prepare for accept.\n");
+ goto fail;
+ }
+
+ dai.info.id = info.id;
+ dai.info.n_1_pid = TEST_N_1_PID;
+ dai.info.mpl = TEST_MPL;
+ dai.info.qs = qos_msg;
+ dai.info.state = FLOW_ALLOCATED;
+ dai.rsp.len = 0;
+ dai.rsp.data = NULL;
+ dai.abstime = abstime;
+
+ pthread_create(&thr, NULL, test_flow_alloc_direct, &dai);
+
+ if (reg_wait_flow_accepted(&info, &rbuf, &abstime) < 0) {
+ printf("Flow accept failed.\n");
+ pthread_join(thr, NULL);
+ goto fail;
+ }
+
+ freebuf(rbuf);
+
+ rsp.data = (uint8_t *) strdup(TEST_DATA2);
+ if (rsp.data == NULL) {
+ printf("Failed to strdup rsp data.\n");
+ pthread_join(thr, NULL);
+ goto fail;
+ }
+ rsp.len = strlen(TEST_DATA2) + 1;
+
+ if (reg_respond_flow_direct(info.id, &rsp) < 0) {
+ printf("Failed to respond direct.\n");
+ freebuf(rsp);
+ pthread_join(thr, NULL);
+ goto fail;
+ }
+
+ pthread_join(thr, NULL);
+
+ freebuf(dai.rsp);
+
+ if (!reg_flow_is_direct(info.id)) {
+ printf("Flow not marked direct.\n");
+ goto fail;
+ }
+
+ reg_flow_set_rekey(info.id, false, no_crt);
+
+ if (reg_flow_store_pending_direct(info.id, seed, 5) < 0) {
+ printf("Failed to store pending direct seed.\n");
+ goto fail;
+ }
+
+ if (!reg_flow_rekey_pending(info.id)) {
+ printf("Seed not pending after store.\n");
+ goto fail;
+ }
+
+ /* Allocator (n_1_pid) pulls: initiator role, seed still held. */
+ if (reg_flow_take_pending(info.id, 0, TEST_N_1_PID, out,
+ &epoch, &initiator) != 0) {
+ printf("Allocator failed to take pending seed.\n");
+ goto fail;
+ }
+
+ if (!initiator || epoch != 5 || memcmp(out, seed, SYMMKEYSZ) != 0) {
+ printf("Allocator got wrong seed/role/epoch.\n");
+ goto fail;
+ }
+
+ if (!reg_flow_rekey_pending(info.id)) {
+ printf("Seed cleared before both apps pulled.\n");
+ goto fail;
+ }
+
+ /* Acceptor (n_pid) pulls: responder role, seed now released. */
+ if (reg_flow_take_pending(info.id, 0, TEST_PID, out,
+ &epoch, &initiator) != 0) {
+ printf("Acceptor failed to take pending seed.\n");
+ goto fail;
+ }
+
+ if (initiator || epoch != 5 || memcmp(out, seed, SYMMKEYSZ) != 0) {
+ printf("Acceptor got wrong seed/role/epoch.\n");
+ goto fail;
+ }
+
+ if (reg_flow_rekey_pending(info.id)) {
+ printf("Seed still pending after both pulled.\n");
+ goto fail;
+ }
+
+ if (reg_flow_get_epoch(info.id) != 5) {
+ printf("Flow epoch not advanced.\n");
+ goto fail;
+ }
+
+ info.n_pid = TEST_PID;
+ reg_dealloc_flow(&info);
+
+ info.n_pid = TEST_N_1_PID;
+ reg_dealloc_flow(&info);
+
+ reg_destroy_flow(info.id);
+
+ reg_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+ fail:
+ REG_TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
static int test_reg_flow(void) {
int rc = 0;
@@ -781,6 +942,7 @@ static int test_reg_flow(void) {
rc |= test_reg_allocate_flow_fail();
rc |= test_reg_respond_alloc_duplicate();
rc |= test_reg_direct_flow_success();
+ rc |= test_reg_direct_flow_rekey();
return rc;
}
@@ -875,6 +1037,7 @@ static int test_reg_list_ipcps(void)
while (len-- > 0)
ipcp_list_msg__free_unpacked(ipcps[len], NULL);
+
free(ipcps);
for (i = 0; i < 10; i++)
@@ -941,6 +1104,7 @@ static int test_insert_ipcps(void)
while (len-- > 0)
ipcp_list_msg__free_unpacked(ipcps[len], NULL);
+
free(ipcps);
reg_clear();
@@ -1118,6 +1282,7 @@ static int test_reg_list_names(void)
for (i = 0; i < len; i++)
name_info_msg__free_unpacked(names[i], NULL);
+
free(names);
for (i = 0; i < 10; i++) {
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 3064b1e2..88d6c5f6 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -98,11 +98,14 @@ struct flow {
ssize_t part_idx;
struct crypt_ctx * crypt;
- int headsz; /* selector */
- int tailsz; /* Tag + CRC */
- struct timespec rk_grace; /* TX-promote deadline (0 = none) */
- bool rk_wm_inflight; /* re-key trigger in flight */
- uint32_t rk_wm_ctr; /* throttles the consult */
+ int headsz; /* Selector */
+ int tailsz; /* Tag + CRC */
+
+ struct timespec rk_grace; /* TX-promote deadline (0 = none) */
+ struct timespec rk_attempt; /* Last re-key attempt (backoff) */
+ bool rk_wm_inflight; /* Re-key trigger in flight */
+ uint32_t rk_wm_ctr; /* Throttles the consult */
+ bool rk_initiator; /* OAP initiator this re-key */
struct timespec snd_act;
struct timespec rcv_act;
@@ -301,7 +304,6 @@ static int spb_decrypt(struct flow * flow,
if (crypt_decrypt(flow->crypt, in, &out) < 0)
return -ECRYPT;
-
head = ssm_pk_buff_pop(spb, flow->headsz) + flow->headsz;
ssm_pk_buff_pop_tail(spb, flow->tailsz);
@@ -515,6 +517,12 @@ static void flow_drain_rx_nb(struct flow * flow)
/* TX-promotion grace when the peer's install latency is unknown (raw). */
#define REKEY_GRACE_MS 1000
+/* Last-resort promote within N node-keys of exhaustion (< watermark). */
+#define REKEY_PROMOTE_FLOOR 1
+
+/* Throttle re-key retries so a failed attempt can't storm the IRMd. */
+#define REKEY_BACKOFF_NS (250 * MILLION)
+
/*
* Pull a parked re-key seed from the IRMd and install it. Driven from the
* data path when RB_REKEY shows on rx_rb. crypt_rekey is concurrency-safe
@@ -531,12 +539,22 @@ static void flow_rekey(struct flow * flow)
uint8_t buf[SOCK_BUF_SIZE];
buffer_t msg = {SOCK_BUF_SIZE, buf};
bool has_key;
+ bool initiator = false;
pthread_rwlock_rdlock(&proc.lock);
if (flow->info.id < 0 || flow->crypt == NULL) {
pthread_rwlock_unlock(&proc.lock);
return;
}
+
+ /* Back off so a failed attempt can't storm the IRMd per syscall. */
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+ if (ts_diff_ns(&now, &flow->rk_attempt) < REKEY_BACKOFF_NS) {
+ pthread_rwlock_unlock(&proc.lock);
+ return;
+ }
+
+ flow->rk_attempt = now;
info = flow->info;
pthread_rwlock_unlock(&proc.lock);
@@ -547,7 +565,7 @@ static void flow_rekey(struct flow * flow)
return;
sk.key = key;
- if (flow_rekey__irm_result_des(&msg, &sk, &has_key) < 0)
+ if (flow_rekey__irm_result_des(&msg, &sk, &has_key, &initiator) < 0)
return;
if (!has_key)
@@ -556,6 +574,7 @@ static void flow_rekey(struct flow * flow)
pthread_rwlock_rdlock(&proc.lock);
if (flow->info.id == info.id && flow->crypt != NULL) {
if (crypt_rekey(flow->crypt, &sk) == 0) {
+ flow->rk_initiator = initiator;
/* Hold TX on the old epoch until the peer installs. */
ms = flow->info.mpl > 0 ? flow->info.mpl * 3
: REKEY_GRACE_MS;
@@ -676,6 +695,7 @@ static void flow_quiesce(int fd)
if (rx_rb != NULL)
ssm_rbuff_set_bits(rx_rb, RB_FLOWDOWN);
+
if (tx_rb != NULL)
ssm_rbuff_set_bits(tx_rb, RB_FLOWDOWN);
}
@@ -1593,8 +1613,10 @@ static __inline__ uint16_t flow_frag_role(size_t i, size_t n)
{
if (n == 1)
return FRCT_FR_SOLE;
+
if (i == 0)
return FRCT_FR_FIRST;
+
if (i + 1 == n)
return FRCT_FR_LAST;
@@ -1682,6 +1704,7 @@ static ssize_t flow_write_frag(struct flow * flow,
/* Guard the ceil-divide against size_t overflow. */
if (count > SIZE_MAX - frag_payload + 1)
return -EMSGSIZE;
+
n = (count + frag_payload - 1) / frag_payload;
/* SDU larger than the FC window can ever offer would deadlock. */
@@ -1739,21 +1762,37 @@ static ssize_t flow_write_frag(struct flow * flow,
#define FLOW_WM_CHECK (1u << 16)
/*
- * Switch TX to the freshly installed epoch once the peer is seen on it
- * (peer_synced) or the install grace has elapsed (breaks the symmetric
- * wait where neither side sends the new epoch first).
+ * Switch TX to the freshly installed epoch. The initiator holds the OAP
+ * key-confirm tag and bootstraps after the install grace, which also lets
+ * the peer install the batch first. The responder has no such proof: it
+ * waits for peer_synced (a packet under the new batch), with a last-resort
+ * promote near exhaustion so a silent peer can't stall it.
*/
-static void flow_tx_promote(struct flow * flow,
- const struct timespec * now)
+static void flow_tx_promote(struct flow * flow)
{
+ struct timespec now;
+ int nodes_left;
+ bool promote;
+
if (flow->crypt == NULL)
return;
if (flow->rk_grace.tv_sec == 0 && flow->rk_grace.tv_nsec == 0)
return;
- if (!crypt_peer_synced(flow->crypt)
- && ts_diff_ns(now, &flow->rk_grace) < 0)
+ promote = crypt_peer_synced(flow->crypt);
+
+ if (!promote && flow->rk_initiator) {
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+ promote = ts_diff_ns(&now, &flow->rk_grace) >= 0;
+ }
+
+ if (!promote && !flow->rk_initiator) {
+ nodes_left = crypt_nodes_left(flow->crypt);
+ promote = nodes_left >= 0 && nodes_left <= REKEY_PROMOTE_FLOOR;
+ }
+
+ if (!promote)
return;
crypt_tx_promote(flow->crypt);
@@ -1864,7 +1903,7 @@ ssize_t flow_write(int fd,
&& (ssm_rbuff_get_flags(flow->rx_rb) & RB_REKEY))
flow_rekey(flow);
- flow_tx_promote(flow, &now);
+ flow_tx_promote(flow);
/* Pre-empt TX key exhaustion; the timer is the backstop. */
if (flow_wm_due(flow)) {
@@ -2089,10 +2128,7 @@ ssize_t flow_read(int fd,
flow_rekey(flow);
/* Advance TX off a stale epoch even on recv-mostly (ACK-only) flows. */
- if (flow->crypt != NULL) {
- clock_gettime(PTHREAD_COND_CLOCK, &now);
- flow_tx_promote(flow, &now);
- }
+ flow_tx_promote(flow);
tw_move_safe();
diff --git a/src/lib/pb/irm.proto b/src/lib/pb/irm.proto
index 98b75a95..f54bc9ea 100644
--- a/src/lib/pb/irm.proto
+++ b/src/lib/pb/irm.proto
@@ -100,4 +100,5 @@ message irm_msg {
optional sint32 cipher_nid = 27; /* cipher NID */
optional uint32 generation = 28; /* re-key batch generation */
optional bool rekey = 29; /* re-key watermark trigger */
+ optional bool rk_initiator = 30; /* re-key proof-holder side */
}
diff --git a/src/lib/serdes-irm.c b/src/lib/serdes-irm.c
index 24bb349f..74ca694c 100644
--- a/src/lib/serdes-irm.c
+++ b/src/lib/serdes-irm.c
@@ -187,7 +187,8 @@ int flow__irm_result_des(buffer_t * buf,
int flow_rekey__irm_result_des(buffer_t * buf,
struct crypt_sk * sk,
- bool * has_key)
+ bool * has_key,
+ bool * initiator)
{
irm_msg_t * msg;
int err;
@@ -214,6 +215,7 @@ int flow_rekey__irm_result_des(buffer_t * buf,
sk->nid = NID_undef;
sk->epoch = msg->has_generation ?
(uint8_t) msg->generation : 0;
+ *initiator = msg->has_rk_initiator && msg->rk_initiator;
}
irm_msg__free_unpacked(msg, NULL);