amoralej / rpms / 389-ds-base

Forked from rpms/389-ds-base 5 years ago
Clone
Blob Blame History Raw
From d1477e4a03d85aec79b497db4531d9e484029139 Mon Sep 17 00:00:00 2001
From: Ludwig Krispenz <lkrispen@redhat.com>
Date: Tue, 24 Jan 2017 15:07:19 +0100
Subject: [PATCH 64/67]     Ticket 49008 backport 1.3.5 : aborted operation can
 leave RUV in incorrect state

        Bug description:
        If a plugin operation succeeded, but the operation itself fails and is aborted the RUV is in an incorrect state (rolled up to the succesful plugin op)

        Fix Decription:
        Introduce a "primary_csn", this is the csn of the main operation, either a client operation or a replicated operation.
        csns generated by internal operations, eg by plugins are secondary csn.

        Maintain the primary csn in thread local data, like it is used for the agreement name (or txn stack): prim_csn.

        Extend the data structure of the pending list to keep prim_csn for each inserted csn

        If a csn is created or received check prim_csn: if it exists use it, if it doesn't exist set it

        when inserting a csn to the pending list pass the prim_csn

        when cancelling a csn, if it is the prim_csn also cancell all secondary csns

        when committing a csn,

        if it is not the primary csn, do nothing

        if it is the prim_csn trigger the pending list rollup, stop at the first not committed csn

        if the RID of the prim_csn is not the local RID also rollup the pending list for the local RID.

        Reviewed by:  Thierry, Thanks

(cherry picked from commit 79a3deafe943a3ce5c31c50272939146d17bd7ac)
---
 ldap/servers/plugins/replication/csnpl.c         | 75 +++++++++++++++++++++---
 ldap/servers/plugins/replication/csnpl.h         |  5 +-
 ldap/servers/plugins/replication/repl5.h         |  2 +
 ldap/servers/plugins/replication/repl5_init.c    | 22 +++++++
 ldap/servers/plugins/replication/repl5_plugins.c | 40 ++++++++-----
 ldap/servers/plugins/replication/repl5_replica.c |  6 +-
 ldap/servers/plugins/replication/repl5_ruv.c     | 74 +++++++++++++++++------
 ldap/servers/plugins/replication/repl5_ruv.h     |  4 +-
 ldap/servers/slapd/csn.c                         | 15 +++++
 ldap/servers/slapd/slapi-private.h               |  2 +
 10 files changed, 195 insertions(+), 50 deletions(-)

diff --git a/ldap/servers/plugins/replication/csnpl.c b/ldap/servers/plugins/replication/csnpl.c
index acd38d0..db1ae13 100644
--- a/ldap/servers/plugins/replication/csnpl.c
+++ b/ldap/servers/plugins/replication/csnpl.c
@@ -24,8 +24,9 @@ struct csnpl
 
 typedef struct _csnpldata
 {
-	PRBool		committed;  /* True if CSN committed */
-	CSN			*csn;       /* The actual CSN */
+	PRBool	committed;  /* True if CSN committed */
+	CSN	*csn;       /* The actual CSN */
+	const CSN *prim_csn;  /* The primary CSN of an operation consising of multiple sub ops*/
 } csnpldata;
 
 /* forward declarations */
@@ -103,7 +104,7 @@ void csnplFree (CSNPL **csnpl)
  *          1 if the csn has already been seen
  *         -1 for any other kind of errors
  */
-int csnplInsert (CSNPL *csnpl, const CSN *csn)
+int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSN *prim_csn)
 {
 	int rc;
 	csnpldata *csnplnode;
@@ -131,6 +132,7 @@ int csnplInsert (CSNPL *csnpl, const CSN *csn)
 	csnplnode = (csnpldata *)slapi_ch_malloc(sizeof(csnpldata));
 	csnplnode->committed = PR_FALSE;
 	csnplnode->csn = csn_dup(csn);
+	csnplnode->prim_csn = prim_csn;
 	csn_as_string(csn, PR_FALSE, csn_str);
 	rc = llistInsertTail (csnpl->csnList, csn_str, csnplnode);
 
@@ -186,6 +188,57 @@ int csnplRemove (CSNPL *csnpl, const CSN *csn)
 	return 0;
 }
 
+int csnplRemoveAll (CSNPL *csnpl, const CSN *csn)
+{
+	csnpldata *data;
+	void *iterator;
+
+	slapi_rwlock_wrlock (csnpl->csnLock);
+	data = (csnpldata *)llistGetFirst(csnpl->csnList, &iterator);
+	while (NULL != data)
+	{
+		if (csn_is_equal(data->csn, csn) ||
+		    csn_is_equal(data->prim_csn, csn)) {
+			csnpldata_free(&data);
+			data = (csnpldata *)llistRemoveCurrentAndGetNext(csnpl->csnList, &iterator);
+		} else {
+			data = (csnpldata *)llistGetNext (csnpl->csnList, &iterator);
+		}
+	}
+#ifdef DEBUG
+    _csnplDumpContentNoLock(csnpl, "csnplRemoveAll");
+#endif
+	slapi_rwlock_unlock (csnpl->csnLock);
+	return 0;
+}
+
+
+int csnplCommitAll (CSNPL *csnpl, const CSN *csn)
+{
+	csnpldata *data;
+	void *iterator;
+	char csn_str[CSN_STRSIZE];
+
+	csn_as_string(csn, PR_FALSE, csn_str);
+	slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
+		            "csnplCommitALL: committing all csns for csn %s\n", csn_str);
+	slapi_rwlock_wrlock (csnpl->csnLock);
+	data = (csnpldata *)llistGetFirst(csnpl->csnList, &iterator);
+	while (NULL != data)
+	{
+		csn_as_string(data->csn, PR_FALSE, csn_str);
+		slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
+				"csnplCommitALL: processing data csn %s\n", csn_str);
+		if (csn_is_equal(data->csn, csn) ||
+		    csn_is_equal(data->prim_csn, csn)) {
+			data->committed = PR_TRUE;
+		}
+		data = (csnpldata *)llistGetNext (csnpl->csnList, &iterator);
+	}
+	slapi_rwlock_unlock (csnpl->csnLock);
+	return 0;
+}
+
 int csnplCommit (CSNPL *csnpl, const CSN *csn)
 {
 	csnpldata *data;
@@ -276,13 +329,12 @@ csnplRollUp(CSNPL *csnpl, CSN **first_commited)
 	  *first_commited = NULL;
 	}
 	data = (csnpldata *)llistGetFirst(csnpl->csnList, &iterator);
-	while (NULL != data)
+	while (NULL != data && data->committed)
 	{
 		if (NULL != largest_committed_csn && freeit)
 		{
 			csn_free(&largest_committed_csn);
 		}
-		if (data->committed) {
 			freeit = PR_TRUE;
 			largest_committed_csn = data->csn; /* Save it */
 			if (first_commited && (*first_commited == NULL)) {
@@ -294,9 +346,6 @@ csnplRollUp(CSNPL *csnpl, CSN **first_commited)
 			data->csn = NULL;
 			csnpldata_free(&data);
 			data = (csnpldata *)llistRemoveCurrentAndGetNext(csnpl->csnList, &iterator);
-		} else {
-			data = (csnpldata *)llistGetNext (csnpl->csnList, &iterator);
-		}
 	} 
 
 #ifdef DEBUG
@@ -326,6 +375,7 @@ static void _csnplDumpContentNoLock(CSNPL *csnpl, const char *caller)
     csnpldata *data;
     void *iterator;
     char csn_str[CSN_STRSIZE];
+    char primcsn_str[CSN_STRSIZE];
     
     data = (csnpldata *)llistGetFirst(csnpl->csnList, &iterator);
 	if (data) {
@@ -334,11 +384,18 @@ static void _csnplDumpContentNoLock(CSNPL *csnpl, const char *caller)
 	}
     while (data)
     {
-        slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "%s, %s\n",                        
+        slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "%s,(prim %s), %s\n",
                         csn_as_string(data->csn, PR_FALSE, csn_str),
+			data->prim_csn ? csn_as_string(data->prim_csn, PR_FALSE, primcsn_str) : " ",
                         data->committed ? "committed" : "not committed");
         data = (csnpldata *)llistGetNext (csnpl->csnList, &iterator);
     }
 }
 #endif
 
+/* wrapper around csn_free, to satisfy NSPR thread context API */
+void
+csnplFreeCSN (void *arg)
+{
+	csn_free((CSN **)&arg);
+}
diff --git a/ldap/servers/plugins/replication/csnpl.h b/ldap/servers/plugins/replication/csnpl.h
index 32e3ff7..f5c28f5 100644
--- a/ldap/servers/plugins/replication/csnpl.h
+++ b/ldap/servers/plugins/replication/csnpl.h
@@ -22,10 +22,13 @@ typedef struct csnpl CSNPL;
 
 CSNPL* csnplNew ();
 void csnplFree (CSNPL **csnpl);
-int csnplInsert (CSNPL *csnpl, const CSN *csn);
+int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSN *prim_csn);
 int csnplRemove (CSNPL *csnpl, const CSN *csn);
+int csnplRemoveAll (CSNPL *csnpl, const CSN *csn);
+int csnplCommitAll (CSNPL *csnpl, const CSN *csn);
 CSN* csnplGetMinCSN (CSNPL *csnpl, PRBool *committed);
 int csnplCommit (CSNPL *csnpl, const CSN *csn);
 CSN *csnplRollUp(CSNPL *csnpl, CSN ** first);
 void csnplDumpContent(CSNPL *csnpl, const char *caller); 
+
 #endif
diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h
index 4ab2355..27ad416 100644
--- a/ldap/servers/plugins/replication/repl5.h
+++ b/ldap/servers/plugins/replication/repl5.h
@@ -232,6 +232,8 @@ int multimaster_be_betxnpostop_modify (Slapi_PBlock *pb);
 extern int repl5_is_betxn;
 char* get_thread_private_agmtname ();
 void  set_thread_private_agmtname (const char *agmtname);
+void  set_thread_primary_csn (const CSN *prim_csn);
+CSN*  get_thread_primary_csn(void);
 void* get_thread_private_cache ();
 void  set_thread_private_cache (void *buf);
 char* get_repl_session_id (Slapi_PBlock *pb, char *id, CSN **opcsn);
diff --git a/ldap/servers/plugins/replication/repl5_init.c b/ldap/servers/plugins/replication/repl5_init.c
index 0304ed5..1570655 100644
--- a/ldap/servers/plugins/replication/repl5_init.c
+++ b/ldap/servers/plugins/replication/repl5_init.c
@@ -136,6 +136,7 @@ static int multimaster_started_flag = 0;
 /* Thread private data and interface */
 static PRUintn thread_private_agmtname;	/* thread private index for logging*/
 static PRUintn thread_private_cache;
+static PRUintn thread_primary_csn;
 
 char*
 get_thread_private_agmtname()
@@ -153,6 +154,26 @@ set_thread_private_agmtname(const char *agmtname)
 		PR_SetThreadPrivate(thread_private_agmtname, (void *)agmtname);
 }
 
+CSN*
+get_thread_primary_csn(void)
+{
+	CSN *prim_csn = NULL;
+	if (thread_primary_csn)
+		prim_csn = (CSN *)PR_GetThreadPrivate(thread_primary_csn);
+	return prim_csn;
+}
+void
+set_thread_primary_csn(const CSN *prim_csn)
+{
+	if (thread_primary_csn) {
+		if (prim_csn) {
+			PR_SetThreadPrivate(thread_primary_csn, (void *)csn_dup(prim_csn));
+		} else {
+			PR_SetThreadPrivate(thread_primary_csn, NULL);
+		}
+	}
+}
+
 void*
 get_thread_private_cache ()
 {
@@ -721,6 +742,7 @@ multimaster_start( Slapi_PBlock *pb )
 		/* Initialize thread private data for logging. Ignore if fails */
 		PR_NewThreadPrivateIndex (&thread_private_agmtname, NULL);
 		PR_NewThreadPrivateIndex (&thread_private_cache, NULL);
+		PR_NewThreadPrivateIndex (&thread_primary_csn, csnplFreeCSN);
 
 		/* Decode the command line args to see if we're dumping to LDIF */
 		is_ldif_dump = check_for_ldif_dump(pb);
diff --git a/ldap/servers/plugins/replication/repl5_plugins.c b/ldap/servers/plugins/replication/repl5_plugins.c
index b331c81..84624e9 100644
--- a/ldap/servers/plugins/replication/repl5_plugins.c
+++ b/ldap/servers/plugins/replication/repl5_plugins.c
@@ -1033,9 +1033,11 @@ static int
 write_changelog_and_ruv (Slapi_PBlock *pb)
 {
 	Slapi_Operation *op = NULL;
+	CSN *opcsn;
+	CSN *prim_csn;
 	int rc;
 	slapi_operation_parameters *op_params = NULL;
-	Object *repl_obj;
+	Object *repl_obj = NULL;
 	int return_value = SLAPI_PLUGIN_SUCCESS;
 	Replica *r;
 	Slapi_Backend *be;
@@ -1063,17 +1065,17 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
 	{
 		return return_value;
 	}
+	/* we only log changes for operations applied to a replica */
+	repl_obj = replica_get_replica_for_op (pb);
+	if (repl_obj == NULL)
+		return return_value;
 
 	slapi_pblock_get(pb, SLAPI_RESULT_CODE, &rc);
 	if (rc) { /* op failed - just return */
-		return return_value;
+		cancel_opcsn(pb);
+		goto common_return;
 	}
 
-	/* we only log changes for operations applied to a replica */
-	repl_obj = replica_get_replica_for_op (pb);
-	if (repl_obj == NULL)
-		return return_value;
- 
 	r = (Replica*)object_get_data (repl_obj);
 	PR_ASSERT (r);
 
@@ -1108,7 +1110,7 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
 
 			slapi_pblock_get (pb, SLAPI_OPERATION_PARAMETERS, &op_params);
 			if (NULL == op_params) {
-				return return_value;
+				goto common_return;
 			}
 
 			/* need to set uniqueid operation parameter */
@@ -1127,19 +1129,18 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
 				slapi_pblock_get (pb, SLAPI_ENTRY_PRE_OP, &e);
 			}
 			if (NULL == e) {
-				return return_value;
+				goto common_return;
 			}
 			uniqueid = slapi_entry_get_uniqueid (e);
 			if (NULL == uniqueid) {
-				return return_value;
+				goto common_return;
 			}
 			op_params->target_address.uniqueid = slapi_ch_strdup (uniqueid);
 		} 
 
 		if( op_params->csn && is_cleaned_rid(csn_get_replicaid(op_params->csn))){
 			/* this RID has been cleaned */
-			object_release (repl_obj);
-			return return_value;
+			goto common_return;
 		}
 
 		/* we might have stripped all the mods - in that case we do not
@@ -1152,7 +1153,7 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
 			{
 				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
 								"write_changelog_and_ruv: Skipped due to DISKFULL\n");
-				return return_value;
+				goto common_return;
 			}
 			slapi_pblock_get(pb, SLAPI_TXN, &txn);
 			rc = cl5WriteOperationTxn(repl_name, repl_gen, op_params, 
@@ -1188,7 +1189,6 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
 	*/
 	if (0 == return_value) {
 		char csn_str[CSN_STRSIZE] = {'\0'};
-		CSN *opcsn;
 		int rc;
 		const char *dn = op_params ? REPL_GET_DN(&op_params->target_address) : "unknown";
 		Slapi_DN *sdn = op_params ? (&op_params->target_address)->sdn : NULL;
@@ -1220,7 +1220,15 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
 		}
 	}
 
-	object_release (repl_obj);
+common_return:
+	opcsn = operation_get_csn(op);
+	prim_csn = get_thread_primary_csn();
+	if (csn_is_equal(opcsn, prim_csn)) {
+		set_thread_primary_csn(NULL);
+	}
+	if (repl_obj) {
+		object_release (repl_obj);
+	}
 	return return_value;
 }
 
@@ -1417,7 +1425,7 @@ cancel_opcsn (Slapi_PBlock *pb)
 
             ruv_obj = replica_get_ruv (r);
             PR_ASSERT (ruv_obj);
-            ruv_cancel_csn_inprogress ((RUV*)object_get_data (ruv_obj), opcsn);
+            ruv_cancel_csn_inprogress ((RUV*)object_get_data (ruv_obj), opcsn, replica_get_rid(r));
             object_release (ruv_obj);
         }
 
diff --git a/ldap/servers/plugins/replication/repl5_replica.c b/ldap/servers/plugins/replication/repl5_replica.c
index 7360d97..602653a 100644
--- a/ldap/servers/plugins/replication/repl5_replica.c
+++ b/ldap/servers/plugins/replication/repl5_replica.c
@@ -903,7 +903,7 @@ replica_update_ruv(Replica *r, const CSN *updated_csn, const char *replica_purl)
 					}
 				}
 				/* Update max csn for local and remote replicas */
-				rc = ruv_update_ruv (ruv, updated_csn, replica_purl, rid == r->repl_rid);
+				rc = ruv_update_ruv (ruv, updated_csn, replica_purl, r->repl_rid);
 				if (RUV_COVERS_CSN == rc)
 				{
 					slapi_log_error(SLAPI_LOG_REPL,
@@ -3626,7 +3626,7 @@ assign_csn_callback(const CSN *csn, void *data)
 	
     if (NULL != r->min_csn_pl)
     {
-        if (csnplInsert(r->min_csn_pl, csn) != 0)
+        if (csnplInsert(r->min_csn_pl, csn, NULL) != 0)
         {
             char csn_str[CSN_STRSIZE]; /* For logging only */
             /* Ack, we can't keep track of min csn. Punt. */
@@ -3674,7 +3674,7 @@ abort_csn_callback(const CSN *csn, void *data)
         }
     }
 
-    ruv_cancel_csn_inprogress (ruv, csn);
+    ruv_cancel_csn_inprogress (ruv, csn, replica_get_rid(r));
     replica_unlock(r->repl_lock);
 
     object_release (ruv_obj);
diff --git a/ldap/servers/plugins/replication/repl5_ruv.c b/ldap/servers/plugins/replication/repl5_ruv.c
index 5d6e1c3..c2d3bb4 100644
--- a/ldap/servers/plugins/replication/repl5_ruv.c
+++ b/ldap/servers/plugins/replication/repl5_ruv.c
@@ -77,6 +77,7 @@ static char *get_replgen_from_berval(const struct berval *bval);
 static const char * const prefix_replicageneration = "{replicageneration}";
 static const char * const prefix_ruvcsn = "{replica "; /* intentionally missing '}' */
 
+static int ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSN *csn, const char *replica_purl, PRBool isLocal);
 
 /* API implementation */
 
@@ -1602,6 +1603,7 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
     char csn_str[CSN_STRSIZE];
     int rc = RUV_SUCCESS;
     int rid = csn_get_replicaid (csn);
+    CSN *prim_csn;
 
     PR_ASSERT (ruv && csn);
 
@@ -1639,8 +1641,12 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
         rc = RUV_COVERS_CSN;
         goto done;
     }
-
-    rc = csnplInsert (replica->csnpl, csn);
+    prim_csn = get_thread_primary_csn();
+    if (prim_csn == NULL) {
+        set_thread_primary_csn(csn);
+        prim_csn = get_thread_primary_csn();
+    }
+    rc = csnplInsert (replica->csnpl, csn, prim_csn);
     if (rc == 1)    /* we already seen this csn */
     {
         if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
@@ -1648,6 +1654,7 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
                             "the csn %s has already be seen - ignoring\n",
                             csn_as_string (csn, PR_FALSE, csn_str));
         }
+        set_thread_primary_csn(NULL);
         rc = RUV_COVERS_CSN;    
     }
     else if(rc != 0)
@@ -1672,24 +1679,36 @@ done:
     return rc;
 }
 
-int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn)
+int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn, ReplicaId local_rid)
 {
     RUVElement* replica;
     int rc = RUV_SUCCESS;
+    CSN *prim_csn = NULL;
+
 
     PR_ASSERT (ruv && csn);
 
+    prim_csn = get_thread_primary_csn();
     /* locate ruvElement */
     slapi_rwlock_wrlock (ruv->lock);
     replica = ruvGetReplica (ruv, csn_get_replicaid (csn));
-    if (replica == NULL)
-    {
+    if (replica == NULL) {
         /* ONREPL - log error */
-        rc = RUV_NOTFOUND;
-        goto done;
-    } 
-
-    rc = csnplRemove (replica->csnpl, csn);
+	rc = RUV_NOTFOUND;
+	goto done;
+    }
+    if (csn_is_equal(csn, prim_csn)) {
+	/* the prim csn is cancelled, lets remove all dependent csns */
+	ReplicaId prim_rid = csn_get_replicaid (csn);
+	replica = ruvGetReplica (ruv, prim_rid);
+	rc = csnplRemoveAll (replica->csnpl, prim_csn);
+	if (prim_rid != local_rid) {
+		replica = ruvGetReplica (ruv, local_rid);
+		rc = csnplRemoveAll (replica->csnpl, prim_csn);
+	}
+    } else {
+	rc = csnplRemove (replica->csnpl, csn);
+    }
     if (rc != 0)
         rc = RUV_NOTFOUND;
     else
@@ -1700,19 +1719,37 @@ done:
     return rc;
 }
 
-int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, PRBool isLocal)
+int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, ReplicaId local_rid)
+{
+    int rc=RUV_SUCCESS;
+    RUVElement *replica;
+    ReplicaId prim_rid;
+
+    CSN *prim_csn = get_thread_primary_csn();
+
+    if (! csn_is_equal(csn, prim_csn)) {
+	/* not a primary csn, nothing to do */
+	return rc;
+    }
+    slapi_rwlock_wrlock (ruv->lock);
+    prim_rid = csn_get_replicaid (csn);
+    replica = ruvGetReplica (ruv, local_rid);
+    rc = ruv_update_ruv_element(ruv, replica, csn, replica_purl, PR_TRUE);
+    if ( rc || local_rid == prim_rid) goto done;
+    replica = ruvGetReplica (ruv, prim_rid);
+    rc = ruv_update_ruv_element(ruv, replica, csn, replica_purl, PR_FALSE);
+done:
+    slapi_rwlock_unlock (ruv->lock);
+    return rc;
+}
+static int
+ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSN *csn, const char *replica_purl, PRBool isLocal)
 {
     int rc=RUV_SUCCESS;
     char csn_str[CSN_STRSIZE];
     CSN *max_csn;
     CSN *first_csn = NULL;
-    RUVElement *replica;
     
-    PR_ASSERT (ruv && csn);
-
-    slapi_rwlock_wrlock (ruv->lock);
-
-    replica = ruvGetReplica (ruv, csn_get_replicaid (csn));
     if (replica == NULL)
     {
         /* we should have a ruv element at this point because it would have
@@ -1722,7 +1759,7 @@ int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, PRBool i
         goto done;
     } 
 
-	if (csnplCommit(replica->csnpl, csn) != 0)
+	if (csnplCommitAll(replica->csnpl, csn) != 0)
 	{
 		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "ruv_update_ruv: cannot commit csn %s\n",
 			            csn_as_string(csn, PR_FALSE, csn_str));
@@ -1763,7 +1800,6 @@ int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, PRBool i
 	}
 
 done:
-    slapi_rwlock_unlock (ruv->lock);
 
     return rc;
 }
diff --git a/ldap/servers/plugins/replication/repl5_ruv.h b/ldap/servers/plugins/replication/repl5_ruv.h
index e9eff5a..c8960fd 100644
--- a/ldap/servers/plugins/replication/repl5_ruv.h
+++ b/ldap/servers/plugins/replication/repl5_ruv.h
@@ -109,8 +109,8 @@ PRInt32 ruv_replica_count (const RUV *ruv);
 char **ruv_get_referrals(const RUV *ruv);
 void ruv_dump(const RUV *ruv, char *ruv_name, PRFileDesc *prFile);
 int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn);
-int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn);
-int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, PRBool isLocal);
+int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn, ReplicaId rid);
+int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, ReplicaId local_rid);
 int ruv_move_local_supplier_to_first(RUV *ruv, ReplicaId rid);
 int ruv_get_first_id_and_purl(RUV *ruv, ReplicaId *rid, char **replica_purl );
 int ruv_local_contains_supplier(RUV *ruv, ReplicaId rid);
diff --git a/ldap/servers/slapd/csn.c b/ldap/servers/slapd/csn.c
index a3f4815..175f82a 100644
--- a/ldap/servers/slapd/csn.c
+++ b/ldap/servers/slapd/csn.c
@@ -268,6 +268,21 @@ csn_as_attr_option_string(CSNType t,const CSN *csn,char *ss)
 	return s;
 }
 
+int
+csn_is_equal(const CSN *csn1, const CSN *csn2)
+{
+	int retval = 0;
+	if ((csn1 == NULL && csn2 == NULL) ||
+		(csn1 && csn2 &&
+		 csn1->tstamp == csn2->tstamp &&
+		 csn1->seqnum == csn2->seqnum &&
+		 csn1->rid == csn2->rid &&
+		 csn1->subseqnum == csn2->subseqnum)) {
+		retval = 1;
+	}
+	return retval;
+}
+
 int 
 csn_compare_ext(const CSN *csn1, const CSN *csn2, unsigned int flags)
 {
diff --git a/ldap/servers/slapd/slapi-private.h b/ldap/servers/slapd/slapi-private.h
index 52d1c4a..e909e9c 100644
--- a/ldap/servers/slapd/slapi-private.h
+++ b/ldap/servers/slapd/slapi-private.h
@@ -166,6 +166,7 @@ time_t csn_get_time(const CSN *csn);
 PRUint16 csn_get_seqnum(const CSN *csn);
 PRUint16 csn_get_subseqnum(const CSN *csn);
 char *csn_as_string(const CSN *csn, PRBool replicaIdOrder, char *ss); /* WARNING: ss must be CSN_STRSIZE bytes, or NULL. */
+int csn_is_equal(const CSN *csn1, const CSN *csn2);
 int csn_compare(const CSN *csn1, const CSN *csn2);
 int csn_compare_ext(const CSN *csn1, const CSN *csn2, unsigned int flags);
 #define CSN_COMPARE_SKIP_SUBSEQ 0x1
@@ -181,6 +182,7 @@ const CSN *csn_max(const CSN *csn1,const CSN *csn2);
    a csn from the set.*/
 int csn_increment_subsequence (CSN *csn);
 
+void csnplFreeCSN (void *arg);
 /*
  * csnset.c
  */
-- 
2.9.3