Line data Source code
1 : /* SPDX-License-Identifier: LGPL-2.1+ */
2 :
3 : #include <sys/poll.h>
4 :
5 : #include "alloc-util.h"
6 : #include "errno-util.h"
7 : #include "fd-util.h"
8 : #include "hashmap.h"
9 : #include "list.h"
10 : #include "process-util.h"
11 : #include "set.h"
12 : #include "socket-util.h"
13 : #include "string-table.h"
14 : #include "string-util.h"
15 : #include "strv.h"
16 : #include "time-util.h"
17 : #include "umask-util.h"
18 : #include "user-util.h"
19 : #include "varlink.h"
20 :
21 : #define VARLINK_DEFAULT_CONNECTIONS_MAX 4096U
22 : #define VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX 1024U
23 :
24 : #define VARLINK_DEFAULT_TIMEOUT_USEC (45U*USEC_PER_SEC)
25 : #define VARLINK_BUFFER_MAX (16U*1024U*1024U)
26 : #define VARLINK_READ_SIZE (64U*1024U)
27 :
28 : typedef enum VarlinkState {
29 : /* Client side states */
30 : VARLINK_IDLE_CLIENT,
31 : VARLINK_AWAITING_REPLY,
32 : VARLINK_CALLING,
33 : VARLINK_CALLED,
34 : VARLINK_PROCESSING_REPLY,
35 :
36 : /* Server side states */
37 : VARLINK_IDLE_SERVER,
38 : VARLINK_PROCESSING_METHOD,
39 : VARLINK_PROCESSING_METHOD_MORE,
40 : VARLINK_PROCESSING_METHOD_ONEWAY,
41 : VARLINK_PROCESSED_METHOD,
42 : VARLINK_PROCESSED_METHOD_MORE,
43 : VARLINK_PENDING_METHOD,
44 : VARLINK_PENDING_METHOD_MORE,
45 :
46 : /* Common states (only during shutdown) */
47 : VARLINK_PENDING_DISCONNECT,
48 : VARLINK_PENDING_TIMEOUT,
49 : VARLINK_PROCESSING_DISCONNECT,
50 : VARLINK_PROCESSING_TIMEOUT,
51 : VARLINK_PROCESSING_FAILURE,
52 : VARLINK_DISCONNECTED,
53 :
54 : _VARLINK_STATE_MAX,
55 : _VARLINK_STATE_INVALID = -1
56 : } VarlinkState;
57 :
58 : /* Tests whether we are not yet disconnected. Note that this is true during all states where the connection
59 : * is still good for something, and false only when it's dead for good. This means: when we are
60 : * asynchronously connecting to a peer and the connect() is still pending, then this will return 'true', as
61 : * the connection is still good, and we are likely to be able to properly operate on it soon. */
62 : #define VARLINK_STATE_IS_ALIVE(state) \
63 : IN_SET(state, \
64 : VARLINK_IDLE_CLIENT, \
65 : VARLINK_AWAITING_REPLY, \
66 : VARLINK_CALLING, \
67 : VARLINK_CALLED, \
68 : VARLINK_PROCESSING_REPLY, \
69 : VARLINK_IDLE_SERVER, \
70 : VARLINK_PROCESSING_METHOD, \
71 : VARLINK_PROCESSING_METHOD_MORE, \
72 : VARLINK_PROCESSING_METHOD_ONEWAY, \
73 : VARLINK_PROCESSED_METHOD, \
74 : VARLINK_PROCESSED_METHOD_MORE, \
75 : VARLINK_PENDING_METHOD, \
76 : VARLINK_PENDING_METHOD_MORE)
77 :
78 : struct Varlink {
79 : unsigned n_ref;
80 :
81 : VarlinkServer *server;
82 :
83 : VarlinkState state;
84 : bool connecting; /* This boolean indicates whether the socket fd we are operating on is currently
85 : * processing an asynchronous connect(). In that state we watch the socket for
86 : * EPOLLOUT, but we refrain from calling read() or write() on the socket as that
87 : * will trigger ENOTCONN. Note that this boolean is kept separate from the
88 : * VarlinkState above on purpose: while the connect() is still not complete we
89 : * already want to allow queuing of messages and similar. Thus it's nice to keep
90 : * these two state concepts separate: the VarlinkState encodes what our own view of
91 : * the connection is, i.e. whether we think it's a server, a client, and has
92 : * something queued already, while 'connecting' tells us a detail about the
93 : * transport used below, that should have no effect on how we otherwise accept and
94 : * process operations from the user.
95 : *
96 : * Or to say this differently: VARLINK_STATE_IS_ALIVE(state) tells you whether the
97 : * connection is good to use, even if it might not be fully connected
98 : * yet. connecting=true then informs you that actually we are still connecting, and
99 : * the connection is actually not established yet and thus any requests you enqueue
100 : * now will still work fine but will be queued only, not sent yet, but that
101 : * shouldn't stop you from using the connection, since eventually whatever you queue
102 : * *will* be sent.
103 : *
104 : * Or to say this even differently: 'state' is a high-level ("application layer"
105 : * high, if you so will) state, while 'conecting' is a low-level ("transport layer"
106 : * low, if you so will) state, and while they are not entirely unrelated and
107 : * sometimes propagate effects to each other they are only asynchronously connected
108 : * at most. */
109 : unsigned n_pending;
110 :
111 : int fd;
112 :
113 : char *input_buffer; /* valid data starts at input_buffer_index, ends at input_buffer_index+input_buffer_size */
114 : size_t input_buffer_allocated;
115 : size_t input_buffer_index;
116 : size_t input_buffer_size;
117 : size_t input_buffer_unscanned;
118 :
119 : char *output_buffer; /* valid data starts at output_buffer_index, ends at output_buffer_index+output_buffer_size */
120 : size_t output_buffer_allocated;
121 : size_t output_buffer_index;
122 : size_t output_buffer_size;
123 :
124 : VarlinkReply reply_callback;
125 :
126 : JsonVariant *current;
127 : JsonVariant *reply;
128 :
129 : struct ucred ucred;
130 : bool ucred_acquired:1;
131 :
132 : bool write_disconnected:1;
133 : bool read_disconnected:1;
134 : bool prefer_read_write:1;
135 : bool got_pollhup:1;
136 :
137 : usec_t timestamp;
138 : usec_t timeout;
139 :
140 : void *userdata;
141 : char *description;
142 :
143 : sd_event *event;
144 : sd_event_source *io_event_source;
145 : sd_event_source *time_event_source;
146 : sd_event_source *quit_event_source;
147 : sd_event_source *defer_event_source;
148 : };
149 :
150 : typedef struct VarlinkServerSocket VarlinkServerSocket;
151 :
152 : struct VarlinkServerSocket {
153 : VarlinkServer *server;
154 :
155 : int fd;
156 : char *address;
157 :
158 : sd_event_source *event_source;
159 :
160 : LIST_FIELDS(VarlinkServerSocket, sockets);
161 : };
162 :
163 : struct VarlinkServer {
164 : unsigned n_ref;
165 : VarlinkServerFlags flags;
166 :
167 : LIST_HEAD(VarlinkServerSocket, sockets);
168 :
169 : Hashmap *methods;
170 : VarlinkConnect connect_callback;
171 :
172 : sd_event *event;
173 : int64_t event_priority;
174 :
175 : unsigned n_connections;
176 : Hashmap *by_uid;
177 :
178 : void *userdata;
179 : char *description;
180 :
181 : unsigned connections_max;
182 : unsigned connections_per_uid_max;
183 : };
184 :
185 : static const char* const varlink_state_table[_VARLINK_STATE_MAX] = {
186 : [VARLINK_IDLE_CLIENT] = "idle-client",
187 : [VARLINK_AWAITING_REPLY] = "awaiting-reply",
188 : [VARLINK_CALLING] = "calling",
189 : [VARLINK_CALLED] = "called",
190 : [VARLINK_PROCESSING_REPLY] = "processing-reply",
191 : [VARLINK_IDLE_SERVER] = "idle-server",
192 : [VARLINK_PROCESSING_METHOD] = "processing-method",
193 : [VARLINK_PROCESSING_METHOD_MORE] = "processing-method-more",
194 : [VARLINK_PROCESSING_METHOD_ONEWAY] = "processing-method-oneway",
195 : [VARLINK_PROCESSED_METHOD] = "processed-method",
196 : [VARLINK_PROCESSED_METHOD_MORE] = "processed-method-more",
197 : [VARLINK_PENDING_METHOD] = "pending-method",
198 : [VARLINK_PENDING_METHOD_MORE] = "pending-method-more",
199 : [VARLINK_PENDING_DISCONNECT] = "pending-disconnect",
200 : [VARLINK_PENDING_TIMEOUT] = "pending-timeout",
201 : [VARLINK_PROCESSING_DISCONNECT] = "processing-disconnect",
202 : [VARLINK_PROCESSING_TIMEOUT] = "processing-timeout",
203 : [VARLINK_PROCESSING_FAILURE] = "processing-failure",
204 : [VARLINK_DISCONNECTED] = "disconnected",
205 : };
206 :
207 3295 : DEFINE_PRIVATE_STRING_TABLE_LOOKUP_TO_STRING(varlink_state, VarlinkState);
208 :
209 : #define varlink_log_errno(v, error, fmt, ...) \
210 : log_debug_errno(error, "%s: " fmt, varlink_description(v), ##__VA_ARGS__)
211 :
212 : #define varlink_log(v, fmt, ...) \
213 : log_debug("%s: " fmt, varlink_description(v), ##__VA_ARGS__)
214 :
215 : #define varlink_server_log_errno(s, error, fmt, ...) \
216 : log_debug_errno(error, "%s: " fmt, varlink_server_description(s), ##__VA_ARGS__)
217 :
218 : #define varlink_server_log(s, fmt, ...) \
219 : log_debug("%s: " fmt, varlink_server_description(s), ##__VA_ARGS__)
220 :
221 2693 : static inline const char *varlink_description(Varlink *v) {
222 2693 : return strna(v ? v->description : NULL);
223 : }
224 :
225 262 : static inline const char *varlink_server_description(VarlinkServer *s) {
226 262 : return strna(s ? s->description : NULL);
227 : }
228 :
229 1881 : static void varlink_set_state(Varlink *v, VarlinkState state) {
230 1881 : assert(v);
231 1881 : assert(state >= 0 && state < _VARLINK_STATE_MAX);
232 :
233 1881 : if (v->state < 0)
234 467 : varlink_log(v, "varlink: setting state %s",
235 : varlink_state_to_string(state));
236 : else
237 1414 : varlink_log(v, "varlink: changing state %s → %s",
238 : varlink_state_to_string(v->state),
239 : varlink_state_to_string(state));
240 :
241 1881 : v->state = state;
242 1881 : }
243 :
244 467 : static int varlink_new(Varlink **ret) {
245 : Varlink *v;
246 :
247 467 : assert(ret);
248 :
249 467 : v = new(Varlink, 1);
250 467 : if (!v)
251 0 : return -ENOMEM;
252 :
253 467 : *v = (Varlink) {
254 : .n_ref = 1,
255 : .fd = -1,
256 :
257 : .state = _VARLINK_STATE_INVALID,
258 :
259 : .ucred.uid = UID_INVALID,
260 : .ucred.gid = GID_INVALID,
261 :
262 : .timestamp = USEC_INFINITY,
263 : .timeout = VARLINK_DEFAULT_TIMEOUT_USEC
264 : };
265 :
266 467 : *ret = v;
267 467 : return 0;
268 : }
269 :
270 336 : int varlink_connect_address(Varlink **ret, const char *address) {
271 336 : _cleanup_(varlink_unrefp) Varlink *v = NULL;
272 : union sockaddr_union sockaddr;
273 : int r;
274 :
275 336 : assert_return(ret, -EINVAL);
276 336 : assert_return(address, -EINVAL);
277 :
278 336 : r = sockaddr_un_set_path(&sockaddr.un, address);
279 336 : if (r < 0)
280 0 : return r;
281 :
282 336 : r = varlink_new(&v);
283 336 : if (r < 0)
284 0 : return r;
285 :
286 336 : v->fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
287 336 : if (v->fd < 0)
288 0 : return -errno;
289 :
290 336 : if (connect(v->fd, &sockaddr.sa, SOCKADDR_UN_LEN(sockaddr.un)) < 0) {
291 205 : if (!IN_SET(errno, EAGAIN, EINPROGRESS))
292 0 : return -errno;
293 :
294 205 : v->connecting = true; /* We are asynchronously connecting, i.e. the connect() is being
295 : * processed in the background. As long as that's the case the socket
296 : * is in a special state: it's there, we can poll it for EPOLLOUT, but
297 : * if we attempt to write() to it before we see EPOLLOUT we'll get
298 : * ENOTCONN (and not EAGAIN, like we would for a normal connected
299 : * socket that isn't writable at the moment). Since ENOTCONN on write()
300 : * hence can mean two different things (i.e. connection not complete
301 : * yet vs. already disconnected again), we store as a boolean whether
302 : * we are still in connect(). */
303 : }
304 :
305 336 : varlink_set_state(v, VARLINK_IDLE_CLIENT);
306 :
307 336 : *ret = TAKE_PTR(v);
308 336 : return r;
309 : }
310 :
311 0 : int varlink_connect_fd(Varlink **ret, int fd) {
312 : Varlink *v;
313 : int r;
314 :
315 0 : assert_return(ret, -EINVAL);
316 0 : assert_return(fd >= 0, -EBADF);
317 :
318 0 : r = fd_nonblock(fd, true);
319 0 : if (r < 0)
320 0 : return r;
321 :
322 0 : r = varlink_new(&v);
323 0 : if (r < 0)
324 0 : return r;
325 :
326 0 : v->fd = fd;
327 0 : varlink_set_state(v, VARLINK_IDLE_CLIENT);
328 :
329 : /* Note that if this function is called we assume the passed socket (if it is one) is already
330 : * properly connected, i.e. any asynchronous connect() done on it already completed. Because of that
331 : * we'll not set the 'connecting' boolean here, i.e. we don't need to avoid write()ing to the socket
332 : * until the connection is fully set up. Behaviour here is hence a bit different from
333 : * varlink_connect_address() above, as there we do handle asynchronous connections ourselves and
334 : * avoid doing write() on it before we saw EPOLLOUT for the first time. */
335 :
336 0 : *ret = v;
337 0 : return 0;
338 : }
339 :
340 934 : static void varlink_detach_event_sources(Varlink *v) {
341 934 : assert(v);
342 :
343 934 : v->io_event_source = sd_event_source_disable_unref(v->io_event_source);
344 :
345 934 : v->time_event_source = sd_event_source_disable_unref(v->time_event_source);
346 :
347 934 : v->quit_event_source = sd_event_source_disable_unref(v->quit_event_source);
348 :
349 934 : v->defer_event_source = sd_event_source_disable_unref(v->defer_event_source);
350 934 : }
351 :
352 934 : static void varlink_clear(Varlink *v) {
353 934 : assert(v);
354 :
355 934 : varlink_detach_event_sources(v);
356 :
357 934 : v->fd = safe_close(v->fd);
358 :
359 934 : v->input_buffer = mfree(v->input_buffer);
360 934 : v->output_buffer = mfree(v->output_buffer);
361 :
362 934 : v->current = json_variant_unref(v->current);
363 934 : v->reply = json_variant_unref(v->reply);
364 :
365 934 : v->event = sd_event_unref(v->event);
366 934 : }
367 :
368 467 : static Varlink* varlink_destroy(Varlink *v) {
369 467 : if (!v)
370 0 : return NULL;
371 :
372 : /* If this is called the server object must already been unreffed here. Why that? because when we
373 : * linked up the varlink connection with the server object we took one ref in each direction */
374 467 : assert(!v->server);
375 :
376 467 : varlink_clear(v);
377 :
378 467 : free(v->description);
379 467 : return mfree(v);
380 : }
381 :
382 5691 : DEFINE_TRIVIAL_REF_UNREF_FUNC(Varlink, varlink, varlink_destroy);
383 :
384 1265 : static int varlink_test_disconnect(Varlink *v) {
385 1265 : assert(v);
386 :
387 : /* Tests whether we the the connection has been terminated. We are careful to not stop processing it
388 : * prematurely, since we want to handle half-open connections as well as possible and want to flush
389 : * out and read data before we close down if we can. */
390 :
391 : /* Already disconnected? */
392 1265 : if (!VARLINK_STATE_IS_ALIVE(v->state))
393 334 : return 0;
394 :
395 : /* Wait until connection setup is complete, i.e. until asynchronous connect() completes */
396 931 : if (v->connecting)
397 205 : return 0;
398 :
399 : /* Still something to write and we can write? Stay around */
400 726 : if (v->output_buffer_size > 0 && !v->write_disconnected)
401 0 : return 0;
402 :
403 : /* Both sides gone already? Then there's no need to stick around */
404 726 : if (v->read_disconnected && v->write_disconnected)
405 0 : goto disconnect;
406 :
407 : /* If we are waiting for incoming data but the read side is shut down, disconnect. */
408 726 : if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING, VARLINK_IDLE_SERVER) && v->read_disconnected)
409 129 : goto disconnect;
410 :
411 : /* Similar, if are a client that hasn't written anything yet but the write side is dead, also
412 : * disconnect. We also explicitly check for POLLHUP here since we likely won't notice the write side
413 : * being down if we never wrote anything. */
414 597 : if (IN_SET(v->state, VARLINK_IDLE_CLIENT) && (v->write_disconnected || v->got_pollhup))
415 204 : goto disconnect;
416 :
417 393 : return 0;
418 :
419 333 : disconnect:
420 333 : varlink_set_state(v, VARLINK_PENDING_DISCONNECT);
421 333 : return 1;
422 : }
423 :
424 2015 : static int varlink_write(Varlink *v) {
425 : ssize_t n;
426 :
427 2015 : assert(v);
428 :
429 2015 : if (!VARLINK_STATE_IS_ALIVE(v->state))
430 334 : return 0;
431 1681 : if (v->connecting) /* Writing while we are still wait for a non-blocking connect() to complete will
432 : * result in ENOTCONN, hence exit early here */
433 205 : return 0;
434 1476 : if (v->output_buffer_size == 0)
435 930 : return 0;
436 546 : if (v->write_disconnected)
437 205 : return 0;
438 :
439 341 : assert(v->fd >= 0);
440 :
441 : /* We generally prefer recv()/send() (mostly because of MSG_NOSIGNAL) but also want to be compatible
442 : * with non-socket IO, hence fall back automatically */
443 341 : if (!v->prefer_read_write) {
444 341 : n = send(v->fd, v->output_buffer + v->output_buffer_index, v->output_buffer_size, MSG_DONTWAIT|MSG_NOSIGNAL);
445 341 : if (n < 0 && errno == ENOTSOCK)
446 0 : v->prefer_read_write = true;
447 : }
448 341 : if (v->prefer_read_write)
449 0 : n = write(v->fd, v->output_buffer + v->output_buffer_index, v->output_buffer_size);
450 341 : if (n < 0) {
451 205 : if (errno == EAGAIN)
452 0 : return 0;
453 :
454 205 : if (ERRNO_IS_DISCONNECT(errno)) {
455 : /* If we get informed about a disconnect on write, then let's remember that, but not
456 : * act on it just yet. Let's wait for read() to report the issue first. */
457 205 : v->write_disconnected = true;
458 205 : return 1;
459 : }
460 :
461 0 : return -errno;
462 : }
463 :
464 136 : v->output_buffer_size -= n;
465 :
466 136 : if (v->output_buffer_size == 0)
467 136 : v->output_buffer_index = 0;
468 : else
469 0 : v->output_buffer_index += n;
470 :
471 136 : v->timestamp = now(CLOCK_MONOTONIC);
472 136 : return 1;
473 : }
474 :
475 1532 : static int varlink_read(Varlink *v) {
476 : size_t rs;
477 : ssize_t n;
478 :
479 1532 : assert(v);
480 :
481 1532 : if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING, VARLINK_IDLE_SERVER))
482 872 : return 0;
483 660 : if (v->connecting) /* read() on a socket while we are in connect() will fail with EINVAL, hence exit early here */
484 1 : return 0;
485 659 : if (v->current)
486 0 : return 0;
487 659 : if (v->input_buffer_unscanned > 0)
488 0 : return 0;
489 659 : if (v->read_disconnected)
490 129 : return 0;
491 :
492 530 : if (v->input_buffer_size >= VARLINK_BUFFER_MAX)
493 0 : return -ENOBUFS;
494 :
495 530 : assert(v->fd >= 0);
496 :
497 530 : if (v->input_buffer_allocated <= v->input_buffer_index + v->input_buffer_size) {
498 : size_t add;
499 :
500 134 : add = MIN(VARLINK_BUFFER_MAX - v->input_buffer_size, VARLINK_READ_SIZE);
501 :
502 134 : if (v->input_buffer_index == 0) {
503 :
504 134 : if (!GREEDY_REALLOC(v->input_buffer, v->input_buffer_allocated, v->input_buffer_size + add))
505 0 : return -ENOMEM;
506 :
507 : } else {
508 : char *b;
509 :
510 0 : b = new(char, v->input_buffer_size + add);
511 0 : if (!b)
512 0 : return -ENOMEM;
513 :
514 0 : memcpy(b, v->input_buffer + v->input_buffer_index, v->input_buffer_size);
515 :
516 0 : free_and_replace(v->input_buffer, b);
517 :
518 0 : v->input_buffer_allocated = v->input_buffer_size + add;
519 0 : v->input_buffer_index = 0;
520 : }
521 : }
522 :
523 530 : rs = v->input_buffer_allocated - (v->input_buffer_index + v->input_buffer_size);
524 :
525 530 : if (!v->prefer_read_write) {
526 530 : n = recv(v->fd, v->input_buffer + v->input_buffer_index + v->input_buffer_size, rs, MSG_DONTWAIT);
527 530 : if (n < 0 && errno == ENOTSOCK)
528 0 : v->prefer_read_write = true;
529 : }
530 530 : if (v->prefer_read_write)
531 0 : n = read(v->fd, v->input_buffer + v->input_buffer_index + v->input_buffer_size, rs);
532 530 : if (n < 0) {
533 264 : if (errno == EAGAIN)
534 263 : return 0;
535 :
536 1 : if (ERRNO_IS_DISCONNECT(errno)) {
537 0 : v->read_disconnected = true;
538 0 : return 1;
539 : }
540 :
541 1 : return -errno;
542 : }
543 266 : if (n == 0) { /* EOF */
544 130 : v->read_disconnected = true;
545 130 : return 1;
546 : }
547 :
548 136 : v->input_buffer_size += n;
549 136 : v->input_buffer_unscanned += n;
550 :
551 136 : return 1;
552 : }
553 :
554 1668 : static int varlink_parse_message(Varlink *v) {
555 : const char *e, *begin;
556 : size_t sz;
557 : int r;
558 :
559 1668 : assert(v);
560 :
561 1668 : if (v->current)
562 0 : return 0;
563 1668 : if (v->input_buffer_unscanned <= 0)
564 1532 : return 0;
565 :
566 136 : assert(v->input_buffer_unscanned <= v->input_buffer_size);
567 136 : assert(v->input_buffer_index + v->input_buffer_size <= v->input_buffer_allocated);
568 :
569 136 : begin = v->input_buffer + v->input_buffer_index;
570 :
571 136 : e = memchr(begin + v->input_buffer_size - v->input_buffer_unscanned, 0, v->input_buffer_unscanned);
572 136 : if (!e) {
573 0 : v->input_buffer_unscanned = 0;
574 0 : return 0;
575 : }
576 :
577 136 : sz = e - begin + 1;
578 :
579 136 : varlink_log(v, "New incoming message: %s", begin);
580 :
581 136 : r = json_parse(begin, &v->current, NULL, NULL);
582 136 : if (r < 0)
583 0 : return r;
584 :
585 136 : v->input_buffer_size -= sz;
586 :
587 136 : if (v->input_buffer_size == 0)
588 136 : v->input_buffer_index = 0;
589 : else
590 0 : v->input_buffer_index += sz;
591 :
592 136 : v->input_buffer_unscanned = v->input_buffer_size;
593 136 : return 1;
594 : }
595 :
596 598 : static int varlink_test_timeout(Varlink *v) {
597 598 : assert(v);
598 :
599 598 : if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING))
600 594 : return 0;
601 4 : if (v->timeout == USEC_INFINITY)
602 0 : return 0;
603 :
604 4 : if (now(CLOCK_MONOTONIC) < usec_add(v->timestamp, v->timeout))
605 4 : return 0;
606 :
607 0 : varlink_set_state(v, VARLINK_PENDING_TIMEOUT);
608 :
609 0 : return 1;
610 : }
611 :
612 334 : static int varlink_dispatch_local_error(Varlink *v, const char *error) {
613 : int r;
614 :
615 334 : assert(v);
616 334 : assert(error);
617 :
618 334 : if (!v->reply_callback)
619 333 : return 0;
620 :
621 1 : r = v->reply_callback(v, NULL, error, VARLINK_REPLY_ERROR|VARLINK_REPLY_LOCAL, v->userdata);
622 1 : if (r < 0)
623 0 : log_debug_errno(r, "Reply callback returned error, ignoring: %m");
624 :
625 1 : return 1;
626 : }
627 :
628 598 : static int varlink_dispatch_timeout(Varlink *v) {
629 598 : assert(v);
630 :
631 598 : if (v->state != VARLINK_PENDING_TIMEOUT)
632 598 : return 0;
633 :
634 0 : varlink_set_state(v, VARLINK_PROCESSING_TIMEOUT);
635 0 : varlink_dispatch_local_error(v, VARLINK_ERROR_TIMEOUT);
636 0 : varlink_close(v);
637 :
638 0 : return 1;
639 : }
640 :
641 932 : static int varlink_dispatch_disconnect(Varlink *v) {
642 932 : assert(v);
643 :
644 932 : if (v->state != VARLINK_PENDING_DISCONNECT)
645 598 : return 0;
646 :
647 334 : varlink_set_state(v, VARLINK_PROCESSING_DISCONNECT);
648 334 : varlink_dispatch_local_error(v, VARLINK_ERROR_DISCONNECTED);
649 334 : varlink_close(v);
650 :
651 334 : return 1;
652 : }
653 :
654 477 : static int varlink_sanitize_parameters(JsonVariant **v) {
655 477 : assert(v);
656 :
657 : /* Varlink always wants a parameters list, hence make one if the caller doesn't want any */
658 477 : if (!*v)
659 1 : return json_variant_new_object(v, NULL, 0);
660 476 : else if (!json_variant_is_object(*v))
661 0 : return -EINVAL;
662 :
663 476 : return 0;
664 : }
665 :
666 1674 : static int varlink_dispatch_reply(Varlink *v) {
667 1674 : _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
668 1674 : VarlinkReplyFlags flags = 0;
669 1674 : const char *error = NULL;
670 : JsonVariant *e;
671 : const char *k;
672 : int r;
673 :
674 1674 : assert(v);
675 :
676 1674 : if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING))
677 1660 : return 0;
678 14 : if (!v->current)
679 11 : return 0;
680 :
681 3 : assert(v->n_pending > 0);
682 :
683 3 : if (!json_variant_is_object(v->current))
684 0 : goto invalid;
685 :
686 7 : JSON_VARIANT_OBJECT_FOREACH(k, e, v->current) {
687 :
688 4 : if (streq(k, "error")) {
689 1 : if (error)
690 0 : goto invalid;
691 1 : if (!json_variant_is_string(e))
692 0 : goto invalid;
693 :
694 1 : error = json_variant_string(e);
695 1 : flags |= VARLINK_REPLY_ERROR;
696 :
697 3 : } else if (streq(k, "parameters")) {
698 3 : if (parameters)
699 0 : goto invalid;
700 3 : if (!json_variant_is_object(e))
701 0 : goto invalid;
702 :
703 3 : parameters = json_variant_ref(e);
704 :
705 0 : } else if (streq(k, "continues")) {
706 0 : if (FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
707 0 : goto invalid;
708 :
709 0 : if (!json_variant_is_boolean(e))
710 0 : goto invalid;
711 :
712 0 : if (json_variant_boolean(e))
713 0 : flags |= VARLINK_REPLY_CONTINUES;
714 : } else
715 0 : goto invalid;
716 : }
717 :
718 3 : if (error && FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
719 0 : goto invalid;
720 :
721 3 : r = varlink_sanitize_parameters(¶meters);
722 3 : if (r < 0)
723 0 : goto invalid;
724 :
725 3 : if (v->state == VARLINK_AWAITING_REPLY) {
726 1 : varlink_set_state(v, VARLINK_PROCESSING_REPLY);
727 :
728 1 : if (v->reply_callback) {
729 1 : r = v->reply_callback(v, parameters, error, flags, v->userdata);
730 1 : if (r < 0)
731 0 : log_debug_errno(r, "Reply callback returned error, ignoring: %m");
732 : }
733 :
734 1 : v->current = json_variant_unref(v->current);
735 :
736 1 : if (v->state == VARLINK_PROCESSING_REPLY) {
737 1 : assert(v->n_pending > 0);
738 1 : v->n_pending--;
739 :
740 1 : varlink_set_state(v, v->n_pending == 0 ? VARLINK_IDLE_CLIENT : VARLINK_AWAITING_REPLY);
741 : }
742 : } else {
743 2 : assert(v->state == VARLINK_CALLING);
744 :
745 2 : if (FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
746 0 : goto invalid;
747 :
748 2 : varlink_set_state(v, VARLINK_CALLED);
749 : }
750 :
751 3 : return 1;
752 :
753 0 : invalid:
754 0 : varlink_set_state(v, VARLINK_PROCESSING_FAILURE);
755 0 : varlink_dispatch_local_error(v, VARLINK_ERROR_PROTOCOL);
756 0 : varlink_close(v);
757 :
758 0 : return 1;
759 : }
760 :
761 1671 : static int varlink_dispatch_method(Varlink *v) {
762 1671 : _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
763 1671 : VarlinkMethodFlags flags = 0;
764 1671 : const char *method = NULL, *error;
765 : JsonVariant *e;
766 : VarlinkMethod callback;
767 : const char *k;
768 : int r;
769 :
770 1671 : assert(v);
771 :
772 1671 : if (v->state != VARLINK_IDLE_SERVER)
773 883 : return 0;
774 788 : if (!v->current)
775 655 : return 0;
776 :
777 133 : if (!json_variant_is_object(v->current))
778 0 : goto invalid;
779 :
780 529 : JSON_VARIANT_OBJECT_FOREACH(k, e, v->current) {
781 :
782 396 : if (streq(k, "method")) {
783 133 : if (method)
784 0 : goto invalid;
785 133 : if (!json_variant_is_string(e))
786 0 : goto invalid;
787 :
788 133 : method = json_variant_string(e);
789 :
790 263 : } else if (streq(k, "parameters")) {
791 133 : if (parameters)
792 0 : goto invalid;
793 133 : if (!json_variant_is_object(e))
794 0 : goto invalid;
795 :
796 133 : parameters = json_variant_ref(e);
797 :
798 130 : } else if (streq(k, "oneway")) {
799 :
800 130 : if ((flags & (VARLINK_METHOD_ONEWAY|VARLINK_METHOD_MORE)) != 0)
801 0 : goto invalid;
802 :
803 130 : if (!json_variant_is_boolean(e))
804 0 : goto invalid;
805 :
806 130 : if (json_variant_boolean(e))
807 130 : flags |= VARLINK_METHOD_ONEWAY;
808 :
809 0 : } else if (streq(k, "more")) {
810 :
811 0 : if ((flags & (VARLINK_METHOD_ONEWAY|VARLINK_METHOD_MORE)) != 0)
812 0 : goto invalid;
813 :
814 0 : if (!json_variant_is_boolean(e))
815 0 : goto invalid;
816 :
817 0 : if (json_variant_boolean(e))
818 0 : flags |= VARLINK_METHOD_MORE;
819 :
820 : } else
821 0 : goto invalid;
822 : }
823 :
824 133 : if (!method)
825 0 : goto invalid;
826 :
827 133 : r = varlink_sanitize_parameters(¶meters);
828 133 : if (r < 0)
829 0 : goto fail;
830 :
831 266 : varlink_set_state(v, (flags & VARLINK_METHOD_MORE) ? VARLINK_PROCESSING_METHOD_MORE :
832 133 : (flags & VARLINK_METHOD_ONEWAY) ? VARLINK_PROCESSING_METHOD_ONEWAY :
833 : VARLINK_PROCESSING_METHOD);
834 :
835 133 : assert(v->server);
836 :
837 133 : if (STR_IN_SET(method, "org.varlink.service.GetInfo", "org.varlink.service.GetInterface")) {
838 : /* For now, we don't implement a single of varlink's own methods */
839 0 : callback = NULL;
840 0 : error = VARLINK_ERROR_METHOD_NOT_IMPLEMENTED;
841 133 : } else if (startswith(method, "org.varlink.service.")) {
842 0 : callback = NULL;
843 0 : error = VARLINK_ERROR_METHOD_NOT_FOUND;
844 : } else {
845 133 : callback = hashmap_get(v->server->methods, method);
846 133 : error = VARLINK_ERROR_METHOD_NOT_FOUND;
847 : }
848 :
849 133 : if (callback) {
850 3 : r = callback(v, parameters, flags, v->userdata);
851 3 : if (r < 0) {
852 0 : log_debug_errno(r, "Callback for %s returned error: %m", method);
853 :
854 : /* We got an error back from the callback. Propagate it to the client if the method call remains unanswered. */
855 0 : if (!FLAGS_SET(flags, VARLINK_METHOD_ONEWAY)) {
856 0 : r = varlink_errorb(v, VARLINK_ERROR_SYSTEM, JSON_BUILD_OBJECT(JSON_BUILD_PAIR("errno", JSON_BUILD_INTEGER(-r))));
857 0 : if (r < 0)
858 0 : return r;
859 : }
860 : }
861 130 : } else if (!FLAGS_SET(flags, VARLINK_METHOD_ONEWAY)) {
862 1 : assert(error);
863 :
864 1 : r = varlink_errorb(v, error, JSON_BUILD_OBJECT(JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method))));
865 1 : if (r < 0)
866 0 : return r;
867 : }
868 :
869 133 : switch (v->state) {
870 :
871 133 : case VARLINK_PROCESSED_METHOD: /* Method call is fully processed */
872 : case VARLINK_PROCESSING_METHOD_ONEWAY: /* ditto */
873 133 : v->current = json_variant_unref(v->current);
874 133 : varlink_set_state(v, VARLINK_IDLE_SERVER);
875 133 : break;
876 :
877 0 : case VARLINK_PROCESSING_METHOD: /* Method call wasn't replied to, will be replied to later */
878 0 : varlink_set_state(v, VARLINK_PENDING_METHOD);
879 0 : break;
880 :
881 0 : case VARLINK_PROCESSED_METHOD_MORE: /* One reply for a "more" message was sent, more to come */
882 : case VARLINK_PROCESSING_METHOD_MORE: /* No reply for a "more" message was sent, more to come */
883 0 : varlink_set_state(v, VARLINK_PENDING_METHOD_MORE);
884 0 : break;
885 :
886 0 : default:
887 0 : assert_not_reached("Unexpected state");
888 :
889 : }
890 :
891 133 : return r;
892 :
893 0 : invalid:
894 0 : r = -EINVAL;
895 :
896 0 : fail:
897 0 : varlink_set_state(v, VARLINK_PROCESSING_FAILURE);
898 0 : varlink_dispatch_local_error(v, VARLINK_ERROR_PROTOCOL);
899 0 : varlink_close(v);
900 :
901 0 : return r;
902 : }
903 :
904 2014 : int varlink_process(Varlink *v) {
905 : int r;
906 :
907 2014 : assert_return(v, -EINVAL);
908 :
909 2014 : if (v->state == VARLINK_DISCONNECTED)
910 0 : return -ENOTCONN;
911 :
912 2014 : varlink_ref(v);
913 :
914 2014 : r = varlink_write(v);
915 2014 : if (r != 0)
916 340 : goto finish;
917 :
918 1674 : r = varlink_dispatch_reply(v);
919 1674 : if (r != 0)
920 3 : goto finish;
921 :
922 1671 : r = varlink_dispatch_method(v);
923 1671 : if (r != 0)
924 3 : goto finish;
925 :
926 1668 : r = varlink_parse_message(v);
927 1668 : if (r != 0)
928 136 : goto finish;
929 :
930 1532 : r = varlink_read(v);
931 1532 : if (r != 0)
932 267 : goto finish;
933 :
934 1265 : r = varlink_test_disconnect(v);
935 1265 : if (r != 0)
936 333 : goto finish;
937 :
938 932 : r = varlink_dispatch_disconnect(v);
939 932 : if (r != 0)
940 334 : goto finish;
941 :
942 598 : r = varlink_test_timeout(v);
943 598 : if (r != 0)
944 0 : goto finish;
945 :
946 598 : r = varlink_dispatch_timeout(v);
947 598 : if (r != 0)
948 0 : goto finish;
949 :
950 598 : finish:
951 2014 : if (r >= 0 && v->defer_event_source) {
952 : int q;
953 :
954 : /* If we did some processing, make sure we are called again soon */
955 1669 : q = sd_event_source_set_enabled(v->defer_event_source, r > 0 ? SD_EVENT_ON : SD_EVENT_OFF);
956 1669 : if (q < 0)
957 0 : r = q;
958 : }
959 :
960 2014 : if (r < 0) {
961 1 : if (VARLINK_STATE_IS_ALIVE(v->state))
962 : /* Initiate disconnection */
963 1 : varlink_set_state(v, VARLINK_PENDING_DISCONNECT);
964 : else
965 : /* We failed while disconnecting, in that case close right away */
966 0 : varlink_close(v);
967 : }
968 :
969 2014 : varlink_unref(v);
970 2014 : return r;
971 : }
972 :
973 467 : static void handle_revents(Varlink *v, int revents) {
974 467 : assert(v);
975 :
976 467 : if (v->connecting) {
977 : /* If we have seen POLLOUT or POLLHUP on a socket we are asynchronously waiting a connect()
978 : * to complete on, we know we are ready. We don't read the connection error here though,
979 : * we'll get the error on the next read() or write(). */
980 205 : if ((revents & (POLLOUT|POLLHUP)) == 0)
981 0 : return;
982 :
983 205 : varlink_log(v, "Anynchronous connection completed.");
984 205 : v->connecting = false;
985 : } else {
986 : /* Note that we don't care much about POLLIN/POLLOUT here, we'll just try reading and writing
987 : * what we can. However, we do care about POLLHUP to detect connection termination even if we
988 : * momentarily don't want to read nor write anything. */
989 :
990 262 : if (!FLAGS_SET(revents, POLLHUP))
991 132 : return;
992 :
993 130 : varlink_log(v, "Got POLLHUP from socket.");
994 130 : v->got_pollhup = true;
995 : }
996 : }
997 :
998 2 : int varlink_wait(Varlink *v, usec_t timeout) {
999 : struct timespec ts;
1000 : struct pollfd pfd;
1001 : int r, fd, events;
1002 : usec_t t;
1003 :
1004 2 : assert_return(v, -EINVAL);
1005 :
1006 2 : if (v->state == VARLINK_DISCONNECTED)
1007 0 : return -ENOTCONN;
1008 :
1009 2 : r = varlink_get_timeout(v, &t);
1010 2 : if (r < 0)
1011 0 : return r;
1012 2 : if (t != USEC_INFINITY) {
1013 : usec_t n;
1014 :
1015 2 : n = now(CLOCK_MONOTONIC);
1016 2 : if (t < n)
1017 0 : t = 0;
1018 : else
1019 2 : t = usec_sub_unsigned(t, n);
1020 : }
1021 :
1022 2 : if (timeout != USEC_INFINITY &&
1023 0 : (t == USEC_INFINITY || timeout < t))
1024 0 : t = timeout;
1025 :
1026 2 : fd = varlink_get_fd(v);
1027 2 : if (fd < 0)
1028 0 : return fd;
1029 :
1030 2 : events = varlink_get_events(v);
1031 2 : if (events < 0)
1032 0 : return events;
1033 :
1034 2 : pfd = (struct pollfd) {
1035 : .fd = fd,
1036 : .events = events,
1037 : };
1038 :
1039 2 : r = ppoll(&pfd, 1,
1040 2 : t == USEC_INFINITY ? NULL : timespec_store(&ts, t),
1041 : NULL);
1042 2 : if (r < 0)
1043 0 : return -errno;
1044 :
1045 2 : handle_revents(v, pfd.revents);
1046 :
1047 2 : return r > 0 ? 1 : 0;
1048 : }
1049 :
1050 2 : int varlink_get_fd(Varlink *v) {
1051 :
1052 2 : assert_return(v, -EINVAL);
1053 :
1054 2 : if (v->state == VARLINK_DISCONNECTED)
1055 0 : return -ENOTCONN;
1056 2 : if (v->fd < 0)
1057 0 : return -EBADF;
1058 :
1059 2 : return v->fd;
1060 : }
1061 :
1062 368012 : int varlink_get_events(Varlink *v) {
1063 368012 : int ret = 0;
1064 :
1065 368012 : assert_return(v, -EINVAL);
1066 :
1067 368012 : if (v->state == VARLINK_DISCONNECTED)
1068 0 : return -ENOTCONN;
1069 :
1070 368012 : if (v->connecting) /* When processing an asynchronous connect(), we only wait for EPOLLOUT, which
1071 : * tells us that the connection is now complete. Before that we should neither
1072 : * write() or read() from the fd. */
1073 136940 : return EPOLLOUT;
1074 :
1075 231072 : if (!v->read_disconnected &&
1076 230814 : IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING, VARLINK_IDLE_SERVER) &&
1077 90297 : !v->current &&
1078 90163 : v->input_buffer_unscanned <= 0)
1079 90029 : ret |= EPOLLIN;
1080 :
1081 231072 : if (!v->write_disconnected &&
1082 230662 : v->output_buffer_size > 0)
1083 16645 : ret |= EPOLLOUT;
1084 :
1085 231072 : return ret;
1086 : }
1087 :
1088 368012 : int varlink_get_timeout(Varlink *v, usec_t *ret) {
1089 368012 : assert_return(v, -EINVAL);
1090 :
1091 368012 : if (v->state == VARLINK_DISCONNECTED)
1092 0 : return -ENOTCONN;
1093 :
1094 368012 : if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING) &&
1095 1091 : v->timeout != USEC_INFINITY) {
1096 1091 : if (ret)
1097 1091 : *ret = usec_add(v->timestamp, v->timeout);
1098 1091 : return 1;
1099 : } else {
1100 366921 : if (ret)
1101 366921 : *ret = USEC_INFINITY;
1102 366921 : return 0;
1103 : }
1104 : }
1105 :
1106 135 : int varlink_flush(Varlink *v) {
1107 135 : int ret = 0, r;
1108 :
1109 135 : assert_return(v, -EINVAL);
1110 :
1111 135 : if (v->state == VARLINK_DISCONNECTED)
1112 2 : return -ENOTCONN;
1113 :
1114 1 : for (;;) {
1115 : struct pollfd pfd;
1116 :
1117 134 : if (v->output_buffer_size == 0)
1118 133 : break;
1119 1 : if (v->write_disconnected)
1120 0 : return -ECONNRESET;
1121 :
1122 1 : r = varlink_write(v);
1123 1 : if (r < 0)
1124 0 : return r;
1125 1 : if (r > 0) {
1126 1 : ret = 1;
1127 1 : continue;
1128 : }
1129 :
1130 0 : pfd = (struct pollfd) {
1131 0 : .fd = v->fd,
1132 : .events = POLLOUT,
1133 : };
1134 :
1135 0 : if (poll(&pfd, 1, -1) < 0)
1136 0 : return -errno;
1137 :
1138 0 : handle_revents(v, pfd.revents);
1139 : }
1140 :
1141 133 : return ret;
1142 : }
1143 :
1144 467 : static void varlink_detach_server(Varlink *v) {
1145 467 : assert(v);
1146 :
1147 467 : if (!v->server)
1148 336 : return;
1149 :
1150 131 : if (v->server->by_uid &&
1151 131 : v->ucred_acquired &&
1152 131 : uid_is_valid(v->ucred.uid)) {
1153 : unsigned c;
1154 :
1155 131 : c = PTR_TO_UINT(hashmap_get(v->server->by_uid, UID_TO_PTR(v->ucred.uid)));
1156 131 : assert(c > 0);
1157 :
1158 131 : if (c == 1)
1159 1 : (void) hashmap_remove(v->server->by_uid, UID_TO_PTR(v->ucred.uid));
1160 : else
1161 130 : (void) hashmap_replace(v->server->by_uid, UID_TO_PTR(v->ucred.uid), UINT_TO_PTR(c - 1));
1162 : }
1163 :
1164 131 : assert(v->server->n_connections > 0);
1165 131 : v->server->n_connections--;
1166 :
1167 : /* If this is a connection associated to a server, then let's disconnect the server and the
1168 : * connection from each other. This drops the dangling reference that connect_callback() set up. */
1169 131 : v->server = varlink_server_unref(v->server);
1170 131 : varlink_unref(v);
1171 : }
1172 :
1173 469 : int varlink_close(Varlink *v) {
1174 :
1175 469 : assert_return(v, -EINVAL);
1176 :
1177 469 : if (v->state == VARLINK_DISCONNECTED)
1178 2 : return 0;
1179 :
1180 467 : varlink_set_state(v, VARLINK_DISCONNECTED);
1181 :
1182 : /* Let's take a reference first, since varlink_detach_server() might drop the final (dangling) ref
1183 : * which would destroy us before we can call varlink_clear() */
1184 467 : varlink_ref(v);
1185 467 : varlink_detach_server(v);
1186 467 : varlink_clear(v);
1187 467 : varlink_unref(v);
1188 :
1189 467 : return 1;
1190 : }
1191 :
1192 3 : Varlink* varlink_flush_close_unref(Varlink *v) {
1193 :
1194 3 : if (!v)
1195 0 : return NULL;
1196 :
1197 3 : (void) varlink_flush(v);
1198 3 : (void) varlink_close(v);
1199 :
1200 3 : return varlink_unref(v);
1201 : }
1202 :
1203 341 : static int varlink_enqueue_json(Varlink *v, JsonVariant *m) {
1204 341 : _cleanup_free_ char *text = NULL;
1205 : int r;
1206 :
1207 341 : assert(v);
1208 341 : assert(m);
1209 :
1210 341 : r = json_variant_format(m, 0, &text);
1211 341 : if (r < 0)
1212 0 : return r;
1213 341 : assert(text[r] == '\0');
1214 :
1215 341 : if (v->output_buffer_size + r + 1 > VARLINK_BUFFER_MAX)
1216 0 : return -ENOBUFS;
1217 :
1218 341 : varlink_log(v, "Sending message: %s", text);
1219 :
1220 341 : if (v->output_buffer_size == 0) {
1221 :
1222 341 : free_and_replace(v->output_buffer, text);
1223 :
1224 341 : v->output_buffer_size = v->output_buffer_allocated = r + 1;
1225 341 : v->output_buffer_index = 0;
1226 :
1227 0 : } else if (v->output_buffer_index == 0) {
1228 :
1229 0 : if (!GREEDY_REALLOC(v->output_buffer, v->output_buffer_allocated, v->output_buffer_size + r + 1))
1230 0 : return -ENOMEM;
1231 :
1232 0 : memcpy(v->output_buffer + v->output_buffer_size, text, r + 1);
1233 0 : v->output_buffer_size += r + 1;
1234 :
1235 : } else {
1236 : char *n;
1237 0 : const size_t new_size = v->output_buffer_size + r + 1;
1238 :
1239 0 : n = new(char, new_size);
1240 0 : if (!n)
1241 0 : return -ENOMEM;
1242 :
1243 0 : memcpy(mempcpy(n, v->output_buffer + v->output_buffer_index, v->output_buffer_size), text, r + 1);
1244 :
1245 0 : free_and_replace(v->output_buffer, n);
1246 0 : v->output_buffer_allocated = v->output_buffer_size = new_size;
1247 0 : v->output_buffer_index = 0;
1248 : }
1249 :
1250 341 : return 0;
1251 : }
1252 :
1253 334 : int varlink_send(Varlink *v, const char *method, JsonVariant *parameters) {
1254 334 : _cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
1255 : int r;
1256 :
1257 334 : assert_return(v, -EINVAL);
1258 334 : assert_return(method, -EINVAL);
1259 :
1260 334 : if (v->state == VARLINK_DISCONNECTED)
1261 0 : return -ENOTCONN;
1262 334 : if (!IN_SET(v->state, VARLINK_IDLE_CLIENT, VARLINK_AWAITING_REPLY))
1263 0 : return -EBUSY;
1264 :
1265 334 : r = varlink_sanitize_parameters(¶meters);
1266 334 : if (r < 0)
1267 0 : return r;
1268 :
1269 334 : r = json_build(&m, JSON_BUILD_OBJECT(
1270 : JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method)),
1271 : JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters)),
1272 : JSON_BUILD_PAIR("oneway", JSON_BUILD_BOOLEAN(true))));
1273 334 : if (r < 0)
1274 0 : return r;
1275 :
1276 334 : r = varlink_enqueue_json(v, m);
1277 334 : if (r < 0)
1278 0 : return r;
1279 :
1280 : /* No state change here, this is one-way only after all */
1281 334 : v->timestamp = now(CLOCK_MONOTONIC);
1282 334 : return 0;
1283 : }
1284 :
1285 333 : int varlink_sendb(Varlink *v, const char *method, ...) {
1286 333 : _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
1287 : va_list ap;
1288 : int r;
1289 :
1290 333 : assert_return(v, -EINVAL);
1291 :
1292 333 : va_start(ap, method);
1293 333 : r = json_buildv(¶meters, ap);
1294 333 : va_end(ap);
1295 :
1296 333 : if (r < 0)
1297 0 : return r;
1298 :
1299 333 : return varlink_send(v, method, parameters);
1300 : }
1301 :
1302 2 : int varlink_invoke(Varlink *v, const char *method, JsonVariant *parameters) {
1303 2 : _cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
1304 : int r;
1305 :
1306 2 : assert_return(v, -EINVAL);
1307 2 : assert_return(method, -EINVAL);
1308 :
1309 2 : if (v->state == VARLINK_DISCONNECTED)
1310 0 : return -ENOTCONN;
1311 2 : if (!IN_SET(v->state, VARLINK_IDLE_CLIENT, VARLINK_AWAITING_REPLY))
1312 0 : return -EBUSY;
1313 :
1314 2 : r = varlink_sanitize_parameters(¶meters);
1315 2 : if (r < 0)
1316 0 : return r;
1317 :
1318 2 : r = json_build(&m, JSON_BUILD_OBJECT(
1319 : JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method)),
1320 : JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters))));
1321 2 : if (r < 0)
1322 0 : return r;
1323 :
1324 2 : r = varlink_enqueue_json(v, m);
1325 2 : if (r < 0)
1326 0 : return r;
1327 :
1328 2 : varlink_set_state(v, VARLINK_AWAITING_REPLY);
1329 2 : v->n_pending++;
1330 2 : v->timestamp = now(CLOCK_MONOTONIC);
1331 :
1332 2 : return 0;
1333 : }
1334 :
1335 1 : int varlink_invokeb(Varlink *v, const char *method, ...) {
1336 1 : _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
1337 : va_list ap;
1338 : int r;
1339 :
1340 1 : assert_return(v, -EINVAL);
1341 :
1342 1 : va_start(ap, method);
1343 1 : r = json_buildv(¶meters, ap);
1344 1 : va_end(ap);
1345 :
1346 1 : if (r < 0)
1347 0 : return r;
1348 :
1349 1 : return varlink_invoke(v, method, parameters);
1350 : }
1351 :
1352 2 : int varlink_call(
1353 : Varlink *v,
1354 : const char *method,
1355 : JsonVariant *parameters,
1356 : JsonVariant **ret_parameters,
1357 : const char **ret_error_id,
1358 : VarlinkReplyFlags *ret_flags) {
1359 :
1360 2 : _cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
1361 : int r;
1362 :
1363 2 : assert_return(v, -EINVAL);
1364 2 : assert_return(method, -EINVAL);
1365 :
1366 2 : if (v->state == VARLINK_DISCONNECTED)
1367 0 : return -ENOTCONN;
1368 2 : if (!IN_SET(v->state, VARLINK_IDLE_CLIENT))
1369 0 : return -EBUSY;
1370 :
1371 2 : assert(v->n_pending == 0); /* n_pending can't be > 0 if we are in VARLINK_IDLE_CLIENT state */
1372 :
1373 2 : r = varlink_sanitize_parameters(¶meters);
1374 2 : if (r < 0)
1375 0 : return r;
1376 :
1377 2 : r = json_build(&m, JSON_BUILD_OBJECT(
1378 : JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method)),
1379 : JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters))));
1380 2 : if (r < 0)
1381 0 : return r;
1382 :
1383 2 : r = varlink_enqueue_json(v, m);
1384 2 : if (r < 0)
1385 0 : return r;
1386 :
1387 2 : varlink_set_state(v, VARLINK_CALLING);
1388 2 : v->n_pending++;
1389 2 : v->timestamp = now(CLOCK_MONOTONIC);
1390 :
1391 12 : while (v->state == VARLINK_CALLING) {
1392 :
1393 10 : r = varlink_process(v);
1394 10 : if (r < 0)
1395 0 : return r;
1396 10 : if (r > 0)
1397 8 : continue;
1398 :
1399 2 : r = varlink_wait(v, USEC_INFINITY);
1400 2 : if (r < 0)
1401 0 : return r;
1402 : }
1403 :
1404 2 : switch (v->state) {
1405 :
1406 2 : case VARLINK_CALLED:
1407 2 : assert(v->current);
1408 :
1409 2 : json_variant_unref(v->reply);
1410 2 : v->reply = TAKE_PTR(v->current);
1411 :
1412 2 : varlink_set_state(v, VARLINK_IDLE_CLIENT);
1413 2 : assert(v->n_pending == 1);
1414 2 : v->n_pending--;
1415 :
1416 2 : if (ret_parameters)
1417 2 : *ret_parameters = json_variant_by_key(v->reply, "parameters");
1418 2 : if (ret_error_id)
1419 2 : *ret_error_id = json_variant_string(json_variant_by_key(v->reply, "error"));
1420 2 : if (ret_flags)
1421 0 : *ret_flags = 0;
1422 :
1423 2 : return 1;
1424 :
1425 0 : case VARLINK_PENDING_DISCONNECT:
1426 : case VARLINK_DISCONNECTED:
1427 0 : return -ECONNRESET;
1428 :
1429 0 : case VARLINK_PENDING_TIMEOUT:
1430 0 : return -ETIME;
1431 :
1432 0 : default:
1433 0 : assert_not_reached("Unexpected state after method call.");
1434 : }
1435 : }
1436 :
1437 1 : int varlink_callb(
1438 : Varlink *v,
1439 : const char *method,
1440 : JsonVariant **ret_parameters,
1441 : const char **ret_error_id,
1442 : VarlinkReplyFlags *ret_flags, ...) {
1443 :
1444 1 : _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
1445 : va_list ap;
1446 : int r;
1447 :
1448 1 : assert_return(v, -EINVAL);
1449 :
1450 1 : va_start(ap, ret_flags);
1451 1 : r = json_buildv(¶meters, ap);
1452 1 : va_end(ap);
1453 :
1454 1 : if (r < 0)
1455 0 : return r;
1456 :
1457 1 : return varlink_call(v, method, parameters, ret_parameters, ret_error_id, ret_flags);
1458 : }
1459 :
1460 2 : int varlink_reply(Varlink *v, JsonVariant *parameters) {
1461 2 : _cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
1462 : int r;
1463 :
1464 2 : assert_return(v, -EINVAL);
1465 :
1466 2 : if (v->state == VARLINK_DISCONNECTED)
1467 0 : return -ENOTCONN;
1468 2 : if (!IN_SET(v->state,
1469 : VARLINK_PROCESSING_METHOD, VARLINK_PROCESSING_METHOD_MORE,
1470 : VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE))
1471 0 : return -EBUSY;
1472 :
1473 2 : r = varlink_sanitize_parameters(¶meters);
1474 2 : if (r < 0)
1475 0 : return r;
1476 :
1477 2 : r = json_build(&m, JSON_BUILD_OBJECT(JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters))));
1478 2 : if (r < 0)
1479 0 : return r;
1480 :
1481 2 : r = varlink_enqueue_json(v, m);
1482 2 : if (r < 0)
1483 0 : return r;
1484 :
1485 2 : if (IN_SET(v->state, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE)) {
1486 : /* We just replied to a method call that was let hanging for a while (i.e. we were outside of
1487 : * the varlink_dispatch_method() stack frame), which means with this reply we are ready to
1488 : * process further messages. */
1489 0 : v->current = json_variant_unref(v->current);
1490 0 : varlink_set_state(v, VARLINK_IDLE_SERVER);
1491 : } else
1492 : /* We replied to a method call from within the varlink_dispatch_method() stack frame), which
1493 : * means we should it handle the rest of the state engine. */
1494 2 : varlink_set_state(v, VARLINK_PROCESSED_METHOD);
1495 :
1496 2 : return 1;
1497 : }
1498 :
1499 0 : int varlink_replyb(Varlink *v, ...) {
1500 0 : _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
1501 : va_list ap;
1502 : int r;
1503 :
1504 0 : assert_return(v, -EINVAL);
1505 :
1506 0 : va_start(ap, v);
1507 0 : r = json_buildv(¶meters, ap);
1508 0 : va_end(ap);
1509 :
1510 0 : if (r < 0)
1511 0 : return r;
1512 :
1513 0 : return varlink_reply(v, parameters);
1514 : }
1515 :
1516 1 : int varlink_error(Varlink *v, const char *error_id, JsonVariant *parameters) {
1517 1 : _cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
1518 : int r;
1519 :
1520 1 : assert_return(v, -EINVAL);
1521 1 : assert_return(error_id, -EINVAL);
1522 :
1523 1 : if (v->state == VARLINK_DISCONNECTED)
1524 0 : return -ENOTCONN;
1525 1 : if (!IN_SET(v->state,
1526 : VARLINK_PROCESSING_METHOD, VARLINK_PROCESSING_METHOD_MORE,
1527 : VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE))
1528 0 : return -EBUSY;
1529 :
1530 1 : r = varlink_sanitize_parameters(¶meters);
1531 1 : if (r < 0)
1532 0 : return r;
1533 :
1534 1 : r = json_build(&m, JSON_BUILD_OBJECT(
1535 : JSON_BUILD_PAIR("error", JSON_BUILD_STRING(error_id)),
1536 : JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters))));
1537 1 : if (r < 0)
1538 0 : return r;
1539 :
1540 1 : r = varlink_enqueue_json(v, m);
1541 1 : if (r < 0)
1542 0 : return r;
1543 :
1544 1 : if (IN_SET(v->state, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE)) {
1545 0 : v->current = json_variant_unref(v->current);
1546 0 : varlink_set_state(v, VARLINK_IDLE_SERVER);
1547 : } else
1548 1 : varlink_set_state(v, VARLINK_PROCESSED_METHOD);
1549 :
1550 1 : return 1;
1551 : }
1552 :
1553 1 : int varlink_errorb(Varlink *v, const char *error_id, ...) {
1554 1 : _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
1555 : va_list ap;
1556 : int r;
1557 :
1558 1 : assert_return(v, -EINVAL);
1559 1 : assert_return(error_id, -EINVAL);
1560 :
1561 1 : va_start(ap, error_id);
1562 1 : r = json_buildv(¶meters, ap);
1563 1 : va_end(ap);
1564 :
1565 1 : if (r < 0)
1566 0 : return r;
1567 :
1568 1 : return varlink_error(v, error_id, parameters);
1569 : }
1570 :
1571 0 : int varlink_error_invalid_parameter(Varlink *v, JsonVariant *parameters) {
1572 :
1573 0 : assert_return(v, -EINVAL);
1574 0 : assert_return(parameters, -EINVAL);
1575 :
1576 : /* We expect to be called in one of two ways: the 'parameters' argument is a string variant in which
1577 : * case it is the parameter key name that is invalid. Or the 'parameters' argument is an object
1578 : * variant in which case we'll pull out the first key. The latter mode is useful in functions that
1579 : * don't expect any arguments. */
1580 :
1581 0 : if (json_variant_is_string(parameters))
1582 0 : return varlink_error(v, VARLINK_ERROR_INVALID_PARAMETER, parameters);
1583 :
1584 0 : if (json_variant_is_object(parameters) &&
1585 0 : json_variant_elements(parameters) > 0)
1586 0 : return varlink_error(v, VARLINK_ERROR_INVALID_PARAMETER,
1587 : json_variant_by_index(parameters, 0));
1588 :
1589 0 : return -EINVAL;
1590 : }
1591 :
1592 0 : int varlink_notify(Varlink *v, JsonVariant *parameters) {
1593 0 : _cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
1594 : int r;
1595 :
1596 0 : assert_return(v, -EINVAL);
1597 :
1598 0 : if (v->state == VARLINK_DISCONNECTED)
1599 0 : return -ENOTCONN;
1600 0 : if (!IN_SET(v->state, VARLINK_PROCESSING_METHOD_MORE, VARLINK_PENDING_METHOD_MORE))
1601 0 : return -EBUSY;
1602 :
1603 0 : r = varlink_sanitize_parameters(¶meters);
1604 0 : if (r < 0)
1605 0 : return r;
1606 :
1607 0 : r = json_build(&m, JSON_BUILD_OBJECT(
1608 : JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters)),
1609 : JSON_BUILD_PAIR("continues", JSON_BUILD_BOOLEAN(true))));
1610 0 : if (r < 0)
1611 0 : return r;
1612 :
1613 0 : r = varlink_enqueue_json(v, m);
1614 0 : if (r < 0)
1615 0 : return r;
1616 :
1617 : /* No state change, as more is coming */
1618 0 : return 1;
1619 : }
1620 :
1621 0 : int varlink_notifyb(Varlink *v, ...) {
1622 0 : _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
1623 : va_list ap;
1624 : int r;
1625 :
1626 0 : assert_return(v, -EINVAL);
1627 :
1628 0 : va_start(ap, v);
1629 0 : r = json_buildv(¶meters, ap);
1630 0 : va_end(ap);
1631 :
1632 0 : if (r < 0)
1633 0 : return r;
1634 :
1635 0 : return varlink_notify(v, parameters);
1636 : }
1637 :
1638 2 : int varlink_bind_reply(Varlink *v, VarlinkReply callback) {
1639 2 : assert_return(v, -EINVAL);
1640 :
1641 2 : if (callback && v->reply_callback && callback != v->reply_callback)
1642 0 : return -EBUSY;
1643 :
1644 2 : v->reply_callback = callback;
1645 :
1646 2 : return 0;
1647 : }
1648 :
1649 0 : void* varlink_set_userdata(Varlink *v, void *userdata) {
1650 : void *old;
1651 :
1652 0 : assert_return(v, NULL);
1653 :
1654 0 : old = v->userdata;
1655 0 : v->userdata = userdata;
1656 :
1657 0 : return old;
1658 : }
1659 :
1660 0 : void* varlink_get_userdata(Varlink *v) {
1661 0 : assert_return(v, NULL);
1662 :
1663 0 : return v->userdata;
1664 : }
1665 :
1666 131 : static int varlink_acquire_ucred(Varlink *v) {
1667 : int r;
1668 :
1669 131 : assert(v);
1670 :
1671 131 : if (v->ucred_acquired)
1672 131 : return 0;
1673 :
1674 0 : r = getpeercred(v->fd, &v->ucred);
1675 0 : if (r < 0)
1676 0 : return r;
1677 :
1678 0 : v->ucred_acquired = true;
1679 0 : return 0;
1680 : }
1681 :
1682 131 : int varlink_get_peer_uid(Varlink *v, uid_t *ret) {
1683 : int r;
1684 :
1685 131 : assert_return(v, -EINVAL);
1686 131 : assert_return(ret, -EINVAL);
1687 :
1688 131 : r = varlink_acquire_ucred(v);
1689 131 : if (r < 0)
1690 0 : return r;
1691 :
1692 131 : if (!uid_is_valid(v->ucred.uid))
1693 0 : return -ENODATA;
1694 :
1695 131 : *ret = v->ucred.uid;
1696 131 : return 0;
1697 : }
1698 :
1699 0 : int varlink_get_peer_pid(Varlink *v, pid_t *ret) {
1700 : int r;
1701 :
1702 0 : assert_return(v, -EINVAL);
1703 0 : assert_return(ret, -EINVAL);
1704 :
1705 0 : r = varlink_acquire_ucred(v);
1706 0 : if (r < 0)
1707 0 : return r;
1708 :
1709 0 : if (!pid_is_valid(v->ucred.pid))
1710 0 : return -ENODATA;
1711 :
1712 0 : *ret = v->ucred.pid;
1713 0 : return 0;
1714 : }
1715 :
1716 0 : int varlink_set_relative_timeout(Varlink *v, usec_t timeout) {
1717 0 : assert_return(v, -EINVAL);
1718 0 : assert_return(timeout > 0, -EINVAL);
1719 :
1720 0 : v->timeout = timeout;
1721 0 : return 0;
1722 : }
1723 :
1724 0 : VarlinkServer *varlink_get_server(Varlink *v) {
1725 0 : assert_return(v, NULL);
1726 :
1727 0 : return v->server;
1728 : }
1729 :
1730 336 : int varlink_set_description(Varlink *v, const char *description) {
1731 336 : assert_return(v, -EINVAL);
1732 :
1733 336 : return free_and_strdup(&v->description, description);
1734 : }
1735 :
1736 465 : static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
1737 465 : Varlink *v = userdata;
1738 :
1739 465 : assert(s);
1740 465 : assert(v);
1741 :
1742 465 : handle_revents(v, revents);
1743 465 : (void) varlink_process(v);
1744 :
1745 465 : return 1;
1746 : }
1747 :
1748 0 : static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
1749 0 : Varlink *v = userdata;
1750 :
1751 0 : assert(s);
1752 0 : assert(v);
1753 :
1754 0 : (void) varlink_process(v);
1755 0 : return 1;
1756 : }
1757 :
1758 1539 : static int defer_callback(sd_event_source *s, void *userdata) {
1759 1539 : Varlink *v = userdata;
1760 :
1761 1539 : assert(s);
1762 1539 : assert(v);
1763 :
1764 1539 : (void) varlink_process(v);
1765 1539 : return 1;
1766 : }
1767 :
1768 368010 : static int prepare_callback(sd_event_source *s, void *userdata) {
1769 368010 : Varlink *v = userdata;
1770 : int r, e;
1771 : usec_t until;
1772 :
1773 368010 : assert(s);
1774 368010 : assert(v);
1775 :
1776 368010 : e = varlink_get_events(v);
1777 368010 : if (e < 0)
1778 0 : return e;
1779 :
1780 368010 : r = sd_event_source_set_io_events(v->io_event_source, e);
1781 368010 : if (r < 0)
1782 0 : return r;
1783 :
1784 368010 : r = varlink_get_timeout(v, &until);
1785 368010 : if (r < 0)
1786 0 : return r;
1787 368010 : if (r > 0) {
1788 1089 : r = sd_event_source_set_time(v->time_event_source, until);
1789 1089 : if (r < 0)
1790 0 : return r;
1791 : }
1792 :
1793 368010 : r = sd_event_source_set_enabled(v->time_event_source, r > 0 ? SD_EVENT_ON : SD_EVENT_OFF);
1794 368010 : if (r < 0)
1795 0 : return r;
1796 :
1797 368010 : return 1;
1798 : }
1799 :
1800 132 : static int quit_callback(sd_event_source *event, void *userdata) {
1801 132 : Varlink *v = userdata;
1802 :
1803 132 : assert(event);
1804 132 : assert(v);
1805 :
1806 132 : varlink_flush(v);
1807 132 : varlink_close(v);
1808 :
1809 132 : return 1;
1810 : }
1811 :
1812 466 : int varlink_attach_event(Varlink *v, sd_event *e, int64_t priority) {
1813 : int r;
1814 :
1815 466 : assert_return(v, -EINVAL);
1816 466 : assert_return(!v->event, -EBUSY);
1817 :
1818 466 : if (e)
1819 466 : v->event = sd_event_ref(e);
1820 : else {
1821 0 : r = sd_event_default(&v->event);
1822 0 : if (r < 0)
1823 0 : return r;
1824 : }
1825 :
1826 466 : r = sd_event_add_time(v->event, &v->time_event_source, CLOCK_MONOTONIC, 0, 0, time_callback, v);
1827 466 : if (r < 0)
1828 0 : goto fail;
1829 :
1830 466 : r = sd_event_source_set_priority(v->time_event_source, priority);
1831 466 : if (r < 0)
1832 0 : goto fail;
1833 :
1834 466 : (void) sd_event_source_set_description(v->time_event_source, "varlink-time");
1835 :
1836 466 : r = sd_event_add_exit(v->event, &v->quit_event_source, quit_callback, v);
1837 466 : if (r < 0)
1838 0 : goto fail;
1839 :
1840 466 : r = sd_event_source_set_priority(v->quit_event_source, priority);
1841 466 : if (r < 0)
1842 0 : goto fail;
1843 :
1844 466 : (void) sd_event_source_set_description(v->quit_event_source, "varlink-quit");
1845 :
1846 466 : r = sd_event_add_io(v->event, &v->io_event_source, v->fd, 0, io_callback, v);
1847 466 : if (r < 0)
1848 0 : goto fail;
1849 :
1850 466 : r = sd_event_source_set_prepare(v->io_event_source, prepare_callback);
1851 466 : if (r < 0)
1852 0 : goto fail;
1853 :
1854 466 : r = sd_event_source_set_priority(v->io_event_source, priority);
1855 466 : if (r < 0)
1856 0 : goto fail;
1857 :
1858 466 : (void) sd_event_source_set_description(v->io_event_source, "varlink-io");
1859 :
1860 466 : r = sd_event_add_defer(v->event, &v->defer_event_source, defer_callback, v);
1861 466 : if (r < 0)
1862 0 : goto fail;
1863 :
1864 466 : r = sd_event_source_set_priority(v->defer_event_source, priority);
1865 466 : if (r < 0)
1866 0 : goto fail;
1867 :
1868 466 : (void) sd_event_source_set_description(v->defer_event_source, "varlink-defer");
1869 :
1870 466 : return 0;
1871 :
1872 0 : fail:
1873 0 : varlink_detach_event(v);
1874 0 : return r;
1875 : }
1876 :
1877 0 : void varlink_detach_event(Varlink *v) {
1878 0 : if (!v)
1879 0 : return;
1880 :
1881 0 : varlink_detach_event_sources(v);
1882 :
1883 0 : v->event = sd_event_unref(v->event);
1884 : }
1885 :
1886 2 : sd_event *varlink_get_event(Varlink *v) {
1887 2 : assert_return(v, NULL);
1888 :
1889 2 : return v->event;
1890 : }
1891 :
1892 1 : int varlink_server_new(VarlinkServer **ret, VarlinkServerFlags flags) {
1893 : VarlinkServer *s;
1894 :
1895 1 : assert_return(ret, -EINVAL);
1896 1 : assert_return((flags & ~_VARLINK_SERVER_FLAGS_ALL) == 0, -EINVAL);
1897 :
1898 1 : s = new(VarlinkServer, 1);
1899 1 : if (!s)
1900 0 : return -ENOMEM;
1901 :
1902 1 : *s = (VarlinkServer) {
1903 : .n_ref = 1,
1904 : .flags = flags,
1905 1 : .connections_max = varlink_server_connections_max(NULL),
1906 1 : .connections_per_uid_max = varlink_server_connections_per_uid_max(NULL),
1907 : };
1908 :
1909 1 : *ret = s;
1910 1 : return 0;
1911 : }
1912 :
1913 1 : static VarlinkServer* varlink_server_destroy(VarlinkServer *s) {
1914 : char *m;
1915 :
1916 1 : if (!s)
1917 0 : return NULL;
1918 :
1919 1 : varlink_server_shutdown(s);
1920 :
1921 3 : while ((m = hashmap_steal_first_key(s->methods)))
1922 2 : free(m);
1923 :
1924 1 : hashmap_free(s->methods);
1925 1 : hashmap_free(s->by_uid);
1926 :
1927 1 : sd_event_unref(s->event);
1928 :
1929 1 : free(s->description);
1930 :
1931 1 : return mfree(s);
1932 : }
1933 :
1934 263 : DEFINE_TRIVIAL_REF_UNREF_FUNC(VarlinkServer, varlink_server, varlink_server_destroy);
1935 :
1936 131 : static int validate_connection(VarlinkServer *server, const struct ucred *ucred) {
1937 131 : int allowed = -1;
1938 :
1939 131 : assert(server);
1940 131 : assert(ucred);
1941 :
1942 131 : if (FLAGS_SET(server->flags, VARLINK_SERVER_ROOT_ONLY))
1943 0 : allowed = ucred->uid == 0;
1944 :
1945 131 : if (FLAGS_SET(server->flags, VARLINK_SERVER_MYSELF_ONLY))
1946 0 : allowed = allowed > 0 || ucred->uid == getuid();
1947 :
1948 131 : if (allowed == 0) { /* Allow access when it is explicitly allowed or when neither
1949 : * VARLINK_SERVER_ROOT_ONLY nor VARLINK_SERVER_MYSELF_ONLY are specified. */
1950 0 : varlink_server_log(server, "Unprivileged client attempted connection, refusing.");
1951 0 : return 0;
1952 : }
1953 :
1954 131 : if (server->n_connections >= server->connections_max) {
1955 0 : varlink_server_log(server, "Connection limit of %u reached, refusing.", server->connections_max);
1956 0 : return 0;
1957 : }
1958 :
1959 131 : if (FLAGS_SET(server->flags, VARLINK_SERVER_ACCOUNT_UID)) {
1960 : unsigned c;
1961 :
1962 131 : if (!uid_is_valid(ucred->uid)) {
1963 0 : varlink_server_log(server, "Client with invalid UID attempted connection, refusing.");
1964 0 : return 0;
1965 : }
1966 :
1967 131 : c = PTR_TO_UINT(hashmap_get(server->by_uid, UID_TO_PTR(ucred->uid)));
1968 131 : if (c >= server->connections_per_uid_max) {
1969 0 : varlink_server_log(server, "Per-UID connection limit of %u reached, refusing.",
1970 : server->connections_per_uid_max);
1971 0 : return 0;
1972 : }
1973 : }
1974 :
1975 131 : return 1;
1976 : }
1977 :
1978 131 : static int count_connection(VarlinkServer *server, struct ucred *ucred) {
1979 : unsigned c;
1980 : int r;
1981 :
1982 131 : assert(server);
1983 131 : assert(ucred);
1984 :
1985 131 : server->n_connections++;
1986 :
1987 131 : if (FLAGS_SET(server->flags, VARLINK_SERVER_ACCOUNT_UID)) {
1988 131 : r = hashmap_ensure_allocated(&server->by_uid, NULL);
1989 131 : if (r < 0)
1990 0 : return log_debug_errno(r, "Failed to allocate UID hash table: %m");
1991 :
1992 131 : c = PTR_TO_UINT(hashmap_get(server->by_uid, UID_TO_PTR(ucred->uid)));
1993 :
1994 131 : varlink_server_log(server, "Connections of user " UID_FMT ": %u (of %u max)",
1995 : ucred->uid, c, server->connections_per_uid_max);
1996 :
1997 131 : r = hashmap_replace(server->by_uid, UID_TO_PTR(ucred->uid), UINT_TO_PTR(c + 1));
1998 131 : if (r < 0)
1999 0 : return log_debug_errno(r, "Failed to increment counter in UID hash table: %m");
2000 : }
2001 :
2002 131 : return 0;
2003 : }
2004 :
2005 131 : int varlink_server_add_connection(VarlinkServer *server, int fd, Varlink **ret) {
2006 131 : _cleanup_(varlink_unrefp) Varlink *v = NULL;
2007 : bool ucred_acquired;
2008 : struct ucred ucred;
2009 : int r;
2010 :
2011 131 : assert_return(server, -EINVAL);
2012 131 : assert_return(fd >= 0, -EBADF);
2013 :
2014 131 : if ((server->flags & (VARLINK_SERVER_ROOT_ONLY|VARLINK_SERVER_ACCOUNT_UID)) != 0) {
2015 131 : r = getpeercred(fd, &ucred);
2016 131 : if (r < 0)
2017 0 : return varlink_server_log_errno(server, r, "Failed to acquire peer credentials of incoming socket, refusing: %m");
2018 :
2019 131 : ucred_acquired = true;
2020 :
2021 131 : r = validate_connection(server, &ucred);
2022 131 : if (r < 0)
2023 0 : return r;
2024 131 : if (r == 0)
2025 0 : return -EPERM;
2026 : } else
2027 0 : ucred_acquired = false;
2028 :
2029 131 : r = varlink_new(&v);
2030 131 : if (r < 0)
2031 0 : return varlink_server_log_errno(server, r, "Failed to allocate connection object: %m");
2032 :
2033 131 : r = count_connection(server, &ucred);
2034 131 : if (r < 0)
2035 0 : return r;
2036 :
2037 131 : v->fd = fd;
2038 131 : v->userdata = server->userdata;
2039 131 : if (ucred_acquired) {
2040 131 : v->ucred = ucred;
2041 131 : v->ucred_acquired = true;
2042 : }
2043 :
2044 131 : (void) asprintf(&v->description, "%s-%i", server->description ?: "varlink", v->fd);
2045 :
2046 : /* Link up the server and the connection, and take reference in both directions. Note that the
2047 : * reference on the connection is left dangling. It will be dropped when the connection is closed,
2048 : * which happens in varlink_close(), including in the event loop quit callback. */
2049 131 : v->server = varlink_server_ref(server);
2050 131 : varlink_ref(v);
2051 :
2052 131 : varlink_set_state(v, VARLINK_IDLE_SERVER);
2053 :
2054 131 : if (server->event) {
2055 131 : r = varlink_attach_event(v, server->event, server->event_priority);
2056 131 : if (r < 0) {
2057 0 : varlink_log_errno(v, r, "Failed to attach new connection: %m");
2058 0 : v->fd = -1; /* take the fd out of the connection again */
2059 0 : varlink_close(v);
2060 0 : return r;
2061 : }
2062 : }
2063 :
2064 131 : if (ret)
2065 131 : *ret = v;
2066 :
2067 131 : return 0;
2068 : }
2069 :
2070 131 : static int connect_callback(sd_event_source *source, int fd, uint32_t revents, void *userdata) {
2071 131 : VarlinkServerSocket *ss = userdata;
2072 131 : _cleanup_close_ int cfd = -1;
2073 131 : Varlink *v = NULL;
2074 : int r;
2075 :
2076 131 : assert(source);
2077 131 : assert(ss);
2078 :
2079 131 : varlink_server_log(ss->server, "New incoming connection.");
2080 :
2081 131 : cfd = accept4(fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
2082 131 : if (cfd < 0) {
2083 0 : if (ERRNO_IS_ACCEPT_AGAIN(errno))
2084 0 : return 0;
2085 :
2086 0 : return varlink_server_log_errno(ss->server, errno, "Failed to accept incoming socket: %m");
2087 : }
2088 :
2089 131 : r = varlink_server_add_connection(ss->server, cfd, &v);
2090 131 : if (r < 0)
2091 0 : return 0;
2092 :
2093 131 : TAKE_FD(cfd);
2094 :
2095 131 : if (ss->server->connect_callback) {
2096 131 : r = ss->server->connect_callback(ss->server, v, ss->server->userdata);
2097 131 : if (r < 0) {
2098 0 : varlink_log_errno(v, r, "Connection callback returned error, disconnecting client: %m");
2099 0 : varlink_close(v);
2100 0 : return 0;
2101 : }
2102 : }
2103 :
2104 131 : return 0;
2105 : }
2106 :
2107 1 : int varlink_server_listen_fd(VarlinkServer *s, int fd) {
2108 1 : _cleanup_free_ VarlinkServerSocket *ss = NULL;
2109 : int r;
2110 :
2111 1 : assert_return(s, -EINVAL);
2112 1 : assert_return(fd >= 0, -EBADF);
2113 :
2114 1 : r = fd_nonblock(fd, true);
2115 1 : if (r < 0)
2116 0 : return r;
2117 :
2118 1 : ss = new(VarlinkServerSocket, 1);
2119 1 : if (!ss)
2120 0 : return -ENOMEM;
2121 :
2122 1 : *ss = (VarlinkServerSocket) {
2123 : .server = s,
2124 : .fd = fd,
2125 : };
2126 :
2127 1 : if (s->event) {
2128 0 : _cleanup_(sd_event_source_unrefp) sd_event_source *es = NULL;
2129 :
2130 0 : r = sd_event_add_io(s->event, &es, fd, EPOLLIN, connect_callback, ss);
2131 0 : if (r < 0)
2132 0 : return r;
2133 :
2134 0 : r = sd_event_source_set_priority(ss->event_source, s->event_priority);
2135 0 : if (r < 0)
2136 0 : return r;
2137 : }
2138 :
2139 1 : LIST_PREPEND(sockets, s->sockets, TAKE_PTR(ss));
2140 1 : return 0;
2141 : }
2142 :
2143 1 : int varlink_server_listen_address(VarlinkServer *s, const char *address, mode_t m) {
2144 : union sockaddr_union sockaddr;
2145 1 : _cleanup_close_ int fd = -1;
2146 : int r;
2147 :
2148 1 : assert_return(s, -EINVAL);
2149 1 : assert_return(address, -EINVAL);
2150 1 : assert_return((m & ~0777) == 0, -EINVAL);
2151 :
2152 1 : r = sockaddr_un_set_path(&sockaddr.un, address);
2153 1 : if (r < 0)
2154 0 : return r;
2155 :
2156 1 : fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
2157 1 : if (fd < 0)
2158 0 : return -errno;
2159 :
2160 1 : (void) sockaddr_un_unlink(&sockaddr.un);
2161 :
2162 2 : RUN_WITH_UMASK(~m & 0777)
2163 1 : if (bind(fd, &sockaddr.sa, SOCKADDR_UN_LEN(sockaddr.un)) < 0)
2164 0 : return -errno;
2165 :
2166 1 : if (listen(fd, SOMAXCONN) < 0)
2167 0 : return -errno;
2168 :
2169 1 : r = varlink_server_listen_fd(s, fd);
2170 1 : if (r < 0)
2171 0 : return r;
2172 :
2173 1 : TAKE_FD(fd);
2174 1 : return 0;
2175 : }
2176 :
2177 0 : void* varlink_server_set_userdata(VarlinkServer *s, void *userdata) {
2178 : void *ret;
2179 :
2180 0 : assert_return(s, NULL);
2181 :
2182 0 : ret = s->userdata;
2183 0 : s->userdata = userdata;
2184 :
2185 0 : return ret;
2186 : }
2187 :
2188 0 : void* varlink_server_get_userdata(VarlinkServer *s) {
2189 0 : assert_return(s, NULL);
2190 :
2191 0 : return s->userdata;
2192 : }
2193 :
2194 1 : static VarlinkServerSocket* varlink_server_socket_destroy(VarlinkServerSocket *ss) {
2195 1 : if (!ss)
2196 0 : return NULL;
2197 :
2198 1 : if (ss->server)
2199 1 : LIST_REMOVE(sockets, ss->server->sockets, ss);
2200 :
2201 1 : sd_event_source_disable_unref(ss->event_source);
2202 :
2203 1 : free(ss->address);
2204 1 : safe_close(ss->fd);
2205 :
2206 1 : return mfree(ss);
2207 : }
2208 :
2209 1 : int varlink_server_shutdown(VarlinkServer *s) {
2210 1 : assert_return(s, -EINVAL);
2211 :
2212 2 : while (s->sockets)
2213 1 : varlink_server_socket_destroy(s->sockets);
2214 :
2215 1 : return 0;
2216 : }
2217 :
2218 1 : int varlink_server_attach_event(VarlinkServer *s, sd_event *e, int64_t priority) {
2219 : VarlinkServerSocket *ss;
2220 : int r;
2221 :
2222 1 : assert_return(s, -EINVAL);
2223 1 : assert_return(!s->event, -EBUSY);
2224 :
2225 1 : if (e)
2226 1 : s->event = sd_event_ref(e);
2227 : else {
2228 0 : r = sd_event_default(&s->event);
2229 0 : if (r < 0)
2230 0 : return r;
2231 : }
2232 :
2233 2 : LIST_FOREACH(sockets, ss, s->sockets) {
2234 1 : assert(!ss->event_source);
2235 :
2236 1 : r = sd_event_add_io(s->event, &ss->event_source, ss->fd, EPOLLIN, connect_callback, ss);
2237 1 : if (r < 0)
2238 0 : goto fail;
2239 :
2240 1 : r = sd_event_source_set_priority(ss->event_source, priority);
2241 1 : if (r < 0)
2242 0 : goto fail;
2243 : }
2244 :
2245 1 : s->event_priority = priority;
2246 1 : return 0;
2247 :
2248 0 : fail:
2249 0 : varlink_server_detach_event(s);
2250 0 : return r;
2251 : }
2252 :
2253 0 : int varlink_server_detach_event(VarlinkServer *s) {
2254 : VarlinkServerSocket *ss;
2255 :
2256 0 : assert_return(s, -EINVAL);
2257 :
2258 0 : LIST_FOREACH(sockets, ss, s->sockets) {
2259 :
2260 0 : if (!ss->event_source)
2261 0 : continue;
2262 :
2263 0 : (void) sd_event_source_set_enabled(ss->event_source, SD_EVENT_OFF);
2264 0 : ss->event_source = sd_event_source_unref(ss->event_source);
2265 : }
2266 :
2267 0 : sd_event_unref(s->event);
2268 0 : return 0;
2269 : }
2270 :
2271 0 : sd_event *varlink_server_get_event(VarlinkServer *s) {
2272 0 : assert_return(s, NULL);
2273 :
2274 0 : return s->event;
2275 : }
2276 :
2277 2 : int varlink_server_bind_method(VarlinkServer *s, const char *method, VarlinkMethod callback) {
2278 : char *m;
2279 : int r;
2280 :
2281 2 : assert_return(s, -EINVAL);
2282 2 : assert_return(method, -EINVAL);
2283 2 : assert_return(callback, -EINVAL);
2284 :
2285 2 : if (startswith(method, "org.varlink.service."))
2286 0 : return -EEXIST;
2287 :
2288 2 : r = hashmap_ensure_allocated(&s->methods, &string_hash_ops);
2289 2 : if (r < 0)
2290 0 : return r;
2291 :
2292 2 : m = strdup(method);
2293 2 : if (!m)
2294 0 : return -ENOMEM;
2295 :
2296 2 : r = hashmap_put(s->methods, m, callback);
2297 2 : if (r < 0) {
2298 0 : free(m);
2299 0 : return r;
2300 : }
2301 :
2302 2 : return 0;
2303 : }
2304 :
2305 0 : int varlink_server_bind_method_many_internal(VarlinkServer *s, ...) {
2306 : va_list ap;
2307 0 : int r = 0;
2308 :
2309 0 : assert_return(s, -EINVAL);
2310 :
2311 0 : va_start(ap, s);
2312 0 : for (;;) {
2313 : VarlinkMethod callback;
2314 : const char *method;
2315 :
2316 0 : method = va_arg(ap, const char *);
2317 0 : if (!method)
2318 0 : break;
2319 :
2320 0 : callback = va_arg(ap, VarlinkMethod);
2321 :
2322 0 : r = varlink_server_bind_method(s, method, callback);
2323 0 : if (r < 0)
2324 0 : break;
2325 : }
2326 0 : va_end(ap);
2327 :
2328 0 : return r;
2329 : }
2330 :
2331 1 : int varlink_server_bind_connect(VarlinkServer *s, VarlinkConnect callback) {
2332 1 : assert_return(s, -EINVAL);
2333 :
2334 1 : if (callback && s->connect_callback && callback != s->connect_callback)
2335 0 : return -EBUSY;
2336 :
2337 1 : s->connect_callback = callback;
2338 1 : return 0;
2339 : }
2340 :
2341 2 : unsigned varlink_server_connections_max(VarlinkServer *s) {
2342 : struct rlimit rl;
2343 :
2344 : /* If a server is specified, return the setting for that server, otherwise the default value */
2345 2 : if (s)
2346 0 : return s->connections_max;
2347 :
2348 2 : assert_se(getrlimit(RLIMIT_NOFILE, &rl) >= 0);
2349 :
2350 : /* Make sure we never use up more than ¾th of RLIMIT_NOFILE for IPC */
2351 2 : if (VARLINK_DEFAULT_CONNECTIONS_MAX > rl.rlim_cur / 4 * 3)
2352 2 : return rl.rlim_cur / 4 * 3;
2353 :
2354 0 : return VARLINK_DEFAULT_CONNECTIONS_MAX;
2355 : }
2356 :
2357 1 : unsigned varlink_server_connections_per_uid_max(VarlinkServer *s) {
2358 : unsigned m;
2359 :
2360 1 : if (s)
2361 0 : return s->connections_per_uid_max;
2362 :
2363 : /* Make sure to never use up more than ¾th of available connections for a single user */
2364 1 : m = varlink_server_connections_max(NULL);
2365 1 : if (VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX > m)
2366 1 : return m / 4 * 3;
2367 :
2368 0 : return VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX;
2369 : }
2370 :
2371 0 : int varlink_server_set_connections_per_uid_max(VarlinkServer *s, unsigned m) {
2372 0 : assert_return(s, -EINVAL);
2373 0 : assert_return(m > 0, -EINVAL);
2374 :
2375 0 : s->connections_per_uid_max = m;
2376 0 : return 0;
2377 : }
2378 :
2379 1 : int varlink_server_set_connections_max(VarlinkServer *s, unsigned m) {
2380 1 : assert_return(s, -EINVAL);
2381 1 : assert_return(m > 0, -EINVAL);
2382 :
2383 1 : s->connections_max = m;
2384 1 : return 0;
2385 : }
2386 :
2387 1 : int varlink_server_set_description(VarlinkServer *s, const char *description) {
2388 1 : assert_return(s, -EINVAL);
2389 :
2390 1 : return free_and_strdup(&s->description, description);
2391 : }
|