Branch data Line data Source code
1 : : /* SPDX-License-Identifier: LGPL-2.1+ */
2 : :
3 : : #include <errno.h>
4 : : #include <unistd.h>
5 : :
6 : : #include "alloc-util.h"
7 : : #include "errno-util.h"
8 : : #include "escape.h"
9 : : #include "fd-util.h"
10 : : #include "io-util.h"
11 : : #include "journal-file.h"
12 : : #include "journal-importer.h"
13 : : #include "journal-util.h"
14 : : #include "parse-util.h"
15 : : #include "string-util.h"
16 : : #include "unaligned.h"
17 : :
18 : : enum {
19 : : IMPORTER_STATE_LINE = 0, /* waiting to read, or reading line */
20 : : IMPORTER_STATE_DATA_START, /* reading binary data header */
21 : : IMPORTER_STATE_DATA, /* reading binary data */
22 : : IMPORTER_STATE_DATA_FINISH, /* expecting newline */
23 : : IMPORTER_STATE_EOF, /* done */
24 : : };
25 : :
26 : 8 : void journal_importer_cleanup(JournalImporter *imp) {
27 [ + - + - ]: 8 : if (imp->fd >= 0 && !imp->passive_fd) {
28 [ + - - + ]: 8 : log_debug("Closing %s (fd=%d)", imp->name ?: "importer", imp->fd);
29 : 8 : safe_close(imp->fd);
30 : : }
31 : :
32 : 8 : free(imp->name);
33 : 8 : free(imp->buf);
34 : 8 : iovw_free_contents(&imp->iovw, false);
35 : 8 : }
36 : :
37 : 8 : static char* realloc_buffer(JournalImporter *imp, size_t size) {
38 : 8 : char *b, *old = imp->buf;
39 : :
40 : 8 : b = GREEDY_REALLOC(imp->buf, imp->size, size);
41 [ - + ]: 8 : if (!b)
42 : 0 : return NULL;
43 : :
44 : 8 : iovw_rebase(&imp->iovw, old, imp->buf);
45 : :
46 : 8 : return b;
47 : : }
48 : :
49 : 52 : static int get_line(JournalImporter *imp, char **line, size_t *size) {
50 : : ssize_t n;
51 : 52 : char *c = NULL;
52 : :
53 [ - + ]: 52 : assert(imp);
54 [ - + ]: 52 : assert(imp->state == IMPORTER_STATE_LINE);
55 [ - + ]: 52 : assert(imp->offset <= imp->filled);
56 [ - + ]: 52 : assert(imp->filled <= imp->size);
57 [ + + - + ]: 52 : assert(!imp->buf || imp->size > 0);
58 [ - + ]: 52 : assert(imp->fd >= 0);
59 : :
60 : : for (;;) {
61 [ + + ]: 60 : if (imp->buf) {
62 : 52 : size_t start = MAX(imp->scanned, imp->offset);
63 : :
64 : 52 : c = memchr(imp->buf + start, '\n',
65 : 52 : imp->filled - start);
66 [ + + ]: 52 : if (c)
67 : 44 : break;
68 : : }
69 : :
70 : 16 : imp->scanned = imp->filled;
71 [ - + ]: 16 : if (imp->scanned >= DATA_SIZE_MAX)
72 [ # # ]: 0 : return log_error_errno(SYNTHETIC_ERRNO(ENOBUFS),
73 : : "Entry is bigger than %u bytes.",
74 : : DATA_SIZE_MAX);
75 : :
76 [ - + ]: 16 : if (imp->passive_fd)
77 : : /* we have to wait for some data to come to us */
78 : 0 : return -EAGAIN;
79 : :
80 : : /* We know that imp->filled is at most DATA_SIZE_MAX, so if
81 : : we reallocate it, we'll increase the size at least a bit. */
82 : : assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
83 [ + + - + ]: 24 : if (imp->size - imp->filled < LINE_CHUNK &&
84 : 8 : !realloc_buffer(imp, MIN(imp->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
85 : 0 : return log_oom();
86 : :
87 [ - + ]: 16 : assert(imp->buf);
88 [ - + # # ]: 16 : assert(imp->size - imp->filled >= LINE_CHUNK ||
89 : : imp->size == ENTRY_SIZE_MAX);
90 : :
91 : 32 : n = read(imp->fd,
92 : 16 : imp->buf + imp->filled,
93 : 16 : imp->size - imp->filled);
94 [ - + ]: 16 : if (n < 0) {
95 [ # # ]: 0 : if (errno != EAGAIN)
96 [ # # ]: 0 : log_error_errno(errno, "read(%d, ..., %zu): %m",
97 : : imp->fd,
98 : : imp->size - imp->filled);
99 : 0 : return -errno;
100 [ + + ]: 16 : } else if (n == 0)
101 : 8 : return 0;
102 : :
103 : 8 : imp->filled += n;
104 : : }
105 : :
106 : 44 : *line = imp->buf + imp->offset;
107 : 44 : *size = c + 1 - imp->buf - imp->offset;
108 : 44 : imp->offset += *size;
109 : :
110 : 44 : return 1;
111 : : }
112 : :
113 : 36 : static int fill_fixed_size(JournalImporter *imp, void **data, size_t size) {
114 : :
115 [ - + ]: 36 : assert(imp);
116 [ + - - + ]: 36 : assert(IN_SET(imp->state, IMPORTER_STATE_DATA_START, IMPORTER_STATE_DATA, IMPORTER_STATE_DATA_FINISH));
117 [ - + ]: 36 : assert(size <= DATA_SIZE_MAX);
118 [ - + ]: 36 : assert(imp->offset <= imp->filled);
119 [ - + ]: 36 : assert(imp->filled <= imp->size);
120 [ - + # # ]: 36 : assert(imp->buf || imp->size == 0);
121 [ + - - + ]: 36 : assert(!imp->buf || imp->size > 0);
122 [ - + ]: 36 : assert(imp->fd >= 0);
123 [ - + ]: 36 : assert(data);
124 : :
125 [ - + ]: 36 : while (imp->filled - imp->offset < size) {
126 : : int n;
127 : :
128 [ # # ]: 0 : if (imp->passive_fd)
129 : : /* we have to wait for some data to come to us */
130 : 0 : return -EAGAIN;
131 : :
132 [ # # ]: 0 : if (!realloc_buffer(imp, imp->offset + size))
133 : 0 : return log_oom();
134 : :
135 : 0 : n = read(imp->fd, imp->buf + imp->filled,
136 : 0 : imp->size - imp->filled);
137 [ # # ]: 0 : if (n < 0) {
138 [ # # ]: 0 : if (errno != EAGAIN)
139 [ # # ]: 0 : log_error_errno(errno, "read(%d, ..., %zu): %m", imp->fd,
140 : : imp->size - imp->filled);
141 : 0 : return -errno;
142 [ # # ]: 0 : } else if (n == 0)
143 : 0 : return 0;
144 : :
145 : 0 : imp->filled += n;
146 : : }
147 : :
148 : 36 : *data = imp->buf + imp->offset;
149 : 36 : imp->offset += size;
150 : :
151 : 36 : return 1;
152 : : }
153 : :
154 : 12 : static int get_data_size(JournalImporter *imp) {
155 : : int r;
156 : : void *data;
157 : :
158 [ - + ]: 12 : assert(imp);
159 [ - + ]: 12 : assert(imp->state == IMPORTER_STATE_DATA_START);
160 [ - + ]: 12 : assert(imp->data_size == 0);
161 : :
162 : 12 : r = fill_fixed_size(imp, &data, sizeof(uint64_t));
163 [ - + ]: 12 : if (r <= 0)
164 : 0 : return r;
165 : :
166 : 12 : imp->data_size = unaligned_read_le64(data);
167 [ - + ]: 12 : if (imp->data_size > DATA_SIZE_MAX)
168 [ # # ]: 0 : return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
169 : : "Stream declares field with size %zu > DATA_SIZE_MAX = %u",
170 : : imp->data_size, DATA_SIZE_MAX);
171 [ - + ]: 12 : if (imp->data_size == 0)
172 [ # # ]: 0 : log_warning("Binary field with zero length");
173 : :
174 : 12 : return 1;
175 : : }
176 : :
177 : 12 : static int get_data_data(JournalImporter *imp, void **data) {
178 : : int r;
179 : :
180 [ - + ]: 12 : assert(imp);
181 [ - + ]: 12 : assert(data);
182 [ - + ]: 12 : assert(imp->state == IMPORTER_STATE_DATA);
183 : :
184 : 12 : r = fill_fixed_size(imp, data, imp->data_size);
185 [ - + ]: 12 : if (r <= 0)
186 : 0 : return r;
187 : :
188 : 12 : return 1;
189 : : }
190 : :
191 : 12 : static int get_data_newline(JournalImporter *imp) {
192 : : int r;
193 : : char *data;
194 : :
195 [ - + ]: 12 : assert(imp);
196 [ - + ]: 12 : assert(imp->state == IMPORTER_STATE_DATA_FINISH);
197 : :
198 : 12 : r = fill_fixed_size(imp, (void**) &data, 1);
199 [ - + ]: 12 : if (r <= 0)
200 : 0 : return r;
201 : :
202 [ - + ]: 12 : assert(data);
203 [ - + ]: 12 : if (*data != '\n') {
204 : : char buf[4];
205 : : int l;
206 : :
207 : 0 : l = cescape_char(*data, buf);
208 [ # # ]: 0 : return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
209 : : "Expected newline, got '%.*s'", l, buf);
210 : : }
211 : :
212 : 12 : return 1;
213 : : }
214 : :
215 : 28 : static int process_special_field(JournalImporter *imp, char *line) {
216 : : const char *value;
217 : : char buf[CELLESCAPE_DEFAULT_LENGTH];
218 : : int r;
219 : :
220 [ - + ]: 28 : assert(line);
221 : :
222 : 28 : value = startswith(line, "__CURSOR=");
223 [ + + ]: 28 : if (value)
224 : : /* ignore __CURSOR */
225 : 4 : return 1;
226 : :
227 : 24 : value = startswith(line, "__REALTIME_TIMESTAMP=");
228 [ + + ]: 24 : if (value) {
229 : : uint64_t x;
230 : :
231 : 4 : r = safe_atou64(value, &x);
232 [ - + ]: 4 : if (r < 0)
233 [ # # ]: 0 : return log_warning_errno(r, "Failed to parse __REALTIME_TIMESTAMP '%s': %m",
234 : : cellescape(buf, sizeof buf, value));
235 [ - + ]: 4 : else if (!VALID_REALTIME(x)) {
236 [ # # ]: 0 : log_warning("__REALTIME_TIMESTAMP out of range, ignoring: %"PRIu64, x);
237 : 0 : return -ERANGE;
238 : : }
239 : :
240 : 4 : imp->ts.realtime = x;
241 : 4 : return 1;
242 : : }
243 : :
244 : 20 : value = startswith(line, "__MONOTONIC_TIMESTAMP=");
245 [ + + ]: 20 : if (value) {
246 : : uint64_t x;
247 : :
248 : 4 : r = safe_atou64(value, &x);
249 [ - + ]: 4 : if (r < 0)
250 [ # # ]: 0 : return log_warning_errno(r, "Failed to parse __MONOTONIC_TIMESTAMP '%s': %m",
251 : : cellescape(buf, sizeof buf, value));
252 [ - + ]: 4 : else if (!VALID_MONOTONIC(x)) {
253 [ # # ]: 0 : log_warning("__MONOTONIC_TIMESTAMP out of range, ignoring: %"PRIu64, x);
254 : 0 : return -ERANGE;
255 : : }
256 : :
257 : 4 : imp->ts.monotonic = x;
258 : 4 : return 1;
259 : : }
260 : :
261 : : /* Just a single underline, but it needs special treatment too. */
262 : 16 : value = startswith(line, "_BOOT_ID=");
263 [ + + ]: 16 : if (value) {
264 : 4 : r = sd_id128_from_string(value, &imp->boot_id);
265 [ - + ]: 4 : if (r < 0)
266 [ # # ]: 0 : return log_warning_errno(r, "Failed to parse _BOOT_ID '%s': %m",
267 : : cellescape(buf, sizeof buf, value));
268 : :
269 : : /* store the field in the usual fashion too */
270 : 4 : return 0;
271 : : }
272 : :
273 : 12 : value = startswith(line, "__");
274 [ - + ]: 12 : if (value) {
275 [ # # ]: 0 : log_notice("Unknown dunder line __%s, ignoring.", cellescape(buf, sizeof buf, value));
276 : 0 : return 1;
277 : : }
278 : :
279 : : /* no dunder */
280 : 12 : return 0;
281 : : }
282 : :
283 : 88 : int journal_importer_process_data(JournalImporter *imp) {
284 : : int r;
285 : :
286 [ + + + + : 88 : switch(imp->state) {
- ]
287 : 52 : case IMPORTER_STATE_LINE: {
288 : : char *line, *sep;
289 : 52 : size_t n = 0;
290 : :
291 [ - + ]: 52 : assert(imp->data_size == 0);
292 : :
293 : 52 : r = get_line(imp, &line, &n);
294 [ - + ]: 52 : if (r < 0)
295 : 0 : return r;
296 [ + + ]: 52 : if (r == 0) {
297 : 8 : imp->state = IMPORTER_STATE_EOF;
298 : 8 : return 0;
299 : : }
300 [ - + ]: 44 : assert(n > 0);
301 [ - + ]: 44 : assert(line[n-1] == '\n');
302 : :
303 [ + + ]: 44 : if (n == 1) {
304 [ + - ]: 4 : log_trace("Received empty line, event is ready");
305 : 4 : return 1;
306 : : }
307 : :
308 : : /* MESSAGE=xxx\n
309 : : or
310 : : COREDUMP\n
311 : : LLLLLLLL0011223344...\n
312 : : */
313 : 40 : sep = memchr(line, '=', n);
314 [ + + ]: 40 : if (sep) {
315 : : /* chomp newline */
316 : 28 : n--;
317 : :
318 [ - + ]: 28 : if (!journal_field_valid(line, sep - line, true)) {
319 : : char buf[64], *t;
320 : :
321 : 0 : t = strndupa(line, sep - line);
322 [ # # ]: 0 : log_debug("Ignoring invalid field: \"%s\"",
323 : : cellescape(buf, sizeof buf, t));
324 : :
325 : 0 : return 0;
326 : : }
327 : :
328 : 28 : line[n] = '\0';
329 : 28 : r = process_special_field(imp, line);
330 [ + + ]: 28 : if (r != 0)
331 : 12 : return r < 0 ? r : 0;
332 : :
333 : 16 : r = iovw_put(&imp->iovw, line, n);
334 [ - + ]: 16 : if (r < 0)
335 : 0 : return r;
336 : : } else {
337 : : /* replace \n with = */
338 : 12 : line[n-1] = '=';
339 : :
340 : 12 : imp->field_len = n;
341 : 12 : imp->state = IMPORTER_STATE_DATA_START;
342 : :
343 : : /* we cannot put the field in iovec until we have all data */
344 : : }
345 : :
346 [ + - + + ]: 28 : log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
347 : :
348 : 28 : return 0; /* continue */
349 : : }
350 : :
351 : 12 : case IMPORTER_STATE_DATA_START:
352 [ - + ]: 12 : assert(imp->data_size == 0);
353 : :
354 : 12 : r = get_data_size(imp);
355 : : // log_debug("get_data_size() -> %d", r);
356 [ - + ]: 12 : if (r < 0)
357 : 0 : return r;
358 [ - + ]: 12 : if (r == 0) {
359 : 0 : imp->state = IMPORTER_STATE_EOF;
360 : 0 : return 0;
361 : : }
362 : :
363 : 24 : imp->state = imp->data_size > 0 ?
364 [ + - ]: 12 : IMPORTER_STATE_DATA : IMPORTER_STATE_DATA_FINISH;
365 : :
366 : 12 : return 0; /* continue */
367 : :
368 : 12 : case IMPORTER_STATE_DATA: {
369 : : void *data;
370 : : char *field;
371 : :
372 [ - + ]: 12 : assert(imp->data_size > 0);
373 : :
374 : 12 : r = get_data_data(imp, &data);
375 : : // log_debug("get_data_data() -> %d", r);
376 [ - + ]: 12 : if (r < 0)
377 : 0 : return r;
378 [ - + ]: 12 : if (r == 0) {
379 : 0 : imp->state = IMPORTER_STATE_EOF;
380 : 0 : return 0;
381 : : }
382 : :
383 [ - + ]: 12 : assert(data);
384 : :
385 : 12 : field = (char*) data - sizeof(uint64_t) - imp->field_len;
386 : 12 : memmove(field + sizeof(uint64_t), field, imp->field_len);
387 : :
388 : 12 : r = iovw_put(&imp->iovw, field + sizeof(uint64_t), imp->field_len + imp->data_size);
389 [ - + ]: 12 : if (r < 0)
390 : 0 : return r;
391 : :
392 : 12 : imp->state = IMPORTER_STATE_DATA_FINISH;
393 : :
394 : 12 : return 0; /* continue */
395 : : }
396 : :
397 : 12 : case IMPORTER_STATE_DATA_FINISH:
398 : 12 : r = get_data_newline(imp);
399 : : // log_debug("get_data_newline() -> %d", r);
400 [ - + ]: 12 : if (r < 0)
401 : 0 : return r;
402 [ - + ]: 12 : if (r == 0) {
403 : 0 : imp->state = IMPORTER_STATE_EOF;
404 : 0 : return 0;
405 : : }
406 : :
407 : 12 : imp->data_size = 0;
408 : 12 : imp->state = IMPORTER_STATE_LINE;
409 : :
410 : 12 : return 0; /* continue */
411 : 0 : default:
412 : 0 : assert_not_reached("wtf?");
413 : : }
414 : : }
415 : :
416 : 0 : int journal_importer_push_data(JournalImporter *imp, const char *data, size_t size) {
417 [ # # ]: 0 : assert(imp);
418 [ # # ]: 0 : assert(imp->state != IMPORTER_STATE_EOF);
419 : :
420 [ # # ]: 0 : if (!realloc_buffer(imp, imp->filled + size))
421 [ # # ]: 0 : return log_error_errno(SYNTHETIC_ERRNO(ENOMEM),
422 : : "Failed to store received data of size %zu "
423 : : "(in addition to existing %zu bytes with %zu filled): %s",
424 : : size, imp->size, imp->filled,
425 : : strerror_safe(ENOMEM));
426 : :
427 : 0 : memcpy(imp->buf + imp->filled, data, size);
428 : 0 : imp->filled += size;
429 : :
430 : 0 : return 0;
431 : : }
432 : :
433 : 0 : void journal_importer_drop_iovw(JournalImporter *imp) {
434 : : size_t remain, target;
435 : :
436 : : /* This function drops processed data that along with the iovw that points at it */
437 : :
438 : 0 : iovw_free_contents(&imp->iovw, false);
439 : :
440 : : /* possibly reset buffer position */
441 : 0 : remain = imp->filled - imp->offset;
442 : :
443 [ # # ]: 0 : if (remain == 0) /* no brainer */
444 : 0 : imp->offset = imp->scanned = imp->filled = 0;
445 [ # # ]: 0 : else if (imp->offset > imp->size - imp->filled &&
446 [ # # ]: 0 : imp->offset > remain) {
447 : 0 : memcpy(imp->buf, imp->buf + imp->offset, remain);
448 : 0 : imp->offset = imp->scanned = 0;
449 : 0 : imp->filled = remain;
450 : : }
451 : :
452 : 0 : target = imp->size;
453 [ # # # # ]: 0 : while (target > 16 * LINE_CHUNK && imp->filled < target / 2)
454 : 0 : target /= 2;
455 [ # # ]: 0 : if (target < imp->size) {
456 : : char *tmp;
457 : :
458 : 0 : tmp = realloc(imp->buf, target);
459 [ # # ]: 0 : if (!tmp)
460 [ # # ]: 0 : log_warning("Failed to reallocate buffer to (smaller) size %zu",
461 : : target);
462 : : else {
463 [ # # ]: 0 : log_debug("Reallocated buffer from %zu to %zu bytes",
464 : : imp->size, target);
465 : 0 : imp->buf = tmp;
466 : 0 : imp->size = target;
467 : : }
468 : : }
469 : 0 : }
470 : :
471 : 92 : bool journal_importer_eof(const JournalImporter *imp) {
472 : 92 : return imp->state == IMPORTER_STATE_EOF;
473 : : }
|