andykimpe / rpms / 389-ds-base

Forked from rpms/389-ds-base 5 months ago
Clone

Blame SOURCES/0057-Ticket-49287-v3-extend-csnpl-handling-to-multiple-ba.patch

b69e47
From 6b5aa0e288f1ea5553d4dd5d220d4e5daf50a247 Mon Sep 17 00:00:00 2001
b69e47
From: Mark Reynolds <mreynolds@redhat.com>
b69e47
Date: Mon, 31 Jul 2017 14:45:50 -0400
b69e47
Subject: [PATCH] Ticket 49287 - v3 extend csnpl handling to multiple backends
b69e47
b69e47
        The csn pending list mechanism failed if internal operation affected multiple backends
b69e47
b69e47
        This fix is an extension to the fix in ticket 49008, the thread local data now also contains
b69e47
        a list of all affected replicas.
b69e47
b69e47
        http://www.port389.org/docs/389ds/design/csn-pending-lists-and-ruv-update.html
b69e47
b69e47
        Reviewed by: William, Thierry - thanks
b69e47
---
b69e47
 ldap/servers/plugins/replication/csnpl.c         |  85 ++++++++--
b69e47
 ldap/servers/plugins/replication/csnpl.h         |   8 +-
b69e47
 ldap/servers/plugins/replication/repl5.h         |  22 ++-
b69e47
 ldap/servers/plugins/replication/repl5_init.c    |  48 +++++-
b69e47
 ldap/servers/plugins/replication/repl5_plugins.c |  16 +-
b69e47
 ldap/servers/plugins/replication/repl5_replica.c |  18 ++-
b69e47
 ldap/servers/plugins/replication/repl5_ruv.c     | 191 ++++++++++++++---------
b69e47
 ldap/servers/plugins/replication/repl5_ruv.h     |   6 +-
b69e47
 ldap/servers/slapd/slapi-private.h               |   2 +-
b69e47
 9 files changed, 283 insertions(+), 113 deletions(-)
b69e47
b69e47
diff --git a/ldap/servers/plugins/replication/csnpl.c b/ldap/servers/plugins/replication/csnpl.c
b69e47
index 4a0f5f5..12a0bb8 100644
b69e47
--- a/ldap/servers/plugins/replication/csnpl.c
b69e47
+++ b/ldap/servers/plugins/replication/csnpl.c
b69e47
@@ -14,7 +14,6 @@
b69e47
 
b69e47
 #include "csnpl.h"
b69e47
 #include "llist.h"
b69e47
-#include "repl_shared.h"
b69e47
 
b69e47
 struct csnpl 
b69e47
 {
b69e47
@@ -22,13 +21,17 @@ struct csnpl
b69e47
 	Slapi_RWLock*	csnLock;	/* lock to serialize access to PL */
b69e47
 };	
b69e47
 
b69e47
+
b69e47
 typedef struct _csnpldata
b69e47
 {
b69e47
 	PRBool	committed;  /* True if CSN committed */
b69e47
 	CSN	*csn;       /* The actual CSN */
b69e47
+	Replica * prim_replica; /* The replica where the prom csn was generated */
b69e47
 	const CSN *prim_csn;  /* The primary CSN of an operation consising of multiple sub ops*/
b69e47
 } csnpldata;
b69e47
 
b69e47
+static PRBool csn_primary_or_nested(csnpldata *csn_data,  const CSNPL_CTX *csn_ctx);
b69e47
+
b69e47
 /* forward declarations */
b69e47
 #ifdef DEBUG
b69e47
 static void _csnplDumpContentNoLock(CSNPL *csnpl, const char *caller);
b69e47
@@ -104,7 +107,7 @@ void csnplFree (CSNPL **csnpl)
b69e47
  *          1 if the csn has already been seen
b69e47
  *         -1 for any other kind of errors
b69e47
  */
b69e47
-int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSN *prim_csn)
b69e47
+int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSNPL_CTX *prim_csn)
b69e47
 {
b69e47
 	int rc;
b69e47
 	csnpldata *csnplnode;
b69e47
@@ -129,10 +132,13 @@ int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSN *prim_csn)
b69e47
         return 1;
b69e47
     }
b69e47
 
b69e47
-	csnplnode = (csnpldata *)slapi_ch_malloc(sizeof(csnpldata));
b69e47
+	csnplnode = (csnpldata *)slapi_ch_calloc(1, sizeof(csnpldata));
b69e47
 	csnplnode->committed = PR_FALSE;
b69e47
 	csnplnode->csn = csn_dup(csn);
b69e47
-	csnplnode->prim_csn = prim_csn;
b69e47
+	if (prim_csn) {
b69e47
+		csnplnode->prim_csn = prim_csn->prim_csn;
b69e47
+		csnplnode->prim_replica =  prim_csn->prim_repl;
b69e47
+	}
b69e47
 	csn_as_string(csn, PR_FALSE, csn_str);
b69e47
 	rc = llistInsertTail (csnpl->csnList, csn_str, csnplnode);
b69e47
 
b69e47
@@ -187,8 +193,58 @@ int csnplRemove (CSNPL *csnpl, const CSN *csn)
b69e47
 
b69e47
 	return 0;
b69e47
 }
b69e47
+PRBool csn_primary(Replica *replica, const CSN *csn,  const CSNPL_CTX *csn_ctx)
b69e47
+{
b69e47
+    if (csn_ctx == NULL)
b69e47
+        return PR_FALSE;
b69e47
+    
b69e47
+    if (replica != csn_ctx->prim_repl) {
b69e47
+        /* The CSNs are not from the same replication topology
b69e47
+         * so even if the csn values are equal they are not related
b69e47
+         * to the same operation
b69e47
+         */
b69e47
+        return PR_FALSE;
b69e47
+    }
b69e47
+    
b69e47
+    /* Here the two CSNs belong to the same replication topology */
b69e47
+    
b69e47
+    /* check if the CSN identifies the primary update */
b69e47
+    if (csn_is_equal(csn, csn_ctx->prim_csn)) {
b69e47
+        return PR_TRUE;
b69e47
+    }
b69e47
+    
b69e47
+    return PR_FALSE;
b69e47
+}
b69e47
+
b69e47
+static PRBool csn_primary_or_nested(csnpldata *csn_data,  const CSNPL_CTX *csn_ctx)
b69e47
+{
b69e47
+    if ((csn_data == NULL) || (csn_ctx == NULL))
b69e47
+        return PR_FALSE;
b69e47
+    
b69e47
+    if (csn_data->prim_replica != csn_ctx->prim_repl) {
b69e47
+        /* The CSNs are not from the same replication topology
b69e47
+         * so even if the csn values are equal they are not related
b69e47
+         * to the same operation
b69e47
+         */
b69e47
+        return PR_FALSE;
b69e47
+    }
b69e47
+    
b69e47
+    /* Here the two CSNs belong to the same replication topology */
b69e47
+    
b69e47
+    /* First check if the CSN identifies the primary update */
b69e47
+    if (csn_is_equal(csn_data->csn, csn_ctx->prim_csn)) {
b69e47
+        return PR_TRUE;
b69e47
+    }
b69e47
+    
b69e47
+    /* Second check if the CSN identifies a nested update */
b69e47
+    if (csn_is_equal(csn_data->prim_csn, csn_ctx->prim_csn)) {
b69e47
+        return PR_TRUE;
b69e47
+    }
b69e47
+    
b69e47
+    return PR_FALSE;
b69e47
+}
b69e47
 
b69e47
-int csnplRemoveAll (CSNPL *csnpl, const CSN *csn)
b69e47
+int csnplRemoveAll (CSNPL *csnpl, const CSNPL_CTX *csn_ctx)
b69e47
 {
b69e47
 	csnpldata *data;
b69e47
 	void *iterator;
b69e47
@@ -197,8 +253,7 @@ int csnplRemoveAll (CSNPL *csnpl, const CSN *csn)
b69e47
 	data = (csnpldata *)llistGetFirst(csnpl->csnList, &iterator);
b69e47
 	while (NULL != data)
b69e47
 	{
b69e47
-		if (csn_is_equal(data->csn, csn) ||
b69e47
-		    csn_is_equal(data->prim_csn, csn)) {
b69e47
+		if (csn_primary_or_nested(data, csn_ctx)) {
b69e47
 			csnpldata_free(&data);
b69e47
 			data = (csnpldata *)llistRemoveCurrentAndGetNext(csnpl->csnList, &iterator);
b69e47
 		} else {
b69e47
@@ -213,13 +268,13 @@ int csnplRemoveAll (CSNPL *csnpl, const CSN *csn)
b69e47
 }
b69e47
 
b69e47
 
b69e47
-int csnplCommitAll (CSNPL *csnpl, const CSN *csn)
b69e47
+int csnplCommitAll (CSNPL *csnpl, const CSNPL_CTX *csn_ctx)
b69e47
 {
b69e47
 	csnpldata *data;
b69e47
 	void *iterator;
b69e47
 	char csn_str[CSN_STRSIZE];
b69e47
 
b69e47
-	csn_as_string(csn, PR_FALSE, csn_str);
b69e47
+	csn_as_string(csn_ctx->prim_csn, PR_FALSE, csn_str);
b69e47
 	slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
b69e47
 		            "csnplCommitALL: committing all csns for csn %s\n", csn_str);
b69e47
 	slapi_rwlock_wrlock (csnpl->csnLock);
b69e47
@@ -229,8 +284,7 @@ int csnplCommitAll (CSNPL *csnpl, const CSN *csn)
b69e47
 		csn_as_string(data->csn, PR_FALSE, csn_str);
b69e47
 		slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
b69e47
 				"csnplCommitALL: processing data csn %s\n", csn_str);
b69e47
-		if (csn_is_equal(data->csn, csn) ||
b69e47
-		    csn_is_equal(data->prim_csn, csn)) {
b69e47
+                if (csn_primary_or_nested(data, csn_ctx)) {
b69e47
 			data->committed = PR_TRUE;
b69e47
 		}
b69e47
 		data = (csnpldata *)llistGetNext (csnpl->csnList, &iterator);
b69e47
@@ -395,7 +449,12 @@ static void _csnplDumpContentNoLock(CSNPL *csnpl, const char *caller)
b69e47
 
b69e47
 /* wrapper around csn_free, to satisfy NSPR thread context API */
b69e47
 void
b69e47
-csnplFreeCSN (void *arg)
b69e47
+csnplFreeCSNPL_CTX (void *arg)
b69e47
 {
b69e47
-	csn_free((CSN **)&arg;;
b69e47
+	CSNPL_CTX *csnpl_ctx = (CSNPL_CTX *)arg;
b69e47
+	csn_free(&csnpl_ctx->prim_csn);
b69e47
+	if (csnpl_ctx->sec_repl) {
b69e47
+		slapi_ch_free((void **)&csnpl_ctx->sec_repl);
b69e47
+	}
b69e47
+	slapi_ch_free((void **)&csnpl_ctx);
b69e47
 }
b69e47
diff --git a/ldap/servers/plugins/replication/csnpl.h b/ldap/servers/plugins/replication/csnpl.h
b69e47
index 594c8f2..1036c62 100644
b69e47
--- a/ldap/servers/plugins/replication/csnpl.h
b69e47
+++ b/ldap/servers/plugins/replication/csnpl.h
b69e47
@@ -17,15 +17,17 @@
b69e47
 #define CSNPL_H
b69e47
 
b69e47
 #include "slapi-private.h"
b69e47
+#include "repl5.h"
b69e47
 
b69e47
 typedef struct csnpl CSNPL;
b69e47
 
b69e47
 CSNPL* csnplNew(void);
b69e47
 void csnplFree (CSNPL **csnpl);
b69e47
-int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSN *prim_csn);
b69e47
+int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSNPL_CTX *prim_csn);
b69e47
 int csnplRemove (CSNPL *csnpl, const CSN *csn);
b69e47
-int csnplRemoveAll (CSNPL *csnpl, const CSN *csn);
b69e47
-int csnplCommitAll (CSNPL *csnpl, const CSN *csn);
b69e47
+int csnplRemoveAll (CSNPL *csnpl, const CSNPL_CTX *csn_ctx);
b69e47
+int csnplCommitAll (CSNPL *csnpl, const CSNPL_CTX *csn_ctx);
b69e47
+PRBool csn_primary(Replica *replica, const CSN *csn,  const CSNPL_CTX *csn_ctx);
b69e47
 CSN* csnplGetMinCSN (CSNPL *csnpl, PRBool *committed);
b69e47
 int csnplCommit (CSNPL *csnpl, const CSN *csn);
b69e47
 CSN *csnplRollUp(CSNPL *csnpl, CSN ** first);
b69e47
diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h
b69e47
index 1d8989c..718f64e 100644
b69e47
--- a/ldap/servers/plugins/replication/repl5.h
b69e47
+++ b/ldap/servers/plugins/replication/repl5.h
b69e47
@@ -228,12 +228,27 @@ int multimaster_be_betxnpostop_delete (Slapi_PBlock *pb);
b69e47
 int multimaster_be_betxnpostop_add (Slapi_PBlock *pb);
b69e47
 int multimaster_be_betxnpostop_modify (Slapi_PBlock *pb);
b69e47
 
b69e47
+/* In repl5_replica.c */
b69e47
+typedef struct replica Replica;
b69e47
+
b69e47
+/* csn pending lists */
b69e47
+#define CSNPL_CTX_REPLCNT 4
b69e47
+typedef struct CSNPL_CTX
b69e47
+{
b69e47
+	CSN *prim_csn;
b69e47
+	size_t repl_alloc; /* max number of replicas  */
b69e47
+	size_t repl_cnt; /* number of replicas affected by operation */
b69e47
+	Replica *prim_repl; /* pirmary replica */
b69e47
+	Replica **sec_repl; /* additional replicas affected */
b69e47
+} CSNPL_CTX;
b69e47
+
b69e47
 /* In repl5_init.c */
b69e47
 extern int repl5_is_betxn;
b69e47
 char* get_thread_private_agmtname(void);
b69e47
 void  set_thread_private_agmtname (const char *agmtname);
b69e47
-void  set_thread_primary_csn (const CSN *prim_csn);
b69e47
-CSN*  get_thread_primary_csn(void);
b69e47
+void  set_thread_primary_csn (const CSN *prim_csn, Replica *repl);
b69e47
+void  add_replica_to_primcsn(CSNPL_CTX *prim_csn, Replica *repl);
b69e47
+CSNPL_CTX*  get_thread_primary_csn(void);
b69e47
 void* get_thread_private_cache(void);
b69e47
 void  set_thread_private_cache (void *buf);
b69e47
 char* get_repl_session_id (Slapi_PBlock *pb, char *id, CSN **opcsn);
b69e47
@@ -302,7 +317,6 @@ typedef struct repl_bos Repl_Bos;
b69e47
 
b69e47
 /* In repl5_agmt.c */
b69e47
 typedef struct repl5agmt Repl_Agmt;
b69e47
-typedef struct replica Replica;
b69e47
 
b69e47
 #define TRANSPORT_FLAG_SSL 1
b69e47
 #define TRANSPORT_FLAG_TLS 2
b69e47
@@ -629,6 +643,8 @@ PRUint64 replica_get_precise_purging(Replica *r);
b69e47
 void replica_set_precise_purging(Replica *r, PRUint64 on_off);
b69e47
 PRBool ignore_error_and_keep_going(int error);
b69e47
 void replica_check_release_timeout(Replica *r, Slapi_PBlock *pb);
b69e47
+void replica_lock_replica(Replica *r);
b69e47
+void replica_unlock_replica(Replica *r);
b69e47
 
b69e47
 /* The functions below handles the state flag */
b69e47
 /* Current internal state flags */
b69e47
diff --git a/ldap/servers/plugins/replication/repl5_init.c b/ldap/servers/plugins/replication/repl5_init.c
b69e47
index edffb84..b0bc515 100644
b69e47
--- a/ldap/servers/plugins/replication/repl5_init.c
b69e47
+++ b/ldap/servers/plugins/replication/repl5_init.c
b69e47
@@ -154,26 +154,62 @@ set_thread_private_agmtname(const char *agmtname)
b69e47
 		PR_SetThreadPrivate(thread_private_agmtname, (void *)agmtname);
b69e47
 }
b69e47
 
b69e47
-CSN*
b69e47
+CSNPL_CTX*
b69e47
 get_thread_primary_csn(void)
b69e47
 {
b69e47
-	CSN *prim_csn = NULL;
b69e47
+	CSNPL_CTX *prim_csn = NULL;
b69e47
 	if (thread_primary_csn)
b69e47
-		prim_csn = (CSN *)PR_GetThreadPrivate(thread_primary_csn);
b69e47
+		prim_csn = (CSNPL_CTX *)PR_GetThreadPrivate(thread_primary_csn);
b69e47
+
b69e47
 	return prim_csn;
b69e47
 }
b69e47
 void
b69e47
-set_thread_primary_csn(const CSN *prim_csn)
b69e47
+set_thread_primary_csn (const CSN *prim_csn, Replica *repl)
b69e47
 {
b69e47
 	if (thread_primary_csn) {
b69e47
 		if (prim_csn) {
b69e47
-			PR_SetThreadPrivate(thread_primary_csn, (void *)csn_dup(prim_csn));
b69e47
+			CSNPL_CTX *csnpl_ctx = (CSNPL_CTX *)slapi_ch_calloc(1,sizeof(CSNPL_CTX));
b69e47
+			csnpl_ctx->prim_csn = csn_dup(prim_csn);
b69e47
+			/* repl_alloc, repl_cnt and sec_repl are 0 by calloc */
b69e47
+			csnpl_ctx->prim_repl = repl;
b69e47
+			PR_SetThreadPrivate(thread_primary_csn, (void *)csnpl_ctx);
b69e47
 		} else {
b69e47
 			PR_SetThreadPrivate(thread_primary_csn, NULL);
b69e47
 		}
b69e47
 	}
b69e47
 }
b69e47
 
b69e47
+void
b69e47
+add_replica_to_primcsn(CSNPL_CTX *csnpl_ctx, Replica *repl)
b69e47
+{
b69e47
+	size_t found = 0;
b69e47
+	size_t it = 0;
b69e47
+
b69e47
+	if (repl == csnpl_ctx->prim_repl) return;
b69e47
+
b69e47
+	while (it < csnpl_ctx->repl_cnt) {
b69e47
+		if (csnpl_ctx->sec_repl[it] == repl) {
b69e47
+			found = 1;
b69e47
+			break;
b69e47
+		}
b69e47
+		it++;
b69e47
+	}
b69e47
+	if (found) return;
b69e47
+
b69e47
+	if (csnpl_ctx->repl_cnt < csnpl_ctx->repl_alloc) {
b69e47
+		csnpl_ctx->sec_repl[csnpl_ctx->repl_cnt++] = repl;
b69e47
+		return;
b69e47
+	}
b69e47
+	csnpl_ctx->repl_alloc += CSNPL_CTX_REPLCNT;
b69e47
+	if (csnpl_ctx->repl_cnt == 0) {
b69e47
+		csnpl_ctx->sec_repl = (Replica **)slapi_ch_calloc(csnpl_ctx->repl_alloc, sizeof(Replica *));
b69e47
+	} else {
b69e47
+		csnpl_ctx->sec_repl = (Replica **)slapi_ch_realloc((char *)csnpl_ctx->sec_repl, csnpl_ctx->repl_alloc * sizeof(Replica *));
b69e47
+	}
b69e47
+	csnpl_ctx->sec_repl[csnpl_ctx->repl_cnt++] = repl;
b69e47
+	return;
b69e47
+}
b69e47
+
b69e47
 void*
b69e47
 get_thread_private_cache ()
b69e47
 {
b69e47
@@ -740,7 +776,7 @@ multimaster_start( Slapi_PBlock *pb )
b69e47
 		/* Initialize thread private data for logging. Ignore if fails */
b69e47
 		PR_NewThreadPrivateIndex (&thread_private_agmtname, NULL);
b69e47
 		PR_NewThreadPrivateIndex (&thread_private_cache, NULL);
b69e47
-		PR_NewThreadPrivateIndex (&thread_primary_csn, csnplFreeCSN);
b69e47
+		PR_NewThreadPrivateIndex (&thread_primary_csn, csnplFreeCSNPL_CTX);
b69e47
 
b69e47
 		/* Decode the command line args to see if we're dumping to LDIF */
b69e47
 		is_ldif_dump = check_for_ldif_dump(pb);
b69e47
diff --git a/ldap/servers/plugins/replication/repl5_plugins.c b/ldap/servers/plugins/replication/repl5_plugins.c
b69e47
index 9ef06af..c31d9d5 100644
b69e47
--- a/ldap/servers/plugins/replication/repl5_plugins.c
b69e47
+++ b/ldap/servers/plugins/replication/repl5_plugins.c
b69e47
@@ -45,6 +45,7 @@
b69e47
 #include "repl.h"
b69e47
 #include "cl5_api.h"
b69e47
 #include "urp.h"
b69e47
+#include "csnpl.h"
b69e47
 
b69e47
 static char *local_purl = NULL;
b69e47
 static char *purl_attrs[] = {"nsslapd-localhost", "nsslapd-port", "nsslapd-secureport", NULL};
b69e47
@@ -1034,7 +1035,7 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
b69e47
 {
b69e47
 	Slapi_Operation *op = NULL;
b69e47
 	CSN *opcsn;
b69e47
-	CSN *prim_csn;
b69e47
+	CSNPL_CTX *prim_csn;
b69e47
 	int rc;
b69e47
 	slapi_operation_parameters *op_params = NULL;
b69e47
 	Object *repl_obj = NULL;
b69e47
@@ -1070,14 +1071,15 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
b69e47
 	if (repl_obj == NULL)
b69e47
 		return return_value;
b69e47
 
b69e47
+	r = (Replica*)object_get_data (repl_obj);
b69e47
+	PR_ASSERT (r);
b69e47
+
b69e47
 	slapi_pblock_get(pb, SLAPI_RESULT_CODE, &rc);
b69e47
 	if (rc) { /* op failed - just return */
b69e47
 		cancel_opcsn(pb);
b69e47
 		goto common_return;
b69e47
 	}
b69e47
 
b69e47
-	r = (Replica*)object_get_data (repl_obj);
b69e47
-	PR_ASSERT (r);
b69e47
 
b69e47
 	replica_check_release_timeout(r, pb);
b69e47
 
b69e47
@@ -1223,12 +1225,12 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
b69e47
 common_return:
b69e47
 	opcsn = operation_get_csn(op);
b69e47
 	prim_csn = get_thread_primary_csn();
b69e47
-	if (csn_is_equal(opcsn, prim_csn)) {
b69e47
+	if (csn_primary(r, opcsn, prim_csn)) {
b69e47
 		if (return_value == 0) {
b69e47
 			/* the primary csn was succesfully committed
b69e47
 			 * unset it in the thread local data
b69e47
 			 */
b69e47
-			set_thread_primary_csn(NULL);
b69e47
+			set_thread_primary_csn(NULL, NULL);
b69e47
 		}
b69e47
 	}
b69e47
 	if (repl_obj) {
b69e47
@@ -1430,7 +1432,7 @@ cancel_opcsn (Slapi_PBlock *pb)
b69e47
 
b69e47
             ruv_obj = replica_get_ruv (r);
b69e47
             PR_ASSERT (ruv_obj);
b69e47
-            ruv_cancel_csn_inprogress ((RUV*)object_get_data (ruv_obj), opcsn, replica_get_rid(r));
b69e47
+            ruv_cancel_csn_inprogress (r, (RUV*)object_get_data (ruv_obj), opcsn, replica_get_rid(r));
b69e47
             object_release (ruv_obj);
b69e47
         }
b69e47
 
b69e47
@@ -1491,7 +1493,7 @@ process_operation (Slapi_PBlock *pb, const CSN *csn)
b69e47
     ruv = (RUV*)object_get_data (ruv_obj);
b69e47
     PR_ASSERT (ruv);
b69e47
  
b69e47
-    rc = ruv_add_csn_inprogress (ruv, csn);
b69e47
+    rc = ruv_add_csn_inprogress (r, ruv, csn);
b69e47
 
b69e47
     object_release (ruv_obj);
b69e47
     object_release (r_obj);
b69e47
diff --git a/ldap/servers/plugins/replication/repl5_replica.c b/ldap/servers/plugins/replication/repl5_replica.c
b69e47
index 1bdc138..7927ac3 100644
b69e47
--- a/ldap/servers/plugins/replication/repl5_replica.c
b69e47
+++ b/ldap/servers/plugins/replication/repl5_replica.c
b69e47
@@ -923,7 +923,7 @@ replica_update_ruv(Replica *r, const CSN *updated_csn, const char *replica_purl)
b69e47
 					}
b69e47
 				}
b69e47
 				/* Update max csn for local and remote replicas */
b69e47
-				rc = ruv_update_ruv (ruv, updated_csn, replica_purl, r->repl_rid);
b69e47
+				rc = ruv_update_ruv (ruv, updated_csn, replica_purl, r, r->repl_rid);
b69e47
 				if (RUV_COVERS_CSN == rc)
b69e47
 				{
b69e47
 					slapi_log_err(SLAPI_LOG_REPL,
b69e47
@@ -3663,7 +3663,7 @@ assign_csn_callback(const CSN *csn, void *data)
b69e47
         }
b69e47
     }
b69e47
 
b69e47
-    ruv_add_csn_inprogress (ruv, csn);
b69e47
+    ruv_add_csn_inprogress (r, ruv, csn);
b69e47
 
b69e47
     replica_unlock(r->repl_lock);
b69e47
 
b69e47
@@ -3692,13 +3692,13 @@ abort_csn_callback(const CSN *csn, void *data)
b69e47
     {
b69e47
         int rc = csnplRemove(r->min_csn_pl, csn);
b69e47
         if (rc) {
b69e47
-            slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "abort_csn_callback - csnplRemove failed");
b69e47
+            slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "abort_csn_callback - csnplRemove failed\n");
b69e47
             replica_unlock(r->repl_lock);
b69e47
             return;
b69e47
         }
b69e47
     }
b69e47
 
b69e47
-    ruv_cancel_csn_inprogress (ruv, csn, replica_get_rid(r));
b69e47
+    ruv_cancel_csn_inprogress (r, ruv, csn, replica_get_rid(r));
b69e47
     replica_unlock(r->repl_lock);
b69e47
 
b69e47
     object_release (ruv_obj);
b69e47
@@ -4489,3 +4489,13 @@ replica_check_release_timeout(Replica *r, Slapi_PBlock *pb)
b69e47
 	}
b69e47
 	replica_unlock(r->repl_lock);
b69e47
 }
b69e47
+void
b69e47
+replica_lock_replica(Replica *r)
b69e47
+{
b69e47
+	replica_lock(r->repl_lock);
b69e47
+}
b69e47
+void
b69e47
+replica_unlock_replica(Replica *r)
b69e47
+{
b69e47
+	replica_unlock(r->repl_lock);
b69e47
+}
b69e47
diff --git a/ldap/servers/plugins/replication/repl5_ruv.c b/ldap/servers/plugins/replication/repl5_ruv.c
b69e47
index d59e6d2..39449b6 100644
b69e47
--- a/ldap/servers/plugins/replication/repl5_ruv.c
b69e47
+++ b/ldap/servers/plugins/replication/repl5_ruv.c
b69e47
@@ -77,7 +77,7 @@ static char *get_replgen_from_berval(const struct berval *bval);
b69e47
 static const char * const prefix_replicageneration = "{replicageneration}";
b69e47
 static const char * const prefix_ruvcsn = "{replica "; /* intentionally missing '}' */
b69e47
 
b69e47
-static int ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSN *csn, const char *replica_purl, PRBool isLocal);
b69e47
+static int ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSNPL_CTX *prim_csn, const char *replica_purl, PRBool isLocal);
b69e47
 
b69e47
 /* API implementation */
b69e47
 
b69e47
@@ -1599,13 +1599,13 @@ ruv_dump(const RUV *ruv, char *ruv_name, PRFileDesc *prFile)
b69e47
 
b69e47
 /* this function notifies the ruv that there are operations in progress so that
b69e47
    they can be added to the pending list for the appropriate client. */
b69e47
-int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
b69e47
+int ruv_add_csn_inprogress (void *repl, RUV *ruv, const CSN *csn)
b69e47
 {
b69e47
     RUVElement* replica;
b69e47
     char csn_str[CSN_STRSIZE];
b69e47
     int rc = RUV_SUCCESS;
b69e47
     int rid = csn_get_replicaid (csn);
b69e47
-    CSN *prim_csn;
b69e47
+    CSNPL_CTX *prim_csn;
b69e47
 
b69e47
     PR_ASSERT (ruv && csn);
b69e47
 
b69e47
@@ -1645,8 +1645,13 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
b69e47
     }
b69e47
     prim_csn = get_thread_primary_csn();
b69e47
     if (prim_csn == NULL) {
b69e47
-        set_thread_primary_csn(csn);
b69e47
+        set_thread_primary_csn(csn, (Replica *)repl);
b69e47
         prim_csn = get_thread_primary_csn();
b69e47
+    } else {
b69e47
+	/* the prim csn data already exist, need to check if
b69e47
+	 * current replica is already present
b69e47
+	 */
b69e47
+	add_replica_to_primcsn(prim_csn, (Replica *)repl);
b69e47
     }
b69e47
     rc = csnplInsert (replica->csnpl, csn, prim_csn);
b69e47
     if (rc == 1)    /* we already seen this csn */
b69e47
@@ -1656,7 +1661,7 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
b69e47
                             "The csn %s has already be seen - ignoring\n",
b69e47
                             csn_as_string (csn, PR_FALSE, csn_str));
b69e47
         }
b69e47
-        set_thread_primary_csn(NULL);
b69e47
+        set_thread_primary_csn(NULL, NULL);
b69e47
         rc = RUV_COVERS_CSN;    
b69e47
     }
b69e47
     else if(rc != 0)
b69e47
@@ -1681,11 +1686,13 @@ done:
b69e47
     return rc;
b69e47
 }
b69e47
 
b69e47
-int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn, ReplicaId local_rid)
b69e47
+int ruv_cancel_csn_inprogress (void *repl, RUV *ruv, const CSN *csn, ReplicaId local_rid)
b69e47
 {
b69e47
-    RUVElement* replica;
b69e47
+    RUVElement* repl_ruv;
b69e47
     int rc = RUV_SUCCESS;
b69e47
-    CSN *prim_csn = NULL;
b69e47
+    CSNPL_CTX *prim_csn = NULL;
b69e47
+    Replica *repl_it;
b69e47
+    size_t it;
b69e47
 
b69e47
 
b69e47
     PR_ASSERT (ruv && csn);
b69e47
@@ -1693,29 +1700,53 @@ int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn, ReplicaId local_rid)
b69e47
     prim_csn = get_thread_primary_csn();
b69e47
     /* locate ruvElement */
b69e47
     slapi_rwlock_wrlock (ruv->lock);
b69e47
-    replica = ruvGetReplica (ruv, csn_get_replicaid (csn));
b69e47
-    if (replica == NULL) {
b69e47
+    repl_ruv = ruvGetReplica (ruv, csn_get_replicaid (csn));
b69e47
+    if (repl_ruv == NULL) {
b69e47
         /* ONREPL - log error */
b69e47
 	rc = RUV_NOTFOUND;
b69e47
 	goto done;
b69e47
     }
b69e47
-    if (csn_is_equal(csn, prim_csn)) {
b69e47
-	/* the prim csn is cancelled, lets remove all dependent csns */
b69e47
-	ReplicaId prim_rid = csn_get_replicaid (csn);
b69e47
-	replica = ruvGetReplica (ruv, prim_rid);
b69e47
-	rc = csnplRemoveAll (replica->csnpl, prim_csn);
b69e47
-	if (prim_rid != local_rid) {
b69e47
-		if( local_rid != READ_ONLY_REPLICA_ID) {
b69e47
-			replica = ruvGetReplica (ruv, local_rid);
b69e47
-			if (replica) {
b69e47
-				rc = csnplRemoveAll (replica->csnpl, prim_csn);
b69e47
-			} else {
b69e47
-				rc = RUV_NOTFOUND;
b69e47
-			}
b69e47
-		}
b69e47
-	}
b69e47
+    if (csn_primary(repl, csn, prim_csn)) {
b69e47
+        /* the prim csn is cancelled, lets remove all dependent csns */
b69e47
+        /* for the primary replica we can have modifications for two RIDS:
b69e47
+         * - the local RID for direct or internal operations
b69e47
+         * - a remote RID if the primary csn is for a replciated op.
b69e47
+         */
b69e47
+        ReplicaId prim_rid = csn_get_replicaid(csn);
b69e47
+        repl_ruv = ruvGetReplica(ruv, prim_rid);
b69e47
+        if (!repl_ruv) {
b69e47
+            rc = RUV_NOTFOUND;
b69e47
+            goto done;
b69e47
+        }
b69e47
+        rc = csnplRemoveAll(repl_ruv->csnpl, prim_csn);
b69e47
+
b69e47
+        if (prim_rid != local_rid && local_rid != READ_ONLY_REPLICA_ID) {
b69e47
+            repl_ruv = ruvGetReplica(ruv, local_rid);
b69e47
+            if (!repl_ruv) {
b69e47
+                rc = RUV_NOTFOUND;
b69e47
+                goto done;
b69e47
+            }
b69e47
+            rc = csnplRemoveAll(repl_ruv->csnpl, prim_csn);
b69e47
+        }
b69e47
+
b69e47
+        for (it = 0; it < prim_csn->repl_cnt; it++) {
b69e47
+            repl_it = prim_csn->sec_repl[it];
b69e47
+            replica_lock_replica(repl_it);
b69e47
+            local_rid = replica_get_rid(repl_it);
b69e47
+            if (local_rid != READ_ONLY_REPLICA_ID) {
b69e47
+                Object *ruv_obj = replica_get_ruv(repl_it);
b69e47
+                RUV *ruv_it = object_get_data(ruv_obj);
b69e47
+                repl_ruv = ruvGetReplica(ruv_it, local_rid);
b69e47
+                if (repl_ruv) {
b69e47
+                    rc = csnplRemoveAll(repl_ruv->csnpl, prim_csn);
b69e47
+                } else {
b69e47
+                    rc = RUV_NOTFOUND;
b69e47
+                }
b69e47
+            }
b69e47
+            replica_unlock_replica(repl_it);
b69e47
+        }
b69e47
     } else {
b69e47
-	rc = csnplRemove (replica->csnpl, csn);
b69e47
+	rc = csnplRemove (repl_ruv->csnpl, csn);
b69e47
     }
b69e47
     if (rc != 0)
b69e47
         rc = RUV_NOTFOUND;
b69e47
@@ -1727,86 +1758,100 @@ done:
b69e47
     return rc;
b69e47
 }
b69e47
 
b69e47
-int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, ReplicaId local_rid)
b69e47
+int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, void *replica, ReplicaId local_rid)
b69e47
 {
b69e47
     int rc=RUV_SUCCESS;
b69e47
-    RUVElement *replica;
b69e47
+    RUVElement *repl_ruv;
b69e47
     ReplicaId prim_rid;
b69e47
+    Replica *repl_it = NULL;
b69e47
+    size_t it = 0;
b69e47
 
b69e47
-    CSN *prim_csn = get_thread_primary_csn();
b69e47
+    CSNPL_CTX *prim_csn = get_thread_primary_csn();
b69e47
 
b69e47
-    if (! csn_is_equal(csn, prim_csn)) {
b69e47
+    if (! csn_primary(replica, csn, prim_csn)) {
b69e47
 	/* not a primary csn, nothing to do */
b69e47
 	return rc;
b69e47
     }
b69e47
-    slapi_rwlock_wrlock (ruv->lock);
b69e47
+
b69e47
+    /* first handle primary replica 
b69e47
+     * there can be two ruv elements affected
b69e47
+     */
b69e47
     prim_rid = csn_get_replicaid (csn);
b69e47
-    replica = ruvGetReplica (ruv, local_rid);
b69e47
-    rc = ruv_update_ruv_element(ruv, replica, csn, replica_purl, PR_TRUE);
b69e47
-    if ( rc || local_rid == prim_rid) goto done;
b69e47
-    replica = ruvGetReplica (ruv, prim_rid);
b69e47
-    rc = ruv_update_ruv_element(ruv, replica, csn, replica_purl, PR_FALSE);
b69e47
-done:
b69e47
+    slapi_rwlock_wrlock (ruv->lock);
b69e47
+    if ( local_rid != prim_rid) {
b69e47
+	repl_ruv = ruvGetReplica (ruv, prim_rid);
b69e47
+	rc = ruv_update_ruv_element(ruv, repl_ruv, prim_csn, replica_purl, PR_FALSE);
b69e47
+    }
b69e47
+    repl_ruv = ruvGetReplica (ruv, local_rid);
b69e47
+    rc = ruv_update_ruv_element(ruv, repl_ruv, prim_csn, replica_purl, PR_TRUE);
b69e47
     slapi_rwlock_unlock (ruv->lock);
b69e47
+    if (rc) return rc;
b69e47
+
b69e47
+    /* now handle secondary replicas */
b69e47
+    for (it=0; it<prim_csn->repl_cnt; it++) {
b69e47
+	repl_it = prim_csn->sec_repl[it];
b69e47
+	replica_lock_replica(repl_it);
b69e47
+	Object *ruv_obj = replica_get_ruv (repl_it);
b69e47
+	RUV *ruv_it = object_get_data (ruv_obj);
b69e47
+	slapi_rwlock_wrlock (ruv_it->lock);
b69e47
+	repl_ruv = ruvGetReplica (ruv_it, replica_get_rid(repl_it));
b69e47
+	rc = ruv_update_ruv_element(ruv_it, repl_ruv, prim_csn, replica_purl, PR_TRUE);
b69e47
+	slapi_rwlock_unlock (ruv_it->lock);
b69e47
+	replica_unlock_replica(repl_it);
b69e47
+	if (rc) break;
b69e47
+    }
b69e47
     return rc;
b69e47
 }
b69e47
+
b69e47
 static int
b69e47
-ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSN *csn, const char *replica_purl, PRBool isLocal)
b69e47
+ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSNPL_CTX *prim_csn, const char *replica_purl, PRBool isLocal)
b69e47
 {
b69e47
     int rc=RUV_SUCCESS;
b69e47
     char csn_str[CSN_STRSIZE];
b69e47
     CSN *max_csn;
b69e47
     CSN *first_csn = NULL;
b69e47
     
b69e47
-    if (replica == NULL)
b69e47
-    {
b69e47
+    if (replica == NULL) {
b69e47
         /* we should have a ruv element at this point because it would have
b69e47
            been added by ruv_add_inprogress function */
b69e47
         slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - "
b69e47
-			            "Can't locate RUV element for replica %d\n", csn_get_replicaid (csn)); 
b69e47
+                        "Can't locate RUV element for replica %d\n", csn_get_replicaid (prim_csn->prim_csn));
b69e47
         goto done;
b69e47
     } 
b69e47
 
b69e47
-	if (csnplCommitAll(replica->csnpl, csn) != 0)
b69e47
-	{
b69e47
-		slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "ruv_update_ruv - Cannot commit csn %s\n",
b69e47
-			            csn_as_string(csn, PR_FALSE, csn_str));
b69e47
+    if (csnplCommitAll(replica->csnpl, prim_csn) != 0) {
b69e47
+        slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "ruv_update_ruv - Cannot commit csn %s\n",
b69e47
+                        csn_as_string(prim_csn->prim_csn, PR_FALSE, csn_str));
b69e47
         rc = RUV_CSNPL_ERROR;
b69e47
         goto done;
b69e47
-	}
b69e47
-    else
b69e47
-    {
b69e47
+    } else {
b69e47
         if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
b69e47
             slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - "
b69e47
-                            "Successfully committed csn %s\n", csn_as_string(csn, PR_FALSE, csn_str));
b69e47
+                            "Successfully committed csn %s\n", csn_as_string(prim_csn->prim_csn, PR_FALSE, csn_str));
b69e47
         }
b69e47
     }
b69e47
 
b69e47
-	if ((max_csn = csnplRollUp(replica->csnpl, &first_csn)) != NULL)
b69e47
-	{
b69e47
-#ifdef DEBUG
b69e47
-		slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - Rolled up to csn %s\n",
b69e47
-			            csn_as_string(max_csn, PR_FALSE, csn_str)); /* XXXggood remove debugging */
b69e47
-#endif
b69e47
+    if ((max_csn = csnplRollUp(replica->csnpl, &first_csn)) != NULL) {
b69e47
+        slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - Rolled up to csn %s\n",
b69e47
+                        csn_as_string(max_csn, PR_FALSE, csn_str)); /* XXXggood remove debugging */
b69e47
         /* replica object sets min csn for local replica */
b69e47
-		if (!isLocal && replica->min_csn == NULL) {
b69e47
-		  /* bug 559223 - it seems that, under huge stress, a server might pass
b69e47
-		   * through this code when more than 1 change has already been sent and commited into
b69e47
-		   * the pending lists... Therefore, as we are trying to set the min_csn ever 
b69e47
-		   * generated by this replica, we need to set the first_csn as the min csn in the
b69e47
-		   * ruv */
b69e47
-		  set_min_csn_nolock(ruv, first_csn, replica_purl);
b69e47
-		}
b69e47
-		/* only update the max_csn in the RUV if it is greater than the existing one */
b69e47
-		rc = set_max_csn_nolock_ext(ruv, max_csn, replica_purl, PR_TRUE /* must be greater */);
b69e47
-		/* It is possible that first_csn points to max_csn.
b69e47
-		   We need to free it once */
b69e47
-		if (max_csn != first_csn) {
b69e47
-			csn_free(&first_csn); 
b69e47
-		}
b69e47
-		csn_free(&max_csn);
b69e47
-	}
b69e47
-
b69e47
+        if (!isLocal && replica->min_csn == NULL) {
b69e47
+            /* bug 559223 - it seems that, under huge stress, a server might pass
b69e47
+             * through this code when more than 1 change has already been sent and commited into
b69e47
+             * the pending lists... Therefore, as we are trying to set the min_csn ever
b69e47
+             * generated by this replica, we need to set the first_csn as the min csn in the
b69e47
+             * ruv */
b69e47
+        set_min_csn_nolock(ruv, first_csn, replica_purl);
b69e47
+        }
b69e47
+        /* only update the max_csn in the RUV if it is greater than the existing one */
b69e47
+        rc = set_max_csn_nolock_ext(ruv, max_csn, replica_purl, PR_TRUE /* must be greater */);
b69e47
+        /* It is possible that first_csn points to max_csn.
b69e47
+           We need to free it once */
b69e47
+        if (max_csn != first_csn) {
b69e47
+            csn_free(&first_csn);
b69e47
+        }
b69e47
+        csn_free(&max_csn);
b69e47
+    }
b69e47
 done:
b69e47
 
b69e47
     return rc;
b69e47
diff --git a/ldap/servers/plugins/replication/repl5_ruv.h b/ldap/servers/plugins/replication/repl5_ruv.h
b69e47
index c8960fd..f3cd38b 100644
b69e47
--- a/ldap/servers/plugins/replication/repl5_ruv.h
b69e47
+++ b/ldap/servers/plugins/replication/repl5_ruv.h
b69e47
@@ -108,9 +108,9 @@ int ruv_to_bervals(const RUV *ruv, struct berval ***bvals);
b69e47
 PRInt32 ruv_replica_count (const RUV *ruv);
b69e47
 char **ruv_get_referrals(const RUV *ruv);
b69e47
 void ruv_dump(const RUV *ruv, char *ruv_name, PRFileDesc *prFile);
b69e47
-int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn);
b69e47
-int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn, ReplicaId rid);
b69e47
-int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, ReplicaId local_rid);
b69e47
+int ruv_add_csn_inprogress (void *repl, RUV *ruv, const CSN *csn);
b69e47
+int ruv_cancel_csn_inprogress (void *repl, RUV *ruv, const CSN *csn, ReplicaId rid);
b69e47
+int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, void *replica, ReplicaId local_rid);
b69e47
 int ruv_move_local_supplier_to_first(RUV *ruv, ReplicaId rid);
b69e47
 int ruv_get_first_id_and_purl(RUV *ruv, ReplicaId *rid, char **replica_purl );
b69e47
 int ruv_local_contains_supplier(RUV *ruv, ReplicaId rid);
b69e47
diff --git a/ldap/servers/slapd/slapi-private.h b/ldap/servers/slapd/slapi-private.h
b69e47
index 0836d66..3910dbe 100644
b69e47
--- a/ldap/servers/slapd/slapi-private.h
b69e47
+++ b/ldap/servers/slapd/slapi-private.h
b69e47
@@ -193,7 +193,7 @@ const CSN *csn_max(const CSN *csn1,const CSN *csn2);
b69e47
    a csn from the set.*/
b69e47
 int csn_increment_subsequence (CSN *csn);
b69e47
 
b69e47
-void csnplFreeCSN (void *arg);
b69e47
+void csnplFreeCSNPL_CTX (void *arg);
b69e47
 /*
b69e47
  * csnset.c
b69e47
  */
b69e47
-- 
b69e47
2.9.4
b69e47