andykimpe / rpms / 389-ds-base

Forked from rpms/389-ds-base 5 months ago
Clone
Blob Blame History Raw
From f22fe3da910889ab2530d84b647b5b36b6e7e95f Mon Sep 17 00:00:00 2001
From: tbordaz <tbordaz@redhat.com>
Date: Mon, 14 Dec 2020 10:02:24 +0100
Subject: [PATCH] Issue 4492 - Changelog cache can upload updates from a wrong
 starting point (CSN) (#4493)

Bug description:
          When a replication session starts, a starting point is computed
          according to supplier/consumer RUVs.
	  from the starting point the updates are bulk loaded from the CL.
          When a bulk set have been fully evaluated the server needs to bulk load another set.
	  It iterates until there is no more updates to send.
          The bug is that during bulk load, it recomputes the CL cursor position
          and this computation can be wrong. For example if a new update on
          a rarely updated replica (or not known replica) the new position will
          be set before the inital starting point

Fix description:
          Fixing the invalid computation is a bit risky (complex code resulting from
          years of corner cases handling) and a fix could fail to address others flavor
          with the same symptom
          The fix is only (sorry for that) safety checking fix that would end a replication session
          if the computed cursor position goes before the initial starting point.
	  In case of large jump behind (24h) the starting point, a warning is logged.

relates: https://github.com/389ds/389-ds-base/issues/4492

Reviewed by: Mark Reynolds, William Brown

Platforms tested: F31
---
 ldap/servers/plugins/replication/cl5_api.c    |  6 +-
 .../servers/plugins/replication/cl5_clcache.c | 60 ++++++++++++++++++-
 .../servers/plugins/replication/cl5_clcache.h |  4 +-
 3 files changed, 63 insertions(+), 7 deletions(-)

diff --git a/ldap/servers/plugins/replication/cl5_api.c b/ldap/servers/plugins/replication/cl5_api.c
index 65801bc01..1d6e20b07 100644
--- a/ldap/servers/plugins/replication/cl5_api.c
+++ b/ldap/servers/plugins/replication/cl5_api.c
@@ -143,6 +143,7 @@ struct cl5replayiterator
     ReplicaId consumerRID;  /* consumer's RID */
     const RUV *consumerRuv; /* consumer's update vector                    */
     Object *supplierRuvObj; /* supplier's update vector object          */
+    char starting_csn[CSN_STRSIZE];
 };
 
 typedef struct cl5iterator
@@ -1542,7 +1543,7 @@ cl5GetNextOperationToReplay(CL5ReplayIterator *iterator, CL5Entry *entry)
         return CL5_BAD_DATA;
     }
 
-    rc = clcache_get_next_change(iterator->clcache, (void **)&key, &keylen, (void **)&data, &datalen, &csn);
+    rc = clcache_get_next_change(iterator->clcache, (void **)&key, &keylen, (void **)&data, &datalen, &csn, iterator->starting_csn);
 
     if (rc == DB_NOTFOUND) {
         /*
@@ -5256,7 +5257,7 @@ _cl5PositionCursorForReplay(ReplicaId consumerRID, const RUV *consumerRuv, Objec
     if (rc != 0)
         goto done;
 
-    rc = clcache_load_buffer(clcache, &startCSN, continue_on_missing);
+    rc = clcache_load_buffer(clcache, &startCSN, continue_on_missing, NULL /* retrieving startCSN, no limit enforced on this call */);
 
     if (rc == 0) {
         haveChanges = PR_TRUE;
@@ -5320,6 +5321,7 @@ _cl5PositionCursorForReplay(ReplicaId consumerRID, const RUV *consumerRuv, Objec
         (*iterator)->consumerRID = consumerRID;
         (*iterator)->consumerRuv = consumerRuv;
         (*iterator)->supplierRuvObj = supplierRuvObj;
+        csn_as_string(startCSN, PR_FALSE, (*iterator)->starting_csn);
     } else if (rc == CL5_SUCCESS) {
         /* we have no changes to send */
         rc = CL5_NOTFOUND;
diff --git a/ldap/servers/plugins/replication/cl5_clcache.c b/ldap/servers/plugins/replication/cl5_clcache.c
index a8477a83a..43b7c77d8 100644
--- a/ldap/servers/plugins/replication/cl5_clcache.c
+++ b/ldap/servers/plugins/replication/cl5_clcache.c
@@ -15,6 +15,8 @@
 #include "db.h"    /* Berkeley DB */
 #include "cl5.h"   /* changelog5Config */
 #include "cl5_clcache.h"
+#include "slap.h"
+#include "proto-slap.h"
 
 /* newer bdb uses DB_BUFFER_SMALL instead of ENOMEM as the
    error return if the given buffer in which to load a
@@ -323,14 +325,21 @@ clcache_return_buffer(CLC_Buffer **buf)
  * anchorcsn - passed in for the first load of a replication session;
  * flag         - DB_SET to load in the key CSN record.
  *                DB_NEXT to load in the records greater than key CSN.
+ * initial_starting_csn
+ *              This is the starting_csn computed at the beginning of
+ *              the replication session. It never change during a session
+ *              (aka iterator creation).
+ *              This is used for safety checking that the next CSN use
+ *              for bulk load is not before the initial csn
  * return    - DB error code instead of cl5 one because of the
  *               historic reason.
  */
 int
-clcache_load_buffer(CLC_Buffer *buf, CSN **anchorCSN, int *continue_on_miss)
+clcache_load_buffer(CLC_Buffer *buf, CSN **anchorCSN, int *continue_on_miss, char *initial_starting_csn)
 {
     int rc = 0;
     int flag = DB_NEXT;
+    CSN limit_csn = {0};
 
     if (anchorCSN)
         *anchorCSN = NULL;
@@ -343,6 +352,30 @@ clcache_load_buffer(CLC_Buffer *buf, CSN **anchorCSN, int *continue_on_miss)
         rc = clcache_adjust_anchorcsn(buf, &flag);
     }
 
+    /* safety checking, we do not want to (re)start replication before
+     * the inital computed starting point
+     */
+    if (initial_starting_csn) {
+        csn_init_by_string(&limit_csn, initial_starting_csn);
+        if (csn_compare(&limit_csn, buf->buf_current_csn) > 0) {
+            char curr[CSN_STRSIZE];
+            int loglevel = SLAPI_LOG_REPL;
+
+            if (csn_time_difference(&limit_csn, buf->buf_current_csn) > (24 * 60 * 60)) {
+                /* This is a big jump (more than a day) behind the
+                 * initial starting csn. Log a warning before ending
+                 * the session
+                 */
+                loglevel = SLAPI_LOG_WARNING;
+            }
+            csn_as_string(buf->buf_current_csn, 0, curr);
+            slapi_log_err(loglevel, buf->buf_agmt_name,
+                      "clcache_load_buffer - bulk load cursor (%s) is lower than starting csn %s. Ending session.\n", curr, initial_starting_csn);
+            /* it just end the session with UPDATE_NO_MORE_UPDATES */
+            rc = CLC_STATE_DONE;
+        }
+    }
+
     if (rc == 0) {
 
         buf->buf_state = CLC_STATE_READY;
@@ -365,6 +398,27 @@ clcache_load_buffer(CLC_Buffer *buf, CSN **anchorCSN, int *continue_on_miss)
             }
             /* the use of alternative start csns can be limited, record its usage */
             (*continue_on_miss)--;
+
+            if (initial_starting_csn) {
+                if (csn_compare(&limit_csn, buf->buf_current_csn) > 0) {
+                    char curr[CSN_STRSIZE];
+                    int loglevel = SLAPI_LOG_REPL;
+
+                    if (csn_time_difference(&limit_csn, buf->buf_current_csn) > (24 * 60 * 60)) {
+                        /* This is a big jump (more than a day) behind the
+                         * initial starting csn. Log a warning before ending
+                         * the session
+                         */
+                        loglevel = SLAPI_LOG_WARNING;
+                    }
+                    csn_as_string(buf->buf_current_csn, 0, curr);
+                    slapi_log_err(loglevel, buf->buf_agmt_name,
+                            "clcache_load_buffer - (DB_SET_RANGE) bulk load cursor (%s) is lower than starting csn %s. Ending session.\n", curr, initial_starting_csn);
+                    rc = DB_NOTFOUND;
+
+                    return rc;
+                }
+            }
         }
         /* Reset some flag variables */
         if (rc == 0) {
@@ -492,7 +546,7 @@ retry:
  * *data: output - data of the next change, or NULL if no more change
  */
 int
-clcache_get_next_change(CLC_Buffer *buf, void **key, size_t *keylen, void **data, size_t *datalen, CSN **csn)
+clcache_get_next_change(CLC_Buffer *buf, void **key, size_t *keylen, void **data, size_t *datalen, CSN **csn, char *initial_starting_csn)
 {
     int skip = 1;
     int rc = 0;
@@ -510,7 +564,7 @@ clcache_get_next_change(CLC_Buffer *buf, void **key, size_t *keylen, void **data
          * We're done with the current buffer. Now load the next chunk.
          */
         if (NULL == *key && CLC_STATE_READY == buf->buf_state) {
-            rc = clcache_load_buffer(buf, NULL, NULL);
+            rc = clcache_load_buffer(buf, NULL, NULL, initial_starting_csn);
             if (0 == rc && buf->buf_record_ptr) {
                 DB_MULTIPLE_KEY_NEXT(buf->buf_record_ptr, &buf->buf_data,
                                      *key, *keylen, *data, *datalen);
diff --git a/ldap/servers/plugins/replication/cl5_clcache.h b/ldap/servers/plugins/replication/cl5_clcache.h
index 73eb41590..16d53d563 100644
--- a/ldap/servers/plugins/replication/cl5_clcache.h
+++ b/ldap/servers/plugins/replication/cl5_clcache.h
@@ -23,9 +23,9 @@ typedef struct clc_buffer CLC_Buffer;
 int clcache_init(DB_ENV **dbenv);
 void clcache_set_config(void);
 int clcache_get_buffer(CLC_Buffer **buf, DB *db, ReplicaId consumer_rid, const RUV *consumer_ruv, const RUV *local_ruv);
-int clcache_load_buffer(CLC_Buffer *buf, CSN **anchorCSN, int *continue_on_miss);
+int clcache_load_buffer(CLC_Buffer *buf, CSN **anchorCSN, int *continue_on_miss, char *initial_starting_csn);
 void clcache_return_buffer(CLC_Buffer **buf);
-int clcache_get_next_change(CLC_Buffer *buf, void **key, size_t *keylen, void **data, size_t *datalen, CSN **csn);
+int clcache_get_next_change(CLC_Buffer *buf, void **key, size_t *keylen, void **data, size_t *datalen, CSN **csn, char *initial_starting_csn);
 void clcache_destroy(void);
 
 #endif
-- 
2.26.2