Blame SOURCES/0057-Ticket-49287-v3-extend-csnpl-handling-to-multiple-ba.patch

6f51e1
From 6b5aa0e288f1ea5553d4dd5d220d4e5daf50a247 Mon Sep 17 00:00:00 2001
6f51e1
From: Mark Reynolds <mreynolds@redhat.com>
6f51e1
Date: Mon, 31 Jul 2017 14:45:50 -0400
6f51e1
Subject: [PATCH] Ticket 49287 - v3 extend csnpl handling to multiple backends
6f51e1
6f51e1
        The csn pending list mechanism failed if internal operation affected multiple backends
6f51e1
6f51e1
        This fix is an extension to the fix in ticket 49008, the thread local data now also contains
6f51e1
        a list of all affected replicas.
6f51e1
6f51e1
        http://www.port389.org/docs/389ds/design/csn-pending-lists-and-ruv-update.html
6f51e1
6f51e1
        Reviewed by: William, Thierry - thanks
6f51e1
---
6f51e1
 ldap/servers/plugins/replication/csnpl.c         |  85 ++++++++--
6f51e1
 ldap/servers/plugins/replication/csnpl.h         |   8 +-
6f51e1
 ldap/servers/plugins/replication/repl5.h         |  22 ++-
6f51e1
 ldap/servers/plugins/replication/repl5_init.c    |  48 +++++-
6f51e1
 ldap/servers/plugins/replication/repl5_plugins.c |  16 +-
6f51e1
 ldap/servers/plugins/replication/repl5_replica.c |  18 ++-
6f51e1
 ldap/servers/plugins/replication/repl5_ruv.c     | 191 ++++++++++++++---------
6f51e1
 ldap/servers/plugins/replication/repl5_ruv.h     |   6 +-
6f51e1
 ldap/servers/slapd/slapi-private.h               |   2 +-
6f51e1
 9 files changed, 283 insertions(+), 113 deletions(-)
6f51e1
6f51e1
diff --git a/ldap/servers/plugins/replication/csnpl.c b/ldap/servers/plugins/replication/csnpl.c
6f51e1
index 4a0f5f5..12a0bb8 100644
6f51e1
--- a/ldap/servers/plugins/replication/csnpl.c
6f51e1
+++ b/ldap/servers/plugins/replication/csnpl.c
6f51e1
@@ -14,7 +14,6 @@
6f51e1
 
6f51e1
 #include "csnpl.h"
6f51e1
 #include "llist.h"
6f51e1
-#include "repl_shared.h"
6f51e1
 
6f51e1
 struct csnpl 
6f51e1
 {
6f51e1
@@ -22,13 +21,17 @@ struct csnpl
6f51e1
 	Slapi_RWLock*	csnLock;	/* lock to serialize access to PL */
6f51e1
 };	
6f51e1
 
6f51e1
+
6f51e1
 typedef struct _csnpldata
6f51e1
 {
6f51e1
 	PRBool	committed;  /* True if CSN committed */
6f51e1
 	CSN	*csn;       /* The actual CSN */
6f51e1
+	Replica * prim_replica; /* The replica where the prom csn was generated */
6f51e1
 	const CSN *prim_csn;  /* The primary CSN of an operation consising of multiple sub ops*/
6f51e1
 } csnpldata;
6f51e1
 
6f51e1
+static PRBool csn_primary_or_nested(csnpldata *csn_data,  const CSNPL_CTX *csn_ctx);
6f51e1
+
6f51e1
 /* forward declarations */
6f51e1
 #ifdef DEBUG
6f51e1
 static void _csnplDumpContentNoLock(CSNPL *csnpl, const char *caller);
6f51e1
@@ -104,7 +107,7 @@ void csnplFree (CSNPL **csnpl)
6f51e1
  *          1 if the csn has already been seen
6f51e1
  *         -1 for any other kind of errors
6f51e1
  */
6f51e1
-int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSN *prim_csn)
6f51e1
+int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSNPL_CTX *prim_csn)
6f51e1
 {
6f51e1
 	int rc;
6f51e1
 	csnpldata *csnplnode;
6f51e1
@@ -129,10 +132,13 @@ int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSN *prim_csn)
6f51e1
         return 1;
6f51e1
     }
6f51e1
 
6f51e1
-	csnplnode = (csnpldata *)slapi_ch_malloc(sizeof(csnpldata));
6f51e1
+	csnplnode = (csnpldata *)slapi_ch_calloc(1, sizeof(csnpldata));
6f51e1
 	csnplnode->committed = PR_FALSE;
6f51e1
 	csnplnode->csn = csn_dup(csn);
6f51e1
-	csnplnode->prim_csn = prim_csn;
6f51e1
+	if (prim_csn) {
6f51e1
+		csnplnode->prim_csn = prim_csn->prim_csn;
6f51e1
+		csnplnode->prim_replica =  prim_csn->prim_repl;
6f51e1
+	}
6f51e1
 	csn_as_string(csn, PR_FALSE, csn_str);
6f51e1
 	rc = llistInsertTail (csnpl->csnList, csn_str, csnplnode);
6f51e1
 
6f51e1
@@ -187,8 +193,58 @@ int csnplRemove (CSNPL *csnpl, const CSN *csn)
6f51e1
 
6f51e1
 	return 0;
6f51e1
 }
6f51e1
+PRBool csn_primary(Replica *replica, const CSN *csn,  const CSNPL_CTX *csn_ctx)
6f51e1
+{
6f51e1
+    if (csn_ctx == NULL)
6f51e1
+        return PR_FALSE;
6f51e1
+    
6f51e1
+    if (replica != csn_ctx->prim_repl) {
6f51e1
+        /* The CSNs are not from the same replication topology
6f51e1
+         * so even if the csn values are equal they are not related
6f51e1
+         * to the same operation
6f51e1
+         */
6f51e1
+        return PR_FALSE;
6f51e1
+    }
6f51e1
+    
6f51e1
+    /* Here the two CSNs belong to the same replication topology */
6f51e1
+    
6f51e1
+    /* check if the CSN identifies the primary update */
6f51e1
+    if (csn_is_equal(csn, csn_ctx->prim_csn)) {
6f51e1
+        return PR_TRUE;
6f51e1
+    }
6f51e1
+    
6f51e1
+    return PR_FALSE;
6f51e1
+}
6f51e1
+
6f51e1
+static PRBool csn_primary_or_nested(csnpldata *csn_data,  const CSNPL_CTX *csn_ctx)
6f51e1
+{
6f51e1
+    if ((csn_data == NULL) || (csn_ctx == NULL))
6f51e1
+        return PR_FALSE;
6f51e1
+    
6f51e1
+    if (csn_data->prim_replica != csn_ctx->prim_repl) {
6f51e1
+        /* The CSNs are not from the same replication topology
6f51e1
+         * so even if the csn values are equal they are not related
6f51e1
+         * to the same operation
6f51e1
+         */
6f51e1
+        return PR_FALSE;
6f51e1
+    }
6f51e1
+    
6f51e1
+    /* Here the two CSNs belong to the same replication topology */
6f51e1
+    
6f51e1
+    /* First check if the CSN identifies the primary update */
6f51e1
+    if (csn_is_equal(csn_data->csn, csn_ctx->prim_csn)) {
6f51e1
+        return PR_TRUE;
6f51e1
+    }
6f51e1
+    
6f51e1
+    /* Second check if the CSN identifies a nested update */
6f51e1
+    if (csn_is_equal(csn_data->prim_csn, csn_ctx->prim_csn)) {
6f51e1
+        return PR_TRUE;
6f51e1
+    }
6f51e1
+    
6f51e1
+    return PR_FALSE;
6f51e1
+}
6f51e1
 
6f51e1
-int csnplRemoveAll (CSNPL *csnpl, const CSN *csn)
6f51e1
+int csnplRemoveAll (CSNPL *csnpl, const CSNPL_CTX *csn_ctx)
6f51e1
 {
6f51e1
 	csnpldata *data;
6f51e1
 	void *iterator;
6f51e1
@@ -197,8 +253,7 @@ int csnplRemoveAll (CSNPL *csnpl, const CSN *csn)
6f51e1
 	data = (csnpldata *)llistGetFirst(csnpl->csnList, &iterator);
6f51e1
 	while (NULL != data)
6f51e1
 	{
6f51e1
-		if (csn_is_equal(data->csn, csn) ||
6f51e1
-		    csn_is_equal(data->prim_csn, csn)) {
6f51e1
+		if (csn_primary_or_nested(data, csn_ctx)) {
6f51e1
 			csnpldata_free(&data);
6f51e1
 			data = (csnpldata *)llistRemoveCurrentAndGetNext(csnpl->csnList, &iterator);
6f51e1
 		} else {
6f51e1
@@ -213,13 +268,13 @@ int csnplRemoveAll (CSNPL *csnpl, const CSN *csn)
6f51e1
 }
6f51e1
 
6f51e1
 
6f51e1
-int csnplCommitAll (CSNPL *csnpl, const CSN *csn)
6f51e1
+int csnplCommitAll (CSNPL *csnpl, const CSNPL_CTX *csn_ctx)
6f51e1
 {
6f51e1
 	csnpldata *data;
6f51e1
 	void *iterator;
6f51e1
 	char csn_str[CSN_STRSIZE];
6f51e1
 
6f51e1
-	csn_as_string(csn, PR_FALSE, csn_str);
6f51e1
+	csn_as_string(csn_ctx->prim_csn, PR_FALSE, csn_str);
6f51e1
 	slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
6f51e1
 		            "csnplCommitALL: committing all csns for csn %s\n", csn_str);
6f51e1
 	slapi_rwlock_wrlock (csnpl->csnLock);
6f51e1
@@ -229,8 +284,7 @@ int csnplCommitAll (CSNPL *csnpl, const CSN *csn)
6f51e1
 		csn_as_string(data->csn, PR_FALSE, csn_str);
6f51e1
 		slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
6f51e1
 				"csnplCommitALL: processing data csn %s\n", csn_str);
6f51e1
-		if (csn_is_equal(data->csn, csn) ||
6f51e1
-		    csn_is_equal(data->prim_csn, csn)) {
6f51e1
+                if (csn_primary_or_nested(data, csn_ctx)) {
6f51e1
 			data->committed = PR_TRUE;
6f51e1
 		}
6f51e1
 		data = (csnpldata *)llistGetNext (csnpl->csnList, &iterator);
6f51e1
@@ -395,7 +449,12 @@ static void _csnplDumpContentNoLock(CSNPL *csnpl, const char *caller)
6f51e1
 
6f51e1
 /* wrapper around csn_free, to satisfy NSPR thread context API */
6f51e1
 void
6f51e1
-csnplFreeCSN (void *arg)
6f51e1
+csnplFreeCSNPL_CTX (void *arg)
6f51e1
 {
6f51e1
-	csn_free((CSN **)&arg;;
6f51e1
+	CSNPL_CTX *csnpl_ctx = (CSNPL_CTX *)arg;
6f51e1
+	csn_free(&csnpl_ctx->prim_csn);
6f51e1
+	if (csnpl_ctx->sec_repl) {
6f51e1
+		slapi_ch_free((void **)&csnpl_ctx->sec_repl);
6f51e1
+	}
6f51e1
+	slapi_ch_free((void **)&csnpl_ctx);
6f51e1
 }
6f51e1
diff --git a/ldap/servers/plugins/replication/csnpl.h b/ldap/servers/plugins/replication/csnpl.h
6f51e1
index 594c8f2..1036c62 100644
6f51e1
--- a/ldap/servers/plugins/replication/csnpl.h
6f51e1
+++ b/ldap/servers/plugins/replication/csnpl.h
6f51e1
@@ -17,15 +17,17 @@
6f51e1
 #define CSNPL_H
6f51e1
 
6f51e1
 #include "slapi-private.h"
6f51e1
+#include "repl5.h"
6f51e1
 
6f51e1
 typedef struct csnpl CSNPL;
6f51e1
 
6f51e1
 CSNPL* csnplNew(void);
6f51e1
 void csnplFree (CSNPL **csnpl);
6f51e1
-int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSN *prim_csn);
6f51e1
+int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSNPL_CTX *prim_csn);
6f51e1
 int csnplRemove (CSNPL *csnpl, const CSN *csn);
6f51e1
-int csnplRemoveAll (CSNPL *csnpl, const CSN *csn);
6f51e1
-int csnplCommitAll (CSNPL *csnpl, const CSN *csn);
6f51e1
+int csnplRemoveAll (CSNPL *csnpl, const CSNPL_CTX *csn_ctx);
6f51e1
+int csnplCommitAll (CSNPL *csnpl, const CSNPL_CTX *csn_ctx);
6f51e1
+PRBool csn_primary(Replica *replica, const CSN *csn,  const CSNPL_CTX *csn_ctx);
6f51e1
 CSN* csnplGetMinCSN (CSNPL *csnpl, PRBool *committed);
6f51e1
 int csnplCommit (CSNPL *csnpl, const CSN *csn);
6f51e1
 CSN *csnplRollUp(CSNPL *csnpl, CSN ** first);
6f51e1
diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h
6f51e1
index 1d8989c..718f64e 100644
6f51e1
--- a/ldap/servers/plugins/replication/repl5.h
6f51e1
+++ b/ldap/servers/plugins/replication/repl5.h
6f51e1
@@ -228,12 +228,27 @@ int multimaster_be_betxnpostop_delete (Slapi_PBlock *pb);
6f51e1
 int multimaster_be_betxnpostop_add (Slapi_PBlock *pb);
6f51e1
 int multimaster_be_betxnpostop_modify (Slapi_PBlock *pb);
6f51e1
 
6f51e1
+/* In repl5_replica.c */
6f51e1
+typedef struct replica Replica;
6f51e1
+
6f51e1
+/* csn pending lists */
6f51e1
+#define CSNPL_CTX_REPLCNT 4
6f51e1
+typedef struct CSNPL_CTX
6f51e1
+{
6f51e1
+	CSN *prim_csn;
6f51e1
+	size_t repl_alloc; /* max number of replicas  */
6f51e1
+	size_t repl_cnt; /* number of replicas affected by operation */
6f51e1
+	Replica *prim_repl; /* pirmary replica */
6f51e1
+	Replica **sec_repl; /* additional replicas affected */
6f51e1
+} CSNPL_CTX;
6f51e1
+
6f51e1
 /* In repl5_init.c */
6f51e1
 extern int repl5_is_betxn;
6f51e1
 char* get_thread_private_agmtname(void);
6f51e1
 void  set_thread_private_agmtname (const char *agmtname);
6f51e1
-void  set_thread_primary_csn (const CSN *prim_csn);
6f51e1
-CSN*  get_thread_primary_csn(void);
6f51e1
+void  set_thread_primary_csn (const CSN *prim_csn, Replica *repl);
6f51e1
+void  add_replica_to_primcsn(CSNPL_CTX *prim_csn, Replica *repl);
6f51e1
+CSNPL_CTX*  get_thread_primary_csn(void);
6f51e1
 void* get_thread_private_cache(void);
6f51e1
 void  set_thread_private_cache (void *buf);
6f51e1
 char* get_repl_session_id (Slapi_PBlock *pb, char *id, CSN **opcsn);
6f51e1
@@ -302,7 +317,6 @@ typedef struct repl_bos Repl_Bos;
6f51e1
 
6f51e1
 /* In repl5_agmt.c */
6f51e1
 typedef struct repl5agmt Repl_Agmt;
6f51e1
-typedef struct replica Replica;
6f51e1
 
6f51e1
 #define TRANSPORT_FLAG_SSL 1
6f51e1
 #define TRANSPORT_FLAG_TLS 2
6f51e1
@@ -629,6 +643,8 @@ PRUint64 replica_get_precise_purging(Replica *r);
6f51e1
 void replica_set_precise_purging(Replica *r, PRUint64 on_off);
6f51e1
 PRBool ignore_error_and_keep_going(int error);
6f51e1
 void replica_check_release_timeout(Replica *r, Slapi_PBlock *pb);
6f51e1
+void replica_lock_replica(Replica *r);
6f51e1
+void replica_unlock_replica(Replica *r);
6f51e1
 
6f51e1
 /* The functions below handles the state flag */
6f51e1
 /* Current internal state flags */
6f51e1
diff --git a/ldap/servers/plugins/replication/repl5_init.c b/ldap/servers/plugins/replication/repl5_init.c
6f51e1
index edffb84..b0bc515 100644
6f51e1
--- a/ldap/servers/plugins/replication/repl5_init.c
6f51e1
+++ b/ldap/servers/plugins/replication/repl5_init.c
6f51e1
@@ -154,26 +154,62 @@ set_thread_private_agmtname(const char *agmtname)
6f51e1
 		PR_SetThreadPrivate(thread_private_agmtname, (void *)agmtname);
6f51e1
 }
6f51e1
 
6f51e1
-CSN*
6f51e1
+CSNPL_CTX*
6f51e1
 get_thread_primary_csn(void)
6f51e1
 {
6f51e1
-	CSN *prim_csn = NULL;
6f51e1
+	CSNPL_CTX *prim_csn = NULL;
6f51e1
 	if (thread_primary_csn)
6f51e1
-		prim_csn = (CSN *)PR_GetThreadPrivate(thread_primary_csn);
6f51e1
+		prim_csn = (CSNPL_CTX *)PR_GetThreadPrivate(thread_primary_csn);
6f51e1
+
6f51e1
 	return prim_csn;
6f51e1
 }
6f51e1
 void
6f51e1
-set_thread_primary_csn(const CSN *prim_csn)
6f51e1
+set_thread_primary_csn (const CSN *prim_csn, Replica *repl)
6f51e1
 {
6f51e1
 	if (thread_primary_csn) {
6f51e1
 		if (prim_csn) {
6f51e1
-			PR_SetThreadPrivate(thread_primary_csn, (void *)csn_dup(prim_csn));
6f51e1
+			CSNPL_CTX *csnpl_ctx = (CSNPL_CTX *)slapi_ch_calloc(1,sizeof(CSNPL_CTX));
6f51e1
+			csnpl_ctx->prim_csn = csn_dup(prim_csn);
6f51e1
+			/* repl_alloc, repl_cnt and sec_repl are 0 by calloc */
6f51e1
+			csnpl_ctx->prim_repl = repl;
6f51e1
+			PR_SetThreadPrivate(thread_primary_csn, (void *)csnpl_ctx);
6f51e1
 		} else {
6f51e1
 			PR_SetThreadPrivate(thread_primary_csn, NULL);
6f51e1
 		}
6f51e1
 	}
6f51e1
 }
6f51e1
 
6f51e1
+void
6f51e1
+add_replica_to_primcsn(CSNPL_CTX *csnpl_ctx, Replica *repl)
6f51e1
+{
6f51e1
+	size_t found = 0;
6f51e1
+	size_t it = 0;
6f51e1
+
6f51e1
+	if (repl == csnpl_ctx->prim_repl) return;
6f51e1
+
6f51e1
+	while (it < csnpl_ctx->repl_cnt) {
6f51e1
+		if (csnpl_ctx->sec_repl[it] == repl) {
6f51e1
+			found = 1;
6f51e1
+			break;
6f51e1
+		}
6f51e1
+		it++;
6f51e1
+	}
6f51e1
+	if (found) return;
6f51e1
+
6f51e1
+	if (csnpl_ctx->repl_cnt < csnpl_ctx->repl_alloc) {
6f51e1
+		csnpl_ctx->sec_repl[csnpl_ctx->repl_cnt++] = repl;
6f51e1
+		return;
6f51e1
+	}
6f51e1
+	csnpl_ctx->repl_alloc += CSNPL_CTX_REPLCNT;
6f51e1
+	if (csnpl_ctx->repl_cnt == 0) {
6f51e1
+		csnpl_ctx->sec_repl = (Replica **)slapi_ch_calloc(csnpl_ctx->repl_alloc, sizeof(Replica *));
6f51e1
+	} else {
6f51e1
+		csnpl_ctx->sec_repl = (Replica **)slapi_ch_realloc((char *)csnpl_ctx->sec_repl, csnpl_ctx->repl_alloc * sizeof(Replica *));
6f51e1
+	}
6f51e1
+	csnpl_ctx->sec_repl[csnpl_ctx->repl_cnt++] = repl;
6f51e1
+	return;
6f51e1
+}
6f51e1
+
6f51e1
 void*
6f51e1
 get_thread_private_cache ()
6f51e1
 {
6f51e1
@@ -740,7 +776,7 @@ multimaster_start( Slapi_PBlock *pb )
6f51e1
 		/* Initialize thread private data for logging. Ignore if fails */
6f51e1
 		PR_NewThreadPrivateIndex (&thread_private_agmtname, NULL);
6f51e1
 		PR_NewThreadPrivateIndex (&thread_private_cache, NULL);
6f51e1
-		PR_NewThreadPrivateIndex (&thread_primary_csn, csnplFreeCSN);
6f51e1
+		PR_NewThreadPrivateIndex (&thread_primary_csn, csnplFreeCSNPL_CTX);
6f51e1
 
6f51e1
 		/* Decode the command line args to see if we're dumping to LDIF */
6f51e1
 		is_ldif_dump = check_for_ldif_dump(pb);
6f51e1
diff --git a/ldap/servers/plugins/replication/repl5_plugins.c b/ldap/servers/plugins/replication/repl5_plugins.c
6f51e1
index 9ef06af..c31d9d5 100644
6f51e1
--- a/ldap/servers/plugins/replication/repl5_plugins.c
6f51e1
+++ b/ldap/servers/plugins/replication/repl5_plugins.c
6f51e1
@@ -45,6 +45,7 @@
6f51e1
 #include "repl.h"
6f51e1
 #include "cl5_api.h"
6f51e1
 #include "urp.h"
6f51e1
+#include "csnpl.h"
6f51e1
 
6f51e1
 static char *local_purl = NULL;
6f51e1
 static char *purl_attrs[] = {"nsslapd-localhost", "nsslapd-port", "nsslapd-secureport", NULL};
6f51e1
@@ -1034,7 +1035,7 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
6f51e1
 {
6f51e1
 	Slapi_Operation *op = NULL;
6f51e1
 	CSN *opcsn;
6f51e1
-	CSN *prim_csn;
6f51e1
+	CSNPL_CTX *prim_csn;
6f51e1
 	int rc;
6f51e1
 	slapi_operation_parameters *op_params = NULL;
6f51e1
 	Object *repl_obj = NULL;
6f51e1
@@ -1070,14 +1071,15 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
6f51e1
 	if (repl_obj == NULL)
6f51e1
 		return return_value;
6f51e1
 
6f51e1
+	r = (Replica*)object_get_data (repl_obj);
6f51e1
+	PR_ASSERT (r);
6f51e1
+
6f51e1
 	slapi_pblock_get(pb, SLAPI_RESULT_CODE, &rc);
6f51e1
 	if (rc) { /* op failed - just return */
6f51e1
 		cancel_opcsn(pb);
6f51e1
 		goto common_return;
6f51e1
 	}
6f51e1
 
6f51e1
-	r = (Replica*)object_get_data (repl_obj);
6f51e1
-	PR_ASSERT (r);
6f51e1
 
6f51e1
 	replica_check_release_timeout(r, pb);
6f51e1
 
6f51e1
@@ -1223,12 +1225,12 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
6f51e1
 common_return:
6f51e1
 	opcsn = operation_get_csn(op);
6f51e1
 	prim_csn = get_thread_primary_csn();
6f51e1
-	if (csn_is_equal(opcsn, prim_csn)) {
6f51e1
+	if (csn_primary(r, opcsn, prim_csn)) {
6f51e1
 		if (return_value == 0) {
6f51e1
 			/* the primary csn was succesfully committed
6f51e1
 			 * unset it in the thread local data
6f51e1
 			 */
6f51e1
-			set_thread_primary_csn(NULL);
6f51e1
+			set_thread_primary_csn(NULL, NULL);
6f51e1
 		}
6f51e1
 	}
6f51e1
 	if (repl_obj) {
6f51e1
@@ -1430,7 +1432,7 @@ cancel_opcsn (Slapi_PBlock *pb)
6f51e1
 
6f51e1
             ruv_obj = replica_get_ruv (r);
6f51e1
             PR_ASSERT (ruv_obj);
6f51e1
-            ruv_cancel_csn_inprogress ((RUV*)object_get_data (ruv_obj), opcsn, replica_get_rid(r));
6f51e1
+            ruv_cancel_csn_inprogress (r, (RUV*)object_get_data (ruv_obj), opcsn, replica_get_rid(r));
6f51e1
             object_release (ruv_obj);
6f51e1
         }
6f51e1
 
6f51e1
@@ -1491,7 +1493,7 @@ process_operation (Slapi_PBlock *pb, const CSN *csn)
6f51e1
     ruv = (RUV*)object_get_data (ruv_obj);
6f51e1
     PR_ASSERT (ruv);
6f51e1
  
6f51e1
-    rc = ruv_add_csn_inprogress (ruv, csn);
6f51e1
+    rc = ruv_add_csn_inprogress (r, ruv, csn);
6f51e1
 
6f51e1
     object_release (ruv_obj);
6f51e1
     object_release (r_obj);
6f51e1
diff --git a/ldap/servers/plugins/replication/repl5_replica.c b/ldap/servers/plugins/replication/repl5_replica.c
6f51e1
index 1bdc138..7927ac3 100644
6f51e1
--- a/ldap/servers/plugins/replication/repl5_replica.c
6f51e1
+++ b/ldap/servers/plugins/replication/repl5_replica.c
6f51e1
@@ -923,7 +923,7 @@ replica_update_ruv(Replica *r, const CSN *updated_csn, const char *replica_purl)
6f51e1
 					}
6f51e1
 				}
6f51e1
 				/* Update max csn for local and remote replicas */
6f51e1
-				rc = ruv_update_ruv (ruv, updated_csn, replica_purl, r->repl_rid);
6f51e1
+				rc = ruv_update_ruv (ruv, updated_csn, replica_purl, r, r->repl_rid);
6f51e1
 				if (RUV_COVERS_CSN == rc)
6f51e1
 				{
6f51e1
 					slapi_log_err(SLAPI_LOG_REPL,
6f51e1
@@ -3663,7 +3663,7 @@ assign_csn_callback(const CSN *csn, void *data)
6f51e1
         }
6f51e1
     }
6f51e1
 
6f51e1
-    ruv_add_csn_inprogress (ruv, csn);
6f51e1
+    ruv_add_csn_inprogress (r, ruv, csn);
6f51e1
 
6f51e1
     replica_unlock(r->repl_lock);
6f51e1
 
6f51e1
@@ -3692,13 +3692,13 @@ abort_csn_callback(const CSN *csn, void *data)
6f51e1
     {
6f51e1
         int rc = csnplRemove(r->min_csn_pl, csn);
6f51e1
         if (rc) {
6f51e1
-            slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "abort_csn_callback - csnplRemove failed");
6f51e1
+            slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "abort_csn_callback - csnplRemove failed\n");
6f51e1
             replica_unlock(r->repl_lock);
6f51e1
             return;
6f51e1
         }
6f51e1
     }
6f51e1
 
6f51e1
-    ruv_cancel_csn_inprogress (ruv, csn, replica_get_rid(r));
6f51e1
+    ruv_cancel_csn_inprogress (r, ruv, csn, replica_get_rid(r));
6f51e1
     replica_unlock(r->repl_lock);
6f51e1
 
6f51e1
     object_release (ruv_obj);
6f51e1
@@ -4489,3 +4489,13 @@ replica_check_release_timeout(Replica *r, Slapi_PBlock *pb)
6f51e1
 	}
6f51e1
 	replica_unlock(r->repl_lock);
6f51e1
 }
6f51e1
+void
6f51e1
+replica_lock_replica(Replica *r)
6f51e1
+{
6f51e1
+	replica_lock(r->repl_lock);
6f51e1
+}
6f51e1
+void
6f51e1
+replica_unlock_replica(Replica *r)
6f51e1
+{
6f51e1
+	replica_unlock(r->repl_lock);
6f51e1
+}
6f51e1
diff --git a/ldap/servers/plugins/replication/repl5_ruv.c b/ldap/servers/plugins/replication/repl5_ruv.c
6f51e1
index d59e6d2..39449b6 100644
6f51e1
--- a/ldap/servers/plugins/replication/repl5_ruv.c
6f51e1
+++ b/ldap/servers/plugins/replication/repl5_ruv.c
6f51e1
@@ -77,7 +77,7 @@ static char *get_replgen_from_berval(const struct berval *bval);
6f51e1
 static const char * const prefix_replicageneration = "{replicageneration}";
6f51e1
 static const char * const prefix_ruvcsn = "{replica "; /* intentionally missing '}' */
6f51e1
 
6f51e1
-static int ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSN *csn, const char *replica_purl, PRBool isLocal);
6f51e1
+static int ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSNPL_CTX *prim_csn, const char *replica_purl, PRBool isLocal);
6f51e1
 
6f51e1
 /* API implementation */
6f51e1
 
6f51e1
@@ -1599,13 +1599,13 @@ ruv_dump(const RUV *ruv, char *ruv_name, PRFileDesc *prFile)
6f51e1
 
6f51e1
 /* this function notifies the ruv that there are operations in progress so that
6f51e1
    they can be added to the pending list for the appropriate client. */
6f51e1
-int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
6f51e1
+int ruv_add_csn_inprogress (void *repl, RUV *ruv, const CSN *csn)
6f51e1
 {
6f51e1
     RUVElement* replica;
6f51e1
     char csn_str[CSN_STRSIZE];
6f51e1
     int rc = RUV_SUCCESS;
6f51e1
     int rid = csn_get_replicaid (csn);
6f51e1
-    CSN *prim_csn;
6f51e1
+    CSNPL_CTX *prim_csn;
6f51e1
 
6f51e1
     PR_ASSERT (ruv && csn);
6f51e1
 
6f51e1
@@ -1645,8 +1645,13 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
6f51e1
     }
6f51e1
     prim_csn = get_thread_primary_csn();
6f51e1
     if (prim_csn == NULL) {
6f51e1
-        set_thread_primary_csn(csn);
6f51e1
+        set_thread_primary_csn(csn, (Replica *)repl);
6f51e1
         prim_csn = get_thread_primary_csn();
6f51e1
+    } else {
6f51e1
+	/* the prim csn data already exist, need to check if
6f51e1
+	 * current replica is already present
6f51e1
+	 */
6f51e1
+	add_replica_to_primcsn(prim_csn, (Replica *)repl);
6f51e1
     }
6f51e1
     rc = csnplInsert (replica->csnpl, csn, prim_csn);
6f51e1
     if (rc == 1)    /* we already seen this csn */
6f51e1
@@ -1656,7 +1661,7 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
6f51e1
                             "The csn %s has already be seen - ignoring\n",
6f51e1
                             csn_as_string (csn, PR_FALSE, csn_str));
6f51e1
         }
6f51e1
-        set_thread_primary_csn(NULL);
6f51e1
+        set_thread_primary_csn(NULL, NULL);
6f51e1
         rc = RUV_COVERS_CSN;    
6f51e1
     }
6f51e1
     else if(rc != 0)
6f51e1
@@ -1681,11 +1686,13 @@ done:
6f51e1
     return rc;
6f51e1
 }
6f51e1
 
6f51e1
-int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn, ReplicaId local_rid)
6f51e1
+int ruv_cancel_csn_inprogress (void *repl, RUV *ruv, const CSN *csn, ReplicaId local_rid)
6f51e1
 {
6f51e1
-    RUVElement* replica;
6f51e1
+    RUVElement* repl_ruv;
6f51e1
     int rc = RUV_SUCCESS;
6f51e1
-    CSN *prim_csn = NULL;
6f51e1
+    CSNPL_CTX *prim_csn = NULL;
6f51e1
+    Replica *repl_it;
6f51e1
+    size_t it;
6f51e1
 
6f51e1
 
6f51e1
     PR_ASSERT (ruv && csn);
6f51e1
@@ -1693,29 +1700,53 @@ int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn, ReplicaId local_rid)
6f51e1
     prim_csn = get_thread_primary_csn();
6f51e1
     /* locate ruvElement */
6f51e1
     slapi_rwlock_wrlock (ruv->lock);
6f51e1
-    replica = ruvGetReplica (ruv, csn_get_replicaid (csn));
6f51e1
-    if (replica == NULL) {
6f51e1
+    repl_ruv = ruvGetReplica (ruv, csn_get_replicaid (csn));
6f51e1
+    if (repl_ruv == NULL) {
6f51e1
         /* ONREPL - log error */
6f51e1
 	rc = RUV_NOTFOUND;
6f51e1
 	goto done;
6f51e1
     }
6f51e1
-    if (csn_is_equal(csn, prim_csn)) {
6f51e1
-	/* the prim csn is cancelled, lets remove all dependent csns */
6f51e1
-	ReplicaId prim_rid = csn_get_replicaid (csn);
6f51e1
-	replica = ruvGetReplica (ruv, prim_rid);
6f51e1
-	rc = csnplRemoveAll (replica->csnpl, prim_csn);
6f51e1
-	if (prim_rid != local_rid) {
6f51e1
-		if( local_rid != READ_ONLY_REPLICA_ID) {
6f51e1
-			replica = ruvGetReplica (ruv, local_rid);
6f51e1
-			if (replica) {
6f51e1
-				rc = csnplRemoveAll (replica->csnpl, prim_csn);
6f51e1
-			} else {
6f51e1
-				rc = RUV_NOTFOUND;
6f51e1
-			}
6f51e1
-		}
6f51e1
-	}
6f51e1
+    if (csn_primary(repl, csn, prim_csn)) {
6f51e1
+        /* the prim csn is cancelled, lets remove all dependent csns */
6f51e1
+        /* for the primary replica we can have modifications for two RIDS:
6f51e1
+         * - the local RID for direct or internal operations
6f51e1
+         * - a remote RID if the primary csn is for a replciated op.
6f51e1
+         */
6f51e1
+        ReplicaId prim_rid = csn_get_replicaid(csn);
6f51e1
+        repl_ruv = ruvGetReplica(ruv, prim_rid);
6f51e1
+        if (!repl_ruv) {
6f51e1
+            rc = RUV_NOTFOUND;
6f51e1
+            goto done;
6f51e1
+        }
6f51e1
+        rc = csnplRemoveAll(repl_ruv->csnpl, prim_csn);
6f51e1
+
6f51e1
+        if (prim_rid != local_rid && local_rid != READ_ONLY_REPLICA_ID) {
6f51e1
+            repl_ruv = ruvGetReplica(ruv, local_rid);
6f51e1
+            if (!repl_ruv) {
6f51e1
+                rc = RUV_NOTFOUND;
6f51e1
+                goto done;
6f51e1
+            }
6f51e1
+            rc = csnplRemoveAll(repl_ruv->csnpl, prim_csn);
6f51e1
+        }
6f51e1
+
6f51e1
+        for (it = 0; it < prim_csn->repl_cnt; it++) {
6f51e1
+            repl_it = prim_csn->sec_repl[it];
6f51e1
+            replica_lock_replica(repl_it);
6f51e1
+            local_rid = replica_get_rid(repl_it);
6f51e1
+            if (local_rid != READ_ONLY_REPLICA_ID) {
6f51e1
+                Object *ruv_obj = replica_get_ruv(repl_it);
6f51e1
+                RUV *ruv_it = object_get_data(ruv_obj);
6f51e1
+                repl_ruv = ruvGetReplica(ruv_it, local_rid);
6f51e1
+                if (repl_ruv) {
6f51e1
+                    rc = csnplRemoveAll(repl_ruv->csnpl, prim_csn);
6f51e1
+                } else {
6f51e1
+                    rc = RUV_NOTFOUND;
6f51e1
+                }
6f51e1
+            }
6f51e1
+            replica_unlock_replica(repl_it);
6f51e1
+        }
6f51e1
     } else {
6f51e1
-	rc = csnplRemove (replica->csnpl, csn);
6f51e1
+	rc = csnplRemove (repl_ruv->csnpl, csn);
6f51e1
     }
6f51e1
     if (rc != 0)
6f51e1
         rc = RUV_NOTFOUND;
6f51e1
@@ -1727,86 +1758,100 @@ done:
6f51e1
     return rc;
6f51e1
 }
6f51e1
 
6f51e1
-int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, ReplicaId local_rid)
6f51e1
+int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, void *replica, ReplicaId local_rid)
6f51e1
 {
6f51e1
     int rc=RUV_SUCCESS;
6f51e1
-    RUVElement *replica;
6f51e1
+    RUVElement *repl_ruv;
6f51e1
     ReplicaId prim_rid;
6f51e1
+    Replica *repl_it = NULL;
6f51e1
+    size_t it = 0;
6f51e1
 
6f51e1
-    CSN *prim_csn = get_thread_primary_csn();
6f51e1
+    CSNPL_CTX *prim_csn = get_thread_primary_csn();
6f51e1
 
6f51e1
-    if (! csn_is_equal(csn, prim_csn)) {
6f51e1
+    if (! csn_primary(replica, csn, prim_csn)) {
6f51e1
 	/* not a primary csn, nothing to do */
6f51e1
 	return rc;
6f51e1
     }
6f51e1
-    slapi_rwlock_wrlock (ruv->lock);
6f51e1
+
6f51e1
+    /* first handle primary replica 
6f51e1
+     * there can be two ruv elements affected
6f51e1
+     */
6f51e1
     prim_rid = csn_get_replicaid (csn);
6f51e1
-    replica = ruvGetReplica (ruv, local_rid);
6f51e1
-    rc = ruv_update_ruv_element(ruv, replica, csn, replica_purl, PR_TRUE);
6f51e1
-    if ( rc || local_rid == prim_rid) goto done;
6f51e1
-    replica = ruvGetReplica (ruv, prim_rid);
6f51e1
-    rc = ruv_update_ruv_element(ruv, replica, csn, replica_purl, PR_FALSE);
6f51e1
-done:
6f51e1
+    slapi_rwlock_wrlock (ruv->lock);
6f51e1
+    if ( local_rid != prim_rid) {
6f51e1
+	repl_ruv = ruvGetReplica (ruv, prim_rid);
6f51e1
+	rc = ruv_update_ruv_element(ruv, repl_ruv, prim_csn, replica_purl, PR_FALSE);
6f51e1
+    }
6f51e1
+    repl_ruv = ruvGetReplica (ruv, local_rid);
6f51e1
+    rc = ruv_update_ruv_element(ruv, repl_ruv, prim_csn, replica_purl, PR_TRUE);
6f51e1
     slapi_rwlock_unlock (ruv->lock);
6f51e1
+    if (rc) return rc;
6f51e1
+
6f51e1
+    /* now handle secondary replicas */
6f51e1
+    for (it=0; it<prim_csn->repl_cnt; it++) {
6f51e1
+	repl_it = prim_csn->sec_repl[it];
6f51e1
+	replica_lock_replica(repl_it);
6f51e1
+	Object *ruv_obj = replica_get_ruv (repl_it);
6f51e1
+	RUV *ruv_it = object_get_data (ruv_obj);
6f51e1
+	slapi_rwlock_wrlock (ruv_it->lock);
6f51e1
+	repl_ruv = ruvGetReplica (ruv_it, replica_get_rid(repl_it));
6f51e1
+	rc = ruv_update_ruv_element(ruv_it, repl_ruv, prim_csn, replica_purl, PR_TRUE);
6f51e1
+	slapi_rwlock_unlock (ruv_it->lock);
6f51e1
+	replica_unlock_replica(repl_it);
6f51e1
+	if (rc) break;
6f51e1
+    }
6f51e1
     return rc;
6f51e1
 }
6f51e1
+
6f51e1
 static int
6f51e1
-ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSN *csn, const char *replica_purl, PRBool isLocal)
6f51e1
+ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSNPL_CTX *prim_csn, const char *replica_purl, PRBool isLocal)
6f51e1
 {
6f51e1
     int rc=RUV_SUCCESS;
6f51e1
     char csn_str[CSN_STRSIZE];
6f51e1
     CSN *max_csn;
6f51e1
     CSN *first_csn = NULL;
6f51e1
     
6f51e1
-    if (replica == NULL)
6f51e1
-    {
6f51e1
+    if (replica == NULL) {
6f51e1
         /* we should have a ruv element at this point because it would have
6f51e1
            been added by ruv_add_inprogress function */
6f51e1
         slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - "
6f51e1
-			            "Can't locate RUV element for replica %d\n", csn_get_replicaid (csn)); 
6f51e1
+                        "Can't locate RUV element for replica %d\n", csn_get_replicaid (prim_csn->prim_csn));
6f51e1
         goto done;
6f51e1
     } 
6f51e1
 
6f51e1
-	if (csnplCommitAll(replica->csnpl, csn) != 0)
6f51e1
-	{
6f51e1
-		slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "ruv_update_ruv - Cannot commit csn %s\n",
6f51e1
-			            csn_as_string(csn, PR_FALSE, csn_str));
6f51e1
+    if (csnplCommitAll(replica->csnpl, prim_csn) != 0) {
6f51e1
+        slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "ruv_update_ruv - Cannot commit csn %s\n",
6f51e1
+                        csn_as_string(prim_csn->prim_csn, PR_FALSE, csn_str));
6f51e1
         rc = RUV_CSNPL_ERROR;
6f51e1
         goto done;
6f51e1
-	}
6f51e1
-    else
6f51e1
-    {
6f51e1
+    } else {
6f51e1
         if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
6f51e1
             slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - "
6f51e1
-                            "Successfully committed csn %s\n", csn_as_string(csn, PR_FALSE, csn_str));
6f51e1
+                            "Successfully committed csn %s\n", csn_as_string(prim_csn->prim_csn, PR_FALSE, csn_str));
6f51e1
         }
6f51e1
     }
6f51e1
 
6f51e1
-	if ((max_csn = csnplRollUp(replica->csnpl, &first_csn)) != NULL)
6f51e1
-	{
6f51e1
-#ifdef DEBUG
6f51e1
-		slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - Rolled up to csn %s\n",
6f51e1
-			            csn_as_string(max_csn, PR_FALSE, csn_str)); /* XXXggood remove debugging */
6f51e1
-#endif
6f51e1
+    if ((max_csn = csnplRollUp(replica->csnpl, &first_csn)) != NULL) {
6f51e1
+        slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - Rolled up to csn %s\n",
6f51e1
+                        csn_as_string(max_csn, PR_FALSE, csn_str)); /* XXXggood remove debugging */
6f51e1
         /* replica object sets min csn for local replica */
6f51e1
-		if (!isLocal && replica->min_csn == NULL) {
6f51e1
-		  /* bug 559223 - it seems that, under huge stress, a server might pass
6f51e1
-		   * through this code when more than 1 change has already been sent and commited into
6f51e1
-		   * the pending lists... Therefore, as we are trying to set the min_csn ever 
6f51e1
-		   * generated by this replica, we need to set the first_csn as the min csn in the
6f51e1
-		   * ruv */
6f51e1
-		  set_min_csn_nolock(ruv, first_csn, replica_purl);
6f51e1
-		}
6f51e1
-		/* only update the max_csn in the RUV if it is greater than the existing one */
6f51e1
-		rc = set_max_csn_nolock_ext(ruv, max_csn, replica_purl, PR_TRUE /* must be greater */);
6f51e1
-		/* It is possible that first_csn points to max_csn.
6f51e1
-		   We need to free it once */
6f51e1
-		if (max_csn != first_csn) {
6f51e1
-			csn_free(&first_csn); 
6f51e1
-		}
6f51e1
-		csn_free(&max_csn);
6f51e1
-	}
6f51e1
-
6f51e1
+        if (!isLocal && replica->min_csn == NULL) {
6f51e1
+            /* bug 559223 - it seems that, under huge stress, a server might pass
6f51e1
+             * through this code when more than 1 change has already been sent and commited into
6f51e1
+             * the pending lists... Therefore, as we are trying to set the min_csn ever
6f51e1
+             * generated by this replica, we need to set the first_csn as the min csn in the
6f51e1
+             * ruv */
6f51e1
+        set_min_csn_nolock(ruv, first_csn, replica_purl);
6f51e1
+        }
6f51e1
+        /* only update the max_csn in the RUV if it is greater than the existing one */
6f51e1
+        rc = set_max_csn_nolock_ext(ruv, max_csn, replica_purl, PR_TRUE /* must be greater */);
6f51e1
+        /* It is possible that first_csn points to max_csn.
6f51e1
+           We need to free it once */
6f51e1
+        if (max_csn != first_csn) {
6f51e1
+            csn_free(&first_csn);
6f51e1
+        }
6f51e1
+        csn_free(&max_csn);
6f51e1
+    }
6f51e1
 done:
6f51e1
 
6f51e1
     return rc;
6f51e1
diff --git a/ldap/servers/plugins/replication/repl5_ruv.h b/ldap/servers/plugins/replication/repl5_ruv.h
6f51e1
index c8960fd..f3cd38b 100644
6f51e1
--- a/ldap/servers/plugins/replication/repl5_ruv.h
6f51e1
+++ b/ldap/servers/plugins/replication/repl5_ruv.h
6f51e1
@@ -108,9 +108,9 @@ int ruv_to_bervals(const RUV *ruv, struct berval ***bvals);
6f51e1
 PRInt32 ruv_replica_count (const RUV *ruv);
6f51e1
 char **ruv_get_referrals(const RUV *ruv);
6f51e1
 void ruv_dump(const RUV *ruv, char *ruv_name, PRFileDesc *prFile);
6f51e1
-int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn);
6f51e1
-int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn, ReplicaId rid);
6f51e1
-int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, ReplicaId local_rid);
6f51e1
+int ruv_add_csn_inprogress (void *repl, RUV *ruv, const CSN *csn);
6f51e1
+int ruv_cancel_csn_inprogress (void *repl, RUV *ruv, const CSN *csn, ReplicaId rid);
6f51e1
+int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, void *replica, ReplicaId local_rid);
6f51e1
 int ruv_move_local_supplier_to_first(RUV *ruv, ReplicaId rid);
6f51e1
 int ruv_get_first_id_and_purl(RUV *ruv, ReplicaId *rid, char **replica_purl );
6f51e1
 int ruv_local_contains_supplier(RUV *ruv, ReplicaId rid);
6f51e1
diff --git a/ldap/servers/slapd/slapi-private.h b/ldap/servers/slapd/slapi-private.h
6f51e1
index 0836d66..3910dbe 100644
6f51e1
--- a/ldap/servers/slapd/slapi-private.h
6f51e1
+++ b/ldap/servers/slapd/slapi-private.h
6f51e1
@@ -193,7 +193,7 @@ const CSN *csn_max(const CSN *csn1,const CSN *csn2);
6f51e1
    a csn from the set.*/
6f51e1
 int csn_increment_subsequence (CSN *csn);
6f51e1
 
6f51e1
-void csnplFreeCSN (void *arg);
6f51e1
+void csnplFreeCSNPL_CTX (void *arg);
6f51e1
 /*
6f51e1
  * csnset.c
6f51e1
  */
6f51e1
-- 
6f51e1
2.9.4
6f51e1