andykimpe / rpms / 389-ds-base

Forked from rpms/389-ds-base 5 months ago
Clone
dc8c34
From 42b9a6cf96af656441ed0a6e60d044254ffc81ba Mon Sep 17 00:00:00 2001
dc8c34
From: Ludwig Krispenz <lkrispen@redhat.com>
dc8c34
Date: Wed, 8 Jun 2016 11:28:07 +0200
dc8c34
Subject: [PATCH 386/386] Ticket 48766 - Replication changelog can incorrectly
dc8c34
 skip over updates
dc8c34
dc8c34
Bug Description:
dc8c34
      The changelog iterator uses a buffer to load and send changes, when the buffer is empty
dc8c34
      there were scenarios when the straing point for reloading the buffer was incorrectly set
dc8c34
      and changes were skipped
dc8c34
dc8c34
Fix Description: reworked clcach buffer code following design at
dc8c34
      http://www.port389.org/docs/389ds/design/changelog-processing-in-repl-state-sending-updates.html
dc8c34
dc8c34
https://fedorahosted.org/389/ticket/48766
dc8c34
dc8c34
Reviewed by: Mark and Thierry, thanks
dc8c34
dc8c34
(cherry picked from commit b08df71aa9eb18572f58e55e8d6b9ef7fe181773)
dc8c34
(cherry picked from commit ec15a75ccdba713e4d74dcd760e3244ba43b6191)
dc8c34
(cherry picked from commit 2acffca091d2d97b662a28e2ac6319f6ad2b4053)
dc8c34
(cherry picked from commit e0e9f4916ee750c3b6cbffb3f107e4d49e48605c)
dc8c34
---
dc8c34
 ldap/servers/plugins/replication/cl5_api.c     | 171 +++------------
dc8c34
 ldap/servers/plugins/replication/cl5_clcache.c | 292 +++++++++++++++----------
dc8c34
 ldap/servers/plugins/replication/cl5_clcache.h |   2 +-
dc8c34
 3 files changed, 214 insertions(+), 251 deletions(-)
dc8c34
dc8c34
diff --git a/ldap/servers/plugins/replication/cl5_api.c b/ldap/servers/plugins/replication/cl5_api.c
dc8c34
index 13bf203..259c054 100644
dc8c34
--- a/ldap/servers/plugins/replication/cl5_api.c
dc8c34
+++ b/ldap/servers/plugins/replication/cl5_api.c
dc8c34
@@ -5422,18 +5422,13 @@ static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consum
dc8c34
 {
dc8c34
 	CLC_Buffer *clcache = NULL;
dc8c34
 	CL5DBFile *file;
dc8c34
-    int i;
dc8c34
-    CSN **csns = NULL;
dc8c34
     CSN *startCSN = NULL;
dc8c34
-    CSN *minCSN = NULL;
dc8c34
     char csnStr [CSN_STRSIZE];
dc8c34
     int rc = CL5_SUCCESS;
dc8c34
     Object *supplierRuvObj = NULL;
dc8c34
     RUV *supplierRuv = NULL;
dc8c34
-    PRBool newReplica;
dc8c34
     PRBool haveChanges = PR_FALSE;
dc8c34
 	char *agmt_name;
dc8c34
-	ReplicaId rid;
dc8c34
 
dc8c34
     PR_ASSERT (consumerRuv && replica && fileObj && iterator);
dc8c34
 	csnStr[0] = '\0';
dc8c34
@@ -5461,111 +5456,32 @@ static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consum
dc8c34
 		ruv_dump (supplierRuv, agmt_name, NULL);
dc8c34
 	}
dc8c34
    
dc8c34
-	/*
dc8c34
-	 * get the sorted list of SupplierMinCSN (if no ConsumerMaxCSN)
dc8c34
-	 * and ConsumerMaxCSN for those RIDs where consumer is not
dc8c34
-	 * up-to-date.
dc8c34
-	 */
dc8c34
-    csns = cl5BuildCSNList (consumerRuv, supplierRuv);
dc8c34
-    if (csns == NULL)
dc8c34
-    {
dc8c34
-        rc = CL5_NOTFOUND;
dc8c34
-        goto done;
dc8c34
-    }
dc8c34
 
dc8c34
-	/* iterate over elements of consumer's (and/or supplier's) ruv */
dc8c34
-    for (i = 0; csns[i]; i++)
dc8c34
-    {
dc8c34
-        CSN *consumerMaxCSN = NULL;
dc8c34
-
dc8c34
-		rid = csn_get_replicaid(csns[i]);
dc8c34
-
dc8c34
-		/*
dc8c34
-		 * Skip CSN that is originated from the consumer.
dc8c34
-		 * If RID==65535, the CSN is originated from a
dc8c34
-		 * legacy consumer. In this case the supplier
dc8c34
-		 * and the consumer may have the same RID.
dc8c34
-		 */
dc8c34
-		if ((rid == consumerRID && rid != MAX_REPLICA_ID) || (is_cleaned_rid(rid)) )
dc8c34
-			continue;
dc8c34
+	/* initialize the changelog buffer and do the initial load */
dc8c34
 
dc8c34
-        startCSN = csns[i];
dc8c34
+	rc = clcache_get_buffer ( &clcache, file->db, consumerRID, consumerRuv, supplierRuv );
dc8c34
+	if ( rc != 0 ) goto done;
dc8c34
 
dc8c34
-		rc = clcache_get_buffer ( &clcache, file->db, consumerRID, consumerRuv, supplierRuv );
dc8c34
-		if ( rc != 0 ) goto done;
dc8c34
-
dc8c34
-	    /* This is the first loading of this iteration. For replicas
dc8c34
-		 * already known to the consumer, we exclude the last entry
dc8c34
-		 * sent to the consumer by using DB_NEXT. However, for
dc8c34
-		 * replicas new to the consumer, we include the first change
dc8c34
-		 * ever generated by that replica.
dc8c34
-		 */
dc8c34
-		newReplica = ruv_get_largest_csn_for_replica (consumerRuv, rid, &consumerMaxCSN);
dc8c34
-		csn_free(&consumerMaxCSN);
dc8c34
-		rc = clcache_load_buffer (clcache, startCSN, (newReplica ? DB_SET : DB_NEXT));
dc8c34
-
dc8c34
-		/* there is a special case which can occur just after migration - in this case,
dc8c34
-		the consumer RUV will contain the last state of the supplier before migration,
dc8c34
-		but the supplier will have an empty changelog, or the supplier changelog will
dc8c34
-		not contain any entries within the consumer min and max CSN - also, since
dc8c34
-		the purge RUV contains no CSNs, the changelog has never been purged
dc8c34
-		ASSUMPTIONS - it is assumed that the supplier had no pending changes to send
dc8c34
-		to any consumers; that is, we can assume that no changes were lost due to
dc8c34
-		either changelog purging or database reload - bug# 603061 - richm@netscape.com
dc8c34
-		*/
dc8c34
-        if ((rc == DB_NOTFOUND) && !ruv_has_csns(file->purgeRUV))
dc8c34
-        {
dc8c34
-            char mincsnStr[CSN_STRSIZE];
dc8c34
-
dc8c34
-            /* use the supplier min csn for the buffer start csn - we know
dc8c34
-               this csn is in our changelog */
dc8c34
-            if ((RUV_SUCCESS == ruv_get_min_csn_ext(supplierRuv, &minCSN, 1 /* ignore cleaned rids */)) &&
dc8c34
-                minCSN)
dc8c34
-            { /* must now free startCSN */
dc8c34
-                if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
dc8c34
-                    csn_as_string(startCSN, PR_FALSE, csnStr);
dc8c34
-                    csn_as_string(minCSN, PR_FALSE, mincsnStr);
dc8c34
-                    slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, 
dc8c34
-                                    "%s: CSN %s not found and no purging, probably a reinit\n",
dc8c34
-                                    agmt_name, csnStr);
dc8c34
-                    slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, 
dc8c34
-                                    "%s: Will try to use supplier min CSN %s to load changelog\n",
dc8c34
-                                    agmt_name, mincsnStr);
dc8c34
-                }
dc8c34
-                startCSN = minCSN;
dc8c34
-                rc = clcache_load_buffer (clcache, startCSN, DB_SET);
dc8c34
-            }
dc8c34
-            else
dc8c34
-            {
dc8c34
-                if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
dc8c34
-                    csn_as_string(startCSN, PR_FALSE, csnStr); 
dc8c34
-                    slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
dc8c34
-                                    "%s: CSN %s not found and no purging, probably a reinit\n",
dc8c34
-                                    agmt_name, csnStr);
dc8c34
-                    slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, 
dc8c34
-                                    "%s: Could not get the min csn from the supplier RUV\n",
dc8c34
-                                    agmt_name);
dc8c34
-                }
dc8c34
-                rc = CL5_RUV_ERROR;
dc8c34
-                goto done;
dc8c34
-            }
dc8c34
-        }
dc8c34
+	rc = clcache_load_buffer (clcache, &startCSN);
dc8c34
 
dc8c34
         if (rc == 0) {
dc8c34
-            haveChanges = PR_TRUE;
dc8c34
-            rc = CL5_SUCCESS;
dc8c34
-            if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
dc8c34
-                csn_as_string(startCSN, PR_FALSE, csnStr); 
dc8c34
-                slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, 
dc8c34
-                                "%s: CSN %s found, position set for replay\n", agmt_name, csnStr);
dc8c34
-            }
dc8c34
-            if (startCSN != csns[i]) {
dc8c34
-                csn_free(&startCSN);
dc8c34
-            }
dc8c34
-            break;
dc8c34
+		haveChanges = PR_TRUE;
dc8c34
+		rc = CL5_SUCCESS;
dc8c34
+		if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
dc8c34
+			csn_as_string(startCSN, PR_FALSE, csnStr);
dc8c34
+			slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
dc8c34
+				"%s: CSN %s found, position set for replay\n", agmt_name, csnStr);
dc8c34
+		}
dc8c34
         }
dc8c34
-        else if (rc == DB_NOTFOUND)  /* entry not found */
dc8c34
-        {
dc8c34
+        else if (rc == DB_NOTFOUND)   {
dc8c34
+	    /* buffer not loaded.
dc8c34
+	     * either because no changes have to be sent ==> startCSN is NULL
dc8c34
+	     * or the calculated startCSN cannot be found in the changelog
dc8c34
+	     */
dc8c34
+	    if (startCSN == NULL) {
dc8c34
+		rc = CL5_NOTFOUND;
dc8c34
+		goto done;
dc8c34
+	    }
dc8c34
             /* check whether this csn should be present */
dc8c34
             rc = _cl5CheckMissingCSN (startCSN, supplierRuv, file);
dc8c34
             if (rc == CL5_MISSING_DATA)  /* we should have had the change but we don't */
dc8c34
@@ -5583,17 +5499,6 @@ static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consum
dc8c34
                                 "%s: CSN %s not found, we aren't as up to date, or we purged\n", 
dc8c34
                                 agmt_name, csnStr);
dc8c34
             }
dc8c34
-            if (startCSN != csns[i]) {
dc8c34
-                csn_free(&startCSN);
dc8c34
-            }
dc8c34
-            if (rc == CL5_MISSING_DATA)  /* we should have had the change but we don't */
dc8c34
-            {
dc8c34
-                break;
dc8c34
-            }
dc8c34
-            else /* we are not as up to date or we purged */
dc8c34
-            {
dc8c34
-                continue;
dc8c34
-            } 
dc8c34
         }
dc8c34
         else
dc8c34
         {
dc8c34
@@ -5602,34 +5507,29 @@ static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consum
dc8c34
 			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, 
dc8c34
                             "%s: Failed to retrieve change with CSN %s; db error - %d %s\n", 
dc8c34
                             agmt_name, csnStr, rc, db_strerror(rc));
dc8c34
-            if (startCSN != csns[i]) {
dc8c34
-                csn_free(&startCSN);
dc8c34
-            }
dc8c34
 
dc8c34
             rc = CL5_DB_ERROR;
dc8c34
-            break;
dc8c34
-        }
dc8c34
+    }
dc8c34
 
dc8c34
-    } /* end for */
dc8c34
 
dc8c34
     /* setup the iterator */
dc8c34
     if (haveChanges)
dc8c34
     {
dc8c34
-	    *iterator = (CL5ReplayIterator*) slapi_ch_calloc (1, sizeof (CL5ReplayIterator));
dc8c34
+	*iterator = (CL5ReplayIterator*) slapi_ch_calloc (1, sizeof (CL5ReplayIterator));
dc8c34
 
dc8c34
-	    if (*iterator == NULL)
dc8c34
-	    {
dc8c34
-		    slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, 
dc8c34
+        if (*iterator == NULL)
dc8c34
+	{
dc8c34
+            slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, 
dc8c34
 						"%s: _cl5PositionCursorForReplay: failed to allocate iterator\n", agmt_name);
dc8c34
-		    rc = CL5_MEMORY_ERROR;
dc8c34
-		    goto done;
dc8c34
-	    }
dc8c34
+            rc = CL5_MEMORY_ERROR;
dc8c34
+	    goto done;
dc8c34
+	}
dc8c34
 
dc8c34
         /* ONREPL - should we make a copy of both RUVs here ?*/
dc8c34
-		(*iterator)->fileObj = fileObj;
dc8c34
-		(*iterator)->clcache = clcache; clcache = NULL;
dc8c34
-		(*iterator)->consumerRID = consumerRID;
dc8c34
-	    (*iterator)->consumerRuv = consumerRuv;
dc8c34
+	(*iterator)->fileObj = fileObj;
dc8c34
+	(*iterator)->clcache = clcache; clcache = NULL;
dc8c34
+	(*iterator)->consumerRID = consumerRID;
dc8c34
+	(*iterator)->consumerRuv = consumerRuv;
dc8c34
         (*iterator)->supplierRuvObj = supplierRuvObj;
dc8c34
     }
dc8c34
     else if (rc == CL5_SUCCESS)
dc8c34
@@ -5639,11 +5539,8 @@ static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consum
dc8c34
     }
dc8c34
 
dc8c34
 done:
dc8c34
-	if ( clcache )
dc8c34
-		clcache_return_buffer ( &clcache );
dc8c34
-
dc8c34
-    if (csns)
dc8c34
-        cl5DestroyCSNList (&csns);
dc8c34
+    if ( clcache )
dc8c34
+	clcache_return_buffer ( &clcache );
dc8c34
 
dc8c34
     if (rc != CL5_SUCCESS)
dc8c34
     {
dc8c34
diff --git a/ldap/servers/plugins/replication/cl5_clcache.c b/ldap/servers/plugins/replication/cl5_clcache.c
dc8c34
index 60f288e..9e1d3b7 100644
dc8c34
--- a/ldap/servers/plugins/replication/cl5_clcache.c
dc8c34
+++ b/ldap/servers/plugins/replication/cl5_clcache.c
dc8c34
@@ -68,6 +68,7 @@
dc8c34
 #define DEFAULT_CLC_BUFFER_COUNT_MAX		0
dc8c34
 #define DEFAULT_CLC_BUFFER_PAGE_COUNT		32
dc8c34
 #define DEFAULT_CLC_BUFFER_PAGE_SIZE		1024
dc8c34
+#define WORK_CLC_BUFFER_PAGE_SIZE 8*DEFAULT_CLC_BUFFER_PAGE_SIZE
dc8c34
 
dc8c34
 enum {
dc8c34
 	CLC_STATE_READY = 0,		/* ready to iterate */
dc8c34
@@ -85,8 +86,9 @@ struct csn_seq_ctrl_block {
dc8c34
 	ReplicaId	rid;				/* RID this block serves */
dc8c34
 	CSN			*consumer_maxcsn;	/* Don't send CSN <= this */
dc8c34
 	CSN			*local_maxcsn;		/* Don't send CSN > this */
dc8c34
-	CSN			*prev_local_maxcsn;	/* */
dc8c34
-	int			state;				/* CLC_STATE_* */
dc8c34
+	CSN			*prev_local_maxcsn;	/* Copy of last state at buffer loading */
dc8c34
+	CSN			*local_mincsn;		/* Used to determin anchor csn*/
dc8c34
+	int			state;			/* CLC_STATE_* */
dc8c34
 };
dc8c34
 
dc8c34
 /*
dc8c34
@@ -99,6 +101,8 @@ struct clc_buffer {
dc8c34
 	ReplicaId	 buf_consumer_rid;	/* help checking threshold csn */
dc8c34
 	const RUV	*buf_consumer_ruv;	/* used to skip change */
dc8c34
 	const RUV	*buf_local_ruv;		/* used to refresh local_maxcsn */
dc8c34
+	int		buf_ignoreConsumerRID;	/* how to handle updates from consumer */
dc8c34
+	int	 	buf_load_cnt;		/* number of loads for session */
dc8c34
 
dc8c34
 	/*
dc8c34
 	 * fields for retriving data from DB
dc8c34
@@ -119,7 +123,6 @@ struct clc_buffer {
dc8c34
 	int			 buf_max_cscbs;
dc8c34
 
dc8c34
 	/* fields for debugging stat */
dc8c34
-	int		 	 buf_load_cnt;		/* number of loads for session */
dc8c34
 	int		 	 buf_record_cnt;	/* number of changes for session */
dc8c34
 	int		 	 buf_record_skipped;	/* number of changes skipped */
dc8c34
 	int		 	 buf_skipped_new_rid;	/* number of changes skipped due to new_rid */
dc8c34
@@ -162,7 +165,8 @@ struct clc_pool {
dc8c34
 static struct clc_pool *_pool = NULL;	/* process's buffer pool */
dc8c34
 
dc8c34
 /* static prototypes */
dc8c34
-static int	clcache_adjust_anchorcsn ( CLC_Buffer *buf );
dc8c34
+static int	clcache_initial_anchorcsn ( CLC_Buffer *buf, int *flag );
dc8c34
+static int	clcache_adjust_anchorcsn ( CLC_Buffer *buf, int *flag );
dc8c34
 static void	clcache_refresh_consumer_maxcsns ( CLC_Buffer *buf );
dc8c34
 static int	clcache_refresh_local_maxcsns ( CLC_Buffer *buf );
dc8c34
 static int	clcache_skip_change ( CLC_Buffer *buf );
dc8c34
@@ -281,8 +285,23 @@ clcache_get_buffer ( CLC_Buffer **buf, DB *db, ReplicaId consumer_rid, const RUV
dc8c34
 	}
dc8c34
 
dc8c34
 	if ( NULL != *buf ) {
dc8c34
+		CSN *c_csn = NULL;
dc8c34
+		CSN *l_csn = NULL;
dc8c34
 		(*buf)->buf_consumer_ruv = consumer_ruv;
dc8c34
 		(*buf)->buf_local_ruv = local_ruv;
dc8c34
+		(*buf)->buf_load_flag = DB_MULTIPLE_KEY;
dc8c34
+		ruv_get_largest_csn_for_replica (consumer_ruv, consumer_rid, &c_csn);
dc8c34
+		ruv_get_largest_csn_for_replica (local_ruv, consumer_rid, &l_csn);
dc8c34
+		if (l_csn && csn_compare(l_csn, c_csn) > 0) {
dc8c34
+			/* the supplier has updates for the consumer RID and
dc8c34
+			 * these updates are newer than on the consumer
dc8c34
+			 */
dc8c34
+			(*buf)->buf_ignoreConsumerRID = 0;
dc8c34
+		} else {
dc8c34
+			(*buf)->buf_ignoreConsumerRID = 1;
dc8c34
+		}
dc8c34
+		csn_free(&c_csn);
dc8c34
+		csn_free(&l_csn);
dc8c34
 	}
dc8c34
 	else {
dc8c34
 		slapi_log_error ( SLAPI_LOG_FATAL, get_thread_private_agmtname(),
dc8c34
@@ -335,36 +354,25 @@ clcache_return_buffer ( CLC_Buffer **buf )
dc8c34
  *		       historic reason.
dc8c34
  */
dc8c34
 int
dc8c34
-clcache_load_buffer ( CLC_Buffer *buf, CSN *anchorcsn, int flag )
dc8c34
+clcache_load_buffer ( CLC_Buffer *buf, CSN **anchorCSN )
dc8c34
 {
dc8c34
 	int rc = 0;
dc8c34
+        int flag = DB_NEXT;
dc8c34
 
dc8c34
+	if (anchorCSN) *anchorCSN = NULL;
dc8c34
 	clcache_refresh_local_maxcsns ( buf );
dc8c34
 
dc8c34
-	/* Set the loading key */
dc8c34
-	if ( anchorcsn ) {
dc8c34
+	if (buf->buf_load_cnt == 0 ) {
dc8c34
 		clcache_refresh_consumer_maxcsns ( buf );
dc8c34
-		buf->buf_load_flag = DB_MULTIPLE_KEY;
dc8c34
-		csn_as_string ( anchorcsn, 0, (char*)buf->buf_key.data );
dc8c34
-		slapi_log_error ( SLAPI_LOG_REPL, buf->buf_agmt_name,
dc8c34
-				"session start: anchorcsn=%s\n", (char*)buf->buf_key.data );
dc8c34
-	}
dc8c34
-	else if ( csn_get_time(buf->buf_current_csn) == 0 ) {
dc8c34
-		/* time == 0 means this csn has never been set */
dc8c34
-		rc = DB_NOTFOUND;
dc8c34
-	}
dc8c34
-	else if ( clcache_adjust_anchorcsn ( buf ) != 0 ) {
dc8c34
-		rc = DB_NOTFOUND;
dc8c34
-	}
dc8c34
-	else {
dc8c34
-		csn_as_string ( buf->buf_current_csn, 0, (char*)buf->buf_key.data );
dc8c34
-		slapi_log_error ( SLAPI_LOG_REPL, buf->buf_agmt_name,
dc8c34
-				"load next: anchorcsn=%s\n", (char*)buf->buf_key.data );
dc8c34
+		rc = clcache_initial_anchorcsn ( buf, &flag );
dc8c34
+        } else {
dc8c34
+		rc = clcache_adjust_anchorcsn ( buf, &flag );
dc8c34
 	}
dc8c34
 
dc8c34
 	if ( rc == 0 ) {
dc8c34
 
dc8c34
 		buf->buf_state = CLC_STATE_READY;
dc8c34
+		if (anchorCSN) *anchorCSN = buf->buf_current_csn;
dc8c34
 		rc = clcache_load_buffer_bulk ( buf, flag );
dc8c34
 
dc8c34
 		/* Reset some flag variables */
dc8c34
@@ -374,21 +382,15 @@ clcache_load_buffer ( CLC_Buffer *buf, CSN *anchorcsn, int flag )
dc8c34
 				buf->buf_cscbs[i]->state = CLC_STATE_READY;
dc8c34
 			}
dc8c34
 		}
dc8c34
-		else if ( anchorcsn ) {
dc8c34
-			/* Report error only when the missing is persistent */
dc8c34
-			if ( buf->buf_missing_csn && csn_compare (buf->buf_missing_csn, anchorcsn) == 0 ) {
dc8c34
-				if (!buf->buf_prev_missing_csn || csn_compare (buf->buf_prev_missing_csn, anchorcsn)) {
dc8c34
-					slapi_log_error ( SLAPI_LOG_FATAL, buf->buf_agmt_name,
dc8c34
-						"Can't locate CSN %s in the changelog (DB rc=%d). If replication stops, the consumer may need to be reinitialized.\n",
dc8c34
-						(char*)buf->buf_key.data, rc );
dc8c34
-					csn_dup_or_init_by_csn (&buf->buf_prev_missing_csn, anchorcsn);
dc8c34
-				}
dc8c34
-			}
dc8c34
-			else {
dc8c34
-				csn_dup_or_init_by_csn (&buf->buf_missing_csn, anchorcsn);
dc8c34
-			}
dc8c34
+		else {
dc8c34
+			slapi_log_error ( SLAPI_LOG_FATAL, buf->buf_agmt_name,
dc8c34
+					"Can't locate CSN %s in the changelog (DB rc=%d). If replication stops, the consumer may need to be reinitialized.\n",
dc8c34
+					(char*)buf->buf_key.data, rc );
dc8c34
 		}
dc8c34
+	} else if (rc == CLC_STATE_DONE) {
dc8c34
+		rc = DB_NOTFOUND;
dc8c34
 	}
dc8c34
+
dc8c34
 	if ( rc != 0 ) {
dc8c34
 		slapi_log_error ( SLAPI_LOG_REPL, buf->buf_agmt_name,
dc8c34
 				"clcache_load_buffer: rc=%d\n", rc );
dc8c34
@@ -513,7 +515,7 @@ clcache_get_next_change ( CLC_Buffer *buf, void **key, size_t *keylen, void **da
dc8c34
 		 * We're done with the current buffer. Now load the next chunk.
dc8c34
 		 */
dc8c34
 		if ( NULL == *key && CLC_STATE_READY == buf->buf_state ) {
dc8c34
-			rc = clcache_load_buffer ( buf, NULL, DB_NEXT );
dc8c34
+			rc = clcache_load_buffer ( buf, NULL );
dc8c34
 			if ( 0 == rc && buf->buf_record_ptr ) {
dc8c34
 				DB_MULTIPLE_KEY_NEXT ( buf->buf_record_ptr, &buf->buf_data,
dc8c34
 								   *key, *keylen, *data, *datalen );
dc8c34
@@ -551,7 +553,6 @@ clcache_refresh_consumer_maxcsns ( CLC_Buffer *buf )
dc8c34
 	int i;
dc8c34
 
dc8c34
 	for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
dc8c34
-		csn_free(&buf->buf_cscbs[i]->consumer_maxcsn);
dc8c34
 		ruv_get_largest_csn_for_replica (
dc8c34
 				buf->buf_consumer_ruv,
dc8c34
 				buf->buf_cscbs[i]->rid,
dc8c34
@@ -568,14 +569,11 @@ clcache_refresh_local_maxcsn ( const ruv_enum_data *rid_data, void *data )
dc8c34
 	int i;
dc8c34
 
dc8c34
 	rid = csn_get_replicaid ( rid_data->csn );
dc8c34
-
dc8c34
-	/*
dc8c34
-	 * No need to create cscb for consumer's RID.
dc8c34
-	 * If RID==65535, the CSN is originated from a
dc8c34
-	 * legacy consumer. In this case the supplier
dc8c34
-	 * and the consumer may have the same RID.
dc8c34
+	/* we do not handle updates originated at the consumer if not required
dc8c34
+	 * and we ignore RID which have been cleaned
dc8c34
 	 */
dc8c34
-	if ( rid == buf->buf_consumer_rid && rid != MAX_REPLICA_ID )
dc8c34
+	if ( (rid == buf->buf_consumer_rid && buf->buf_ignoreConsumerRID) ||
dc8c34
+		is_cleaned_rid(rid) )
dc8c34
 		return rc;
dc8c34
 
dc8c34
 	for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
dc8c34
@@ -594,9 +592,20 @@ clcache_refresh_local_maxcsn ( const ruv_enum_data *rid_data, void *data )
dc8c34
 		}
dc8c34
 		buf->buf_cscbs[i]->rid = rid;
dc8c34
 		buf->buf_num_cscbs++;
dc8c34
+		/* this is the first time we have a local change for the RID
dc8c34
+		 * we need to check what the consumer knows about it.
dc8c34
+		 */
dc8c34
+		ruv_get_largest_csn_for_replica (
dc8c34
+				buf->buf_consumer_ruv,
dc8c34
+				buf->buf_cscbs[i]->rid,
dc8c34
+				&buf->buf_cscbs[i]->consumer_maxcsn );
dc8c34
 	}
dc8c34
 
dc8c34
+	if (buf->buf_cscbs[i]->local_maxcsn)
dc8c34
+	    csn_dup_or_init_by_csn ( &buf->buf_cscbs[i]->prev_local_maxcsn, buf->buf_cscbs[i]->local_maxcsn );
dc8c34
+
dc8c34
 	csn_dup_or_init_by_csn ( &buf->buf_cscbs[i]->local_maxcsn, rid_data->csn );
dc8c34
+	csn_dup_or_init_by_csn ( &buf->buf_cscbs[i]->local_mincsn, rid_data->min_csn );
dc8c34
 
dc8c34
 	if ( buf->buf_cscbs[i]->consumer_maxcsn &&
dc8c34
 		 csn_compare (buf->buf_cscbs[i]->consumer_maxcsn, rid_data->csn) >= 0 ) {
dc8c34
@@ -610,88 +619,147 @@ clcache_refresh_local_maxcsn ( const ruv_enum_data *rid_data, void *data )
dc8c34
 static int
dc8c34
 clcache_refresh_local_maxcsns ( CLC_Buffer *buf )
dc8c34
 {
dc8c34
-	int i;
dc8c34
 
dc8c34
-	for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
dc8c34
-		csn_dup_or_init_by_csn ( &buf->buf_cscbs[i]->prev_local_maxcsn,
dc8c34
-								  buf->buf_cscbs[i]->local_maxcsn );
dc8c34
-	}
dc8c34
 	return ruv_enumerate_elements ( buf->buf_local_ruv, clcache_refresh_local_maxcsn, buf );
dc8c34
 }
dc8c34
 
dc8c34
 /*
dc8c34
  * Algorithm:
dc8c34
  *
dc8c34
- *	1. Snapshot local RUVs;
dc8c34
- *	2. Load buffer;
dc8c34
- *	3. Send to the consumer only those CSNs that are covered
dc8c34
- *	   by the RUVs snapshot taken in the first step;
dc8c34
- *	   All CSNs that are covered by the RUVs snapshot taken in the
dc8c34
- *	   first step are guaranteed in consecutive order for the respected
dc8c34
- *	   RIDs because of the the CSN pending list control;
dc8c34
- *	   A CSN that is not covered by the RUVs snapshot may be out of order
dc8c34
- *	   since it is possible that a smaller CSN might not have committed 
dc8c34
- *	   yet by the time the buffer was loaded.
dc8c34
- *	4. Determine anchorcsn for each RID:
dc8c34
- *
dc8c34
- *	   Case|  Local vs. Buffer | New Local |       Next
dc8c34
- *	       | MaxCSN     MaxCSN |    MaxCSN | Anchor-CSN
dc8c34
- *	   ----+-------------------+-----------+----------------
dc8c34
- *       1 |   Cl    >=   Cb   |     *     | Cb
dc8c34
- *       2 |   Cl    <    Cb   |     Cl    | Cb
dc8c34
- *       3 |   Cl    <    Cb   |     Cl2   | Cl 
dc8c34
- *
dc8c34
- *	5. Determine anchorcsn for next load:
dc8c34
+ *	1. Determine anchorcsn for each RID:
dc8c34
+ *	2. Determine anchorcsn for next load:
dc8c34
  *	   Anchor-CSN = min { all Next-Anchor-CSN, Buffer-MaxCSN }
dc8c34
  */
dc8c34
 static int
dc8c34
-clcache_adjust_anchorcsn ( CLC_Buffer *buf )
dc8c34
+clcache_initial_anchorcsn ( CLC_Buffer *buf, int *flag )
dc8c34
 {
dc8c34
 	PRBool hasChange = PR_FALSE;
dc8c34
 	struct csn_seq_ctrl_block *cscb;
dc8c34
 	int i;
dc8c34
+	CSN *anchorcsn = NULL;
dc8c34
 
dc8c34
 	if ( buf->buf_state == CLC_STATE_READY ) {
dc8c34
 		for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
dc8c34
+			CSN *rid_anchor = NULL;
dc8c34
+			int rid_flag = DB_NEXT;
dc8c34
 			cscb = buf->buf_cscbs[i];
dc8c34
 
dc8c34
-			if ( cscb->state == CLC_STATE_UP_TO_DATE )
dc8c34
-				continue;
dc8c34
+			if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
dc8c34
+				char prevmax[CSN_STRSIZE];
dc8c34
+				char local[CSN_STRSIZE];
dc8c34
+				char curr[CSN_STRSIZE];
dc8c34
+				char conmaxcsn[CSN_STRSIZE];
dc8c34
+				csn_as_string(cscb->prev_local_maxcsn, 0, prevmax);
dc8c34
+				csn_as_string(cscb->local_maxcsn, 0, local);
dc8c34
+				csn_as_string(buf->buf_current_csn, 0, curr);
dc8c34
+				csn_as_string(cscb->consumer_maxcsn, 0, conmaxcsn);
dc8c34
+				slapi_log_error(SLAPI_LOG_REPL, "clcache_initial_anchorcsn" ,
dc8c34
+								"%s - (cscb %d - state %d) - csnPrevMax (%s) "
dc8c34
+								"csnMax (%s) csnBuf (%s) csnConsumerMax (%s)\n",
dc8c34
+								buf->buf_agmt_name, i, cscb->state, prevmax, local,
dc8c34
+								curr, conmaxcsn);
dc8c34
+			}
dc8c34
 
dc8c34
-			/*
dc8c34
-			 * Case 3 unsafe ruv change: next buffer load should start
dc8c34
-			 * from where the maxcsn in the old ruv was. Since each
dc8c34
-			 * cscb has remembered the maxcsn sent to the consumer,
dc8c34
-			 * CSNs that may be loaded again could easily be skipped.
dc8c34
-			 */
dc8c34
-			if ( cscb->prev_local_maxcsn &&
dc8c34
-				 csn_compare (cscb->prev_local_maxcsn, buf->buf_current_csn) < 0 &&
dc8c34
-				 csn_compare (cscb->local_maxcsn, cscb->prev_local_maxcsn) != 0 ) {
dc8c34
+			if (cscb->consumer_maxcsn == NULL) {
dc8c34
+				/* the consumer hasn't seen changes for this RID */
dc8c34
+				rid_anchor = cscb->local_mincsn;
dc8c34
+				rid_flag = DB_SET;
dc8c34
+			} else if ( csn_compare (cscb->local_maxcsn, cscb->consumer_maxcsn) > 0 ) {
dc8c34
+				rid_anchor = cscb->consumer_maxcsn;
dc8c34
+			}
dc8c34
+
dc8c34
+			if (rid_anchor && (anchorcsn == NULL ||
dc8c34
+			    ( csn_compare(rid_anchor, anchorcsn) < 0))) {
dc8c34
+				anchorcsn = rid_anchor;
dc8c34
+				*flag = rid_flag;
dc8c34
 				hasChange = PR_TRUE;
dc8c34
-				cscb->state = CLC_STATE_READY;
dc8c34
-				csn_init_by_csn ( buf->buf_current_csn, cscb->prev_local_maxcsn );
dc8c34
-				csn_as_string ( cscb->prev_local_maxcsn, 0, (char*)buf->buf_key.data );
dc8c34
-				slapi_log_error ( SLAPI_LOG_REPL, buf->buf_agmt_name,
dc8c34
-						"adjust anchor csn upon %s\n",
dc8c34
-						( cscb->state == CLC_STATE_CSN_GT_RUV ? "out of sequence csn" : "unsafe ruv change") );
dc8c34
-				continue;
dc8c34
 			}
dc8c34
 
dc8c34
-			/*
dc8c34
-			 * check if there are still changes to send for this RID
dc8c34
-			 * Assume we had compared the local maxcsn and the consumer
dc8c34
-			 * max csn before this function was called and hence the
dc8c34
-			 * cscb->state had been set accordingly.
dc8c34
-			 */ 
dc8c34
-			if ( hasChange == PR_FALSE &&
dc8c34
-				 csn_compare (cscb->local_maxcsn, buf->buf_current_csn) > 0 ) {
dc8c34
+
dc8c34
+		}
dc8c34
+	}
dc8c34
+
dc8c34
+	if ( !hasChange ) {
dc8c34
+		buf->buf_state = CLC_STATE_DONE;
dc8c34
+	} else {
dc8c34
+		csn_init_by_csn(buf->buf_current_csn, anchorcsn);
dc8c34
+		csn_as_string(buf->buf_current_csn, 0, (char *)buf->buf_key.data);
dc8c34
+		slapi_log_error(SLAPI_LOG_REPL, "clcache_initial_anchorcsn",
dc8c34
+						"anchor is now: %s\n", (char *)buf->buf_key.data);
dc8c34
+	}
dc8c34
+
dc8c34
+	return buf->buf_state;
dc8c34
+}
dc8c34
+
dc8c34
+static int
dc8c34
+clcache_adjust_anchorcsn ( CLC_Buffer *buf, int *flag )
dc8c34
+{
dc8c34
+	PRBool hasChange = PR_FALSE;
dc8c34
+	struct csn_seq_ctrl_block *cscb;
dc8c34
+	int i;
dc8c34
+	CSN *anchorcsn = NULL;
dc8c34
+
dc8c34
+	if ( buf->buf_state == CLC_STATE_READY ) {
dc8c34
+		for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
dc8c34
+			CSN *rid_anchor = NULL;
dc8c34
+			int rid_flag = DB_NEXT;
dc8c34
+			cscb = buf->buf_cscbs[i];
dc8c34
+
dc8c34
+			if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
dc8c34
+				char prevmax[CSN_STRSIZE];
dc8c34
+				char local[CSN_STRSIZE];
dc8c34
+				char curr[CSN_STRSIZE];
dc8c34
+				char conmaxcsn[CSN_STRSIZE];
dc8c34
+				csn_as_string(cscb->prev_local_maxcsn, 0, prevmax);
dc8c34
+				csn_as_string(cscb->local_maxcsn, 0, local);
dc8c34
+				csn_as_string(buf->buf_current_csn, 0, curr);
dc8c34
+				csn_as_string(cscb->consumer_maxcsn, 0, conmaxcsn);
dc8c34
+				slapi_log_error(SLAPI_LOG_REPL, "clcache_adjust_anchorcsn" ,
dc8c34
+								"%s - (cscb %d - state %d) - csnPrevMax (%s) "
dc8c34
+								"csnMax (%s) csnBuf (%s) csnConsumerMax (%s)\n",
dc8c34
+								buf->buf_agmt_name, i, cscb->state, prevmax, local,
dc8c34
+								curr, conmaxcsn);
dc8c34
+			}
dc8c34
+
dc8c34
+			if (csn_compare (cscb->local_maxcsn, cscb->prev_local_maxcsn) == 0 ||
dc8c34
+			    csn_compare (cscb->prev_local_maxcsn, buf->buf_current_csn) > 0 ) {
dc8c34
+				if (csn_compare (cscb->local_maxcsn, cscb->consumer_maxcsn) > 0 ) {
dc8c34
+					rid_anchor = buf->buf_current_csn;
dc8c34
+				}
dc8c34
+			} else {
dc8c34
+				/* prev local max csn < csnBuffer AND different from local maxcsn */
dc8c34
+				if (cscb->prev_local_maxcsn == NULL) {
dc8c34
+					if (cscb->consumer_maxcsn == NULL) {
dc8c34
+						/* the consumer hasn't seen changes for this RID */
dc8c34
+						rid_anchor = cscb->local_mincsn;
dc8c34
+						rid_flag = DB_SET;
dc8c34
+					} else if ( csn_compare (cscb->local_maxcsn, cscb->consumer_maxcsn) > 0 ) {
dc8c34
+						rid_anchor = cscb->consumer_maxcsn;
dc8c34
+					}
dc8c34
+				} else {
dc8c34
+					/* csnPrevMaxSup > 0 */
dc8c34
+					rid_anchor = cscb->consumer_maxcsn;
dc8c34
+				}
dc8c34
+			}
dc8c34
+
dc8c34
+			if (rid_anchor && (anchorcsn == NULL ||
dc8c34
+			    ( csn_compare(rid_anchor, anchorcsn) < 0))) {
dc8c34
+				anchorcsn = rid_anchor;
dc8c34
+				*flag = rid_flag;
dc8c34
 				hasChange = PR_TRUE;
dc8c34
 			}
dc8c34
+
dc8c34
+
dc8c34
 		}
dc8c34
 	}
dc8c34
 
dc8c34
 	if ( !hasChange ) {
dc8c34
 		buf->buf_state = CLC_STATE_DONE;
dc8c34
+	} else {
dc8c34
+		csn_init_by_csn(buf->buf_current_csn, anchorcsn);
dc8c34
+		csn_as_string(buf->buf_current_csn, 0, (char *)buf->buf_key.data);
dc8c34
+		slapi_log_error(SLAPI_LOG_REPL, "clcache_adjust_anchorcsn",
dc8c34
+						"anchor is now: %s\n", (char *)buf->buf_key.data);
dc8c34
 	}
dc8c34
 
dc8c34
 	return buf->buf_state;
dc8c34
@@ -705,7 +773,6 @@ clcache_skip_change ( CLC_Buffer *buf )
dc8c34
 	int skip = 1;
dc8c34
 	int i;
dc8c34
 	char buf_cur_csn_str[CSN_STRSIZE];
dc8c34
-	char oth_csn_str[CSN_STRSIZE];
dc8c34
 
dc8c34
 	do {
dc8c34
 
dc8c34
@@ -718,25 +785,14 @@ clcache_skip_change ( CLC_Buffer *buf )
dc8c34
 		 * legacy consumer. In this case the supplier
dc8c34
 		 * and the consumer may have the same RID.
dc8c34
 		 */
dc8c34
-		if (rid == buf->buf_consumer_rid && rid != MAX_REPLICA_ID){
dc8c34
-			CSN *cons_maxcsn = NULL;
dc8c34
-
dc8c34
-			ruv_get_max_csn(buf->buf_consumer_ruv, &cons_maxcsn);
dc8c34
-			if ( csn_compare ( buf->buf_current_csn, cons_maxcsn) > 0 ) {
dc8c34
-				/*
dc8c34
-				 *  The consumer must have been "restored" and needs this newer update.
dc8c34
-				 */
dc8c34
-				skip = 0;
dc8c34
-			} else if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
dc8c34
+		if (rid == buf->buf_consumer_rid && buf->buf_ignoreConsumerRID){
dc8c34
+			if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
dc8c34
 				csn_as_string(buf->buf_current_csn, 0, buf_cur_csn_str);
dc8c34
-				csn_as_string(cons_maxcsn, 0, oth_csn_str);
dc8c34
 				slapi_log_error(SLAPI_LOG_REPL, buf->buf_agmt_name,
dc8c34
-					"Skipping update because the changelog buffer current csn [%s] is "
dc8c34
-				        "less than or equal to the consumer max csn [%s]\n",
dc8c34
-				        buf_cur_csn_str, oth_csn_str);
dc8c34
+					"Skipping update because the consumer with Rid: [%d] is "
dc8c34
+				        "ignored\n", rid);
dc8c34
 				buf->buf_skipped_csn_gt_cons_maxcsn++;
dc8c34
 			}
dc8c34
-			csn_free(&cons_maxcsn);
dc8c34
 			break;
dc8c34
 		}
dc8c34
 
dc8c34
@@ -851,6 +907,7 @@ clcache_free_cscb ( struct csn_seq_ctrl_block ** cscb )
dc8c34
 	csn_free ( & (*cscb)->consumer_maxcsn );
dc8c34
 	csn_free ( & (*cscb)->local_maxcsn );
dc8c34
 	csn_free ( & (*cscb)->prev_local_maxcsn );
dc8c34
+	csn_free ( & (*cscb)->local_mincsn );
dc8c34
 	slapi_ch_free ( (void **) cscb );
dc8c34
 }
dc8c34
 
dc8c34
@@ -1033,6 +1090,15 @@ clcache_cursor_get ( DBC *cursor, CLC_Buffer *buf, int flag )
dc8c34
 {
dc8c34
 	int rc;
dc8c34
 
dc8c34
+	if (buf->buf_data.ulen > WORK_CLC_BUFFER_PAGE_SIZE) {
dc8c34
+		/*
dc8c34
+		 * The buffer size had to be increased,
dc8c34
+		 * reset it to a smaller working size,
dc8c34
+		 * if not sufficient it will be increased again
dc8c34
+		 */
dc8c34
+		buf->buf_data.ulen = WORK_CLC_BUFFER_PAGE_SIZE;
dc8c34
+	}
dc8c34
+
dc8c34
 	rc = cursor->c_get ( cursor,
dc8c34
 						 & buf->buf_key,
dc8c34
 						 & buf->buf_data,
dc8c34
diff --git a/ldap/servers/plugins/replication/cl5_clcache.h b/ldap/servers/plugins/replication/cl5_clcache.h
dc8c34
index c096d55..5cab3cb 100644
dc8c34
--- a/ldap/servers/plugins/replication/cl5_clcache.h
dc8c34
+++ b/ldap/servers/plugins/replication/cl5_clcache.h
dc8c34
@@ -52,7 +52,7 @@ typedef struct clc_buffer CLC_Buffer;
dc8c34
 int	 clcache_init ( DB_ENV **dbenv );
dc8c34
 void clcache_set_config ();
dc8c34
 int	 clcache_get_buffer ( CLC_Buffer **buf, DB *db, ReplicaId consumer_rid, const RUV *consumer_ruv, const RUV *local_ruv );
dc8c34
-int	 clcache_load_buffer ( CLC_Buffer *buf, CSN *startCSN, int flag );
dc8c34
+int	 clcache_load_buffer ( CLC_Buffer *buf, CSN **anchorCSN );
dc8c34
 void clcache_return_buffer ( CLC_Buffer **buf );
dc8c34
 int	 clcache_get_next_change ( CLC_Buffer *buf, void **key, size_t *keylen, void **data, size_t *datalen, CSN **csn );
dc8c34
 void clcache_destroy ();
dc8c34
-- 
dc8c34
2.4.11
dc8c34