LCOV - code coverage report
Current view: top level - shared - journal-importer.c (source / functions) Hit Total Coverage
Test: main_coverage.info Lines: 171 253 67.6 %
Date: 2019-08-22 15:41:25 Functions: 10 12 83.3 %

          Line data    Source code
       1             : /* SPDX-License-Identifier: LGPL-2.1+ */
       2             : 
       3             : #include <errno.h>
       4             : #include <unistd.h>
       5             : 
       6             : #include "alloc-util.h"
       7             : #include "errno-util.h"
       8             : #include "escape.h"
       9             : #include "fd-util.h"
      10             : #include "io-util.h"
      11             : #include "journal-file.h"
      12             : #include "journal-importer.h"
      13             : #include "journal-util.h"
      14             : #include "parse-util.h"
      15             : #include "string-util.h"
      16             : #include "unaligned.h"
      17             : 
      18             : enum {
      19             :         IMPORTER_STATE_LINE = 0,    /* waiting to read, or reading line */
      20             :         IMPORTER_STATE_DATA_START,  /* reading binary data header */
      21             :         IMPORTER_STATE_DATA,        /* reading binary data */
      22             :         IMPORTER_STATE_DATA_FINISH, /* expecting newline */
      23             :         IMPORTER_STATE_EOF,         /* done */
      24             : };
      25             : 
      26           2 : void journal_importer_cleanup(JournalImporter *imp) {
      27           2 :         if (imp->fd >= 0 && !imp->passive_fd) {
      28           2 :                 log_debug("Closing %s (fd=%d)", imp->name ?: "importer", imp->fd);
      29           2 :                 safe_close(imp->fd);
      30             :         }
      31             : 
      32           2 :         free(imp->name);
      33           2 :         free(imp->buf);
      34           2 :         iovw_free_contents(&imp->iovw, false);
      35           2 : }
      36             : 
      37           2 : static char* realloc_buffer(JournalImporter *imp, size_t size) {
      38           2 :         char *b, *old = imp->buf;
      39             : 
      40           2 :         b = GREEDY_REALLOC(imp->buf, imp->size, size);
      41           2 :         if (!b)
      42           0 :                 return NULL;
      43             : 
      44           2 :         iovw_rebase(&imp->iovw, old, imp->buf);
      45             : 
      46           2 :         return b;
      47             : }
      48             : 
      49          13 : static int get_line(JournalImporter *imp, char **line, size_t *size) {
      50             :         ssize_t n;
      51          13 :         char *c = NULL;
      52             : 
      53          13 :         assert(imp);
      54          13 :         assert(imp->state == IMPORTER_STATE_LINE);
      55          13 :         assert(imp->offset <= imp->filled);
      56          13 :         assert(imp->filled <= imp->size);
      57          13 :         assert(!imp->buf || imp->size > 0);
      58          13 :         assert(imp->fd >= 0);
      59             : 
      60             :         for (;;) {
      61          15 :                 if (imp->buf) {
      62          13 :                         size_t start = MAX(imp->scanned, imp->offset);
      63             : 
      64          13 :                         c = memchr(imp->buf + start, '\n',
      65          13 :                                    imp->filled - start);
      66          13 :                         if (c)
      67          11 :                                 break;
      68             :                 }
      69             : 
      70           4 :                 imp->scanned = imp->filled;
      71           4 :                 if (imp->scanned >= DATA_SIZE_MAX)
      72           0 :                         return log_error_errno(SYNTHETIC_ERRNO(ENOBUFS),
      73             :                                                "Entry is bigger than %u bytes.",
      74             :                                                DATA_SIZE_MAX);
      75             : 
      76           4 :                 if (imp->passive_fd)
      77             :                         /* we have to wait for some data to come to us */
      78           0 :                         return -EAGAIN;
      79             : 
      80             :                 /* We know that imp->filled is at most DATA_SIZE_MAX, so if
      81             :                    we reallocate it, we'll increase the size at least a bit. */
      82             :                 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
      83           6 :                 if (imp->size - imp->filled < LINE_CHUNK &&
      84           2 :                     !realloc_buffer(imp, MIN(imp->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
      85           0 :                                 return log_oom();
      86             : 
      87           4 :                 assert(imp->buf);
      88           4 :                 assert(imp->size - imp->filled >= LINE_CHUNK ||
      89             :                        imp->size == ENTRY_SIZE_MAX);
      90             : 
      91           8 :                 n = read(imp->fd,
      92           4 :                          imp->buf + imp->filled,
      93           4 :                          imp->size - imp->filled);
      94           4 :                 if (n < 0) {
      95           0 :                         if (errno != EAGAIN)
      96           0 :                                 log_error_errno(errno, "read(%d, ..., %zu): %m",
      97             :                                                 imp->fd,
      98             :                                                 imp->size - imp->filled);
      99           0 :                         return -errno;
     100           4 :                 } else if (n == 0)
     101           2 :                         return 0;
     102             : 
     103           2 :                 imp->filled += n;
     104             :         }
     105             : 
     106          11 :         *line = imp->buf + imp->offset;
     107          11 :         *size = c + 1 - imp->buf - imp->offset;
     108          11 :         imp->offset += *size;
     109             : 
     110          11 :         return 1;
     111             : }
     112             : 
     113           9 : static int fill_fixed_size(JournalImporter *imp, void **data, size_t size) {
     114             : 
     115           9 :         assert(imp);
     116           9 :         assert(IN_SET(imp->state, IMPORTER_STATE_DATA_START, IMPORTER_STATE_DATA, IMPORTER_STATE_DATA_FINISH));
     117           9 :         assert(size <= DATA_SIZE_MAX);
     118           9 :         assert(imp->offset <= imp->filled);
     119           9 :         assert(imp->filled <= imp->size);
     120           9 :         assert(imp->buf || imp->size == 0);
     121           9 :         assert(!imp->buf || imp->size > 0);
     122           9 :         assert(imp->fd >= 0);
     123           9 :         assert(data);
     124             : 
     125           9 :         while (imp->filled - imp->offset < size) {
     126             :                 int n;
     127             : 
     128           0 :                 if (imp->passive_fd)
     129             :                         /* we have to wait for some data to come to us */
     130           0 :                         return -EAGAIN;
     131             : 
     132           0 :                 if (!realloc_buffer(imp, imp->offset + size))
     133           0 :                         return log_oom();
     134             : 
     135           0 :                 n = read(imp->fd, imp->buf + imp->filled,
     136           0 :                          imp->size - imp->filled);
     137           0 :                 if (n < 0) {
     138           0 :                         if (errno != EAGAIN)
     139           0 :                                 log_error_errno(errno, "read(%d, ..., %zu): %m", imp->fd,
     140             :                                                 imp->size - imp->filled);
     141           0 :                         return -errno;
     142           0 :                 } else if (n == 0)
     143           0 :                         return 0;
     144             : 
     145           0 :                 imp->filled += n;
     146             :         }
     147             : 
     148           9 :         *data = imp->buf + imp->offset;
     149           9 :         imp->offset += size;
     150             : 
     151           9 :         return 1;
     152             : }
     153             : 
     154           3 : static int get_data_size(JournalImporter *imp) {
     155             :         int r;
     156             :         void *data;
     157             : 
     158           3 :         assert(imp);
     159           3 :         assert(imp->state == IMPORTER_STATE_DATA_START);
     160           3 :         assert(imp->data_size == 0);
     161             : 
     162           3 :         r = fill_fixed_size(imp, &data, sizeof(uint64_t));
     163           3 :         if (r <= 0)
     164           0 :                 return r;
     165             : 
     166           3 :         imp->data_size = unaligned_read_le64(data);
     167           3 :         if (imp->data_size > DATA_SIZE_MAX)
     168           0 :                 return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
     169             :                                        "Stream declares field with size %zu > DATA_SIZE_MAX = %u",
     170             :                                        imp->data_size, DATA_SIZE_MAX);
     171           3 :         if (imp->data_size == 0)
     172           0 :                 log_warning("Binary field with zero length");
     173             : 
     174           3 :         return 1;
     175             : }
     176             : 
     177           3 : static int get_data_data(JournalImporter *imp, void **data) {
     178             :         int r;
     179             : 
     180           3 :         assert(imp);
     181           3 :         assert(data);
     182           3 :         assert(imp->state == IMPORTER_STATE_DATA);
     183             : 
     184           3 :         r = fill_fixed_size(imp, data, imp->data_size);
     185           3 :         if (r <= 0)
     186           0 :                 return r;
     187             : 
     188           3 :         return 1;
     189             : }
     190             : 
     191           3 : static int get_data_newline(JournalImporter *imp) {
     192             :         int r;
     193             :         char *data;
     194             : 
     195           3 :         assert(imp);
     196           3 :         assert(imp->state == IMPORTER_STATE_DATA_FINISH);
     197             : 
     198           3 :         r = fill_fixed_size(imp, (void**) &data, 1);
     199           3 :         if (r <= 0)
     200           0 :                 return r;
     201             : 
     202           3 :         assert(data);
     203           3 :         if (*data != '\n') {
     204             :                 char buf[4];
     205             :                 int l;
     206             : 
     207           0 :                 l = cescape_char(*data, buf);
     208           0 :                 return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
     209             :                                        "Expected newline, got '%.*s'", l, buf);
     210             :         }
     211             : 
     212           3 :         return 1;
     213             : }
     214             : 
     215           7 : static int process_special_field(JournalImporter *imp, char *line) {
     216             :         const char *value;
     217             :         char buf[CELLESCAPE_DEFAULT_LENGTH];
     218             :         int r;
     219             : 
     220           7 :         assert(line);
     221             : 
     222           7 :         value = startswith(line, "__CURSOR=");
     223           7 :         if (value)
     224             :                 /* ignore __CURSOR */
     225           1 :                 return 1;
     226             : 
     227           6 :         value = startswith(line, "__REALTIME_TIMESTAMP=");
     228           6 :         if (value) {
     229             :                 uint64_t x;
     230             : 
     231           1 :                 r = safe_atou64(value, &x);
     232           1 :                 if (r < 0)
     233           0 :                         return log_warning_errno(r, "Failed to parse __REALTIME_TIMESTAMP '%s': %m",
     234             :                                                  cellescape(buf, sizeof buf, value));
     235           1 :                 else if (!VALID_REALTIME(x)) {
     236           0 :                         log_warning("__REALTIME_TIMESTAMP out of range, ignoring: %"PRIu64, x);
     237           0 :                         return -ERANGE;
     238             :                 }
     239             : 
     240           1 :                 imp->ts.realtime = x;
     241           1 :                 return 1;
     242             :         }
     243             : 
     244           5 :         value = startswith(line, "__MONOTONIC_TIMESTAMP=");
     245           5 :         if (value) {
     246             :                 uint64_t x;
     247             : 
     248           1 :                 r = safe_atou64(value, &x);
     249           1 :                 if (r < 0)
     250           0 :                         return log_warning_errno(r, "Failed to parse __MONOTONIC_TIMESTAMP '%s': %m",
     251             :                                                  cellescape(buf, sizeof buf, value));
     252           1 :                 else if (!VALID_MONOTONIC(x)) {
     253           0 :                         log_warning("__MONOTONIC_TIMESTAMP out of range, ignoring: %"PRIu64, x);
     254           0 :                         return -ERANGE;
     255             :                 }
     256             : 
     257           1 :                 imp->ts.monotonic = x;
     258           1 :                 return 1;
     259             :         }
     260             : 
     261             :         /* Just a single underline, but it needs special treatment too. */
     262           4 :         value = startswith(line, "_BOOT_ID=");
     263           4 :         if (value) {
     264           1 :                 r = sd_id128_from_string(value, &imp->boot_id);
     265           1 :                 if (r < 0)
     266           0 :                         return log_warning_errno(r, "Failed to parse _BOOT_ID '%s': %m",
     267             :                                                  cellescape(buf, sizeof buf, value));
     268             : 
     269             :                 /* store the field in the usual fashion too */
     270           1 :                 return 0;
     271             :         }
     272             : 
     273           3 :         value = startswith(line, "__");
     274           3 :         if (value) {
     275           0 :                 log_notice("Unknown dunder line __%s, ignoring.", cellescape(buf, sizeof buf, value));
     276           0 :                 return 1;
     277             :         }
     278             : 
     279             :         /* no dunder */
     280           3 :         return 0;
     281             : }
     282             : 
     283          22 : int journal_importer_process_data(JournalImporter *imp) {
     284             :         int r;
     285             : 
     286          22 :         switch(imp->state) {
     287          13 :         case IMPORTER_STATE_LINE: {
     288             :                 char *line, *sep;
     289          13 :                 size_t n = 0;
     290             : 
     291          13 :                 assert(imp->data_size == 0);
     292             : 
     293          13 :                 r = get_line(imp, &line, &n);
     294          13 :                 if (r < 0)
     295           0 :                         return r;
     296          13 :                 if (r == 0) {
     297           2 :                         imp->state = IMPORTER_STATE_EOF;
     298           2 :                         return 0;
     299             :                 }
     300          11 :                 assert(n > 0);
     301          11 :                 assert(line[n-1] == '\n');
     302             : 
     303          11 :                 if (n == 1) {
     304           1 :                         log_trace("Received empty line, event is ready");
     305           1 :                         return 1;
     306             :                 }
     307             : 
     308             :                 /* MESSAGE=xxx\n
     309             :                    or
     310             :                    COREDUMP\n
     311             :                    LLLLLLLL0011223344...\n
     312             :                 */
     313          10 :                 sep = memchr(line, '=', n);
     314          10 :                 if (sep) {
     315             :                         /* chomp newline */
     316           7 :                         n--;
     317             : 
     318           7 :                         if (!journal_field_valid(line, sep - line, true)) {
     319             :                                 char buf[64], *t;
     320             : 
     321           0 :                                 t = strndupa(line, sep - line);
     322           0 :                                 log_debug("Ignoring invalid field: \"%s\"",
     323             :                                           cellescape(buf, sizeof buf, t));
     324             : 
     325           0 :                                 return 0;
     326             :                         }
     327             : 
     328           7 :                         line[n] = '\0';
     329           7 :                         r = process_special_field(imp, line);
     330           7 :                         if (r != 0)
     331           3 :                                 return r < 0 ? r : 0;
     332             : 
     333           4 :                         r = iovw_put(&imp->iovw, line, n);
     334           4 :                         if (r < 0)
     335           0 :                                 return r;
     336             :                 } else {
     337             :                         /* replace \n with = */
     338           3 :                         line[n-1] = '=';
     339             : 
     340           3 :                         imp->field_len = n;
     341           3 :                         imp->state = IMPORTER_STATE_DATA_START;
     342             : 
     343             :                         /* we cannot put the field in iovec until we have all data */
     344             :                 }
     345             : 
     346           7 :                 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
     347             : 
     348           7 :                 return 0; /* continue */
     349             :         }
     350             : 
     351           3 :         case IMPORTER_STATE_DATA_START:
     352           3 :                 assert(imp->data_size == 0);
     353             : 
     354           3 :                 r = get_data_size(imp);
     355             :                 // log_debug("get_data_size() -> %d", r);
     356           3 :                 if (r < 0)
     357           0 :                         return r;
     358           3 :                 if (r == 0) {
     359           0 :                         imp->state = IMPORTER_STATE_EOF;
     360           0 :                         return 0;
     361             :                 }
     362             : 
     363           6 :                 imp->state = imp->data_size > 0 ?
     364           3 :                         IMPORTER_STATE_DATA : IMPORTER_STATE_DATA_FINISH;
     365             : 
     366           3 :                 return 0; /* continue */
     367             : 
     368           3 :         case IMPORTER_STATE_DATA: {
     369             :                 void *data;
     370             :                 char *field;
     371             : 
     372           3 :                 assert(imp->data_size > 0);
     373             : 
     374           3 :                 r = get_data_data(imp, &data);
     375             :                 // log_debug("get_data_data() -> %d", r);
     376           3 :                 if (r < 0)
     377           0 :                         return r;
     378           3 :                 if (r == 0) {
     379           0 :                         imp->state = IMPORTER_STATE_EOF;
     380           0 :                         return 0;
     381             :                 }
     382             : 
     383           3 :                 assert(data);
     384             : 
     385           3 :                 field = (char*) data - sizeof(uint64_t) - imp->field_len;
     386           3 :                 memmove(field + sizeof(uint64_t), field, imp->field_len);
     387             : 
     388           3 :                 r = iovw_put(&imp->iovw, field + sizeof(uint64_t), imp->field_len + imp->data_size);
     389           3 :                 if (r < 0)
     390           0 :                         return r;
     391             : 
     392           3 :                 imp->state = IMPORTER_STATE_DATA_FINISH;
     393             : 
     394           3 :                 return 0; /* continue */
     395             :         }
     396             : 
     397           3 :         case IMPORTER_STATE_DATA_FINISH:
     398           3 :                 r = get_data_newline(imp);
     399             :                 // log_debug("get_data_newline() -> %d", r);
     400           3 :                 if (r < 0)
     401           0 :                         return r;
     402           3 :                 if (r == 0) {
     403           0 :                         imp->state = IMPORTER_STATE_EOF;
     404           0 :                         return 0;
     405             :                 }
     406             : 
     407           3 :                 imp->data_size = 0;
     408           3 :                 imp->state = IMPORTER_STATE_LINE;
     409             : 
     410           3 :                 return 0; /* continue */
     411           0 :         default:
     412           0 :                 assert_not_reached("wtf?");
     413             :         }
     414             : }
     415             : 
     416           0 : int journal_importer_push_data(JournalImporter *imp, const char *data, size_t size) {
     417           0 :         assert(imp);
     418           0 :         assert(imp->state != IMPORTER_STATE_EOF);
     419             : 
     420           0 :         if (!realloc_buffer(imp, imp->filled + size))
     421           0 :                 return log_error_errno(SYNTHETIC_ERRNO(ENOMEM),
     422             :                                        "Failed to store received data of size %zu "
     423             :                                        "(in addition to existing %zu bytes with %zu filled): %s",
     424             :                                        size, imp->size, imp->filled,
     425             :                                        strerror_safe(ENOMEM));
     426             : 
     427           0 :         memcpy(imp->buf + imp->filled, data, size);
     428           0 :         imp->filled += size;
     429             : 
     430           0 :         return 0;
     431             : }
     432             : 
     433           0 : void journal_importer_drop_iovw(JournalImporter *imp) {
     434             :         size_t remain, target;
     435             : 
     436             :         /* This function drops processed data that along with the iovw that points at it */
     437             : 
     438           0 :         iovw_free_contents(&imp->iovw, false);
     439             : 
     440             :         /* possibly reset buffer position */
     441           0 :         remain = imp->filled - imp->offset;
     442             : 
     443           0 :         if (remain == 0) /* no brainer */
     444           0 :                 imp->offset = imp->scanned = imp->filled = 0;
     445           0 :         else if (imp->offset > imp->size - imp->filled &&
     446           0 :                  imp->offset > remain) {
     447           0 :                 memcpy(imp->buf, imp->buf + imp->offset, remain);
     448           0 :                 imp->offset = imp->scanned = 0;
     449           0 :                 imp->filled = remain;
     450             :         }
     451             : 
     452           0 :         target = imp->size;
     453           0 :         while (target > 16 * LINE_CHUNK && imp->filled < target / 2)
     454           0 :                 target /= 2;
     455           0 :         if (target < imp->size) {
     456             :                 char *tmp;
     457             : 
     458           0 :                 tmp = realloc(imp->buf, target);
     459           0 :                 if (!tmp)
     460           0 :                         log_warning("Failed to reallocate buffer to (smaller) size %zu",
     461             :                                     target);
     462             :                 else {
     463           0 :                         log_debug("Reallocated buffer from %zu to %zu bytes",
     464             :                                   imp->size, target);
     465           0 :                         imp->buf = tmp;
     466           0 :                         imp->size = target;
     467             :                 }
     468             :         }
     469           0 : }
     470             : 
     471          23 : bool journal_importer_eof(const JournalImporter *imp) {
     472          23 :         return imp->state == IMPORTER_STATE_EOF;
     473             : }

Generated by: LCOV version 1.14