Branch data Line data Source code
1 : : /* SPDX-License-Identifier: LGPL-2.1+ */
2 : :
3 : : #include <errno.h>
4 : : #include <fcntl.h>
5 : : #include <poll.h>
6 : : #include <stdbool.h>
7 : : #include <stdint.h>
8 : : #include <stdlib.h>
9 : : #include <sys/eventfd.h>
10 : : #include <sys/types.h>
11 : : #include <unistd.h>
12 : :
13 : : #include "barrier.h"
14 : : #include "fd-util.h"
15 : : #include "macro.h"
16 : :
17 : : /**
18 : : * Barriers
19 : : * This barrier implementation provides a simple synchronization method based
20 : : * on file-descriptors that can safely be used between threads and processes. A
21 : : * barrier object contains 2 shared counters based on eventfd. Both processes
22 : : * can now place barriers and wait for the other end to reach a random or
23 : : * specific barrier.
24 : : * Barriers are numbered, so you can either wait for the other end to reach any
25 : : * barrier or the last barrier that you placed. This way, you can use barriers
26 : : * for one-way *and* full synchronization. Note that even though barriers are
27 : : * numbered, these numbers are internal and recycled once both sides reached the
28 : : * same barrier (implemented as a simple signed counter). It is thus not
29 : : * possible to address barriers by their ID.
30 : : *
31 : : * Barrier-API: Both ends can place as many barriers via barrier_place() as
32 : : * they want and each pair of barriers on both sides will be implicitly linked.
33 : : * Each side can use the barrier_wait/sync_*() family of calls to wait for the
34 : : * other side to place a specific barrier. barrier_wait_next() waits until the
35 : : * other side calls barrier_place(). No links between the barriers are
36 : : * considered and this simply serves as most basic asynchronous barrier.
37 : : * barrier_sync_next() is like barrier_wait_next() and waits for the other side
38 : : * to place their next barrier via barrier_place(). However, it only waits for
39 : : * barriers that are linked to a barrier we already placed. If the other side
40 : : * already placed more barriers than we did, barrier_sync_next() returns
41 : : * immediately.
42 : : * barrier_sync() extends barrier_sync_next() and waits until the other end
43 : : * placed as many barriers via barrier_place() as we did. If they already placed
44 : : * as many as we did (or more), it returns immediately.
45 : : *
46 : : * In addition to basic barriers, an abortion event is available.
47 : : * barrier_abort() places an abortion event that cannot be undone. An abortion
48 : : * immediately cancels all placed barriers and replaces them. Any running and
49 : : * following wait/sync call besides barrier_wait_abortion() will immediately
50 : : * return false on both sides (otherwise, they always return true).
51 : : * barrier_abort() can be called multiple times on both ends and will be a
52 : : * no-op if already called on this side.
53 : : * barrier_wait_abortion() can be used to wait for the other side to call
54 : : * barrier_abort() and is the only wait/sync call that does not return
55 : : * immediately if we aborted ourselves. It only returns once the other side
56 : : * called barrier_abort().
57 : : *
58 : : * Barriers can be used for in-process and inter-process synchronization.
59 : : * However, for in-process synchronization you could just use mutexes.
60 : : * Therefore, main target is IPC and we require both sides to *not* share the FD
61 : : * table. If that's given, barriers provide target tracking: If the remote side
62 : : * exit()s, an abortion event is implicitly queued on the other side. This way,
63 : : * a sync/wait call will be woken up if the remote side crashed or exited
64 : : * unexpectedly. However, note that these abortion events are only queued if the
65 : : * barrier-queue has been drained. Therefore, it is safe to place a barrier and
66 : : * exit. The other side can safely wait on the barrier even though the exit
67 : : * queued an abortion event. Usually, the abortion event would overwrite the
68 : : * barrier, however, that's not true for exit-abortion events. Those are only
69 : : * queued if the barrier-queue is drained (thus, the receiving side has placed
70 : : * more barriers than the remote side).
71 : : */
72 : :
73 : : /**
74 : : * barrier_create() - Initialize a barrier object
75 : : * @obj: barrier to initialize
76 : : *
77 : : * This initializes a barrier object. The caller is responsible of allocating
78 : : * the memory and keeping it valid. The memory does not have to be zeroed
79 : : * beforehand.
80 : : * Two eventfd objects are allocated for each barrier. If allocation fails, an
81 : : * error is returned.
82 : : *
83 : : * If this function fails, the barrier is reset to an invalid state so it is
84 : : * safe to call barrier_destroy() on the object regardless whether the
85 : : * initialization succeeded or not.
86 : : *
87 : : * The caller is responsible to destroy the object via barrier_destroy() before
88 : : * releasing the underlying memory.
89 : : *
90 : : * Returns: 0 on success, negative error code on failure.
91 : : */
92 : 54 : int barrier_create(Barrier *b) {
93 : 54 : _cleanup_(barrier_destroyp) Barrier *staging = b;
94 : : int r;
95 : :
96 [ - + ]: 54 : assert(b);
97 : :
98 : 54 : b->me = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
99 [ - + ]: 54 : if (b->me < 0)
100 : 0 : return -errno;
101 : :
102 : 54 : b->them = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
103 [ - + ]: 54 : if (b->them < 0)
104 : 0 : return -errno;
105 : :
106 : 54 : r = pipe2(b->pipe, O_CLOEXEC | O_NONBLOCK);
107 [ - + ]: 54 : if (r < 0)
108 : 0 : return -errno;
109 : :
110 : 54 : staging = NULL;
111 : 54 : return 0;
112 : : }
113 : :
114 : : /**
115 : : * barrier_destroy() - Destroy a barrier object
116 : : * @b: barrier to destroy or NULL
117 : : *
118 : : * This destroys a barrier object that has previously been passed to
119 : : * barrier_create(). The object is released and reset to invalid
120 : : * state. Therefore, it is safe to call barrier_destroy() multiple
121 : : * times or even if barrier_create() failed. However, barrier must be
122 : : * always initialized with BARRIER_NULL.
123 : : *
124 : : * If @b is NULL, this is a no-op.
125 : : */
126 : 53 : void barrier_destroy(Barrier *b) {
127 [ - + ]: 53 : if (!b)
128 : 0 : return;
129 : :
130 : 53 : b->me = safe_close(b->me);
131 : 53 : b->them = safe_close(b->them);
132 : 53 : safe_close_pair(b->pipe);
133 : 53 : b->barriers = 0;
134 : : }
135 : :
136 : : /**
137 : : * barrier_set_role() - Set the local role of the barrier
138 : : * @b: barrier to operate on
139 : : * @role: role to set on the barrier
140 : : *
141 : : * This sets the roles on a barrier object. This is needed to know
142 : : * which side of the barrier you're on. Usually, the parent creates
143 : : * the barrier via barrier_create() and then calls fork() or clone().
144 : : * Therefore, the FDs are duplicated and the child retains the same
145 : : * barrier object.
146 : : *
147 : : * Both sides need to call barrier_set_role() after fork() or clone()
148 : : * are done. If this is not done, barriers will not work correctly.
149 : : *
150 : : * Note that barriers could be supported without fork() or clone(). However,
151 : : * this is currently not needed so it hasn't been implemented.
152 : : */
153 : 87 : void barrier_set_role(Barrier *b, unsigned role) {
154 [ - + ]: 87 : assert(b);
155 [ + - - + ]: 87 : assert(IN_SET(role, BARRIER_PARENT, BARRIER_CHILD));
156 : : /* make sure this is only called once */
157 [ + - - + ]: 87 : assert(b->pipe[0] >= 0 && b->pipe[1] >= 0);
158 : :
159 [ + + ]: 87 : if (role == BARRIER_PARENT)
160 : 51 : b->pipe[1] = safe_close(b->pipe[1]);
161 : : else {
162 : 36 : b->pipe[0] = safe_close(b->pipe[0]);
163 : :
164 : : /* swap me/them for children */
165 : 36 : SWAP_TWO(b->me, b->them);
166 : : }
167 : 87 : }
168 : :
169 : : /* places barrier; returns false if we aborted, otherwise true */
170 : 93 : static bool barrier_write(Barrier *b, uint64_t buf) {
171 : : ssize_t len;
172 : :
173 : : /* prevent new sync-points if we already aborted */
174 [ - + ]: 93 : if (barrier_i_aborted(b))
175 : 0 : return false;
176 : :
177 [ - + ]: 93 : assert(b->me >= 0);
178 : : do {
179 : 93 : len = write(b->me, &buf, sizeof(buf));
180 [ - + # # : 93 : } while (len < 0 && IN_SET(errno, EAGAIN, EINTR));
# # ]
181 : :
182 [ - + ]: 93 : if (len != sizeof(buf))
183 : 0 : goto error;
184 : :
185 : : /* lock if we aborted */
186 [ + + ]: 93 : if (buf >= (uint64_t)BARRIER_ABORTION) {
187 [ - + ]: 12 : if (barrier_they_aborted(b))
188 : 0 : b->barriers = BARRIER_WE_ABORTED;
189 : : else
190 : 12 : b->barriers = BARRIER_I_ABORTED;
191 [ + - ]: 81 : } else if (!barrier_is_aborted(b))
192 : 81 : b->barriers += buf;
193 : :
194 : 93 : return !barrier_i_aborted(b);
195 : :
196 : 0 : error:
197 : : /* If there is an unexpected error, we have to make this fatal. There
198 : : * is no way we can recover from sync-errors. Therefore, we close the
199 : : * pipe-ends and treat this as abortion. The other end will notice the
200 : : * pipe-close and treat it as abortion, too. */
201 : :
202 : 0 : safe_close_pair(b->pipe);
203 : 0 : b->barriers = BARRIER_WE_ABORTED;
204 : 0 : return false;
205 : : }
206 : :
207 : : /* waits for barriers; returns false if they aborted, otherwise true */
208 : 76 : static bool barrier_read(Barrier *b, int64_t comp) {
209 [ - + ]: 76 : if (barrier_they_aborted(b))
210 : 0 : return false;
211 : :
212 [ + + ]: 123 : while (b->barriers > comp) {
213 : 141 : struct pollfd pfd[2] = {
214 [ + + ]: 47 : { .fd = b->pipe[0] >= 0 ? b->pipe[0] : b->pipe[1],
215 : : .events = POLLHUP },
216 : 47 : { .fd = b->them,
217 : : .events = POLLIN }};
218 : : uint64_t buf;
219 : : int r;
220 : :
221 : 47 : r = poll(pfd, 2, -1);
222 [ - + # # : 47 : if (r < 0 && IN_SET(errno, EAGAIN, EINTR))
# # ]
223 : 0 : continue;
224 [ - + ]: 47 : else if (r < 0)
225 : 0 : goto error;
226 : :
227 [ + + ]: 47 : if (pfd[1].revents) {
228 : : ssize_t len;
229 : :
230 : : /* events on @them signal new data for us */
231 : 41 : len = read(b->them, &buf, sizeof(buf));
232 [ - + # # : 41 : if (len < 0 && IN_SET(errno, EAGAIN, EINTR))
# # ]
233 : 0 : continue;
234 : :
235 [ - + ]: 41 : if (len != sizeof(buf))
236 : 0 : goto error;
237 [ + - ]: 6 : } else if (pfd[0].revents & (POLLHUP | POLLERR | POLLNVAL))
238 : : /* POLLHUP on the pipe tells us the other side exited.
239 : : * We treat this as implicit abortion. But we only
240 : : * handle it if there's no event on the eventfd. This
241 : : * guarantees that exit-abortions do not overwrite real
242 : : * barriers. */
243 : 6 : buf = BARRIER_ABORTION;
244 : : else
245 : 0 : continue;
246 : :
247 : : /* lock if they aborted */
248 [ + + ]: 47 : if (buf >= (uint64_t)BARRIER_ABORTION) {
249 [ + + ]: 15 : if (barrier_i_aborted(b))
250 : 3 : b->barriers = BARRIER_WE_ABORTED;
251 : : else
252 : 12 : b->barriers = BARRIER_THEY_ABORTED;
253 [ + - ]: 32 : } else if (!barrier_is_aborted(b))
254 : 32 : b->barriers -= buf;
255 : : }
256 : :
257 : 76 : return !barrier_they_aborted(b);
258 : :
259 : 0 : error:
260 : : /* If there is an unexpected error, we have to make this fatal. There
261 : : * is no way we can recover from sync-errors. Therefore, we close the
262 : : * pipe-ends and treat this as abortion. The other end will notice the
263 : : * pipe-close and treat it as abortion, too. */
264 : :
265 : 0 : safe_close_pair(b->pipe);
266 : 0 : b->barriers = BARRIER_WE_ABORTED;
267 : 0 : return false;
268 : : }
269 : :
270 : : /**
271 : : * barrier_place() - Place a new barrier
272 : : * @b: barrier object
273 : : *
274 : : * This places a new barrier on the barrier object. If either side already
275 : : * aborted, this is a no-op and returns "false". Otherwise, the barrier is
276 : : * placed and this returns "true".
277 : : *
278 : : * Returns: true if barrier was placed, false if either side aborted.
279 : : */
280 : 81 : bool barrier_place(Barrier *b) {
281 [ - + ]: 81 : assert(b);
282 : :
283 [ - + ]: 81 : if (barrier_is_aborted(b))
284 : 0 : return false;
285 : :
286 : 81 : barrier_write(b, BARRIER_SINGLE);
287 : 81 : return true;
288 : : }
289 : :
290 : : /**
291 : : * barrier_abort() - Abort the synchronization
292 : : * @b: barrier object to abort
293 : : *
294 : : * This aborts the barrier-synchronization. If barrier_abort() was already
295 : : * called on this side, this is a no-op. Otherwise, the barrier is put into the
296 : : * ABORT-state and will stay there. The other side is notified about the
297 : : * abortion. Any following attempt to place normal barriers or to wait on normal
298 : : * barriers will return immediately as "false".
299 : : *
300 : : * You can wait for the other side to call barrier_abort(), too. Use
301 : : * barrier_wait_abortion() for that.
302 : : *
303 : : * Returns: false if the other side already aborted, true otherwise.
304 : : */
305 : 12 : bool barrier_abort(Barrier *b) {
306 [ - + ]: 12 : assert(b);
307 : :
308 : 12 : barrier_write(b, BARRIER_ABORTION);
309 : 12 : return !barrier_they_aborted(b);
310 : : }
311 : :
312 : : /**
313 : : * barrier_wait_next() - Wait for the next barrier of the other side
314 : : * @b: barrier to operate on
315 : : *
316 : : * This waits until the other side places its next barrier. This is independent
317 : : * of any barrier-links and just waits for any next barrier of the other side.
318 : : *
319 : : * If either side aborted, this returns false.
320 : : *
321 : : * Returns: false if either side aborted, true otherwise.
322 : : */
323 : 14 : bool barrier_wait_next(Barrier *b) {
324 [ - + ]: 14 : assert(b);
325 : :
326 [ - + ]: 14 : if (barrier_is_aborted(b))
327 : 0 : return false;
328 : :
329 : 14 : barrier_read(b, b->barriers - 1);
330 : 14 : return !barrier_is_aborted(b);
331 : : }
332 : :
333 : : /**
334 : : * barrier_wait_abortion() - Wait for the other side to abort
335 : : * @b: barrier to operate on
336 : : *
337 : : * This waits until the other side called barrier_abort(). This can be called
338 : : * regardless whether the local side already called barrier_abort() or not.
339 : : *
340 : : * If the other side has already aborted, this returns immediately.
341 : : *
342 : : * Returns: false if the local side aborted, true otherwise.
343 : : */
344 : 6 : bool barrier_wait_abortion(Barrier *b) {
345 [ - + ]: 6 : assert(b);
346 : :
347 : 6 : barrier_read(b, BARRIER_THEY_ABORTED);
348 : 6 : return !barrier_i_aborted(b);
349 : : }
350 : :
351 : : /**
352 : : * barrier_sync_next() - Wait for the other side to place a next linked barrier
353 : : * @b: barrier to operate on
354 : : *
355 : : * This is like barrier_wait_next() and waits for the other side to call
356 : : * barrier_place(). However, this only waits for linked barriers. That means, if
357 : : * the other side already placed more barriers than (or as much as) we did, this
358 : : * returns immediately instead of waiting.
359 : : *
360 : : * If either side aborted, this returns false.
361 : : *
362 : : * Returns: false if either side aborted, true otherwise.
363 : : */
364 : 28 : bool barrier_sync_next(Barrier *b) {
365 [ - + ]: 28 : assert(b);
366 : :
367 [ - + ]: 28 : if (barrier_is_aborted(b))
368 : 0 : return false;
369 : :
370 : 28 : barrier_read(b, MAX((int64_t)0, b->barriers - 1));
371 : 28 : return !barrier_is_aborted(b);
372 : : }
373 : :
374 : : /**
375 : : * barrier_sync() - Wait for the other side to place as many barriers as we did
376 : : * @b: barrier to operate on
377 : : *
378 : : * This is like barrier_sync_next() but waits for the other side to call
379 : : * barrier_place() as often as we did (in total). If they already placed as much
380 : : * as we did (or more), this returns immediately instead of waiting.
381 : : *
382 : : * If either side aborted, this returns false.
383 : : *
384 : : * Returns: false if either side aborted, true otherwise.
385 : : */
386 : 28 : bool barrier_sync(Barrier *b) {
387 [ - + ]: 28 : assert(b);
388 : :
389 [ - + ]: 28 : if (barrier_is_aborted(b))
390 : 0 : return false;
391 : :
392 : 28 : barrier_read(b, 0);
393 : 28 : return !barrier_is_aborted(b);
394 : : }
|