Line data Source code
1 : /* SPDX-License-Identifier: LGPL-2.1+ */
2 :
3 : #include <curl/curl.h>
4 : #include <stdbool.h>
5 :
6 : #include "sd-daemon.h"
7 :
8 : #include "alloc-util.h"
9 : #include "journal-upload.h"
10 : #include "log.h"
11 : #include "string-util.h"
12 : #include "utf8.h"
13 : #include "util.h"
14 :
15 : /**
16 : * Write up to size bytes to buf. Return negative on error, and number of
17 : * bytes written otherwise. The last case is a kind of an error too.
18 : */
19 0 : static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
20 : int r;
21 0 : size_t pos = 0;
22 :
23 0 : assert(size <= SSIZE_MAX);
24 :
25 : for (;;) {
26 :
27 0 : switch(u->entry_state) {
28 0 : case ENTRY_CURSOR: {
29 0 : u->current_cursor = mfree(u->current_cursor);
30 :
31 0 : r = sd_journal_get_cursor(u->journal, &u->current_cursor);
32 0 : if (r < 0)
33 0 : return log_error_errno(r, "Failed to get cursor: %m");
34 :
35 0 : r = snprintf(buf + pos, size - pos,
36 : "__CURSOR=%s\n", u->current_cursor);
37 0 : assert(r >= 0);
38 0 : if ((size_t) r > size - pos)
39 : /* not enough space */
40 0 : return pos;
41 :
42 0 : u->entry_state++;
43 :
44 0 : if (pos + r == size) {
45 : /* exactly one character short, but we don't need it */
46 0 : buf[size - 1] = '\n';
47 0 : return size;
48 : }
49 :
50 0 : pos += r;
51 : }
52 : _fallthrough_;
53 0 : case ENTRY_REALTIME: {
54 : usec_t realtime;
55 :
56 0 : r = sd_journal_get_realtime_usec(u->journal, &realtime);
57 0 : if (r < 0)
58 0 : return log_error_errno(r, "Failed to get realtime timestamp: %m");
59 :
60 0 : r = snprintf(buf + pos, size - pos,
61 : "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime);
62 0 : assert(r >= 0);
63 0 : if ((size_t) r > size - pos)
64 : /* not enough space */
65 0 : return pos;
66 :
67 0 : u->entry_state++;
68 :
69 0 : if (r + pos == size) {
70 : /* exactly one character short, but we don't need it */
71 0 : buf[size - 1] = '\n';
72 0 : return size;
73 : }
74 :
75 0 : pos += r;
76 : }
77 : _fallthrough_;
78 0 : case ENTRY_MONOTONIC: {
79 : usec_t monotonic;
80 : sd_id128_t boot_id;
81 :
82 0 : r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id);
83 0 : if (r < 0)
84 0 : return log_error_errno(r, "Failed to get monotonic timestamp: %m");
85 :
86 0 : r = snprintf(buf + pos, size - pos,
87 : "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic);
88 0 : assert(r >= 0);
89 0 : if ((size_t) r > size - pos)
90 : /* not enough space */
91 0 : return pos;
92 :
93 0 : u->entry_state++;
94 :
95 0 : if (r + pos == size) {
96 : /* exactly one character short, but we don't need it */
97 0 : buf[size - 1] = '\n';
98 0 : return size;
99 : }
100 :
101 0 : pos += r;
102 : }
103 : _fallthrough_;
104 0 : case ENTRY_BOOT_ID: {
105 : sd_id128_t boot_id;
106 : char sid[33];
107 :
108 0 : r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id);
109 0 : if (r < 0)
110 0 : return log_error_errno(r, "Failed to get monotonic timestamp: %m");
111 :
112 0 : r = snprintf(buf + pos, size - pos,
113 : "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid));
114 0 : assert(r >= 0);
115 0 : if ((size_t) r > size - pos)
116 : /* not enough space */
117 0 : return pos;
118 :
119 0 : u->entry_state++;
120 :
121 0 : if (r + pos == size) {
122 : /* exactly one character short, but we don't need it */
123 0 : buf[size - 1] = '\n';
124 0 : return size;
125 : }
126 :
127 0 : pos += r;
128 : }
129 : _fallthrough_;
130 : case ENTRY_NEW_FIELD: {
131 0 : u->field_pos = 0;
132 :
133 0 : r = sd_journal_enumerate_data(u->journal,
134 : &u->field_data,
135 : &u->field_length);
136 0 : if (r < 0)
137 0 : return log_error_errno(r, "Failed to move to next field in entry: %m");
138 0 : else if (r == 0) {
139 0 : u->entry_state = ENTRY_OUTRO;
140 0 : continue;
141 : }
142 :
143 : /* We already printed the boot id from the data in
144 : * the header, hence let's suppress it here */
145 0 : if (memory_startswith(u->field_data, u->field_length, "_BOOT_ID="))
146 0 : continue;
147 :
148 0 : if (!utf8_is_printable_newline(u->field_data, u->field_length, false)) {
149 0 : u->entry_state = ENTRY_BINARY_FIELD_START;
150 0 : continue;
151 : }
152 :
153 0 : u->entry_state++;
154 : }
155 : _fallthrough_;
156 0 : case ENTRY_TEXT_FIELD:
157 : case ENTRY_BINARY_FIELD: {
158 : bool done;
159 : size_t tocopy;
160 :
161 0 : done = size - pos > u->field_length - u->field_pos;
162 0 : if (done)
163 0 : tocopy = u->field_length - u->field_pos;
164 : else
165 0 : tocopy = size - pos;
166 :
167 0 : memcpy(buf + pos,
168 0 : (char*) u->field_data + u->field_pos,
169 : tocopy);
170 :
171 0 : if (done) {
172 0 : buf[pos + tocopy] = '\n';
173 0 : pos += tocopy + 1;
174 0 : u->entry_state = ENTRY_NEW_FIELD;
175 0 : continue;
176 : } else {
177 0 : u->field_pos += tocopy;
178 0 : return size;
179 : }
180 : }
181 :
182 0 : case ENTRY_BINARY_FIELD_START: {
183 : const char *c;
184 : size_t len;
185 :
186 0 : c = memchr(u->field_data, '=', u->field_length);
187 0 : if (!c || c == u->field_data)
188 0 : return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
189 : "Invalid field.");
190 :
191 0 : len = c - (const char*)u->field_data;
192 :
193 : /* need space for label + '\n' */
194 0 : if (size - pos < len + 1)
195 0 : return pos;
196 :
197 0 : memcpy(buf + pos, u->field_data, len);
198 0 : buf[pos + len] = '\n';
199 0 : pos += len + 1;
200 :
201 0 : u->field_pos = len + 1;
202 0 : u->entry_state++;
203 : }
204 : _fallthrough_;
205 0 : case ENTRY_BINARY_FIELD_SIZE: {
206 : uint64_t le64;
207 :
208 : /* need space for uint64_t */
209 0 : if (size - pos < 8)
210 0 : return pos;
211 :
212 0 : le64 = htole64(u->field_length - u->field_pos);
213 0 : memcpy(buf + pos, &le64, 8);
214 0 : pos += 8;
215 :
216 0 : u->entry_state++;
217 0 : continue;
218 : }
219 :
220 0 : case ENTRY_OUTRO:
221 : /* need space for '\n' */
222 0 : if (size - pos < 1)
223 0 : return pos;
224 :
225 0 : buf[pos++] = '\n';
226 0 : u->entry_state++;
227 0 : u->entries_sent++;
228 :
229 0 : return pos;
230 :
231 0 : default:
232 0 : assert_not_reached("WTF?");
233 : }
234 : }
235 : assert_not_reached("WTF?");
236 : }
237 :
238 0 : static void check_update_watchdog(Uploader *u) {
239 : usec_t after;
240 : usec_t elapsed_time;
241 :
242 0 : if (u->watchdog_usec <= 0)
243 0 : return;
244 :
245 0 : after = now(CLOCK_MONOTONIC);
246 0 : elapsed_time = usec_sub_unsigned(after, u->watchdog_timestamp);
247 0 : if (elapsed_time > u->watchdog_usec / 2) {
248 0 : log_debug("Update watchdog timer");
249 0 : sd_notify(false, "WATCHDOG=1");
250 0 : u->watchdog_timestamp = after;
251 : }
252 : }
253 :
254 0 : static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
255 0 : Uploader *u = userp;
256 : int r;
257 : sd_journal *j;
258 0 : size_t filled = 0;
259 : ssize_t w;
260 :
261 0 : assert(u);
262 0 : assert(nmemb <= SSIZE_MAX / size);
263 :
264 0 : check_update_watchdog(u);
265 :
266 0 : j = u->journal;
267 :
268 0 : while (j && filled < size * nmemb) {
269 0 : if (u->entry_state == ENTRY_DONE) {
270 0 : r = sd_journal_next(j);
271 0 : if (r < 0) {
272 0 : log_error_errno(r, "Failed to move to next entry in journal: %m");
273 0 : return CURL_READFUNC_ABORT;
274 0 : } else if (r == 0) {
275 0 : if (u->input_event)
276 0 : log_debug("No more entries, waiting for journal.");
277 : else {
278 0 : log_info("No more entries, closing journal.");
279 0 : close_journal_input(u);
280 : }
281 :
282 0 : u->uploading = false;
283 :
284 0 : break;
285 : }
286 :
287 0 : u->entry_state = ENTRY_CURSOR;
288 : }
289 :
290 0 : w = write_entry((char*)buf + filled, size * nmemb - filled, u);
291 0 : if (w < 0)
292 0 : return CURL_READFUNC_ABORT;
293 0 : filled += w;
294 :
295 0 : if (filled == 0) {
296 0 : log_error("Buffer space is too small to write entry.");
297 0 : return CURL_READFUNC_ABORT;
298 0 : } else if (u->entry_state != ENTRY_DONE)
299 : /* This means that all available space was used up */
300 0 : break;
301 :
302 0 : log_debug("Entry %zu (%s) has been uploaded.",
303 : u->entries_sent, u->current_cursor);
304 : }
305 :
306 0 : return filled;
307 : }
308 :
309 4 : void close_journal_input(Uploader *u) {
310 4 : assert(u);
311 :
312 4 : if (u->journal) {
313 0 : log_debug("Closing journal input.");
314 :
315 0 : sd_journal_close(u->journal);
316 0 : u->journal = NULL;
317 : }
318 4 : u->timeout = 0;
319 4 : }
320 :
321 0 : static int process_journal_input(Uploader *u, int skip) {
322 : int r;
323 :
324 0 : if (u->uploading)
325 0 : return 0;
326 :
327 0 : r = sd_journal_next_skip(u->journal, skip);
328 0 : if (r < 0)
329 0 : return log_error_errno(r, "Failed to skip to next entry: %m");
330 0 : else if (r < skip)
331 0 : return 0;
332 :
333 : /* have data */
334 0 : u->entry_state = ENTRY_CURSOR;
335 0 : return start_upload(u, journal_input_callback, u);
336 : }
337 :
338 0 : int check_journal_input(Uploader *u) {
339 0 : if (u->input_event) {
340 : int r;
341 :
342 0 : r = sd_journal_process(u->journal);
343 0 : if (r < 0) {
344 0 : log_error_errno(r, "Failed to process journal: %m");
345 0 : close_journal_input(u);
346 0 : return r;
347 : }
348 :
349 0 : if (r == SD_JOURNAL_NOP)
350 0 : return 0;
351 : }
352 :
353 0 : return process_journal_input(u, 1);
354 : }
355 :
356 0 : static int dispatch_journal_input(sd_event_source *event,
357 : int fd,
358 : uint32_t revents,
359 : void *userp) {
360 0 : Uploader *u = userp;
361 :
362 0 : assert(u);
363 :
364 0 : if (u->uploading)
365 0 : return 0;
366 :
367 0 : log_debug("Detected journal input, checking for new data.");
368 0 : return check_journal_input(u);
369 : }
370 :
371 0 : int open_journal_for_upload(Uploader *u,
372 : sd_journal *j,
373 : const char *cursor,
374 : bool after_cursor,
375 : bool follow) {
376 : int fd, r, events;
377 :
378 0 : u->journal = j;
379 :
380 0 : sd_journal_set_data_threshold(j, 0);
381 :
382 0 : if (follow) {
383 0 : fd = sd_journal_get_fd(j);
384 0 : if (fd < 0)
385 0 : return log_error_errno(fd, "sd_journal_get_fd failed: %m");
386 :
387 0 : events = sd_journal_get_events(j);
388 :
389 0 : r = sd_journal_reliable_fd(j);
390 0 : assert(r >= 0);
391 0 : if (r > 0)
392 0 : u->timeout = -1;
393 : else
394 0 : u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT;
395 :
396 0 : r = sd_event_add_io(u->events, &u->input_event,
397 : fd, events, dispatch_journal_input, u);
398 0 : if (r < 0)
399 0 : return log_error_errno(r, "Failed to register input event: %m");
400 :
401 0 : log_debug("Listening for journal events on fd:%d, timeout %d",
402 : fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout);
403 : } else
404 0 : log_debug("Not listening for journal events.");
405 :
406 0 : if (cursor) {
407 0 : r = sd_journal_seek_cursor(j, cursor);
408 0 : if (r < 0)
409 0 : return log_error_errno(r, "Failed to seek to cursor %s: %m",
410 : cursor);
411 : }
412 :
413 0 : return process_journal_input(u, 1 + !!after_cursor);
414 : }
|