Line data Source code
1 : /* SPDX-License-Identifier: LGPL-2.1+ */
2 :
3 : #include <errno.h>
4 : #include <unistd.h>
5 :
6 : #include "alloc-util.h"
7 : #include "errno-util.h"
8 : #include "escape.h"
9 : #include "fd-util.h"
10 : #include "io-util.h"
11 : #include "journal-file.h"
12 : #include "journal-importer.h"
13 : #include "journal-util.h"
14 : #include "parse-util.h"
15 : #include "string-util.h"
16 : #include "unaligned.h"
17 :
18 : enum {
19 : IMPORTER_STATE_LINE = 0, /* waiting to read, or reading line */
20 : IMPORTER_STATE_DATA_START, /* reading binary data header */
21 : IMPORTER_STATE_DATA, /* reading binary data */
22 : IMPORTER_STATE_DATA_FINISH, /* expecting newline */
23 : IMPORTER_STATE_EOF, /* done */
24 : };
25 :
26 2 : void journal_importer_cleanup(JournalImporter *imp) {
27 2 : if (imp->fd >= 0 && !imp->passive_fd) {
28 2 : log_debug("Closing %s (fd=%d)", imp->name ?: "importer", imp->fd);
29 2 : safe_close(imp->fd);
30 : }
31 :
32 2 : free(imp->name);
33 2 : free(imp->buf);
34 2 : iovw_free_contents(&imp->iovw, false);
35 2 : }
36 :
37 2 : static char* realloc_buffer(JournalImporter *imp, size_t size) {
38 2 : char *b, *old = imp->buf;
39 :
40 2 : b = GREEDY_REALLOC(imp->buf, imp->size, size);
41 2 : if (!b)
42 0 : return NULL;
43 :
44 2 : iovw_rebase(&imp->iovw, old, imp->buf);
45 :
46 2 : return b;
47 : }
48 :
49 13 : static int get_line(JournalImporter *imp, char **line, size_t *size) {
50 : ssize_t n;
51 13 : char *c = NULL;
52 :
53 13 : assert(imp);
54 13 : assert(imp->state == IMPORTER_STATE_LINE);
55 13 : assert(imp->offset <= imp->filled);
56 13 : assert(imp->filled <= imp->size);
57 13 : assert(!imp->buf || imp->size > 0);
58 13 : assert(imp->fd >= 0);
59 :
60 : for (;;) {
61 15 : if (imp->buf) {
62 13 : size_t start = MAX(imp->scanned, imp->offset);
63 :
64 13 : c = memchr(imp->buf + start, '\n',
65 13 : imp->filled - start);
66 13 : if (c)
67 11 : break;
68 : }
69 :
70 4 : imp->scanned = imp->filled;
71 4 : if (imp->scanned >= DATA_SIZE_MAX)
72 0 : return log_error_errno(SYNTHETIC_ERRNO(ENOBUFS),
73 : "Entry is bigger than %u bytes.",
74 : DATA_SIZE_MAX);
75 :
76 4 : if (imp->passive_fd)
77 : /* we have to wait for some data to come to us */
78 0 : return -EAGAIN;
79 :
80 : /* We know that imp->filled is at most DATA_SIZE_MAX, so if
81 : we reallocate it, we'll increase the size at least a bit. */
82 : assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
83 6 : if (imp->size - imp->filled < LINE_CHUNK &&
84 2 : !realloc_buffer(imp, MIN(imp->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
85 0 : return log_oom();
86 :
87 4 : assert(imp->buf);
88 4 : assert(imp->size - imp->filled >= LINE_CHUNK ||
89 : imp->size == ENTRY_SIZE_MAX);
90 :
91 8 : n = read(imp->fd,
92 4 : imp->buf + imp->filled,
93 4 : imp->size - imp->filled);
94 4 : if (n < 0) {
95 0 : if (errno != EAGAIN)
96 0 : log_error_errno(errno, "read(%d, ..., %zu): %m",
97 : imp->fd,
98 : imp->size - imp->filled);
99 0 : return -errno;
100 4 : } else if (n == 0)
101 2 : return 0;
102 :
103 2 : imp->filled += n;
104 : }
105 :
106 11 : *line = imp->buf + imp->offset;
107 11 : *size = c + 1 - imp->buf - imp->offset;
108 11 : imp->offset += *size;
109 :
110 11 : return 1;
111 : }
112 :
113 9 : static int fill_fixed_size(JournalImporter *imp, void **data, size_t size) {
114 :
115 9 : assert(imp);
116 9 : assert(IN_SET(imp->state, IMPORTER_STATE_DATA_START, IMPORTER_STATE_DATA, IMPORTER_STATE_DATA_FINISH));
117 9 : assert(size <= DATA_SIZE_MAX);
118 9 : assert(imp->offset <= imp->filled);
119 9 : assert(imp->filled <= imp->size);
120 9 : assert(imp->buf || imp->size == 0);
121 9 : assert(!imp->buf || imp->size > 0);
122 9 : assert(imp->fd >= 0);
123 9 : assert(data);
124 :
125 9 : while (imp->filled - imp->offset < size) {
126 : int n;
127 :
128 0 : if (imp->passive_fd)
129 : /* we have to wait for some data to come to us */
130 0 : return -EAGAIN;
131 :
132 0 : if (!realloc_buffer(imp, imp->offset + size))
133 0 : return log_oom();
134 :
135 0 : n = read(imp->fd, imp->buf + imp->filled,
136 0 : imp->size - imp->filled);
137 0 : if (n < 0) {
138 0 : if (errno != EAGAIN)
139 0 : log_error_errno(errno, "read(%d, ..., %zu): %m", imp->fd,
140 : imp->size - imp->filled);
141 0 : return -errno;
142 0 : } else if (n == 0)
143 0 : return 0;
144 :
145 0 : imp->filled += n;
146 : }
147 :
148 9 : *data = imp->buf + imp->offset;
149 9 : imp->offset += size;
150 :
151 9 : return 1;
152 : }
153 :
154 3 : static int get_data_size(JournalImporter *imp) {
155 : int r;
156 : void *data;
157 :
158 3 : assert(imp);
159 3 : assert(imp->state == IMPORTER_STATE_DATA_START);
160 3 : assert(imp->data_size == 0);
161 :
162 3 : r = fill_fixed_size(imp, &data, sizeof(uint64_t));
163 3 : if (r <= 0)
164 0 : return r;
165 :
166 3 : imp->data_size = unaligned_read_le64(data);
167 3 : if (imp->data_size > DATA_SIZE_MAX)
168 0 : return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
169 : "Stream declares field with size %zu > DATA_SIZE_MAX = %u",
170 : imp->data_size, DATA_SIZE_MAX);
171 3 : if (imp->data_size == 0)
172 0 : log_warning("Binary field with zero length");
173 :
174 3 : return 1;
175 : }
176 :
177 3 : static int get_data_data(JournalImporter *imp, void **data) {
178 : int r;
179 :
180 3 : assert(imp);
181 3 : assert(data);
182 3 : assert(imp->state == IMPORTER_STATE_DATA);
183 :
184 3 : r = fill_fixed_size(imp, data, imp->data_size);
185 3 : if (r <= 0)
186 0 : return r;
187 :
188 3 : return 1;
189 : }
190 :
191 3 : static int get_data_newline(JournalImporter *imp) {
192 : int r;
193 : char *data;
194 :
195 3 : assert(imp);
196 3 : assert(imp->state == IMPORTER_STATE_DATA_FINISH);
197 :
198 3 : r = fill_fixed_size(imp, (void**) &data, 1);
199 3 : if (r <= 0)
200 0 : return r;
201 :
202 3 : assert(data);
203 3 : if (*data != '\n') {
204 : char buf[4];
205 : int l;
206 :
207 0 : l = cescape_char(*data, buf);
208 0 : return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
209 : "Expected newline, got '%.*s'", l, buf);
210 : }
211 :
212 3 : return 1;
213 : }
214 :
215 7 : static int process_special_field(JournalImporter *imp, char *line) {
216 : const char *value;
217 : char buf[CELLESCAPE_DEFAULT_LENGTH];
218 : int r;
219 :
220 7 : assert(line);
221 :
222 7 : value = startswith(line, "__CURSOR=");
223 7 : if (value)
224 : /* ignore __CURSOR */
225 1 : return 1;
226 :
227 6 : value = startswith(line, "__REALTIME_TIMESTAMP=");
228 6 : if (value) {
229 : uint64_t x;
230 :
231 1 : r = safe_atou64(value, &x);
232 1 : if (r < 0)
233 0 : return log_warning_errno(r, "Failed to parse __REALTIME_TIMESTAMP '%s': %m",
234 : cellescape(buf, sizeof buf, value));
235 1 : else if (!VALID_REALTIME(x)) {
236 0 : log_warning("__REALTIME_TIMESTAMP out of range, ignoring: %"PRIu64, x);
237 0 : return -ERANGE;
238 : }
239 :
240 1 : imp->ts.realtime = x;
241 1 : return 1;
242 : }
243 :
244 5 : value = startswith(line, "__MONOTONIC_TIMESTAMP=");
245 5 : if (value) {
246 : uint64_t x;
247 :
248 1 : r = safe_atou64(value, &x);
249 1 : if (r < 0)
250 0 : return log_warning_errno(r, "Failed to parse __MONOTONIC_TIMESTAMP '%s': %m",
251 : cellescape(buf, sizeof buf, value));
252 1 : else if (!VALID_MONOTONIC(x)) {
253 0 : log_warning("__MONOTONIC_TIMESTAMP out of range, ignoring: %"PRIu64, x);
254 0 : return -ERANGE;
255 : }
256 :
257 1 : imp->ts.monotonic = x;
258 1 : return 1;
259 : }
260 :
261 : /* Just a single underline, but it needs special treatment too. */
262 4 : value = startswith(line, "_BOOT_ID=");
263 4 : if (value) {
264 1 : r = sd_id128_from_string(value, &imp->boot_id);
265 1 : if (r < 0)
266 0 : return log_warning_errno(r, "Failed to parse _BOOT_ID '%s': %m",
267 : cellescape(buf, sizeof buf, value));
268 :
269 : /* store the field in the usual fashion too */
270 1 : return 0;
271 : }
272 :
273 3 : value = startswith(line, "__");
274 3 : if (value) {
275 0 : log_notice("Unknown dunder line __%s, ignoring.", cellescape(buf, sizeof buf, value));
276 0 : return 1;
277 : }
278 :
279 : /* no dunder */
280 3 : return 0;
281 : }
282 :
283 22 : int journal_importer_process_data(JournalImporter *imp) {
284 : int r;
285 :
286 22 : switch(imp->state) {
287 13 : case IMPORTER_STATE_LINE: {
288 : char *line, *sep;
289 13 : size_t n = 0;
290 :
291 13 : assert(imp->data_size == 0);
292 :
293 13 : r = get_line(imp, &line, &n);
294 13 : if (r < 0)
295 0 : return r;
296 13 : if (r == 0) {
297 2 : imp->state = IMPORTER_STATE_EOF;
298 2 : return 0;
299 : }
300 11 : assert(n > 0);
301 11 : assert(line[n-1] == '\n');
302 :
303 11 : if (n == 1) {
304 1 : log_trace("Received empty line, event is ready");
305 1 : return 1;
306 : }
307 :
308 : /* MESSAGE=xxx\n
309 : or
310 : COREDUMP\n
311 : LLLLLLLL0011223344...\n
312 : */
313 10 : sep = memchr(line, '=', n);
314 10 : if (sep) {
315 : /* chomp newline */
316 7 : n--;
317 :
318 7 : if (!journal_field_valid(line, sep - line, true)) {
319 : char buf[64], *t;
320 :
321 0 : t = strndupa(line, sep - line);
322 0 : log_debug("Ignoring invalid field: \"%s\"",
323 : cellescape(buf, sizeof buf, t));
324 :
325 0 : return 0;
326 : }
327 :
328 7 : line[n] = '\0';
329 7 : r = process_special_field(imp, line);
330 7 : if (r != 0)
331 3 : return r < 0 ? r : 0;
332 :
333 4 : r = iovw_put(&imp->iovw, line, n);
334 4 : if (r < 0)
335 0 : return r;
336 : } else {
337 : /* replace \n with = */
338 3 : line[n-1] = '=';
339 :
340 3 : imp->field_len = n;
341 3 : imp->state = IMPORTER_STATE_DATA_START;
342 :
343 : /* we cannot put the field in iovec until we have all data */
344 : }
345 :
346 7 : log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
347 :
348 7 : return 0; /* continue */
349 : }
350 :
351 3 : case IMPORTER_STATE_DATA_START:
352 3 : assert(imp->data_size == 0);
353 :
354 3 : r = get_data_size(imp);
355 : // log_debug("get_data_size() -> %d", r);
356 3 : if (r < 0)
357 0 : return r;
358 3 : if (r == 0) {
359 0 : imp->state = IMPORTER_STATE_EOF;
360 0 : return 0;
361 : }
362 :
363 6 : imp->state = imp->data_size > 0 ?
364 3 : IMPORTER_STATE_DATA : IMPORTER_STATE_DATA_FINISH;
365 :
366 3 : return 0; /* continue */
367 :
368 3 : case IMPORTER_STATE_DATA: {
369 : void *data;
370 : char *field;
371 :
372 3 : assert(imp->data_size > 0);
373 :
374 3 : r = get_data_data(imp, &data);
375 : // log_debug("get_data_data() -> %d", r);
376 3 : if (r < 0)
377 0 : return r;
378 3 : if (r == 0) {
379 0 : imp->state = IMPORTER_STATE_EOF;
380 0 : return 0;
381 : }
382 :
383 3 : assert(data);
384 :
385 3 : field = (char*) data - sizeof(uint64_t) - imp->field_len;
386 3 : memmove(field + sizeof(uint64_t), field, imp->field_len);
387 :
388 3 : r = iovw_put(&imp->iovw, field + sizeof(uint64_t), imp->field_len + imp->data_size);
389 3 : if (r < 0)
390 0 : return r;
391 :
392 3 : imp->state = IMPORTER_STATE_DATA_FINISH;
393 :
394 3 : return 0; /* continue */
395 : }
396 :
397 3 : case IMPORTER_STATE_DATA_FINISH:
398 3 : r = get_data_newline(imp);
399 : // log_debug("get_data_newline() -> %d", r);
400 3 : if (r < 0)
401 0 : return r;
402 3 : if (r == 0) {
403 0 : imp->state = IMPORTER_STATE_EOF;
404 0 : return 0;
405 : }
406 :
407 3 : imp->data_size = 0;
408 3 : imp->state = IMPORTER_STATE_LINE;
409 :
410 3 : return 0; /* continue */
411 0 : default:
412 0 : assert_not_reached("wtf?");
413 : }
414 : }
415 :
416 0 : int journal_importer_push_data(JournalImporter *imp, const char *data, size_t size) {
417 0 : assert(imp);
418 0 : assert(imp->state != IMPORTER_STATE_EOF);
419 :
420 0 : if (!realloc_buffer(imp, imp->filled + size))
421 0 : return log_error_errno(SYNTHETIC_ERRNO(ENOMEM),
422 : "Failed to store received data of size %zu "
423 : "(in addition to existing %zu bytes with %zu filled): %s",
424 : size, imp->size, imp->filled,
425 : strerror_safe(ENOMEM));
426 :
427 0 : memcpy(imp->buf + imp->filled, data, size);
428 0 : imp->filled += size;
429 :
430 0 : return 0;
431 : }
432 :
433 0 : void journal_importer_drop_iovw(JournalImporter *imp) {
434 : size_t remain, target;
435 :
436 : /* This function drops processed data that along with the iovw that points at it */
437 :
438 0 : iovw_free_contents(&imp->iovw, false);
439 :
440 : /* possibly reset buffer position */
441 0 : remain = imp->filled - imp->offset;
442 :
443 0 : if (remain == 0) /* no brainer */
444 0 : imp->offset = imp->scanned = imp->filled = 0;
445 0 : else if (imp->offset > imp->size - imp->filled &&
446 0 : imp->offset > remain) {
447 0 : memcpy(imp->buf, imp->buf + imp->offset, remain);
448 0 : imp->offset = imp->scanned = 0;
449 0 : imp->filled = remain;
450 : }
451 :
452 0 : target = imp->size;
453 0 : while (target > 16 * LINE_CHUNK && imp->filled < target / 2)
454 0 : target /= 2;
455 0 : if (target < imp->size) {
456 : char *tmp;
457 :
458 0 : tmp = realloc(imp->buf, target);
459 0 : if (!tmp)
460 0 : log_warning("Failed to reallocate buffer to (smaller) size %zu",
461 : target);
462 : else {
463 0 : log_debug("Reallocated buffer from %zu to %zu bytes",
464 : imp->size, target);
465 0 : imp->buf = tmp;
466 0 : imp->size = target;
467 : }
468 : }
469 0 : }
470 :
471 23 : bool journal_importer_eof(const JournalImporter *imp) {
472 23 : return imp->state == IMPORTER_STATE_EOF;
473 : }
|