andykimpe / rpms / 389-ds-base

Forked from rpms/389-ds-base 6 months ago
Clone
1c155e
From 6b5aa0e288f1ea5553d4dd5d220d4e5daf50a247 Mon Sep 17 00:00:00 2001
1c155e
From: Mark Reynolds <mreynolds@redhat.com>
1c155e
Date: Mon, 31 Jul 2017 14:45:50 -0400
1c155e
Subject: [PATCH] Ticket 49287 - v3 extend csnpl handling to multiple backends
1c155e
1c155e
        The csn pending list mechanism failed if internal operation affected multiple backends
1c155e
1c155e
        This fix is an extension to the fix in ticket 49008, the thread local data now also contains
1c155e
        a list of all affected replicas.
1c155e
1c155e
        http://www.port389.org/docs/389ds/design/csn-pending-lists-and-ruv-update.html
1c155e
1c155e
        Reviewed by: William, Thierry - thanks
1c155e
---
1c155e
 ldap/servers/plugins/replication/csnpl.c         |  85 ++++++++--
1c155e
 ldap/servers/plugins/replication/csnpl.h         |   8 +-
1c155e
 ldap/servers/plugins/replication/repl5.h         |  22 ++-
1c155e
 ldap/servers/plugins/replication/repl5_init.c    |  48 +++++-
1c155e
 ldap/servers/plugins/replication/repl5_plugins.c |  16 +-
1c155e
 ldap/servers/plugins/replication/repl5_replica.c |  18 ++-
1c155e
 ldap/servers/plugins/replication/repl5_ruv.c     | 191 ++++++++++++++---------
1c155e
 ldap/servers/plugins/replication/repl5_ruv.h     |   6 +-
1c155e
 ldap/servers/slapd/slapi-private.h               |   2 +-
1c155e
 9 files changed, 283 insertions(+), 113 deletions(-)
1c155e
1c155e
diff --git a/ldap/servers/plugins/replication/csnpl.c b/ldap/servers/plugins/replication/csnpl.c
1c155e
index 4a0f5f5..12a0bb8 100644
1c155e
--- a/ldap/servers/plugins/replication/csnpl.c
1c155e
+++ b/ldap/servers/plugins/replication/csnpl.c
1c155e
@@ -14,7 +14,6 @@
1c155e
 
1c155e
 #include "csnpl.h"
1c155e
 #include "llist.h"
1c155e
-#include "repl_shared.h"
1c155e
 
1c155e
 struct csnpl 
1c155e
 {
1c155e
@@ -22,13 +21,17 @@ struct csnpl
1c155e
 	Slapi_RWLock*	csnLock;	/* lock to serialize access to PL */
1c155e
 };	
1c155e
 
1c155e
+
1c155e
 typedef struct _csnpldata
1c155e
 {
1c155e
 	PRBool	committed;  /* True if CSN committed */
1c155e
 	CSN	*csn;       /* The actual CSN */
1c155e
+	Replica * prim_replica; /* The replica where the prom csn was generated */
1c155e
 	const CSN *prim_csn;  /* The primary CSN of an operation consising of multiple sub ops*/
1c155e
 } csnpldata;
1c155e
 
1c155e
+static PRBool csn_primary_or_nested(csnpldata *csn_data,  const CSNPL_CTX *csn_ctx);
1c155e
+
1c155e
 /* forward declarations */
1c155e
 #ifdef DEBUG
1c155e
 static void _csnplDumpContentNoLock(CSNPL *csnpl, const char *caller);
1c155e
@@ -104,7 +107,7 @@ void csnplFree (CSNPL **csnpl)
1c155e
  *          1 if the csn has already been seen
1c155e
  *         -1 for any other kind of errors
1c155e
  */
1c155e
-int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSN *prim_csn)
1c155e
+int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSNPL_CTX *prim_csn)
1c155e
 {
1c155e
 	int rc;
1c155e
 	csnpldata *csnplnode;
1c155e
@@ -129,10 +132,13 @@ int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSN *prim_csn)
1c155e
         return 1;
1c155e
     }
1c155e
 
1c155e
-	csnplnode = (csnpldata *)slapi_ch_malloc(sizeof(csnpldata));
1c155e
+	csnplnode = (csnpldata *)slapi_ch_calloc(1, sizeof(csnpldata));
1c155e
 	csnplnode->committed = PR_FALSE;
1c155e
 	csnplnode->csn = csn_dup(csn);
1c155e
-	csnplnode->prim_csn = prim_csn;
1c155e
+	if (prim_csn) {
1c155e
+		csnplnode->prim_csn = prim_csn->prim_csn;
1c155e
+		csnplnode->prim_replica =  prim_csn->prim_repl;
1c155e
+	}
1c155e
 	csn_as_string(csn, PR_FALSE, csn_str);
1c155e
 	rc = llistInsertTail (csnpl->csnList, csn_str, csnplnode);
1c155e
 
1c155e
@@ -187,8 +193,58 @@ int csnplRemove (CSNPL *csnpl, const CSN *csn)
1c155e
 
1c155e
 	return 0;
1c155e
 }
1c155e
+PRBool csn_primary(Replica *replica, const CSN *csn,  const CSNPL_CTX *csn_ctx)
1c155e
+{
1c155e
+    if (csn_ctx == NULL)
1c155e
+        return PR_FALSE;
1c155e
+    
1c155e
+    if (replica != csn_ctx->prim_repl) {
1c155e
+        /* The CSNs are not from the same replication topology
1c155e
+         * so even if the csn values are equal they are not related
1c155e
+         * to the same operation
1c155e
+         */
1c155e
+        return PR_FALSE;
1c155e
+    }
1c155e
+    
1c155e
+    /* Here the two CSNs belong to the same replication topology */
1c155e
+    
1c155e
+    /* check if the CSN identifies the primary update */
1c155e
+    if (csn_is_equal(csn, csn_ctx->prim_csn)) {
1c155e
+        return PR_TRUE;
1c155e
+    }
1c155e
+    
1c155e
+    return PR_FALSE;
1c155e
+}
1c155e
+
1c155e
+static PRBool csn_primary_or_nested(csnpldata *csn_data,  const CSNPL_CTX *csn_ctx)
1c155e
+{
1c155e
+    if ((csn_data == NULL) || (csn_ctx == NULL))
1c155e
+        return PR_FALSE;
1c155e
+    
1c155e
+    if (csn_data->prim_replica != csn_ctx->prim_repl) {
1c155e
+        /* The CSNs are not from the same replication topology
1c155e
+         * so even if the csn values are equal they are not related
1c155e
+         * to the same operation
1c155e
+         */
1c155e
+        return PR_FALSE;
1c155e
+    }
1c155e
+    
1c155e
+    /* Here the two CSNs belong to the same replication topology */
1c155e
+    
1c155e
+    /* First check if the CSN identifies the primary update */
1c155e
+    if (csn_is_equal(csn_data->csn, csn_ctx->prim_csn)) {
1c155e
+        return PR_TRUE;
1c155e
+    }
1c155e
+    
1c155e
+    /* Second check if the CSN identifies a nested update */
1c155e
+    if (csn_is_equal(csn_data->prim_csn, csn_ctx->prim_csn)) {
1c155e
+        return PR_TRUE;
1c155e
+    }
1c155e
+    
1c155e
+    return PR_FALSE;
1c155e
+}
1c155e
 
1c155e
-int csnplRemoveAll (CSNPL *csnpl, const CSN *csn)
1c155e
+int csnplRemoveAll (CSNPL *csnpl, const CSNPL_CTX *csn_ctx)
1c155e
 {
1c155e
 	csnpldata *data;
1c155e
 	void *iterator;
1c155e
@@ -197,8 +253,7 @@ int csnplRemoveAll (CSNPL *csnpl, const CSN *csn)
1c155e
 	data = (csnpldata *)llistGetFirst(csnpl->csnList, &iterator);
1c155e
 	while (NULL != data)
1c155e
 	{
1c155e
-		if (csn_is_equal(data->csn, csn) ||
1c155e
-		    csn_is_equal(data->prim_csn, csn)) {
1c155e
+		if (csn_primary_or_nested(data, csn_ctx)) {
1c155e
 			csnpldata_free(&data);
1c155e
 			data = (csnpldata *)llistRemoveCurrentAndGetNext(csnpl->csnList, &iterator);
1c155e
 		} else {
1c155e
@@ -213,13 +268,13 @@ int csnplRemoveAll (CSNPL *csnpl, const CSN *csn)
1c155e
 }
1c155e
 
1c155e
 
1c155e
-int csnplCommitAll (CSNPL *csnpl, const CSN *csn)
1c155e
+int csnplCommitAll (CSNPL *csnpl, const CSNPL_CTX *csn_ctx)
1c155e
 {
1c155e
 	csnpldata *data;
1c155e
 	void *iterator;
1c155e
 	char csn_str[CSN_STRSIZE];
1c155e
 
1c155e
-	csn_as_string(csn, PR_FALSE, csn_str);
1c155e
+	csn_as_string(csn_ctx->prim_csn, PR_FALSE, csn_str);
1c155e
 	slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
1c155e
 		            "csnplCommitALL: committing all csns for csn %s\n", csn_str);
1c155e
 	slapi_rwlock_wrlock (csnpl->csnLock);
1c155e
@@ -229,8 +284,7 @@ int csnplCommitAll (CSNPL *csnpl, const CSN *csn)
1c155e
 		csn_as_string(data->csn, PR_FALSE, csn_str);
1c155e
 		slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
1c155e
 				"csnplCommitALL: processing data csn %s\n", csn_str);
1c155e
-		if (csn_is_equal(data->csn, csn) ||
1c155e
-		    csn_is_equal(data->prim_csn, csn)) {
1c155e
+                if (csn_primary_or_nested(data, csn_ctx)) {
1c155e
 			data->committed = PR_TRUE;
1c155e
 		}
1c155e
 		data = (csnpldata *)llistGetNext (csnpl->csnList, &iterator);
1c155e
@@ -395,7 +449,12 @@ static void _csnplDumpContentNoLock(CSNPL *csnpl, const char *caller)
1c155e
 
1c155e
 /* wrapper around csn_free, to satisfy NSPR thread context API */
1c155e
 void
1c155e
-csnplFreeCSN (void *arg)
1c155e
+csnplFreeCSNPL_CTX (void *arg)
1c155e
 {
1c155e
-	csn_free((CSN **)&arg;;
1c155e
+	CSNPL_CTX *csnpl_ctx = (CSNPL_CTX *)arg;
1c155e
+	csn_free(&csnpl_ctx->prim_csn);
1c155e
+	if (csnpl_ctx->sec_repl) {
1c155e
+		slapi_ch_free((void **)&csnpl_ctx->sec_repl);
1c155e
+	}
1c155e
+	slapi_ch_free((void **)&csnpl_ctx);
1c155e
 }
1c155e
diff --git a/ldap/servers/plugins/replication/csnpl.h b/ldap/servers/plugins/replication/csnpl.h
1c155e
index 594c8f2..1036c62 100644
1c155e
--- a/ldap/servers/plugins/replication/csnpl.h
1c155e
+++ b/ldap/servers/plugins/replication/csnpl.h
1c155e
@@ -17,15 +17,17 @@
1c155e
 #define CSNPL_H
1c155e
 
1c155e
 #include "slapi-private.h"
1c155e
+#include "repl5.h"
1c155e
 
1c155e
 typedef struct csnpl CSNPL;
1c155e
 
1c155e
 CSNPL* csnplNew(void);
1c155e
 void csnplFree (CSNPL **csnpl);
1c155e
-int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSN *prim_csn);
1c155e
+int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSNPL_CTX *prim_csn);
1c155e
 int csnplRemove (CSNPL *csnpl, const CSN *csn);
1c155e
-int csnplRemoveAll (CSNPL *csnpl, const CSN *csn);
1c155e
-int csnplCommitAll (CSNPL *csnpl, const CSN *csn);
1c155e
+int csnplRemoveAll (CSNPL *csnpl, const CSNPL_CTX *csn_ctx);
1c155e
+int csnplCommitAll (CSNPL *csnpl, const CSNPL_CTX *csn_ctx);
1c155e
+PRBool csn_primary(Replica *replica, const CSN *csn,  const CSNPL_CTX *csn_ctx);
1c155e
 CSN* csnplGetMinCSN (CSNPL *csnpl, PRBool *committed);
1c155e
 int csnplCommit (CSNPL *csnpl, const CSN *csn);
1c155e
 CSN *csnplRollUp(CSNPL *csnpl, CSN ** first);
1c155e
diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h
1c155e
index 1d8989c..718f64e 100644
1c155e
--- a/ldap/servers/plugins/replication/repl5.h
1c155e
+++ b/ldap/servers/plugins/replication/repl5.h
1c155e
@@ -228,12 +228,27 @@ int multimaster_be_betxnpostop_delete (Slapi_PBlock *pb);
1c155e
 int multimaster_be_betxnpostop_add (Slapi_PBlock *pb);
1c155e
 int multimaster_be_betxnpostop_modify (Slapi_PBlock *pb);
1c155e
 
1c155e
+/* In repl5_replica.c */
1c155e
+typedef struct replica Replica;
1c155e
+
1c155e
+/* csn pending lists */
1c155e
+#define CSNPL_CTX_REPLCNT 4
1c155e
+typedef struct CSNPL_CTX
1c155e
+{
1c155e
+	CSN *prim_csn;
1c155e
+	size_t repl_alloc; /* max number of replicas  */
1c155e
+	size_t repl_cnt; /* number of replicas affected by operation */
1c155e
+	Replica *prim_repl; /* pirmary replica */
1c155e
+	Replica **sec_repl; /* additional replicas affected */
1c155e
+} CSNPL_CTX;
1c155e
+
1c155e
 /* In repl5_init.c */
1c155e
 extern int repl5_is_betxn;
1c155e
 char* get_thread_private_agmtname(void);
1c155e
 void  set_thread_private_agmtname (const char *agmtname);
1c155e
-void  set_thread_primary_csn (const CSN *prim_csn);
1c155e
-CSN*  get_thread_primary_csn(void);
1c155e
+void  set_thread_primary_csn (const CSN *prim_csn, Replica *repl);
1c155e
+void  add_replica_to_primcsn(CSNPL_CTX *prim_csn, Replica *repl);
1c155e
+CSNPL_CTX*  get_thread_primary_csn(void);
1c155e
 void* get_thread_private_cache(void);
1c155e
 void  set_thread_private_cache (void *buf);
1c155e
 char* get_repl_session_id (Slapi_PBlock *pb, char *id, CSN **opcsn);
1c155e
@@ -302,7 +317,6 @@ typedef struct repl_bos Repl_Bos;
1c155e
 
1c155e
 /* In repl5_agmt.c */
1c155e
 typedef struct repl5agmt Repl_Agmt;
1c155e
-typedef struct replica Replica;
1c155e
 
1c155e
 #define TRANSPORT_FLAG_SSL 1
1c155e
 #define TRANSPORT_FLAG_TLS 2
1c155e
@@ -629,6 +643,8 @@ PRUint64 replica_get_precise_purging(Replica *r);
1c155e
 void replica_set_precise_purging(Replica *r, PRUint64 on_off);
1c155e
 PRBool ignore_error_and_keep_going(int error);
1c155e
 void replica_check_release_timeout(Replica *r, Slapi_PBlock *pb);
1c155e
+void replica_lock_replica(Replica *r);
1c155e
+void replica_unlock_replica(Replica *r);
1c155e
 
1c155e
 /* The functions below handles the state flag */
1c155e
 /* Current internal state flags */
1c155e
diff --git a/ldap/servers/plugins/replication/repl5_init.c b/ldap/servers/plugins/replication/repl5_init.c
1c155e
index edffb84..b0bc515 100644
1c155e
--- a/ldap/servers/plugins/replication/repl5_init.c
1c155e
+++ b/ldap/servers/plugins/replication/repl5_init.c
1c155e
@@ -154,26 +154,62 @@ set_thread_private_agmtname(const char *agmtname)
1c155e
 		PR_SetThreadPrivate(thread_private_agmtname, (void *)agmtname);
1c155e
 }
1c155e
 
1c155e
-CSN*
1c155e
+CSNPL_CTX*
1c155e
 get_thread_primary_csn(void)
1c155e
 {
1c155e
-	CSN *prim_csn = NULL;
1c155e
+	CSNPL_CTX *prim_csn = NULL;
1c155e
 	if (thread_primary_csn)
1c155e
-		prim_csn = (CSN *)PR_GetThreadPrivate(thread_primary_csn);
1c155e
+		prim_csn = (CSNPL_CTX *)PR_GetThreadPrivate(thread_primary_csn);
1c155e
+
1c155e
 	return prim_csn;
1c155e
 }
1c155e
 void
1c155e
-set_thread_primary_csn(const CSN *prim_csn)
1c155e
+set_thread_primary_csn (const CSN *prim_csn, Replica *repl)
1c155e
 {
1c155e
 	if (thread_primary_csn) {
1c155e
 		if (prim_csn) {
1c155e
-			PR_SetThreadPrivate(thread_primary_csn, (void *)csn_dup(prim_csn));
1c155e
+			CSNPL_CTX *csnpl_ctx = (CSNPL_CTX *)slapi_ch_calloc(1,sizeof(CSNPL_CTX));
1c155e
+			csnpl_ctx->prim_csn = csn_dup(prim_csn);
1c155e
+			/* repl_alloc, repl_cnt and sec_repl are 0 by calloc */
1c155e
+			csnpl_ctx->prim_repl = repl;
1c155e
+			PR_SetThreadPrivate(thread_primary_csn, (void *)csnpl_ctx);
1c155e
 		} else {
1c155e
 			PR_SetThreadPrivate(thread_primary_csn, NULL);
1c155e
 		}
1c155e
 	}
1c155e
 }
1c155e
 
1c155e
+void
1c155e
+add_replica_to_primcsn(CSNPL_CTX *csnpl_ctx, Replica *repl)
1c155e
+{
1c155e
+	size_t found = 0;
1c155e
+	size_t it = 0;
1c155e
+
1c155e
+	if (repl == csnpl_ctx->prim_repl) return;
1c155e
+
1c155e
+	while (it < csnpl_ctx->repl_cnt) {
1c155e
+		if (csnpl_ctx->sec_repl[it] == repl) {
1c155e
+			found = 1;
1c155e
+			break;
1c155e
+		}
1c155e
+		it++;
1c155e
+	}
1c155e
+	if (found) return;
1c155e
+
1c155e
+	if (csnpl_ctx->repl_cnt < csnpl_ctx->repl_alloc) {
1c155e
+		csnpl_ctx->sec_repl[csnpl_ctx->repl_cnt++] = repl;
1c155e
+		return;
1c155e
+	}
1c155e
+	csnpl_ctx->repl_alloc += CSNPL_CTX_REPLCNT;
1c155e
+	if (csnpl_ctx->repl_cnt == 0) {
1c155e
+		csnpl_ctx->sec_repl = (Replica **)slapi_ch_calloc(csnpl_ctx->repl_alloc, sizeof(Replica *));
1c155e
+	} else {
1c155e
+		csnpl_ctx->sec_repl = (Replica **)slapi_ch_realloc((char *)csnpl_ctx->sec_repl, csnpl_ctx->repl_alloc * sizeof(Replica *));
1c155e
+	}
1c155e
+	csnpl_ctx->sec_repl[csnpl_ctx->repl_cnt++] = repl;
1c155e
+	return;
1c155e
+}
1c155e
+
1c155e
 void*
1c155e
 get_thread_private_cache ()
1c155e
 {
1c155e
@@ -740,7 +776,7 @@ multimaster_start( Slapi_PBlock *pb )
1c155e
 		/* Initialize thread private data for logging. Ignore if fails */
1c155e
 		PR_NewThreadPrivateIndex (&thread_private_agmtname, NULL);
1c155e
 		PR_NewThreadPrivateIndex (&thread_private_cache, NULL);
1c155e
-		PR_NewThreadPrivateIndex (&thread_primary_csn, csnplFreeCSN);
1c155e
+		PR_NewThreadPrivateIndex (&thread_primary_csn, csnplFreeCSNPL_CTX);
1c155e
 
1c155e
 		/* Decode the command line args to see if we're dumping to LDIF */
1c155e
 		is_ldif_dump = check_for_ldif_dump(pb);
1c155e
diff --git a/ldap/servers/plugins/replication/repl5_plugins.c b/ldap/servers/plugins/replication/repl5_plugins.c
1c155e
index 9ef06af..c31d9d5 100644
1c155e
--- a/ldap/servers/plugins/replication/repl5_plugins.c
1c155e
+++ b/ldap/servers/plugins/replication/repl5_plugins.c
1c155e
@@ -45,6 +45,7 @@
1c155e
 #include "repl.h"
1c155e
 #include "cl5_api.h"
1c155e
 #include "urp.h"
1c155e
+#include "csnpl.h"
1c155e
 
1c155e
 static char *local_purl = NULL;
1c155e
 static char *purl_attrs[] = {"nsslapd-localhost", "nsslapd-port", "nsslapd-secureport", NULL};
1c155e
@@ -1034,7 +1035,7 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
1c155e
 {
1c155e
 	Slapi_Operation *op = NULL;
1c155e
 	CSN *opcsn;
1c155e
-	CSN *prim_csn;
1c155e
+	CSNPL_CTX *prim_csn;
1c155e
 	int rc;
1c155e
 	slapi_operation_parameters *op_params = NULL;
1c155e
 	Object *repl_obj = NULL;
1c155e
@@ -1070,14 +1071,15 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
1c155e
 	if (repl_obj == NULL)
1c155e
 		return return_value;
1c155e
 
1c155e
+	r = (Replica*)object_get_data (repl_obj);
1c155e
+	PR_ASSERT (r);
1c155e
+
1c155e
 	slapi_pblock_get(pb, SLAPI_RESULT_CODE, &rc);
1c155e
 	if (rc) { /* op failed - just return */
1c155e
 		cancel_opcsn(pb);
1c155e
 		goto common_return;
1c155e
 	}
1c155e
 
1c155e
-	r = (Replica*)object_get_data (repl_obj);
1c155e
-	PR_ASSERT (r);
1c155e
 
1c155e
 	replica_check_release_timeout(r, pb);
1c155e
 
1c155e
@@ -1223,12 +1225,12 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
1c155e
 common_return:
1c155e
 	opcsn = operation_get_csn(op);
1c155e
 	prim_csn = get_thread_primary_csn();
1c155e
-	if (csn_is_equal(opcsn, prim_csn)) {
1c155e
+	if (csn_primary(r, opcsn, prim_csn)) {
1c155e
 		if (return_value == 0) {
1c155e
 			/* the primary csn was succesfully committed
1c155e
 			 * unset it in the thread local data
1c155e
 			 */
1c155e
-			set_thread_primary_csn(NULL);
1c155e
+			set_thread_primary_csn(NULL, NULL);
1c155e
 		}
1c155e
 	}
1c155e
 	if (repl_obj) {
1c155e
@@ -1430,7 +1432,7 @@ cancel_opcsn (Slapi_PBlock *pb)
1c155e
 
1c155e
             ruv_obj = replica_get_ruv (r);
1c155e
             PR_ASSERT (ruv_obj);
1c155e
-            ruv_cancel_csn_inprogress ((RUV*)object_get_data (ruv_obj), opcsn, replica_get_rid(r));
1c155e
+            ruv_cancel_csn_inprogress (r, (RUV*)object_get_data (ruv_obj), opcsn, replica_get_rid(r));
1c155e
             object_release (ruv_obj);
1c155e
         }
1c155e
 
1c155e
@@ -1491,7 +1493,7 @@ process_operation (Slapi_PBlock *pb, const CSN *csn)
1c155e
     ruv = (RUV*)object_get_data (ruv_obj);
1c155e
     PR_ASSERT (ruv);
1c155e
  
1c155e
-    rc = ruv_add_csn_inprogress (ruv, csn);
1c155e
+    rc = ruv_add_csn_inprogress (r, ruv, csn);
1c155e
 
1c155e
     object_release (ruv_obj);
1c155e
     object_release (r_obj);
1c155e
diff --git a/ldap/servers/plugins/replication/repl5_replica.c b/ldap/servers/plugins/replication/repl5_replica.c
1c155e
index 1bdc138..7927ac3 100644
1c155e
--- a/ldap/servers/plugins/replication/repl5_replica.c
1c155e
+++ b/ldap/servers/plugins/replication/repl5_replica.c
1c155e
@@ -923,7 +923,7 @@ replica_update_ruv(Replica *r, const CSN *updated_csn, const char *replica_purl)
1c155e
 					}
1c155e
 				}
1c155e
 				/* Update max csn for local and remote replicas */
1c155e
-				rc = ruv_update_ruv (ruv, updated_csn, replica_purl, r->repl_rid);
1c155e
+				rc = ruv_update_ruv (ruv, updated_csn, replica_purl, r, r->repl_rid);
1c155e
 				if (RUV_COVERS_CSN == rc)
1c155e
 				{
1c155e
 					slapi_log_err(SLAPI_LOG_REPL,
1c155e
@@ -3663,7 +3663,7 @@ assign_csn_callback(const CSN *csn, void *data)
1c155e
         }
1c155e
     }
1c155e
 
1c155e
-    ruv_add_csn_inprogress (ruv, csn);
1c155e
+    ruv_add_csn_inprogress (r, ruv, csn);
1c155e
 
1c155e
     replica_unlock(r->repl_lock);
1c155e
 
1c155e
@@ -3692,13 +3692,13 @@ abort_csn_callback(const CSN *csn, void *data)
1c155e
     {
1c155e
         int rc = csnplRemove(r->min_csn_pl, csn);
1c155e
         if (rc) {
1c155e
-            slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "abort_csn_callback - csnplRemove failed");
1c155e
+            slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "abort_csn_callback - csnplRemove failed\n");
1c155e
             replica_unlock(r->repl_lock);
1c155e
             return;
1c155e
         }
1c155e
     }
1c155e
 
1c155e
-    ruv_cancel_csn_inprogress (ruv, csn, replica_get_rid(r));
1c155e
+    ruv_cancel_csn_inprogress (r, ruv, csn, replica_get_rid(r));
1c155e
     replica_unlock(r->repl_lock);
1c155e
 
1c155e
     object_release (ruv_obj);
1c155e
@@ -4489,3 +4489,13 @@ replica_check_release_timeout(Replica *r, Slapi_PBlock *pb)
1c155e
 	}
1c155e
 	replica_unlock(r->repl_lock);
1c155e
 }
1c155e
+void
1c155e
+replica_lock_replica(Replica *r)
1c155e
+{
1c155e
+	replica_lock(r->repl_lock);
1c155e
+}
1c155e
+void
1c155e
+replica_unlock_replica(Replica *r)
1c155e
+{
1c155e
+	replica_unlock(r->repl_lock);
1c155e
+}
1c155e
diff --git a/ldap/servers/plugins/replication/repl5_ruv.c b/ldap/servers/plugins/replication/repl5_ruv.c
1c155e
index d59e6d2..39449b6 100644
1c155e
--- a/ldap/servers/plugins/replication/repl5_ruv.c
1c155e
+++ b/ldap/servers/plugins/replication/repl5_ruv.c
1c155e
@@ -77,7 +77,7 @@ static char *get_replgen_from_berval(const struct berval *bval);
1c155e
 static const char * const prefix_replicageneration = "{replicageneration}";
1c155e
 static const char * const prefix_ruvcsn = "{replica "; /* intentionally missing '}' */
1c155e
 
1c155e
-static int ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSN *csn, const char *replica_purl, PRBool isLocal);
1c155e
+static int ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSNPL_CTX *prim_csn, const char *replica_purl, PRBool isLocal);
1c155e
 
1c155e
 /* API implementation */
1c155e
 
1c155e
@@ -1599,13 +1599,13 @@ ruv_dump(const RUV *ruv, char *ruv_name, PRFileDesc *prFile)
1c155e
 
1c155e
 /* this function notifies the ruv that there are operations in progress so that
1c155e
    they can be added to the pending list for the appropriate client. */
1c155e
-int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
1c155e
+int ruv_add_csn_inprogress (void *repl, RUV *ruv, const CSN *csn)
1c155e
 {
1c155e
     RUVElement* replica;
1c155e
     char csn_str[CSN_STRSIZE];
1c155e
     int rc = RUV_SUCCESS;
1c155e
     int rid = csn_get_replicaid (csn);
1c155e
-    CSN *prim_csn;
1c155e
+    CSNPL_CTX *prim_csn;
1c155e
 
1c155e
     PR_ASSERT (ruv && csn);
1c155e
 
1c155e
@@ -1645,8 +1645,13 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
1c155e
     }
1c155e
     prim_csn = get_thread_primary_csn();
1c155e
     if (prim_csn == NULL) {
1c155e
-        set_thread_primary_csn(csn);
1c155e
+        set_thread_primary_csn(csn, (Replica *)repl);
1c155e
         prim_csn = get_thread_primary_csn();
1c155e
+    } else {
1c155e
+	/* the prim csn data already exist, need to check if
1c155e
+	 * current replica is already present
1c155e
+	 */
1c155e
+	add_replica_to_primcsn(prim_csn, (Replica *)repl);
1c155e
     }
1c155e
     rc = csnplInsert (replica->csnpl, csn, prim_csn);
1c155e
     if (rc == 1)    /* we already seen this csn */
1c155e
@@ -1656,7 +1661,7 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
1c155e
                             "The csn %s has already be seen - ignoring\n",
1c155e
                             csn_as_string (csn, PR_FALSE, csn_str));
1c155e
         }
1c155e
-        set_thread_primary_csn(NULL);
1c155e
+        set_thread_primary_csn(NULL, NULL);
1c155e
         rc = RUV_COVERS_CSN;    
1c155e
     }
1c155e
     else if(rc != 0)
1c155e
@@ -1681,11 +1686,13 @@ done:
1c155e
     return rc;
1c155e
 }
1c155e
 
1c155e
-int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn, ReplicaId local_rid)
1c155e
+int ruv_cancel_csn_inprogress (void *repl, RUV *ruv, const CSN *csn, ReplicaId local_rid)
1c155e
 {
1c155e
-    RUVElement* replica;
1c155e
+    RUVElement* repl_ruv;
1c155e
     int rc = RUV_SUCCESS;
1c155e
-    CSN *prim_csn = NULL;
1c155e
+    CSNPL_CTX *prim_csn = NULL;
1c155e
+    Replica *repl_it;
1c155e
+    size_t it;
1c155e
 
1c155e
 
1c155e
     PR_ASSERT (ruv && csn);
1c155e
@@ -1693,29 +1700,53 @@ int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn, ReplicaId local_rid)
1c155e
     prim_csn = get_thread_primary_csn();
1c155e
     /* locate ruvElement */
1c155e
     slapi_rwlock_wrlock (ruv->lock);
1c155e
-    replica = ruvGetReplica (ruv, csn_get_replicaid (csn));
1c155e
-    if (replica == NULL) {
1c155e
+    repl_ruv = ruvGetReplica (ruv, csn_get_replicaid (csn));
1c155e
+    if (repl_ruv == NULL) {
1c155e
         /* ONREPL - log error */
1c155e
 	rc = RUV_NOTFOUND;
1c155e
 	goto done;
1c155e
     }
1c155e
-    if (csn_is_equal(csn, prim_csn)) {
1c155e
-	/* the prim csn is cancelled, lets remove all dependent csns */
1c155e
-	ReplicaId prim_rid = csn_get_replicaid (csn);
1c155e
-	replica = ruvGetReplica (ruv, prim_rid);
1c155e
-	rc = csnplRemoveAll (replica->csnpl, prim_csn);
1c155e
-	if (prim_rid != local_rid) {
1c155e
-		if( local_rid != READ_ONLY_REPLICA_ID) {
1c155e
-			replica = ruvGetReplica (ruv, local_rid);
1c155e
-			if (replica) {
1c155e
-				rc = csnplRemoveAll (replica->csnpl, prim_csn);
1c155e
-			} else {
1c155e
-				rc = RUV_NOTFOUND;
1c155e
-			}
1c155e
-		}
1c155e
-	}
1c155e
+    if (csn_primary(repl, csn, prim_csn)) {
1c155e
+        /* the prim csn is cancelled, lets remove all dependent csns */
1c155e
+        /* for the primary replica we can have modifications for two RIDS:
1c155e
+         * - the local RID for direct or internal operations
1c155e
+         * - a remote RID if the primary csn is for a replciated op.
1c155e
+         */
1c155e
+        ReplicaId prim_rid = csn_get_replicaid(csn);
1c155e
+        repl_ruv = ruvGetReplica(ruv, prim_rid);
1c155e
+        if (!repl_ruv) {
1c155e
+            rc = RUV_NOTFOUND;
1c155e
+            goto done;
1c155e
+        }
1c155e
+        rc = csnplRemoveAll(repl_ruv->csnpl, prim_csn);
1c155e
+
1c155e
+        if (prim_rid != local_rid && local_rid != READ_ONLY_REPLICA_ID) {
1c155e
+            repl_ruv = ruvGetReplica(ruv, local_rid);
1c155e
+            if (!repl_ruv) {
1c155e
+                rc = RUV_NOTFOUND;
1c155e
+                goto done;
1c155e
+            }
1c155e
+            rc = csnplRemoveAll(repl_ruv->csnpl, prim_csn);
1c155e
+        }
1c155e
+
1c155e
+        for (it = 0; it < prim_csn->repl_cnt; it++) {
1c155e
+            repl_it = prim_csn->sec_repl[it];
1c155e
+            replica_lock_replica(repl_it);
1c155e
+            local_rid = replica_get_rid(repl_it);
1c155e
+            if (local_rid != READ_ONLY_REPLICA_ID) {
1c155e
+                Object *ruv_obj = replica_get_ruv(repl_it);
1c155e
+                RUV *ruv_it = object_get_data(ruv_obj);
1c155e
+                repl_ruv = ruvGetReplica(ruv_it, local_rid);
1c155e
+                if (repl_ruv) {
1c155e
+                    rc = csnplRemoveAll(repl_ruv->csnpl, prim_csn);
1c155e
+                } else {
1c155e
+                    rc = RUV_NOTFOUND;
1c155e
+                }
1c155e
+            }
1c155e
+            replica_unlock_replica(repl_it);
1c155e
+        }
1c155e
     } else {
1c155e
-	rc = csnplRemove (replica->csnpl, csn);
1c155e
+	rc = csnplRemove (repl_ruv->csnpl, csn);
1c155e
     }
1c155e
     if (rc != 0)
1c155e
         rc = RUV_NOTFOUND;
1c155e
@@ -1727,86 +1758,100 @@ done:
1c155e
     return rc;
1c155e
 }
1c155e
 
1c155e
-int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, ReplicaId local_rid)
1c155e
+int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, void *replica, ReplicaId local_rid)
1c155e
 {
1c155e
     int rc=RUV_SUCCESS;
1c155e
-    RUVElement *replica;
1c155e
+    RUVElement *repl_ruv;
1c155e
     ReplicaId prim_rid;
1c155e
+    Replica *repl_it = NULL;
1c155e
+    size_t it = 0;
1c155e
 
1c155e
-    CSN *prim_csn = get_thread_primary_csn();
1c155e
+    CSNPL_CTX *prim_csn = get_thread_primary_csn();
1c155e
 
1c155e
-    if (! csn_is_equal(csn, prim_csn)) {
1c155e
+    if (! csn_primary(replica, csn, prim_csn)) {
1c155e
 	/* not a primary csn, nothing to do */
1c155e
 	return rc;
1c155e
     }
1c155e
-    slapi_rwlock_wrlock (ruv->lock);
1c155e
+
1c155e
+    /* first handle primary replica 
1c155e
+     * there can be two ruv elements affected
1c155e
+     */
1c155e
     prim_rid = csn_get_replicaid (csn);
1c155e
-    replica = ruvGetReplica (ruv, local_rid);
1c155e
-    rc = ruv_update_ruv_element(ruv, replica, csn, replica_purl, PR_TRUE);
1c155e
-    if ( rc || local_rid == prim_rid) goto done;
1c155e
-    replica = ruvGetReplica (ruv, prim_rid);
1c155e
-    rc = ruv_update_ruv_element(ruv, replica, csn, replica_purl, PR_FALSE);
1c155e
-done:
1c155e
+    slapi_rwlock_wrlock (ruv->lock);
1c155e
+    if ( local_rid != prim_rid) {
1c155e
+	repl_ruv = ruvGetReplica (ruv, prim_rid);
1c155e
+	rc = ruv_update_ruv_element(ruv, repl_ruv, prim_csn, replica_purl, PR_FALSE);
1c155e
+    }
1c155e
+    repl_ruv = ruvGetReplica (ruv, local_rid);
1c155e
+    rc = ruv_update_ruv_element(ruv, repl_ruv, prim_csn, replica_purl, PR_TRUE);
1c155e
     slapi_rwlock_unlock (ruv->lock);
1c155e
+    if (rc) return rc;
1c155e
+
1c155e
+    /* now handle secondary replicas */
1c155e
+    for (it=0; it<prim_csn->repl_cnt; it++) {
1c155e
+	repl_it = prim_csn->sec_repl[it];
1c155e
+	replica_lock_replica(repl_it);
1c155e
+	Object *ruv_obj = replica_get_ruv (repl_it);
1c155e
+	RUV *ruv_it = object_get_data (ruv_obj);
1c155e
+	slapi_rwlock_wrlock (ruv_it->lock);
1c155e
+	repl_ruv = ruvGetReplica (ruv_it, replica_get_rid(repl_it));
1c155e
+	rc = ruv_update_ruv_element(ruv_it, repl_ruv, prim_csn, replica_purl, PR_TRUE);
1c155e
+	slapi_rwlock_unlock (ruv_it->lock);
1c155e
+	replica_unlock_replica(repl_it);
1c155e
+	if (rc) break;
1c155e
+    }
1c155e
     return rc;
1c155e
 }
1c155e
+
1c155e
 static int
1c155e
-ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSN *csn, const char *replica_purl, PRBool isLocal)
1c155e
+ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSNPL_CTX *prim_csn, const char *replica_purl, PRBool isLocal)
1c155e
 {
1c155e
     int rc=RUV_SUCCESS;
1c155e
     char csn_str[CSN_STRSIZE];
1c155e
     CSN *max_csn;
1c155e
     CSN *first_csn = NULL;
1c155e
     
1c155e
-    if (replica == NULL)
1c155e
-    {
1c155e
+    if (replica == NULL) {
1c155e
         /* we should have a ruv element at this point because it would have
1c155e
            been added by ruv_add_inprogress function */
1c155e
         slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - "
1c155e
-			            "Can't locate RUV element for replica %d\n", csn_get_replicaid (csn)); 
1c155e
+                        "Can't locate RUV element for replica %d\n", csn_get_replicaid (prim_csn->prim_csn));
1c155e
         goto done;
1c155e
     } 
1c155e
 
1c155e
-	if (csnplCommitAll(replica->csnpl, csn) != 0)
1c155e
-	{
1c155e
-		slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "ruv_update_ruv - Cannot commit csn %s\n",
1c155e
-			            csn_as_string(csn, PR_FALSE, csn_str));
1c155e
+    if (csnplCommitAll(replica->csnpl, prim_csn) != 0) {
1c155e
+        slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "ruv_update_ruv - Cannot commit csn %s\n",
1c155e
+                        csn_as_string(prim_csn->prim_csn, PR_FALSE, csn_str));
1c155e
         rc = RUV_CSNPL_ERROR;
1c155e
         goto done;
1c155e
-	}
1c155e
-    else
1c155e
-    {
1c155e
+    } else {
1c155e
         if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
1c155e
             slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - "
1c155e
-                            "Successfully committed csn %s\n", csn_as_string(csn, PR_FALSE, csn_str));
1c155e
+                            "Successfully committed csn %s\n", csn_as_string(prim_csn->prim_csn, PR_FALSE, csn_str));
1c155e
         }
1c155e
     }
1c155e
 
1c155e
-	if ((max_csn = csnplRollUp(replica->csnpl, &first_csn)) != NULL)
1c155e
-	{
1c155e
-#ifdef DEBUG
1c155e
-		slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - Rolled up to csn %s\n",
1c155e
-			            csn_as_string(max_csn, PR_FALSE, csn_str)); /* XXXggood remove debugging */
1c155e
-#endif
1c155e
+    if ((max_csn = csnplRollUp(replica->csnpl, &first_csn)) != NULL) {
1c155e
+        slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - Rolled up to csn %s\n",
1c155e
+                        csn_as_string(max_csn, PR_FALSE, csn_str)); /* XXXggood remove debugging */
1c155e
         /* replica object sets min csn for local replica */
1c155e
-		if (!isLocal && replica->min_csn == NULL) {
1c155e
-		  /* bug 559223 - it seems that, under huge stress, a server might pass
1c155e
-		   * through this code when more than 1 change has already been sent and commited into
1c155e
-		   * the pending lists... Therefore, as we are trying to set the min_csn ever 
1c155e
-		   * generated by this replica, we need to set the first_csn as the min csn in the
1c155e
-		   * ruv */
1c155e
-		  set_min_csn_nolock(ruv, first_csn, replica_purl);
1c155e
-		}
1c155e
-		/* only update the max_csn in the RUV if it is greater than the existing one */
1c155e
-		rc = set_max_csn_nolock_ext(ruv, max_csn, replica_purl, PR_TRUE /* must be greater */);
1c155e
-		/* It is possible that first_csn points to max_csn.
1c155e
-		   We need to free it once */
1c155e
-		if (max_csn != first_csn) {
1c155e
-			csn_free(&first_csn); 
1c155e
-		}
1c155e
-		csn_free(&max_csn);
1c155e
-	}
1c155e
-
1c155e
+        if (!isLocal && replica->min_csn == NULL) {
1c155e
+            /* bug 559223 - it seems that, under huge stress, a server might pass
1c155e
+             * through this code when more than 1 change has already been sent and commited into
1c155e
+             * the pending lists... Therefore, as we are trying to set the min_csn ever
1c155e
+             * generated by this replica, we need to set the first_csn as the min csn in the
1c155e
+             * ruv */
1c155e
+        set_min_csn_nolock(ruv, first_csn, replica_purl);
1c155e
+        }
1c155e
+        /* only update the max_csn in the RUV if it is greater than the existing one */
1c155e
+        rc = set_max_csn_nolock_ext(ruv, max_csn, replica_purl, PR_TRUE /* must be greater */);
1c155e
+        /* It is possible that first_csn points to max_csn.
1c155e
+           We need to free it once */
1c155e
+        if (max_csn != first_csn) {
1c155e
+            csn_free(&first_csn);
1c155e
+        }
1c155e
+        csn_free(&max_csn);
1c155e
+    }
1c155e
 done:
1c155e
 
1c155e
     return rc;
1c155e
diff --git a/ldap/servers/plugins/replication/repl5_ruv.h b/ldap/servers/plugins/replication/repl5_ruv.h
1c155e
index c8960fd..f3cd38b 100644
1c155e
--- a/ldap/servers/plugins/replication/repl5_ruv.h
1c155e
+++ b/ldap/servers/plugins/replication/repl5_ruv.h
1c155e
@@ -108,9 +108,9 @@ int ruv_to_bervals(const RUV *ruv, struct berval ***bvals);
1c155e
 PRInt32 ruv_replica_count (const RUV *ruv);
1c155e
 char **ruv_get_referrals(const RUV *ruv);
1c155e
 void ruv_dump(const RUV *ruv, char *ruv_name, PRFileDesc *prFile);
1c155e
-int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn);
1c155e
-int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn, ReplicaId rid);
1c155e
-int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, ReplicaId local_rid);
1c155e
+int ruv_add_csn_inprogress (void *repl, RUV *ruv, const CSN *csn);
1c155e
+int ruv_cancel_csn_inprogress (void *repl, RUV *ruv, const CSN *csn, ReplicaId rid);
1c155e
+int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, void *replica, ReplicaId local_rid);
1c155e
 int ruv_move_local_supplier_to_first(RUV *ruv, ReplicaId rid);
1c155e
 int ruv_get_first_id_and_purl(RUV *ruv, ReplicaId *rid, char **replica_purl );
1c155e
 int ruv_local_contains_supplier(RUV *ruv, ReplicaId rid);
1c155e
diff --git a/ldap/servers/slapd/slapi-private.h b/ldap/servers/slapd/slapi-private.h
1c155e
index 0836d66..3910dbe 100644
1c155e
--- a/ldap/servers/slapd/slapi-private.h
1c155e
+++ b/ldap/servers/slapd/slapi-private.h
1c155e
@@ -193,7 +193,7 @@ const CSN *csn_max(const CSN *csn1,const CSN *csn2);
1c155e
    a csn from the set.*/
1c155e
 int csn_increment_subsequence (CSN *csn);
1c155e
 
1c155e
-void csnplFreeCSN (void *arg);
1c155e
+void csnplFreeCSNPL_CTX (void *arg);
1c155e
 /*
1c155e
  * csnset.c
1c155e
  */
1c155e
-- 
1c155e
2.9.4
1c155e