Line data Source code
1 : /* SPDX-License-Identifier: LGPL-2.1+ */ 2 : 3 : #include "alloc-util.h" 4 : #include "fd-util.h" 5 : #include "journal-remote-parse.h" 6 : #include "journald-native.h" 7 : #include "parse-util.h" 8 : #include "string-util.h" 9 : 10 0 : void source_free(RemoteSource *source) { 11 0 : if (!source) 12 0 : return; 13 : 14 0 : journal_importer_cleanup(&source->importer); 15 : 16 0 : log_debug("Writer ref count %i", source->writer->n_ref); 17 0 : writer_unref(source->writer); 18 : 19 0 : sd_event_source_unref(source->event); 20 0 : sd_event_source_unref(source->buffer_event); 21 : 22 0 : free(source); 23 : } 24 : 25 : /** 26 : * Initialize zero-filled source with given values. On success, takes 27 : * ownership of fd, name, and writer, otherwise does not touch them. 28 : */ 29 0 : RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) { 30 : RemoteSource *source; 31 : 32 0 : log_debug("Creating source for %sfd:%d (%s)", 33 : passive_fd ? "passive " : "", fd, name); 34 : 35 0 : assert(fd >= 0); 36 : 37 0 : source = new0(RemoteSource, 1); 38 0 : if (!source) 39 0 : return NULL; 40 : 41 0 : source->importer = JOURNAL_IMPORTER_MAKE(fd); 42 0 : source->importer.passive_fd = passive_fd; 43 0 : source->importer.name = name; 44 : 45 0 : source->writer = writer; 46 : 47 0 : return source; 48 : } 49 : 50 0 : int process_source(RemoteSource *source, bool compress, bool seal) { 51 : int r; 52 : 53 0 : assert(source); 54 0 : assert(source->writer); 55 : 56 0 : r = journal_importer_process_data(&source->importer); 57 0 : if (r <= 0) 58 0 : return r; 59 : 60 : /* We have a full event */ 61 0 : log_trace("Received full event from source@%p fd:%d (%s)", 62 : source, source->importer.fd, source->importer.name); 63 : 64 0 : if (source->importer.iovw.count == 0) { 65 0 : log_warning("Entry with no payload, skipping"); 66 0 : goto freeing; 67 : } 68 : 69 0 : assert(source->importer.iovw.iovec); 70 : 71 0 : r = writer_write(source->writer, 72 : &source->importer.iovw, 73 : &source->importer.ts, 74 : &source->importer.boot_id, 75 : compress, seal); 76 0 : if (r == -EBADMSG) { 77 0 : log_error_errno(r, "Entry is invalid, ignoring."); 78 0 : r = 0; 79 0 : } else if (r < 0) 80 0 : log_error_errno(r, "Failed to write entry of %zu bytes: %m", 81 : iovw_size(&source->importer.iovw)); 82 : else 83 0 : r = 1; 84 : 85 0 : freeing: 86 0 : journal_importer_drop_iovw(&source->importer); 87 0 : return r; 88 : }