diff options
| author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2026-05-01 15:11:07 +0200 |
|---|---|---|
| committer | Sander Vrijders <sander@ouroboros.rocks> | 2026-05-20 08:17:04 +0200 |
| commit | e05bd477e73b9a5d533c4865022602dc60cec1ab (patch) | |
| tree | 4aac5536b47b9a165ec4e988422964d01f22ad87 /src/lib/ssm | |
| parent | 81ea9a137b87fb8264d439a944cc077aced71602 (diff) | |
| download | ouroboros-e05bd477e73b9a5d533c4865022602dc60cec1ab.tar.gz ouroboros-e05bd477e73b9a5d533c4865022602dc60cec1ab.zip | |
lib: Drain rbuff before close
ssm_rbuff_close used to unmap the SHM page immediately, leaving any
in-flight peer-process thread that was inside pthread_mutex_lock or
pthread_cond_wait on the SHM-resident sync primitives reading freed
memory. Add an n_users counter, bumped on entry and dropped on exit
of every function that touches the mutex / cond vars (write, write_b,
read, read_b, fini), and have ssm_rbuff_close poll-spin until the
counter drains before tearing down.
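ssm_rbuff_read now re-checks IS_EMPTY after taking the mutex, plugging
a TOCTOU where two readers could both pass a lock-free fast path and
the loser would read a stale TAIL. ssm_rbuff_set_acl now stores the ACL
while holding the mutex and broadcasts on both condition variables,
waking any blocked waiters.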
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/lib/ssm')
| -rw-r--r-- | src/lib/ssm/rbuff.c | 52 |
1 files changed, 48 insertions, 4 deletions
diff --git a/src/lib/ssm/rbuff.c b/src/lib/ssm/rbuff.c index 77e23010..c149c306 100644 --- a/src/lib/ssm/rbuff.c +++ b/src/lib/ssm/rbuff.c @@ -80,6 +80,7 @@ struct ssm_rbuff { pthread_cond_t * del; /* signal when data removed */ pid_t pid; /* pid of the owner */ int flow_id; /* flow_id of the flow */ + size_t n_users; /* in-flight users */ }; #define MM_FLAGS (PROT_READ | PROT_WRITE) @@ -119,6 +120,7 @@ static struct ssm_rbuff * rbuff_create(pid_t pid, rb->del = rb->add + 1; rb->pid = pid; rb->flow_id = flow_id; + rb->n_users = 0; return rb; @@ -228,6 +230,15 @@ void ssm_rbuff_close(struct ssm_rbuff * rb) { assert(rb); + /* + * Caller must set ACL_FLOWDOWN first; if a user becomes + * cancellable, push a cleanup that decrements n_users. + */ + while (__atomic_load_n(&rb->n_users, __ATOMIC_SEQ_CST) > 0) { + struct timespec tic = { 0, 100000 }; + nanosleep(&tic, NULL); + } + rbuff_destroy(rb); } @@ -240,6 +251,8 @@ int ssm_rbuff_write(struct ssm_rbuff * rb, assert(rb != NULL); + __atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST); + acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); if (acl != ACL_RDWR) { if (acl & ACL_FLOWDOWN) { @@ -269,11 +282,13 @@ int ssm_rbuff_write(struct ssm_rbuff * rb, pthread_mutex_unlock(rb->mtx); + __atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST); return 0; fail_mutex: pthread_mutex_unlock(rb->mtx); fail_acl: + __atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST); return ret; } @@ -287,6 +302,8 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb, assert(rb != NULL); + __atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST); + acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); if (acl != ACL_RDWR) { if (acl & ACL_FLOWDOWN) { @@ -325,6 +342,7 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb, pthread_mutex_unlock(rb->mtx); fail_acl: + __atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST); return ret; } @@ -351,11 +369,21 @@ ssize_t ssm_rbuff_read(struct ssm_rbuff * rb) assert(rb != NULL); - if (IS_EMPTY(rb)) - return check_rb_acl(rb); + 
__atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST); + + if (IS_EMPTY(rb)) { + ret = check_rb_acl(rb); + goto out; + } robust_mutex_lock(rb->mtx); + if (IS_EMPTY(rb)) { + pthread_mutex_unlock(rb->mtx); + ret = check_rb_acl(rb); + goto out; + } + ret = TAIL(rb); ADVANCE_TAIL(rb); @@ -363,6 +391,8 @@ ssize_t ssm_rbuff_read(struct ssm_rbuff * rb) pthread_mutex_unlock(rb->mtx); + out: + __atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST); return ret; } @@ -374,9 +404,13 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb, assert(rb != NULL); + __atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST); + acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); - if (IS_EMPTY(rb) && (acl & ACL_FLOWDOWN)) - return -EFLOWDOWN; + if (IS_EMPTY(rb) && (acl & ACL_FLOWDOWN)) { + idx = -EFLOWDOWN; + goto out; + } robust_mutex_lock(rb->mtx); @@ -402,6 +436,8 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb, assert(idx != -EAGAIN); + out: + __atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST); return idx; } @@ -410,7 +446,11 @@ void ssm_rbuff_set_acl(struct ssm_rbuff * rb, { assert(rb != NULL); + robust_mutex_lock(rb->mtx); __atomic_store_n(rb->acl, (size_t) flags, __ATOMIC_SEQ_CST); + pthread_cond_broadcast(rb->add); + pthread_cond_broadcast(rb->del); + pthread_mutex_unlock(rb->mtx); } uint32_t ssm_rbuff_get_acl(struct ssm_rbuff * rb) @@ -424,6 +464,8 @@ void ssm_rbuff_fini(struct ssm_rbuff * rb) { assert(rb != NULL); + __atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST); + robust_mutex_lock(rb->mtx); pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx); @@ -432,6 +474,8 @@ void ssm_rbuff_fini(struct ssm_rbuff * rb) robust_wait(rb->del, rb->mtx, NULL); pthread_cleanup_pop(true); + + __atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST); } size_t ssm_rbuff_queued(struct ssm_rbuff * rb) |
