andykimpe / rpms / 389-ds-base

Forked from rpms/389-ds-base 5 months ago
Clone
dc8c34
From 9ba2e34aa6a03850a6b477c5d470ef4701f85777 Mon Sep 17 00:00:00 2001
dc8c34
From: Mark Reynolds <mreynolds@redhat.com>
dc8c34
Date: Wed, 8 Jul 2015 11:48:27 -0400
dc8c34
Subject: [PATCH 346/347] Ticket 48208 - CleanAllRUV should completely purge
dc8c34
 changelog
dc8c34
dc8c34
Bug Description:  After cleanAllRUV finishes, the changelog still
dc8c34
                  contains entries from the cleaned rid.  Under certain
dc8c34
                  conditions this can allow the RUV to get polluted
dc8c34
                  again, and the ruv element will be missing the replica
dc8c34
                  url.
dc8c34
dc8c34
Fix Description:  At the end of the cleaning task, fire of a thread to
dc8c34
                  to completely purge the changelog of all entries
dc8c34
                  containing the cleaned rid.
dc8c34
dc8c34
                  Also, improved the cleanAllRUV task when dealing
dc8c34
                  with a server shutdown - previously if the timing is
dc8c34
                  right the task can "delay/hang" the shutdown process.
dc8c34
dc8c34
https://fedorahosted.org/389/ticket/48208
dc8c34
dc8c34
Reviewed by: nhosoi(Thanks!)
dc8c34
dc8c34
(cherry picked from commit ff1c34538b0600259dba4801da2b2f0993fa5404)
dc8c34
(cherry picked from commit 9e4cf12cfbfde0761325b75c3fd5a8b39223760a)
dc8c34
(cherry picked from commit 264f67218aec5e11f68ad4e36e444730c8c3110c)
dc8c34
(cherry picked from commit e8803f5ad77ec742c57c0121dfc83822633ab602)
dc8c34
(cherry picked from commit a1dc207eb1566b5b64ceeb54bb59b6d2b50c48f1)
dc8c34
---
dc8c34
 ldap/servers/plugins/replication/cl5_api.c         | 452 ++++++++++++++++++---
dc8c34
 ldap/servers/plugins/replication/cl5_api.h         |   5 +-
dc8c34
 .../plugins/replication/repl5_replica_config.c     |  44 +-
dc8c34
 3 files changed, 433 insertions(+), 68 deletions(-)
dc8c34
dc8c34
diff --git a/ldap/servers/plugins/replication/cl5_api.c b/ldap/servers/plugins/replication/cl5_api.c
dc8c34
index dd3168a..13bf203 100644
dc8c34
--- a/ldap/servers/plugins/replication/cl5_api.c
dc8c34
+++ b/ldap/servers/plugins/replication/cl5_api.c
dc8c34
@@ -345,14 +345,18 @@ static int _cl5TrimInit ();
dc8c34
 static void _cl5TrimCleanup ();
dc8c34
 static int _cl5TrimMain (void *param);
dc8c34
 static void _cl5DoTrimming (ReplicaId rid);
dc8c34
-static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid);
dc8c34
 static PRBool _cl5CanTrim (time_t time, long *numToTrim);
dc8c34
+static void _cl5TrimFile (Object *obj, long *numToTrim);
dc8c34
+
dc8c34
+static void _cl5PurgeRID(Object *obj,  ReplicaId cleaned_rid);
dc8c34
+static int _cl5PurgeGetFirstEntry (Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid, int rid, DBT *key);
dc8c34
+static int _cl5PurgeGetNextEntry (CL5Entry *entry, void *iterator, DBT *key);
dc8c34
 static int  _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge);
dc8c34
 static int  _cl5WriteRUV (CL5DBFile *file, PRBool purge);
dc8c34
 static int  _cl5ConstructRUV (const char *replGen, Object *obj, PRBool purge);
dc8c34
 static int  _cl5UpdateRUV (Object *obj, CSN *csn, PRBool newReplica, PRBool purge);
dc8c34
 static int  _cl5GetRUV2Purge2 (Object *fileObj, RUV **ruv);
dc8c34
-void trigger_cl_trimming_thread(void *rid);
dc8c34
+void trigger_cl_purging_thread(void *rid);
dc8c34
 
dc8c34
 /* bakup/recovery, import/export */
dc8c34
 static int _cl5LDIF2Operation (char *ldifEntry, slapi_operation_parameters *op,
dc8c34
@@ -3467,10 +3471,18 @@ static void _cl5DoTrimming (ReplicaId rid)
dc8c34
 	   trimmed more often than other. We might have to fix that by, for 
dc8c34
 	   example, randomizing starting point */
dc8c34
 	obj = objset_first_obj (s_cl5Desc.dbFiles);
dc8c34
-	while (obj && _cl5CanTrim ((time_t)0, &numToTrim))
dc8c34
-	{	
dc8c34
-		_cl5TrimFile (obj, &numToTrim, rid);
dc8c34
-		obj = objset_next_obj (s_cl5Desc.dbFiles, obj);	
dc8c34
+	while (obj && (_cl5CanTrim ((time_t)0, &numToTrim) || rid))
dc8c34
+	{
dc8c34
+		if (rid){
dc8c34
+			/*
dc8c34
+			 * We are cleaning an invalid rid, and need to strip it
dc8c34
+			 * from the changelog.
dc8c34
+			 */
dc8c34
+			_cl5PurgeRID (obj, rid);
dc8c34
+		} else {
dc8c34
+			_cl5TrimFile (obj, &numToTrim);
dc8c34
+		}
dc8c34
+		obj = objset_next_obj (s_cl5Desc.dbFiles, obj);
dc8c34
 	}
dc8c34
 
dc8c34
     if (obj)
dc8c34
@@ -3481,12 +3493,351 @@ static void _cl5DoTrimming (ReplicaId rid)
dc8c34
 	return;
dc8c34
 }
dc8c34
 
dc8c34
+/*
dc8c34
+ * If the rid is not set it is the very first iteration of the changelog.
dc8c34
+ * If the rid is set, we are doing another pass, and we have a key as our
dc8c34
+ * starting point.
dc8c34
+ */
dc8c34
+static int
dc8c34
+_cl5PurgeGetFirstEntry(Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid, int rid, DBT *key)
dc8c34
+{
dc8c34
+	DBC *cursor = NULL;
dc8c34
+	DBT	data = {0};
dc8c34
+	CL5Iterator *it;
dc8c34
+	CL5DBFile *file;
dc8c34
+	int rc;
dc8c34
+
dc8c34
+	file = (CL5DBFile*)object_get_data (obj);
dc8c34
+
dc8c34
+	/* create cursor */
dc8c34
+	rc = file->db->cursor(file->db, txnid, &cursor, 0);
dc8c34
+	if (rc != 0)
dc8c34
+	{
dc8c34
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
dc8c34
+			"_cl5PurgeGetFirstEntry: failed to create cursor; db error - %d %s\n", rc, db_strerror(rc));
dc8c34
+		rc = CL5_DB_ERROR;
dc8c34
+		goto done;
dc8c34
+	}
dc8c34
+
dc8c34
+	key->flags = DB_DBT_MALLOC;
dc8c34
+	data.flags = DB_DBT_MALLOC;
dc8c34
+	while ((rc = cursor->c_get(cursor, key, &data, rid?DB_SET:DB_NEXT)) == 0)
dc8c34
+	{
dc8c34
+		/* skip service entries on the first pass (rid == 0)*/
dc8c34
+		if (!rid && cl5HelperEntry ((char*)key->data, NULL))
dc8c34
+		{
dc8c34
+			slapi_ch_free(&key->data);
dc8c34
+			slapi_ch_free(&(data.data));
dc8c34
+			continue;
dc8c34
+		}
dc8c34
+
dc8c34
+		/* format entry */
dc8c34
+		rc = cl5DBData2Entry(data.data, data.size, entry);
dc8c34
+		slapi_ch_free(&(data.data));
dc8c34
+		if (rc != 0)
dc8c34
+		{
dc8c34
+			slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
dc8c34
+				"_cl5PurgeGetFirstEntry: failed to format entry: %d\n", rc);
dc8c34
+			goto done;
dc8c34
+		}
dc8c34
+
dc8c34
+		it = (CL5Iterator*)slapi_ch_malloc(sizeof (CL5Iterator));
dc8c34
+		it->cursor  = cursor;
dc8c34
+		object_acquire (obj);
dc8c34
+		it->file = obj;
dc8c34
+		*(CL5Iterator**)iterator = it;
dc8c34
+
dc8c34
+		return CL5_SUCCESS;
dc8c34
+	}
dc8c34
+
dc8c34
+	slapi_ch_free(&key->data);
dc8c34
+	slapi_ch_free(&(data.data));
dc8c34
+
dc8c34
+	/* walked of the end of the file */
dc8c34
+	if (rc == DB_NOTFOUND)
dc8c34
+	{
dc8c34
+		rc = CL5_NOTFOUND;
dc8c34
+		goto done;
dc8c34
+	}
dc8c34
+
dc8c34
+	/* db error occured while iterating */
dc8c34
+	slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
dc8c34
+				"_cl5PurgeGetFirstEntry: failed to get entry; db error - %d %s\n",
dc8c34
+				rc, db_strerror(rc));
dc8c34
+	rc = CL5_DB_ERROR;
dc8c34
+
dc8c34
+done:
dc8c34
+	/*
dc8c34
+	 * We didn't success in assigning this cursor to the iterator,
dc8c34
+	 * so we need to free the cursor here.
dc8c34
+	 */
dc8c34
+	if (cursor)
dc8c34
+		cursor->c_close(cursor);
dc8c34
+
dc8c34
+	return rc;
dc8c34
+}
dc8c34
+
dc8c34
+/*
dc8c34
+ * Get the next entry.  If we get a lock error we will restart the process
dc8c34
+ * starting at the current key.
dc8c34
+ */
dc8c34
+static int
dc8c34
+_cl5PurgeGetNextEntry (CL5Entry *entry, void *iterator, DBT *key)
dc8c34
+{
dc8c34
+	CL5Iterator *it;
dc8c34
+	DBT data={0};
dc8c34
+	int rc;
dc8c34
+
dc8c34
+	it = (CL5Iterator*) iterator;
dc8c34
+
dc8c34
+	key->flags = DB_DBT_MALLOC;
dc8c34
+	data.flags = DB_DBT_MALLOC;
dc8c34
+	while ((rc = it->cursor->c_get(it->cursor, key, &data, DB_NEXT)) == 0)
dc8c34
+	{
dc8c34
+		if (cl5HelperEntry ((char*)key->data, NULL))
dc8c34
+		{
dc8c34
+			slapi_ch_free(&key->data);
dc8c34
+			slapi_ch_free(&(data.data));
dc8c34
+			continue;
dc8c34
+		}
dc8c34
+
dc8c34
+		/* format entry */
dc8c34
+		rc = cl5DBData2Entry (data.data, data.size, entry);
dc8c34
+		slapi_ch_free (&(data.data));
dc8c34
+		if (rc != 0)
dc8c34
+		{
dc8c34
+			if (rc != CL5_DB_LOCK_ERROR){
dc8c34
+				/* Not a lock error, free the key */
dc8c34
+				slapi_ch_free(&key->data);
dc8c34
+			}
dc8c34
+			slapi_log_error(rc == CL5_DB_LOCK_ERROR?SLAPI_LOG_REPL:SLAPI_LOG_FATAL,
dc8c34
+				repl_plugin_name_cl,
dc8c34
+				"_cl5PurgeGetNextEntry: failed to format entry: %d\n",
dc8c34
+				rc);
dc8c34
+
dc8c34
+		}
dc8c34
+
dc8c34
+		return rc;
dc8c34
+	}
dc8c34
+	slapi_ch_free(&(data.data));
dc8c34
+
dc8c34
+	/* walked of the end of the file or entry is out of range */
dc8c34
+	if (rc == 0 || rc == DB_NOTFOUND){
dc8c34
+		slapi_ch_free(&key->data);
dc8c34
+		return CL5_NOTFOUND;
dc8c34
+	}
dc8c34
+	if (rc != CL5_DB_LOCK_ERROR){
dc8c34
+		/* Not a lock error, free the key */
dc8c34
+		slapi_ch_free(&key->data);
dc8c34
+	}
dc8c34
+
dc8c34
+	/* cursor operation failed */
dc8c34
+	slapi_log_error(rc == CL5_DB_LOCK_ERROR?SLAPI_LOG_REPL:SLAPI_LOG_FATAL,
dc8c34
+		repl_plugin_name_cl,
dc8c34
+		"_cl5PurgeGetNextEntry: failed to get entry; db error - %d %s\n",
dc8c34
+		rc, db_strerror(rc));
dc8c34
+
dc8c34
+	return rc;
dc8c34
+}
dc8c34
+
dc8c34
+#define MAX_RETRIES 10
dc8c34
+/*
dc8c34
+ *  _cl5PurgeRID(Object *obj,  ReplicaId cleaned_rid)
dc8c34
+ *
dc8c34
+ *  Clean the entire changelog of updates from the "cleaned rid" via CLEANALLRUV
dc8c34
+ *  Delete entries in batches so we don't consume too many db locks, and we don't
dc8c34
+ *  lockup the changelog during the entire purging process using one transaction.
dc8c34
+ *  We save the key from the last iteration so we don't have to start from the
dc8c34
+ *  beginning for each new iteration.
dc8c34
+ */
dc8c34
+static void
dc8c34
+_cl5PurgeRID(Object *obj,  ReplicaId cleaned_rid)
dc8c34
+{
dc8c34
+	slapi_operation_parameters op = {0};
dc8c34
+	ReplicaId csn_rid;
dc8c34
+	CL5Entry entry;
dc8c34
+	DB_TXN *txnid = NULL;
dc8c34
+	DBT key = {0};
dc8c34
+	void *iterator = NULL;
dc8c34
+	long totalTrimmed = 0;
dc8c34
+	long trimmed = 0;
dc8c34
+	char *starting_key = NULL;
dc8c34
+	int batch_count = 0;
dc8c34
+	int db_lock_retry_count = 0;
dc8c34
+	int first_pass = 1;
dc8c34
+	int finished = 0;
dc8c34
+	int rc = 0;
dc8c34
+
dc8c34
+	PR_ASSERT (obj);
dc8c34
+	entry.op = &op;
dc8c34
+
dc8c34
+	/*
dc8c34
+	 * Keep processing the changelog until we are done, shutting down, or we
dc8c34
+	 * maxed out on the db lock retries.
dc8c34
+	 */
dc8c34
+	while (!finished && db_lock_retry_count < MAX_RETRIES && !slapi_is_shutting_down()){
dc8c34
+		trimmed = 0;
dc8c34
+
dc8c34
+		/*
dc8c34
+		 * Sleep a bit to allow others to use the changelog - we can't hog the
dc8c34
+		 * changelog for the entire purge.
dc8c34
+		 */
dc8c34
+		DS_Sleep(PR_MillisecondsToInterval(100));
dc8c34
+
dc8c34
+		rc = TXN_BEGIN(s_cl5Desc.dbEnv, NULL, &txnid, 0);
dc8c34
+		if (rc != 0){
dc8c34
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
dc8c34
+				"_cl5PurgeRID: failed to begin transaction; db error - %d %s.  "
dc8c34
+				"Changelog was not purged of rid(%d)\n",
dc8c34
+				rc, db_strerror(rc), cleaned_rid);
dc8c34
+			return;
dc8c34
+		}
dc8c34
+
dc8c34
+		/*
dc8c34
+		 * Check every changelog entry for the cleaned rid
dc8c34
+		 */
dc8c34
+		rc = _cl5PurgeGetFirstEntry(obj, &entry, &iterator, txnid, first_pass?0:cleaned_rid, &key);
dc8c34
+		first_pass = 0;
dc8c34
+		while (rc == CL5_SUCCESS && !slapi_is_shutting_down()) {
dc8c34
+			/*
dc8c34
+			 * Store the new starting key - we need this starting key in case
dc8c34
+			 * we run out of locks and have to start the transaction over.
dc8c34
+			 */
dc8c34
+			slapi_ch_free_string(&starting_key);
dc8c34
+			starting_key = slapi_ch_strdup((char*)key.data);
dc8c34
+
dc8c34
+			if(trimmed == 10000 || (batch_count && trimmed == batch_count)){
dc8c34
+				/*
dc8c34
+				 * Break out, and commit these deletes.  Do not free the key,
dc8c34
+				 * we need it for the next pass.
dc8c34
+				 */
dc8c34
+				cl5_operation_parameters_done (&op);
dc8c34
+				db_lock_retry_count = 0; /* reset the retry count */
dc8c34
+				break;
dc8c34
+			}
dc8c34
+			if(op.csn){
dc8c34
+				csn_rid = csn_get_replicaid (op.csn);
dc8c34
+				if (csn_rid == cleaned_rid){
dc8c34
+					rc = _cl5CurrentDeleteEntry (iterator);
dc8c34
+					if (rc != CL5_SUCCESS){
dc8c34
+						/* log error */
dc8c34
+						cl5_operation_parameters_done (&op);
dc8c34
+						if (rc == CL5_DB_LOCK_ERROR){
dc8c34
+							/*
dc8c34
+							 * Ran out of locks, need to restart the transaction.
dc8c34
+							 * Reduce the the batch count and reset the key to
dc8c34
+							 * the starting point
dc8c34
+							 */
dc8c34
+							slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
dc8c34
+								"_cl5PurgeRID: Ran out of db locks deleting entry.  "
dc8c34
+								"Reduce the batch value and restart.\n");
dc8c34
+							batch_count = trimmed - 10;
dc8c34
+							if (batch_count < 10){
dc8c34
+								batch_count = 10;
dc8c34
+							}
dc8c34
+							trimmed = 0;
dc8c34
+							slapi_ch_free(&(key.data));
dc8c34
+							key.data = starting_key;
dc8c34
+							starting_key = NULL;
dc8c34
+							db_lock_retry_count++;
dc8c34
+							break;
dc8c34
+						} else {
dc8c34
+							/* fatal error */
dc8c34
+							slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
dc8c34
+								"_cl5PurgeRID: fatal error (%d)\n", rc);
dc8c34
+							slapi_ch_free(&(key.data));
dc8c34
+							finished = 1;
dc8c34
+							break;
dc8c34
+						}
dc8c34
+					}
dc8c34
+					trimmed++;
dc8c34
+				}
dc8c34
+			}
dc8c34
+			slapi_ch_free(&(key.data));
dc8c34
+			cl5_operation_parameters_done (&op);
dc8c34
+
dc8c34
+			rc = _cl5PurgeGetNextEntry (&entry, iterator, &key);
dc8c34
+			if (rc == CL5_DB_LOCK_ERROR){
dc8c34
+				/*
dc8c34
+				 * Ran out of locks, need to restart the transaction.
dc8c34
+				 * Reduce the the batch count and reset the key to the starting
dc8c34
+				 * point.
dc8c34
+				 */
dc8c34
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
dc8c34
+					"_cl5PurgeRID: Ran out of db locks getting the next entry.  "
dc8c34
+					"Reduce the batch value and restart.\n");
dc8c34
+				batch_count = trimmed - 10;
dc8c34
+				if (batch_count < 10){
dc8c34
+					batch_count = 10;
dc8c34
+				}
dc8c34
+				trimmed = 0;
dc8c34
+				cl5_operation_parameters_done (&op);
dc8c34
+				slapi_ch_free(&(key.data));
dc8c34
+				key.data = starting_key;
dc8c34
+				starting_key = NULL;
dc8c34
+				db_lock_retry_count++;
dc8c34
+				break;
dc8c34
+			}
dc8c34
+		}
dc8c34
+
dc8c34
+		if (rc == CL5_NOTFOUND){
dc8c34
+			/* Scanned the entire changelog, we're done */
dc8c34
+			finished = 1;
dc8c34
+		}
dc8c34
+
dc8c34
+		/* Destroy the iterator before we finish with the txn */
dc8c34
+		cl5DestroyIterator (iterator);
dc8c34
+
dc8c34
+		/*
dc8c34
+		 * Commit or abort the txn
dc8c34
+		 */
dc8c34
+		if (rc == CL5_SUCCESS || rc == CL5_NOTFOUND){
dc8c34
+			rc = TXN_COMMIT (txnid, 0);
dc8c34
+			if (rc != 0){
dc8c34
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
dc8c34
+					"_cl5PurgeRID: failed to commit transaction; db error - %d %s.  "
dc8c34
+					"Changelog was not completely purged of rid (%d)\n",
dc8c34
+					rc, db_strerror(rc), cleaned_rid);
dc8c34
+				break;
dc8c34
+			} else if (finished){
dc8c34
+				/* We're done  */
dc8c34
+				totalTrimmed += trimmed;
dc8c34
+				break;
dc8c34
+			} else {
dc8c34
+				/* Not done yet */
dc8c34
+				totalTrimmed += trimmed;
dc8c34
+				trimmed = 0;
dc8c34
+			}
dc8c34
+		} else {
dc8c34
+			rc = TXN_ABORT (txnid);
dc8c34
+			if (rc != 0){
dc8c34
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
dc8c34
+					"_cl5PurgeRID: failed to abort transaction; db error - %d %s.  "
dc8c34
+					"Changelog was not completely purged of rid (%d)\n",
dc8c34
+					rc, db_strerror(rc), cleaned_rid);
dc8c34
+			}
dc8c34
+			if (batch_count == 0){
dc8c34
+				/* This was not a retry.  Fatal error, break out */
dc8c34
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
dc8c34
+					"_cl5PurgeRID: Changelog was not purged of rid (%d)\n",
dc8c34
+					cleaned_rid);
dc8c34
+				break;
dc8c34
+			}
dc8c34
+		}
dc8c34
+	}
dc8c34
+	slapi_ch_free_string(&starting_key);
dc8c34
+
dc8c34
+	slapi_log_error (SLAPI_LOG_REPL, repl_plugin_name_cl,
dc8c34
+		"_cl5PurgeRID: Removed (%ld entries) that originated from rid (%d)\n",
dc8c34
+		totalTrimmed, cleaned_rid);
dc8c34
+}
dc8c34
+
dc8c34
 /* Note that each file contains changes for a single replicated area.
dc8c34
    trimming algorithm:
dc8c34
 */
dc8c34
 #define CL5_TRIM_MAX_PER_TRANSACTION 10
dc8c34
 
dc8c34
-static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid)
dc8c34
+static void _cl5TrimFile (Object *obj, long *numToTrim)
dc8c34
 {
dc8c34
 	DB_TXN *txnid;
dc8c34
 	RUV *ruv = NULL;
dc8c34
@@ -3509,7 +3860,6 @@ static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid)
dc8c34
 	}
dc8c34
 
dc8c34
 	entry.op = &op;
dc8c34
-
dc8c34
 	while ( !finished && !slapi_is_shutting_down() )
dc8c34
 	{
dc8c34
 		it = NULL;
dc8c34
@@ -3530,7 +3880,7 @@ static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid)
dc8c34
 		}
dc8c34
 
dc8c34
 		finished = _cl5GetFirstEntry (obj, &entry, &it, txnid);
dc8c34
-		while ( !finished )
dc8c34
+		while ( !finished && !slapi_is_shutting_down())
dc8c34
 		{
dc8c34
         	/*
dc8c34
 			 * This change can be trimmed if it exceeds purge
dc8c34
@@ -3544,11 +3894,12 @@ static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid)
dc8c34
 				continue;
dc8c34
 			}
dc8c34
 			csn_rid = csn_get_replicaid (op.csn);
dc8c34
+
dc8c34
 			if ( (*numToTrim > 0 || _cl5CanTrim (entry.time, numToTrim)) &&
dc8c34
 				 ruv_covers_csn_strict (ruv, op.csn) )
dc8c34
 			{
dc8c34
 				rc = _cl5CurrentDeleteEntry (it);
dc8c34
-				if ( rc == CL5_SUCCESS && cleaned_rid != csn_rid)
dc8c34
+				if ( rc == CL5_SUCCESS)
dc8c34
 				{
dc8c34
 					rc = _cl5UpdateRUV (obj, op.csn, PR_FALSE, PR_TRUE);				
dc8c34
 				}
dc8c34
@@ -3562,7 +3913,6 @@ static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid)
dc8c34
 					/* The above two functions have logged the error */
dc8c34
 					abort = PR_TRUE;
dc8c34
 				}
dc8c34
-
dc8c34
 			}
dc8c34
 			else
dc8c34
 			{
dc8c34
@@ -3619,7 +3969,7 @@ static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid)
dc8c34
 			rc = TXN_ABORT (txnid);
dc8c34
 			if (rc != 0)
dc8c34
 			{
dc8c34
-				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, 
dc8c34
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
dc8c34
 					"_cl5TrimFile: failed to abort transaction; db error - %d %s\n",
dc8c34
 					rc, db_strerror(rc));	
dc8c34
 			}
dc8c34
@@ -3630,7 +3980,7 @@ static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid)
dc8c34
 			if (rc != 0)
dc8c34
 			{
dc8c34
 				finished = 1;
dc8c34
-				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, 
dc8c34
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
dc8c34
 					"_cl5TrimFile: failed to commit transaction; db error - %d %s\n",
dc8c34
 					rc, db_strerror(rc));
dc8c34
 			}
dc8c34
@@ -4654,9 +5004,9 @@ static int _cl5WriteOperationTxn(const char *replName, const char *replGen,
dc8c34
 				goto done;
dc8c34
 			}
dc8c34
 #endif
dc8c34
-			/* back off */			
dc8c34
+			/* back off */
dc8c34
     		interval = PR_MillisecondsToInterval(slapi_rand() % 100);
dc8c34
-    		DS_Sleep(interval);		
dc8c34
+    		DS_Sleep(interval);
dc8c34
 		}
dc8c34
 #if USE_DB_TXN
dc8c34
 		/* begin transaction */
dc8c34
@@ -4702,19 +5052,19 @@ static int _cl5WriteOperationTxn(const char *replName, const char *replGen,
dc8c34
 		}
dc8c34
 		cnt ++;
dc8c34
 	}
dc8c34
-    
dc8c34
+
dc8c34
 	if (rc == 0) /* we successfully added entry */
dc8c34
 	{
dc8c34
 #if USE_DB_TXN
dc8c34
 		rc = TXN_COMMIT (txnid, 0);
dc8c34
 #endif
dc8c34
 	}
dc8c34
-	else	
dc8c34
+	else
dc8c34
 	{
dc8c34
-		char s[CSN_STRSIZE];		
dc8c34
-		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, 
dc8c34
+		char s[CSN_STRSIZE];
dc8c34
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
dc8c34
 						"_cl5WriteOperationTxn: failed to write entry with csn (%s); "
dc8c34
-						"db error - %d %s\n", csn_as_string(op->csn,PR_FALSE,s), 
dc8c34
+						"db error - %d %s\n", csn_as_string(op->csn,PR_FALSE,s),
dc8c34
 						rc, db_strerror(rc));
dc8c34
 #if USE_DB_TXN
dc8c34
 		rc = TXN_ABORT (txnid);
dc8c34
@@ -4735,7 +5085,7 @@ static int _cl5WriteOperationTxn(const char *replName, const char *replGen,
dc8c34
     /* update purge vector if we have not seen any changes from this replica before */
dc8c34
     _cl5UpdateRUV (file_obj, op->csn, PR_TRUE, PR_TRUE);
dc8c34
 
dc8c34
-	slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl, 
dc8c34
+	slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl,
dc8c34
 			"cl5WriteOperationTxn: successfully written entry with csn (%s)\n", csnStr);
dc8c34
 	rc = CL5_SUCCESS;
dc8c34
 done:
dc8c34
@@ -4749,7 +5099,7 @@ done:
dc8c34
 	return rc;
dc8c34
 }
dc8c34
 
dc8c34
-static int _cl5WriteOperation(const char *replName, const char *replGen, 
dc8c34
+static int _cl5WriteOperation(const char *replName, const char *replGen,
dc8c34
                               const slapi_operation_parameters *op, PRBool local)
dc8c34
 {
dc8c34
     return _cl5WriteOperationTxn(replName, replGen, op, local, NULL);
dc8c34
@@ -4800,7 +5150,7 @@ static int _cl5GetFirstEntry (Object *obj, CL5Entry *entry, void **iterator, DB_
dc8c34
 			goto done;
dc8c34
 		}
dc8c34
 
dc8c34
-		it = (CL5Iterator*)slapi_ch_malloc (sizeof (CL5Iterator));
dc8c34
+		it = (CL5Iterator*)slapi_ch_malloc(sizeof (CL5Iterator));
dc8c34
 		it->cursor  = cursor;
dc8c34
 		object_acquire (obj);
dc8c34
 		it->file = obj;
dc8c34
@@ -4875,7 +5225,7 @@ static int _cl5GetNextEntry (CL5Entry *entry, void *iterator)
dc8c34
 		slapi_ch_free (&(data.data));
dc8c34
 		if (rc != 0)
dc8c34
 		{
dc8c34
-			slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, 
dc8c34
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
dc8c34
 				"_cl5GetNextEntry: failed to format entry: %d\n", rc);
dc8c34
 		}
dc8c34
 
dc8c34
@@ -4904,38 +5254,42 @@ static int _cl5GetNextEntry (CL5Entry *entry, void *iterator)
dc8c34
 	}
dc8c34
 
dc8c34
 	/* cursor operation failed */
dc8c34
-	slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, 
dc8c34
-			"_cl5GetNextEntry: failed to get entry; db error - %d %s\n", 
dc8c34
-			rc, db_strerror(rc));
dc8c34
+	slapi_log_error(rc == CL5_DB_LOCK_ERROR?SLAPI_LOG_REPL:SLAPI_LOG_FATAL,
dc8c34
+		repl_plugin_name_cl,
dc8c34
+		"_cl5GetNextEntry: failed to get entry; db error - %d %s\n",
dc8c34
+		rc, db_strerror(rc));
dc8c34
 
dc8c34
-	return CL5_DB_ERROR;
dc8c34
+	return rc;
dc8c34
 }
dc8c34
 
dc8c34
 static int _cl5CurrentDeleteEntry (void *iterator)
dc8c34
 {
dc8c34
 	int rc;
dc8c34
 	CL5Iterator *it;
dc8c34
-    CL5DBFile *file;
dc8c34
+	CL5DBFile *file;
dc8c34
 
dc8c34
-    PR_ASSERT (iterator);
dc8c34
+	PR_ASSERT (iterator);
dc8c34
 
dc8c34
 	it = (CL5Iterator*)iterator;
dc8c34
 
dc8c34
 	rc = it->cursor->c_del (it->cursor, 0);
dc8c34
 
dc8c34
 	if (rc == 0) {        
dc8c34
-            /* decrement entry count */
dc8c34
-            file = (CL5DBFile*)object_get_data (it->file);
dc8c34
-            PR_AtomicDecrement (&file->entryCount);
dc8c34
-            return CL5_SUCCESS;
dc8c34
-        } else {
dc8c34
-            slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, 
dc8c34
-                            "_cl5CurrentDeleteEntry failed, err=%d %s\n", 
dc8c34
-                            rc, db_strerror(rc));
dc8c34
-	    /* We don't free(close) the cursor here, as the caller will free it by a call to cl5DestroyIterator */
dc8c34
-	    /* Freeing it here is a potential bug, as the cursor can't be referenced later once freed */
dc8c34
-            return CL5_DB_ERROR;
dc8c34
-        }
dc8c34
+		/* decrement entry count */
dc8c34
+		file = (CL5DBFile*)object_get_data (it->file);
dc8c34
+		PR_AtomicDecrement (&file->entryCount);
dc8c34
+		return CL5_SUCCESS;
dc8c34
+	} else {
dc8c34
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
dc8c34
+			"_cl5CurrentDeleteEntry failed, err=%d %s\n",
dc8c34
+			rc, db_strerror(rc));
dc8c34
+		/*
dc8c34
+		 * We don't free(close) the cursor here, as the caller will free it by
dc8c34
+		 * a call to cl5DestroyIterator.  Freeing it here is a potential bug,
dc8c34
+		 * as the cursor can't be referenced later once freed.
dc8c34
+		 */
dc8c34
+		return rc;
dc8c34
+	}
dc8c34
 }
dc8c34
 
dc8c34
 static PRBool _cl5IsValidIterator (const CL5Iterator *iterator)
dc8c34
@@ -6186,7 +6540,7 @@ static int _cl5ExportFile (PRFileDesc *prFile, Object *obj)
dc8c34
 	slapi_write_buffer (prFile, "\n", strlen("\n"));
dc8c34
 
dc8c34
 	entry.op = &op;
dc8c34
-	rc = _cl5GetFirstEntry (obj, &entry, &iterator, NULL); 
dc8c34
+	rc = _cl5GetFirstEntry (obj, &entry, &iterator, NULL);
dc8c34
 	while (rc == CL5_SUCCESS)
dc8c34
 	{
dc8c34
 		rc = _cl5Operation2LDIF (&op, file->replGen, &buff, &len;;
dc8c34
@@ -6607,16 +6961,16 @@ cl5CleanRUV(ReplicaId rid){
dc8c34
     slapi_rwlock_unlock (s_cl5Desc.stLock);
dc8c34
 }
dc8c34
 
dc8c34
-void trigger_cl_trimming(ReplicaId rid){
dc8c34
+void trigger_cl_purging(ReplicaId rid){
dc8c34
     PRThread *trim_tid = NULL;
dc8c34
 
dc8c34
-    slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "trigger_cl_trimming: rid (%d)\n",(int)rid);
dc8c34
-    trim_tid = PR_CreateThread(PR_USER_THREAD, (VFP)(void*)trigger_cl_trimming_thread,
dc8c34
+    slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "trigger_cl_purging: rid (%d)\n",(int)rid);
dc8c34
+    trim_tid = PR_CreateThread(PR_USER_THREAD, (VFP)(void*)trigger_cl_purging_thread,
dc8c34
                    (void *)&rid, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
dc8c34
                    PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE);
dc8c34
     if (NULL == trim_tid){
dc8c34
         slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
dc8c34
-            "trigger_cl_trimming: failed to create trimming "
dc8c34
+            "trigger_cl_purging: failed to create trimming "
dc8c34
             "thread; NSPR error - %d\n", PR_GetError ());
dc8c34
     } else {
dc8c34
         /* need a little time for the thread to get started */
dc8c34
@@ -6625,7 +6979,7 @@ void trigger_cl_trimming(ReplicaId rid){
dc8c34
 }
dc8c34
 
dc8c34
 void
dc8c34
-trigger_cl_trimming_thread(void *arg){
dc8c34
+trigger_cl_purging_thread(void *arg){
dc8c34
     ReplicaId rid = *(ReplicaId *)arg;
dc8c34
 
dc8c34
     /* make sure we have a change log, and we aren't closing it */
dc8c34
@@ -6634,7 +6988,7 @@ trigger_cl_trimming_thread(void *arg){
dc8c34
     }
dc8c34
     if (CL5_SUCCESS != _cl5AddThread()) {
dc8c34
         slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
dc8c34
-            "trigger_cl_trimming: failed to increment thread count "
dc8c34
+            "trigger_cl_purging: failed to increment thread count "
dc8c34
             "NSPR error - %d\n", PR_GetError ());
dc8c34
     }
dc8c34
     _cl5DoTrimming(rid);
dc8c34
diff --git a/ldap/servers/plugins/replication/cl5_api.h b/ldap/servers/plugins/replication/cl5_api.h
dc8c34
index 9b285ca..b46a691 100644
dc8c34
--- a/ldap/servers/plugins/replication/cl5_api.h
dc8c34
+++ b/ldap/servers/plugins/replication/cl5_api.h
dc8c34
@@ -145,6 +145,9 @@ enum
dc8c34
 	CL5_CSN_ERROR,		/* CSN API failed */
dc8c34
 	CL5_RUV_ERROR,		/* RUV API failed */
dc8c34
 	CL5_OBJSET_ERROR,	/* namedobjset api failed */
dc8c34
+	CL5_DB_LOCK_ERROR,  /* bdb returns error 12 when the db runs out of locks,
dc8c34
+	                       this var needs to be in slot 12 of the list.
dc8c34
+	                       Do not re-order enum above! */
dc8c34
 	CL5_PURGED_DATA,    /* requested data has been purged */
dc8c34
 	CL5_MISSING_DATA,   /* data should be in the changelog, but is missing */
dc8c34
 	CL5_UNKNOWN_ERROR,	/* unclassified error */
dc8c34
@@ -490,6 +493,6 @@ int cl5WriteRUV();
dc8c34
 int cl5DeleteRUV();
dc8c34
 void cl5CleanRUV(ReplicaId rid);
dc8c34
 void cl5NotifyCleanup(int rid);
dc8c34
-void trigger_cl_trimming(ReplicaId rid);
dc8c34
+void trigger_cl_purging(ReplicaId rid);
dc8c34
 
dc8c34
 #endif
dc8c34
diff --git a/ldap/servers/plugins/replication/repl5_replica_config.c b/ldap/servers/plugins/replication/repl5_replica_config.c
dc8c34
index ae4c2ff..68b3ce5 100644
dc8c34
--- a/ldap/servers/plugins/replication/repl5_replica_config.c
dc8c34
+++ b/ldap/servers/plugins/replication/repl5_replica_config.c
dc8c34
@@ -1216,6 +1216,11 @@ replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext /* not
dc8c34
 	 */
dc8c34
 	cl5CleanRUV(rid);
dc8c34
 
dc8c34
+	/*
dc8c34
+	 * Now purge the changelog
dc8c34
+	 */
dc8c34
+	trigger_cl_purging(rid);
dc8c34
+
dc8c34
 	if (rc != RUV_SUCCESS){
dc8c34
 		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_task: task failed(%d)\n",rc);
dc8c34
 		return LDAP_OPERATIONS_ERROR;
dc8c34
@@ -1603,7 +1608,7 @@ replica_cleanallruv_thread(void *arg)
dc8c34
             /* no agmts, just clean this replica */
dc8c34
             break;
dc8c34
         }
dc8c34
-        while (agmt_obj){
dc8c34
+        while (agmt_obj && !slapi_is_shutting_down()){
dc8c34
             agmt = (Repl_Agmt*)object_get_data (agmt_obj);
dc8c34
             if(!agmt_is_enabled(agmt) || get_agmt_agreement_type(agmt) == REPLICA_TYPE_WINDOWS){
dc8c34
                 agmt_obj = agmtlist_get_next_agreement_for_replica (data->replica, agmt_obj);
dc8c34
@@ -1685,13 +1690,15 @@ replica_cleanallruv_thread(void *arg)
dc8c34
             break;
dc8c34
         }
dc8c34
         /*
dc8c34
-         *  need to sleep between passes
dc8c34
+         * Need to sleep between passes unless we are shutting down
dc8c34
          */
dc8c34
-        cleanruv_log(data->task, data->rid, CLEANALLRUV_ID, "Replicas have not been cleaned yet, "
dc8c34
-            "retrying in %d seconds", interval);
dc8c34
-        PR_Lock( notify_lock );
dc8c34
-        PR_WaitCondVar( notify_cvar, PR_SecondsToInterval(interval) );
dc8c34
-        PR_Unlock( notify_lock );
dc8c34
+        if (!slapi_is_shutting_down()){
dc8c34
+            cleanruv_log(data->task, data->rid, CLEANALLRUV_ID, "Replicas have not been cleaned yet, "
dc8c34
+                "retrying in %d seconds", interval);
dc8c34
+            PR_Lock( notify_lock );
dc8c34
+            PR_WaitCondVar( notify_cvar, PR_SecondsToInterval(interval) );
dc8c34
+            PR_Unlock( notify_lock );
dc8c34
+        }
dc8c34
 
dc8c34
         if(interval < 14400){ /* 4 hour max */
dc8c34
             interval = interval * 2;
dc8c34
@@ -1702,10 +1709,9 @@ replica_cleanallruv_thread(void *arg)
dc8c34
 
dc8c34
 done:
dc8c34
     /*
dc8c34
-     *  If the replicas are cleaned, release the rid, and trim the changelog
dc8c34
+     *  If the replicas are cleaned, release the rid
dc8c34
      */
dc8c34
     if(!aborted){
dc8c34
-        trigger_cl_trimming(data->rid);
dc8c34
         delete_cleaned_rid_config(data);
dc8c34
         /* make sure all the replicas have been "pre_cleaned" before finishing */
dc8c34
         check_replicas_are_done_cleaning(data);
dc8c34
@@ -1715,7 +1721,7 @@ done:
dc8c34
         /*
dc8c34
          *  Shutdown or abort
dc8c34
          */
dc8c34
-        if(!is_task_aborted(data->rid)){
dc8c34
+        if(!is_task_aborted(data->rid) || slapi_is_shutting_down()){
dc8c34
             cleanruv_log(data->task, data->rid, CLEANALLRUV_ID,"Server shutting down.  Process will resume at server startup");
dc8c34
         } else {
dc8c34
             cleanruv_log(data->task, data->rid, CLEANALLRUV_ID,"Task aborted for rid(%d).",data->rid);
dc8c34
@@ -1918,7 +1924,7 @@ check_agmts_are_caught_up(cleanruv_data *data, char *maxcsn)
dc8c34
             not_all_caughtup = 0;
dc8c34
             break;
dc8c34
         }
dc8c34
-        while (agmt_obj){
dc8c34
+        while (agmt_obj && !slapi_is_shutting_down()){
dc8c34
             agmt = (Repl_Agmt*)object_get_data (agmt_obj);
dc8c34
             if(!agmt_is_enabled(agmt) || get_agmt_agreement_type(agmt) == REPLICA_TYPE_WINDOWS){
dc8c34
                 agmt_obj = agmtlist_get_next_agreement_for_replica (data->replica, agmt_obj);
dc8c34
@@ -1976,7 +1982,7 @@ check_agmts_are_alive(Replica *replica, ReplicaId rid, Slapi_Task *task)
dc8c34
             not_all_alive = 0;
dc8c34
             break;
dc8c34
         }
dc8c34
-        while (agmt_obj){
dc8c34
+        while (agmt_obj && !slapi_is_shutting_down()){
dc8c34
             agmt = (Repl_Agmt*)object_get_data (agmt_obj);
dc8c34
             if(!agmt_is_enabled(agmt) || get_agmt_agreement_type(agmt) == REPLICA_TYPE_WINDOWS){
dc8c34
                 agmt_obj = agmtlist_get_next_agreement_for_replica (replica, agmt_obj);
dc8c34
@@ -2746,12 +2752,14 @@ replica_abort_task_thread(void *arg)
dc8c34
             break;
dc8c34
         }
dc8c34
         /*
dc8c34
-         *  need to sleep between passes
dc8c34
+         *  Need to sleep between passes. unless we are shutting down
dc8c34
          */
dc8c34
-        cleanruv_log(data->task, data->rid, ABORT_CLEANALLRUV_ID,"Retrying in %d seconds",interval);
dc8c34
-        PR_Lock( notify_lock );
dc8c34
-        PR_WaitCondVar( notify_cvar, PR_SecondsToInterval(interval) );
dc8c34
-        PR_Unlock( notify_lock );
dc8c34
+        if (!slapi_is_shutting_down()){
dc8c34
+            cleanruv_log(data->task, data->rid, ABORT_CLEANALLRUV_ID,"Retrying in %d seconds",interval);
dc8c34
+            PR_Lock( notify_lock );
dc8c34
+            PR_WaitCondVar( notify_cvar, PR_SecondsToInterval(interval) );
dc8c34
+            PR_Unlock( notify_lock );
dc8c34
+        }
dc8c34
 
dc8c34
         if(interval < 14400){ /* 4 hour max */
dc8c34
             interval = interval * 2;
dc8c34
@@ -2769,7 +2777,7 @@ done:
dc8c34
          *  Wait for this server to stop its cleanallruv task(which removes the rid from the cleaned list)
dc8c34
          */
dc8c34
         cleanruv_log(data->task, data->rid, ABORT_CLEANALLRUV_ID, "Waiting for CleanAllRUV task to abort...");
dc8c34
-        while(is_cleaned_rid(data->rid)){
dc8c34
+        while(is_cleaned_rid(data->rid) && !slapi_is_shutting_down()){
dc8c34
             DS_Sleep(PR_SecondsToInterval(1));
dc8c34
             count++;
dc8c34
             if(count == 60){ /* it should not take this long */
dc8c34
-- 
dc8c34
1.9.3
dc8c34