andykimpe / rpms / 389-ds-base

Forked from rpms/389-ds-base 5 months ago
Clone
dc8c34
From 29ef94c5991621bc1c59cbbdedcfcbd0d04ba18f Mon Sep 17 00:00:00 2001
dc8c34
From: Thierry Bordaz <tbordaz@redhat.com>
dc8c34
Date: Fri, 11 Sep 2015 18:56:53 +0200
dc8c34
Subject: [PATCH 338/342] Ticket 48266: Fractional replication evaluates
dc8c34
 several times the same CSN
dc8c34
dc8c34
Bug Description:
dc8c34
	In fractional replication if there are only skipped updates and many of them, the supplier
dc8c34
	acquire the replica for a long time. At the end of the session, RUV is not updated
dc8c34
	so the next session will restart evaluating the same skipped updates
dc8c34
dc8c34
Fix Description:
dc8c34
	The fix introduces subentries under the suffix: 'cn=repl keep alive <rid>,$SUFFIX'
dc8c34
	During an incremental replication session, if the session only contains skipped updates
dc8c34
	and the number of them overpass a threshold (100), it triggers an update on that subentry.
dc8c34
dc8c34
	This update will eventually be replicated, moving forward the RUV
dc8c34
dc8c34
https://fedorahosted.org/389/ticket/48266
dc8c34
dc8c34
Reviewed by: Noriko Hosoi, Rich Megginson, Simon Pichugin
dc8c34
dc8c34
Platforms tested: <plat>
dc8c34
dc8c34
Flag Day: no
dc8c34
dc8c34
Doc impact: no
dc8c34
dc8c34
(cherry picked from commit f04f4c0140c1a970314735cb69b827230136b346)
dc8c34
---
dc8c34
 ldap/servers/plugins/replication/repl5.h           |   2 +
dc8c34
 .../plugins/replication/repl5_inc_protocol.c       |  39 ++++++
dc8c34
 ldap/servers/plugins/replication/repl5_replica.c   | 156 +++++++++++++++++++++
dc8c34
 .../plugins/replication/repl5_tot_protocol.c       |  13 +-
dc8c34
 4 files changed, 209 insertions(+), 1 deletion(-)
dc8c34
dc8c34
diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h
dc8c34
index 6cec248..66006f6 100644
dc8c34
--- a/ldap/servers/plugins/replication/repl5.h
dc8c34
+++ b/ldap/servers/plugins/replication/repl5.h
dc8c34
@@ -521,6 +521,8 @@ Replica *windows_replica_new(const Slapi_DN *root);
dc8c34
    during addition of the replica over LDAP */
dc8c34
 Replica *replica_new_from_entry (Slapi_Entry *e, char *errortext, PRBool is_add_operation);
dc8c34
 void replica_destroy(void **arg);
dc8c34
+int replica_subentry_update(Slapi_DN *repl_root, ReplicaId rid);
dc8c34
+int replica_subentry_check(Slapi_DN *repl_root, ReplicaId rid);
dc8c34
 PRBool replica_get_exclusive_access(Replica *r, PRBool *isInc, PRUint64 connid, int opid,
dc8c34
 									const char *locking_purl,
dc8c34
 									char **current_purl);
dc8c34
diff --git a/ldap/servers/plugins/replication/repl5_inc_protocol.c b/ldap/servers/plugins/replication/repl5_inc_protocol.c
dc8c34
index f5516a3..0dc9f30 100644
dc8c34
--- a/ldap/servers/plugins/replication/repl5_inc_protocol.c
dc8c34
+++ b/ldap/servers/plugins/replication/repl5_inc_protocol.c
dc8c34
@@ -1688,6 +1688,11 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
dc8c34
 		int finished = 0;
dc8c34
 		ConnResult replay_crc;
dc8c34
 		char csn_str[CSN_STRSIZE];
dc8c34
+		PRBool subentry_update_sent = PR_FALSE;
dc8c34
+		PRBool subentry_update_needed = PR_FALSE;
dc8c34
+		int skipped_updates = 0;
dc8c34
+		int fractional_repl;
dc8c34
+#define FRACTIONAL_SKIPPED_THRESHOLD 100
dc8c34
 
dc8c34
 		/* Start the results reading thread */
dc8c34
 		rd = repl5_inc_rd_new(prp);
dc8c34
@@ -1704,6 +1709,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
dc8c34
 
dc8c34
 		memset ( (void*)&op, 0, sizeof (op) );
dc8c34
 		entry.op = &op;
dc8c34
+		fractional_repl = agmt_is_fractional(prp->agmt);
dc8c34
 		do {
dc8c34
 			cl5_operation_parameters_done ( entry.op );
dc8c34
 			memset ( (void*)entry.op, 0, sizeof (op) );
dc8c34
@@ -1799,6 +1805,15 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
dc8c34
 					csn_as_string(entry.op->csn, PR_FALSE, csn_str);
dc8c34
 					replica_id = csn_get_replicaid(entry.op->csn);
dc8c34
 					uniqueid = entry.op->target_address.uniqueid;
dc8c34
+                    
dc8c34
+					if (fractional_repl && message_id) 
dc8c34
+					{
dc8c34
+						/* This update was sent no need to update the subentry
dc8c34
+						 * and restart counting the skipped updates
dc8c34
+						 */
dc8c34
+						subentry_update_needed = PR_FALSE;
dc8c34
+						skipped_updates = 0;
dc8c34
+					}
dc8c34
 
dc8c34
 					if (prp->repl50consumer && message_id) 
dc8c34
 					{
dc8c34
@@ -1829,6 +1844,16 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
dc8c34
 							agmt_get_long_name(prp->agmt),
dc8c34
 							entry.op->target_address.uniqueid, csn_str);
dc8c34
 						agmt_inc_last_update_changecount (prp->agmt, csn_get_replicaid(entry.op->csn), 1 /*skipped*/);
dc8c34
+						if (fractional_repl) 
dc8c34
+						{
dc8c34
+							skipped_updates++;
dc8c34
+							if (skipped_updates > FRACTIONAL_SKIPPED_THRESHOLD) {
dc8c34
+								slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
dc8c34
+										"%s: skipped updates is too high (%d) if no other update is sent we will update the subentry\n",
dc8c34
+										agmt_get_long_name(prp->agmt), skipped_updates);
dc8c34
+								subentry_update_needed = PR_TRUE;
dc8c34
+							}
dc8c34
+						}
dc8c34
 					}
dc8c34
 				}
dc8c34
 				break;
dc8c34
@@ -1894,6 +1919,20 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
dc8c34
 			PR_Unlock(rd->lock);
dc8c34
 		} while (!finished);
dc8c34
 
dc8c34
+		if (fractional_repl && subentry_update_needed)
dc8c34
+		{
dc8c34
+			Replica *replica;
dc8c34
+			ReplicaId rid = -1; /* Used to create the replica keep alive subentry */
dc8c34
+			replica = (Replica*) object_get_data(prp->replica_object);
dc8c34
+			if (replica)
dc8c34
+			{
dc8c34
+				rid = replica_get_rid(replica);
dc8c34
+			}
dc8c34
+			slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
dc8c34
+					"%s: skipped updates was definitely too high (%d) update the subentry now\n",
dc8c34
+					agmt_get_long_name(prp->agmt), skipped_updates);
dc8c34
+			replica_subentry_update(agmt_get_replarea(prp->agmt), rid);
dc8c34
+		}
dc8c34
 		/* Terminate the results reading thread */
dc8c34
 		if (!prp->repl50consumer) 
dc8c34
 		{
dc8c34
diff --git a/ldap/servers/plugins/replication/repl5_replica.c b/ldap/servers/plugins/replication/repl5_replica.c
dc8c34
index f64e719..84539d2 100644
dc8c34
--- a/ldap/servers/plugins/replication/repl5_replica.c
dc8c34
+++ b/ldap/servers/plugins/replication/repl5_replica.c
dc8c34
@@ -398,6 +398,161 @@ replica_destroy(void **arg)
dc8c34
 	slapi_ch_free((void **)arg);
dc8c34
 }
dc8c34
 
dc8c34
+#define KEEP_ALIVE_ATTR "keepalivetimestamp"
dc8c34
+#define KEEP_ALIVE_ENTRY "repl keep alive"
dc8c34
+#define KEEP_ALIVE_DN_FORMAT "cn=%s %d,%s"
dc8c34
+
dc8c34
+
dc8c34
+static int
dc8c34
+replica_subentry_create(Slapi_DN *repl_root, ReplicaId rid) 
dc8c34
+{
dc8c34
+    char *entry_string = NULL;
dc8c34
+    Slapi_Entry *e = NULL;
dc8c34
+    Slapi_PBlock *pb = NULL;
dc8c34
+    int return_value;
dc8c34
+    int rc = 0;
dc8c34
+
dc8c34
+    entry_string = slapi_ch_smprintf("dn: cn=%s %d,%s\nobjectclass: top\nobjectclass: ldapsubentry\nobjectclass: extensibleObject\ncn: %s %d",
dc8c34
+            KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root), KEEP_ALIVE_ENTRY, rid);
dc8c34
+    if (entry_string == NULL) {
dc8c34
+        slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
dc8c34
+            "replica_subentry_create add failed in slapi_ch_smprintf\n");
dc8c34
+        rc = -1;
dc8c34
+        goto done;
dc8c34
+    }
dc8c34
+
dc8c34
+    slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "add %s\n", entry_string);
dc8c34
+    e = slapi_str2entry(entry_string, 0);
dc8c34
+
dc8c34
+    /* create the entry */
dc8c34
+    pb = slapi_pblock_new();
dc8c34
+
dc8c34
+
dc8c34
+    slapi_add_entry_internal_set_pb(pb, e, NULL, /* controls */
dc8c34
+            repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0 /* flags */);
dc8c34
+    slapi_add_internal_pb(pb);
dc8c34
+    slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &return_value);
dc8c34
+    if (return_value != LDAP_SUCCESS && return_value != LDAP_ALREADY_EXISTS) 
dc8c34
+    {
dc8c34
+        slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Warning: unable to "
dc8c34
+                "create replication keep alive entry %s: %s\n", slapi_entry_get_dn_const(e),
dc8c34
+                ldap_err2string(return_value));
dc8c34
+        rc = -1;
dc8c34
+        slapi_entry_free(e); /* The entry was not consumed */
dc8c34
+        goto done;
dc8c34
+    }
dc8c34
+
dc8c34
+done:
dc8c34
+
dc8c34
+    slapi_pblock_destroy(pb);
dc8c34
+    slapi_ch_free_string(&entry_string);
dc8c34
+    return rc;
dc8c34
+
dc8c34
+}
dc8c34
+
dc8c34
+int
dc8c34
+replica_subentry_check(Slapi_DN *repl_root, ReplicaId rid)
dc8c34
+{
dc8c34
+    Slapi_PBlock *pb;
dc8c34
+    char *filter = NULL;
dc8c34
+    Slapi_Entry **entries = NULL;
dc8c34
+    int res;
dc8c34
+    int rc = 0;
dc8c34
+
dc8c34
+    pb = slapi_pblock_new();
dc8c34
+    filter = slapi_ch_smprintf("(&(objectclass=ldapsubentry)(cn=%s %d))", KEEP_ALIVE_ENTRY, rid);
dc8c34
+    slapi_search_internal_set_pb(pb, slapi_sdn_get_dn(repl_root), LDAP_SCOPE_ONELEVEL,
dc8c34
+            filter, NULL, 0, NULL, NULL,
dc8c34
+            repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0);
dc8c34
+    slapi_search_internal_pb(pb);
dc8c34
+    slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &res;;
dc8c34
+    if (res == LDAP_SUCCESS)
dc8c34
+    {
dc8c34
+        slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_SEARCH_ENTRIES, &entries);
dc8c34
+        if (entries && (entries[0] == NULL))
dc8c34
+        {
dc8c34
+            slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
dc8c34
+                    "Need to create replication keep alive entry <cn=%s %d,%s>\n", KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root));
dc8c34
+            rc = replica_subentry_create(repl_root, rid);
dc8c34
+        } else {
dc8c34
+            slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
dc8c34
+                    "replication keep alive entry <cn=%s %d,%s> already exists\n", KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root));
dc8c34
+            rc = 0;
dc8c34
+        }
dc8c34
+    } else {
dc8c34
+        slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
dc8c34
+                "Error accessing replication keep alive entry <cn=%s %d,%s> res=%d\n",
dc8c34
+                KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root), res);
dc8c34
+        /* The status of the entry is not clear, do not attempt to create it */
dc8c34
+        rc = 1;
dc8c34
+    }
dc8c34
+    slapi_free_search_results_internal(pb);
dc8c34
+
dc8c34
+    slapi_pblock_destroy(pb);
dc8c34
+    slapi_ch_free_string(&filter);
dc8c34
+    return rc;
dc8c34
+}
dc8c34
+
dc8c34
+int
dc8c34
+replica_subentry_update(Slapi_DN *repl_root, ReplicaId rid) 
dc8c34
+{
dc8c34
+    int ldrc;
dc8c34
+    int rc = LDAP_SUCCESS; /* Optimistic default */
dc8c34
+    LDAPMod * mods[2];
dc8c34
+    LDAPMod mod;
dc8c34
+    struct berval * vals[2];
dc8c34
+    char buf[20];
dc8c34
+    time_t curtime;
dc8c34
+    struct tm ltm;
dc8c34
+    struct berval val;
dc8c34
+    Slapi_PBlock *modpb = NULL;
dc8c34
+    char *dn;
dc8c34
+
dc8c34
+    replica_subentry_check(repl_root, rid);
dc8c34
+    curtime = current_time();
dc8c34
+    gmtime_r(&curtime, <m;;
dc8c34
+    strftime(buf, sizeof (buf), "%Y%m%d%H%M%SZ", <m;;
dc8c34
+
dc8c34
+    slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "subentry_update called at %s\n", buf);
dc8c34
+
dc8c34
+
dc8c34
+    val.bv_val = buf;
dc8c34
+    val.bv_len = strlen(val.bv_val);
dc8c34
+
dc8c34
+    vals [0] = &val;
dc8c34
+    vals [1] = NULL;
dc8c34
+
dc8c34
+    mod.mod_op = LDAP_MOD_REPLACE | LDAP_MOD_BVALUES;
dc8c34
+    mod.mod_type = KEEP_ALIVE_ATTR;
dc8c34
+    mod.mod_bvalues = vals;
dc8c34
+
dc8c34
+    mods[0] = &mod;
dc8c34
+    mods[1] = NULL;
dc8c34
+
dc8c34
+    modpb = slapi_pblock_new();
dc8c34
+    dn = slapi_ch_smprintf(KEEP_ALIVE_DN_FORMAT, KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root));
dc8c34
+
dc8c34
+    slapi_modify_internal_set_pb(modpb, dn, mods, NULL, NULL,
dc8c34
+            repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0);
dc8c34
+    slapi_modify_internal_pb(modpb);
dc8c34
+
dc8c34
+    slapi_pblock_get(modpb, SLAPI_PLUGIN_INTOP_RESULT, &ldrc;;
dc8c34
+
dc8c34
+    if (ldrc != LDAP_SUCCESS)
dc8c34
+    {
dc8c34
+        slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
dc8c34
+                "Failure (%d) to update replication keep alive entry \"%s: %s\"\n", ldrc, KEEP_ALIVE_ATTR, buf);
dc8c34
+        rc = ldrc;
dc8c34
+    } else {
dc8c34
+        slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name,
dc8c34
+                "Successful update of replication keep alive entry \"%s: %s\"\n", KEEP_ALIVE_ATTR, buf);
dc8c34
+    }
dc8c34
+
dc8c34
+    slapi_pblock_destroy(modpb);
dc8c34
+    slapi_ch_free_string(&dn;;
dc8c34
+    return rc;
dc8c34
+
dc8c34
+}
dc8c34
 /*
dc8c34
  * Attempt to obtain exclusive access to replica (advisory only)
dc8c34
  *
dc8c34
@@ -3616,6 +3771,7 @@ replica_enable_replication (Replica *r)
dc8c34
         /* What to do ? */
dc8c34
     }
dc8c34
 
dc8c34
+    replica_subentry_check(r->repl_root, replica_get_rid(r));
dc8c34
     /* Replica came back online, Check if the total update was terminated.
dc8c34
        If flag is still set, it was not terminated, therefore the data is
dc8c34
        very likely to be incorrect, and we should not restart Replication threads...
dc8c34
diff --git a/ldap/servers/plugins/replication/repl5_tot_protocol.c b/ldap/servers/plugins/replication/repl5_tot_protocol.c
dc8c34
index e514dc6..0143e19 100644
dc8c34
--- a/ldap/servers/plugins/replication/repl5_tot_protocol.c
dc8c34
+++ b/ldap/servers/plugins/replication/repl5_tot_protocol.c
dc8c34
@@ -335,6 +335,9 @@ repl5_tot_run(Private_Repl_Protocol *prp)
dc8c34
 	int portnum = 0;
dc8c34
 	Slapi_DN *area_sdn = NULL;
dc8c34
 	CSN *remote_schema_csn = NULL;
dc8c34
+	int init_retry = 0;
dc8c34
+	Replica *replica;
dc8c34
+	ReplicaId rid = 0; /* Used to create the replica keep alive subentry */
dc8c34
 	
dc8c34
 	PR_ASSERT(NULL != prp);
dc8c34
 
dc8c34
@@ -412,7 +415,15 @@ repl5_tot_run(Private_Repl_Protocol *prp)
dc8c34
     ctrls = (LDAPControl **)slapi_ch_calloc (3, sizeof (LDAPControl *));
dc8c34
     ctrls[0] = create_managedsait_control ();
dc8c34
     ctrls[1] = create_backend_control(area_sdn);
dc8c34
-	
dc8c34
+
dc8c34
+	/* Time to make sure it exists a keep alive subentry for that replica */
dc8c34
+	replica = (Replica*) object_get_data(prp->replica_object);
dc8c34
+	if (replica)
dc8c34
+	{
dc8c34
+		rid = replica_get_rid(replica);
dc8c34
+	}
dc8c34
+	replica_subentry_check(area_sdn, rid);
dc8c34
+
dc8c34
     slapi_search_internal_set_pb (pb, slapi_sdn_get_dn (area_sdn), 
dc8c34
                                   LDAP_SCOPE_SUBTREE, "(|(objectclass=ldapsubentry)(objectclass=nstombstone)(nsuniqueid=*))", NULL, 0, ctrls, NULL, 
dc8c34
                                   repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0);
dc8c34
-- 
dc8c34
1.9.3
dc8c34