From 2ae48266c49d391664796bd3320319a06408acb8 Mon Sep 17 00:00:00 2001 From: Noriko Hosoi Date: Thu, 2 Apr 2015 13:09:12 -0700 Subject: [PATCH 310/319] Ticket #47942 - DS hangs during online total update Backported the patch in the master branch to 389-ds-base-1.2.11 by Jatin Nansi (jnansi@redhat.com). commit fbafee54dc17e0673004d6d26d739ea1b19dd578 Author: Thierry bordaz (tbordaz) Date: Mon Dec 15 15:12:35 2014 +0100 Ticket 47942: DS hangs during online total update Reviewed by tbordaz@redhat.com and nhosoi@redhat.com. https://fedorahosted.org/389/ticket/47942 (cherry picked from commit 88ecf0c9b43060822e5bc9a3ba38b48438c296e6) --- ldap/schema/01core389.ldif | 4 +- ldap/servers/plugins/replication/repl5.h | 11 ++ ldap/servers/plugins/replication/repl5_agmt.c | 160 ++++++++++++++++++++ ldap/servers/plugins/replication/repl5_agmtlist.c | 26 ++++ .../servers/plugins/replication/repl5_connection.c | 165 ++++++++++++++++++++- .../plugins/replication/repl5_inc_protocol.c | 32 +++- .../plugins/replication/repl5_prot_private.h | 2 + .../plugins/replication/repl5_tot_protocol.c | 52 ++++++- ldap/servers/plugins/replication/repl_globals.c | 3 + 9 files changed, 449 insertions(+), 6 deletions(-) diff --git a/ldap/schema/01core389.ldif b/ldap/schema/01core389.ldif index a1f993f..f8924fa 100644 --- a/ldap/schema/01core389.ldif +++ b/ldap/schema/01core389.ldif @@ -153,6 +153,8 @@ attributeTypes: ( 2.16.840.1.113730.3.1.2152 NAME 'nsds5ReplicaProtocolTimeout' attributeTypes: ( 2.16.840.1.113730.3.1.2154 NAME 'nsds5ReplicaBackoffMin' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' ) attributeTypes: ( 2.16.840.1.113730.3.1.2155 NAME 'nsds5ReplicaBackoffMax' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' ) attributeTypes: ( 2.16.840.1.113730.3.1.2156 NAME 'nsslapd-sasl-max-buffer-size' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' ) +attributeTypes: ( 2.16.840.1.113730.3.1.2310 NAME 'nsds5ReplicaFlowControlWindow' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' ) +attributeTypes: ( 2.16.840.1.113730.3.1.2311 NAME 'nsds5ReplicaFlowControlPause' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' ) # # objectclasses # @@ -164,7 +166,7 @@ objectClasses: ( 2.16.840.1.113730.3.2.110 NAME 'nsMappingTree' DESC 'Netscape d objectClasses: ( 2.16.840.1.113730.3.2.104 NAME 'nsContainer' DESC 'Netscape defined objectclass' SUP top MUST ( CN ) X-ORIGIN 'Netscape Directory Server' ) objectClasses: ( 2.16.840.1.113730.3.2.108 NAME 'nsDS5Replica' DESC 'Netscape defined objectclass' SUP top MUST ( nsDS5ReplicaRoot $ nsDS5ReplicaId ) MAY (cn $ nsds5ReplicaCleanRUV $ nsds5ReplicaAbortCleanRUV $ nsDS5ReplicaType $ nsDS5ReplicaBindDN $ nsState $ nsDS5ReplicaName $ nsDS5Flags $ nsDS5Task $ nsDS5ReplicaReferral $ nsDS5ReplicaAutoReferral $ nsds5ReplicaPurgeDelay $ nsds5ReplicaTombstonePurgeInterval $ nsds5ReplicaChangeCount $ nsds5ReplicaLegacyConsumer) X-ORIGIN 'Netscape Directory Server' ) objectClasses: ( 2.16.840.1.113730.3.2.113 NAME 'nsTombstone' DESC 'Netscape defined objectclass' SUP top MAY ( nsParentUniqueId $ nscpEntryDN ) X-ORIGIN 'Netscape Directory Server' ) -objectClasses: ( 2.16.840.1.113730.3.2.103 NAME 'nsDS5ReplicationAgreement' DESC 'Netscape defined objectclass' SUP top MUST ( cn ) MAY ( nsds5ReplicaCleanRUVNotified $ nsDS5ReplicaHost $ nsDS5ReplicaPort $ nsDS5ReplicaTransportInfo $ nsDS5ReplicaBindDN $ nsDS5ReplicaCredentials $ nsDS5ReplicaBindMethod $ nsDS5ReplicaRoot $ nsDS5ReplicatedAttributeList $ nsDS5ReplicatedAttributeListTotal $ nsDS5ReplicaUpdateSchedule $ nsds5BeginReplicaRefresh $ description $ nsds50ruv $ nsruvReplicaLastModified $ nsds5ReplicaTimeout $ nsds5replicaChangesSentSinceStartup $ nsds5replicaLastUpdateEnd $ nsds5replicaLastUpdateStart $ nsds5replicaLastUpdateStatus $ nsds5replicaUpdateInProgress $ nsds5replicaLastInitEnd $ nsds5ReplicaEnabled $ nsds5replicaLastInitStart $ nsds5replicaLastInitStatus $ nsds5debugreplicatimeout $ nsds5replicaBusyWaitTime $ nsds5ReplicaStripAttrs $ nsds5replicaSessionPauseTime ) X-ORIGIN 'Netscape Directory Server' ) +objectClasses: ( 2.16.840.1.113730.3.2.103 NAME 'nsDS5ReplicationAgreement' DESC 'Netscape defined objectclass' SUP top MUST ( cn ) MAY ( nsds5ReplicaCleanRUVNotified $ nsDS5ReplicaHost $ nsDS5ReplicaPort $ nsDS5ReplicaTransportInfo $ nsDS5ReplicaBindDN $ nsDS5ReplicaCredentials $ nsDS5ReplicaBindMethod $ nsDS5ReplicaRoot $ nsDS5ReplicatedAttributeList $ nsDS5ReplicatedAttributeListTotal $ nsDS5ReplicaUpdateSchedule $ nsds5BeginReplicaRefresh $ description $ nsds50ruv $ nsruvReplicaLastModified $ nsds5ReplicaTimeout $ nsds5replicaChangesSentSinceStartup $ nsds5replicaLastUpdateEnd $ nsds5replicaLastUpdateStart $ nsds5replicaLastUpdateStatus $ nsds5replicaUpdateInProgress $ nsds5replicaLastInitEnd $ nsds5ReplicaEnabled $ nsds5replicaLastInitStart $ nsds5replicaLastInitStatus $ nsds5debugreplicatimeout $ nsds5replicaBusyWaitTime $ nsds5ReplicaStripAttrs $ nsds5replicaSessionPauseTime $ nsds5ReplicaFlowControlWindow $ nsds5ReplicaFlowControlPause ) X-ORIGIN 'Netscape Directory Server' ) objectClasses: ( 2.16.840.1.113730.3.2.39 NAME 'nsslapdConfig' DESC 'Netscape defined objectclass' SUP top MAY ( cn ) X-ORIGIN 'Netscape Directory Server' ) objectClasses: ( 2.16.840.1.113730.3.2.317 NAME 'nsSaslMapping' DESC 'Netscape defined objectclass' SUP top MUST ( cn $ nsSaslMapRegexString $ nsSaslMapBaseDNTemplate $ nsSaslMapFilterTemplate ) X-ORIGIN 'Netscape Directory Server' ) objectClasses: ( 2.16.840.1.113730.3.2.43 NAME 'nsSNMP' DESC 'Netscape defined objectclass' SUP top MUST ( cn $ nsSNMPEnabled ) MAY ( nsSNMPOrganization $ nsSNMPLocation $ nsSNMPContact $ nsSNMPDescription $ nsSNMPName $ nsSNMPMasterHost $ nsSNMPMasterPort ) X-ORIGIN 'Netscape Directory Server' ) diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h index a5761be..231daf5 100644 --- a/ldap/servers/plugins/replication/repl5.h +++ b/ldap/servers/plugins/replication/repl5.h @@ -168,6 +168,9 @@ extern const char *type_nsds5ReplicaSessionPauseTime; extern const char *type_nsds5ReplicaEnabled; extern const char *type_nsds5ReplicaStripAttrs; extern const char *type_nsds5ReplicaCleanRUVnotified; +extern const char *type_nsds5ReplicaFlowControlWindow; +extern const char *type_nsds5ReplicaFlowControlPause; + /* Attribute names for windows replication agreements */ extern const char *type_nsds7WindowsReplicaArea; @@ -312,6 +315,8 @@ int agmt_get_auto_initialize(const Repl_Agmt *ra); long agmt_get_timeout(const Repl_Agmt *ra); long agmt_get_busywaittime(const Repl_Agmt *ra); long agmt_get_pausetime(const Repl_Agmt *ra); +long agmt_get_flowcontrolwindow(const Repl_Agmt *ra); +long agmt_get_flowcontrolpause(const Repl_Agmt *ra); int agmt_start(Repl_Agmt *ra); int windows_agmt_start(Repl_Agmt *ra); int agmt_stop(Repl_Agmt *ra); @@ -332,6 +337,8 @@ int agmt_replarea_matches(const Repl_Agmt *ra, const Slapi_DN *name); int agmt_schedule_in_window_now(const Repl_Agmt *ra); int agmt_set_schedule_from_entry( Repl_Agmt *ra, const Slapi_Entry *e ); int agmt_set_timeout_from_entry( Repl_Agmt *ra, const Slapi_Entry *e ); +int agmt_set_flowcontrolwindow_from_entry(Repl_Agmt *ra, const Slapi_Entry *e); +int agmt_set_flowcontrolpause_from_entry(Repl_Agmt *ra, const Slapi_Entry *e); int agmt_set_busywaittime_from_entry( Repl_Agmt *ra, const Slapi_Entry *e ); int agmt_set_pausetime_from_entry( Repl_Agmt *ra, const Slapi_Entry *e ); int agmt_set_credentials_from_entry( Repl_Agmt *ra, const Slapi_Entry *e ); @@ -463,6 +470,10 @@ void conn_lock(Repl_Connection *conn); void conn_unlock(Repl_Connection *conn); void conn_delete_internal_ext(Repl_Connection *conn); const char* conn_get_bindmethod(Repl_Connection *conn); +void conn_set_tot_update_cb(Repl_Connection *conn, void *cb_data); +void conn_set_tot_update_cb_nolock(Repl_Connection *conn, void *cb_data); +void conn_get_tot_update_cb(Repl_Connection *conn, void **cb_data); +void conn_get_tot_update_cb_nolock(Repl_Connection *conn, void **cb_data); /* In repl5_protocol.c */ typedef struct repl_protocol Repl_Protocol; diff --git a/ldap/servers/plugins/replication/repl5_agmt.c b/ldap/servers/plugins/replication/repl5_agmt.c index 708966a..41a81ca 100644 --- a/ldap/servers/plugins/replication/repl5_agmt.c +++ b/ldap/servers/plugins/replication/repl5_agmt.c @@ -87,6 +87,8 @@ #include "slapi-plugin.h" #define DEFAULT_TIMEOUT 600 /* (seconds) default outbound LDAP connection */ +#define DEFAULT_FLOWCONTROL_WINDOW 1000 /* #entries sent without acknowledgment */ +#define DEFAULT_FLOWCONTROL_PAUSE 2000 /* msec of pause when #entries sent witout acknowledgment */ #define STATUS_LEN 1024 struct changecounter { @@ -142,6 +144,12 @@ typedef struct repl5agmt { char **attrs_to_strip; /* for fractional replication, if a "mod" is empty, strip out these attributes: * modifiersname, modifytimestamp, internalModifiersname, internalModifyTimestamp, etc */ int agreement_type; + long flowControlWindow; /* This is the maximum number of entries + * sent without acknowledgment + */ + long flowControlPause; /* When nb of not acknowledged entries overpass totalUpdateWindow + * This is the duration (in msec) that the RA will pause before sending the next entry + */ } repl5agmt; /* Forward declarations */ @@ -332,6 +340,27 @@ agmt_new_from_entry(Slapi_Entry *e) ra->timeout = slapi_value_get_long(sval); } } + /* flow control update window. */ + ra->flowControlWindow = DEFAULT_FLOWCONTROL_WINDOW; + if (slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlWindow, &sattr) == 0) + { + Slapi_Value *sval; + if (slapi_attr_first_value(sattr, &sval) == 0) + { + ra->flowControlWindow = slapi_value_get_long(sval); + } + } + + /* flow control update pause. */ + ra->flowControlPause = DEFAULT_FLOWCONTROL_PAUSE; + if (slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlPause, &sattr) == 0) + { + Slapi_Value *sval; + if (slapi_attr_first_value(sattr, &sval) == 0) + { + ra->flowControlPause = slapi_value_get_long(sval); + } + } /* DN of entry at root of replicated area */ tmpstr = slapi_entry_attr_get_charptr(e, type_nsds5ReplicaRoot); @@ -963,6 +992,26 @@ agmt_get_pausetime(const Repl_Agmt *ra) return return_value; } +long +agmt_get_flowcontrolwindow(const Repl_Agmt *ra) +{ + long return_value; + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + return_value = ra->flowControlWindow; + PR_Unlock(ra->lock); + return return_value; +} +long +agmt_get_flowcontrolpause(const Repl_Agmt *ra) +{ + long return_value; + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + return_value = ra->flowControlPause; + PR_Unlock(ra->lock); + return return_value; +} /* * Warning - reference to the long name of the agreement is returned. * The long name of an agreement is the DN of the agreement entry, @@ -1694,6 +1743,90 @@ agmt_set_timeout_from_entry(Repl_Agmt *ra, const Slapi_Entry *e) return return_value; } +/* + * Set or reset the windows of entries sent without acknowledgment. + * The window is used during update to determine the number of + * entries will be send by the replica agreement without acknowledgment from the consumer + * + * Returns 0 if window set, or -1 if an error occurred. + */ +int +agmt_set_flowcontrolwindow_from_entry(Repl_Agmt *ra, const Slapi_Entry *e) +{ + Slapi_Attr *sattr = NULL; + int return_value = -1; + + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + if (ra->stop_in_progress) + { + PR_Unlock(ra->lock); + return return_value; + } + + slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlWindow, &sattr); + if (NULL != sattr) + { + Slapi_Value *sval = NULL; + slapi_attr_first_value(sattr, &sval); + if (NULL != sval) + { + long tmpval = slapi_value_get_long(sval); + if (tmpval >= 0) { + ra->flowControlWindow = tmpval; + return_value = 0; /* success! */ + } + } + } + PR_Unlock(ra->lock); + if (return_value == 0) + { + prot_notify_agmt_changed(ra->protocol, ra->long_name); + } + return return_value; +} + +/* + * Set or reset the pause duration when #entries sent without acknowledgment overpass flow control window + * + * Returns 0 if pause set, or -1 if an error occurred. + */ +int +agmt_set_flowcontrolpause_from_entry(Repl_Agmt *ra, const Slapi_Entry *e) +{ + Slapi_Attr *sattr = NULL; + int return_value = -1; + + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + if (ra->stop_in_progress) + { + PR_Unlock(ra->lock); + return return_value; + } + + slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlPause, &sattr); + if (NULL != sattr) + { + Slapi_Value *sval = NULL; + slapi_attr_first_value(sattr, &sval); + if (NULL != sval) + { + long tmpval = slapi_value_get_long(sval); + if (tmpval >= 0) { + ra->flowControlPause = tmpval; + return_value = 0; /* success! */ + } + } + } + PR_Unlock(ra->lock); + if (return_value == 0) + { + prot_notify_agmt_changed(ra->protocol, ra->long_name); + } + return return_value; +} + int agmt_set_timeout(Repl_Agmt *ra, long timeout) { @@ -1708,6 +1841,33 @@ agmt_set_timeout(Repl_Agmt *ra, long timeout) return 0; } +int +agmt_set_flowcontrolwindow(Repl_Agmt *ra, long window) +{ + PR_Lock(ra->lock); + if (ra->stop_in_progress){ + PR_Unlock(ra->lock); + return -1; + } + ra->flowControlWindow = window; + PR_Unlock(ra->lock); + + return 0; +} +int +agmt_set_flowcontrolpause(Repl_Agmt *ra, long pause) +{ + PR_Lock(ra->lock); + if (ra->stop_in_progress){ + PR_Unlock(ra->lock); + return -1; + } + ra->flowControlPause = pause; + PR_Unlock(ra->lock); + + return 0; +} + /* * Set or reset the busywaittime * diff --git a/ldap/servers/plugins/replication/repl5_agmtlist.c b/ldap/servers/plugins/replication/repl5_agmtlist.c index d37704d..caa41af 100644 --- a/ldap/servers/plugins/replication/repl5_agmtlist.c +++ b/ldap/servers/plugins/replication/repl5_agmtlist.c @@ -345,6 +345,32 @@ agmtlist_modify_callback(Slapi_PBlock *pb, Slapi_Entry *entryBefore, Slapi_Entry } } else if (slapi_attr_types_equivalent(mods[i]->mod_type, + type_nsds5ReplicaFlowControlWindow)) + { + /* New replica timeout */ + if (agmt_set_flowcontrolwindow_from_entry(agmt, e) != 0) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "agmtlist_modify_callback: " + "failed to update the flow control window for agreement %s\n", + agmt_get_long_name(agmt)); + *returncode = LDAP_OPERATIONS_ERROR; + rc = SLAPI_DSE_CALLBACK_ERROR; + } + } + else if (slapi_attr_types_equivalent(mods[i]->mod_type, + type_nsds5ReplicaFlowControlPause)) + { + /* New replica timeout */ + if (agmt_set_flowcontrolpause_from_entry(agmt, e) != 0) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "agmtlist_modify_callback: " + "failed to update the flow control pause for agreement %s\n", + agmt_get_long_name(agmt)); + *returncode = LDAP_OPERATIONS_ERROR; + rc = SLAPI_DSE_CALLBACK_ERROR; + } + } + else if (slapi_attr_types_equivalent(mods[i]->mod_type, type_nsds5ReplicaBusyWaitTime)) { /* New replica busywaittime */ diff --git a/ldap/servers/plugins/replication/repl5_connection.c b/ldap/servers/plugins/replication/repl5_connection.c index 5efd0e6..e080a3f 100644 --- a/ldap/servers/plugins/replication/repl5_connection.c +++ b/ldap/servers/plugins/replication/repl5_connection.c @@ -52,6 +52,7 @@ replica locked. Seems like right thing to do. */ #include "repl5.h" +#include "repl5_prot_private.h" #if defined(USE_OPENLDAP) #include "ldap.h" #else @@ -90,6 +91,7 @@ typedef struct repl_connection struct timeval timeout; int flag_agmt_changed; char *plain; + void *tot_init_callback; /* Used during total update to do flow control */ } repl_connection; /* #define DEFAULT_LINGER_TIME (5 * 60) */ /* 5 minutes */ @@ -277,6 +279,32 @@ conn_get_error(Repl_Connection *conn, int *operation, int *error) PR_Unlock(conn->lock); } +void +conn_set_tot_update_cb_nolock(Repl_Connection *conn, void *cb_data) +{ + conn->tot_init_callback = (void *) cb_data; +} +void +conn_set_tot_update_cb(Repl_Connection *conn, void *cb_data) +{ + PR_Lock(conn->lock); + conn_set_tot_update_cb_nolock(conn, cb_data); + PR_Unlock(conn->lock); +} + +void +conn_get_tot_update_cb_nolock(Repl_Connection *conn, void **cb_data) +{ + *cb_data = (void *) conn->tot_init_callback; +} +void +conn_get_tot_update_cb(Repl_Connection *conn, void **cb_data) +{ + PR_Lock(conn->lock); + conn_get_tot_update_cb_nolock(conn, cb_data); + PR_Unlock(conn->lock); +} + /* * Return the last operation type processed by the connection * object, and the LDAP error encountered. @@ -629,6 +657,133 @@ see_if_write_available(Repl_Connection *conn, PRIntervalTime timeout) } #endif /* ! USE_OPENLDAP */ +/* + * During a total update, this function checks how much entries + * have been sent to the consumer without having received their acknowledgment. + * Basically it checks how late is the consumer. + * + * If the consumer is too late, it pause the RA.sender (releasing the lock) to + * let the consumer to catch up and RA.reader to receive the acknowledgments. + * + * Caller must hold conn->lock + */ +static void +check_flow_control_tot_init(Repl_Connection *conn, int optype, const char *extop_oid, int sent_msgid) +{ + int rcv_msgid; + int once; + + if ((sent_msgid != 0) && (optype == CONN_EXTENDED_OPERATION) && (strcmp(extop_oid, REPL_NSDS50_REPLICATION_ENTRY_REQUEST_OID) == 0)) { + /* We are sending entries part of the total update of a consumer + * Wait a bit if the consumer needs to catchup from the current sent entries + */ + rcv_msgid = repl5_tot_last_rcv_msgid(conn); + if (rcv_msgid == -1) { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "%s: check_flow_control_tot_init no callback data [ msgid sent: %d]\n", + agmt_get_long_name(conn->agmt), + sent_msgid); + } else if (sent_msgid < rcv_msgid) { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "%s: check_flow_control_tot_init invalid message ids [ msgid sent: %d, rcv: %d]\n", + agmt_get_long_name(conn->agmt), + sent_msgid, + rcv_msgid); + } else if ((sent_msgid - rcv_msgid) > agmt_get_flowcontrolwindow(conn->agmt)) { + int totalUpdatePause; + + totalUpdatePause = agmt_get_flowcontrolpause(conn->agmt); + if (totalUpdatePause) { + /* The consumer is late. Last sent entry compare to last acknowledged entry + * overpass the allowed limit (flowcontrolwindow) + * Give some time to the consumer to catch up + */ + once = repl5_tot_flowcontrol_detection(conn, 1); + PR_Unlock(conn->lock); + if (once == 1) { + /* This is the first time we hit total update flow control. + * Log it at least once to inform administrator there is + * a potential configuration issue here + */ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "%s: Total update flow control gives time (%d msec) to the consumer before sending more entries [ msgid sent: %d, rcv: %d])\n" + "If total update fails you can try to increase %s and/or decrease %s in the replica agreement configuration\n", + agmt_get_long_name(conn->agmt), + totalUpdatePause, + sent_msgid, + rcv_msgid, + type_nsds5ReplicaFlowControlPause, + type_nsds5ReplicaFlowControlWindow); + } + DS_Sleep(PR_MillisecondsToInterval(totalUpdatePause)); + PR_Lock(conn->lock); + } + } + } + +} +/* + * Test if the connection is available to do a write. + * This function is doing a periodic polling of the connection. + * If the polling times out: + * - it releases the connection lock (to let other thread ,i.e. + * replication result thread, the opportunity to use the connection) + * - Sleeps for a short period (100ms) + * - acquires the connection lock + * + * It loops until + * - it is available + * - exceeds RA complete timeout + * - server is shutdown + * - connection is disconnected (Disable, stop, delete the RA + * 'terminate' the replication protocol and disconnect the connection) + * + * Return: + * - CONN_OPERATION_SUCCESS if the connection is available + * - CONN_TIMEOUT if the overall polling/sleeping delay exceeds RA timeout + * - CONN_NOT_CONNECTED if the replication connection state is disconnected + * - other ConnResult + * + * Caller must hold conn->Lock. At the exit, conn->lock is held + */ +static ConnResult +conn_is_available(Repl_Connection *conn) +{ + time_t poll_timeout_sec = 1; /* Polling for 1sec */ + time_t yield_delay_msec = 100; /* Delay to wait */ + time_t start_time = time( NULL ); + time_t time_now; + ConnResult return_value = CONN_OPERATION_SUCCESS; + + while (!slapi_is_shutting_down() && (conn->state != STATE_DISCONNECTED)) { + return_value = see_if_write_available(conn, PR_SecondsToInterval(poll_timeout_sec)); + if (return_value == CONN_TIMEOUT) { + /* in case of timeout we return CONN_TIMEOUT only + * if the RA.timeout is exceeded + */ + time_now = time(NULL); + if (conn->timeout.tv_sec <= (time_now - start_time)) { + break; + } else { + /* Else give connection to others threads */ + PR_Unlock(conn->lock); + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "%s: perform_operation transient timeout. retry)\n", + agmt_get_long_name(conn->agmt)); + DS_Sleep(PR_MillisecondsToInterval(yield_delay_msec)); + PR_Lock(conn->lock); + } + } else { + break; + } + } + if (conn->state == STATE_DISCONNECTED) { + return_value = CONN_NOT_CONNECTED; + } + return return_value; +} + + /* * Common code to send an LDAPv3 operation and collect the result. * Return values: @@ -670,10 +825,13 @@ perform_operation(Repl_Connection *conn, int optype, const char *dn, Slapi_Eq_Context eqctx = repl5_start_debug_timeout(&setlevel); - return_value = see_if_write_available( - conn, PR_SecondsToInterval(conn->timeout.tv_sec)); + return_value = conn_is_available(conn); if (return_value != CONN_OPERATION_SUCCESS) { PR_Unlock(conn->lock); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "%s: perform_operation connection is not available (%d)\n", + agmt_get_long_name(conn->agmt), + return_value); return return_value; } conn->last_operation = optype; @@ -745,6 +903,9 @@ perform_operation(Repl_Connection *conn, int optype, const char *dn, */ return_value = CONN_NOT_CONNECTED; } + + check_flow_control_tot_init(conn, optype, extop_oid, msgid); + PR_Unlock(conn->lock); /* release the lock */ if (message_id) { diff --git a/ldap/servers/plugins/replication/repl5_inc_protocol.c b/ldap/servers/plugins/replication/repl5_inc_protocol.c index ae26380..f5516a3 100644 --- a/ldap/servers/plugins/replication/repl5_inc_protocol.c +++ b/ldap/servers/plugins/replication/repl5_inc_protocol.c @@ -108,6 +108,7 @@ typedef struct result_data int stop_result_thread; /* Flag used to tell the result thread to exit */ int last_message_id_sent; int last_message_id_received; + int flowcontrol_detection; int result; /* The UPDATE_TRANSIENT_ERROR etc */ } result_data; @@ -460,6 +461,23 @@ repl5_inc_destroy_async_result_thread(result_data *rd) return retval; } +/* The interest of this routine is to give time to the consumer + * to apply the sent updates and return the acks. + * So the caller should not hold the replication connection lock + * to let the RA.reader receives the acks. + */ +static void +repl5_inc_flow_control_results(Repl_Agmt *agmt, result_data *rd) +{ + PR_Lock(rd->lock); + if ((rd->last_message_id_received <= rd->last_message_id_sent) && + ((rd->last_message_id_sent - rd->last_message_id_received) >= agmt_get_flowcontrolwindow(agmt))) { + rd->flowcontrol_detection++; + DS_Sleep(PR_MillisecondsToInterval(agmt_get_flowcontrolpause(agmt))); + } + PR_Unlock(rd->lock); +} + static void repl5_inc_waitfor_async_results(result_data *rd) { @@ -1669,7 +1687,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu { int finished = 0; ConnResult replay_crc; - char csn_str[CSN_STRSIZE]; + char csn_str[CSN_STRSIZE]; /* Start the results reading thread */ rd = repl5_inc_rd_new(prp); @@ -1804,6 +1822,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu sop->replica_id = replica_id; PL_strncpyz(sop->uniqueid, uniqueid, sizeof(sop->uniqueid)); repl5_int_push_operation(rd,sop); + repl5_inc_flow_control_results(prp->agmt, rd); } else { slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "%s: Skipping update operation with no message_id (uniqueid %s, CSN %s):\n", @@ -1892,6 +1911,17 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu } *num_changes_sent = rd->num_changes_sent; } + PR_Lock(rd->lock); + if (rd->flowcontrol_detection) { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "%s: Incremental update flow control triggered %d times\n" + "You may increase %s and/or decrease %s in the replica agreement configuration\n", + agmt_get_long_name(prp->agmt), + rd->flowcontrol_detection, + type_nsds5ReplicaFlowControlPause, + type_nsds5ReplicaFlowControlWindow); + } + PR_Unlock(rd->lock); repl5_inc_rd_destroy(&rd); cl5_operation_parameters_done ( entry.op ); diff --git a/ldap/servers/plugins/replication/repl5_prot_private.h b/ldap/servers/plugins/replication/repl5_prot_private.h index 10aa02b..7ca31ca 100644 --- a/ldap/servers/plugins/replication/repl5_prot_private.h +++ b/ldap/servers/plugins/replication/repl5_prot_private.h @@ -79,6 +79,8 @@ typedef struct private_repl_protocol extern Private_Repl_Protocol *Repl_5_Inc_Protocol_new(); extern Private_Repl_Protocol *Repl_5_Tot_Protocol_new(); +extern int repl5_tot_last_rcv_msgid(Repl_Connection *conn); +extern int repl5_tot_flowcontrol_detection(Repl_Connection *conn, int increment); extern Private_Repl_Protocol *Windows_Inc_Protocol_new(); extern Private_Repl_Protocol *Windows_Tot_Protocol_new(); diff --git a/ldap/servers/plugins/replication/repl5_tot_protocol.c b/ldap/servers/plugins/replication/repl5_tot_protocol.c index 9829984..e514dc6 100644 --- a/ldap/servers/plugins/replication/repl5_tot_protocol.c +++ b/ldap/servers/plugins/replication/repl5_tot_protocol.c @@ -82,6 +82,7 @@ typedef struct callback_data int stop_result_thread; /* Flag used to tell the result thread to exit */ int last_message_id_sent; int last_message_id_received; + int flowcontrol_detection; } callback_data; /* @@ -416,12 +417,17 @@ repl5_tot_run(Private_Repl_Protocol *prp) LDAP_SCOPE_SUBTREE, "(|(objectclass=ldapsubentry)(objectclass=nstombstone)(nsuniqueid=*))", NULL, 0, ctrls, NULL, repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0); - cb_data.prp = prp; - cb_data.rc = 0; + cb_data.prp = prp; + cb_data.rc = 0; cb_data.num_entries = 0UL; cb_data.sleep_on_busy = 0UL; cb_data.last_busy = current_time (); + cb_data.flowcontrol_detection = 0; cb_data.lock = PR_NewLock(); + /* This allows during perform_operation to check the callback data + * especially to do flow contol on delta send msgid / recv msgid + */ + conn_set_tot_update_cb(prp->conn, (void *) &cb_data); /* Before we get started on sending entries to the replica, we need to * setup things for async propagation: @@ -492,6 +498,17 @@ repl5_tot_run(Private_Repl_Protocol *prp) done: slapi_sdn_free(&area_sdn); slapi_ch_free_string(&hostname); + if (cb_data.flowcontrol_detection > 1) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "%s: Total update flow control triggered %d times\n" + "You may increase %s and/or decrease %s in the replica agreement configuration\n", + agmt_get_long_name(prp->agmt), + cb_data.flowcontrol_detection, + type_nsds5ReplicaFlowControlPause, + type_nsds5ReplicaFlowControlWindow); + } + conn_set_tot_update_cb(prp->conn, NULL); if (cb_data.lock) { PR_DestroyLock(cb_data.lock); @@ -619,6 +636,37 @@ void get_result (int rc, void *cb_data) ((callback_data*)cb_data)->rc = rc; } +/* Call must hold the connection lock */ +int +repl5_tot_last_rcv_msgid(Repl_Connection *conn) +{ + struct callback_data *cb_data; + + conn_get_tot_update_cb_nolock(conn, (void **) &cb_data); + if (cb_data == NULL) { + return -1; + } else { + return cb_data->last_message_id_received; + } +} + +/* Increase the flowcontrol counter + * Call must hold the connection lock + */ +int +repl5_tot_flowcontrol_detection(Repl_Connection *conn, int increment) +{ + struct callback_data *cb_data; + + conn_get_tot_update_cb_nolock(conn, (void **) &cb_data); + if (cb_data == NULL) { + return -1; + } else { + cb_data->flowcontrol_detection += increment; + return cb_data->flowcontrol_detection; + } +} + static int send_entry (Slapi_Entry *e, void *cb_data) { diff --git a/ldap/servers/plugins/replication/repl_globals.c b/ldap/servers/plugins/replication/repl_globals.c index f31a476..bfcff03 100644 --- a/ldap/servers/plugins/replication/repl_globals.c +++ b/ldap/servers/plugins/replication/repl_globals.c @@ -131,6 +131,9 @@ const char *type_nsds5ReplicaSessionPauseTime = "nsds5ReplicaSessionPauseTime"; const char *type_nsds5ReplicaEnabled = "nsds5ReplicaEnabled"; const char *type_nsds5ReplicaStripAttrs = "nsds5ReplicaStripAttrs"; const char *type_nsds5ReplicaCleanRUVnotified = "nsds5ReplicaCleanRUVNotified"; +const char* type_nsds5ReplicaFlowControlWindow = "nsds5ReplicaFlowControlWindow"; +const char* type_nsds5ReplicaFlowControlPause = "nsds5ReplicaFlowControlPause"; + /* windows sync specific attributes */ const char *type_nsds7WindowsReplicaArea = "nsds7WindowsReplicaSubtree"; -- 1.9.3