|
|
f92ce9 |
From 5b0283a5a5b12c9b2ccee049ddc611decaa07a09 Mon Sep 17 00:00:00 2001
|
|
|
f92ce9 |
From: "Thierry bordaz (tbordaz)" <tbordaz@redhat.com>
|
|
|
f92ce9 |
Date: Mon, 15 Dec 2014 15:12:35 +0100
|
|
|
f92ce9 |
Subject: [PATCH 32/53] Ticket 47942: DS hangs during online total update
|
|
|
f92ce9 |
|
|
|
f92ce9 |
Bug Description:
|
|
|
f92ce9 |
During incremental or total update of a consumer the replica agreement thread may hang.
|
|
|
f92ce9 |
For total update:
|
|
|
f92ce9 |
The replica agreement thread that send the entries flowed the consumer that is not
|
|
|
f92ce9 |
able to process fast enough the entries. So the TCP connection get full and
|
|
|
f92ce9 |
the RA sender sleep on the connection to be able to write the next entries.
|
|
|
f92ce9 |
|
|
|
f92ce9 |
Sleeping on the poll or write the RA.sender holds the connection lock.
|
|
|
f92ce9 |
|
|
|
f92ce9 |
It prevents the replica agreement result thread to read the results from the
|
|
|
f92ce9 |
network. So the consumer is also halted because is can no longer send the results.
|
|
|
f92ce9 |
|
|
|
f92ce9 |
For incrementatl update:
|
|
|
f92ce9 |
During incremental update, all updates are sent by the RA.sender.
|
|
|
f92ce9 |
If many updates need to be send, the supplier may overflow the consumer
|
|
|
f92ce9 |
that is very late. This flow of updates can fill the TCP connection
|
|
|
f92ce9 |
so that the RA.sender hang when writing the next update.
|
|
|
f92ce9 |
On the hang, it holds the connection lock preventing the RA.reader
|
|
|
f92ce9 |
to receive the acks. And so the consumer can also hang trying to send the
|
|
|
f92ce9 |
acks.
|
|
|
f92ce9 |
|
|
|
f92ce9 |
Fix Description:
|
|
|
f92ce9 |
For total update there are two parts of the fix:
|
|
|
f92ce9 |
|
|
|
f92ce9 |
To prevent the RA.sender to sleep too long on the poll, the fix (conn_is_available)
|
|
|
f92ce9 |
splits the RA.timeout into 1s period.
|
|
|
f92ce9 |
If unable to write for 1s, it releases the connection for a short period of time 100ms.
|
|
|
f92ce9 |
|
|
|
f92ce9 |
To prevent the RA.sender to sleep on the write, the fix (check_flow_control_tot_init)
|
|
|
f92ce9 |
checks how late is the consumer and if it is too late, it pauses (releasing the connection
|
|
|
f92ce9 |
during that time). This second part of the fix is configurable and it may need to be
|
|
|
f92ce9 |
tune according to the observed failures.
|
|
|
f92ce9 |
|
|
|
f92ce9 |
For incremental update:
|
|
|
f92ce9 |
The fix is to implement a flow control on the RA.sender.
|
|
|
f92ce9 |
After each sent update, if the window (update.sent - update.acked) cross the limit
|
|
|
f92ce9 |
The RA.sender pause during a configured delay.
|
|
|
f92ce9 |
When the RA.sender pause it does not hold the connection lock
|
|
|
f92ce9 |
|
|
|
f92ce9 |
Tuning can be done with nsds5ReplicaFlowControlWindow (how late is the consumer in terms of
|
|
|
f92ce9 |
number of entries/updates acknowledged) and nsds5ReplicaFlowControlPause (how long the RA.sender will
|
|
|
f92ce9 |
pause if the consumer is too late)
|
|
|
f92ce9 |
|
|
|
f92ce9 |
Logging:
|
|
|
f92ce9 |
For total update, the first time the flow control pauses, it logs a message (FATAL level).
|
|
|
f92ce9 |
If flow control happened, then at the end of the total update, it also logs the number
|
|
|
f92ce9 |
of flow control pauses (FATAL level).
|
|
|
f92ce9 |
|
|
|
f92ce9 |
For incremental update, if flow control happened it logs the number of pause (REPL level).
|
|
|
f92ce9 |
|
|
|
f92ce9 |
https://fedorahosted.org/389/ticket/47942
|
|
|
f92ce9 |
|
|
|
f92ce9 |
Reviewed by: Mark Reynolds, Rich Megginson, Andrey Ivanov, Noriko Hosoi (many many thanks to all of you !)
|
|
|
f92ce9 |
|
|
|
f92ce9 |
Platforms tested: RHEL 7.0, Centos
|
|
|
f92ce9 |
|
|
|
f92ce9 |
Flag Day: no
|
|
|
f92ce9 |
|
|
|
f92ce9 |
Doc impact: no
|
|
|
f92ce9 |
---
|
|
|
f92ce9 |
ldap/schema/01core389.ldif | 4 +-
|
|
|
f92ce9 |
ldap/servers/plugins/replication/repl5.h | 10 ++
|
|
|
f92ce9 |
ldap/servers/plugins/replication/repl5_agmt.c | 160 ++++++++++++++++++++
|
|
|
f92ce9 |
ldap/servers/plugins/replication/repl5_agmtlist.c | 26 ++++
|
|
|
f92ce9 |
.../servers/plugins/replication/repl5_connection.c | 163 ++++++++++++++++++++-
|
|
|
f92ce9 |
.../plugins/replication/repl5_inc_protocol.c | 32 +++-
|
|
|
f92ce9 |
.../plugins/replication/repl5_prot_private.h | 2 +
|
|
|
f92ce9 |
.../plugins/replication/repl5_tot_protocol.c | 53 ++++++-
|
|
|
f92ce9 |
ldap/servers/plugins/replication/repl_globals.c | 2 +
|
|
|
f92ce9 |
9 files changed, 446 insertions(+), 6 deletions(-)
|
|
|
f92ce9 |
|
|
|
f92ce9 |
diff --git a/ldap/schema/01core389.ldif b/ldap/schema/01core389.ldif
|
|
|
f92ce9 |
index c7aec70..c59d762 100644
|
|
|
f92ce9 |
--- a/ldap/schema/01core389.ldif
|
|
|
f92ce9 |
+++ b/ldap/schema/01core389.ldif
|
|
|
f92ce9 |
@@ -302,6 +302,8 @@ attributeTypes: ( 2.16.840.1.113730.3.1.2306 NAME 'nsslapd-return-default-opattr
|
|
|
f92ce9 |
attributeTypes: ( 2.16.840.1.113730.3.1.2307 NAME 'nsslapd-allow-hashed-passwords' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
|
|
|
f92ce9 |
attributeTypes: ( 2.16.840.1.113730.3.1.2308 NAME 'nstombstonecsn' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
|
|
|
f92ce9 |
attributeTypes: ( 2.16.840.1.113730.3.1.2309 NAME 'nsds5ReplicaPreciseTombstonePurging' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
|
|
|
f92ce9 |
+attributeTypes: ( 2.16.840.1.113730.3.1.2310 NAME 'nsds5ReplicaFlowControlWindow' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
|
|
|
f92ce9 |
+attributeTypes: ( 2.16.840.1.113730.3.1.2311 NAME 'nsds5ReplicaFlowControlPause' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
|
|
|
f92ce9 |
#
|
|
|
f92ce9 |
# objectclasses
|
|
|
f92ce9 |
#
|
|
|
f92ce9 |
@@ -313,7 +315,7 @@ objectClasses: ( 2.16.840.1.113730.3.2.110 NAME 'nsMappingTree' DESC 'Netscape d
|
|
|
f92ce9 |
objectClasses: ( 2.16.840.1.113730.3.2.104 NAME 'nsContainer' DESC 'Netscape defined objectclass' SUP top MUST ( CN ) X-ORIGIN 'Netscape Directory Server' )
|
|
|
f92ce9 |
objectClasses: ( 2.16.840.1.113730.3.2.108 NAME 'nsDS5Replica' DESC 'Netscape defined objectclass' SUP top MUST ( nsDS5ReplicaRoot $ nsDS5ReplicaId ) MAY (cn $ nsds5ReplicaPreciseTombstonePurging $ nsds5ReplicaCleanRUV $ nsds5ReplicaAbortCleanRUV $ nsDS5ReplicaType $ nsDS5ReplicaBindDN $ nsState $ nsDS5ReplicaName $ nsDS5Flags $ nsDS5Task $ nsDS5ReplicaReferral $ nsDS5ReplicaAutoReferral $ nsds5ReplicaPurgeDelay $ nsds5ReplicaTombstonePurgeInterval $ nsds5ReplicaChangeCount $ nsds5ReplicaLegacyConsumer $ nsds5ReplicaProtocolTimeout $ nsds5ReplicaBackoffMin $ nsds5ReplicaBackoffMax ) X-ORIGIN 'Netscape Directory Server' )
|
|
|
f92ce9 |
objectClasses: ( 2.16.840.1.113730.3.2.113 NAME 'nsTombstone' DESC 'Netscape defined objectclass' SUP top MAY ( nstombstonecsn $ nsParentUniqueId $ nscpEntryDN ) X-ORIGIN 'Netscape Directory Server' )
|
|
|
f92ce9 |
-objectClasses: ( 2.16.840.1.113730.3.2.103 NAME 'nsDS5ReplicationAgreement' DESC 'Netscape defined objectclass' SUP top MUST ( cn ) MAY ( nsds5ReplicaCleanRUVNotified $ nsDS5ReplicaHost $ nsDS5ReplicaPort $ nsDS5ReplicaTransportInfo $ nsDS5ReplicaBindDN $ nsDS5ReplicaCredentials $ nsDS5ReplicaBindMethod $ nsDS5ReplicaRoot $ nsDS5ReplicatedAttributeList $ nsDS5ReplicatedAttributeListTotal $ nsDS5ReplicaUpdateSchedule $ nsds5BeginReplicaRefresh $ description $ nsds50ruv $ nsruvReplicaLastModified $ nsds5ReplicaTimeout $ nsds5replicaChangesSentSinceStartup $ nsds5replicaLastUpdateEnd $ nsds5replicaLastUpdateStart $ nsds5replicaLastUpdateStatus $ nsds5replicaUpdateInProgress $ nsds5replicaLastInitEnd $ nsds5ReplicaEnabled $ nsds5replicaLastInitStart $ nsds5replicaLastInitStatus $ nsds5debugreplicatimeout $ nsds5replicaBusyWaitTime $ nsds5ReplicaStripAttrs $ nsds5replicaSessionPauseTime $ nsds5ReplicaProtocolTimeout ) X-ORIGIN 'Netscape Directory Server' )
|
|
|
f92ce9 |
+objectClasses: ( 2.16.840.1.113730.3.2.103 NAME 'nsDS5ReplicationAgreement' DESC 'Netscape defined objectclass' SUP top MUST ( cn ) MAY ( nsds5ReplicaCleanRUVNotified $ nsDS5ReplicaHost $ nsDS5ReplicaPort $ nsDS5ReplicaTransportInfo $ nsDS5ReplicaBindDN $ nsDS5ReplicaCredentials $ nsDS5ReplicaBindMethod $ nsDS5ReplicaRoot $ nsDS5ReplicatedAttributeList $ nsDS5ReplicatedAttributeListTotal $ nsDS5ReplicaUpdateSchedule $ nsds5BeginReplicaRefresh $ description $ nsds50ruv $ nsruvReplicaLastModified $ nsds5ReplicaTimeout $ nsds5replicaChangesSentSinceStartup $ nsds5replicaLastUpdateEnd $ nsds5replicaLastUpdateStart $ nsds5replicaLastUpdateStatus $ nsds5replicaUpdateInProgress $ nsds5replicaLastInitEnd $ nsds5ReplicaEnabled $ nsds5replicaLastInitStart $ nsds5replicaLastInitStatus $ nsds5debugreplicatimeout $ nsds5replicaBusyWaitTime $ nsds5ReplicaStripAttrs $ nsds5replicaSessionPauseTime $ nsds5ReplicaProtocolTimeout $ nsds5ReplicaFlowControlWindow $ nsds5ReplicaFlowControlPause ) X-ORIGIN 'Netscape Directory Server' )
|
|
|
f92ce9 |
objectClasses: ( 2.16.840.1.113730.3.2.39 NAME 'nsslapdConfig' DESC 'Netscape defined objectclass' SUP top MAY ( cn ) X-ORIGIN 'Netscape Directory Server' )
|
|
|
f92ce9 |
objectClasses: ( 2.16.840.1.113730.3.2.317 NAME 'nsSaslMapping' DESC 'Netscape defined objectclass' SUP top MUST ( cn $ nsSaslMapRegexString $ nsSaslMapBaseDNTemplate $ nsSaslMapFilterTemplate ) MAY ( nsSaslMapPriority ) X-ORIGIN 'Netscape Directory Server' )
|
|
|
f92ce9 |
objectClasses: ( 2.16.840.1.113730.3.2.43 NAME 'nsSNMP' DESC 'Netscape defined objectclass' SUP top MUST ( cn $ nsSNMPEnabled ) MAY ( nsSNMPOrganization $ nsSNMPLocation $ nsSNMPContact $ nsSNMPDescription $ nsSNMPName $ nsSNMPMasterHost $ nsSNMPMasterPort ) X-ORIGIN 'Netscape Directory Server' )
|
|
|
f92ce9 |
diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h
|
|
|
f92ce9 |
index 86c77ce..e2b6209 100644
|
|
|
f92ce9 |
--- a/ldap/servers/plugins/replication/repl5.h
|
|
|
f92ce9 |
+++ b/ldap/servers/plugins/replication/repl5.h
|
|
|
f92ce9 |
@@ -170,6 +170,8 @@ extern const char *type_nsds5ReplicaBusyWaitTime;
|
|
|
f92ce9 |
extern const char *type_nsds5ReplicaSessionPauseTime;
|
|
|
f92ce9 |
extern const char *type_nsds5ReplicaEnabled;
|
|
|
f92ce9 |
extern const char *type_nsds5ReplicaStripAttrs;
|
|
|
f92ce9 |
+extern const char *type_nsds5ReplicaFlowControlWindow;
|
|
|
f92ce9 |
+extern const char *type_nsds5ReplicaFlowControlPause;
|
|
|
f92ce9 |
extern const char *type_replicaProtocolTimeout;
|
|
|
f92ce9 |
extern const char *type_replicaBackoffMin;
|
|
|
f92ce9 |
extern const char *type_replicaBackoffMax;
|
|
|
f92ce9 |
@@ -332,6 +334,8 @@ int agmt_get_auto_initialize(const Repl_Agmt *ra);
|
|
|
f92ce9 |
long agmt_get_timeout(const Repl_Agmt *ra);
|
|
|
f92ce9 |
long agmt_get_busywaittime(const Repl_Agmt *ra);
|
|
|
f92ce9 |
long agmt_get_pausetime(const Repl_Agmt *ra);
|
|
|
f92ce9 |
+long agmt_get_flowcontrolwindow(const Repl_Agmt *ra);
|
|
|
f92ce9 |
+long agmt_get_flowcontrolpause(const Repl_Agmt *ra);
|
|
|
f92ce9 |
int agmt_start(Repl_Agmt *ra);
|
|
|
f92ce9 |
int windows_agmt_start(Repl_Agmt *ra);
|
|
|
f92ce9 |
int agmt_stop(Repl_Agmt *ra);
|
|
|
f92ce9 |
@@ -352,6 +356,8 @@ int agmt_replarea_matches(const Repl_Agmt *ra, const Slapi_DN *name);
|
|
|
f92ce9 |
int agmt_schedule_in_window_now(const Repl_Agmt *ra);
|
|
|
f92ce9 |
int agmt_set_schedule_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
|
|
|
f92ce9 |
int agmt_set_timeout_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
|
|
|
f92ce9 |
+int agmt_set_flowcontrolwindow_from_entry(Repl_Agmt *ra, const Slapi_Entry *e);
|
|
|
f92ce9 |
+int agmt_set_flowcontrolpause_from_entry(Repl_Agmt *ra, const Slapi_Entry *e);
|
|
|
f92ce9 |
int agmt_set_busywaittime_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
|
|
|
f92ce9 |
int agmt_set_pausetime_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
|
|
|
f92ce9 |
int agmt_set_credentials_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
|
|
|
f92ce9 |
@@ -490,6 +496,10 @@ void conn_lock(Repl_Connection *conn);
|
|
|
f92ce9 |
void conn_unlock(Repl_Connection *conn);
|
|
|
f92ce9 |
void conn_delete_internal_ext(Repl_Connection *conn);
|
|
|
f92ce9 |
const char* conn_get_bindmethod(Repl_Connection *conn);
|
|
|
f92ce9 |
+void conn_set_tot_update_cb(Repl_Connection *conn, void *cb_data);
|
|
|
f92ce9 |
+void conn_set_tot_update_cb_nolock(Repl_Connection *conn, void *cb_data);
|
|
|
f92ce9 |
+void conn_get_tot_update_cb(Repl_Connection *conn, void **cb_data);
|
|
|
f92ce9 |
+void conn_get_tot_update_cb_nolock(Repl_Connection *conn, void **cb_data);
|
|
|
f92ce9 |
|
|
|
f92ce9 |
/* In repl5_protocol.c */
|
|
|
f92ce9 |
typedef struct repl_protocol Repl_Protocol;
|
|
|
f92ce9 |
diff --git a/ldap/servers/plugins/replication/repl5_agmt.c b/ldap/servers/plugins/replication/repl5_agmt.c
|
|
|
f92ce9 |
index 7c5c37c..91be757 100644
|
|
|
f92ce9 |
--- a/ldap/servers/plugins/replication/repl5_agmt.c
|
|
|
f92ce9 |
+++ b/ldap/servers/plugins/replication/repl5_agmt.c
|
|
|
f92ce9 |
@@ -87,6 +87,8 @@
|
|
|
f92ce9 |
#include "slapi-plugin.h"
|
|
|
f92ce9 |
|
|
|
f92ce9 |
#define DEFAULT_TIMEOUT 600 /* (seconds) default outbound LDAP connection */
|
|
|
f92ce9 |
+#define DEFAULT_FLOWCONTROL_WINDOW 1000 /* #entries sent without acknowledgment */
|
|
|
f92ce9 |
+#define DEFAULT_FLOWCONTROL_PAUSE 2000 /* msec of pause when #entries sent witout acknowledgment */
|
|
|
f92ce9 |
#define STATUS_LEN 1024
|
|
|
f92ce9 |
|
|
|
f92ce9 |
struct changecounter {
|
|
|
f92ce9 |
@@ -145,6 +147,12 @@ typedef struct repl5agmt {
|
|
|
f92ce9 |
int agreement_type;
|
|
|
f92ce9 |
Slapi_Counter *protocol_timeout;
|
|
|
f92ce9 |
char *maxcsn; /* agmt max csn */
|
|
|
f92ce9 |
+ long flowControlWindow; /* This is the maximum number of entries
|
|
|
f92ce9 |
+ * sent without acknowledgment
|
|
|
f92ce9 |
+ */
|
|
|
f92ce9 |
+ long flowControlPause; /* When nb of not acknowledged entries overpass totalUpdateWindow
|
|
|
f92ce9 |
+ * This is the duration (in msec) that the RA will pause before sending the next entry
|
|
|
f92ce9 |
+ */
|
|
|
f92ce9 |
Slapi_RWLock *attr_lock; /* RW lock for all the stripped attrs */
|
|
|
f92ce9 |
} repl5agmt;
|
|
|
f92ce9 |
|
|
|
f92ce9 |
@@ -345,6 +353,28 @@ agmt_new_from_entry(Slapi_Entry *e)
|
|
|
f92ce9 |
}
|
|
|
f92ce9 |
}
|
|
|
f92ce9 |
|
|
|
f92ce9 |
+ /* flow control update window. */
|
|
|
f92ce9 |
+ ra->flowControlWindow = DEFAULT_FLOWCONTROL_WINDOW;
|
|
|
f92ce9 |
+ if (slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlWindow, &sattr) == 0)
|
|
|
f92ce9 |
+ {
|
|
|
f92ce9 |
+ Slapi_Value *sval;
|
|
|
f92ce9 |
+ if (slapi_attr_first_value(sattr, &sval) == 0)
|
|
|
f92ce9 |
+ {
|
|
|
f92ce9 |
+ ra->flowControlWindow = slapi_value_get_long(sval);
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
+ /* flow control update pause. */
|
|
|
f92ce9 |
+ ra->flowControlPause = DEFAULT_FLOWCONTROL_PAUSE;
|
|
|
f92ce9 |
+ if (slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlPause, &sattr) == 0)
|
|
|
f92ce9 |
+ {
|
|
|
f92ce9 |
+ Slapi_Value *sval;
|
|
|
f92ce9 |
+ if (slapi_attr_first_value(sattr, &sval) == 0)
|
|
|
f92ce9 |
+ {
|
|
|
f92ce9 |
+ ra->flowControlPause = slapi_value_get_long(sval);
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
/* DN of entry at root of replicated area */
|
|
|
f92ce9 |
tmpstr = slapi_entry_attr_get_charptr(e, type_nsds5ReplicaRoot);
|
|
|
f92ce9 |
if (NULL != tmpstr)
|
|
|
f92ce9 |
@@ -1014,6 +1044,26 @@ agmt_get_pausetime(const Repl_Agmt *ra)
|
|
|
f92ce9 |
return return_value;
|
|
|
f92ce9 |
}
|
|
|
f92ce9 |
|
|
|
f92ce9 |
+long
|
|
|
f92ce9 |
+agmt_get_flowcontrolwindow(const Repl_Agmt *ra)
|
|
|
f92ce9 |
+{
|
|
|
f92ce9 |
+ long return_value;
|
|
|
f92ce9 |
+ PR_ASSERT(NULL != ra);
|
|
|
f92ce9 |
+ PR_Lock(ra->lock);
|
|
|
f92ce9 |
+ return_value = ra->flowControlWindow;
|
|
|
f92ce9 |
+ PR_Unlock(ra->lock);
|
|
|
f92ce9 |
+ return return_value;
|
|
|
f92ce9 |
+}
|
|
|
f92ce9 |
+long
|
|
|
f92ce9 |
+agmt_get_flowcontrolpause(const Repl_Agmt *ra)
|
|
|
f92ce9 |
+{
|
|
|
f92ce9 |
+ long return_value;
|
|
|
f92ce9 |
+ PR_ASSERT(NULL != ra);
|
|
|
f92ce9 |
+ PR_Lock(ra->lock);
|
|
|
f92ce9 |
+ return_value = ra->flowControlPause;
|
|
|
f92ce9 |
+ PR_Unlock(ra->lock);
|
|
|
f92ce9 |
+ return return_value;
|
|
|
f92ce9 |
+}
|
|
|
f92ce9 |
/*
|
|
|
f92ce9 |
* Warning - reference to the long name of the agreement is returned.
|
|
|
f92ce9 |
* The long name of an agreement is the DN of the agreement entry,
|
|
|
f92ce9 |
@@ -1775,6 +1825,90 @@ agmt_set_timeout_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
|
|
|
f92ce9 |
return return_value;
|
|
|
f92ce9 |
}
|
|
|
f92ce9 |
|
|
|
f92ce9 |
+/*
|
|
|
f92ce9 |
+ * Set or reset the windows of entries sent without acknowledgment.
|
|
|
f92ce9 |
+ * The window is used during update to determine the number of
|
|
|
f92ce9 |
+ * entries will be send by the replica agreement without acknowledgment from the consumer
|
|
|
f92ce9 |
+ *
|
|
|
f92ce9 |
+ * Returns 0 if window set, or -1 if an error occurred.
|
|
|
f92ce9 |
+ */
|
|
|
f92ce9 |
+int
|
|
|
f92ce9 |
+agmt_set_flowcontrolwindow_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
|
|
|
f92ce9 |
+{
|
|
|
f92ce9 |
+ Slapi_Attr *sattr = NULL;
|
|
|
f92ce9 |
+ int return_value = -1;
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
+ PR_ASSERT(NULL != ra);
|
|
|
f92ce9 |
+ PR_Lock(ra->lock);
|
|
|
f92ce9 |
+ if (ra->stop_in_progress)
|
|
|
f92ce9 |
+ {
|
|
|
f92ce9 |
+ PR_Unlock(ra->lock);
|
|
|
f92ce9 |
+ return return_value;
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
+ slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlWindow, &sattr);
|
|
|
f92ce9 |
+ if (NULL != sattr)
|
|
|
f92ce9 |
+ {
|
|
|
f92ce9 |
+ Slapi_Value *sval = NULL;
|
|
|
f92ce9 |
+ slapi_attr_first_value(sattr, &sval);
|
|
|
f92ce9 |
+ if (NULL != sval)
|
|
|
f92ce9 |
+ {
|
|
|
f92ce9 |
+ long tmpval = slapi_value_get_long(sval);
|
|
|
f92ce9 |
+ if (tmpval >= 0) {
|
|
|
f92ce9 |
+ ra->flowControlWindow = tmpval;
|
|
|
f92ce9 |
+ return_value = 0; /* success! */
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ PR_Unlock(ra->lock);
|
|
|
f92ce9 |
+ if (return_value == 0)
|
|
|
f92ce9 |
+ {
|
|
|
f92ce9 |
+ prot_notify_agmt_changed(ra->protocol, ra->long_name);
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ return return_value;
|
|
|
f92ce9 |
+}
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
+/*
|
|
|
f92ce9 |
+ * Set or reset the pause duration when #entries sent without acknowledgment overpass flow control window
|
|
|
f92ce9 |
+ *
|
|
|
f92ce9 |
+ * Returns 0 if pause set, or -1 if an error occurred.
|
|
|
f92ce9 |
+ */
|
|
|
f92ce9 |
+int
|
|
|
f92ce9 |
+agmt_set_flowcontrolpause_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
|
|
|
f92ce9 |
+{
|
|
|
f92ce9 |
+ Slapi_Attr *sattr = NULL;
|
|
|
f92ce9 |
+ int return_value = -1;
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
+ PR_ASSERT(NULL != ra);
|
|
|
f92ce9 |
+ PR_Lock(ra->lock);
|
|
|
f92ce9 |
+ if (ra->stop_in_progress)
|
|
|
f92ce9 |
+ {
|
|
|
f92ce9 |
+ PR_Unlock(ra->lock);
|
|
|
f92ce9 |
+ return return_value;
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
+ slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlPause, &sattr);
|
|
|
f92ce9 |
+ if (NULL != sattr)
|
|
|
f92ce9 |
+ {
|
|
|
f92ce9 |
+ Slapi_Value *sval = NULL;
|
|
|
f92ce9 |
+ slapi_attr_first_value(sattr, &sval);
|
|
|
f92ce9 |
+ if (NULL != sval)
|
|
|
f92ce9 |
+ {
|
|
|
f92ce9 |
+ long tmpval = slapi_value_get_long(sval);
|
|
|
f92ce9 |
+ if (tmpval >= 0) {
|
|
|
f92ce9 |
+ ra->flowControlPause = tmpval;
|
|
|
f92ce9 |
+ return_value = 0; /* success! */
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ PR_Unlock(ra->lock);
|
|
|
f92ce9 |
+ if (return_value == 0)
|
|
|
f92ce9 |
+ {
|
|
|
f92ce9 |
+ prot_notify_agmt_changed(ra->protocol, ra->long_name);
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ return return_value;
|
|
|
f92ce9 |
+}
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
int
|
|
|
f92ce9 |
agmt_set_timeout(Repl_Agmt *ra, long timeout)
|
|
|
f92ce9 |
{
|
|
|
f92ce9 |
@@ -1788,6 +1922,32 @@ agmt_set_timeout(Repl_Agmt *ra, long timeout)
|
|
|
f92ce9 |
|
|
|
f92ce9 |
return 0;
|
|
|
f92ce9 |
}
|
|
|
f92ce9 |
+int
|
|
|
f92ce9 |
+agmt_set_flowcontrolwindow(Repl_Agmt *ra, long window)
|
|
|
f92ce9 |
+{
|
|
|
f92ce9 |
+ PR_Lock(ra->lock);
|
|
|
f92ce9 |
+ if (ra->stop_in_progress){
|
|
|
f92ce9 |
+ PR_Unlock(ra->lock);
|
|
|
f92ce9 |
+ return -1;
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ ra->flowControlWindow = window;
|
|
|
f92ce9 |
+ PR_Unlock(ra->lock);
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
+ return 0;
|
|
|
f92ce9 |
+}
|
|
|
f92ce9 |
+int
|
|
|
f92ce9 |
+agmt_set_flowcontrolpause(Repl_Agmt *ra, long pause)
|
|
|
f92ce9 |
+{
|
|
|
f92ce9 |
+ PR_Lock(ra->lock);
|
|
|
f92ce9 |
+ if (ra->stop_in_progress){
|
|
|
f92ce9 |
+ PR_Unlock(ra->lock);
|
|
|
f92ce9 |
+ return -1;
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ ra->flowControlPause = pause;
|
|
|
f92ce9 |
+ PR_Unlock(ra->lock);
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
+ return 0;
|
|
|
f92ce9 |
+}
|
|
|
f92ce9 |
|
|
|
f92ce9 |
/*
|
|
|
f92ce9 |
* Set or reset the busywaittime
|
|
|
f92ce9 |
diff --git a/ldap/servers/plugins/replication/repl5_agmtlist.c b/ldap/servers/plugins/replication/repl5_agmtlist.c
|
|
|
f92ce9 |
index 8a70055..5eead07 100644
|
|
|
f92ce9 |
--- a/ldap/servers/plugins/replication/repl5_agmtlist.c
|
|
|
f92ce9 |
+++ b/ldap/servers/plugins/replication/repl5_agmtlist.c
|
|
|
f92ce9 |
@@ -330,6 +330,32 @@ agmtlist_modify_callback(Slapi_PBlock *pb, Slapi_Entry *entryBefore, Slapi_Entry
|
|
|
f92ce9 |
}
|
|
|
f92ce9 |
}
|
|
|
f92ce9 |
else if (slapi_attr_types_equivalent(mods[i]->mod_type,
|
|
|
f92ce9 |
+ type_nsds5ReplicaFlowControlWindow))
|
|
|
f92ce9 |
+ {
|
|
|
f92ce9 |
+ /* New replica timeout */
|
|
|
f92ce9 |
+ if (agmt_set_flowcontrolwindow_from_entry(agmt, e) != 0)
|
|
|
f92ce9 |
+ {
|
|
|
f92ce9 |
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "agmtlist_modify_callback: "
|
|
|
f92ce9 |
+ "failed to update the flow control window for agreement %s\n",
|
|
|
f92ce9 |
+ agmt_get_long_name(agmt));
|
|
|
f92ce9 |
+ *returncode = LDAP_OPERATIONS_ERROR;
|
|
|
f92ce9 |
+ rc = SLAPI_DSE_CALLBACK_ERROR;
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ else if (slapi_attr_types_equivalent(mods[i]->mod_type,
|
|
|
f92ce9 |
+ type_nsds5ReplicaFlowControlPause))
|
|
|
f92ce9 |
+ {
|
|
|
f92ce9 |
+ /* New replica timeout */
|
|
|
f92ce9 |
+ if (agmt_set_flowcontrolpause_from_entry(agmt, e) != 0)
|
|
|
f92ce9 |
+ {
|
|
|
f92ce9 |
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "agmtlist_modify_callback: "
|
|
|
f92ce9 |
+ "failed to update the flow control pause for agreement %s\n",
|
|
|
f92ce9 |
+ agmt_get_long_name(agmt));
|
|
|
f92ce9 |
+ *returncode = LDAP_OPERATIONS_ERROR;
|
|
|
f92ce9 |
+ rc = SLAPI_DSE_CALLBACK_ERROR;
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ else if (slapi_attr_types_equivalent(mods[i]->mod_type,
|
|
|
f92ce9 |
type_nsds5ReplicaBusyWaitTime))
|
|
|
f92ce9 |
{
|
|
|
f92ce9 |
/* New replica busywaittime */
|
|
|
f92ce9 |
diff --git a/ldap/servers/plugins/replication/repl5_connection.c b/ldap/servers/plugins/replication/repl5_connection.c
|
|
|
f92ce9 |
index c004bfb..2971025 100644
|
|
|
f92ce9 |
--- a/ldap/servers/plugins/replication/repl5_connection.c
|
|
|
f92ce9 |
+++ b/ldap/servers/plugins/replication/repl5_connection.c
|
|
|
f92ce9 |
@@ -52,6 +52,7 @@ replica locked. Seems like right thing to do.
|
|
|
f92ce9 |
*/
|
|
|
f92ce9 |
|
|
|
f92ce9 |
#include "repl5.h"
|
|
|
f92ce9 |
+#include "repl5_prot_private.h"
|
|
|
f92ce9 |
#include "slapi-private.h"
|
|
|
f92ce9 |
#if defined(USE_OPENLDAP)
|
|
|
f92ce9 |
#include "ldap.h"
|
|
|
f92ce9 |
@@ -91,6 +92,7 @@ typedef struct repl_connection
|
|
|
f92ce9 |
struct timeval timeout;
|
|
|
f92ce9 |
int flag_agmt_changed;
|
|
|
f92ce9 |
char *plain;
|
|
|
f92ce9 |
+ void *tot_init_callback; /* Used during total update to do flow control */
|
|
|
f92ce9 |
} repl_connection;
|
|
|
f92ce9 |
|
|
|
f92ce9 |
/* #define DEFAULT_LINGER_TIME (5 * 60) */ /* 5 minutes */
|
|
|
f92ce9 |
@@ -274,6 +276,32 @@ conn_delete(Repl_Connection *conn)
|
|
|
f92ce9 |
PR_Unlock(conn->lock);
|
|
|
f92ce9 |
}
|
|
|
f92ce9 |
|
|
|
f92ce9 |
+void
|
|
|
f92ce9 |
+conn_set_tot_update_cb_nolock(Repl_Connection *conn, void *cb_data)
|
|
|
f92ce9 |
+{
|
|
|
f92ce9 |
+ conn->tot_init_callback = (void *) cb_data;
|
|
|
f92ce9 |
+}
|
|
|
f92ce9 |
+void
|
|
|
f92ce9 |
+conn_set_tot_update_cb(Repl_Connection *conn, void *cb_data)
|
|
|
f92ce9 |
+{
|
|
|
f92ce9 |
+ PR_Lock(conn->lock);
|
|
|
f92ce9 |
+ conn_set_tot_update_cb_nolock(conn, cb_data);
|
|
|
f92ce9 |
+ PR_Unlock(conn->lock);
|
|
|
f92ce9 |
+}
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
+void
|
|
|
f92ce9 |
+conn_get_tot_update_cb_nolock(Repl_Connection *conn, void **cb_data)
|
|
|
f92ce9 |
+{
|
|
|
f92ce9 |
+ *cb_data = (void *) conn->tot_init_callback;
|
|
|
f92ce9 |
+}
|
|
|
f92ce9 |
+void
|
|
|
f92ce9 |
+conn_get_tot_update_cb(Repl_Connection *conn, void **cb_data)
|
|
|
f92ce9 |
+{
|
|
|
f92ce9 |
+ PR_Lock(conn->lock);
|
|
|
f92ce9 |
+ conn_get_tot_update_cb_nolock(conn, cb_data);
|
|
|
f92ce9 |
+ PR_Unlock(conn->lock);
|
|
|
f92ce9 |
+}
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
/*
|
|
|
f92ce9 |
* Return the last operation type processed by the connection
|
|
|
f92ce9 |
* object, and the LDAP error encountered.
|
|
|
f92ce9 |
@@ -640,6 +668,131 @@ see_if_write_available(Repl_Connection *conn, PRIntervalTime timeout)
|
|
|
f92ce9 |
}
|
|
|
f92ce9 |
#endif /* ! USE_OPENLDAP */
|
|
|
f92ce9 |
|
|
|
f92ce9 |
+/*
|
|
|
f92ce9 |
+ * During a total update, this function checks how much entries
|
|
|
f92ce9 |
+ * have been sent to the consumer without having received their acknowledgment.
|
|
|
f92ce9 |
+ * Basically it checks how late is the consumer.
|
|
|
f92ce9 |
+ *
|
|
|
f92ce9 |
+ * If the consumer is too late, it pause the RA.sender (releasing the lock) to
|
|
|
f92ce9 |
+ * let the consumer to catch up and RA.reader to receive the acknowledgments.
|
|
|
f92ce9 |
+ *
|
|
|
f92ce9 |
+ * Caller must hold conn->lock
|
|
|
f92ce9 |
+ */
|
|
|
f92ce9 |
+static void
|
|
|
f92ce9 |
+check_flow_control_tot_init(Repl_Connection *conn, int optype, const char *extop_oid, int sent_msgid)
|
|
|
f92ce9 |
+{
|
|
|
f92ce9 |
+ int rcv_msgid;
|
|
|
f92ce9 |
+ int once;
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
+ if ((sent_msgid != 0) && (optype == CONN_EXTENDED_OPERATION) && (strcmp(extop_oid, REPL_NSDS50_REPLICATION_ENTRY_REQUEST_OID) == 0)) {
|
|
|
f92ce9 |
+ /* We are sending entries part of the total update of a consumer
|
|
|
f92ce9 |
+ * Wait a bit if the consumer needs to catchup from the current sent entries
|
|
|
f92ce9 |
+ */
|
|
|
f92ce9 |
+ rcv_msgid = repl5_tot_last_rcv_msgid(conn);
|
|
|
f92ce9 |
+ if (rcv_msgid == -1) {
|
|
|
f92ce9 |
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
|
|
|
f92ce9 |
+ "%s: check_flow_control_tot_init no callback data [ msgid sent: %d]\n",
|
|
|
f92ce9 |
+ agmt_get_long_name(conn->agmt),
|
|
|
f92ce9 |
+ sent_msgid);
|
|
|
f92ce9 |
+ } else if (sent_msgid < rcv_msgid) {
|
|
|
f92ce9 |
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
|
|
|
f92ce9 |
+ "%s: check_flow_control_tot_init invalid message ids [ msgid sent: %d, rcv: %d]\n",
|
|
|
f92ce9 |
+ agmt_get_long_name(conn->agmt),
|
|
|
f92ce9 |
+ sent_msgid,
|
|
|
f92ce9 |
+ rcv_msgid);
|
|
|
f92ce9 |
+ } else if ((sent_msgid - rcv_msgid) > agmt_get_flowcontrolwindow(conn->agmt)) {
|
|
|
f92ce9 |
+ int totalUpdatePause;
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
+ totalUpdatePause = agmt_get_flowcontrolpause(conn->agmt);
|
|
|
f92ce9 |
+ if (totalUpdatePause) {
|
|
|
f92ce9 |
+ /* The consumer is late. Last sent entry compare to last acknowledged entry
|
|
|
f92ce9 |
+ * overpass the allowed limit (flowcontrolwindow)
|
|
|
f92ce9 |
+ * Give some time to the consumer to catch up
|
|
|
f92ce9 |
+ */
|
|
|
f92ce9 |
+ once = repl5_tot_flowcontrol_detection(conn, 1);
|
|
|
f92ce9 |
+ PR_Unlock(conn->lock);
|
|
|
f92ce9 |
+ if (once == 1) {
|
|
|
f92ce9 |
+ /* This is the first time we hit total update flow control.
|
|
|
f92ce9 |
+ * Log it at least once to inform administrator there is
|
|
|
f92ce9 |
+ * a potential configuration issue here
|
|
|
f92ce9 |
+ */
|
|
|
f92ce9 |
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
|
|
|
f92ce9 |
+ "%s: Total update flow control gives time (%d msec) to the consumer before sending more entries [ msgid sent: %d, rcv: %d])\n"
|
|
|
f92ce9 |
+ "If total update fails you can try to increase %s and/or decrease %s in the replica agreement configuration\n",
|
|
|
f92ce9 |
+ agmt_get_long_name(conn->agmt),
|
|
|
f92ce9 |
+ totalUpdatePause,
|
|
|
f92ce9 |
+ sent_msgid,
|
|
|
f92ce9 |
+ rcv_msgid,
|
|
|
f92ce9 |
+ type_nsds5ReplicaFlowControlPause,
|
|
|
f92ce9 |
+ type_nsds5ReplicaFlowControlWindow);
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ DS_Sleep(PR_MillisecondsToInterval(totalUpdatePause));
|
|
|
f92ce9 |
+ PR_Lock(conn->lock);
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
+}
|
|
|
f92ce9 |
+/*
|
|
|
f92ce9 |
+ * Test if the connection is available to do a write.
|
|
|
f92ce9 |
+ * This function is doing a periodic polling of the connection.
|
|
|
f92ce9 |
+ * If the polling times out:
|
|
|
f92ce9 |
+ * - it releases the connection lock (to let other thread ,i.e.
|
|
|
f92ce9 |
+ * replication result thread, the opportunity to use the connection)
|
|
|
f92ce9 |
+ * - Sleeps for a short period (100ms)
|
|
|
f92ce9 |
+ * - acquires the connection lock
|
|
|
f92ce9 |
+ *
|
|
|
f92ce9 |
+ * It loops until
|
|
|
f92ce9 |
+ * - it is available
|
|
|
f92ce9 |
+ * - exceeds RA complete timeout
|
|
|
f92ce9 |
+ * - server is shutdown
|
|
|
f92ce9 |
+ * - connection is disconnected (Disable, stop, delete the RA
|
|
|
f92ce9 |
+ * 'terminate' the replication protocol and disconnect the connection)
|
|
|
f92ce9 |
+ *
|
|
|
f92ce9 |
+ * Return:
|
|
|
f92ce9 |
+ * - CONN_OPERATION_SUCCESS if the connection is available
|
|
|
f92ce9 |
+ * - CONN_TIMEOUT if the overall polling/sleeping delay exceeds RA timeout
|
|
|
f92ce9 |
+ * - CONN_NOT_CONNECTED if the replication connection state is disconnected
|
|
|
f92ce9 |
+ * - other ConnResult
|
|
|
f92ce9 |
+ *
|
|
|
f92ce9 |
+ * Caller must hold conn->Lock. At the exit, conn->lock is held
|
|
|
f92ce9 |
+ */
|
|
|
f92ce9 |
+static ConnResult
|
|
|
f92ce9 |
+conn_is_available(Repl_Connection *conn)
|
|
|
f92ce9 |
+{
|
|
|
f92ce9 |
+ time_t poll_timeout_sec = 1; /* Polling for 1sec */
|
|
|
f92ce9 |
+ time_t yield_delay_msec = 100; /* Delay to wait */
|
|
|
f92ce9 |
+ time_t start_time = time( NULL );
|
|
|
f92ce9 |
+ time_t time_now;
|
|
|
f92ce9 |
+ ConnResult return_value = CONN_OPERATION_SUCCESS;
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
+ while (!slapi_is_shutting_down() && (conn->state != STATE_DISCONNECTED)) {
|
|
|
f92ce9 |
+ return_value = see_if_write_available(conn, PR_SecondsToInterval(poll_timeout_sec));
|
|
|
f92ce9 |
+ if (return_value == CONN_TIMEOUT) {
|
|
|
f92ce9 |
+ /* in case of timeout we return CONN_TIMEOUT only
|
|
|
f92ce9 |
+ * if the RA.timeout is exceeded
|
|
|
f92ce9 |
+ */
|
|
|
f92ce9 |
+ time_now = time(NULL);
|
|
|
f92ce9 |
+ if (conn->timeout.tv_sec <= (time_now - start_time)) {
|
|
|
f92ce9 |
+ break;
|
|
|
f92ce9 |
+ } else {
|
|
|
f92ce9 |
+ /* Else give connection to others threads */
|
|
|
f92ce9 |
+ PR_Unlock(conn->lock);
|
|
|
f92ce9 |
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
|
|
|
f92ce9 |
+ "%s: perform_operation transient timeout. retry)\n",
|
|
|
f92ce9 |
+ agmt_get_long_name(conn->agmt));
|
|
|
f92ce9 |
+ DS_Sleep(PR_MillisecondsToInterval(yield_delay_msec));
|
|
|
f92ce9 |
+ PR_Lock(conn->lock);
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ } else {
|
|
|
f92ce9 |
+ break;
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ if (conn->state == STATE_DISCONNECTED) {
|
|
|
f92ce9 |
+ return_value = CONN_NOT_CONNECTED;
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ return return_value;
|
|
|
f92ce9 |
+}
|
|
|
f92ce9 |
/*
|
|
|
f92ce9 |
* Common code to send an LDAPv3 operation and collect the result.
|
|
|
f92ce9 |
* Return values:
|
|
|
f92ce9 |
@@ -683,10 +836,13 @@ perform_operation(Repl_Connection *conn, int optype, const char *dn,
|
|
|
f92ce9 |
|
|
|
f92ce9 |
Slapi_Eq_Context eqctx = repl5_start_debug_timeout(&setlevel);
|
|
|
f92ce9 |
|
|
|
f92ce9 |
- return_value = see_if_write_available(
|
|
|
f92ce9 |
- conn, PR_SecondsToInterval(conn->timeout.tv_sec));
|
|
|
f92ce9 |
+ return_value = conn_is_available(conn);
|
|
|
f92ce9 |
if (return_value != CONN_OPERATION_SUCCESS) {
|
|
|
f92ce9 |
PR_Unlock(conn->lock);
|
|
|
f92ce9 |
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
|
|
|
f92ce9 |
+ "%s: perform_operation connection is not available (%d)\n",
|
|
|
f92ce9 |
+ agmt_get_long_name(conn->agmt),
|
|
|
f92ce9 |
+ return_value);
|
|
|
f92ce9 |
return return_value;
|
|
|
f92ce9 |
}
|
|
|
f92ce9 |
conn->last_operation = optype;
|
|
|
f92ce9 |
@@ -758,6 +914,9 @@ perform_operation(Repl_Connection *conn, int optype, const char *dn,
|
|
|
f92ce9 |
*/
|
|
|
f92ce9 |
return_value = CONN_NOT_CONNECTED;
|
|
|
f92ce9 |
}
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
+ check_flow_control_tot_init(conn, optype, extop_oid, msgid);
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
PR_Unlock(conn->lock); /* release the lock */
|
|
|
f92ce9 |
if (message_id)
|
|
|
f92ce9 |
{
|
|
|
f92ce9 |
diff --git a/ldap/servers/plugins/replication/repl5_inc_protocol.c b/ldap/servers/plugins/replication/repl5_inc_protocol.c
|
|
|
f92ce9 |
index 3bb68e7..b867fc4 100644
|
|
|
f92ce9 |
--- a/ldap/servers/plugins/replication/repl5_inc_protocol.c
|
|
|
f92ce9 |
+++ b/ldap/servers/plugins/replication/repl5_inc_protocol.c
|
|
|
f92ce9 |
@@ -108,6 +108,7 @@ typedef struct result_data
|
|
|
f92ce9 |
int stop_result_thread; /* Flag used to tell the result thread to exit */
|
|
|
f92ce9 |
int last_message_id_sent;
|
|
|
f92ce9 |
int last_message_id_received;
|
|
|
f92ce9 |
+ int flowcontrol_detection;
|
|
|
f92ce9 |
int result; /* The UPDATE_TRANSIENT_ERROR etc */
|
|
|
f92ce9 |
} result_data;
|
|
|
f92ce9 |
|
|
|
f92ce9 |
@@ -460,6 +461,23 @@ repl5_inc_destroy_async_result_thread(result_data *rd)
|
|
|
f92ce9 |
return retval;
|
|
|
f92ce9 |
}
|
|
|
f92ce9 |
|
|
|
f92ce9 |
+/* The interest of this routine is to give time to the consumer
|
|
|
f92ce9 |
+ * to apply the sent updates and return the acks.
|
|
|
f92ce9 |
+ * So the caller should not hold the replication connection lock
|
|
|
f92ce9 |
+ * to let the RA.reader receives the acks.
|
|
|
f92ce9 |
+ */
|
|
|
f92ce9 |
+static void
|
|
|
f92ce9 |
+repl5_inc_flow_control_results(Repl_Agmt *agmt, result_data *rd)
|
|
|
f92ce9 |
+{
|
|
|
f92ce9 |
+ PR_Lock(rd->lock);
|
|
|
f92ce9 |
+ if ((rd->last_message_id_received <= rd->last_message_id_sent) &&
|
|
|
f92ce9 |
+ ((rd->last_message_id_sent - rd->last_message_id_received) >= agmt_get_flowcontrolwindow(agmt))) {
|
|
|
f92ce9 |
+ rd->flowcontrol_detection++;
|
|
|
f92ce9 |
+ DS_Sleep(PR_MillisecondsToInterval(agmt_get_flowcontrolpause(agmt)));
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ PR_Unlock(rd->lock);
|
|
|
f92ce9 |
+}
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
static void
|
|
|
f92ce9 |
repl5_inc_waitfor_async_results(result_data *rd)
|
|
|
f92ce9 |
{
|
|
|
f92ce9 |
@@ -1683,7 +1701,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
|
|
|
f92ce9 |
{
|
|
|
f92ce9 |
int finished = 0;
|
|
|
f92ce9 |
ConnResult replay_crc;
|
|
|
f92ce9 |
- char csn_str[CSN_STRSIZE];
|
|
|
f92ce9 |
+ char csn_str[CSN_STRSIZE];
|
|
|
f92ce9 |
|
|
|
f92ce9 |
/* Start the results reading thread */
|
|
|
f92ce9 |
rd = repl5_inc_rd_new(prp);
|
|
|
f92ce9 |
@@ -1818,6 +1836,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
|
|
|
f92ce9 |
sop->replica_id = replica_id;
|
|
|
f92ce9 |
PL_strncpyz(sop->uniqueid, uniqueid, sizeof(sop->uniqueid));
|
|
|
f92ce9 |
repl5_int_push_operation(rd,sop);
|
|
|
f92ce9 |
+ repl5_inc_flow_control_results(prp->agmt, rd);
|
|
|
f92ce9 |
} else {
|
|
|
f92ce9 |
slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
|
|
|
f92ce9 |
"%s: Skipping update operation with no message_id (uniqueid %s, CSN %s):\n",
|
|
|
f92ce9 |
@@ -1906,6 +1925,17 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
|
|
|
f92ce9 |
}
|
|
|
f92ce9 |
*num_changes_sent = rd->num_changes_sent;
|
|
|
f92ce9 |
}
|
|
|
f92ce9 |
+ PR_Lock(rd->lock);
|
|
|
f92ce9 |
+ if (rd->flowcontrol_detection) {
|
|
|
f92ce9 |
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
|
|
|
f92ce9 |
+ "%s: Incremental update flow control triggered %d times\n"
|
|
|
f92ce9 |
+ "You may increase %s and/or decrease %s in the replica agreement configuration\n",
|
|
|
f92ce9 |
+ agmt_get_long_name(prp->agmt),
|
|
|
f92ce9 |
+ rd->flowcontrol_detection,
|
|
|
f92ce9 |
+ type_nsds5ReplicaFlowControlPause,
|
|
|
f92ce9 |
+ type_nsds5ReplicaFlowControlWindow);
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ PR_Unlock(rd->lock);
|
|
|
f92ce9 |
repl5_inc_rd_destroy(&rd);
|
|
|
f92ce9 |
|
|
|
f92ce9 |
cl5_operation_parameters_done ( entry.op );
|
|
|
f92ce9 |
diff --git a/ldap/servers/plugins/replication/repl5_prot_private.h b/ldap/servers/plugins/replication/repl5_prot_private.h
|
|
|
f92ce9 |
index 586e1eb..1b1c00b 100644
|
|
|
f92ce9 |
--- a/ldap/servers/plugins/replication/repl5_prot_private.h
|
|
|
f92ce9 |
+++ b/ldap/servers/plugins/replication/repl5_prot_private.h
|
|
|
f92ce9 |
@@ -79,6 +79,8 @@ typedef struct private_repl_protocol
|
|
|
f92ce9 |
|
|
|
f92ce9 |
extern Private_Repl_Protocol *Repl_5_Inc_Protocol_new();
|
|
|
f92ce9 |
extern Private_Repl_Protocol *Repl_5_Tot_Protocol_new();
|
|
|
f92ce9 |
+extern int repl5_tot_last_rcv_msgid(Repl_Connection *conn);
|
|
|
f92ce9 |
+extern int repl5_tot_flowcontrol_detection(Repl_Connection *conn, int increment);
|
|
|
f92ce9 |
extern Private_Repl_Protocol *Windows_Inc_Protocol_new();
|
|
|
f92ce9 |
extern Private_Repl_Protocol *Windows_Tot_Protocol_new();
|
|
|
f92ce9 |
|
|
|
f92ce9 |
diff --git a/ldap/servers/plugins/replication/repl5_tot_protocol.c b/ldap/servers/plugins/replication/repl5_tot_protocol.c
|
|
|
f92ce9 |
index d4f0fcc..adadd44 100644
|
|
|
f92ce9 |
--- a/ldap/servers/plugins/replication/repl5_tot_protocol.c
|
|
|
f92ce9 |
+++ b/ldap/servers/plugins/replication/repl5_tot_protocol.c
|
|
|
f92ce9 |
@@ -82,6 +82,7 @@ typedef struct callback_data
|
|
|
f92ce9 |
int stop_result_thread; /* Flag used to tell the result thread to exit */
|
|
|
f92ce9 |
int last_message_id_sent;
|
|
|
f92ce9 |
int last_message_id_received;
|
|
|
f92ce9 |
+ int flowcontrol_detection;
|
|
|
f92ce9 |
} callback_data;
|
|
|
f92ce9 |
|
|
|
f92ce9 |
/*
|
|
|
f92ce9 |
@@ -428,13 +429,19 @@ repl5_tot_run(Private_Repl_Protocol *prp)
|
|
|
f92ce9 |
LDAP_SCOPE_SUBTREE, "(|(objectclass=ldapsubentry)(objectclass=nstombstone)(nsuniqueid=*))", NULL, 0, ctrls, NULL,
|
|
|
f92ce9 |
repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0);
|
|
|
f92ce9 |
|
|
|
f92ce9 |
- cb_data.prp = prp;
|
|
|
f92ce9 |
- cb_data.rc = 0;
|
|
|
f92ce9 |
+ cb_data.prp = prp;
|
|
|
f92ce9 |
+ cb_data.rc = 0;
|
|
|
f92ce9 |
cb_data.num_entries = 0UL;
|
|
|
f92ce9 |
cb_data.sleep_on_busy = 0UL;
|
|
|
f92ce9 |
cb_data.last_busy = current_time ();
|
|
|
f92ce9 |
+ cb_data.flowcontrol_detection = 0;
|
|
|
f92ce9 |
cb_data.lock = PR_NewLock();
|
|
|
f92ce9 |
|
|
|
f92ce9 |
+ /* This allows during perform_operation to check the callback data
|
|
|
f92ce9 |
+ * especially to do flow contol on delta send msgid / recv msgid
|
|
|
f92ce9 |
+ */
|
|
|
f92ce9 |
+ conn_set_tot_update_cb(prp->conn, (void *) &cb_data);
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
/* Before we get started on sending entries to the replica, we need to
|
|
|
f92ce9 |
* setup things for async propagation:
|
|
|
f92ce9 |
* 1. Create a thread that will read the LDAP results from the connection.
|
|
|
f92ce9 |
@@ -506,6 +513,17 @@ repl5_tot_run(Private_Repl_Protocol *prp)
|
|
|
f92ce9 |
done:
|
|
|
f92ce9 |
slapi_sdn_free(&area_sdn);
|
|
|
f92ce9 |
slapi_ch_free_string(&hostname);
|
|
|
f92ce9 |
+ if (cb_data.flowcontrol_detection > 1)
|
|
|
f92ce9 |
+ {
|
|
|
f92ce9 |
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
|
|
|
f92ce9 |
+ "%s: Total update flow control triggered %d times\n"
|
|
|
f92ce9 |
+ "You may increase %s and/or decrease %s in the replica agreement configuration\n",
|
|
|
f92ce9 |
+ agmt_get_long_name(prp->agmt),
|
|
|
f92ce9 |
+ cb_data.flowcontrol_detection,
|
|
|
f92ce9 |
+ type_nsds5ReplicaFlowControlPause,
|
|
|
f92ce9 |
+ type_nsds5ReplicaFlowControlWindow);
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+ conn_set_tot_update_cb(prp->conn, NULL);
|
|
|
f92ce9 |
if (cb_data.lock)
|
|
|
f92ce9 |
{
|
|
|
f92ce9 |
PR_DestroyLock(cb_data.lock);
|
|
|
f92ce9 |
@@ -645,6 +663,37 @@ void get_result (int rc, void *cb_data)
|
|
|
f92ce9 |
((callback_data*)cb_data)->rc = rc;
|
|
|
f92ce9 |
}
|
|
|
f92ce9 |
|
|
|
f92ce9 |
+/* Call must hold the connection lock */
|
|
|
f92ce9 |
+int
|
|
|
f92ce9 |
+repl5_tot_last_rcv_msgid(Repl_Connection *conn)
|
|
|
f92ce9 |
+{
|
|
|
f92ce9 |
+ struct callback_data *cb_data;
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
+ conn_get_tot_update_cb_nolock(conn, (void **) &cb_data);
|
|
|
f92ce9 |
+ if (cb_data == NULL) {
|
|
|
f92ce9 |
+ return -1;
|
|
|
f92ce9 |
+ } else {
|
|
|
f92ce9 |
+ return cb_data->last_message_id_received;
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+}
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
+/* Increase the flowcontrol counter
|
|
|
f92ce9 |
+ * Call must hold the connection lock
|
|
|
f92ce9 |
+ */
|
|
|
f92ce9 |
+int
|
|
|
f92ce9 |
+repl5_tot_flowcontrol_detection(Repl_Connection *conn, int increment)
|
|
|
f92ce9 |
+{
|
|
|
f92ce9 |
+ struct callback_data *cb_data;
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
+ conn_get_tot_update_cb_nolock(conn, (void **) &cb_data);
|
|
|
f92ce9 |
+ if (cb_data == NULL) {
|
|
|
f92ce9 |
+ return -1;
|
|
|
f92ce9 |
+ } else {
|
|
|
f92ce9 |
+ cb_data->flowcontrol_detection += increment;
|
|
|
f92ce9 |
+ return cb_data->flowcontrol_detection;
|
|
|
f92ce9 |
+ }
|
|
|
f92ce9 |
+}
|
|
|
f92ce9 |
+
|
|
|
f92ce9 |
static
|
|
|
f92ce9 |
int send_entry (Slapi_Entry *e, void *cb_data)
|
|
|
f92ce9 |
{
|
|
|
f92ce9 |
diff --git a/ldap/servers/plugins/replication/repl_globals.c b/ldap/servers/plugins/replication/repl_globals.c
|
|
|
f92ce9 |
index 5609def..e2157fa 100644
|
|
|
f92ce9 |
--- a/ldap/servers/plugins/replication/repl_globals.c
|
|
|
f92ce9 |
+++ b/ldap/servers/plugins/replication/repl_globals.c
|
|
|
f92ce9 |
@@ -139,6 +139,8 @@ const char *type_nsds5ReplicaBusyWaitTime = "nsds5ReplicaBusyWaitTime";
|
|
|
f92ce9 |
const char *type_nsds5ReplicaSessionPauseTime = "nsds5ReplicaSessionPauseTime";
|
|
|
f92ce9 |
const char *type_nsds5ReplicaEnabled = "nsds5ReplicaEnabled";
|
|
|
f92ce9 |
const char *type_nsds5ReplicaStripAttrs = "nsds5ReplicaStripAttrs";
|
|
|
f92ce9 |
+const char* type_nsds5ReplicaFlowControlWindow = "nsds5ReplicaFlowControlWindow";
|
|
|
f92ce9 |
+const char* type_nsds5ReplicaFlowControlPause = "nsds5ReplicaFlowControlPause";
|
|
|
f92ce9 |
|
|
|
f92ce9 |
/* windows sync specific attributes */
|
|
|
f92ce9 |
const char *type_nsds7WindowsReplicaArea = "nsds7WindowsReplicaSubtree";
|
|
|
f92ce9 |
--
|
|
|
f92ce9 |
1.9.3
|
|
|
f92ce9 |
|