SOURCES/0016-Issue-3903-fix-repl-keep-alive-event-interval.patch

From 01e941e3eadd7a208982d20c0ca9c104142f2b91 Mon Sep 17 00:00:00 2001
From: Mark Reynolds <mreynolds@redhat.com>
Date: Wed, 10 Aug 2022 08:58:28 -0400
Subject: [PATCH 4/4] Issue 3903 - fix repl keep alive event interval

Description:  Previously the interval was passed to the event queue in
              seconds, but the event queue expects milliseconds.
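
              For illustration only, a minimal sketch of the change with
              hypothetical names (this is not the actual 389-ds-base event
              queue API):

                  /* The event queue interprets the interval as milliseconds,
                   * so passing the raw seconds value made the keep-alive
                   * event fire about 1000x more often than intended. */
                  #define KEEP_ALIVE_INTERVAL_SECS 3600

                  static void
                  schedule_keep_alive(event_queue_t *eq, event_fn_t fn, void *arg)
                  {
                      unsigned long interval_ms = KEEP_ALIVE_INTERVAL_SECS * 1000UL;
                      eq_repeat(eq, fn, arg, interval_ms); /* hypothetical scheduling call */
                  }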

              Fixed a crash with repl logging and decoding extended
              op payload (referrals).

              Also reworked a lot of the replication CI tests that
              were flaky.
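
              The reworked tests replace fixed one-second sleeps with a
              settle time that scales up when lib389 DEBUGGING is enabled,
              roughly following this pattern (simplified sketch taken from
              the acceptance test changes below):

                  if DEBUGGING:
                      sleep_time = 8  # leave time to inspect the instances
                  else:
                      sleep_time = 2  # still long enough for replication to settle

                  test_user.replace('mail', 'new_value@example.com')
                  time.sleep(sleep_time)  # wait before checking every supplier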

relates: https://github.com/389ds/389-ds-base/issues/3903

Reviewed by: tbordaz & spichugi(Thanks!)
---
 .../suites/replication/acceptance_test.py     |  52 +-
 .../cleanallruv_abort_certify_test.py         | 136 ++++
 .../cleanallruv_abort_restart_test.py         | 146 ++++
 .../replication/cleanallruv_abort_test.py     | 123 +++
 .../replication/cleanallruv_force_test.py     | 187 +++++
 .../cleanallruv_multiple_force_test.py        | 214 +++++
 .../replication/cleanallruv_restart_test.py   | 161 ++++
 .../cleanallruv_shutdown_crash_test.py        | 123 +++
 .../replication/cleanallruv_stress_test.py    | 216 +++++
 .../suites/replication/cleanallruv_test.py    | 742 +-----------------
 .../suites/replication/regression_m2_test.py  |  13 +-
 .../replication/regression_m2c2_test.py       |   1 +
 .../plugins/replication/repl5_replica.c       |  12 +-
 ldap/servers/plugins/replication/repl_extop.c |   4 +-
 ldap/servers/slapd/task.c                     |   8 +-
 src/lib389/lib389/instance/remove.py          |   6 +
 16 files changed, 1385 insertions(+), 759 deletions(-)
 create mode 100644 dirsrvtests/tests/suites/replication/cleanallruv_abort_certify_test.py
 create mode 100644 dirsrvtests/tests/suites/replication/cleanallruv_abort_restart_test.py
 create mode 100644 dirsrvtests/tests/suites/replication/cleanallruv_abort_test.py
 create mode 100644 dirsrvtests/tests/suites/replication/cleanallruv_force_test.py
 create mode 100644 dirsrvtests/tests/suites/replication/cleanallruv_multiple_force_test.py
 create mode 100644 dirsrvtests/tests/suites/replication/cleanallruv_restart_test.py
 create mode 100644 dirsrvtests/tests/suites/replication/cleanallruv_shutdown_crash_test.py
 create mode 100644 dirsrvtests/tests/suites/replication/cleanallruv_stress_test.py

diff --git a/dirsrvtests/tests/suites/replication/acceptance_test.py b/dirsrvtests/tests/suites/replication/acceptance_test.py
cb1cc6
index a5f0c4c6b..863ee2553 100644
cb1cc6
--- a/dirsrvtests/tests/suites/replication/acceptance_test.py
cb1cc6
+++ b/dirsrvtests/tests/suites/replication/acceptance_test.py
cb1cc6
@@ -8,6 +8,7 @@
cb1cc6
 #
cb1cc6
 import pytest
cb1cc6
 import logging
cb1cc6
+import time
cb1cc6
 from lib389.replica import Replicas
cb1cc6
 from lib389.tasks import *
cb1cc6
 from lib389.utils import *
cb1cc6
@@ -124,12 +125,16 @@ def test_modify_entry(topo_m4, create_entry):
cb1cc6
         8. Some time should pass
cb1cc6
         9. The change should be present on all suppliers
cb1cc6
     """
cb1cc6
+    if DEBUGGING:
cb1cc6
+        sleep_time = 8
cb1cc6
+    else:
cb1cc6
+        sleep_time = 2
cb1cc6
 
cb1cc6
     log.info('Modifying entry {} - add operation'.format(TEST_ENTRY_DN))
cb1cc6
 
cb1cc6
     test_user = UserAccount(topo_m4.ms["supplier1"], TEST_ENTRY_DN)
cb1cc6
     test_user.add('mail', '{}@redhat.com'.format(TEST_ENTRY_NAME))
cb1cc6
-    time.sleep(1)
cb1cc6
+    time.sleep(sleep_time)
cb1cc6
 
cb1cc6
     all_user = topo_m4.all_get_dsldapobject(TEST_ENTRY_DN, UserAccount)
cb1cc6
     for u in all_user:
cb1cc6
@@ -137,7 +142,7 @@ def test_modify_entry(topo_m4, create_entry):
cb1cc6
 
cb1cc6
     log.info('Modifying entry {} - replace operation'.format(TEST_ENTRY_DN))
cb1cc6
     test_user.replace('mail', '{}@greenhat.com'.format(TEST_ENTRY_NAME))
cb1cc6
-    time.sleep(1)
cb1cc6
+    time.sleep(sleep_time)
cb1cc6
 
cb1cc6
     all_user = topo_m4.all_get_dsldapobject(TEST_ENTRY_DN, UserAccount)
cb1cc6
     for u in all_user:
cb1cc6
@@ -145,7 +150,7 @@ def test_modify_entry(topo_m4, create_entry):
cb1cc6
 
cb1cc6
     log.info('Modifying entry {} - delete operation'.format(TEST_ENTRY_DN))
cb1cc6
     test_user.remove('mail', '{}@greenhat.com'.format(TEST_ENTRY_NAME))
cb1cc6
-    time.sleep(1)
cb1cc6
+    time.sleep(sleep_time)
cb1cc6
 
cb1cc6
     all_user = topo_m4.all_get_dsldapobject(TEST_ENTRY_DN, UserAccount)
cb1cc6
     for u in all_user:
cb1cc6
@@ -167,7 +172,10 @@ def test_delete_entry(topo_m4, create_entry):
cb1cc6
 
cb1cc6
     log.info('Deleting entry {} during the test'.format(TEST_ENTRY_DN))
cb1cc6
     topo_m4.ms["supplier1"].delete_s(TEST_ENTRY_DN)
cb1cc6
-
cb1cc6
+    if DEBUGGING:
cb1cc6
+        time.sleep(8)
cb1cc6
+    else:
cb1cc6
+        time.sleep(1)
cb1cc6
     entries = get_repl_entries(topo_m4, TEST_ENTRY_NAME, ["uid"])
cb1cc6
     assert not entries, "Entry deletion {} wasn't replicated successfully".format(TEST_ENTRY_DN)
cb1cc6
 
cb1cc6
@@ -231,6 +239,11 @@ def test_modrdn_after_pause(topo_m4):
cb1cc6
         5. The change should be present on all suppliers
cb1cc6
     """
cb1cc6
 
cb1cc6
+    if DEBUGGING:
cb1cc6
+        sleep_time = 8
cb1cc6
+    else:
cb1cc6
+        sleep_time = 3
cb1cc6
+
cb1cc6
     newrdn_name = 'newrdn'
cb1cc6
     newrdn_dn = 'uid={},{}'.format(newrdn_name, DEFAULT_SUFFIX)
cb1cc6
 
cb1cc6
@@ -264,7 +277,7 @@ def test_modrdn_after_pause(topo_m4):
cb1cc6
     topo_m4.resume_all_replicas()
cb1cc6
 
cb1cc6
     log.info('Wait for replication to happen')
cb1cc6
-    time.sleep(3)
cb1cc6
+    time.sleep(sleep_time)
cb1cc6
 
cb1cc6
     try:
cb1cc6
         entries_new = get_repl_entries(topo_m4, newrdn_name, ["uid"])
cb1cc6
@@ -354,6 +367,11 @@ def test_many_attrs(topo_m4, create_entry):
cb1cc6
     for add_name in add_list:
cb1cc6
         test_user.add('description', add_name)
cb1cc6
 
cb1cc6
+    if DEBUGGING:
cb1cc6
+        time.sleep(10)
cb1cc6
+    else:
cb1cc6
+        time.sleep(1)
cb1cc6
+
cb1cc6
     log.info('Check that everything was properly replicated after an add operation')
cb1cc6
     entries = get_repl_entries(topo_m4, TEST_ENTRY_NAME, ["description"])
cb1cc6
     for entry in entries:
cb1cc6
@@ -363,6 +381,11 @@ def test_many_attrs(topo_m4, create_entry):
cb1cc6
     for delete_name in delete_list:
cb1cc6
         test_user.remove('description', delete_name)
cb1cc6
 
cb1cc6
+    if DEBUGGING:
cb1cc6
+        time.sleep(10)
cb1cc6
+    else:
cb1cc6
+        time.sleep(1)
cb1cc6
+
cb1cc6
     log.info('Check that everything was properly replicated after a delete operation')
cb1cc6
     entries = get_repl_entries(topo_m4, TEST_ENTRY_NAME, ["description"])
cb1cc6
     for entry in entries:
cb1cc6
@@ -386,12 +409,22 @@ def test_double_delete(topo_m4, create_entry):
cb1cc6
     log.info('Deleting entry {} from supplier1'.format(TEST_ENTRY_DN))
cb1cc6
     topo_m4.ms["supplier1"].delete_s(TEST_ENTRY_DN)
cb1cc6
 
cb1cc6
+    if DEBUGGING:
cb1cc6
+        time.sleep(5)
cb1cc6
+    else:
cb1cc6
+        time.sleep(1)
cb1cc6
+
cb1cc6
     log.info('Deleting entry {} from supplier2'.format(TEST_ENTRY_DN))
cb1cc6
     try:
cb1cc6
         topo_m4.ms["supplier2"].delete_s(TEST_ENTRY_DN)
cb1cc6
     except ldap.NO_SUCH_OBJECT:
cb1cc6
         log.info("Entry {} wasn't found supplier2. It is expected.".format(TEST_ENTRY_DN))
cb1cc6
 
cb1cc6
+    if DEBUGGING:
cb1cc6
+        time.sleep(5)
cb1cc6
+    else:
cb1cc6
+        time.sleep(1)
cb1cc6
+
cb1cc6
     log.info('Make searches to check if server is alive')
cb1cc6
     entries = get_repl_entries(topo_m4, TEST_ENTRY_NAME, ["uid"])
cb1cc6
     assert not entries, "Entry deletion {} wasn't replicated successfully".format(TEST_ENTRY_DN)
cb1cc6
@@ -436,6 +469,11 @@ def test_password_repl_error(topo_m4, create_entry):
cb1cc6
     m3_conn = test_user_m3.bind(TEST_ENTRY_NEW_PASS)
cb1cc6
     m4_conn = test_user_m4.bind(TEST_ENTRY_NEW_PASS)
cb1cc6
 
cb1cc6
+    if DEBUGGING:
cb1cc6
+        time.sleep(5)
cb1cc6
+    else:
cb1cc6
+        time.sleep(1)
cb1cc6
+
cb1cc6
     log.info('Check the error log for the error with {}'.format(TEST_ENTRY_DN))
cb1cc6
     assert not m2.ds_error_log.match('.*can.t add a change for uid={}.*'.format(TEST_ENTRY_NAME))
cb1cc6
 
cb1cc6
@@ -552,7 +590,7 @@ def test_csnpurge_large_valueset(topo_m2):
cb1cc6
     replica = replicas.list()[0]
cb1cc6
     log.info('nsds5ReplicaPurgeDelay to 5')
cb1cc6
     replica.set('nsds5ReplicaPurgeDelay', '5')
cb1cc6
-    time.sleep(6)
cb1cc6
+    time.sleep(10)
cb1cc6
 
cb1cc6
     # add some new values to the valueset containing entries that should be purged
cb1cc6
     for i in range(21,25):
cb1cc6
@@ -612,7 +650,7 @@ def test_urp_trigger_substring_search(topo_m2):
cb1cc6
             break
cb1cc6
         else:
cb1cc6
             log.info('Entry not yet replicated on M2, wait a bit')
cb1cc6
-            time.sleep(2)
cb1cc6
+            time.sleep(3)
cb1cc6
 
cb1cc6
     # check that M2 access logs does not "(&(objectclass=nstombstone)(nscpentrydn=uid=asterisk_*_in_value,dc=example,dc=com))"
cb1cc6
     log.info('Check that on M2, URP as not triggered such internal search')
cb1cc6
diff --git a/dirsrvtests/tests/suites/replication/cleanallruv_abort_certify_test.py b/dirsrvtests/tests/suites/replication/cleanallruv_abort_certify_test.py
cb1cc6
new file mode 100644
cb1cc6
index 000000000..603693b9e
cb1cc6
--- /dev/null
cb1cc6
+++ b/dirsrvtests/tests/suites/replication/cleanallruv_abort_certify_test.py
cb1cc6
@@ -0,0 +1,136 @@
cb1cc6
+# --- BEGIN COPYRIGHT BLOCK ---
cb1cc6
+# Copyright (C) 2022 Red Hat, Inc.
cb1cc6
+# All rights reserved.
cb1cc6
+#
cb1cc6
+# License: GPL (version 3 or any later version).
cb1cc6
+# See LICENSE for details.
cb1cc6
+# --- END COPYRIGHT BLOCK ---
cb1cc6
+#
cb1cc6
+import logging
cb1cc6
+import pytest
cb1cc6
+import os
cb1cc6
+import time
+import ldap
cb1cc6
+from lib389._constants import DEFAULT_SUFFIX
cb1cc6
+from lib389.topologies import topology_m4
cb1cc6
+from lib389.tasks import CleanAllRUVTask
cb1cc6
+from lib389.replica import ReplicationManager, Replicas
cb1cc6
+
cb1cc6
+log = logging.getLogger(__name__)
cb1cc6
+
cb1cc6
+
cb1cc6
+def remove_supplier4_agmts(msg, topology_m4):
cb1cc6
+    """Remove all the repl agmts to supplier4. """
cb1cc6
+
cb1cc6
+    log.info('%s: remove all the agreements to supplier 4...' % msg)
cb1cc6
+    repl = ReplicationManager(DEFAULT_SUFFIX)
cb1cc6
+    # This will delete m4 from the topo *and* remove all incoming agreements
cb1cc6
+    # to m4.
cb1cc6
+    repl.remove_supplier(topology_m4.ms["supplier4"],
cb1cc6
+        [topology_m4.ms["supplier1"], topology_m4.ms["supplier2"], topology_m4.ms["supplier3"]])
cb1cc6
+
cb1cc6
+def task_done(topology_m4, task_dn, timeout=60):
cb1cc6
+    """Check if the task is complete"""
cb1cc6
+
cb1cc6
+    attrlist = ['nsTaskLog', 'nsTaskStatus', 'nsTaskExitCode',
cb1cc6
+                'nsTaskCurrentItem', 'nsTaskTotalItems']
cb1cc6
+    done = False
cb1cc6
+    count = 0
cb1cc6
+
cb1cc6
+    while not done and count < timeout:
cb1cc6
+        try:
cb1cc6
+            entry = topology_m4.ms["supplier1"].getEntry(task_dn, attrlist=attrlist)
cb1cc6
+            if entry is not None:
cb1cc6
+                if entry.hasAttr('nsTaskExitCode'):
cb1cc6
+                    done = True
cb1cc6
+                    break
cb1cc6
+            else:
cb1cc6
+                done = True
cb1cc6
+                break
cb1cc6
+        except ldap.NO_SUCH_OBJECT:
cb1cc6
+            done = True
cb1cc6
+            break
cb1cc6
+        except ldap.LDAPError:
cb1cc6
+            break
cb1cc6
+        time.sleep(1)
cb1cc6
+        count += 1
cb1cc6
+
cb1cc6
+    return done
cb1cc6
+
cb1cc6
+@pytest.mark.flaky(max_runs=2, min_passes=1)
cb1cc6
+def test_abort_certify(topology_m4):
cb1cc6
+    """Test the abort task with a replica-certify-all option
cb1cc6
+
cb1cc6
+    :id: 78959966-d644-44a8-b98c-1fcf21b45eb0
cb1cc6
+    :setup: Replication setup with four suppliers
cb1cc6
+    :steps:
cb1cc6
+        1. Disable replication on supplier 4
cb1cc6
+        2. Remove agreements to supplier 4 from other suppliers
cb1cc6
+        3. Stop supplier 2
cb1cc6
+        4. Run a cleanallruv task on supplier 1
cb1cc6
+        5. Run a cleanallruv abort task on supplier 1 with a replica-certify-all option
cb1cc6
+    :expectedresults: No hanging tasks left
cb1cc6
+        1. Replication on supplier 4 should be disabled
cb1cc6
+        2. Agreements to supplier 4 should be removed
cb1cc6
+        3. Supplier 2 should be stopped
cb1cc6
+        4. Operation should be successful
cb1cc6
+        5. Operation should be successful
cb1cc6
+    """
cb1cc6
+
cb1cc6
+    log.info('Running test_abort_certify...')
cb1cc6
+
cb1cc6
+    # Remove the agreements from the other suppliers that point to supplier 4
cb1cc6
+    repl = ReplicationManager(DEFAULT_SUFFIX)
cb1cc6
+    m4rid = repl.get_rid(topology_m4.ms["supplier4"])
cb1cc6
+    remove_supplier4_agmts("test_abort_certify", topology_m4)
cb1cc6
+
cb1cc6
+    # Stop supplier 2
cb1cc6
+    log.info('test_abort_certify: stop supplier 2 to freeze the cleanAllRUV task...')
cb1cc6
+    topology_m4.ms["supplier2"].stop()
cb1cc6
+
cb1cc6
+    # Run the task
cb1cc6
+    log.info('test_abort_certify: add the cleanAllRUV task...')
cb1cc6
+    cruv_task = CleanAllRUVTask(topology_m4.ms["supplier1"])
cb1cc6
+    cruv_task.create(properties={
cb1cc6
+        'replica-id': m4rid,
cb1cc6
+        'replica-base-dn': DEFAULT_SUFFIX,
cb1cc6
+        'replica-force-cleaning': 'no',
cb1cc6
+        'replica-certify-all': 'yes'
cb1cc6
+        })
cb1cc6
+    # Wait a bit
cb1cc6
+    time.sleep(2)
cb1cc6
+
cb1cc6
+    # Abort the task
cb1cc6
+    log.info('test_abort_certify: abort the cleanAllRUV task...')
cb1cc6
+    abort_task = cruv_task.abort(certify=True)
cb1cc6
+
cb1cc6
+    # Wait a while and make sure the abort task is still running
cb1cc6
+    log.info('test_abort_certify: check that the abort task is still running...')
cb1cc6
+
cb1cc6
+    if task_done(topology_m4, abort_task.dn, 10):
cb1cc6
+        log.fatal('test_abort_certify: abort task incorrectly finished')
cb1cc6
+        assert False
cb1cc6
+
cb1cc6
+    # Now start supplier 2 so it can be aborted
cb1cc6
+    log.info('test_abort_certify: start supplier 2 to allow the abort task to finish...')
cb1cc6
+    topology_m4.ms["supplier2"].start()
cb1cc6
+
cb1cc6
+    # Wait for the abort task to stop
cb1cc6
+    if not task_done(topology_m4, abort_task.dn, 90):
cb1cc6
+        log.fatal('test_abort_certify: The abort CleanAllRUV task was not aborted')
cb1cc6
+        assert False
cb1cc6
+
cb1cc6
+    # Check supplier 1 does not have the clean task running
cb1cc6
+    log.info('test_abort_certify: check supplier 1 no longer has a cleanAllRUV task...')
cb1cc6
+    if not task_done(topology_m4, cruv_task.dn):
cb1cc6
+        log.fatal('test_abort_certify: CleanAllRUV task was not aborted')
cb1cc6
+        assert False
cb1cc6
+
cb1cc6
+    log.info('test_abort_certify PASSED')
cb1cc6
+
cb1cc6
+
cb1cc6
+if __name__ == '__main__':
cb1cc6
+    # Run isolated
cb1cc6
+    # -s for DEBUG mode
cb1cc6
+    CURRENT_FILE = os.path.realpath(__file__)
cb1cc6
+    pytest.main(["-s", CURRENT_FILE])
cb1cc6
+
cb1cc6
diff --git a/dirsrvtests/tests/suites/replication/cleanallruv_abort_restart_test.py b/dirsrvtests/tests/suites/replication/cleanallruv_abort_restart_test.py
cb1cc6
new file mode 100644
cb1cc6
index 000000000..1406c6553
cb1cc6
--- /dev/null
cb1cc6
+++ b/dirsrvtests/tests/suites/replication/cleanallruv_abort_restart_test.py
cb1cc6
@@ -0,0 +1,146 @@
cb1cc6
+# --- BEGIN COPYRIGHT BLOCK ---
cb1cc6
+# Copyright (C) 2022 Red Hat, Inc.
cb1cc6
+# All rights reserved.
cb1cc6
+#
cb1cc6
+# License: GPL (version 3 or any later version).
cb1cc6
+# See LICENSE for details.
cb1cc6
+# --- END COPYRIGHT BLOCK ---
cb1cc6
+#
cb1cc6
+import logging
cb1cc6
+import pytest
cb1cc6
+import os
cb1cc6
+import time
+import ldap
cb1cc6
+from lib389._constants import DEFAULT_SUFFIX
cb1cc6
+from lib389.topologies import topology_m4
cb1cc6
+from lib389.tasks import CleanAllRUVTask
cb1cc6
+from lib389.replica import ReplicationManager
cb1cc6
+
cb1cc6
+log = logging.getLogger(__name__)
cb1cc6
+
cb1cc6
+
cb1cc6
+def remove_supplier4_agmts(msg, topology_m4):
cb1cc6
+    """Remove all the repl agmts to supplier4. """
cb1cc6
+
cb1cc6
+    log.info('%s: remove all the agreements to supplier 4...' % msg)
cb1cc6
+    repl = ReplicationManager(DEFAULT_SUFFIX)
cb1cc6
+    # This will delete m4 from the topo *and* remove all incoming agreements
cb1cc6
+    # to m4.
cb1cc6
+    repl.remove_supplier(topology_m4.ms["supplier4"],
cb1cc6
+        [topology_m4.ms["supplier1"], topology_m4.ms["supplier2"], topology_m4.ms["supplier3"]])
cb1cc6
+
cb1cc6
+def task_done(topology_m4, task_dn, timeout=60):
cb1cc6
+    """Check if the task is complete"""
cb1cc6
+
cb1cc6
+    attrlist = ['nsTaskLog', 'nsTaskStatus', 'nsTaskExitCode',
cb1cc6
+                'nsTaskCurrentItem', 'nsTaskTotalItems']
cb1cc6
+    done = False
cb1cc6
+    count = 0
cb1cc6
+
cb1cc6
+    while not done and count < timeout:
cb1cc6
+        try:
cb1cc6
+            entry = topology_m4.ms["supplier1"].getEntry(task_dn, attrlist=attrlist)
cb1cc6
+            if entry is not None:
cb1cc6
+                if entry.hasAttr('nsTaskExitCode'):
cb1cc6
+                    done = True
cb1cc6
+                    break
cb1cc6
+            else:
cb1cc6
+                done = True
cb1cc6
+                break
cb1cc6
+        except ldap.NO_SUCH_OBJECT:
cb1cc6
+            done = True
cb1cc6
+            break
cb1cc6
+        except ldap.LDAPError:
cb1cc6
+            break
cb1cc6
+        time.sleep(1)
cb1cc6
+        count += 1
cb1cc6
+
cb1cc6
+    return done
cb1cc6
+
cb1cc6
+
cb1cc6
+@pytest.mark.flaky(max_runs=2, min_passes=1)
cb1cc6
+def test_abort_restart(topology_m4):
cb1cc6
+    """Test the abort task can handle a restart, and then resume
cb1cc6
+
cb1cc6
+    :id: b66e33d4-fe85-4e1c-b882-75da80f70ab3
cb1cc6
+    :setup: Replication setup with four suppliers
cb1cc6
+    :steps:
cb1cc6
+        1. Disable replication on supplier 4
cb1cc6
+        2. Remove agreements to supplier 4 from other suppliers
cb1cc6
+        3. Stop supplier 3
cb1cc6
+        4. Run a cleanallruv task on supplier 1
cb1cc6
+        5. Run a cleanallruv abort task on supplier 1
cb1cc6
+        6. Restart supplier 1
cb1cc6
+        7. Make sure that no crash happened
cb1cc6
+        8. Start supplier 3
cb1cc6
+        9. Check supplier 1 does not have the clean task running
cb1cc6
+        10. Check that errors log doesn't have 'Aborting abort task' message
cb1cc6
+    :expectedresults:
cb1cc6
+        1. Replication on supplier 4 should be disabled
cb1cc6
+        2. Agreements to supplier 4 should be removed
cb1cc6
+        3. Supplier 3 should be stopped
cb1cc6
+        4. Operation should be successful
cb1cc6
+        5. Operation should be successful
cb1cc6
+        6. Supplier 1 should be restarted
cb1cc6
+        7. No crash should have happened
cb1cc6
+        8. Supplier 3 should be started
cb1cc6
+        9. Check supplier 1 shouldn't have the clean task running
cb1cc6
+        10. Errors log shouldn't have 'Aborting abort task' message
cb1cc6
+    """
cb1cc6
+
cb1cc6
+    log.info('Running test_abort_restart...')
cb1cc6
+    # Remove the agreements from the other suppliers that point to supplier 4
cb1cc6
+    repl = ReplicationManager(DEFAULT_SUFFIX)
cb1cc6
+    m4rid = repl.get_rid(topology_m4.ms["supplier4"])
cb1cc6
+    remove_supplier4_agmts("test_abort", topology_m4)
cb1cc6
+
cb1cc6
+    # Stop supplier 3
cb1cc6
+    log.info('test_abort_restart: stop supplier 3 to freeze the cleanAllRUV task...')
cb1cc6
+    topology_m4.ms["supplier3"].stop()
cb1cc6
+
cb1cc6
+    # Run the task
cb1cc6
+    log.info('test_abort_restart: add the cleanAllRUV task...')
cb1cc6
+    cruv_task = CleanAllRUVTask(topology_m4.ms["supplier1"])
cb1cc6
+    cruv_task.create(properties={
cb1cc6
+        'replica-id': m4rid,
cb1cc6
+        'replica-base-dn': DEFAULT_SUFFIX,
cb1cc6
+        'replica-force-cleaning': 'no',
cb1cc6
+        'replica-certify-all': 'yes'
cb1cc6
+        })
cb1cc6
+    # Wait a bit
cb1cc6
+    time.sleep(2)
cb1cc6
+
cb1cc6
+    # Abort the task
cb1cc6
+    cruv_task.abort(certify=True)
cb1cc6
+
cb1cc6
+    # Check supplier 1 does not have the clean task running
cb1cc6
+    log.info('test_abort_restart: check supplier 1 no longer has a cleanAllRUV task...')
cb1cc6
+    if not task_done(topology_m4, cruv_task.dn):
cb1cc6
+        log.fatal('test_abort_restart: CleanAllRUV task was not aborted')
cb1cc6
+        assert False
cb1cc6
+
cb1cc6
+    # Now restart supplier 1, and make sure the abort process completes
cb1cc6
+    topology_m4.ms["supplier1"].restart()
cb1cc6
+    if topology_m4.ms["supplier1"].detectDisorderlyShutdown():
cb1cc6
+        log.fatal('test_abort_restart: Supplier 1 previously crashed!')
cb1cc6
+        assert False
cb1cc6
+
cb1cc6
+    # Start supplier 3
cb1cc6
+    topology_m4.ms["supplier3"].start()
cb1cc6
+
cb1cc6
+    # Give the server time (at least 5 seconds) to process any leftover tasks
cb1cc6
+    time.sleep(6)
cb1cc6
+
cb1cc6
+    # Check supplier 1 tried to run abort task.  We expect the abort task to be aborted.
cb1cc6
+    if not topology_m4.ms["supplier1"].searchErrorsLog('Aborting abort task'):
cb1cc6
+        log.fatal('test_abort_restart: Abort task did not restart')
cb1cc6
+        assert False
cb1cc6
+
cb1cc6
+    log.info('test_abort_restart PASSED')
cb1cc6
+
cb1cc6
+
cb1cc6
+if __name__ == '__main__':
cb1cc6
+    # Run isolated
cb1cc6
+    # -s for DEBUG mode
cb1cc6
+    CURRENT_FILE = os.path.realpath(__file__)
cb1cc6
+    pytest.main(["-s", CURRENT_FILE])
cb1cc6
+
cb1cc6
diff --git a/dirsrvtests/tests/suites/replication/cleanallruv_abort_test.py b/dirsrvtests/tests/suites/replication/cleanallruv_abort_test.py
cb1cc6
new file mode 100644
cb1cc6
index 000000000..f89188165
cb1cc6
--- /dev/null
cb1cc6
+++ b/dirsrvtests/tests/suites/replication/cleanallruv_abort_test.py
cb1cc6
@@ -0,0 +1,123 @@
cb1cc6
+# --- BEGIN COPYRIGHT BLOCK ---
cb1cc6
+# Copyright (C) 2022 Red Hat, Inc.
cb1cc6
+# All rights reserved.
cb1cc6
+#
cb1cc6
+# License: GPL (version 3 or any later version).
cb1cc6
+# See LICENSE for details.
cb1cc6
+# --- END COPYRIGHT BLOCK ---
cb1cc6
+#
cb1cc6
+import logging
cb1cc6
+import pytest
cb1cc6
+import os
cb1cc6
+import time
+import ldap
cb1cc6
+from lib389._constants import DEFAULT_SUFFIX
cb1cc6
+from lib389.topologies import topology_m4
cb1cc6
+from lib389.tasks import CleanAllRUVTask
cb1cc6
+from lib389.replica import ReplicationManager
cb1cc6
+
cb1cc6
+log = logging.getLogger(__name__)
cb1cc6
+
cb1cc6
+
cb1cc6
+def remove_supplier4_agmts(msg, topology_m4):
cb1cc6
+    """Remove all the repl agmts to supplier4. """
cb1cc6
+
cb1cc6
+    log.info('%s: remove all the agreements to supplier 4...' % msg)
cb1cc6
+    repl = ReplicationManager(DEFAULT_SUFFIX)
cb1cc6
+    # This will delete m4 from the topo *and* remove all incoming agreements
cb1cc6
+    # to m4.
cb1cc6
+    repl.remove_supplier(topology_m4.ms["supplier4"],
cb1cc6
+        [topology_m4.ms["supplier1"], topology_m4.ms["supplier2"], topology_m4.ms["supplier3"]])
cb1cc6
+
cb1cc6
+def task_done(topology_m4, task_dn, timeout=60):
cb1cc6
+    """Check if the task is complete"""
cb1cc6
+
cb1cc6
+    attrlist = ['nsTaskLog', 'nsTaskStatus', 'nsTaskExitCode',
cb1cc6
+                'nsTaskCurrentItem', 'nsTaskTotalItems']
cb1cc6
+    done = False
cb1cc6
+    count = 0
cb1cc6
+
cb1cc6
+    while not done and count < timeout:
cb1cc6
+        try:
cb1cc6
+            entry = topology_m4.ms["supplier1"].getEntry(task_dn, attrlist=attrlist)
cb1cc6
+            if entry is not None:
cb1cc6
+                if entry.hasAttr('nsTaskExitCode'):
cb1cc6
+                    done = True
cb1cc6
+                    break
cb1cc6
+            else:
cb1cc6
+                done = True
cb1cc6
+                break
cb1cc6
+        except ldap.NO_SUCH_OBJECT:
cb1cc6
+            done = True
cb1cc6
+            break
cb1cc6
+        except ldap.LDAPError:
cb1cc6
+            break
cb1cc6
+        time.sleep(1)
cb1cc6
+        count += 1
cb1cc6
+
cb1cc6
+    return done
cb1cc6
+
cb1cc6
+
cb1cc6
+@pytest.mark.flaky(max_runs=2, min_passes=1)
cb1cc6
+def test_abort(topology_m4):
cb1cc6
+    """Test the abort task basic functionality
cb1cc6
+
cb1cc6
+    :id: b09a6887-8de0-4fac-8e41-73ccbaaf7a08
cb1cc6
+    :setup: Replication setup with four suppliers
cb1cc6
+    :steps:
cb1cc6
+        1. Disable replication on supplier 4
cb1cc6
+        2. Remove agreements to supplier 4 from other suppliers
cb1cc6
+        3. Stop supplier 2
cb1cc6
+        4. Run a cleanallruv task on supplier 1
cb1cc6
+        5. Run a cleanallruv abort task on supplier 1
cb1cc6
+    :expectedresults: No hanging tasks left
cb1cc6
+        1. Replication on supplier 4 should be disabled
cb1cc6
+        2. Agreements to supplier 4 should be removed
cb1cc6
+        3. Supplier 2 should be stopped
cb1cc6
+        4. Operation should be successful
cb1cc6
+        5. Operation should be successful
cb1cc6
+    """
cb1cc6
+
cb1cc6
+    log.info('Running test_abort...')
cb1cc6
+    # Remove the agreements from the other suppliers that point to supplier 4
cb1cc6
+    repl = ReplicationManager(DEFAULT_SUFFIX)
cb1cc6
+    m4rid = repl.get_rid(topology_m4.ms["supplier4"])
cb1cc6
+    remove_supplier4_agmts("test_abort", topology_m4)
cb1cc6
+
cb1cc6
+    # Stop supplier 2
cb1cc6
+    log.info('test_abort: stop supplier 2 to freeze the cleanAllRUV task...')
cb1cc6
+    topology_m4.ms["supplier2"].stop()
cb1cc6
+
cb1cc6
+    # Run the task
cb1cc6
+    log.info('test_abort: add the cleanAllRUV task...')
cb1cc6
+    cruv_task = CleanAllRUVTask(topology_m4.ms["supplier1"])
cb1cc6
+    cruv_task.create(properties={
cb1cc6
+        'replica-id': m4rid,
cb1cc6
+        'replica-base-dn': DEFAULT_SUFFIX,
cb1cc6
+        'replica-force-cleaning': 'no',
cb1cc6
+        'replica-certify-all': 'yes'
cb1cc6
+        })
cb1cc6
+    # Wait a bit
cb1cc6
+    time.sleep(2)
cb1cc6
+
cb1cc6
+    # Abort the task
cb1cc6
+    cruv_task.abort()
cb1cc6
+
cb1cc6
+    # Check supplier 1 does not have the clean task running
cb1cc6
+    log.info('test_abort: check supplier 1 no longer has a cleanAllRUV task...')
cb1cc6
+    if not task_done(topology_m4, cruv_task.dn):
cb1cc6
+        log.fatal('test_abort: CleanAllRUV task was not aborted')
cb1cc6
+        assert False
cb1cc6
+
cb1cc6
+    # Start supplier 2
cb1cc6
+    log.info('test_abort: start supplier 2 to begin the restore process...')
cb1cc6
+    topology_m4.ms["supplier2"].start()
cb1cc6
+
cb1cc6
+    log.info('test_abort PASSED')
cb1cc6
+
cb1cc6
+
cb1cc6
+if __name__ == '__main__':
cb1cc6
+    # Run isolated
cb1cc6
+    # -s for DEBUG mode
cb1cc6
+    CURRENT_FILE = os.path.realpath(__file__)
cb1cc6
+    pytest.main(["-s", CURRENT_FILE])
cb1cc6
+
cb1cc6
diff --git a/dirsrvtests/tests/suites/replication/cleanallruv_force_test.py b/dirsrvtests/tests/suites/replication/cleanallruv_force_test.py
cb1cc6
new file mode 100644
cb1cc6
index 000000000..d5b930584
cb1cc6
--- /dev/null
cb1cc6
+++ b/dirsrvtests/tests/suites/replication/cleanallruv_force_test.py
cb1cc6
@@ -0,0 +1,187 @@
cb1cc6
+# --- BEGIN COPYRIGHT BLOCK ---
cb1cc6
+# Copyright (C) 2022 Red Hat, Inc.
cb1cc6
+# All rights reserved.
cb1cc6
+#
cb1cc6
+# License: GPL (version 3 or any later version).
cb1cc6
+# See LICENSE for details.
cb1cc6
+# --- END COPYRIGHT BLOCK ---
cb1cc6
+#
cb1cc6
+import logging
cb1cc6
+import pytest
cb1cc6
+import os
cb1cc6
+import time
+import ldap
cb1cc6
+import random
cb1cc6
+import threading
cb1cc6
+from lib389._constants import DEFAULT_SUFFIX
cb1cc6
+from lib389.topologies import topology_m4
cb1cc6
+from lib389.tasks import CleanAllRUVTask
cb1cc6
+from lib389.replica import Replicas, ReplicationManager
cb1cc6
+from lib389.idm.directorymanager import DirectoryManager
cb1cc6
+from lib389.idm.user import UserAccounts
cb1cc6
+
cb1cc6
+log = logging.getLogger(__name__)
cb1cc6
+
cb1cc6
+
cb1cc6
+class AddUsers(threading.Thread):
cb1cc6
+    def __init__(self, inst, num_users):
cb1cc6
+        threading.Thread.__init__(self)
cb1cc6
+        self.daemon = True
cb1cc6
+        self.inst = inst
cb1cc6
+        self.num_users = num_users
cb1cc6
+
cb1cc6
+    def run(self):
cb1cc6
+        """Start adding users"""
cb1cc6
+
cb1cc6
+        dm = DirectoryManager(self.inst)
cb1cc6
+        conn = dm.bind()
cb1cc6
+
cb1cc6
+        users = UserAccounts(conn, DEFAULT_SUFFIX)
cb1cc6
+
cb1cc6
+        u_range = list(range(self.num_users))
cb1cc6
+        random.shuffle(u_range)
cb1cc6
+
cb1cc6
+        for idx in u_range:
cb1cc6
+            try:
cb1cc6
+                users.create(properties={
cb1cc6
+                    'uid': 'testuser%s' % idx,
cb1cc6
+                    'cn' : 'testuser%s' % idx,
cb1cc6
+                    'sn' : 'user%s' % idx,
cb1cc6
+                    'uidNumber' : '%s' % (1000 + idx),
cb1cc6
+                    'gidNumber' : '%s' % (1000 + idx),
cb1cc6
+                    'homeDirectory' : '/home/testuser%s' % idx
cb1cc6
+                })
cb1cc6
+            # One of the suppliers was probably put into read only mode - just break out
cb1cc6
+            except ldap.UNWILLING_TO_PERFORM:
cb1cc6
+                break
cb1cc6
+            except ldap.ALREADY_EXISTS:
cb1cc6
+                pass
cb1cc6
+        conn.close()
cb1cc6
+
cb1cc6
+def remove_some_supplier4_agmts(msg, topology_m4):
cb1cc6
+    """Remove all the repl agmts to supplier4 except from supplier3.  Used by
cb1cc6
+    the force tests."""
cb1cc6
+
cb1cc6
+    log.info('%s: remove the agreements to supplier 4...' % msg)
cb1cc6
+    repl = ReplicationManager(DEFAULT_SUFFIX)
cb1cc6
+    # This will delete m4 from the topo *and* remove all incoming agreements
cb1cc6
+    # to m4.
cb1cc6
+    repl.remove_supplier(topology_m4.ms["supplier4"],
cb1cc6
+        [topology_m4.ms["supplier1"], topology_m4.ms["supplier2"]])
cb1cc6
+
cb1cc6
+def task_done(topology_m4, task_dn, timeout=60):
cb1cc6
+    """Check if the task is complete"""
cb1cc6
+
cb1cc6
+    attrlist = ['nsTaskLog', 'nsTaskStatus', 'nsTaskExitCode',
cb1cc6
+                'nsTaskCurrentItem', 'nsTaskTotalItems']
cb1cc6
+    done = False
cb1cc6
+    count = 0
cb1cc6
+
cb1cc6
+    while not done and count < timeout:
cb1cc6
+        try:
cb1cc6
+            entry = topology_m4.ms["supplier1"].getEntry(task_dn, attrlist=attrlist)
cb1cc6
+            if entry is not None:
cb1cc6
+                if entry.hasAttr('nsTaskExitCode'):
cb1cc6
+                    done = True
cb1cc6
+                    break
cb1cc6
+            else:
cb1cc6
+                done = True
cb1cc6
+                break
cb1cc6
+        except ldap.NO_SUCH_OBJECT:
cb1cc6
+            done = True
cb1cc6
+            break
cb1cc6
+        except ldap.LDAPError:
cb1cc6
+            break
cb1cc6
+        time.sleep(1)
cb1cc6
+        count += 1
cb1cc6
+
cb1cc6
+    return done
cb1cc6
+
cb1cc6
+def check_ruvs(msg, topology_m4, m4rid):
cb1cc6
+    """Check suppliers 1-3 for supplier 4's rid."""
cb1cc6
+    for inst in (topology_m4.ms["supplier1"], topology_m4.ms["supplier2"], topology_m4.ms["supplier3"]):
cb1cc6
+        clean = False
cb1cc6
+        replicas = Replicas(inst)
cb1cc6
+        replica = replicas.get(DEFAULT_SUFFIX)
cb1cc6
+        log.info('check_ruvs for replica %s:%s (suffix:rid)' % (replica.get_suffix(), replica.get_rid()))
cb1cc6
+
cb1cc6
+        count = 0
cb1cc6
+        while not clean and count < 20:
cb1cc6
+            ruv = replica.get_ruv()
cb1cc6
+            if m4rid in ruv._rids:
cb1cc6
+                time.sleep(5)
cb1cc6
+                count = count + 1
cb1cc6
+            else:
cb1cc6
+                clean = True
cb1cc6
+        if not clean:
cb1cc6
+            raise Exception("Supplier %s was not cleaned in time." % inst.serverid)
cb1cc6
+    return True
cb1cc6
+
cb1cc6
+def test_clean_force(topology_m4):
cb1cc6
+    """Check that multiple tasks with a 'force' option work properly
cb1cc6
+
cb1cc6
+    :id: f8810dfe-d2d2-4dd9-ba03-5fc14896fabe
cb1cc6
+    :setup: Replication setup with four suppliers
cb1cc6
+    :steps:
cb1cc6
+        1. Stop supplier 3
cb1cc6
+        2. Add a bunch of updates to supplier 4
cb1cc6
+        3. Disable replication on supplier 4
cb1cc6
+        4. Start supplier 3
cb1cc6
+        5. Remove agreements to supplier 4 from other suppliers
cb1cc6
+        6. Run a cleanallruv task on supplier 1 with a 'force' option 'on'
cb1cc6
+        7. Check that everything was cleaned
cb1cc6
+    :expectedresults:
cb1cc6
+        1. Supplier 3 should be stopped
cb1cc6
+        2. Operation should be successful
cb1cc6
+        3. Replication on supplier 4 should be disabled
cb1cc6
+        4. Supplier 3 should be started
cb1cc6
+        5. Agreements to supplier 4 should be removed
cb1cc6
+        6. Operation should be successful
cb1cc6
+        7. Everything should be cleaned
cb1cc6
+    """
cb1cc6
+
cb1cc6
+    log.info('Running test_clean_force...')
cb1cc6
+
cb1cc6
+    # Stop supplier 3, while we update supplier 4, so that 3 is behind the other suppliers
cb1cc6
+    topology_m4.ms["supplier3"].stop()
cb1cc6
+
cb1cc6
+    # Add a bunch of updates to supplier 4
cb1cc6
+    m4_add_users = AddUsers(topology_m4.ms["supplier4"], 10)
cb1cc6
+    m4_add_users.start()
cb1cc6
+    m4_add_users.join()
cb1cc6
+
cb1cc6
+    # Remove the agreements from the other suppliers that point to supplier 4
cb1cc6
+    repl = ReplicationManager(DEFAULT_SUFFIX)
cb1cc6
+    m4rid = repl.get_rid(topology_m4.ms["supplier4"])
cb1cc6
+    remove_some_supplier4_agmts("test_clean_force", topology_m4)
cb1cc6
+
cb1cc6
+    # Start supplier 3, it should be out of sync with the other replicas...
cb1cc6
+    topology_m4.ms["supplier3"].start()
cb1cc6
+
cb1cc6
+    # Remove the agreement to replica 4
cb1cc6
+    replica = Replicas(topology_m4.ms["supplier3"]).get(DEFAULT_SUFFIX)
cb1cc6
+    replica.get_agreements().get("004").delete()
cb1cc6
+
cb1cc6
+    # Run the task, use "force" because supplier 3 is not in sync with the other replicas
cb1cc6
+    # in regards to the replica 4 RUV
cb1cc6
+    log.info('test_clean: run the cleanAllRUV task...')
cb1cc6
+    cruv_task = CleanAllRUVTask(topology_m4.ms["supplier1"])
cb1cc6
+    cruv_task.create(properties={
cb1cc6
+        'replica-id': m4rid,
cb1cc6
+        'replica-base-dn': DEFAULT_SUFFIX,
cb1cc6
+        'replica-force-cleaning': 'yes'
cb1cc6
+        })
cb1cc6
+    cruv_task.wait()
cb1cc6
+
cb1cc6
+    # Check the other supplier's RUV for 'replica 4'
cb1cc6
+    log.info('test_clean_force: check all the suppliers have been cleaned...')
cb1cc6
+    clean = check_ruvs("test_clean_force", topology_m4, m4rid)
cb1cc6
+    assert clean
cb1cc6
+
cb1cc6
+    log.info('test_clean_force PASSED')
cb1cc6
+
cb1cc6
+
cb1cc6
+if __name__ == '__main__':
cb1cc6
+    # Run isolated
cb1cc6
+    # -s for DEBUG mode
cb1cc6
+    CURRENT_FILE = os.path.realpath(__file__)
cb1cc6
+    pytest.main(["-s", CURRENT_FILE])
cb1cc6
diff --git a/dirsrvtests/tests/suites/replication/cleanallruv_multiple_force_test.py b/dirsrvtests/tests/suites/replication/cleanallruv_multiple_force_test.py
cb1cc6
new file mode 100644
cb1cc6
index 000000000..0a0848bda
cb1cc6
--- /dev/null
cb1cc6
+++ b/dirsrvtests/tests/suites/replication/cleanallruv_multiple_force_test.py
cb1cc6
@@ -0,0 +1,214 @@
cb1cc6
+# --- BEGIN COPYRIGHT BLOCK ---
cb1cc6
+# Copyright (C) 2022 Red Hat, Inc.
cb1cc6
+# All rights reserved.
cb1cc6
+#
cb1cc6
+# License: GPL (version 3 or any later version).
cb1cc6
+# See LICENSE for details.
cb1cc6
+# --- END COPYRIGHT BLOCK ---
cb1cc6
+#
cb1cc6
+import ldap
cb1cc6
+import logging
cb1cc6
+import os
cb1cc6
+import pytest
cb1cc6
+import random
cb1cc6
+import time
cb1cc6
+import threading
cb1cc6
+from lib389._constants import DEFAULT_SUFFIX
cb1cc6
+from lib389.topologies import topology_m4
cb1cc6
+from lib389.tasks import CleanAllRUVTask
cb1cc6
+from lib389.idm.directorymanager import DirectoryManager
cb1cc6
+from lib389.idm.user import UserAccounts
cb1cc6
+from lib389.replica import ReplicationManager, Replicas
cb1cc6
+
cb1cc6
+log = logging.getLogger(__name__)
cb1cc6
+
cb1cc6
+
cb1cc6
+class AddUsers(threading.Thread):
cb1cc6
+    def __init__(self, inst, num_users):
cb1cc6
+        threading.Thread.__init__(self)
cb1cc6
+        self.daemon = True
cb1cc6
+        self.inst = inst
cb1cc6
+        self.num_users = num_users
cb1cc6
+
cb1cc6
+    def run(self):
cb1cc6
+        """Start adding users"""
cb1cc6
+
cb1cc6
+        dm = DirectoryManager(self.inst)
cb1cc6
+        conn = dm.bind()
cb1cc6
+
cb1cc6
+        users = UserAccounts(conn, DEFAULT_SUFFIX)
cb1cc6
+
cb1cc6
+        u_range = list(range(self.num_users))
cb1cc6
+        random.shuffle(u_range)
cb1cc6
+
cb1cc6
+        for idx in u_range:
cb1cc6
+            try:
cb1cc6
+                users.create(properties={
cb1cc6
+                    'uid': 'testuser%s' % idx,
cb1cc6
+                    'cn' : 'testuser%s' % idx,
cb1cc6
+                    'sn' : 'user%s' % idx,
cb1cc6
+                    'uidNumber' : '%s' % (1000 + idx),
cb1cc6
+                    'gidNumber' : '%s' % (1000 + idx),
cb1cc6
+                    'homeDirectory' : '/home/testuser%s' % idx
cb1cc6
+                })
cb1cc6
+            # One of the suppliers was probably put into read only mode - just break out
cb1cc6
+            except ldap.UNWILLING_TO_PERFORM:
cb1cc6
+                break
cb1cc6
+            except ldap.ALREADY_EXISTS:
cb1cc6
+                pass
cb1cc6
+        conn.close()
cb1cc6
+
cb1cc6
+def remove_some_supplier4_agmts(msg, topology_m4):
cb1cc6
+    """Remove all the repl agmts to supplier4 except from supplier3.  Used by
cb1cc6
+    the force tests."""
cb1cc6
+
cb1cc6
+    log.info('%s: remove the agreements to supplier 4...' % msg)
cb1cc6
+    repl = ReplicationManager(DEFAULT_SUFFIX)
cb1cc6
+    # This will delete m4 from the topo *and* remove all incoming agreements
cb1cc6
+    # to m4.
cb1cc6
+    repl.remove_supplier(topology_m4.ms["supplier4"],
cb1cc6
+        [topology_m4.ms["supplier1"], topology_m4.ms["supplier2"]])
cb1cc6
+
cb1cc6
+def task_done(topology_m4, task_dn, timeout=60):
cb1cc6
+    """Check if the task is complete"""
cb1cc6
+
cb1cc6
+    attrlist = ['nsTaskLog', 'nsTaskStatus', 'nsTaskExitCode',
cb1cc6
+                'nsTaskCurrentItem', 'nsTaskTotalItems']
cb1cc6
+    done = False
cb1cc6
+    count = 0
cb1cc6
+
cb1cc6
+    while not done and count < timeout:
cb1cc6
+        try:
cb1cc6
+            entry = topology_m4.ms["supplier1"].getEntry(task_dn, attrlist=attrlist)
cb1cc6
+            if entry is not None:
cb1cc6
+                if entry.hasAttr('nsTaskExitCode'):
cb1cc6
+                    done = True
cb1cc6
+                    break
cb1cc6
+            else:
cb1cc6
+                done = True
cb1cc6
+                break
cb1cc6
+        except ldap.NO_SUCH_OBJECT:
cb1cc6
+            done = True
cb1cc6
+            break
cb1cc6
+        except ldap.LDAPError:
cb1cc6
+            break
cb1cc6
+        time.sleep(1)
cb1cc6
+        count += 1
cb1cc6
+
cb1cc6
+    return done
cb1cc6
+
cb1cc6
+def check_ruvs(msg, topology_m4, m4rid):
cb1cc6
+    """Check suppliers 1-3 for supplier 4's rid."""
cb1cc6
+    for inst in (topology_m4.ms["supplier1"], topology_m4.ms["supplier2"], topology_m4.ms["supplier3"]):
cb1cc6
+        clean = False
cb1cc6
+        replicas = Replicas(inst)
cb1cc6
+        replica = replicas.get(DEFAULT_SUFFIX)
cb1cc6
+        log.info('check_ruvs for replica %s:%s (suffix:rid)' % (replica.get_suffix(), replica.get_rid()))
cb1cc6
+
cb1cc6
+        count = 0
cb1cc6
+        while not clean and count < 20:
cb1cc6
+            ruv = replica.get_ruv()
cb1cc6
+            if m4rid in ruv._rids:
cb1cc6
+                time.sleep(5)
cb1cc6
+                count = count + 1
cb1cc6
+            else:
cb1cc6
+                clean = True
cb1cc6
+        if not clean:
cb1cc6
+            raise Exception("Supplier %s was not cleaned in time." % inst.serverid)
cb1cc6
+    return True
cb1cc6
+
cb1cc6
+
cb1cc6
+def test_multiple_tasks_with_force(topology_m4):
cb1cc6
+    """Check that multiple tasks with a 'force' option work properly
cb1cc6
+
cb1cc6
+    :id: eb76a93d-8d1c-405e-9f25-6e8d5a781098
cb1cc6
+    :setup: Replication setup with four suppliers
cb1cc6
+    :steps:
cb1cc6
+        1. Stop supplier 3
cb1cc6
+        2. Add a bunch of updates to supplier 4
cb1cc6
+        3. Disable replication on supplier 4
cb1cc6
+        4. Start supplier 3
cb1cc6
+        5. Remove agreements to supplier 4 from other suppliers
cb1cc6
+        6. Run a cleanallruv task on supplier 1 with a 'force' option 'on'
cb1cc6
+        7. Run one more cleanallruv task on supplier 1 with a 'force' option 'off'
cb1cc6
+        8. Check that everything was cleaned
cb1cc6
+    :expectedresults:
cb1cc6
+        1. Supplier 3 should be stopped
cb1cc6
+        2. Operation should be successful
cb1cc6
+        3. Replication on supplier 4 should be disabled
cb1cc6
+        4. Supplier 3 should be started
cb1cc6
+        5. Agreements to supplier 4 should be removed
cb1cc6
+        6. Operation should be successful
cb1cc6
+        7. Operation should be successful
cb1cc6
+        8. Everything should be cleaned
cb1cc6
+    """
cb1cc6
+
cb1cc6
+    log.info('Running test_multiple_tasks_with_force...')
cb1cc6
+
cb1cc6
+    # Stop supplier 3, while we update supplier 4, so that 3 is behind the other suppliers
cb1cc6
+    topology_m4.ms["supplier3"].stop()
cb1cc6
+    repl = ReplicationManager(DEFAULT_SUFFIX)
cb1cc6
+    m4rid = repl.get_rid(topology_m4.ms["supplier4"])
cb1cc6
+
cb1cc6
+    # Add a bunch of updates to supplier 4
cb1cc6
+    m4_add_users = AddUsers(topology_m4.ms["supplier4"], 10)
cb1cc6
+    m4_add_users.start()
cb1cc6
+    m4_add_users.join()
cb1cc6
+
cb1cc6
+    # Disable supplier 4
cb1cc6
+    # Remove the agreements from the other suppliers that point to supplier 4
cb1cc6
+    remove_some_supplier4_agmts("test_multiple_tasks_with_force", topology_m4)
cb1cc6
+
cb1cc6
+    # Start supplier 3, it should be out of sync with the other replicas...
cb1cc6
+    topology_m4.ms["supplier3"].start()
cb1cc6
+
cb1cc6
+    # Remove the agreement to replica 4
cb1cc6
+    replica = Replicas(topology_m4.ms["supplier3"]).get(DEFAULT_SUFFIX)
cb1cc6
+    replica.get_agreements().get("004").delete()
cb1cc6
+
cb1cc6
+    # Run the task, use "force" because supplier 3 is not in sync with the other replicas
cb1cc6
+    # in regards to the replica 4 RUV
cb1cc6
+    log.info('test_multiple_tasks_with_force: run the cleanAllRUV task with "force" on...')
cb1cc6
+    cruv_task = CleanAllRUVTask(topology_m4.ms["supplier1"])
cb1cc6
+    cruv_task.create(properties={
cb1cc6
+        'replica-id': m4rid,
cb1cc6
+        'replica-base-dn': DEFAULT_SUFFIX,
cb1cc6
+        'replica-force-cleaning': 'yes',
cb1cc6
+        'replica-certify-all': 'no'
cb1cc6
+        })
cb1cc6
+
cb1cc6
+    log.info('test_multiple_tasks_with_force: run the cleanAllRUV task with "force" off...')
cb1cc6
+
cb1cc6
+    # NOTE: This must be try not py.test raises, because the above may or may
cb1cc6
+    # not have completed yet ....
cb1cc6
+    try:
cb1cc6
+        cruv_task_fail = CleanAllRUVTask(topology_m4.ms["supplier1"])
cb1cc6
+        cruv_task_fail.create(properties={
cb1cc6
+            'replica-id': m4rid,
cb1cc6
+            'replica-base-dn': DEFAULT_SUFFIX,
cb1cc6
+            'replica-force-cleaning': 'no',
cb1cc6
+            'replica-certify-all': 'no'
cb1cc6
+            })
cb1cc6
+        cruv_task_fail.wait()
cb1cc6
+    except ldap.UNWILLING_TO_PERFORM:
cb1cc6
+        pass
cb1cc6
+    # Wait for the force task ....
cb1cc6
+    cruv_task.wait()
cb1cc6
+
cb1cc6
+    # Check the other supplier's RUV for 'replica 4'
cb1cc6
+    log.info('test_multiple_tasks_with_force: check all the suppliers have been cleaned...')
cb1cc6
+    clean = check_ruvs("test_clean_force", topology_m4, m4rid)
cb1cc6
+    assert clean
cb1cc6
+    # Check supplier 1 does not have the clean task running
cb1cc6
+    log.info('test_multiple_tasks_with_force: check supplier 1 no longer has a cleanAllRUV task...')
cb1cc6
+    if not task_done(topology_m4, cruv_task.dn):
cb1cc6
+        log.fatal('test_multiple_tasks_with_force: CleanAllRUV task did not finish')
cb1cc6
+        assert False
cb1cc6
+
cb1cc6
+
cb1cc6
+if __name__ == '__main__':
cb1cc6
+    # Run isolated
cb1cc6
+    # -s for DEBUG mode
cb1cc6
+    CURRENT_FILE = os.path.realpath(__file__)
cb1cc6
+    pytest.main(["-s", CURRENT_FILE])
cb1cc6
+
cb1cc6
diff --git a/dirsrvtests/tests/suites/replication/cleanallruv_restart_test.py b/dirsrvtests/tests/suites/replication/cleanallruv_restart_test.py
cb1cc6
new file mode 100644
cb1cc6
index 000000000..2e8d7e4a6
cb1cc6
--- /dev/null
cb1cc6
+++ b/dirsrvtests/tests/suites/replication/cleanallruv_restart_test.py
cb1cc6
@@ -0,0 +1,161 @@
cb1cc6
+# --- BEGIN COPYRIGHT BLOCK ---
cb1cc6
+# Copyright (C) 2022 Red Hat, Inc.
cb1cc6
+# All rights reserved.
cb1cc6
+#
cb1cc6
+# License: GPL (version 3 or any later version).
cb1cc6
+# See LICENSE for details.
cb1cc6
+# --- END COPYRIGHT BLOCK ---
cb1cc6
+#
cb1cc6
+import logging
cb1cc6
+import pytest
cb1cc6
+import os
cb1cc6
+import time
+import ldap
cb1cc6
+from lib389._constants import DEFAULT_SUFFIX
cb1cc6
+from lib389.topologies import topology_m4
cb1cc6
+from lib389.tasks import CleanAllRUVTask
cb1cc6
+from lib389.replica import ReplicationManager, Replicas
cb1cc6
+
cb1cc6
+log = logging.getLogger(__name__)
cb1cc6
+
cb1cc6
+
cb1cc6
+def remove_supplier4_agmts(msg, topology_m4):
cb1cc6
+    """Remove all the repl agmts to supplier4. """
cb1cc6
+
cb1cc6
+    log.info('%s: remove all the agreements to supplier 4...' % msg)
cb1cc6
+    repl = ReplicationManager(DEFAULT_SUFFIX)
cb1cc6
+    # This will delete m4 from the topo *and* remove all incoming agreements
cb1cc6
+    # to m4.
cb1cc6
+    repl.remove_supplier(topology_m4.ms["supplier4"],
cb1cc6
+        [topology_m4.ms["supplier1"], topology_m4.ms["supplier2"], topology_m4.ms["supplier3"]])
cb1cc6
+
cb1cc6
+def task_done(topology_m4, task_dn, timeout=60):
cb1cc6
+    """Check if the task is complete"""
cb1cc6
+
cb1cc6
+    attrlist = ['nsTaskLog', 'nsTaskStatus', 'nsTaskExitCode',
cb1cc6
+                'nsTaskCurrentItem', 'nsTaskTotalItems']
cb1cc6
+    done = False
cb1cc6
+    count = 0
cb1cc6
+
cb1cc6
+    while not done and count < timeout:
cb1cc6
+        try:
cb1cc6
+            entry = topology_m4.ms["supplier1"].getEntry(task_dn, attrlist=attrlist)
cb1cc6
+            if entry is not None:
cb1cc6
+                if entry.hasAttr('nsTaskExitCode'):
cb1cc6
+                    done = True
cb1cc6
+                    break
cb1cc6
+            else:
cb1cc6
+                done = True
cb1cc6
+                break
cb1cc6
+        except ldap.NO_SUCH_OBJECT:
cb1cc6
+            done = True
cb1cc6
+            break
cb1cc6
+        except ldap.LDAPError:
cb1cc6
+            break
cb1cc6
+        time.sleep(1)
cb1cc6
+        count += 1
cb1cc6
+
cb1cc6
+    return done
cb1cc6
+
cb1cc6
+
cb1cc6
+def check_ruvs(msg, topology_m4, m4rid):
cb1cc6
+    """Check suppliers 1-3 for supplier 4's rid."""
cb1cc6
+    for inst in (topology_m4.ms["supplier1"], topology_m4.ms["supplier2"], topology_m4.ms["supplier3"]):
cb1cc6
+        clean = False
cb1cc6
+        replicas = Replicas(inst)
cb1cc6
+        replica = replicas.get(DEFAULT_SUFFIX)
cb1cc6
+        log.info('check_ruvs for replica %s:%s (suffix:rid)' % (replica.get_suffix(), replica.get_rid()))
cb1cc6
+
cb1cc6
+        count = 0
cb1cc6
+        while not clean and count < 20:
cb1cc6
+            ruv = replica.get_ruv()
cb1cc6
+            if m4rid in ruv._rids:
cb1cc6
+                time.sleep(5)
cb1cc6
+                count = count + 1
cb1cc6
+            else:
cb1cc6
+                clean = True
cb1cc6
+        if not clean:
cb1cc6
+            raise Exception("Supplier %s was not cleaned in time." % inst.serverid)
cb1cc6
+    return True
cb1cc6
+
cb1cc6
+
cb1cc6
+@pytest.mark.flaky(max_runs=2, min_passes=1)
cb1cc6
+def test_clean_restart(topology_m4):
cb1cc6
+    """Check that cleanallruv task works properly after a restart
cb1cc6
+
cb1cc6
+    :id: c6233bb3-092c-4919-9ac9-80dd02cc6e02
cb1cc6
+    :setup: Replication setup with four suppliers
cb1cc6
+    :steps:
cb1cc6
+        1. Disable replication on supplier 4
cb1cc6
+        2. Remove agreements to supplier 4 from other suppliers
cb1cc6
+        3. Stop supplier 3
cb1cc6
+        4. Run a cleanallruv task on supplier 1
cb1cc6
+        5. Stop supplier 1
cb1cc6
+        6. Start supplier 3
cb1cc6
+        7. Make sure that no crash happened
cb1cc6
+        8. Start supplier 1
cb1cc6
+        9. Make sure that no crash happened
cb1cc6
+        10. Check that everything was cleaned
cb1cc6
+    :expectedresults:
cb1cc6
+        1. Operation should be successful
cb1cc6
+        2. Agreements to supplier 4 should be removed
cb1cc6
+        3. Supplier 3 should be stopped
cb1cc6
+        4. Cleanallruv task should be successfully executed
cb1cc6
+        5. Supplier 1 should be stopped
cb1cc6
+        6. Supplier 3 should be started
cb1cc6
+        7. No crash should have happened
cb1cc6
+        8. Supplier 1 should be started
cb1cc6
+        9. No crash should have happened
cb1cc6
+        10. Everything should be cleaned
cb1cc6
+    """
cb1cc6
+    log.info('Running test_clean_restart...')
cb1cc6
+
cb1cc6
+    # Disable supplier 4
cb1cc6
+    log.info('test_clean: disable supplier 4...')
cb1cc6
+
cb1cc6
+    # Remove the agreements from the other suppliers that point to supplier 4
cb1cc6
+    repl = ReplicationManager(DEFAULT_SUFFIX)
cb1cc6
+    m4rid = repl.get_rid(topology_m4.ms["supplier4"])
cb1cc6
+    remove_supplier4_agmts("test_clean", topology_m4)
cb1cc6
+
cb1cc6
+    # Stop supplier 3 to keep the task running, so we can stop supplier 1...
cb1cc6
+    topology_m4.ms["supplier3"].stop()
cb1cc6
+
cb1cc6
+    # Run the task
cb1cc6
+    log.info('test_clean: run the cleanAllRUV task...')
cb1cc6
+    cruv_task = CleanAllRUVTask(topology_m4.ms["supplier1"])
cb1cc6
+    cruv_task.create(properties={
cb1cc6
+        'replica-id': m4rid,
cb1cc6
+        'replica-base-dn': DEFAULT_SUFFIX,
cb1cc6
+        'replica-force-cleaning': 'no',
cb1cc6
+        'replica-certify-all': 'yes'
cb1cc6
+        })
cb1cc6
+
cb1cc6
+    # Sleep a bit, then stop supplier 1
cb1cc6
+    time.sleep(5)
cb1cc6
+    topology_m4.ms["supplier1"].stop()
cb1cc6
+
cb1cc6
+    # Now start supplier 3 & 1, and make sure we didn't crash
cb1cc6
+    topology_m4.ms["supplier3"].start()
cb1cc6
+    if topology_m4.ms["supplier3"].detectDisorderlyShutdown():
cb1cc6
+        log.fatal('test_clean_restart: Supplier 3 previously crashed!')
cb1cc6
+        assert False
cb1cc6
+
cb1cc6
+    topology_m4.ms["supplier1"].start(timeout=30)
cb1cc6
+    if topology_m4.ms["supplier1"].detectDisorderlyShutdown():
cb1cc6
+        log.fatal('test_clean_restart: Supplier 1 previously crashed!')
cb1cc6
+        assert False
cb1cc6
+
cb1cc6
+    # Check the other supplier's RUV for 'replica 4'
cb1cc6
+    log.info('test_clean_restart: check all the suppliers have been cleaned...')
cb1cc6
+    clean = check_ruvs("test_clean_restart", topology_m4, m4rid)
cb1cc6
+    assert clean
cb1cc6
+
cb1cc6
+    log.info('test_clean_restart PASSED, restoring supplier 4...')
cb1cc6
+
cb1cc6
+
cb1cc6
+if __name__ == '__main__':
cb1cc6
+    # Run isolated
cb1cc6
+    # -s for DEBUG mode
cb1cc6
+    CURRENT_FILE = os.path.realpath(__file__)
cb1cc6
+    pytest.main(["-s", CURRENT_FILE])
cb1cc6
+
cb1cc6
diff --git a/dirsrvtests/tests/suites/replication/cleanallruv_shutdown_crash_test.py b/dirsrvtests/tests/suites/replication/cleanallruv_shutdown_crash_test.py
cb1cc6
new file mode 100644
cb1cc6
index 000000000..b4b74e339
cb1cc6
--- /dev/null
cb1cc6
+++ b/dirsrvtests/tests/suites/replication/cleanallruv_shutdown_crash_test.py
cb1cc6
@@ -0,0 +1,123 @@
cb1cc6
+# --- BEGIN COPYRIGHT BLOCK ---
cb1cc6
+# Copyright (C) 2022 Red Hat, Inc.
cb1cc6
+# All rights reserved.
cb1cc6
+#
cb1cc6
+# License: GPL (version 3 or any later version).
cb1cc6
+# See LICENSE for details.
cb1cc6
+# --- END COPYRIGHT BLOCK ---
cb1cc6
+#
cb1cc6
+import logging
cb1cc6
+import pytest
cb1cc6
+import os
cb1cc6
+import time
cb1cc6
+from lib389._constants import DEFAULT_SUFFIX
cb1cc6
+from lib389.topologies import topology_m2
cb1cc6
+from lib389.tasks import CleanAllRUVTask
cb1cc6
+from lib389.replica import ReplicationManager, Replicas
cb1cc6
+from lib389.config import CertmapLegacy
cb1cc6
+from lib389.idm.services import ServiceAccounts
cb1cc6
+
cb1cc6
+log = logging.getLogger(__name__)
cb1cc6
+
cb1cc6
+
cb1cc6
+def test_clean_shutdown_crash(topology_m2):
cb1cc6
+    """Check that server didn't crash after shutdown when running CleanAllRUV task
cb1cc6
+
cb1cc6
+    :id: c34d0b40-3c3e-4f53-8656-5e4c2a310aaf
cb1cc6
+    :setup: Replication setup with two suppliers
cb1cc6
+    :steps:
cb1cc6
+        1. Enable TLS on both suppliers
cb1cc6
+        2. Reconfigure both agreements to use TLS Client auth
cb1cc6
+        3. Stop supplier2
cb1cc6
+        4. Run the CleanAllRUV task
cb1cc6
+        5. Restart supplier1
cb1cc6
+        6. Check if supplier1 didn't crash
cb1cc6
+        7. Restart supplier1 again
cb1cc6
+        8. Check if supplier1 didn't crash
cb1cc6
+
cb1cc6
+    :expectedresults:
cb1cc6
+        1. Success
cb1cc6
+        2. Success
cb1cc6
+        3. Success
cb1cc6
+        4. Success
cb1cc6
+        5. Success
cb1cc6
+        6. Success
cb1cc6
+        7. Success
cb1cc6
+        8. Success
cb1cc6
+    """
cb1cc6
+
cb1cc6
+    m1 = topology_m2.ms["supplier1"]
cb1cc6
+    m2 = topology_m2.ms["supplier2"]
cb1cc6
+
cb1cc6
+    repl = ReplicationManager(DEFAULT_SUFFIX)
cb1cc6
+
cb1cc6
+    cm_m1 = CertmapLegacy(m1)
cb1cc6
+    cm_m2 = CertmapLegacy(m2)
cb1cc6
+
cb1cc6
+    certmaps = cm_m1.list()
cb1cc6
+    certmaps['default']['DNComps'] = None
cb1cc6
+    certmaps['default']['CmapLdapAttr'] = 'nsCertSubjectDN'
cb1cc6
+
cb1cc6
+    cm_m1.set(certmaps)
cb1cc6
+    cm_m2.set(certmaps)
cb1cc6
+
cb1cc6
+    log.info('Enabling TLS')
cb1cc6
+    [i.enable_tls() for i in topology_m2]
cb1cc6
+
cb1cc6
+    log.info('Creating replication dns')
cb1cc6
+    services = ServiceAccounts(m1, DEFAULT_SUFFIX)
cb1cc6
+    repl_m1 = services.get('%s:%s' % (m1.host, m1.sslport))
cb1cc6
+    repl_m1.set('nsCertSubjectDN', m1.get_server_tls_subject())
cb1cc6
+
cb1cc6
+    repl_m2 = services.get('%s:%s' % (m2.host, m2.sslport))
cb1cc6
+    repl_m2.set('nsCertSubjectDN', m2.get_server_tls_subject())
cb1cc6
+
cb1cc6
+    log.info('Changing auth type')
cb1cc6
+    replica_m1 = Replicas(m1).get(DEFAULT_SUFFIX)
cb1cc6
+    agmt_m1 = replica_m1.get_agreements().list()[0]
cb1cc6
+    agmt_m1.replace_many(
cb1cc6
+        ('nsDS5ReplicaBindMethod', 'SSLCLIENTAUTH'),
cb1cc6
+        ('nsDS5ReplicaTransportInfo', 'SSL'),
cb1cc6
+        ('nsDS5ReplicaPort', '%s' % m2.sslport),
cb1cc6
+    )
cb1cc6
+
cb1cc6
+    agmt_m1.remove_all('nsDS5ReplicaBindDN')
cb1cc6
+
cb1cc6
+    replica_m2 = Replicas(m2).get(DEFAULT_SUFFIX)
cb1cc6
+    agmt_m2 = replica_m2.get_agreements().list()[0]
cb1cc6
+
cb1cc6
+    agmt_m2.replace_many(
cb1cc6
+        ('nsDS5ReplicaBindMethod', 'SSLCLIENTAUTH'),
cb1cc6
+        ('nsDS5ReplicaTransportInfo', 'SSL'),
cb1cc6
+        ('nsDS5ReplicaPort', '%s' % m1.sslport),
cb1cc6
+    )
cb1cc6
+    agmt_m2.remove_all('nsDS5ReplicaBindDN')
cb1cc6
+
cb1cc6
+    log.info('Stopping supplier2')
cb1cc6
+    m2.stop()
cb1cc6
+
cb1cc6
+    log.info('Run the cleanAllRUV task')
cb1cc6
+    cruv_task = CleanAllRUVTask(m1)
cb1cc6
+    cruv_task.create(properties={
cb1cc6
+        'replica-id': repl.get_rid(m1),
cb1cc6
+        'replica-base-dn': DEFAULT_SUFFIX,
cb1cc6
+        'replica-force-cleaning': 'no',
cb1cc6
+        'replica-certify-all': 'yes'
cb1cc6
+    })
cb1cc6
+
cb1cc6
+    m1.restart()
cb1cc6
+
cb1cc6
+    log.info('Check if supplier1 crashed')
cb1cc6
+    assert not m1.detectDisorderlyShutdown()
cb1cc6
+
cb1cc6
+    log.info('Repeat')
cb1cc6
+    m1.restart()
cb1cc6
+    assert not m1.detectDisorderlyShutdown()
cb1cc6
+
cb1cc6
+
cb1cc6
+if __name__ == '__main__':
cb1cc6
+    # Run isolated
cb1cc6
+    # -s for DEBUG mode
cb1cc6
+    CURRENT_FILE = os.path.realpath(__file__)
cb1cc6
+    pytest.main(["-s", CURRENT_FILE])
cb1cc6
+
cb1cc6
diff --git a/dirsrvtests/tests/suites/replication/cleanallruv_stress_test.py b/dirsrvtests/tests/suites/replication/cleanallruv_stress_test.py
cb1cc6
new file mode 100644
cb1cc6
index 000000000..0d43dd7d4
cb1cc6
--- /dev/null
cb1cc6
+++ b/dirsrvtests/tests/suites/replication/cleanallruv_stress_test.py
cb1cc6
@@ -0,0 +1,216 @@
cb1cc6
+# --- BEGIN COPYRIGHT BLOCK ---
cb1cc6
+# Copyright (C) 2022 Red Hat, Inc.
cb1cc6
+# All rights reserved.
cb1cc6
+#
cb1cc6
+# License: GPL (version 3 or any later version).
cb1cc6
+# See LICENSE for details.
cb1cc6
+# --- END COPYRIGHT BLOCK ---
cb1cc6
+#
cb1cc6
+import ldap
cb1cc6
+import logging
cb1cc6
+import pytest
cb1cc6
+import os
cb1cc6
+import random
cb1cc6
+import time
cb1cc6
+import threading
cb1cc6
+from lib389._constants import DEFAULT_SUFFIX
cb1cc6
+from lib389.topologies import topology_m4
cb1cc6
+from lib389.tasks import CleanAllRUVTask
cb1cc6
+from lib389.idm.directorymanager import DirectoryManager
cb1cc6
+from lib389.idm.user import UserAccounts
cb1cc6
+from lib389.replica import ReplicationManager, Replicas
cb1cc6
+from lib389.config import LDBMConfig
cb1cc6
+
cb1cc6
+log = logging.getLogger(__name__)
cb1cc6
+
cb1cc6
+
cb1cc6
+class AddUsers(threading.Thread):
cb1cc6
+    def __init__(self, inst, num_users):
cb1cc6
+        threading.Thread.__init__(self)
cb1cc6
+        self.daemon = True
cb1cc6
+        self.inst = inst
cb1cc6
+        self.num_users = num_users
cb1cc6
+
cb1cc6
+    def run(self):
cb1cc6
+        """Start adding users"""
cb1cc6
+
cb1cc6
+        dm = DirectoryManager(self.inst)
cb1cc6
+        conn = dm.bind()
cb1cc6
+
cb1cc6
+        users = UserAccounts(conn, DEFAULT_SUFFIX)
cb1cc6
+
cb1cc6
+        u_range = list(range(self.num_users))
cb1cc6
+        random.shuffle(u_range)
cb1cc6
+
cb1cc6
+        for idx in u_range:
cb1cc6
+            try:
cb1cc6
+                users.create(properties={
cb1cc6
+                    'uid': 'testuser%s' % idx,
cb1cc6
+                    'cn' : 'testuser%s' % idx,
cb1cc6
+                    'sn' : 'user%s' % idx,
cb1cc6
+                    'uidNumber' : '%s' % (1000 + idx),
cb1cc6
+                    'gidNumber' : '%s' % (1000 + idx),
cb1cc6
+                    'homeDirectory' : '/home/testuser%s' % idx
cb1cc6
+                })
cb1cc6
+            # One of the suppliers was probably put into read only mode - just break out
cb1cc6
+            except ldap.UNWILLING_TO_PERFORM:
cb1cc6
+                break
cb1cc6
+            except ldap.ALREADY_EXISTS:
cb1cc6
+                pass
cb1cc6
+        conn.close()
cb1cc6
+
cb1cc6
+def remove_supplier4_agmts(msg, topology_m4):
cb1cc6
+    """Remove all the repl agmts to supplier4. """
cb1cc6
+
cb1cc6
+    log.info('%s: remove all the agreements to supplier 4...' % msg)
cb1cc6
+    repl = ReplicationManager(DEFAULT_SUFFIX)
cb1cc6
+    # This will delete m4 from the topo *and* remove all incoming agreements
cb1cc6
+    # to m4.
cb1cc6
+    repl.remove_supplier(topology_m4.ms["supplier4"],
cb1cc6
+        [topology_m4.ms["supplier1"], topology_m4.ms["supplier2"], topology_m4.ms["supplier3"]])
cb1cc6
+
cb1cc6
+def task_done(topology_m4, task_dn, timeout=60):
cb1cc6
+    """Check if the task is complete"""
cb1cc6
+
cb1cc6
+    attrlist = ['nsTaskLog', 'nsTaskStatus', 'nsTaskExitCode',
cb1cc6
+                'nsTaskCurrentItem', 'nsTaskTotalItems']
cb1cc6
+    done = False
cb1cc6
+    count = 0
cb1cc6
+
cb1cc6
+    while not done and count < timeout:
cb1cc6
+        try:
cb1cc6
+            entry = topology_m4.ms["supplier1"].getEntry(task_dn, attrlist=attrlist)
cb1cc6
+            if entry is not None:
cb1cc6
+                if entry.hasAttr('nsTaskExitCode'):
cb1cc6
+                    done = True
cb1cc6
+                    break
cb1cc6
+            else:
cb1cc6
+                done = True
cb1cc6
+                break
cb1cc6
+        except ldap.NO_SUCH_OBJECT:
cb1cc6
+            done = True
cb1cc6
+            break
cb1cc6
+        except ldap.LDAPError:
cb1cc6
+            break
cb1cc6
+        time.sleep(1)
cb1cc6
+        count += 1
cb1cc6
+
cb1cc6
+    return done
cb1cc6
+
cb1cc6
+def check_ruvs(msg, topology_m4, m4rid):
cb1cc6
+    """Check suppliers 1-3 for supplier 4's rid."""
cb1cc6
+    for inst in (topology_m4.ms["supplier1"], topology_m4.ms["supplier2"], topology_m4.ms["supplier3"]):
cb1cc6
+        clean = False
cb1cc6
+        replicas = Replicas(inst)
cb1cc6
+        replica = replicas.get(DEFAULT_SUFFIX)
cb1cc6
+        log.info('check_ruvs for replica %s:%s (suffix:rid)' % (replica.get_suffix(), replica.get_rid()))
cb1cc6
+
cb1cc6
+        count = 0
cb1cc6
+        while not clean and count < 20:
cb1cc6
+            ruv = replica.get_ruv()
cb1cc6
+            if m4rid in ruv._rids:
cb1cc6
+                time.sleep(5)
cb1cc6
+                count = count + 1
cb1cc6
+            else:
cb1cc6
+                clean = True
cb1cc6
+        if not clean:
cb1cc6
+            raise Exception("Supplier %s was not cleaned in time." % inst.serverid)
cb1cc6
+    return True
cb1cc6
+
cb1cc6
+
cb1cc6
+@pytest.mark.flaky(max_runs=2, min_passes=1)
cb1cc6
+def test_stress_clean(topology_m4):
cb1cc6
+    """Put each server(m1 - m4) under a stress, and perform the entire clean process
cb1cc6
+
cb1cc6
+    :id: a8263cd6-f068-4357-86e0-e7c34504c8c5
cb1cc6
+    :setup: Replication setup with four suppliers
cb1cc6
+    :steps:
cb1cc6
+        1. Add a bunch of updates to all suppliers
cb1cc6
+        2. Put supplier 4 to read-only mode
cb1cc6
+        3. Disable replication on supplier 4
cb1cc6
+        4. Remove agreements to supplier 4 from other suppliers
cb1cc6
+        5. Run a cleanallruv task on supplier 1
cb1cc6
+        6. Check that everything was cleaned
cb1cc6
+    :expectedresults:
cb1cc6
+        1. Operation should be successful
cb1cc6
+        2. Supplier 4 should be put to read-only mode
cb1cc6
+        3. Replication on supplier 4 should be disabled
cb1cc6
+        4. Agreements to supplier 4 should be removed
cb1cc6
+        5. Operation should be successful
cb1cc6
+        6. Everything should be cleaned
cb1cc6
+    """
cb1cc6
+
cb1cc6
+    log.info('Running test_stress_clean...')
cb1cc6
+    log.info('test_stress_clean: put all the suppliers under load...')
cb1cc6
+
cb1cc6
+    ldbm_config = LDBMConfig(topology_m4.ms["supplier4"])
cb1cc6
+
cb1cc6
+    # Put all the suppliers under load
cb1cc6
+    # not too high a load, else it takes a long time to converge and
cb1cc6
+    # the test result becomes unstable
cb1cc6
+    m1_add_users = AddUsers(topology_m4.ms["supplier1"], 200)
cb1cc6
+    m1_add_users.start()
cb1cc6
+    m2_add_users = AddUsers(topology_m4.ms["supplier2"], 200)
cb1cc6
+    m2_add_users.start()
cb1cc6
+    m3_add_users = AddUsers(topology_m4.ms["supplier3"], 200)
cb1cc6
+    m3_add_users.start()
cb1cc6
+    m4_add_users = AddUsers(topology_m4.ms["supplier4"], 200)
cb1cc6
+    m4_add_users.start()
cb1cc6
+
cb1cc6
+    # Allow some time for replication to get flowing in all directions
cb1cc6
+    log.info('test_stress_clean: allow some time for replication to get flowing...')
cb1cc6
+    time.sleep(5)
cb1cc6
+
cb1cc6
+    # Put supplier 4 into read only mode
cb1cc6
+    ldbm_config.set('nsslapd-readonly', 'on')
cb1cc6
+    # We need to wait for supplier 4 to push its changes out
cb1cc6
+    log.info('test_stress_clean: allow some time for supplier 4 to push changes out (60 seconds)...')
cb1cc6
+    time.sleep(60)
cb1cc6
+
cb1cc6
+    # Remove the agreements from the other suppliers that point to supplier 4
cb1cc6
+    repl = ReplicationManager(DEFAULT_SUFFIX)
cb1cc6
+    m4rid = repl.get_rid(topology_m4.ms["supplier4"])
cb1cc6
+    remove_supplier4_agmts("test_stress_clean", topology_m4)
cb1cc6
+
cb1cc6
+    # Run the task
cb1cc6
+    cruv_task = CleanAllRUVTask(topology_m4.ms["supplier1"])
cb1cc6
+    cruv_task.create(properties={
cb1cc6
+        'replica-id': m4rid,
cb1cc6
+        'replica-base-dn': DEFAULT_SUFFIX,
cb1cc6
+        'replica-force-cleaning': 'no'
cb1cc6
+        })
cb1cc6
+    cruv_task.wait()
cb1cc6
+
cb1cc6
+    # Wait for the update to finish
cb1cc6
+    log.info('test_stress_clean: wait for all the updates to finish...')
cb1cc6
+    m1_add_users.join()
cb1cc6
+    m2_add_users.join()
cb1cc6
+    m3_add_users.join()
cb1cc6
+    m4_add_users.join()
cb1cc6
+
cb1cc6
+    # Check the other supplier's RUV for 'replica 4'
cb1cc6
+    log.info('test_stress_clean: check if all the replicas have been cleaned...')
cb1cc6
+    clean = check_ruvs("test_stress_clean", topology_m4, m4rid)
cb1cc6
+    assert clean
cb1cc6
+
cb1cc6
+    log.info('test_stress_clean:  PASSED, restoring supplier 4...')
cb1cc6
+
cb1cc6
+    # Sleep for a bit to allow replication to complete
cb1cc6
+    log.info("Sleep for 120 seconds to allow replication to complete...")
cb1cc6
+    repl = ReplicationManager(DEFAULT_SUFFIX)
cb1cc6
+    repl.test_replication_topology([
cb1cc6
+        topology_m4.ms["supplier1"],
cb1cc6
+        topology_m4.ms["supplier2"],
cb1cc6
+        topology_m4.ms["supplier3"],
cb1cc6
+        ], timeout=120)
cb1cc6
+
cb1cc6
+    # Turn off readonly mode
cb1cc6
+    ldbm_config.set('nsslapd-readonly', 'off')
cb1cc6
+
cb1cc6
+
cb1cc6
+if __name__ == '__main__':
cb1cc6
+    # Run isolated
cb1cc6
+    # -s for DEBUG mode
cb1cc6
+    CURRENT_FILE = os.path.realpath(__file__)
cb1cc6
+    pytest.main(["-s", CURRENT_FILE])
cb1cc6
+
cb1cc6
diff --git a/dirsrvtests/tests/suites/replication/cleanallruv_test.py b/dirsrvtests/tests/suites/replication/cleanallruv_test.py
cb1cc6
index 1e9cd7c28..6d7141ada 100644
cb1cc6
--- a/dirsrvtests/tests/suites/replication/cleanallruv_test.py
cb1cc6
+++ b/dirsrvtests/tests/suites/replication/cleanallruv_test.py
cb1cc6
@@ -1,27 +1,20 @@
cb1cc6
 # --- BEGIN COPYRIGHT BLOCK ---
cb1cc6
-# Copyright (C) 2019 Red Hat, Inc.
cb1cc6
+# Copyright (C) 2022 Red Hat, Inc.
cb1cc6
 # All rights reserved.
cb1cc6
 #
cb1cc6
 # License: GPL (version 3 or any later version).
cb1cc6
 # See LICENSE for details.
cb1cc6
 # --- END COPYRIGHT BLOCK ---
cb1cc6
 #
cb1cc6
-import threading
cb1cc6
 import pytest
cb1cc6
-import random
cb1cc6
 from lib389 import DirSrv
cb1cc6
 from lib389.tasks import *
cb1cc6
 from lib389.utils import *
cb1cc6
 from lib389.topologies import topology_m4, topology_m2
cb1cc6
-from lib389._constants import *
cb1cc6
-
cb1cc6
-from lib389.idm.directorymanager import DirectoryManager
cb1cc6
+from lib389._constants import DEFAULT_SUFFIX
cb1cc6
 from lib389.replica import ReplicationManager, Replicas
cb1cc6
 from lib389.tasks import CleanAllRUVTask
cb1cc6
-from lib389.idm.user import UserAccounts
cb1cc6
-from lib389.config import LDBMConfig
cb1cc6
-from lib389.config import CertmapLegacy
cb1cc6
-from lib389.idm.services import ServiceAccounts
cb1cc6
+
cb1cc6
 
cb1cc6
 pytestmark = pytest.mark.tier1
cb1cc6
 
cb1cc6
@@ -29,42 +22,6 @@ logging.getLogger(__name__).setLevel(logging.DEBUG)
cb1cc6
 log = logging.getLogger(__name__)
cb1cc6
 
cb1cc6
 
cb1cc6
-class AddUsers(threading.Thread):
cb1cc6
-    def __init__(self, inst, num_users):
cb1cc6
-        threading.Thread.__init__(self)
cb1cc6
-        self.daemon = True
cb1cc6
-        self.inst = inst
cb1cc6
-        self.num_users = num_users
cb1cc6
-
cb1cc6
-    def run(self):
cb1cc6
-        """Start adding users"""
cb1cc6
-
cb1cc6
-        dm = DirectoryManager(self.inst)
cb1cc6
-        conn = dm.bind()
cb1cc6
-
cb1cc6
-        users = UserAccounts(conn, DEFAULT_SUFFIX)
cb1cc6
-
cb1cc6
-        u_range = list(range(self.num_users))
cb1cc6
-        random.shuffle(u_range)
cb1cc6
-
cb1cc6
-        for idx in u_range:
cb1cc6
-            try:
cb1cc6
-                users.create(properties={
cb1cc6
-                    'uid': 'testuser%s' % idx,
cb1cc6
-                    'cn' : 'testuser%s' % idx,
cb1cc6
-                    'sn' : 'user%s' % idx,
cb1cc6
-                    'uidNumber' : '%s' % (1000 + idx),
cb1cc6
-                    'gidNumber' : '%s' % (1000 + idx),
cb1cc6
-                    'homeDirectory' : '/home/testuser%s' % idx
cb1cc6
-                })
cb1cc6
-            # One of the suppliers was probably put into read only mode - just break out
cb1cc6
-            except ldap.UNWILLING_TO_PERFORM:
cb1cc6
-                break
cb1cc6
-            except ldap.ALREADY_EXISTS:
cb1cc6
-                pass
cb1cc6
-        conn.close()
cb1cc6
-
cb1cc6
-
cb1cc6
 def remove_supplier4_agmts(msg, topology_m4):
cb1cc6
     """Remove all the repl agmts to supplier4. """
cb1cc6
 
cb1cc6
@@ -96,92 +53,7 @@ def check_ruvs(msg, topology_m4, m4rid):
cb1cc6
     return True
cb1cc6
 
cb1cc6
 
cb1cc6
-def task_done(topology_m4, task_dn, timeout=60):
cb1cc6
-    """Check if the task is complete"""
cb1cc6
-
cb1cc6
-    attrlist = ['nsTaskLog', 'nsTaskStatus', 'nsTaskExitCode',
cb1cc6
-                'nsTaskCurrentItem', 'nsTaskTotalItems']
cb1cc6
-    done = False
cb1cc6
-    count = 0
cb1cc6
-
cb1cc6
-    while not done and count < timeout:
cb1cc6
-        try:
cb1cc6
-            entry = topology_m4.ms["supplier1"].getEntry(task_dn, attrlist=attrlist)
cb1cc6
-            if entry is not None:
cb1cc6
-                if entry.hasAttr('nsTaskExitCode'):
cb1cc6
-                    done = True
cb1cc6
-                    break
cb1cc6
-            else:
cb1cc6
-                done = True
cb1cc6
-                break
cb1cc6
-        except ldap.NO_SUCH_OBJECT:
cb1cc6
-            done = True
cb1cc6
-            break
cb1cc6
-        except ldap.LDAPError:
cb1cc6
-            break
cb1cc6
-        time.sleep(1)
cb1cc6
-        count += 1
cb1cc6
-
cb1cc6
-    return done
cb1cc6
-
cb1cc6
-
cb1cc6
-def restore_supplier4(topology_m4):
cb1cc6
-    """In our tests will always be removing supplier 4, so we need a common
cb1cc6
-    way to restore it for another test
cb1cc6
-    """
cb1cc6
-
cb1cc6
-    # Restart the remaining suppliers to allow rid 4 to be reused.
cb1cc6
-    for inst in topology_m4.ms.values():
cb1cc6
-        inst.restart()
cb1cc6
-
cb1cc6
-    repl = ReplicationManager(DEFAULT_SUFFIX)
cb1cc6
-    repl.join_supplier(topology_m4.ms["supplier1"], topology_m4.ms["supplier4"])
cb1cc6
-
cb1cc6
-    # Add the 2,3 -> 4 agmt.
cb1cc6
-    repl.ensure_agreement(topology_m4.ms["supplier2"], topology_m4.ms["supplier4"])
cb1cc6
-    repl.ensure_agreement(topology_m4.ms["supplier3"], topology_m4.ms["supplier4"])
cb1cc6
-    # And in reverse ...
cb1cc6
-    repl.ensure_agreement(topology_m4.ms["supplier4"], topology_m4.ms["supplier2"])
cb1cc6
-    repl.ensure_agreement(topology_m4.ms["supplier4"], topology_m4.ms["supplier3"])
cb1cc6
-
cb1cc6
-    log.info('Supplier 4 has been successfully restored.')
cb1cc6
-
cb1cc6
-
cb1cc6
-@pytest.fixture()
cb1cc6
-def m4rid(request, topology_m4):
cb1cc6
-    log.debug("Wait a bit before the reset - it is required for the slow machines")
cb1cc6
-    time.sleep(5)
cb1cc6
-    log.debug("-------------- BEGIN RESET of m4 -----------------")
cb1cc6
-    repl = ReplicationManager(DEFAULT_SUFFIX)
cb1cc6
-    repl.test_replication_topology(topology_m4.ms.values())
cb1cc6
-    # What is supplier4's rid?
cb1cc6
-    m4rid = repl.get_rid(topology_m4.ms["supplier4"])
cb1cc6
-
cb1cc6
-    def fin():
cb1cc6
-        try:
cb1cc6
-            # Restart the suppliers and rerun cleanallruv
cb1cc6
-            for inst in topology_m4.ms.values():
cb1cc6
-                inst.restart()
cb1cc6
-
cb1cc6
-            cruv_task = CleanAllRUVTask(topology_m4.ms["supplier1"])
cb1cc6
-            cruv_task.create(properties={
cb1cc6
-                'replica-id': m4rid,
cb1cc6
-                'replica-base-dn': DEFAULT_SUFFIX,
cb1cc6
-                'replica-force-cleaning': 'no',
cb1cc6
-                })
cb1cc6
-            cruv_task.wait()
cb1cc6
-        except ldap.UNWILLING_TO_PERFORM:
cb1cc6
-            # In some casse we already cleaned rid4, so if we fail, it's okay
cb1cc6
-            pass
cb1cc6
-        restore_supplier4(topology_m4)
cb1cc6
-        # Make sure everything works.
cb1cc6
-        repl.test_replication_topology(topology_m4.ms.values())
cb1cc6
-    request.addfinalizer(fin)
cb1cc6
-    log.debug("-------------- FINISH RESET of m4 -----------------")
cb1cc6
-    return m4rid
cb1cc6
-
cb1cc6
-
cb1cc6
-def test_clean(topology_m4, m4rid):
cb1cc6
+def test_clean(topology_m4):
cb1cc6
     """Check that cleanallruv task works properly
cb1cc6
 
cb1cc6
     :id: e9b3ce5c-e17c-409e-aafc-e97d630f2878
cb1cc6
@@ -204,6 +76,8 @@ def test_clean(topology_m4, m4rid):
cb1cc6
     # Disable supplier 4
cb1cc6
     # Remove the agreements from the other suppliers that point to supplier 4
cb1cc6
     log.info('test_clean: disable supplier 4...')
cb1cc6
+    repl = ReplicationManager(DEFAULT_SUFFIX)
cb1cc6
+    m4rid = repl.get_rid(topology_m4.ms["supplier4"])
cb1cc6
     remove_supplier4_agmts("test_clean", topology_m4)
cb1cc6
 
cb1cc6
     # Run the task
cb1cc6
@@ -221,610 +95,6 @@ def test_clean(topology_m4, m4rid):
cb1cc6
     clean = check_ruvs("test_clean", topology_m4, m4rid)
cb1cc6
     assert clean
cb1cc6
 
cb1cc6
-    log.info('test_clean PASSED, restoring supplier 4...')
cb1cc6
-
cb1cc6
-@pytest.mark.flaky(max_runs=2, min_passes=1)
cb1cc6
-def test_clean_restart(topology_m4, m4rid):
cb1cc6
-    """Check that cleanallruv task works properly after a restart
cb1cc6
-
cb1cc6
-    :id: c6233bb3-092c-4919-9ac9-80dd02cc6e02
cb1cc6
-    :setup: Replication setup with four suppliers
cb1cc6
-    :steps:
cb1cc6
-        1. Disable replication on supplier 4
cb1cc6
-        2. Remove agreements to supplier 4 from other suppliers
cb1cc6
-        3. Stop supplier 3
cb1cc6
-        4. Run a cleanallruv task on supplier 1
cb1cc6
-        5. Stop supplier 1
cb1cc6
-        6. Start supplier 3
cb1cc6
-        7. Make sure that no crash happened
cb1cc6
-        8. Start supplier 1
cb1cc6
-        9. Make sure that no crash happened
cb1cc6
-        10. Check that everything was cleaned
cb1cc6
-    :expectedresults:
cb1cc6
-        1. Operation should be successful
cb1cc6
-        2. Agreements to supplier 4 should be removed
cb1cc6
-        3. Supplier 3 should be stopped
cb1cc6
-        4. Cleanallruv task should be successfully executed
cb1cc6
-        5. Supplier 1 should be stopped
cb1cc6
-        6. Supplier 3 should be started
cb1cc6
-        7. No crash should happened
cb1cc6
-        8. Supplier 1 should be started
cb1cc6
-        9. No crash should happened
cb1cc6
-        10. Everything should be cleaned
cb1cc6
-    """
cb1cc6
-    log.info('Running test_clean_restart...')
cb1cc6
-
cb1cc6
-    # Disable supplier 4
cb1cc6
-    log.info('test_clean: disable supplier 4...')
cb1cc6
-    # Remove the agreements from the other suppliers that point to supplier 4
cb1cc6
-    remove_supplier4_agmts("test_clean", topology_m4)
cb1cc6
-
cb1cc6
-    # Stop supplier 3 to keep the task running, so we can stop supplier 1...
cb1cc6
-    topology_m4.ms["supplier3"].stop()
cb1cc6
-
cb1cc6
-    # Run the task
cb1cc6
-    log.info('test_clean: run the cleanAllRUV task...')
cb1cc6
-    cruv_task = CleanAllRUVTask(topology_m4.ms["supplier1"])
cb1cc6
-    cruv_task.create(properties={
cb1cc6
-        'replica-id': m4rid,
cb1cc6
-        'replica-base-dn': DEFAULT_SUFFIX,
cb1cc6
-        'replica-force-cleaning': 'no',
cb1cc6
-        'replica-certify-all': 'yes'
cb1cc6
-        })
cb1cc6
-
cb1cc6
-    # Sleep a bit, then stop supplier 1
cb1cc6
-    time.sleep(5)
cb1cc6
-    topology_m4.ms["supplier1"].stop()
cb1cc6
-
cb1cc6
-    # Now start supplier 3 & 1, and make sure we didn't crash
cb1cc6
-    topology_m4.ms["supplier3"].start()
cb1cc6
-    if topology_m4.ms["supplier3"].detectDisorderlyShutdown():
cb1cc6
-        log.fatal('test_clean_restart: Supplier 3 previously crashed!')
cb1cc6
-        assert False
cb1cc6
-
cb1cc6
-    topology_m4.ms["supplier1"].start(timeout=30)
cb1cc6
-    if topology_m4.ms["supplier1"].detectDisorderlyShutdown():
cb1cc6
-        log.fatal('test_clean_restart: Supplier 1 previously crashed!')
cb1cc6
-        assert False
cb1cc6
-
cb1cc6
-    # Check the other supplier's RUV for 'replica 4'
cb1cc6
-    log.info('test_clean_restart: check all the suppliers have been cleaned...')
cb1cc6
-    clean = check_ruvs("test_clean_restart", topology_m4, m4rid)
cb1cc6
-    assert clean
cb1cc6
-
cb1cc6
-    log.info('test_clean_restart PASSED, restoring supplier 4...')
cb1cc6
-
cb1cc6
-
cb1cc6
-@pytest.mark.flaky(max_runs=2, min_passes=1)
cb1cc6
-def test_clean_force(topology_m4, m4rid):
cb1cc6
-    """Check that multiple tasks with a 'force' option work properly
cb1cc6
-
cb1cc6
-    :id: f8810dfe-d2d2-4dd9-ba03-5fc14896fabe
cb1cc6
-    :setup: Replication setup with four suppliers
cb1cc6
-    :steps:
cb1cc6
-        1. Stop supplier 3
cb1cc6
-        2. Add a bunch of updates to supplier 4
cb1cc6
-        3. Disable replication on supplier 4
cb1cc6
-        4. Start supplier 3
cb1cc6
-        5. Remove agreements to supplier 4 from other suppliers
cb1cc6
-        6. Run a cleanallruv task on supplier 1 with a 'force' option 'on'
cb1cc6
-        7. Check that everything was cleaned
cb1cc6
-    :expectedresults:
cb1cc6
-        1. Supplier 3 should be stopped
cb1cc6
-        2. Operation should be successful
cb1cc6
-        3. Replication on supplier 4 should be disabled
cb1cc6
-        4. Supplier 3 should be started
cb1cc6
-        5. Agreements to supplier 4 should be removed
cb1cc6
-        6. Operation should be successful
cb1cc6
-        7. Everything should be cleaned
cb1cc6
-    """
cb1cc6
-
cb1cc6
-    log.info('Running test_clean_force...')
cb1cc6
-
cb1cc6
-    # Stop supplier 3, while we update supplier 4, so that 3 is behind the other suppliers
cb1cc6
-    topology_m4.ms["supplier3"].stop()
cb1cc6
-
cb1cc6
-    # Add a bunch of updates to supplier 4
cb1cc6
-    m4_add_users = AddUsers(topology_m4.ms["supplier4"], 1500)
cb1cc6
-    m4_add_users.start()
cb1cc6
-    m4_add_users.join()
cb1cc6
-
cb1cc6
-    # Start supplier 3, it should be out of sync with the other replicas...
cb1cc6
-    topology_m4.ms["supplier3"].start()
cb1cc6
-
cb1cc6
-    # Remove the agreements from the other suppliers that point to supplier 4
cb1cc6
-    remove_supplier4_agmts("test_clean_force", topology_m4)
cb1cc6
-
cb1cc6
-    # Run the task, use "force" because supplier 3 is not in sync with the other replicas
cb1cc6
-    # in regards to the replica 4 RUV
cb1cc6
-    log.info('test_clean: run the cleanAllRUV task...')
cb1cc6
-    cruv_task = CleanAllRUVTask(topology_m4.ms["supplier1"])
cb1cc6
-    cruv_task.create(properties={
cb1cc6
-        'replica-id': m4rid,
cb1cc6
-        'replica-base-dn': DEFAULT_SUFFIX,
cb1cc6
-        'replica-force-cleaning': 'yes'
cb1cc6
-        })
cb1cc6
-    cruv_task.wait()
cb1cc6
-
cb1cc6
-    # Check the other supplier's RUV for 'replica 4'
cb1cc6
-    log.info('test_clean_force: check all the suppliers have been cleaned...')
cb1cc6
-    clean = check_ruvs("test_clean_force", topology_m4, m4rid)
cb1cc6
-    assert clean
cb1cc6
-
cb1cc6
-    log.info('test_clean_force PASSED, restoring supplier 4...')
cb1cc6
-
cb1cc6
-
cb1cc6
-@pytest.mark.flaky(max_runs=2, min_passes=1)
cb1cc6
-def test_abort(topology_m4, m4rid):
cb1cc6
-    """Test the abort task basic functionality
cb1cc6
-
cb1cc6
-    :id: b09a6887-8de0-4fac-8e41-73ccbaaf7a08
cb1cc6
-    :setup: Replication setup with four suppliers
cb1cc6
-    :steps:
cb1cc6
-        1. Disable replication on supplier 4
cb1cc6
-        2. Remove agreements to supplier 4 from other suppliers
cb1cc6
-        3. Stop supplier 2
cb1cc6
-        4. Run a cleanallruv task on supplier 1
cb1cc6
-        5. Run a cleanallruv abort task on supplier 1
cb1cc6
-    :expectedresults: No hanging tasks left
cb1cc6
-        1. Replication on supplier 4 should be disabled
cb1cc6
-        2. Agreements to supplier 4 should be removed
cb1cc6
-        3. Supplier 2 should be stopped
cb1cc6
-        4. Operation should be successful
cb1cc6
-        5. Operation should be successful
cb1cc6
-    """
cb1cc6
-
cb1cc6
-    log.info('Running test_abort...')
cb1cc6
-    # Remove the agreements from the other suppliers that point to supplier 4
cb1cc6
-    remove_supplier4_agmts("test_abort", topology_m4)
cb1cc6
-
cb1cc6
-    # Stop supplier 2
cb1cc6
-    log.info('test_abort: stop supplier 2 to freeze the cleanAllRUV task...')
cb1cc6
-    topology_m4.ms["supplier2"].stop()
cb1cc6
-
cb1cc6
-    # Run the task
cb1cc6
-    log.info('test_abort: add the cleanAllRUV task...')
cb1cc6
-    cruv_task = CleanAllRUVTask(topology_m4.ms["supplier1"])
cb1cc6
-    cruv_task.create(properties={
cb1cc6
-        'replica-id': m4rid,
cb1cc6
-        'replica-base-dn': DEFAULT_SUFFIX,
cb1cc6
-        'replica-force-cleaning': 'no',
cb1cc6
-        'replica-certify-all': 'yes'
cb1cc6
-        })
cb1cc6
-    # Wait a bit
cb1cc6
-    time.sleep(2)
cb1cc6
-
cb1cc6
-    # Abort the task
cb1cc6
-    cruv_task.abort()
cb1cc6
-
cb1cc6
-    # Check supplier 1 does not have the clean task running
cb1cc6
-    log.info('test_abort: check supplier 1 no longer has a cleanAllRUV task...')
cb1cc6
-    if not task_done(topology_m4, cruv_task.dn):
cb1cc6
-        log.fatal('test_abort: CleanAllRUV task was not aborted')
cb1cc6
-        assert False
cb1cc6
-
cb1cc6
-    # Start supplier 2
cb1cc6
-    log.info('test_abort: start supplier 2 to begin the restore process...')
cb1cc6
-    topology_m4.ms["supplier2"].start()
cb1cc6
-
cb1cc6
-    log.info('test_abort PASSED, restoring supplier 4...')
cb1cc6
-
cb1cc6
-
cb1cc6
-@pytest.mark.flaky(max_runs=2, min_passes=1)
cb1cc6
-def test_abort_restart(topology_m4, m4rid):
cb1cc6
-    """Test the abort task can handle a restart, and then resume
cb1cc6
-
cb1cc6
-    :id: b66e33d4-fe85-4e1c-b882-75da80f70ab3
cb1cc6
-    :setup: Replication setup with four suppliers
cb1cc6
-    :steps:
cb1cc6
-        1. Disable replication on supplier 4
cb1cc6
-        2. Remove agreements to supplier 4 from other suppliers
cb1cc6
-        3. Stop supplier 3
cb1cc6
-        4. Run a cleanallruv task on supplier 1
cb1cc6
-        5. Run a cleanallruv abort task on supplier 1
cb1cc6
-        6. Restart supplier 1
cb1cc6
-        7. Make sure that no crash happened
cb1cc6
-        8. Start supplier 3
cb1cc6
-        9. Check supplier 1 does not have the clean task running
cb1cc6
-        10. Check that errors log doesn't have 'Aborting abort task' message
cb1cc6
-    :expectedresults:
cb1cc6
-        1. Replication on supplier 4 should be disabled
cb1cc6
-        2. Agreements to supplier 4 should be removed
cb1cc6
-        3. Supplier 3 should be stopped
cb1cc6
-        4. Operation should be successful
cb1cc6
-        5. Operation should be successful
cb1cc6
-        6. Supplier 1 should be restarted
cb1cc6
-        7. No crash should happened
cb1cc6
-        8. Supplier 3 should be started
cb1cc6
-        9. Check supplier 1 shouldn't have the clean task running
cb1cc6
-        10. Errors log shouldn't have 'Aborting abort task' message
cb1cc6
-    """
cb1cc6
-
cb1cc6
-    log.info('Running test_abort_restart...')
cb1cc6
-    # Remove the agreements from the other suppliers that point to supplier 4
cb1cc6
-    remove_supplier4_agmts("test_abort", topology_m4)
cb1cc6
-
cb1cc6
-    # Stop supplier 3
cb1cc6
-    log.info('test_abort_restart: stop supplier 3 to freeze the cleanAllRUV task...')
cb1cc6
-    topology_m4.ms["supplier3"].stop()
cb1cc6
-
cb1cc6
-    # Run the task
cb1cc6
-    log.info('test_abort_restart: add the cleanAllRUV task...')
cb1cc6
-    cruv_task = CleanAllRUVTask(topology_m4.ms["supplier1"])
cb1cc6
-    cruv_task.create(properties={
cb1cc6
-        'replica-id': m4rid,
cb1cc6
-        'replica-base-dn': DEFAULT_SUFFIX,
cb1cc6
-        'replica-force-cleaning': 'no',
cb1cc6
-        'replica-certify-all': 'yes'
cb1cc6
-        })
cb1cc6
-    # Wait a bit
cb1cc6
-    time.sleep(2)
cb1cc6
-
cb1cc6
-    # Abort the task
cb1cc6
-    cruv_task.abort(certify=True)
cb1cc6
-
cb1cc6
-    # Check supplier 1 does not have the clean task running
cb1cc6
-    log.info('test_abort_abort: check supplier 1 no longer has a cleanAllRUV task...')
cb1cc6
-    if not task_done(topology_m4, cruv_task.dn):
cb1cc6
-        log.fatal('test_abort_restart: CleanAllRUV task was not aborted')
cb1cc6
-        assert False
cb1cc6
-
cb1cc6
-    # Now restart supplier 1, and make sure the abort process completes
cb1cc6
-    topology_m4.ms["supplier1"].restart()
cb1cc6
-    if topology_m4.ms["supplier1"].detectDisorderlyShutdown():
cb1cc6
-        log.fatal('test_abort_restart: Supplier 1 previously crashed!')
cb1cc6
-        assert False
cb1cc6
-
cb1cc6
-    # Start supplier 3
cb1cc6
-    topology_m4.ms["supplier3"].start()
cb1cc6
-
cb1cc6
-    # Need to wait 5 seconds before server processes any leftover tasks
cb1cc6
-    time.sleep(6)
cb1cc6
-
cb1cc6
-    # Check supplier 1 tried to run abort task.  We expect the abort task to be aborted.
cb1cc6
-    if not topology_m4.ms["supplier1"].searchErrorsLog('Aborting abort task'):
cb1cc6
-        log.fatal('test_abort_restart: Abort task did not restart')
cb1cc6
-        assert False
cb1cc6
-
cb1cc6
-    log.info('test_abort_restart PASSED, restoring supplier 4...')
cb1cc6
-
cb1cc6
-
cb1cc6
-@pytest.mark.flaky(max_runs=2, min_passes=1)
cb1cc6
-def test_abort_certify(topology_m4, m4rid):
cb1cc6
-    """Test the abort task with a replica-certify-all option
cb1cc6
-
cb1cc6
-    :id: 78959966-d644-44a8-b98c-1fcf21b45eb0
cb1cc6
-    :setup: Replication setup with four suppliers
cb1cc6
-    :steps:
cb1cc6
-        1. Disable replication on supplier 4
cb1cc6
-        2. Remove agreements to supplier 4 from other suppliers
cb1cc6
-        3. Stop supplier 2
cb1cc6
-        4. Run a cleanallruv task on supplier 1
cb1cc6
-        5. Run a cleanallruv abort task on supplier 1 with a replica-certify-all option
cb1cc6
-    :expectedresults: No hanging tasks left
cb1cc6
-        1. Replication on supplier 4 should be disabled
cb1cc6
-        2. Agreements to supplier 4 should be removed
cb1cc6
-        3. Supplier 2 should be stopped
cb1cc6
-        4. Operation should be successful
cb1cc6
-        5. Operation should be successful
cb1cc6
-    """
cb1cc6
-
cb1cc6
-    log.info('Running test_abort_certify...')
cb1cc6
-
cb1cc6
-    # Remove the agreements from the other suppliers that point to supplier 4
cb1cc6
-    remove_supplier4_agmts("test_abort_certify", topology_m4)
cb1cc6
-
cb1cc6
-    # Stop supplier 2
cb1cc6
-    log.info('test_abort_certify: stop supplier 2 to freeze the cleanAllRUV task...')
cb1cc6
-    topology_m4.ms["supplier2"].stop()
cb1cc6
-
cb1cc6
-    # Run the task
cb1cc6
-    log.info('test_abort_certify: add the cleanAllRUV task...')
cb1cc6
-    cruv_task = CleanAllRUVTask(topology_m4.ms["supplier1"])
cb1cc6
-    cruv_task.create(properties={
cb1cc6
-        'replica-id': m4rid,
cb1cc6
-        'replica-base-dn': DEFAULT_SUFFIX,
cb1cc6
-        'replica-force-cleaning': 'no',
cb1cc6
-        'replica-certify-all': 'yes'
cb1cc6
-        })
cb1cc6
-    # Wait a bit
cb1cc6
-    time.sleep(2)
cb1cc6
-
cb1cc6
-    # Abort the task
cb1cc6
-    log.info('test_abort_certify: abort the cleanAllRUV task...')
cb1cc6
-    abort_task = cruv_task.abort(certify=True)
cb1cc6
-
cb1cc6
-    # Wait a while and make sure the abort task is still running
cb1cc6
-    log.info('test_abort_certify...')
cb1cc6
-
cb1cc6
-    if task_done(topology_m4, abort_task.dn, 10):
cb1cc6
-        log.fatal('test_abort_certify: abort task incorrectly finished')
cb1cc6
-        assert False
cb1cc6
-
cb1cc6
-    # Now start supplier 2 so it can be aborted
cb1cc6
-    log.info('test_abort_certify: start supplier 2 to allow the abort task to finish...')
cb1cc6
-    topology_m4.ms["supplier2"].start()
cb1cc6
-
cb1cc6
-    # Wait for the abort task to stop
cb1cc6
-    if not task_done(topology_m4, abort_task.dn, 90):
cb1cc6
-        log.fatal('test_abort_certify: The abort CleanAllRUV task was not aborted')
cb1cc6
-        assert False
cb1cc6
-
cb1cc6
-    # Check supplier 1 does not have the clean task running
cb1cc6
-    log.info('test_abort_certify: check supplier 1 no longer has a cleanAllRUV task...')
cb1cc6
-    if not task_done(topology_m4, cruv_task.dn):
cb1cc6
-        log.fatal('test_abort_certify: CleanAllRUV task was not aborted')
cb1cc6
-        assert False
cb1cc6
-
cb1cc6
-    log.info('test_abort_certify PASSED, restoring supplier 4...')
cb1cc6
-
cb1cc6
-
cb1cc6
-@pytest.mark.flaky(max_runs=2, min_passes=1)
cb1cc6
-def test_stress_clean(topology_m4, m4rid):
cb1cc6
-    """Put each server(m1 - m4) under a stress, and perform the entire clean process
cb1cc6
-
cb1cc6
-    :id: a8263cd6-f068-4357-86e0-e7c34504c8c5
cb1cc6
-    :setup: Replication setup with four suppliers
cb1cc6
-    :steps:
cb1cc6
-        1. Add a bunch of updates to all suppliers
cb1cc6
-        2. Put supplier 4 to read-only mode
cb1cc6
-        3. Disable replication on supplier 4
cb1cc6
-        4. Remove agreements to supplier 4 from other suppliers
cb1cc6
-        5. Run a cleanallruv task on supplier 1
cb1cc6
-        6. Check that everything was cleaned
cb1cc6
-    :expectedresults:
cb1cc6
-        1. Operation should be successful
cb1cc6
-        2. Supplier 4 should be put to read-only mode
cb1cc6
-        3. Replication on supplier 4 should be disabled
cb1cc6
-        4. Agreements to supplier 4 should be removed
cb1cc6
-        5. Operation should be successful
cb1cc6
-        6. Everything should be cleaned
cb1cc6
-    """
cb1cc6
-
cb1cc6
-    log.info('Running test_stress_clean...')
cb1cc6
-    log.info('test_stress_clean: put all the suppliers under load...')
cb1cc6
-
cb1cc6
-    ldbm_config = LDBMConfig(topology_m4.ms["supplier4"])
cb1cc6
-
cb1cc6
-    # not too high load else it takes a long time to converge and
cb1cc6
-    # the test result becomes instable
cb1cc6
-    m1_add_users = AddUsers(topology_m4.ms["supplier1"], 500)
cb1cc6
-    m1_add_users.start()
cb1cc6
-    m2_add_users = AddUsers(topology_m4.ms["supplier2"], 500)
cb1cc6
-    m2_add_users.start()
cb1cc6
-    m3_add_users = AddUsers(topology_m4.ms["supplier3"], 500)
cb1cc6
-    m3_add_users.start()
cb1cc6
-    m4_add_users = AddUsers(topology_m4.ms["supplier4"], 500)
cb1cc6
-    m4_add_users.start()
cb1cc6
-
cb1cc6
-    # Allow sometime to get replication flowing in all directions
cb1cc6
-    log.info('test_stress_clean: allow some time for replication to get flowing...')
cb1cc6
-    time.sleep(5)
cb1cc6
-
cb1cc6
-    # Put supplier 4 into read only mode
cb1cc6
-    ldbm_config.set('nsslapd-readonly', 'on')
cb1cc6
-    # We need to wait for supplier 4 to push its changes out
cb1cc6
-    log.info('test_stress_clean: allow some time for supplier 4 to push changes out (60 seconds)...')
cb1cc6
-    time.sleep(30)
cb1cc6
-
cb1cc6
-    # Remove the agreements from the other suppliers that point to supplier 4
cb1cc6
-    remove_supplier4_agmts("test_stress_clean", topology_m4)
cb1cc6
-
cb1cc6
-    # Run the task
cb1cc6
-    cruv_task = CleanAllRUVTask(topology_m4.ms["supplier1"])
cb1cc6
-    cruv_task.create(properties={
cb1cc6
-        'replica-id': m4rid,
cb1cc6
-        'replica-base-dn': DEFAULT_SUFFIX,
cb1cc6
-        'replica-force-cleaning': 'no'
cb1cc6
-        })
cb1cc6
-    cruv_task.wait()
cb1cc6
-
cb1cc6
-    # Wait for the update to finish
cb1cc6
-    log.info('test_stress_clean: wait for all the updates to finish...')
cb1cc6
-    m1_add_users.join()
cb1cc6
-    m2_add_users.join()
cb1cc6
-    m3_add_users.join()
cb1cc6
-    m4_add_users.join()
cb1cc6
-
cb1cc6
-    # Check the other supplier's RUV for 'replica 4'
cb1cc6
-    log.info('test_stress_clean: check if all the replicas have been cleaned...')
cb1cc6
-    clean = check_ruvs("test_stress_clean", topology_m4, m4rid)
cb1cc6
-    assert clean
cb1cc6
-
cb1cc6
-    log.info('test_stress_clean:  PASSED, restoring supplier 4...')
cb1cc6
-
cb1cc6
-    # Sleep for a bit to replication complete
cb1cc6
-    log.info("Sleep for 120 seconds to allow replication to complete...")
cb1cc6
-    repl = ReplicationManager(DEFAULT_SUFFIX)
cb1cc6
-    repl.test_replication_topology([
cb1cc6
-        topology_m4.ms["supplier1"],
cb1cc6
-        topology_m4.ms["supplier2"],
cb1cc6
-        topology_m4.ms["supplier3"],
cb1cc6
-        ], timeout=120)
cb1cc6
-
cb1cc6
-    # Turn off readonly mode
cb1cc6
-    ldbm_config.set('nsslapd-readonly', 'off')
cb1cc6
-
cb1cc6
-
cb1cc6
-@pytest.mark.flaky(max_runs=2, min_passes=1)
cb1cc6
-def test_multiple_tasks_with_force(topology_m4, m4rid):
cb1cc6
-    """Check that multiple tasks with a 'force' option work properly
cb1cc6
-
cb1cc6
-    :id: eb76a93d-8d1c-405e-9f25-6e8d5a781098
cb1cc6
-    :setup: Replication setup with four suppliers
cb1cc6
-    :steps:
cb1cc6
-        1. Stop supplier 3
cb1cc6
-        2. Add a bunch of updates to supplier 4
cb1cc6
-        3. Disable replication on supplier 4
cb1cc6
-        4. Start supplier 3
cb1cc6
-        5. Remove agreements to supplier 4 from other suppliers
cb1cc6
-        6. Run a cleanallruv task on supplier 1 with a 'force' option 'on'
cb1cc6
-        7. Run one more cleanallruv task on supplier 1 with a 'force' option 'off'
cb1cc6
-        8. Check that everything was cleaned
cb1cc6
-    :expectedresults:
cb1cc6
-        1. Supplier 3 should be stopped
cb1cc6
-        2. Operation should be successful
cb1cc6
-        3. Replication on supplier 4 should be disabled
cb1cc6
-        4. Supplier 3 should be started
cb1cc6
-        5. Agreements to supplier 4 should be removed
cb1cc6
-        6. Operation should be successful
cb1cc6
-        7. Operation should be successful
cb1cc6
-        8. Everything should be cleaned
cb1cc6
-    """
cb1cc6
-
cb1cc6
-    log.info('Running test_multiple_tasks_with_force...')
cb1cc6
-
cb1cc6
-    # Stop supplier 3, while we update supplier 4, so that 3 is behind the other suppliers
cb1cc6
-    topology_m4.ms["supplier3"].stop()
cb1cc6
-
cb1cc6
-    # Add a bunch of updates to supplier 4
cb1cc6
-    m4_add_users = AddUsers(topology_m4.ms["supplier4"], 1500)
cb1cc6
-    m4_add_users.start()
cb1cc6
-    m4_add_users.join()
cb1cc6
-
cb1cc6
-    # Start supplier 3, it should be out of sync with the other replicas...
cb1cc6
-    topology_m4.ms["supplier3"].start()
cb1cc6
-
cb1cc6
-    # Disable supplier 4
cb1cc6
-    # Remove the agreements from the other suppliers that point to supplier 4
cb1cc6
-    remove_supplier4_agmts("test_multiple_tasks_with_force", topology_m4)
cb1cc6
-
cb1cc6
-    # Run the task, use "force" because supplier 3 is not in sync with the other replicas
cb1cc6
-    # in regards to the replica 4 RUV
cb1cc6
-    log.info('test_multiple_tasks_with_force: run the cleanAllRUV task with "force" on...')
cb1cc6
-    cruv_task = CleanAllRUVTask(topology_m4.ms["supplier1"])
cb1cc6
-    cruv_task.create(properties={
cb1cc6
-        'replica-id': m4rid,
cb1cc6
-        'replica-base-dn': DEFAULT_SUFFIX,
cb1cc6
-        'replica-force-cleaning': 'yes',
cb1cc6
-        'replica-certify-all': 'no'
cb1cc6
-        })
cb1cc6
-
cb1cc6
-    log.info('test_multiple_tasks_with_force: run the cleanAllRUV task with "force" off...')
cb1cc6
-
cb1cc6
-    # NOTE: This must be try not py.test raises, because the above may or may
cb1cc6
-    # not have completed yet ....
cb1cc6
-    try:
cb1cc6
-        cruv_task_fail = CleanAllRUVTask(topology_m4.ms["supplier1"])
cb1cc6
-        cruv_task_fail.create(properties={
cb1cc6
-            'replica-id': m4rid,
cb1cc6
-            'replica-base-dn': DEFAULT_SUFFIX,
cb1cc6
-            'replica-force-cleaning': 'no',
cb1cc6
-            'replica-certify-all': 'no'
cb1cc6
-            })
cb1cc6
-        cruv_task_fail.wait()
cb1cc6
-    except ldap.UNWILLING_TO_PERFORM:
cb1cc6
-        pass
cb1cc6
-    # Wait for the force task ....
cb1cc6
-    cruv_task.wait()
cb1cc6
-
cb1cc6
-    # Check the other supplier's RUV for 'replica 4'
cb1cc6
-    log.info('test_multiple_tasks_with_force: check all the suppliers have been cleaned...')
cb1cc6
-    clean = check_ruvs("test_clean_force", topology_m4, m4rid)
cb1cc6
-    assert clean
cb1cc6
-    # Check supplier 1 does not have the clean task running
cb1cc6
-    log.info('test_abort: check supplier 1 no longer has a cleanAllRUV task...')
cb1cc6
-    if not task_done(topology_m4, cruv_task.dn):
cb1cc6
-        log.fatal('test_abort: CleanAllRUV task was not aborted')
cb1cc6
-        assert False
cb1cc6
-
cb1cc6
-
cb1cc6
-@pytest.mark.bz1466441
cb1cc6
-@pytest.mark.ds50370
cb1cc6
-def test_clean_shutdown_crash(topology_m2):
cb1cc6
-    """Check that server didn't crash after shutdown when running CleanAllRUV task
cb1cc6
-
cb1cc6
-    :id: c34d0b40-3c3e-4f53-8656-5e4c2a310aaf
cb1cc6
-    :setup: Replication setup with two suppliers
cb1cc6
-    :steps:
cb1cc6
-        1. Enable TLS on both suppliers
cb1cc6
-        2. Reconfigure both agreements to use TLS Client auth
cb1cc6
-        3. Stop supplier2
cb1cc6
-        4. Run the CleanAllRUV task
cb1cc6
-        5. Restart supplier1
cb1cc6
-        6. Check if supplier1 didn't crash
cb1cc6
-        7. Restart supplier1 again
cb1cc6
-        8. Check if supplier1 didn't crash
cb1cc6
-
cb1cc6
-    :expectedresults:
cb1cc6
-        1. Success
cb1cc6
-        2. Success
cb1cc6
-        3. Success
cb1cc6
-        4. Success
cb1cc6
-        5. Success
cb1cc6
-        6. Success
cb1cc6
-        7. Success
cb1cc6
-        8. Success
cb1cc6
-    """
cb1cc6
-
cb1cc6
-    m1 = topology_m2.ms["supplier1"]
cb1cc6
-    m2 = topology_m2.ms["supplier2"]
cb1cc6
-
cb1cc6
-    repl = ReplicationManager(DEFAULT_SUFFIX)
cb1cc6
-
cb1cc6
-    cm_m1 = CertmapLegacy(m1)
cb1cc6
-    cm_m2 = CertmapLegacy(m2)
cb1cc6
-
cb1cc6
-    certmaps = cm_m1.list()
cb1cc6
-    certmaps['default']['DNComps'] = None
cb1cc6
-    certmaps['default']['CmapLdapAttr'] = 'nsCertSubjectDN'
cb1cc6
-
cb1cc6
-    cm_m1.set(certmaps)
cb1cc6
-    cm_m2.set(certmaps)
cb1cc6
-
cb1cc6
-    log.info('Enabling TLS')
cb1cc6
-    [i.enable_tls() for i in topology_m2]
cb1cc6
-
cb1cc6
-    log.info('Creating replication dns')
cb1cc6
-    services = ServiceAccounts(m1, DEFAULT_SUFFIX)
cb1cc6
-    repl_m1 = services.get('%s:%s' % (m1.host, m1.sslport))
cb1cc6
-    repl_m1.set('nsCertSubjectDN', m1.get_server_tls_subject())
cb1cc6
-
cb1cc6
-    repl_m2 = services.get('%s:%s' % (m2.host, m2.sslport))
cb1cc6
-    repl_m2.set('nsCertSubjectDN', m2.get_server_tls_subject())
cb1cc6
-
cb1cc6
-    log.info('Changing auth type')
cb1cc6
-    replica_m1 = Replicas(m1).get(DEFAULT_SUFFIX)
cb1cc6
-    agmt_m1 = replica_m1.get_agreements().list()[0]
cb1cc6
-    agmt_m1.replace_many(
cb1cc6
-        ('nsDS5ReplicaBindMethod', 'SSLCLIENTAUTH'),
cb1cc6
-        ('nsDS5ReplicaTransportInfo', 'SSL'),
cb1cc6
-        ('nsDS5ReplicaPort', '%s' % m2.sslport),
cb1cc6
-    )
cb1cc6
-
cb1cc6
-    agmt_m1.remove_all('nsDS5ReplicaBindDN')
cb1cc6
-
cb1cc6
-    replica_m2 = Replicas(m2).get(DEFAULT_SUFFIX)
cb1cc6
-    agmt_m2 = replica_m2.get_agreements().list()[0]
cb1cc6
-
cb1cc6
-    agmt_m2.replace_many(
cb1cc6
-        ('nsDS5ReplicaBindMethod', 'SSLCLIENTAUTH'),
cb1cc6
-        ('nsDS5ReplicaTransportInfo', 'SSL'),
cb1cc6
-        ('nsDS5ReplicaPort', '%s' % m1.sslport),
cb1cc6
-    )
cb1cc6
-    agmt_m2.remove_all('nsDS5ReplicaBindDN')
cb1cc6
-
cb1cc6
-    log.info('Stopping supplier2')
cb1cc6
-    m2.stop()
cb1cc6
-
cb1cc6
-    log.info('Run the cleanAllRUV task')
cb1cc6
-    cruv_task = CleanAllRUVTask(m1)
cb1cc6
-    cruv_task.create(properties={
cb1cc6
-        'replica-id': repl.get_rid(m1),
cb1cc6
-        'replica-base-dn': DEFAULT_SUFFIX,
cb1cc6
-        'replica-force-cleaning': 'no',
cb1cc6
-        'replica-certify-all': 'yes'
cb1cc6
-    })
cb1cc6
-
cb1cc6
-    m1.restart()
cb1cc6
-
cb1cc6
-    log.info('Check if supplier1 crashed')
cb1cc6
-    assert not m1.detectDisorderlyShutdown()
cb1cc6
-
cb1cc6
-    log.info('Repeat')
cb1cc6
-    m1.restart()
cb1cc6
-    assert not m1.detectDisorderlyShutdown()
cb1cc6
-
cb1cc6
 
cb1cc6
 if __name__ == '__main__':
cb1cc6
     # Run isolated
cb1cc6
diff --git a/dirsrvtests/tests/suites/replication/regression_m2_test.py b/dirsrvtests/tests/suites/replication/regression_m2_test.py
cb1cc6
index bbf9c8486..65c299a0c 100644
cb1cc6
--- a/dirsrvtests/tests/suites/replication/regression_m2_test.py
cb1cc6
+++ b/dirsrvtests/tests/suites/replication/regression_m2_test.py
cb1cc6
@@ -240,8 +240,12 @@ def test_double_delete(topo_m2, create_entry):
cb1cc6
     log.info('Deleting entry {} from supplier1'.format(create_entry.dn))
cb1cc6
     topo_m2.ms["supplier1"].delete_s(create_entry.dn)
cb1cc6
 
cb1cc6
-    log.info('Deleting entry {} from supplier2'.format(create_entry.dn))
cb1cc6
-    topo_m2.ms["supplier2"].delete_s(create_entry.dn)
cb1cc6
+    try:
cb1cc6
+        log.info('Deleting entry {} from supplier2'.format(create_entry.dn))
cb1cc6
+        topo_m2.ms["supplier2"].delete_s(create_entry.dn)
cb1cc6
+    except ldap.NO_SUCH_OBJECT:
cb1cc6
+        # replication was too fast (DEBUGGING is probably set)
cb1cc6
+        pass
cb1cc6
 
cb1cc6
     repl.enable_to_supplier(m2, [m1])
cb1cc6
     repl.enable_to_supplier(m1, [m2])
cb1cc6
@@ -813,8 +817,9 @@ def test_keepalive_entries(topo_m2):
cb1cc6
     keep_alive_s1 = str(entries[0].data['keepalivetimestamp'])
cb1cc6
     keep_alive_s2 = str(entries[1].data['keepalivetimestamp'])
cb1cc6
 
cb1cc6
-    # Wait for event interval (60 secs) to pass
cb1cc6
-    time.sleep(61)
cb1cc6
+    # Wait for event interval (60 secs) to pass, but first update doesn't
cb1cc6
+    # start until 30 seconds after startup
cb1cc6
+    time.sleep(91)
cb1cc6
 
cb1cc6
     # Check keep alives entries have been updated
cb1cc6
     entries = verify_keepalive_entries(topo_m2, True);
cb1cc6
diff --git a/dirsrvtests/tests/suites/replication/regression_m2c2_test.py b/dirsrvtests/tests/suites/replication/regression_m2c2_test.py
cb1cc6
index 97b35c7ab..f9de7383c 100644
cb1cc6
--- a/dirsrvtests/tests/suites/replication/regression_m2c2_test.py
cb1cc6
+++ b/dirsrvtests/tests/suites/replication/regression_m2c2_test.py
cb1cc6
@@ -289,6 +289,7 @@ def test_csngen_state_not_updated_if_different_uuid(topo_m2c2):
cb1cc6
         log.error(f"c1 csngen state has unexpectedly been synchronized with m2: time skew {c1_timeSkew}")
cb1cc6
         assert False
cb1cc6
     c1.start()
cb1cc6
+    time.sleep(5)
cb1cc6
 
cb1cc6
     # Step 8: Check that c2 has time skew
cb1cc6
     # Stop server to insure that dse.ldif is uptodate
cb1cc6
diff --git a/ldap/servers/plugins/replication/repl5_replica.c b/ldap/servers/plugins/replication/repl5_replica.c
cb1cc6
index 5dab57de4..d67f1bc71 100644
cb1cc6
--- a/ldap/servers/plugins/replication/repl5_replica.c
cb1cc6
+++ b/ldap/servers/plugins/replication/repl5_replica.c
cb1cc6
@@ -239,8 +239,8 @@ replica_new_from_entry(Slapi_Entry *e, char *errortext, PRBool is_add_operation,
cb1cc6
     /* create supplier update event */
cb1cc6
     if (r->repl_eqcxt_ka_update == NULL && replica_get_type(r) == REPLICA_TYPE_UPDATABLE) {
cb1cc6
         r->repl_eqcxt_ka_update = slapi_eq_repeat_rel(replica_subentry_update, r,
cb1cc6
-                                                   slapi_current_rel_time_t() + 30,
cb1cc6
-                                                   replica_get_keepalive_update_interval(r));
cb1cc6
+                                                      slapi_current_rel_time_t() + 30,
cb1cc6
+                                                      1000 * replica_get_keepalive_update_interval(r));
cb1cc6
     }
cb1cc6
 
cb1cc6
     if (r->tombstone_reap_interval > 0) {
cb1cc6
@@ -518,7 +518,7 @@ replica_subentry_update(time_t when __attribute__((unused)), void *arg)
cb1cc6
     replica_subentry_check(repl_root, rid);
cb1cc6
 
cb1cc6
     slapi_timestamp_utc_hr(buf, SLAPI_TIMESTAMP_BUFSIZE);
cb1cc6
-    slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "replica_subentry_update called at %s\n", buf);
cb1cc6
+    slapi_log_err(SLAPI_LOG_REPL, "NSMMReplicationPlugin", "replica_subentry_update called at %s\n", buf);
cb1cc6
     val.bv_val = buf;
cb1cc6
     val.bv_len = strlen(val.bv_val);
cb1cc6
     vals[0] = &val;
cb1cc6
@@ -542,7 +542,7 @@ replica_subentry_update(time_t when __attribute__((unused)), void *arg)
cb1cc6
                       "Failure (%d) to update replication keep alive entry \"%s: %s\"\n",
cb1cc6
                       ldrc, KEEP_ALIVE_ATTR, buf);
cb1cc6
     } else {
cb1cc6
-        slapi_log_err(SLAPI_LOG_PLUGIN, repl_plugin_name,
cb1cc6
+        slapi_log_err(SLAPI_LOG_REPL, "NSMMReplicationPlugin",
cb1cc6
                       "replica_subentry_update - "
cb1cc6
                       "Successful update of replication keep alive entry \"%s: %s\"\n",
cb1cc6
                       KEEP_ALIVE_ATTR, buf);
cb1cc6
@@ -1536,7 +1536,7 @@ replica_set_enabled(Replica *r, PRBool enable)
cb1cc6
         if (r->repl_eqcxt_ka_update == NULL && replica_get_type(r) == REPLICA_TYPE_UPDATABLE) {
cb1cc6
             r->repl_eqcxt_ka_update = slapi_eq_repeat_rel(replica_subentry_update, r,
cb1cc6
                                                        slapi_current_rel_time_t() + START_UPDATE_DELAY,
cb1cc6
-                                                       replica_get_keepalive_update_interval(r));
cb1cc6
+                                                       1000 * replica_get_keepalive_update_interval(r));
cb1cc6
         }
cb1cc6
     } else /* disable */
cb1cc6
     {
cb1cc6
@@ -1546,7 +1546,7 @@ replica_set_enabled(Replica *r, PRBool enable)
cb1cc6
             r->repl_eqcxt_rs = NULL;
cb1cc6
         }
cb1cc6
         /* Remove supplier update event */
cb1cc6
-        if (replica_get_type(r) == REPLICA_TYPE_PRIMARY) {
cb1cc6
+        if (replica_get_type(r) == REPLICA_TYPE_UPDATABLE) {
cb1cc6
             slapi_eq_cancel_rel(r->repl_eqcxt_ka_update);
cb1cc6
             r->repl_eqcxt_ka_update = NULL;
cb1cc6
         }
cb1cc6
diff --git a/ldap/servers/plugins/replication/repl_extop.c b/ldap/servers/plugins/replication/repl_extop.c
cb1cc6
index 70c45ec50..b32d00941 100644
cb1cc6
--- a/ldap/servers/plugins/replication/repl_extop.c
cb1cc6
+++ b/ldap/servers/plugins/replication/repl_extop.c
cb1cc6
@@ -493,7 +493,7 @@ free_and_return:
cb1cc6
         slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
cb1cc6
                 "decode_startrepl_extop - decoded csn: %s\n", *csnstr);
cb1cc6
         ruv_dump_to_log(*supplier_ruv, "decode_startrepl_extop");
cb1cc6
-        for (size_t i = 0; *extra_referrals && *extra_referrals[i]; i++) {
cb1cc6
+        for (size_t i = 0; *extra_referrals && extra_referrals[i]; i++) {
cb1cc6
             slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "decode_startrepl_extop - "
cb1cc6
                 "decoded referral: %s\n", *extra_referrals[i]);
cb1cc6
         }
cb1cc6
@@ -1661,7 +1661,7 @@ multimaster_extop_cleanruv(Slapi_PBlock *pb)
cb1cc6
          *  Launch the cleanruv monitoring thread.  Once all the replicas are cleaned it will release the rid
cb1cc6
          */
cb1cc6
 
cb1cc6
-        cleanruv_log(NULL, rid, CLEANALLRUV_ID, SLAPI_LOG_ERR, "Launching cleanAllRUV thread...");
cb1cc6
+        cleanruv_log(NULL, rid, CLEANALLRUV_ID, SLAPI_LOG_INFO, "Launching cleanAllRUV thread...");
cb1cc6
         data = (cleanruv_data *)slapi_ch_calloc(1, sizeof(cleanruv_data));
cb1cc6
         if (data == NULL) {
cb1cc6
             slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "multimaster_extop_cleanruv - CleanAllRUV Task - Failed to allocate "
cb1cc6
diff --git a/ldap/servers/slapd/task.c b/ldap/servers/slapd/task.c
cb1cc6
index 4c7262ab3..71d5a2fb5 100644
cb1cc6
--- a/ldap/servers/slapd/task.c
cb1cc6
+++ b/ldap/servers/slapd/task.c
cb1cc6
@@ -742,7 +742,7 @@ get_internal_entry(Slapi_PBlock *pb, char *dn)
cb1cc6
     slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &ret);
cb1cc6
     if (ret != LDAP_SUCCESS) {
cb1cc6
         slapi_log_err(SLAPI_LOG_WARNING, "get_internal_entry",
cb1cc6
-                      "Can't find task entry '%s'\n", dn);
cb1cc6
+                      "Failed to search for task entry '%s' error: %d\n", dn, ret);
cb1cc6
         return NULL;
cb1cc6
     }
cb1cc6
 
cb1cc6
@@ -786,9 +786,9 @@ modify_internal_entry(char *dn, LDAPMod **mods)
cb1cc6
              * entry -- try at least 3 times before giving up.
cb1cc6
              */
cb1cc6
             tries++;
cb1cc6
-            if (tries == 3) {
cb1cc6
-                slapi_log_err(SLAPI_LOG_WARNING, "modify_internal_entry", "Can't modify task "
cb1cc6
-                                                                          "entry '%s'; %s (%d)\n",
cb1cc6
+            if (tries == 5) {
cb1cc6
+                slapi_log_err(SLAPI_LOG_WARNING, "modify_internal_entry",
cb1cc6
+                              "Can't modify task entry '%s'; %s (%d)\n",
cb1cc6
                               dn, ldap_err2string(ret), ret);
cb1cc6
                 slapi_pblock_destroy(pb);
cb1cc6
                 return;
cb1cc6
diff --git a/src/lib389/lib389/instance/remove.py b/src/lib389/lib389/instance/remove.py
cb1cc6
index e96db3896..5668f375b 100644
cb1cc6
--- a/src/lib389/lib389/instance/remove.py
cb1cc6
+++ b/src/lib389/lib389/instance/remove.py
cb1cc6
@@ -90,6 +90,12 @@ def remove_ds_instance(dirsrv, force=False):
cb1cc6
     # Remove parent (/var/lib/dirsrv/slapd-INST)
cb1cc6
     shutil.rmtree(remove_paths['db_dir'].replace('db', ''), ignore_errors=True)
cb1cc6
 
cb1cc6
+    # Remove the /run/slapd-{serverid}.socket file
cb1cc6
+    try:
cb1cc6
+        os.remove(f'/run/slapd-{dirsrv.serverid}.socket')
cb1cc6
+    except OSError as e:
cb1cc6
+        _log.debug("Failed to remove socket file: " + str(e))
cb1cc6
+
cb1cc6
     # We can not assume we have systemd ...
cb1cc6
     if dirsrv.ds_paths.with_systemd:
cb1cc6
         # Remove the systemd symlink
cb1cc6
-- 
cb1cc6
2.37.1
cb1cc6