Branch data Line data Source code
1 : : /* SPDX-License-Identifier: LGPL-2.1+ */
2 : :
3 : : #include <curl/curl.h>
4 : : #include <stdbool.h>
5 : :
6 : : #include "sd-daemon.h"
7 : :
8 : : #include "alloc-util.h"
9 : : #include "journal-upload.h"
10 : : #include "log.h"
11 : : #include "string-util.h"
12 : : #include "utf8.h"
13 : : #include "util.h"
14 : :
15 : : /**
16 : : * Write up to size bytes to buf. Return negative on error, and number of
17 : : * bytes written otherwise. The last case is a kind of an error too.
18 : : */
19 : 0 : static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
20 : : int r;
21 : 0 : size_t pos = 0;
22 : :
23 [ # # ]: 0 : assert(size <= SSIZE_MAX);
24 : :
25 : : for (;;) {
26 : :
27 [ # # # # : 0 : switch(u->entry_state) {
# # # # #
# ]
28 : 0 : case ENTRY_CURSOR: {
29 : 0 : u->current_cursor = mfree(u->current_cursor);
30 : :
31 : 0 : r = sd_journal_get_cursor(u->journal, &u->current_cursor);
32 [ # # ]: 0 : if (r < 0)
33 [ # # ]: 0 : return log_error_errno(r, "Failed to get cursor: %m");
34 : :
35 : 0 : r = snprintf(buf + pos, size - pos,
36 : : "__CURSOR=%s\n", u->current_cursor);
37 [ # # ]: 0 : assert(r >= 0);
38 [ # # ]: 0 : if ((size_t) r > size - pos)
39 : : /* not enough space */
40 : 0 : return pos;
41 : :
42 : 0 : u->entry_state++;
43 : :
44 [ # # ]: 0 : if (pos + r == size) {
45 : : /* exactly one character short, but we don't need it */
46 : 0 : buf[size - 1] = '\n';
47 : 0 : return size;
48 : : }
49 : :
50 : 0 : pos += r;
51 : : }
52 : : _fallthrough_;
53 : 0 : case ENTRY_REALTIME: {
54 : : usec_t realtime;
55 : :
56 : 0 : r = sd_journal_get_realtime_usec(u->journal, &realtime);
57 [ # # ]: 0 : if (r < 0)
58 [ # # ]: 0 : return log_error_errno(r, "Failed to get realtime timestamp: %m");
59 : :
60 : 0 : r = snprintf(buf + pos, size - pos,
61 : : "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime);
62 [ # # ]: 0 : assert(r >= 0);
63 [ # # ]: 0 : if ((size_t) r > size - pos)
64 : : /* not enough space */
65 : 0 : return pos;
66 : :
67 : 0 : u->entry_state++;
68 : :
69 [ # # ]: 0 : if (r + pos == size) {
70 : : /* exactly one character short, but we don't need it */
71 : 0 : buf[size - 1] = '\n';
72 : 0 : return size;
73 : : }
74 : :
75 : 0 : pos += r;
76 : : }
77 : : _fallthrough_;
78 : 0 : case ENTRY_MONOTONIC: {
79 : : usec_t monotonic;
80 : : sd_id128_t boot_id;
81 : :
82 : 0 : r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id);
83 [ # # ]: 0 : if (r < 0)
84 [ # # ]: 0 : return log_error_errno(r, "Failed to get monotonic timestamp: %m");
85 : :
86 : 0 : r = snprintf(buf + pos, size - pos,
87 : : "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic);
88 [ # # ]: 0 : assert(r >= 0);
89 [ # # ]: 0 : if ((size_t) r > size - pos)
90 : : /* not enough space */
91 : 0 : return pos;
92 : :
93 : 0 : u->entry_state++;
94 : :
95 [ # # ]: 0 : if (r + pos == size) {
96 : : /* exactly one character short, but we don't need it */
97 : 0 : buf[size - 1] = '\n';
98 : 0 : return size;
99 : : }
100 : :
101 : 0 : pos += r;
102 : : }
103 : : _fallthrough_;
104 : 0 : case ENTRY_BOOT_ID: {
105 : : sd_id128_t boot_id;
106 : : char sid[33];
107 : :
108 : 0 : r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id);
109 [ # # ]: 0 : if (r < 0)
110 [ # # ]: 0 : return log_error_errno(r, "Failed to get monotonic timestamp: %m");
111 : :
112 : 0 : r = snprintf(buf + pos, size - pos,
113 : : "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid));
114 [ # # ]: 0 : assert(r >= 0);
115 [ # # ]: 0 : if ((size_t) r > size - pos)
116 : : /* not enough space */
117 : 0 : return pos;
118 : :
119 : 0 : u->entry_state++;
120 : :
121 [ # # ]: 0 : if (r + pos == size) {
122 : : /* exactly one character short, but we don't need it */
123 : 0 : buf[size - 1] = '\n';
124 : 0 : return size;
125 : : }
126 : :
127 : 0 : pos += r;
128 : : }
129 : : _fallthrough_;
130 : : case ENTRY_NEW_FIELD: {
131 : 0 : u->field_pos = 0;
132 : :
133 : 0 : r = sd_journal_enumerate_data(u->journal,
134 : : &u->field_data,
135 : : &u->field_length);
136 [ # # ]: 0 : if (r < 0)
137 [ # # ]: 0 : return log_error_errno(r, "Failed to move to next field in entry: %m");
138 [ # # ]: 0 : else if (r == 0) {
139 : 0 : u->entry_state = ENTRY_OUTRO;
140 : 0 : continue;
141 : : }
142 : :
143 : : /* We already printed the boot id from the data in
144 : : * the header, hence let's suppress it here */
145 [ # # ]: 0 : if (memory_startswith(u->field_data, u->field_length, "_BOOT_ID="))
146 : 0 : continue;
147 : :
148 [ # # ]: 0 : if (!utf8_is_printable_newline(u->field_data, u->field_length, false)) {
149 : 0 : u->entry_state = ENTRY_BINARY_FIELD_START;
150 : 0 : continue;
151 : : }
152 : :
153 : 0 : u->entry_state++;
154 : : }
155 : : _fallthrough_;
156 : 0 : case ENTRY_TEXT_FIELD:
157 : : case ENTRY_BINARY_FIELD: {
158 : : bool done;
159 : : size_t tocopy;
160 : :
161 : 0 : done = size - pos > u->field_length - u->field_pos;
162 [ # # ]: 0 : if (done)
163 : 0 : tocopy = u->field_length - u->field_pos;
164 : : else
165 : 0 : tocopy = size - pos;
166 : :
167 : 0 : memcpy(buf + pos,
168 : 0 : (char*) u->field_data + u->field_pos,
169 : : tocopy);
170 : :
171 [ # # ]: 0 : if (done) {
172 : 0 : buf[pos + tocopy] = '\n';
173 : 0 : pos += tocopy + 1;
174 : 0 : u->entry_state = ENTRY_NEW_FIELD;
175 : 0 : continue;
176 : : } else {
177 : 0 : u->field_pos += tocopy;
178 : 0 : return size;
179 : : }
180 : : }
181 : :
182 : 0 : case ENTRY_BINARY_FIELD_START: {
183 : : const char *c;
184 : : size_t len;
185 : :
186 : 0 : c = memchr(u->field_data, '=', u->field_length);
187 [ # # # # ]: 0 : if (!c || c == u->field_data)
188 [ # # ]: 0 : return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
189 : : "Invalid field.");
190 : :
191 : 0 : len = c - (const char*)u->field_data;
192 : :
193 : : /* need space for label + '\n' */
194 [ # # ]: 0 : if (size - pos < len + 1)
195 : 0 : return pos;
196 : :
197 : 0 : memcpy(buf + pos, u->field_data, len);
198 : 0 : buf[pos + len] = '\n';
199 : 0 : pos += len + 1;
200 : :
201 : 0 : u->field_pos = len + 1;
202 : 0 : u->entry_state++;
203 : : }
204 : : _fallthrough_;
205 : 0 : case ENTRY_BINARY_FIELD_SIZE: {
206 : : uint64_t le64;
207 : :
208 : : /* need space for uint64_t */
209 [ # # ]: 0 : if (size - pos < 8)
210 : 0 : return pos;
211 : :
212 : 0 : le64 = htole64(u->field_length - u->field_pos);
213 : 0 : memcpy(buf + pos, &le64, 8);
214 : 0 : pos += 8;
215 : :
216 : 0 : u->entry_state++;
217 : 0 : continue;
218 : : }
219 : :
220 : 0 : case ENTRY_OUTRO:
221 : : /* need space for '\n' */
222 [ # # ]: 0 : if (size - pos < 1)
223 : 0 : return pos;
224 : :
225 : 0 : buf[pos++] = '\n';
226 : 0 : u->entry_state++;
227 : 0 : u->entries_sent++;
228 : :
229 : 0 : return pos;
230 : :
231 : 0 : default:
232 : 0 : assert_not_reached("WTF?");
233 : : }
234 : : }
235 : : assert_not_reached("WTF?");
236 : : }
237 : :
238 : 0 : static void check_update_watchdog(Uploader *u) {
239 : : usec_t after;
240 : : usec_t elapsed_time;
241 : :
242 [ # # ]: 0 : if (u->watchdog_usec <= 0)
243 : 0 : return;
244 : :
245 : 0 : after = now(CLOCK_MONOTONIC);
246 : 0 : elapsed_time = usec_sub_unsigned(after, u->watchdog_timestamp);
247 [ # # ]: 0 : if (elapsed_time > u->watchdog_usec / 2) {
248 [ # # ]: 0 : log_debug("Update watchdog timer");
249 : 0 : sd_notify(false, "WATCHDOG=1");
250 : 0 : u->watchdog_timestamp = after;
251 : : }
252 : : }
253 : :
254 : 0 : static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
255 : 0 : Uploader *u = userp;
256 : : int r;
257 : : sd_journal *j;
258 : 0 : size_t filled = 0;
259 : : ssize_t w;
260 : :
261 [ # # ]: 0 : assert(u);
262 [ # # ]: 0 : assert(nmemb <= SSIZE_MAX / size);
263 : :
264 : 0 : check_update_watchdog(u);
265 : :
266 : 0 : j = u->journal;
267 : :
268 [ # # # # ]: 0 : while (j && filled < size * nmemb) {
269 [ # # ]: 0 : if (u->entry_state == ENTRY_DONE) {
270 : 0 : r = sd_journal_next(j);
271 [ # # ]: 0 : if (r < 0) {
272 [ # # ]: 0 : log_error_errno(r, "Failed to move to next entry in journal: %m");
273 : 0 : return CURL_READFUNC_ABORT;
274 [ # # ]: 0 : } else if (r == 0) {
275 [ # # ]: 0 : if (u->input_event)
276 [ # # ]: 0 : log_debug("No more entries, waiting for journal.");
277 : : else {
278 [ # # ]: 0 : log_info("No more entries, closing journal.");
279 : 0 : close_journal_input(u);
280 : : }
281 : :
282 : 0 : u->uploading = false;
283 : :
284 : 0 : break;
285 : : }
286 : :
287 : 0 : u->entry_state = ENTRY_CURSOR;
288 : : }
289 : :
290 : 0 : w = write_entry((char*)buf + filled, size * nmemb - filled, u);
291 [ # # ]: 0 : if (w < 0)
292 : 0 : return CURL_READFUNC_ABORT;
293 : 0 : filled += w;
294 : :
295 [ # # ]: 0 : if (filled == 0) {
296 [ # # ]: 0 : log_error("Buffer space is too small to write entry.");
297 : 0 : return CURL_READFUNC_ABORT;
298 [ # # ]: 0 : } else if (u->entry_state != ENTRY_DONE)
299 : : /* This means that all available space was used up */
300 : 0 : break;
301 : :
302 [ # # ]: 0 : log_debug("Entry %zu (%s) has been uploaded.",
303 : : u->entries_sent, u->current_cursor);
304 : : }
305 : :
306 : 0 : return filled;
307 : : }
308 : :
309 : 16 : void close_journal_input(Uploader *u) {
310 [ - + ]: 16 : assert(u);
311 : :
312 [ - + ]: 16 : if (u->journal) {
313 [ # # ]: 0 : log_debug("Closing journal input.");
314 : :
315 : 0 : sd_journal_close(u->journal);
316 : 0 : u->journal = NULL;
317 : : }
318 : 16 : u->timeout = 0;
319 : 16 : }
320 : :
321 : 0 : static int process_journal_input(Uploader *u, int skip) {
322 : : int r;
323 : :
324 [ # # ]: 0 : if (u->uploading)
325 : 0 : return 0;
326 : :
327 : 0 : r = sd_journal_next_skip(u->journal, skip);
328 [ # # ]: 0 : if (r < 0)
329 [ # # ]: 0 : return log_error_errno(r, "Failed to skip to next entry: %m");
330 [ # # ]: 0 : else if (r < skip)
331 : 0 : return 0;
332 : :
333 : : /* have data */
334 : 0 : u->entry_state = ENTRY_CURSOR;
335 : 0 : return start_upload(u, journal_input_callback, u);
336 : : }
337 : :
338 : 0 : int check_journal_input(Uploader *u) {
339 [ # # ]: 0 : if (u->input_event) {
340 : : int r;
341 : :
342 : 0 : r = sd_journal_process(u->journal);
343 [ # # ]: 0 : if (r < 0) {
344 [ # # ]: 0 : log_error_errno(r, "Failed to process journal: %m");
345 : 0 : close_journal_input(u);
346 : 0 : return r;
347 : : }
348 : :
349 [ # # ]: 0 : if (r == SD_JOURNAL_NOP)
350 : 0 : return 0;
351 : : }
352 : :
353 : 0 : return process_journal_input(u, 1);
354 : : }
355 : :
356 : 0 : static int dispatch_journal_input(sd_event_source *event,
357 : : int fd,
358 : : uint32_t revents,
359 : : void *userp) {
360 : 0 : Uploader *u = userp;
361 : :
362 [ # # ]: 0 : assert(u);
363 : :
364 [ # # ]: 0 : if (u->uploading)
365 : 0 : return 0;
366 : :
367 [ # # ]: 0 : log_debug("Detected journal input, checking for new data.");
368 : 0 : return check_journal_input(u);
369 : : }
370 : :
371 : 0 : int open_journal_for_upload(Uploader *u,
372 : : sd_journal *j,
373 : : const char *cursor,
374 : : bool after_cursor,
375 : : bool follow) {
376 : : int fd, r, events;
377 : :
378 : 0 : u->journal = j;
379 : :
380 : 0 : sd_journal_set_data_threshold(j, 0);
381 : :
382 [ # # ]: 0 : if (follow) {
383 : 0 : fd = sd_journal_get_fd(j);
384 [ # # ]: 0 : if (fd < 0)
385 [ # # ]: 0 : return log_error_errno(fd, "sd_journal_get_fd failed: %m");
386 : :
387 : 0 : events = sd_journal_get_events(j);
388 : :
389 : 0 : r = sd_journal_reliable_fd(j);
390 [ # # ]: 0 : assert(r >= 0);
391 [ # # ]: 0 : if (r > 0)
392 : 0 : u->timeout = -1;
393 : : else
394 : 0 : u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT;
395 : :
396 : 0 : r = sd_event_add_io(u->events, &u->input_event,
397 : : fd, events, dispatch_journal_input, u);
398 [ # # ]: 0 : if (r < 0)
399 [ # # ]: 0 : return log_error_errno(r, "Failed to register input event: %m");
400 : :
401 [ # # # # ]: 0 : log_debug("Listening for journal events on fd:%d, timeout %d",
402 : : fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout);
403 : : } else
404 [ # # ]: 0 : log_debug("Not listening for journal events.");
405 : :
406 [ # # ]: 0 : if (cursor) {
407 : 0 : r = sd_journal_seek_cursor(j, cursor);
408 [ # # ]: 0 : if (r < 0)
409 [ # # ]: 0 : return log_error_errno(r, "Failed to seek to cursor %s: %m",
410 : : cursor);
411 : : }
412 : :
413 [ # # ]: 0 : return process_journal_input(u, 1 + !!after_cursor);
414 : : }
|