summaryrefslogtreecommitdiff
path: root/src/lib/tw.c
blob: ccde7dd1cc5265703567b964ee370c88322b7a41 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
/*
 * Ouroboros - Copyright (C) 2016 - 2026
 *
 * Generic deadline-ordered callback queue (timing wheel)
 *
 *    Dimitri Staessens <dimitri@ouroboros.rocks>
 *    Sander Vrijders   <sander@ouroboros.rocks>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public License
 * version 2.1 as published by the Free Software Foundation.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., http://www.fsf.org/about/contact/.
 */

#if defined(__linux__) || defined(__CYGWIN__)
#define _DEFAULT_SOURCE
#else
#define _POSIX_C_SOURCE 200809L
#endif

#include "config.h"

#include <ouroboros/list.h>
#include <ouroboros/pthread.h>
#include <ouroboros/time.h>
#include <ouroboros/tw.h>

#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Wheel geometry: TW_LVLS levels of TW_SLOTS slots each.  A slot at level
 * lvl spans 2^(TW_RES + TW_BUMP * lvl) ns (see tw_lvl_res()):
 *   level 0: 2^20 ns ~ 1.05 ms per slot, 2^28 ns ~ 268 ms per revolution
 *   level 1: 2^24 ns ~ 16.8 ms per slot, 2^32 ns ~ 4.3 s per revolution
 *   level 2: 2^28 ns ~ 268 ms per slot,  2^36 ns ~ 68.7 s per revolution
 * Deadlines further out than one top-level revolution still land on
 * level 2; tw_move() re-queues them in their slot until they are due.
 */
#define TW_LVLS  3
/* Must stay a power of two: TW_SLOT() reduces by masking. */
#define TW_SLOTS 256
/* log2 of the slot-width ratio between adjacent levels (16x). */
#define TW_BUMP  4
#define TW_RES   20  /* 2^20 ns ≈ 1 ms per slot at level 0. */

/* Reduce an absolute tick count to a slot index within a level. */
#define TW_SLOT(x) ((x) & (TW_SLOTS - 1))

static struct {
        /* levels[lvl][slot]: queued struct tw_entry, linked via ->next. */
        struct list_head levels[TW_LVLS][TW_SLOTS];
        /* Per level, index of the last slot tw_move() has swept. */
        size_t           prv[TW_LVLS];
        /* Guards levels[] and prv[]. */
        pthread_mutex_t  mtx;
        /* Held for a whole tw_move() sweep; acquired with trylock. */
        pthread_mutex_t  move_mtx;
        bool             initialised;
} tw;

/* log2 of the slot width, in ns, at level lvl (TW_BUMP bits per level). */
static size_t tw_lvl_res(size_t lvl)
{
        size_t extra = lvl * TW_BUMP;

        return extra + TW_RES;
}

/*
 * Lowest level whose TW_SLOTS-slot span reaches deadline_ns from now_ns.
 * Deadlines already passed map to level 0; deadlines beyond every span
 * map to the top level.
 */
static size_t tw_pick_lvl(uint64_t now_ns,
                          uint64_t deadline_ns)
{
        uint64_t ahead = 0;
        size_t   l;

        if (deadline_ns > now_ns)
                ahead = deadline_ns - now_ns;

        for (l = 0; l < TW_LVLS - 1; ++l) {
                if ((ahead >> tw_lvl_res(l)) < TW_SLOTS)
                        break;
        }

        return l;
}

/* Slot index at level lvl that the absolute time ns falls into. */
static size_t tw_slot(uint64_t ns,
                      size_t   lvl)
{
        uint64_t tick = ns >> tw_lvl_res(lvl);

        return TW_SLOT(tick);
}

/*
 * Set up the wheel: both locks, empty slot lists, and a sweep position
 * one slot behind the current time on every level.
 *
 * Returns 0 on success, -1 if a mutex could not be initialised (nothing
 * is left allocated in that case).
 */
int tw_init(void)
{
        struct timespec now;
        uint64_t        now_ns;
        size_t          lvl;
        size_t          s;

        assert(!tw.initialised);

        if (pthread_mutex_init(&tw.mtx, NULL) != 0)
                return -1;

        if (pthread_mutex_init(&tw.move_mtx, NULL) != 0) {
                pthread_mutex_destroy(&tw.mtx);
                return -1;
        }

        clock_gettime(PTHREAD_COND_CLOCK, &now);
        now_ns = TS_TO_UINT64(now);

        for (lvl = 0; lvl < TW_LVLS; ++lvl) {
                for (s = 0; s < TW_SLOTS; ++s)
                        list_head_init(&tw.levels[lvl][s]);

                /* Pretend the slot before "now" was the last one swept. */
                tw.prv[lvl] = TW_SLOT(tw_slot(now_ns, lvl) - 1);
        }

        tw.initialised = true;

        return 0;
}

/*
 * Tear the wheel down.  In debug builds, asserts that no entry is still
 * queued on any slot of any level.
 */
void tw_fini(void)
{
        size_t lvl;
        size_t s;

        assert(tw.initialised);

        for (lvl = 0; lvl < TW_LVLS; ++lvl)
                for (s = 0; s < TW_SLOTS; ++s)
                        assert(list_is_empty(&tw.levels[lvl][s]));

        pthread_mutex_destroy(&tw.move_mtx);
        pthread_mutex_destroy(&tw.mtx);

        tw.initialised = false;
}

/*
 * Reset e to an unqueued state: no callback, no argument, zero deadline,
 * and a self-linked list node, which tw_post()/tw_cancel() read as "not
 * queued".
 */
void tw_init_entry(struct tw_entry * e)
{
        e->fire        = NULL;
        e->arg         = NULL;
        e->deadline_ns = 0;
        e->lvl         = 0;

        list_head_init(&e->next);
}

/*
 * Queue e so that tw_move() calls fire(arg) once the PTHREAD_COND_CLOCK
 * time has reached deadline_ns.
 *
 * Re-posting an entry that is still queued moves it; its deadline,
 * callback and argument are replaced.  Resolution depends on the level
 * chosen: a callback may run up to one slot width of that level late.
 */
void tw_post(struct tw_entry * e,
             uint64_t          deadline_ns,
             tw_fire_fn_t      fire,
             void *            arg)
{
        struct timespec now;
        uint64_t        now_ns;
        uint64_t        base_ns;
        size_t          lvl;
        size_t          slot;

        assert(tw.initialised);

        clock_gettime(PTHREAD_COND_CLOCK, &now);
        now_ns = TS_TO_UINT64(now);

        lvl = tw_pick_lvl(now_ns, deadline_ns);

        /*
         * A deadline already in the past would hash to a slot tw_move()
         * may have swept already, delaying it by a full wheel revolution.
         * Bucket it from "now" instead so the next sweep fires it.
         */
        base_ns = deadline_ns > now_ns ? deadline_ns : now_ns;

        /*
         * +1: the slot after the one containing base_ns starts strictly
         * after deadline_ns, so the entry is due whenever that slot is
         * swept.
         */
        slot = TW_SLOT(tw_slot(base_ns, lvl) + 1);

        pthread_mutex_lock(&tw.mtx);

        /* e may still be queued from an earlier post: unlink it first. */
        if (!list_is_empty(&e->next))
                list_del(&e->next);

        /*
         * Update the fields under tw.mtx: tw_move() reads deadline_ns of
         * queued entries while holding it.
         */
        e->deadline_ns = deadline_ns;
        e->fire        = fire;
        e->arg         = arg;
        e->lvl         = lvl;

        list_add_tail(&e->next, &tw.levels[lvl][slot]);

        pthread_mutex_unlock(&tw.mtx);
}

/*
 * Remove e from the wheel if it is queued; a NULL or unqueued entry is a
 * no-op.
 *
 * tw_move() unlinks an entry before releasing tw.mtx to run its callback.
 * So cancelling an entry that is being fired at that moment finds it
 * unqueued and does not stop that callback.
 */
void tw_cancel(struct tw_entry * e)
{
        if (e != NULL) {
                assert(tw.initialised);

                pthread_mutex_lock(&tw.mtx);

                if (!list_is_empty(&e->next)) {
                        list_del(&e->next);
                        list_head_init(&e->next);
                }

                pthread_mutex_unlock(&tw.mtx);
        }
}

void tw_move(void)
{
        struct timespec    now;
        struct list_head   deferred;
        struct list_head * p;
        uint64_t           now_ns;
        size_t             i;
        size_t             j;
        size_t             cur;

        assert(tw.initialised);

        if (pthread_mutex_trylock(&tw.move_mtx) != 0)
                return;

        pthread_cleanup_push(__cleanup_mutex_unlock, &tw.move_mtx);

        pthread_mutex_lock(&tw.mtx);

        pthread_cleanup_push(__cleanup_mutex_unlock, &tw.mtx);

        clock_gettime(PTHREAD_COND_CLOCK, &now);
        now_ns = TS_TO_UINT64(now);

        for (i = 0; i < TW_LVLS; ++i) {
                cur = tw_slot(now_ns, i);

                j = tw.prv[i];
                if (cur < j)
                        cur += TW_SLOTS;

                while (j++ < cur) {
                        size_t s = TW_SLOT(j);

                        /* Pop-front so fire may mutate any entry. */
                        list_head_init(&deferred);

                        while (!list_is_empty(&tw.levels[i][s])) {
                                struct tw_entry * e;
                                p = tw.levels[i][s].nxt;
                                e = list_entry(p, struct tw_entry, next);
                                list_del(&e->next);

                                if (e->deadline_ns > now_ns) {
                                        list_add_tail(&e->next, &deferred);
                                        continue;
                                }

                                pthread_mutex_unlock(&tw.mtx);
                                e->fire(e->arg);
                                pthread_mutex_lock(&tw.mtx);
                        }

                        while (!list_is_empty(&deferred)) {
                                p = deferred.nxt;
                                list_del(p);
                                list_add_tail(p, &tw.levels[i][s]);
                        }
                }

                tw.prv[i] = TW_SLOT(cur);
        }

        pthread_cleanup_pop(true); /* tw.mtx */
        pthread_cleanup_pop(true); /* tw.move_mtx */
}

/*
 * Estimated absolute time at which tw_move() next has work at level lvl,
 * or INT64_MAX if the level is empty.  Called with tw.mtx held (see
 * tw_next_expiry()).
 *
 * The scan starts after tw.prv[lvl], the last slot swept.  Slots between
 * it and the current slot are already due but not yet swept, e.g. when the
 * clock crossed a slot boundary after the last tw_move().  A non-empty one
 * of those yields now_ns instead of a time one wheel revolution away.
 * For later slots the estimate is now_ns plus the slot distance from the
 * current slot, which is at or after that slot's start.
 */
static int64_t tw_lvl_earliest(size_t   lvl,
                               uint64_t now_ns)
{
        size_t cur  = tw_slot(now_ns, lvl);
        size_t prv  = tw.prv[lvl];
        size_t late = TW_SLOT(cur - prv); /* due but unswept slots */
        size_t k;

        for (k = 1; k <= TW_SLOTS; ++k) {
                size_t s = TW_SLOT(prv + k);

                if (list_is_empty(&tw.levels[lvl][s]))
                        continue;

                if (k <= late)
                        return (int64_t) now_ns;

                return (int64_t)(now_ns +
                                 ((uint64_t) (k - late) << tw_lvl_res(lvl)));
        }

        return INT64_MAX;
}

/*
 * Store in *out the absolute PTHREAD_COND_CLOCK time at which the wheel
 * next needs a tw_move(): the minimum of the per-level estimates from
 * tw_lvl_earliest().  When nothing is queued, *out is set to
 * { tv_sec = 0, tv_nsec = -1 }.
 */
void tw_next_expiry(struct timespec * out)
{
        struct timespec now;
        uint64_t        now_ns;
        int64_t         best;
        size_t          lvl;

        assert(tw.initialised);

        clock_gettime(PTHREAD_COND_CLOCK, &now);
        now_ns = TS_TO_UINT64(now);

        best = INT64_MAX;

        pthread_mutex_lock(&tw.mtx);

        for (lvl = 0; lvl < TW_LVLS; ++lvl) {
                int64_t cand = tw_lvl_earliest(lvl, now_ns);

                best = cand < best ? cand : best;
        }

        pthread_mutex_unlock(&tw.mtx);

        if (best != INT64_MAX) {
                UINT64_TO_TS((uint64_t) best, out);
                return;
        }

        /* Empty wheel: no normalised timespec has tv_nsec == -1. */
        out->tv_sec  = 0;
        out->tv_nsec = -1;
}