LCOV - code coverage report
Current view: top level - journal-remote - journal-remote.c (source / functions) Hit Total Coverage
Test: main_coverage.info Lines: 13 254 5.1 %
Date: 2019-08-22 15:41:25 Functions: 1 16 6.2 %

          Line data    Source code
       1             : /* SPDX-License-Identifier: LGPL-2.1+ */
       2             : 
       3             : #include <errno.h>
       4             : #include <fcntl.h>
       5             : #include <stdio.h>
       6             : #include <stdlib.h>
       7             : #include <string.h>
       8             : #include <sys/prctl.h>
       9             : #include <sys/socket.h>
      10             : #include <stdint.h>
      11             : 
      12             : #include "sd-daemon.h"
      13             : 
      14             : #include "alloc-util.h"
      15             : #include "def.h"
      16             : #include "errno-util.h"
      17             : #include "escape.h"
      18             : #include "fd-util.h"
      19             : #include "journal-file.h"
      20             : #include "journal-remote-write.h"
      21             : #include "journal-remote.h"
      22             : #include "journald-native.h"
      23             : #include "macro.h"
      24             : #include "parse-util.h"
      25             : #include "process-util.h"
      26             : #include "socket-util.h"
      27             : #include "stdio-util.h"
      28             : #include "string-util.h"
      29             : #include "strv.h"
      30             : 
      31             : #define REMOTE_JOURNAL_PATH "/var/log/journal/remote"
      32             : 
      33             : #define filename_escape(s) xescape((s), "/ ")
      34             : 
      35           0 : static int open_output(RemoteServer *s, Writer *w, const char* host) {
      36           0 :         _cleanup_free_ char *_filename = NULL;
      37             :         const char *filename;
      38             :         int r;
      39             : 
      40           0 :         switch (s->split_mode) {
      41           0 :         case JOURNAL_WRITE_SPLIT_NONE:
      42           0 :                 filename = s->output;
      43           0 :                 break;
      44             : 
      45           0 :         case JOURNAL_WRITE_SPLIT_HOST: {
      46           0 :                 _cleanup_free_ char *name;
      47             : 
      48           0 :                 assert(host);
      49             : 
      50           0 :                 name = filename_escape(host);
      51           0 :                 if (!name)
      52           0 :                         return log_oom();
      53             : 
      54           0 :                 r = asprintf(&_filename, "%s/remote-%s.journal", s->output, name);
      55           0 :                 if (r < 0)
      56           0 :                         return log_oom();
      57             : 
      58           0 :                 filename = _filename;
      59           0 :                 break;
      60             :         }
      61             : 
      62           0 :         default:
      63           0 :                 assert_not_reached("what?");
      64             :         }
      65             : 
      66           0 :         r = journal_file_open_reliably(filename,
      67             :                                        O_RDWR|O_CREAT, 0640,
      68           0 :                                        s->compress, (uint64_t) -1, s->seal,
      69             :                                        &w->metrics,
      70             :                                        w->mmap, NULL,
      71             :                                        NULL, &w->journal);
      72           0 :         if (r < 0)
      73           0 :                 return log_error_errno(r, "Failed to open output journal %s: %m", filename);
      74             : 
      75           0 :         log_debug("Opened output file %s", w->journal->path);
      76           0 :         return 0;
      77             : }
      78             : 
      79             : /**********************************************************************
      80             :  **********************************************************************
      81             :  **********************************************************************/
      82             : 
      83           0 : static int init_writer_hashmap(RemoteServer *s) {
      84             :         static const struct hash_ops* const hash_ops[] = {
      85             :                 [JOURNAL_WRITE_SPLIT_NONE] = NULL,
      86             :                 [JOURNAL_WRITE_SPLIT_HOST] = &string_hash_ops,
      87             :         };
      88             : 
      89           0 :         assert(s);
      90           0 :         assert(s->split_mode >= 0 && s->split_mode < (int) ELEMENTSOF(hash_ops));
      91             : 
      92           0 :         s->writers = hashmap_new(hash_ops[s->split_mode]);
      93           0 :         if (!s->writers)
      94           0 :                 return log_oom();
      95             : 
      96           0 :         return 0;
      97             : }
      98             : 
      99           0 : int journal_remote_get_writer(RemoteServer *s, const char *host, Writer **writer) {
     100           0 :         _cleanup_(writer_unrefp) Writer *w = NULL;
     101             :         const void *key;
     102             :         int r;
     103             : 
     104           0 :         switch(s->split_mode) {
     105           0 :         case JOURNAL_WRITE_SPLIT_NONE:
     106           0 :                 key = "one and only";
     107           0 :                 break;
     108             : 
     109           0 :         case JOURNAL_WRITE_SPLIT_HOST:
     110           0 :                 assert(host);
     111           0 :                 key = host;
     112           0 :                 break;
     113             : 
     114           0 :         default:
     115           0 :                 assert_not_reached("what split mode?");
     116             :         }
     117             : 
     118           0 :         w = hashmap_get(s->writers, key);
     119           0 :         if (w)
     120           0 :                 writer_ref(w);
     121             :         else {
     122           0 :                 w = writer_new(s);
     123           0 :                 if (!w)
     124           0 :                         return log_oom();
     125             : 
     126           0 :                 if (s->split_mode == JOURNAL_WRITE_SPLIT_HOST) {
     127           0 :                         w->hashmap_key = strdup(key);
     128           0 :                         if (!w->hashmap_key)
     129           0 :                                 return log_oom();
     130             :                 }
     131             : 
     132           0 :                 r = open_output(s, w, host);
     133           0 :                 if (r < 0)
     134           0 :                         return r;
     135             : 
     136           0 :                 r = hashmap_put(s->writers, w->hashmap_key ?: key, w);
     137           0 :                 if (r < 0)
     138           0 :                         return r;
     139             :         }
     140             : 
     141           0 :         *writer = TAKE_PTR(w);
     142             : 
     143           0 :         return 0;
     144             : }
     145             : 
     146             : /**********************************************************************
     147             :  **********************************************************************
     148             :  **********************************************************************/
     149             : 
     150             : /* This should go away as soon as µhttpd allows state to be passed around. */
     151             : RemoteServer *journal_remote_server_global;
     152             : 
     153             : static int dispatch_raw_source_event(sd_event_source *event,
     154             :                                      int fd,
     155             :                                      uint32_t revents,
     156             :                                      void *userdata);
     157             : static int dispatch_raw_source_until_block(sd_event_source *event,
     158             :                                            void *userdata);
     159             : static int dispatch_blocking_source_event(sd_event_source *event,
     160             :                                           void *userdata);
     161             : static int dispatch_raw_connection_event(sd_event_source *event,
     162             :                                          int fd,
     163             :                                          uint32_t revents,
     164             :                                          void *userdata);
     165             : 
     166           0 : static int get_source_for_fd(RemoteServer *s,
     167             :                              int fd, char *name, RemoteSource **source) {
     168             :         Writer *writer;
     169             :         int r;
     170             : 
     171             :         /* This takes ownership of name, but only on success. */
     172             : 
     173           0 :         assert(fd >= 0);
     174           0 :         assert(source);
     175             : 
     176           0 :         if (!GREEDY_REALLOC0(s->sources, s->sources_size, fd + 1))
     177           0 :                 return log_oom();
     178             : 
     179           0 :         r = journal_remote_get_writer(s, name, &writer);
     180           0 :         if (r < 0)
     181           0 :                 return log_warning_errno(r, "Failed to get writer for source %s: %m",
     182             :                                          name);
     183             : 
     184           0 :         if (!s->sources[fd]) {
     185           0 :                 s->sources[fd] = source_new(fd, false, name, writer);
     186           0 :                 if (!s->sources[fd]) {
     187           0 :                         writer_unref(writer);
     188           0 :                         return log_oom();
     189             :                 }
     190             : 
     191           0 :                 s->active++;
     192             :         }
     193             : 
     194           0 :         *source = s->sources[fd];
     195           0 :         return 0;
     196             : }
     197             : 
     198           0 : static int remove_source(RemoteServer *s, int fd) {
     199             :         RemoteSource *source;
     200             : 
     201           0 :         assert(s);
     202           0 :         assert(fd >= 0 && fd < (ssize_t) s->sources_size);
     203             : 
     204           0 :         source = s->sources[fd];
     205           0 :         if (source) {
     206             :                 /* this closes fd too */
     207           0 :                 source_free(source);
     208           0 :                 s->sources[fd] = NULL;
     209           0 :                 s->active--;
     210             :         }
     211             : 
     212           0 :         return 0;
     213             : }
     214             : 
     215           0 : int journal_remote_add_source(RemoteServer *s, int fd, char* name, bool own_name) {
     216           0 :         RemoteSource *source = NULL;
     217             :         int r;
     218             : 
     219             :         /* This takes ownership of name, even on failure, if own_name is true. */
     220             : 
     221           0 :         assert(s);
     222           0 :         assert(fd >= 0);
     223           0 :         assert(name);
     224             : 
     225           0 :         if (!own_name) {
     226           0 :                 name = strdup(name);
     227           0 :                 if (!name)
     228           0 :                         return log_oom();
     229             :         }
     230             : 
     231           0 :         r = get_source_for_fd(s, fd, name, &source);
     232           0 :         if (r < 0) {
     233           0 :                 log_error_errno(r, "Failed to create source for fd:%d (%s): %m",
     234             :                                 fd, name);
     235           0 :                 free(name);
     236           0 :                 return r;
     237             :         }
     238             : 
     239           0 :         r = sd_event_add_io(s->events, &source->event,
     240             :                             fd, EPOLLIN|EPOLLRDHUP|EPOLLPRI,
     241             :                             dispatch_raw_source_event, source);
     242           0 :         if (r == 0) {
     243             :                 /* Add additional source for buffer processing. It will be
     244             :                  * enabled later. */
     245           0 :                 r = sd_event_add_defer(s->events, &source->buffer_event,
     246             :                                        dispatch_raw_source_until_block, source);
     247           0 :                 if (r == 0)
     248           0 :                         sd_event_source_set_enabled(source->buffer_event, SD_EVENT_OFF);
     249           0 :         } else if (r == -EPERM) {
     250           0 :                 log_debug("Falling back to sd_event_add_defer for fd:%d (%s)", fd, name);
     251           0 :                 r = sd_event_add_defer(s->events, &source->event,
     252             :                                        dispatch_blocking_source_event, source);
     253           0 :                 if (r == 0)
     254           0 :                         sd_event_source_set_enabled(source->event, SD_EVENT_ON);
     255             :         }
     256           0 :         if (r < 0) {
     257           0 :                 log_error_errno(r, "Failed to register event source for fd:%d: %m",
     258             :                                 fd);
     259           0 :                 goto error;
     260             :         }
     261             : 
     262           0 :         r = sd_event_source_set_description(source->event, name);
     263           0 :         if (r < 0) {
     264           0 :                 log_error_errno(r, "Failed to set source name for fd:%d: %m", fd);
     265           0 :                 goto error;
     266             :         }
     267             : 
     268           0 :         return 1; /* work to do */
     269             : 
     270           0 :  error:
     271           0 :         remove_source(s, fd);
     272           0 :         return r;
     273             : }
     274             : 
     275           0 : int journal_remote_add_raw_socket(RemoteServer *s, int fd) {
     276             :         int r;
     277           0 :         _cleanup_close_ int fd_ = fd;
     278             :         char name[STRLEN("raw-socket-") + DECIMAL_STR_MAX(int) + 1];
     279             : 
     280           0 :         assert(fd >= 0);
     281             : 
     282           0 :         r = sd_event_add_io(s->events, &s->listen_event,
     283             :                             fd, EPOLLIN,
     284             :                             dispatch_raw_connection_event, s);
     285           0 :         if (r < 0)
     286           0 :                 return r;
     287             : 
     288           0 :         xsprintf(name, "raw-socket-%d", fd);
     289             : 
     290           0 :         r = sd_event_source_set_description(s->listen_event, name);
     291           0 :         if (r < 0)
     292           0 :                 return r;
     293             : 
     294           0 :         fd_ = -1;
     295           0 :         s->active++;
     296           0 :         return 0;
     297             : }
     298             : 
     299             : /**********************************************************************
     300             :  **********************************************************************
     301             :  **********************************************************************/
     302             : 
     303           0 : int journal_remote_server_init(
     304             :                 RemoteServer *s,
     305             :                 const char *output,
     306             :                 JournalWriteSplitMode split_mode,
     307             :                 bool compress,
     308             :                 bool seal) {
     309             : 
     310             :         int r;
     311             : 
     312           0 :         assert(s);
     313             : 
     314           0 :         assert(journal_remote_server_global == NULL);
     315           0 :         journal_remote_server_global = s;
     316             : 
     317           0 :         s->split_mode = split_mode;
     318           0 :         s->compress = compress;
     319           0 :         s->seal = seal;
     320             : 
     321           0 :         if (output)
     322           0 :                 s->output = output;
     323           0 :         else if (split_mode == JOURNAL_WRITE_SPLIT_NONE)
     324           0 :                 s->output = REMOTE_JOURNAL_PATH "/remote.journal";
     325           0 :         else if (split_mode == JOURNAL_WRITE_SPLIT_HOST)
     326           0 :                 s->output = REMOTE_JOURNAL_PATH;
     327             :         else
     328           0 :                 assert_not_reached("bad split mode");
     329             : 
     330           0 :         r = sd_event_default(&s->events);
     331           0 :         if (r < 0)
     332           0 :                 return log_error_errno(r, "Failed to allocate event loop: %m");
     333             : 
     334           0 :         r = init_writer_hashmap(s);
     335           0 :         if (r < 0)
     336           0 :                 return r;
     337             : 
     338           0 :         return 0;
     339             : }
     340             : 
     341             : #if HAVE_MICROHTTPD
     342           0 : static void MHDDaemonWrapper_free(MHDDaemonWrapper *d) {
     343           0 :         MHD_stop_daemon(d->daemon);
     344           0 :         sd_event_source_unref(d->io_event);
     345           0 :         sd_event_source_unref(d->timer_event);
     346           0 :         free(d);
     347           0 : }
     348             : #endif
     349             : 
     350           4 : void journal_remote_server_destroy(RemoteServer *s) {
     351             :         size_t i;
     352             : 
     353             : #if HAVE_MICROHTTPD
     354           4 :         hashmap_free_with_destructor(s->daemons, MHDDaemonWrapper_free);
     355             : #endif
     356             : 
     357           4 :         assert(s->sources_size == 0 || s->sources);
     358           4 :         for (i = 0; i < s->sources_size; i++)
     359           0 :                 remove_source(s, i);
     360           4 :         free(s->sources);
     361             : 
     362           4 :         writer_unref(s->_single_writer);
     363           4 :         hashmap_free(s->writers);
     364             : 
     365           4 :         sd_event_source_unref(s->sigterm_event);
     366           4 :         sd_event_source_unref(s->sigint_event);
     367           4 :         sd_event_source_unref(s->listen_event);
     368           4 :         sd_event_unref(s->events);
     369             : 
     370           4 :         if (s == journal_remote_server_global)
     371           0 :                 journal_remote_server_global = NULL;
     372             : 
     373             :         /* fds that we're listening on remain open... */
     374           4 : }
     375             : 
     376             : /**********************************************************************
     377             :  **********************************************************************
     378             :  **********************************************************************/
     379             : 
     380           0 : int journal_remote_handle_raw_source(
     381             :                 sd_event_source *event,
     382             :                 int fd,
     383             :                 uint32_t revents,
     384             :                 RemoteServer *s) {
     385             : 
     386             :         RemoteSource *source;
     387             :         int r;
     388             : 
     389             :         /* Returns 1 if there might be more data pending,
     390             :          * 0 if data is currently exhausted, negative on error.
     391             :          */
     392             : 
     393           0 :         assert(fd >= 0 && fd < (ssize_t) s->sources_size);
     394           0 :         source = s->sources[fd];
     395           0 :         assert(source->importer.fd == fd);
     396             : 
     397           0 :         r = process_source(source, s->compress, s->seal);
     398           0 :         if (journal_importer_eof(&source->importer)) {
     399             :                 size_t remaining;
     400             : 
     401           0 :                 log_debug("EOF reached with source %s (fd=%d)",
     402             :                           source->importer.name, source->importer.fd);
     403             : 
     404           0 :                 remaining = journal_importer_bytes_remaining(&source->importer);
     405           0 :                 if (remaining > 0)
     406           0 :                         log_notice("Premature EOF. %zu bytes lost.", remaining);
     407           0 :                 remove_source(s, source->importer.fd);
     408           0 :                 log_debug("%zu active sources remaining", s->active);
     409           0 :                 return 0;
     410           0 :         } else if (r == -E2BIG) {
     411           0 :                 log_notice("Entry with too many fields, skipped");
     412           0 :                 return 1;
     413           0 :         } else if (r == -ENOBUFS) {
     414           0 :                 log_notice("Entry too big, skipped");
     415           0 :                 return 1;
     416           0 :         } else if (r == -EAGAIN) {
     417           0 :                 return 0;
     418           0 :         } else if (r < 0) {
     419           0 :                 log_debug_errno(r, "Closing connection: %m");
     420           0 :                 remove_source(s, fd);
     421           0 :                 return 0;
     422             :         } else
     423           0 :                 return 1;
     424             : }
     425             : 
     426           0 : static int dispatch_raw_source_until_block(sd_event_source *event,
     427             :                                            void *userdata) {
     428           0 :         RemoteSource *source = userdata;
     429             :         int r;
     430             : 
     431             :         /* Make sure event stays around even if source is destroyed */
     432           0 :         sd_event_source_ref(event);
     433             : 
     434           0 :         r = journal_remote_handle_raw_source(event, source->importer.fd, EPOLLIN, journal_remote_server_global);
     435           0 :         if (r != 1)
     436             :                 /* No more data for now */
     437           0 :                 sd_event_source_set_enabled(event, SD_EVENT_OFF);
     438             : 
     439           0 :         sd_event_source_unref(event);
     440             : 
     441           0 :         return r;
     442             : }
     443             : 
     444           0 : static int dispatch_raw_source_event(sd_event_source *event,
     445             :                                      int fd,
     446             :                                      uint32_t revents,
     447             :                                      void *userdata) {
     448           0 :         RemoteSource *source = userdata;
     449             :         int r;
     450             : 
     451           0 :         assert(source->event);
     452           0 :         assert(source->buffer_event);
     453             : 
     454           0 :         r = journal_remote_handle_raw_source(event, fd, EPOLLIN, journal_remote_server_global);
     455           0 :         if (r == 1)
     456             :                 /* Might have more data. We need to rerun the handler
     457             :                  * until we are sure the buffer is exhausted. */
     458           0 :                 sd_event_source_set_enabled(source->buffer_event, SD_EVENT_ON);
     459             : 
     460           0 :         return r;
     461             : }
     462             : 
     463           0 : static int dispatch_blocking_source_event(sd_event_source *event,
     464             :                                           void *userdata) {
     465           0 :         RemoteSource *source = userdata;
     466             : 
     467           0 :         return journal_remote_handle_raw_source(event, source->importer.fd, EPOLLIN, journal_remote_server_global);
     468             : }
     469             : 
     470           0 : static int accept_connection(
     471             :                 const char* type,
     472             :                 int fd,
     473             :                 SocketAddress *addr,
     474             :                 char **hostname) {
     475             : 
     476           0 :         _cleanup_close_ int fd2 = -1;
     477             :         int r;
     478             : 
     479           0 :         log_debug("Accepting new %s connection on fd:%d", type, fd);
     480           0 :         fd2 = accept4(fd, &addr->sockaddr.sa, &addr->size, SOCK_NONBLOCK|SOCK_CLOEXEC);
     481           0 :         if (fd2 < 0) {
     482           0 :                 if (ERRNO_IS_ACCEPT_AGAIN(errno))
     483           0 :                         return -EAGAIN;
     484             : 
     485           0 :                 return log_error_errno(errno, "accept() on fd:%d failed: %m", fd);
     486             :         }
     487             : 
     488           0 :         switch(socket_address_family(addr)) {
     489           0 :         case AF_INET:
     490             :         case AF_INET6: {
     491           0 :                 _cleanup_free_ char *a = NULL;
     492             :                 char *b;
     493             : 
     494           0 :                 r = socket_address_print(addr, &a);
     495           0 :                 if (r < 0)
     496           0 :                         return log_error_errno(r, "socket_address_print(): %m");
     497             : 
     498           0 :                 r = socknameinfo_pretty(&addr->sockaddr, addr->size, &b);
     499           0 :                 if (r < 0)
     500           0 :                         return log_error_errno(r, "Resolving hostname failed: %m");
     501             : 
     502           0 :                 log_debug("Accepted %s %s connection from %s",
     503             :                           type,
     504             :                           socket_address_family(addr) == AF_INET ? "IP" : "IPv6",
     505             :                           a);
     506             : 
     507           0 :                 *hostname = b;
     508           0 :                 return TAKE_FD(fd2);
     509             :         }
     510             : 
     511           0 :         default:
     512           0 :                 return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
     513             :                                        "Rejected %s connection with unsupported family %d",
     514             :                                        type, socket_address_family(addr));
     515             :         }
     516             : }
     517             : 
     518           0 : static int dispatch_raw_connection_event(
     519             :                 sd_event_source *event,
     520             :                 int fd,
     521             :                 uint32_t revents,
     522             :                 void *userdata) {
     523             : 
     524           0 :         RemoteServer *s = userdata;
     525             :         int fd2;
     526           0 :         SocketAddress addr = {
     527             :                 .size = sizeof(union sockaddr_union),
     528             :                 .type = SOCK_STREAM,
     529             :         };
     530           0 :         char *hostname = NULL;
     531             : 
     532           0 :         fd2 = accept_connection("raw", fd, &addr, &hostname);
     533           0 :         if (fd2 == -EAGAIN)
     534           0 :                 return 0;
     535           0 :         if (fd2 < 0)
     536           0 :                 return fd2;
     537             : 
     538           0 :         return journal_remote_add_source(s, fd2, hostname, true);
     539             : }

Generated by: LCOV version 1.14