summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2026-05-01 15:11:07 +0200
committerSander Vrijders <sander@ouroboros.rocks>2026-05-20 08:17:04 +0200
commite05bd477e73b9a5d533c4865022602dc60cec1ab (patch)
tree4aac5536b47b9a165ec4e988422964d01f22ad87
parent81ea9a137b87fb8264d439a944cc077aced71602 (diff)
downloadouroboros-e05bd477e73b9a5d533c4865022602dc60cec1ab.tar.gz
ouroboros-e05bd477e73b9a5d533c4865022602dc60cec1ab.zip
lib: Drain rbuff before close
ssm_rbuff_close used to unmap the SHM page immediately, leaving any in-flight peer-process thread that was inside pthread_mutex_lock or pthread_cond_wait on the SHM-resident sync primitives reading freed memory. Adds an n_users counter, bumped on entry and dropped on exit of every function that touches the mutex / cond vars (write, write_b, read, read_b, fini), and have ssm_rbuff_close poll-spin until the counter drains before tearing down. ssm_rbuff_read now re-checks IS_EMPTY after taking the mutex, plugging a TOCTOU where two readers could both pass a lock-free fast path and the loser would read a stale TAIL. Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
-rw-r--r--src/lib/ssm/rbuff.c52
1 files changed, 48 insertions, 4 deletions
diff --git a/src/lib/ssm/rbuff.c b/src/lib/ssm/rbuff.c
index 77e23010..c149c306 100644
--- a/src/lib/ssm/rbuff.c
+++ b/src/lib/ssm/rbuff.c
@@ -80,6 +80,7 @@ struct ssm_rbuff {
pthread_cond_t * del; /* signal when data removed */
pid_t pid; /* pid of the owner */
int flow_id; /* flow_id of the flow */
+ size_t n_users; /* in-flight users */
};
#define MM_FLAGS (PROT_READ | PROT_WRITE)
@@ -119,6 +120,7 @@ static struct ssm_rbuff * rbuff_create(pid_t pid,
rb->del = rb->add + 1;
rb->pid = pid;
rb->flow_id = flow_id;
+ rb->n_users = 0;
return rb;
@@ -228,6 +230,15 @@ void ssm_rbuff_close(struct ssm_rbuff * rb)
{
assert(rb);
+ /*
+ * Caller must set ACL_FLOWDOWN first; if a user becomes
+ * cancellable, push a cleanup that decrements n_users.
+ */
+ while (__atomic_load_n(&rb->n_users, __ATOMIC_SEQ_CST) > 0) {
+ struct timespec tic = { 0, 100000 };
+ nanosleep(&tic, NULL);
+ }
+
rbuff_destroy(rb);
}
@@ -240,6 +251,8 @@ int ssm_rbuff_write(struct ssm_rbuff * rb,
assert(rb != NULL);
+ __atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST);
+
acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
if (acl != ACL_RDWR) {
if (acl & ACL_FLOWDOWN) {
@@ -269,11 +282,13 @@ int ssm_rbuff_write(struct ssm_rbuff * rb,
pthread_mutex_unlock(rb->mtx);
+ __atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST);
return 0;
fail_mutex:
pthread_mutex_unlock(rb->mtx);
fail_acl:
+ __atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST);
return ret;
}
@@ -287,6 +302,8 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb,
assert(rb != NULL);
+ __atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST);
+
acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
if (acl != ACL_RDWR) {
if (acl & ACL_FLOWDOWN) {
@@ -325,6 +342,7 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb,
pthread_mutex_unlock(rb->mtx);
fail_acl:
+ __atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST);
return ret;
}
@@ -351,11 +369,21 @@ ssize_t ssm_rbuff_read(struct ssm_rbuff * rb)
assert(rb != NULL);
- if (IS_EMPTY(rb))
- return check_rb_acl(rb);
+ __atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST);
+
+ if (IS_EMPTY(rb)) {
+ ret = check_rb_acl(rb);
+ goto out;
+ }
robust_mutex_lock(rb->mtx);
+ if (IS_EMPTY(rb)) {
+ pthread_mutex_unlock(rb->mtx);
+ ret = check_rb_acl(rb);
+ goto out;
+ }
+
ret = TAIL(rb);
ADVANCE_TAIL(rb);
@@ -363,6 +391,8 @@ ssize_t ssm_rbuff_read(struct ssm_rbuff * rb)
pthread_mutex_unlock(rb->mtx);
+ out:
+ __atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST);
return ret;
}
@@ -374,9 +404,13 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb,
assert(rb != NULL);
+ __atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST);
+
acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
- if (IS_EMPTY(rb) && (acl & ACL_FLOWDOWN))
- return -EFLOWDOWN;
+ if (IS_EMPTY(rb) && (acl & ACL_FLOWDOWN)) {
+ idx = -EFLOWDOWN;
+ goto out;
+ }
robust_mutex_lock(rb->mtx);
@@ -402,6 +436,8 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb,
assert(idx != -EAGAIN);
+ out:
+ __atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST);
return idx;
}
@@ -410,7 +446,11 @@ void ssm_rbuff_set_acl(struct ssm_rbuff * rb,
{
assert(rb != NULL);
+ robust_mutex_lock(rb->mtx);
__atomic_store_n(rb->acl, (size_t) flags, __ATOMIC_SEQ_CST);
+ pthread_cond_broadcast(rb->add);
+ pthread_cond_broadcast(rb->del);
+ pthread_mutex_unlock(rb->mtx);
}
uint32_t ssm_rbuff_get_acl(struct ssm_rbuff * rb)
@@ -424,6 +464,8 @@ void ssm_rbuff_fini(struct ssm_rbuff * rb)
{
assert(rb != NULL);
+ __atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST);
+
robust_mutex_lock(rb->mtx);
pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx);
@@ -432,6 +474,8 @@ void ssm_rbuff_fini(struct ssm_rbuff * rb)
robust_wait(rb->del, rb->mtx, NULL);
pthread_cleanup_pop(true);
+
+ __atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST);
}
size_t ssm_rbuff_queued(struct ssm_rbuff * rb)