Blob Blame History Raw
From 538bd9b42dabf1145cf7ab77f32347d516b01e88 Mon Sep 17 00:00:00 2001
From: Lennart Poettering <lennart@poettering.net>
Date: Tue, 12 May 2020 18:56:34 +0200
Subject: [PATCH] journald: rework pid change handling

Let's introduce an explicit line ending marker for line endings due to
pid change.

Let's also make sure we don't get confused with buffer management.

Fixes: #15654
(cherry picked from commit 45ba1ea5e9264d385fa565328fe957ef1d78caa1)

Resolves: #2029426
---
 src/journal/journald-stream.c | 99 +++++++++++++++++++++++------------
 1 file changed, 66 insertions(+), 33 deletions(-)

diff --git a/src/journal/journald-stream.c b/src/journal/journald-stream.c
index ab1a855943..5be5b0939c 100644
--- a/src/journal/journald-stream.c
+++ b/src/journal/journald-stream.c
@@ -54,6 +54,7 @@ typedef enum LineBreak {
         LINE_BREAK_NUL,
         LINE_BREAK_LINE_MAX,
         LINE_BREAK_EOF,
+        LINE_BREAK_PID_CHANGE,
         _LINE_BREAK_MAX,
         _LINE_BREAK_INVALID = -1,
 } LineBreak;
@@ -310,6 +311,7 @@ static int stdout_stream_log(
                 [LINE_BREAK_NUL]        = "_LINE_BREAK=nul",
                 [LINE_BREAK_LINE_MAX]   = "_LINE_BREAK=line-max",
                 [LINE_BREAK_EOF]        = "_LINE_BREAK=eof",
+                [LINE_BREAK_PID_CHANGE] = "_LINE_BREAK=pid-change",
         };
 
         const char *c = line_break_field_table[line_break];
@@ -431,21 +433,43 @@ static int stdout_stream_line(StdoutStream *s, char *p, LineBreak line_break) {
         assert_not_reached("Unknown stream state");
 }
 
-static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
-        char *p;
-        size_t remaining;
+static int stdout_stream_found(
+                StdoutStream *s,
+                char *p,
+                size_t l,
+                LineBreak line_break) {
+
+        char saved;
         int r;
 
         assert(s);
+        assert(p);
+
+        /* Let's NUL terminate the specified buffer for this call, and revert back afterwards */
+        saved = p[l];
+        p[l] = 0;
+        r = stdout_stream_line(s, p, line_break);
+        p[l] = saved;
 
-        p = s->buffer;
-        remaining = s->length;
+        return r;
+}
+
+static int stdout_stream_scan(
+                StdoutStream *s,
+                char *p,
+                size_t remaining,
+                LineBreak force_flush,
+                size_t *ret_consumed) {
 
-        /* XXX: This function does nothing if (s->length == 0) */
+        size_t consumed = 0;
+        int r;
+
+        assert(s);
+        assert(p);
 
         for (;;) {
                 LineBreak line_break;
-                size_t skip;
+                size_t skip, found;
                 char *end1, *end2;
 
                 end1 = memchr(p, '\n', remaining);
@@ -453,43 +477,40 @@ static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
 
                 if (end2) {
                         /* We found a NUL terminator */
-                        skip = end2 - p + 1;
+                        found = end2 - p;
+                        skip = found + 1;
                         line_break = LINE_BREAK_NUL;
                 } else if (end1) {
                         /* We found a \n terminator */
-                        *end1 = 0;
-                        skip = end1 - p + 1;
+                        found = end1 - p;
+                        skip = found + 1;
                         line_break = LINE_BREAK_NEWLINE;
                 } else if (remaining >= s->server->line_max) {
                         /* Force a line break after the maximum line length */
-                        *(p + s->server->line_max) = 0;
-                        skip = remaining;
+                        found = skip = s->server->line_max;
                         line_break = LINE_BREAK_LINE_MAX;
                 } else
                         break;
 
-                r = stdout_stream_line(s, p, line_break);
+                r = stdout_stream_found(s, p, found, line_break);
                 if (r < 0)
                         return r;
 
-                remaining -= skip;
                 p += skip;
+                consumed += skip;
+                remaining -= skip;
         }
 
-        if (force_flush && remaining > 0) {
-                p[remaining] = 0;
-                r = stdout_stream_line(s, p, LINE_BREAK_EOF);
+        if (force_flush >= 0 && remaining > 0) {
+                r = stdout_stream_found(s, p, remaining, force_flush);
                 if (r < 0)
                         return r;
 
-                p += remaining;
-                remaining = 0;
+                consumed += remaining;
         }
 
-        if (p > s->buffer) {
-                memmove(s->buffer, p, remaining);
-                s->length = remaining;
-        }
+        if (ret_consumed)
+                *ret_consumed = consumed;
 
         return 0;
 }
@@ -497,11 +518,12 @@ static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
 static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents, void *userdata) {
         uint8_t buf[CMSG_SPACE(sizeof(struct ucred))];
         StdoutStream *s = userdata;
+        size_t limit, consumed;
         struct ucred *ucred = NULL;
         struct cmsghdr *cmsg;
         struct iovec iovec;
-        size_t limit;
         ssize_t l;
+        char *p;
         int r;
 
         struct msghdr msghdr = {
@@ -529,7 +551,7 @@ static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents,
         /* Try to make use of the allocated buffer in full, but never read more than the configured line size. Also,
          * always leave room for a terminating NUL we might need to add. */
         limit = MIN(s->allocated - 1, s->server->line_max);
-
+        assert(s->length <= limit);
         iovec = IOVEC_MAKE(s->buffer + s->length, limit - s->length);
 
         l = recvmsg(s->fd, &msghdr, MSG_DONTWAIT|MSG_CMSG_CLOEXEC);
@@ -543,7 +565,7 @@ static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents,
         cmsg_close_all(&msghdr);
 
         if (l == 0) {
-                stdout_stream_scan(s, true);
+                (void) stdout_stream_scan(s, s->buffer, s->length, /* force_flush = */ LINE_BREAK_EOF, NULL);
                 goto terminate;
         }
 
@@ -562,22 +584,33 @@ static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents,
          * in the meantime.
          */
         if (ucred && ucred->pid != s->ucred.pid) {
-                /* force out any previously half-written lines from a
-                 * different process, before we switch to the new ucred
-                 * structure for everything we just added */
-                r = stdout_stream_scan(s, true);
+                /* Force out any previously half-written lines from a different process, before we switch to
+                 * the new ucred structure for everything we just added */
+                r = stdout_stream_scan(s, s->buffer, s->length, /* force_flush = */ LINE_BREAK_PID_CHANGE, NULL);
                 if (r < 0)
                         goto terminate;
 
-                s->ucred = *ucred;
                 s->context = client_context_release(s->server, s->context);
+
+                p = s->buffer + s->length;
+        } else {
+                p = s->buffer;
+                l += s->length;
         }
 
-        s->length += l;
-        r = stdout_stream_scan(s, false);
+        /* Always copy in the new credentials */
+        if (ucred)
+                s->ucred = *ucred;
+
+        r = stdout_stream_scan(s, p, l, _LINE_BREAK_INVALID, &consumed);
         if (r < 0)
                 goto terminate;
 
+        /* Move what wasn't consumed to the front of the buffer */
+        assert(consumed <= (size_t) l);
+        s->length = l - consumed;
+        memmove(s->buffer, p + consumed, s->length);
+
         return 1;
 
 terminate: