diff options
| author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2026-02-19 22:03:16 +0100 |
|---|---|---|
| committer | Sander Vrijders <sander@ouroboros.rocks> | 2026-02-22 16:02:16 +0100 |
| commit | c3636005831064e71b03a5f8796a21e89b2a714f (patch) | |
| tree | ca57f7d09e9de015107edb1bda6f30654bf7699b /src/tools/oping | |
| parent | 1bf1d33db3e7622c8b97c5518f0f0ff984b989a8 (diff) | |
| download | ouroboros-c3636005831064e71b03a5f8796a21e89b2a714f.tar.gz ouroboros-c3636005831064e71b03a5f8796a21e89b2a714f.zip | |
irmd: Allow direct rbuff between local processes
This allows bypassing the IPCP for local processes that share the same
packet pool, lowering latency between processes to comparable levels
as Unix sockets (RTT in the order of a microsecond).
For local processes, no IPCPs are needed:
$ irm b prog oping n oping
$ oping -l
Ouroboros ping server started.
New flow 64.
Received 64 bytes on fd 64.
The direct IPC can be disabled with the DISABLE_DIRECT_IPC build
flag. Note that this is needed for rumba 'local' experiments to
emulate network topologies. Without this flag all processes will just
communicate directly.
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/tools/oping')
| -rw-r--r-- | src/tools/oping/oping.c | 15 | ||||
| -rw-r--r-- | src/tools/oping/oping_client.c | 122 | ||||
| -rw-r--r-- | src/tools/oping/oping_server.c | 68 |
3 files changed, 181 insertions, 24 deletions
diff --git a/src/tools/oping/oping.c b/src/tools/oping/oping.c index 86796552..763c0d62 100644 --- a/src/tools/oping/oping.c +++ b/src/tools/oping/oping.c @@ -72,17 +72,19 @@ "and reports the Round Trip Time (RTT)\n" \ "\n" \ " -l, --listen Run in server mode\n" \ +" --poll Server uses polling (lower latency)\n" \ +" --busy Server uses busy-poll (single flow)\n" \ "\n" \ " -c, --count Number of packets\n" \ " -d, --duration Duration of the test (default 1s)\n" \ " -f, --flood Send back-to-back without waiting\n" \ +" -F, --flood-busy Flood with busy-polling (lower latency)\n" \ " -i, --interval Interval (default 1000ms)\n" \ " -n, --server-name Name of the oping server\n" \ -" -q, --qos QoS (raw, best, video, voice, data)\n" \ +" -q, --qos QoS (raw, best, video, voice, data)\n" \ " -s, --size Payload size (B, default 64)\n" \ " -Q, --quiet Only print final statistics\n" \ " -D, --timeofday Print time of day before each line\n" \ -" --poll Server uses polling (lower latency)\n" \ "\n" \ " --help Display this help text and exit\n" \ @@ -93,6 +95,7 @@ struct { int size; bool timestamp; bool flood; + bool flood_busy; qosspec_t qs; /* stats */ @@ -118,6 +121,7 @@ struct { bool quiet; bool poll; + bool busy; pthread_t cleaner_pt; pthread_t accept_pt; @@ -177,10 +181,12 @@ int main(int argc, client.count = INT_MAX; client.timestamp = false; client.flood = false; + client.flood_busy = false; client.qs = qos_raw; client.quiet = false; server.quiet = false; server.poll = false; + server.busy = false; while (argc > 0) { if ((strcmp(*argv, "-i") == 0 || @@ -221,6 +227,9 @@ int main(int argc, } else if (strcmp(*argv, "-f") == 0 || strcmp(*argv, "--flood") == 0) { client.flood = true; + } else if (strcmp(*argv, "-F") == 0 || + strcmp(*argv, "--flood-busy") == 0) { + client.flood_busy = true; } else if (strcmp(*argv, "-D") == 0 || strcmp(*argv, "--timeofday") == 0) { client.timestamp = true; @@ -230,6 +239,8 @@ int main(int argc, server.quiet = true; } else if (strcmp(*argv, "--poll") == 0) { server.poll = true; + } else if (strcmp(*argv, "--busy") == 0) { + server.busy = true; } else { goto fail; } diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c index 18dd3078..23807f65 100644 --- a/src/tools/oping/oping_client.c +++ b/src/tools/oping/oping_client.c @@ -67,6 +67,26 @@ static void update_rtt_stats(double ms) client.rtt_m2 += d * (ms - client.rtt_avg); } +static double rtt_val(double ms) +{ + return ms < 0.1 ? ms * 1000 : ms; +} + +static const char * rtt_unit(double ms) +{ + return ms < 0.1 ? "µs" : "ms"; +} + +static void print_rtt(int len, int seq, + double ms, const char * suf) +{ + printf("%d bytes from %s: seq=%d " + "time=%.3f %s%s\n", + len, client.s_apn, seq, + rtt_val(ms), rtt_unit(ms), + suf != NULL ? suf : ""); +} + void * reader(void * o) { struct timespec timeout = {client.interval / 1000 + 2, 0}; @@ -127,12 +147,9 @@ void * reader(void * o) (size_t) rtc.tv_nsec / 1000); } - printf("%d bytes from %s: seq=%d time=%.3f ms%s\n", - msg_len, - client.s_apn, - ntohl(msg->id), - ms, - id < exp_id ? " [out-of-order]" : ""); + print_rtt(msg_len, ntohl(msg->id), ms, + id < exp_id ? + " [out-of-order]" : NULL); } update_rtt_stats(ms); @@ -223,16 +240,87 @@ static void print_stats(struct timespec * tic, printf("time: %.3f ms\n", ts_diff_us(toc, tic) / 1000.0); if (client.rcvd > 0) { + double a = client.rtt_avg; + double f = a < 0.1 ? 1000 : 1; printf("rtt min/avg/max/mdev = %.3f/%.3f/%.3f/", - client.rtt_min, - client.rtt_avg, - client.rtt_max); + client.rtt_min * f, client.rtt_avg * f, + client.rtt_max * f); if (client.rcvd > 1) - printf("%.3f ms\n", - sqrt(client.rtt_m2 / (client.rcvd - 1))); + printf("%.3f %s\n", + sqrt(client.rtt_m2 / + (client.rcvd - 1)) * f, + rtt_unit(a)); else - printf("NaN ms\n"); + printf("NaN %s\n", rtt_unit(a)); + } +} + +static int flood_busy_ping(int fd) +{ + char buf[OPING_BUF_SIZE]; + struct oping_msg * msg = (struct oping_msg *) buf; + struct timespec sent; + struct timespec rcvd; + double ms; + int n; + + memset(buf, 0, client.size); + + fccntl(fd, FLOWSFLAGS, + FLOWFRDWR | FLOWFRNOPART | FLOWFRNOBLOCK); + + if (!client.quiet) + printf("Pinging %s with %d bytes" + " of data (%u packets," + " busy-poll):\n\n", + client.s_apn, client.size, + client.count); + + while (!stop && client.sent < client.count) { + clock_gettime(CLOCK_MONOTONIC, &sent); + + msg->type = htonl(ECHO_REQUEST); + msg->id = htonl(client.sent); + msg->tv_sec = sent.tv_sec; + msg->tv_nsec = sent.tv_nsec; + + if (flow_write(fd, buf, + client.size) < 0) { + printf("Failed to send " + "packet.\n"); + break; + } + + ++client.sent; + + do { + n = flow_read(fd, buf, + OPING_BUF_SIZE); + } while (n == -EAGAIN && !stop); + + if (n < 0) + break; + + clock_gettime(CLOCK_MONOTONIC, &rcvd); + + if (ntohl(msg->type) != ECHO_REPLY) + continue; + + ++client.rcvd; + + sent.tv_sec = msg->tv_sec; + sent.tv_nsec = msg->tv_nsec; + ms = ts_diff_us(&rcvd, &sent) / 1000.0; + + update_rtt_stats(ms); + + if (!client.quiet) + print_rtt(client.size, + ntohl(msg->id), ms, + NULL); } + + return 0; } static int flood_ping(int fd) @@ -283,9 +371,9 @@ static int flood_ping(int fd) update_rtt_stats(ms); if (!client.quiet) - printf("%d bytes from %s: seq=%d time=%.3f ms\n", - client.size, client.s_apn, - ntohl(msg->id), ms); + print_rtt(client.size, + ntohl(msg->id), ms, + NULL); } return 0; @@ -337,7 +425,9 @@ static int client_main(void) clock_gettime(CLOCK_REALTIME, &tic); - if (client.flood) + if (client.flood_busy) + flood_busy_ping(fd); + else if (client.flood) flood_ping(fd); else threaded_ping(fd); diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c index 1670ebf3..33af28c4 100644 --- a/src/tools/oping/oping_server.c +++ b/src/tools/oping/oping_server.c @@ -138,7 +138,10 @@ void * accept_thread(void * o) (void) o; - printf("Ouroboros ping server started.\n"); + printf("Ouroboros ping server started."); + if (server.busy) + printf(" [busy-poll]"); + printf("\n"); while (true) { fd = flow_accept(&qs, NULL); @@ -158,12 +161,56 @@ void * accept_thread(void * o) pthread_mutex_unlock(&server.lock); fccntl(fd, FLOWSFLAGS, - FLOWFRNOBLOCK | FLOWFRDWR | FLOWFRNOPART); + FLOWFRNOBLOCK | FLOWFRDWR + | FLOWFRNOPART); } return (void *) 0; } +void * busy_thread(void * o) +{ + char buf[OPING_BUF_SIZE]; + struct oping_msg * msg = (struct oping_msg *) buf; + int fd; + int msg_len; + + (void) o; + + /* Accept a single flow. */ + fd = flow_accept(NULL, NULL); + if (fd < 0) { + printf("Failed to accept flow.\n"); + return (void *) -1; + } + + printf("New flow %d (busy-poll).\n", fd); + + fccntl(fd, FLOWSFLAGS, + FLOWFRNOBLOCK | FLOWFRDWR + | FLOWFRNOPART); + + while (true) { + msg_len = flow_read(fd, buf, + OPING_BUF_SIZE); + if (msg_len == -EAGAIN) + continue; + if (msg_len < 0) + break; + + if (ntohl(msg->type) != ECHO_REQUEST) + continue; + + msg->type = htonl(ECHO_REPLY); + + flow_write(fd, buf, msg_len); + } + + flow_dealloc(fd); + + return (void *) 0; +} + int server_main(void) { struct sigaction sig_act; @@ -191,12 +238,21 @@ int server_main(void) } pthread_create(&server.cleaner_pt, NULL, cleaner_thread, NULL); - pthread_create(&server.accept_pt, NULL, accept_thread, NULL); - pthread_create(&server.server_pt, NULL, server_thread, NULL); - pthread_join(server.accept_pt, NULL); + if (server.busy) { + pthread_create(&server.server_pt, NULL, + busy_thread, NULL); + pthread_join(server.server_pt, NULL); + pthread_cancel(server.cleaner_pt); + } else { + pthread_create(&server.accept_pt, NULL, + accept_thread, NULL); + pthread_create(&server.server_pt, NULL, + server_thread, NULL); + pthread_join(server.accept_pt, NULL); + pthread_cancel(server.server_pt); + } - pthread_cancel(server.server_pt); pthread_cancel(server.cleaner_pt); fset_destroy(server.flows); |
