From c3636005831064e71b03a5f8796a21e89b2a714f Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Thu, 19 Feb 2026 22:03:16 +0100 Subject: irmd: Allow direct rbuff between local processes This allows bypassing the IPCP for local processes that share the same packet pool, lowering latency between processes to comparable levels as Unix sockets (RTT in the order of a microsecond). For local processes, no IPCPs are needed: $ irm b prog oping n oping $ oping -l Ouroboros ping server started. New flow 64. Received 64 bytes on fd 64. The direct IPC can be disabled with the DISABLE_DIRECT_IPC build flag. Note that this is needed for rumba 'local' experiments to emulate network topologies. Without this flag all processes will just communicate directly. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/tools/oping/oping_server.c | 68 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 62 insertions(+), 6 deletions(-) (limited to 'src/tools/oping/oping_server.c') diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c index 1670ebf3..33af28c4 100644 --- a/src/tools/oping/oping_server.c +++ b/src/tools/oping/oping_server.c @@ -138,7 +138,10 @@ void * accept_thread(void * o) (void) o; - printf("Ouroboros ping server started.\n"); + printf("Ouroboros ping server started."); + if (server.busy) + printf(" [busy-poll]"); + printf("\n"); while (true) { fd = flow_accept(&qs, NULL); @@ -158,12 +161,56 @@ void * accept_thread(void * o) pthread_mutex_unlock(&server.lock); fccntl(fd, FLOWSFLAGS, - FLOWFRNOBLOCK | FLOWFRDWR | FLOWFRNOPART); + FLOWFRNOBLOCK | FLOWFRDWR + | FLOWFRNOPART); } return (void *) 0; } +void * busy_thread(void * o) +{ + char buf[OPING_BUF_SIZE]; + struct oping_msg * msg = (struct oping_msg *) buf; + int fd; + int msg_len; + + (void) o; + + /* Accept a single flow. */ + fd = flow_accept(NULL, NULL); + if (fd < 0) { + printf("Failed to accept flow.\n"); + return (void *) -1; + } + + printf("New flow %d (busy-poll).\n", fd); + + fccntl(fd, FLOWSFLAGS, + FLOWFRNOBLOCK | FLOWFRDWR + | FLOWFRNOPART); + + while (true) { + msg_len = flow_read(fd, buf, + OPING_BUF_SIZE); + if (msg_len == -EAGAIN) + continue; + if (msg_len < 0) + break; + + if (ntohl(msg->type) != ECHO_REQUEST) + continue; + + msg->type = htonl(ECHO_REPLY); + + flow_write(fd, buf, msg_len); + } + + flow_dealloc(fd); + + return (void *) 0; +} + int server_main(void) { struct sigaction sig_act; @@ -191,12 +238,21 @@ int server_main(void) } pthread_create(&server.cleaner_pt, NULL, cleaner_thread, NULL); - pthread_create(&server.accept_pt, NULL, accept_thread, NULL); - pthread_create(&server.server_pt, NULL, server_thread, NULL); - pthread_join(server.accept_pt, NULL); + if (server.busy) { + pthread_create(&server.server_pt, NULL, + busy_thread, NULL); + pthread_join(server.server_pt, NULL); + pthread_cancel(server.cleaner_pt); + } else { + pthread_create(&server.accept_pt, NULL, + accept_thread, NULL); + pthread_create(&server.server_pt, NULL, + server_thread, NULL); + pthread_join(server.accept_pt, NULL); + pthread_cancel(server.server_pt); + } - pthread_cancel(server.server_pt); pthread_cancel(server.cleaner_pt); fset_destroy(server.flows); -- cgit v1.2.3