|
|
c16027 |
From a39e2b7cba91b9f13fe54123b7e8b510bf5bcee8 Mon Sep 17 00:00:00 2001
|
|
|
c16027 |
From: Ludwig Krispenz <lkrispen@redhat.com>
|
|
|
c16027 |
Date: Wed, 8 Jun 2016 11:28:07 +0200
|
|
|
c16027 |
Subject: [PATCH 95/99] Ticket 48766 - Replication changelog can incorrectly
|
|
|
c16027 |
skip over updates
|
|
|
c16027 |
|
|
|
c16027 |
Bug Description:
|
|
|
c16027 |
The changelog iterator uses a buffer to load and send changes, when the buffer is empty
|
|
|
c16027 |
there were scenarios when the straing point for reloading the buffer was incorrectly set
|
|
|
c16027 |
and changes were skipped
|
|
|
c16027 |
|
|
|
c16027 |
Fix Description: reworked clcach buffer code following design at
|
|
|
c16027 |
http://www.port389.org/docs/389ds/design/changelog-processing-in-repl-state-sending-updates.html
|
|
|
c16027 |
|
|
|
c16027 |
https://fedorahosted.org/389/ticket/48766
|
|
|
c16027 |
|
|
|
c16027 |
Reviewed by: Mark and Thierry, thanks
|
|
|
c16027 |
|
|
|
c16027 |
(cherry picked from commit b08df71aa9eb18572f58e55e8d6b9ef7fe181773)
|
|
|
c16027 |
(cherry picked from commit ec15a75ccdba713e4d74dcd760e3244ba43b6191)
|
|
|
c16027 |
---
|
|
|
c16027 |
ldap/servers/plugins/replication/cl5_api.c | 171 +++------------
|
|
|
c16027 |
ldap/servers/plugins/replication/cl5_clcache.c | 292 +++++++++++++++----------
|
|
|
c16027 |
ldap/servers/plugins/replication/cl5_clcache.h | 2 +-
|
|
|
c16027 |
3 files changed, 214 insertions(+), 251 deletions(-)
|
|
|
c16027 |
|
|
|
c16027 |
diff --git a/ldap/servers/plugins/replication/cl5_api.c b/ldap/servers/plugins/replication/cl5_api.c
|
|
|
c16027 |
index ae23353..3adaf86 100644
|
|
|
c16027 |
--- a/ldap/servers/plugins/replication/cl5_api.c
|
|
|
c16027 |
+++ b/ldap/servers/plugins/replication/cl5_api.c
|
|
|
c16027 |
@@ -5489,18 +5489,13 @@ static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consum
|
|
|
c16027 |
{
|
|
|
c16027 |
CLC_Buffer *clcache = NULL;
|
|
|
c16027 |
CL5DBFile *file;
|
|
|
c16027 |
- int i;
|
|
|
c16027 |
- CSN **csns = NULL;
|
|
|
c16027 |
CSN *startCSN = NULL;
|
|
|
c16027 |
- CSN *minCSN = NULL;
|
|
|
c16027 |
char csnStr [CSN_STRSIZE];
|
|
|
c16027 |
int rc = CL5_SUCCESS;
|
|
|
c16027 |
Object *supplierRuvObj = NULL;
|
|
|
c16027 |
RUV *supplierRuv = NULL;
|
|
|
c16027 |
- PRBool newReplica;
|
|
|
c16027 |
PRBool haveChanges = PR_FALSE;
|
|
|
c16027 |
char *agmt_name;
|
|
|
c16027 |
- ReplicaId rid;
|
|
|
c16027 |
|
|
|
c16027 |
PR_ASSERT (consumerRuv && replica && fileObj && iterator);
|
|
|
c16027 |
csnStr[0] = '\0';
|
|
|
c16027 |
@@ -5528,111 +5523,32 @@ static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consum
|
|
|
c16027 |
ruv_dump (supplierRuv, agmt_name, NULL);
|
|
|
c16027 |
}
|
|
|
c16027 |
|
|
|
c16027 |
- /*
|
|
|
c16027 |
- * get the sorted list of SupplierMinCSN (if no ConsumerMaxCSN)
|
|
|
c16027 |
- * and ConsumerMaxCSN for those RIDs where consumer is not
|
|
|
c16027 |
- * up-to-date.
|
|
|
c16027 |
- */
|
|
|
c16027 |
- csns = cl5BuildCSNList (consumerRuv, supplierRuv);
|
|
|
c16027 |
- if (csns == NULL)
|
|
|
c16027 |
- {
|
|
|
c16027 |
- rc = CL5_NOTFOUND;
|
|
|
c16027 |
- goto done;
|
|
|
c16027 |
- }
|
|
|
c16027 |
|
|
|
c16027 |
- /* iterate over elements of consumer's (and/or supplier's) ruv */
|
|
|
c16027 |
- for (i = 0; csns[i]; i++)
|
|
|
c16027 |
- {
|
|
|
c16027 |
- CSN *consumerMaxCSN = NULL;
|
|
|
c16027 |
-
|
|
|
c16027 |
- rid = csn_get_replicaid(csns[i]);
|
|
|
c16027 |
-
|
|
|
c16027 |
- /*
|
|
|
c16027 |
- * Skip CSN that is originated from the consumer.
|
|
|
c16027 |
- * If RID==65535, the CSN is originated from a
|
|
|
c16027 |
- * legacy consumer. In this case the supplier
|
|
|
c16027 |
- * and the consumer may have the same RID.
|
|
|
c16027 |
- */
|
|
|
c16027 |
- if ((rid == consumerRID && rid != MAX_REPLICA_ID) || (is_cleaned_rid(rid)) )
|
|
|
c16027 |
- continue;
|
|
|
c16027 |
+ /* initialize the changelog buffer and do the initial load */
|
|
|
c16027 |
|
|
|
c16027 |
- startCSN = csns[i];
|
|
|
c16027 |
+ rc = clcache_get_buffer ( &clcache, file->db, consumerRID, consumerRuv, supplierRuv );
|
|
|
c16027 |
+ if ( rc != 0 ) goto done;
|
|
|
c16027 |
|
|
|
c16027 |
- rc = clcache_get_buffer ( &clcache, file->db, consumerRID, consumerRuv, supplierRuv );
|
|
|
c16027 |
- if ( rc != 0 ) goto done;
|
|
|
c16027 |
-
|
|
|
c16027 |
- /* This is the first loading of this iteration. For replicas
|
|
|
c16027 |
- * already known to the consumer, we exclude the last entry
|
|
|
c16027 |
- * sent to the consumer by using DB_NEXT. However, for
|
|
|
c16027 |
- * replicas new to the consumer, we include the first change
|
|
|
c16027 |
- * ever generated by that replica.
|
|
|
c16027 |
- */
|
|
|
c16027 |
- newReplica = ruv_get_largest_csn_for_replica (consumerRuv, rid, &consumerMaxCSN);
|
|
|
c16027 |
- csn_free(&consumerMaxCSN);
|
|
|
c16027 |
- rc = clcache_load_buffer (clcache, startCSN, (newReplica ? DB_SET : DB_NEXT));
|
|
|
c16027 |
-
|
|
|
c16027 |
- /* there is a special case which can occur just after migration - in this case,
|
|
|
c16027 |
- the consumer RUV will contain the last state of the supplier before migration,
|
|
|
c16027 |
- but the supplier will have an empty changelog, or the supplier changelog will
|
|
|
c16027 |
- not contain any entries within the consumer min and max CSN - also, since
|
|
|
c16027 |
- the purge RUV contains no CSNs, the changelog has never been purged
|
|
|
c16027 |
- ASSUMPTIONS - it is assumed that the supplier had no pending changes to send
|
|
|
c16027 |
- to any consumers; that is, we can assume that no changes were lost due to
|
|
|
c16027 |
- either changelog purging or database reload - bug# 603061 - richm@netscape.com
|
|
|
c16027 |
- */
|
|
|
c16027 |
- if ((rc == DB_NOTFOUND) && !ruv_has_csns(file->purgeRUV))
|
|
|
c16027 |
- {
|
|
|
c16027 |
- char mincsnStr[CSN_STRSIZE];
|
|
|
c16027 |
-
|
|
|
c16027 |
- /* use the supplier min csn for the buffer start csn - we know
|
|
|
c16027 |
- this csn is in our changelog */
|
|
|
c16027 |
- if ((RUV_SUCCESS == ruv_get_min_csn_ext(supplierRuv, &minCSN, 1 /* ignore cleaned rids */)) &&
|
|
|
c16027 |
- minCSN)
|
|
|
c16027 |
- { /* must now free startCSN */
|
|
|
c16027 |
- if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
|
|
|
c16027 |
- csn_as_string(startCSN, PR_FALSE, csnStr);
|
|
|
c16027 |
- csn_as_string(minCSN, PR_FALSE, mincsnStr);
|
|
|
c16027 |
- slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
|
|
|
c16027 |
- "%s: CSN %s not found and no purging, probably a reinit\n",
|
|
|
c16027 |
- agmt_name, csnStr);
|
|
|
c16027 |
- slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
|
|
|
c16027 |
- "%s: Will try to use supplier min CSN %s to load changelog\n",
|
|
|
c16027 |
- agmt_name, mincsnStr);
|
|
|
c16027 |
- }
|
|
|
c16027 |
- startCSN = minCSN;
|
|
|
c16027 |
- rc = clcache_load_buffer (clcache, startCSN, DB_SET);
|
|
|
c16027 |
- }
|
|
|
c16027 |
- else
|
|
|
c16027 |
- {
|
|
|
c16027 |
- if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
|
|
|
c16027 |
- csn_as_string(startCSN, PR_FALSE, csnStr);
|
|
|
c16027 |
- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
|
|
|
c16027 |
- "%s: CSN %s not found and no purging, probably a reinit\n",
|
|
|
c16027 |
- agmt_name, csnStr);
|
|
|
c16027 |
- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
|
|
|
c16027 |
- "%s: Could not get the min csn from the supplier RUV\n",
|
|
|
c16027 |
- agmt_name);
|
|
|
c16027 |
- }
|
|
|
c16027 |
- rc = CL5_RUV_ERROR;
|
|
|
c16027 |
- goto done;
|
|
|
c16027 |
- }
|
|
|
c16027 |
- }
|
|
|
c16027 |
+ rc = clcache_load_buffer (clcache, &startCSN);
|
|
|
c16027 |
|
|
|
c16027 |
if (rc == 0) {
|
|
|
c16027 |
- haveChanges = PR_TRUE;
|
|
|
c16027 |
- rc = CL5_SUCCESS;
|
|
|
c16027 |
- if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
|
|
|
c16027 |
- csn_as_string(startCSN, PR_FALSE, csnStr);
|
|
|
c16027 |
- slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
|
|
|
c16027 |
- "%s: CSN %s found, position set for replay\n", agmt_name, csnStr);
|
|
|
c16027 |
- }
|
|
|
c16027 |
- if (startCSN != csns[i]) {
|
|
|
c16027 |
- csn_free(&startCSN);
|
|
|
c16027 |
- }
|
|
|
c16027 |
- break;
|
|
|
c16027 |
+ haveChanges = PR_TRUE;
|
|
|
c16027 |
+ rc = CL5_SUCCESS;
|
|
|
c16027 |
+ if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
|
|
|
c16027 |
+ csn_as_string(startCSN, PR_FALSE, csnStr);
|
|
|
c16027 |
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
|
|
|
c16027 |
+ "%s: CSN %s found, position set for replay\n", agmt_name, csnStr);
|
|
|
c16027 |
+ }
|
|
|
c16027 |
}
|
|
|
c16027 |
- else if (rc == DB_NOTFOUND) /* entry not found */
|
|
|
c16027 |
- {
|
|
|
c16027 |
+ else if (rc == DB_NOTFOUND) {
|
|
|
c16027 |
+ /* buffer not loaded.
|
|
|
c16027 |
+ * either because no changes have to be sent ==> startCSN is NULL
|
|
|
c16027 |
+ * or the calculated startCSN cannot be found in the changelog
|
|
|
c16027 |
+ */
|
|
|
c16027 |
+ if (startCSN == NULL) {
|
|
|
c16027 |
+ rc = CL5_NOTFOUND;
|
|
|
c16027 |
+ goto done;
|
|
|
c16027 |
+ }
|
|
|
c16027 |
/* check whether this csn should be present */
|
|
|
c16027 |
rc = _cl5CheckMissingCSN (startCSN, supplierRuv, file);
|
|
|
c16027 |
if (rc == CL5_MISSING_DATA) /* we should have had the change but we don't */
|
|
|
c16027 |
@@ -5650,17 +5566,6 @@ static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consum
|
|
|
c16027 |
"%s: CSN %s not found, we aren't as up to date, or we purged\n",
|
|
|
c16027 |
agmt_name, csnStr);
|
|
|
c16027 |
}
|
|
|
c16027 |
- if (startCSN != csns[i]) {
|
|
|
c16027 |
- csn_free(&startCSN);
|
|
|
c16027 |
- }
|
|
|
c16027 |
- if (rc == CL5_MISSING_DATA) /* we should have had the change but we don't */
|
|
|
c16027 |
- {
|
|
|
c16027 |
- break;
|
|
|
c16027 |
- }
|
|
|
c16027 |
- else /* we are not as up to date or we purged */
|
|
|
c16027 |
- {
|
|
|
c16027 |
- continue;
|
|
|
c16027 |
- }
|
|
|
c16027 |
}
|
|
|
c16027 |
else
|
|
|
c16027 |
{
|
|
|
c16027 |
@@ -5669,34 +5574,29 @@ static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consum
|
|
|
c16027 |
slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
|
|
|
c16027 |
"%s: Failed to retrieve change with CSN %s; db error - %d %s\n",
|
|
|
c16027 |
agmt_name, csnStr, rc, db_strerror(rc));
|
|
|
c16027 |
- if (startCSN != csns[i]) {
|
|
|
c16027 |
- csn_free(&startCSN);
|
|
|
c16027 |
- }
|
|
|
c16027 |
|
|
|
c16027 |
rc = CL5_DB_ERROR;
|
|
|
c16027 |
- break;
|
|
|
c16027 |
- }
|
|
|
c16027 |
+ }
|
|
|
c16027 |
|
|
|
c16027 |
- } /* end for */
|
|
|
c16027 |
|
|
|
c16027 |
/* setup the iterator */
|
|
|
c16027 |
if (haveChanges)
|
|
|
c16027 |
{
|
|
|
c16027 |
- *iterator = (CL5ReplayIterator*) slapi_ch_calloc (1, sizeof (CL5ReplayIterator));
|
|
|
c16027 |
+ *iterator = (CL5ReplayIterator*) slapi_ch_calloc (1, sizeof (CL5ReplayIterator));
|
|
|
c16027 |
|
|
|
c16027 |
- if (*iterator == NULL)
|
|
|
c16027 |
- {
|
|
|
c16027 |
- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
|
|
|
c16027 |
+ if (*iterator == NULL)
|
|
|
c16027 |
+ {
|
|
|
c16027 |
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
|
|
|
c16027 |
"%s: _cl5PositionCursorForReplay: failed to allocate iterator\n", agmt_name);
|
|
|
c16027 |
- rc = CL5_MEMORY_ERROR;
|
|
|
c16027 |
- goto done;
|
|
|
c16027 |
- }
|
|
|
c16027 |
+ rc = CL5_MEMORY_ERROR;
|
|
|
c16027 |
+ goto done;
|
|
|
c16027 |
+ }
|
|
|
c16027 |
|
|
|
c16027 |
/* ONREPL - should we make a copy of both RUVs here ?*/
|
|
|
c16027 |
- (*iterator)->fileObj = fileObj;
|
|
|
c16027 |
- (*iterator)->clcache = clcache; clcache = NULL;
|
|
|
c16027 |
- (*iterator)->consumerRID = consumerRID;
|
|
|
c16027 |
- (*iterator)->consumerRuv = consumerRuv;
|
|
|
c16027 |
+ (*iterator)->fileObj = fileObj;
|
|
|
c16027 |
+ (*iterator)->clcache = clcache; clcache = NULL;
|
|
|
c16027 |
+ (*iterator)->consumerRID = consumerRID;
|
|
|
c16027 |
+ (*iterator)->consumerRuv = consumerRuv;
|
|
|
c16027 |
(*iterator)->supplierRuvObj = supplierRuvObj;
|
|
|
c16027 |
}
|
|
|
c16027 |
else if (rc == CL5_SUCCESS)
|
|
|
c16027 |
@@ -5706,11 +5606,8 @@ static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consum
|
|
|
c16027 |
}
|
|
|
c16027 |
|
|
|
c16027 |
done:
|
|
|
c16027 |
- if ( clcache )
|
|
|
c16027 |
- clcache_return_buffer ( &clcache );
|
|
|
c16027 |
-
|
|
|
c16027 |
- if (csns)
|
|
|
c16027 |
- cl5DestroyCSNList (&csns);
|
|
|
c16027 |
+ if ( clcache )
|
|
|
c16027 |
+ clcache_return_buffer ( &clcache );
|
|
|
c16027 |
|
|
|
c16027 |
if (rc != CL5_SUCCESS)
|
|
|
c16027 |
{
|
|
|
c16027 |
diff --git a/ldap/servers/plugins/replication/cl5_clcache.c b/ldap/servers/plugins/replication/cl5_clcache.c
|
|
|
c16027 |
index b53d7c0..2d3bb28 100644
|
|
|
c16027 |
--- a/ldap/servers/plugins/replication/cl5_clcache.c
|
|
|
c16027 |
+++ b/ldap/servers/plugins/replication/cl5_clcache.c
|
|
|
c16027 |
@@ -39,6 +39,7 @@
|
|
|
c16027 |
#define DEFAULT_CLC_BUFFER_COUNT_MAX 0
|
|
|
c16027 |
#define DEFAULT_CLC_BUFFER_PAGE_COUNT 32
|
|
|
c16027 |
#define DEFAULT_CLC_BUFFER_PAGE_SIZE 1024
|
|
|
c16027 |
+#define WORK_CLC_BUFFER_PAGE_SIZE 8*DEFAULT_CLC_BUFFER_PAGE_SIZE
|
|
|
c16027 |
|
|
|
c16027 |
enum {
|
|
|
c16027 |
CLC_STATE_READY = 0, /* ready to iterate */
|
|
|
c16027 |
@@ -56,8 +57,9 @@ struct csn_seq_ctrl_block {
|
|
|
c16027 |
ReplicaId rid; /* RID this block serves */
|
|
|
c16027 |
CSN *consumer_maxcsn; /* Don't send CSN <= this */
|
|
|
c16027 |
CSN *local_maxcsn; /* Don't send CSN > this */
|
|
|
c16027 |
- CSN *prev_local_maxcsn; /* */
|
|
|
c16027 |
- int state; /* CLC_STATE_* */
|
|
|
c16027 |
+ CSN *prev_local_maxcsn; /* Copy of last state at buffer loading */
|
|
|
c16027 |
+ CSN *local_mincsn; /* Used to determin anchor csn*/
|
|
|
c16027 |
+ int state; /* CLC_STATE_* */
|
|
|
c16027 |
};
|
|
|
c16027 |
|
|
|
c16027 |
/*
|
|
|
c16027 |
@@ -70,6 +72,8 @@ struct clc_buffer {
|
|
|
c16027 |
ReplicaId buf_consumer_rid; /* help checking threshold csn */
|
|
|
c16027 |
const RUV *buf_consumer_ruv; /* used to skip change */
|
|
|
c16027 |
const RUV *buf_local_ruv; /* used to refresh local_maxcsn */
|
|
|
c16027 |
+ int buf_ignoreConsumerRID; /* how to handle updates from consumer */
|
|
|
c16027 |
+ int buf_load_cnt; /* number of loads for session */
|
|
|
c16027 |
|
|
|
c16027 |
/*
|
|
|
c16027 |
* fields for retriving data from DB
|
|
|
c16027 |
@@ -90,7 +94,6 @@ struct clc_buffer {
|
|
|
c16027 |
int buf_max_cscbs;
|
|
|
c16027 |
|
|
|
c16027 |
/* fields for debugging stat */
|
|
|
c16027 |
- int buf_load_cnt; /* number of loads for session */
|
|
|
c16027 |
int buf_record_cnt; /* number of changes for session */
|
|
|
c16027 |
int buf_record_skipped; /* number of changes skipped */
|
|
|
c16027 |
int buf_skipped_new_rid; /* number of changes skipped due to new_rid */
|
|
|
c16027 |
@@ -133,7 +136,8 @@ struct clc_pool {
|
|
|
c16027 |
static struct clc_pool *_pool = NULL; /* process's buffer pool */
|
|
|
c16027 |
|
|
|
c16027 |
/* static prototypes */
|
|
|
c16027 |
-static int clcache_adjust_anchorcsn ( CLC_Buffer *buf );
|
|
|
c16027 |
+static int clcache_initial_anchorcsn ( CLC_Buffer *buf, int *flag );
|
|
|
c16027 |
+static int clcache_adjust_anchorcsn ( CLC_Buffer *buf, int *flag );
|
|
|
c16027 |
static void clcache_refresh_consumer_maxcsns ( CLC_Buffer *buf );
|
|
|
c16027 |
static int clcache_refresh_local_maxcsns ( CLC_Buffer *buf );
|
|
|
c16027 |
static int clcache_skip_change ( CLC_Buffer *buf );
|
|
|
c16027 |
@@ -251,8 +255,23 @@ clcache_get_buffer ( CLC_Buffer **buf, DB *db, ReplicaId consumer_rid, const RUV
|
|
|
c16027 |
}
|
|
|
c16027 |
|
|
|
c16027 |
if ( NULL != *buf ) {
|
|
|
c16027 |
+ CSN *c_csn = NULL;
|
|
|
c16027 |
+ CSN *l_csn = NULL;
|
|
|
c16027 |
(*buf)->buf_consumer_ruv = consumer_ruv;
|
|
|
c16027 |
(*buf)->buf_local_ruv = local_ruv;
|
|
|
c16027 |
+ (*buf)->buf_load_flag = DB_MULTIPLE_KEY;
|
|
|
c16027 |
+ ruv_get_largest_csn_for_replica (consumer_ruv, consumer_rid, &c_csn);
|
|
|
c16027 |
+ ruv_get_largest_csn_for_replica (local_ruv, consumer_rid, &l_csn);
|
|
|
c16027 |
+ if (l_csn && csn_compare(l_csn, c_csn) > 0) {
|
|
|
c16027 |
+ /* the supplier has updates for the consumer RID and
|
|
|
c16027 |
+ * these updates are newer than on the consumer
|
|
|
c16027 |
+ */
|
|
|
c16027 |
+ (*buf)->buf_ignoreConsumerRID = 0;
|
|
|
c16027 |
+ } else {
|
|
|
c16027 |
+ (*buf)->buf_ignoreConsumerRID = 1;
|
|
|
c16027 |
+ }
|
|
|
c16027 |
+ csn_free(&c_csn);
|
|
|
c16027 |
+ csn_free(&l_csn);
|
|
|
c16027 |
}
|
|
|
c16027 |
else {
|
|
|
c16027 |
slapi_log_error ( SLAPI_LOG_FATAL, get_thread_private_agmtname(),
|
|
|
c16027 |
@@ -305,36 +324,25 @@ clcache_return_buffer ( CLC_Buffer **buf )
|
|
|
c16027 |
* historic reason.
|
|
|
c16027 |
*/
|
|
|
c16027 |
int
|
|
|
c16027 |
-clcache_load_buffer ( CLC_Buffer *buf, CSN *anchorcsn, int flag )
|
|
|
c16027 |
+clcache_load_buffer ( CLC_Buffer *buf, CSN **anchorCSN )
|
|
|
c16027 |
{
|
|
|
c16027 |
int rc = 0;
|
|
|
c16027 |
+ int flag = DB_NEXT;
|
|
|
c16027 |
|
|
|
c16027 |
+ if (anchorCSN) *anchorCSN = NULL;
|
|
|
c16027 |
clcache_refresh_local_maxcsns ( buf );
|
|
|
c16027 |
|
|
|
c16027 |
- /* Set the loading key */
|
|
|
c16027 |
- if ( anchorcsn ) {
|
|
|
c16027 |
+ if (buf->buf_load_cnt == 0 ) {
|
|
|
c16027 |
clcache_refresh_consumer_maxcsns ( buf );
|
|
|
c16027 |
- buf->buf_load_flag = DB_MULTIPLE_KEY;
|
|
|
c16027 |
- csn_as_string ( anchorcsn, 0, (char*)buf->buf_key.data );
|
|
|
c16027 |
- slapi_log_error ( SLAPI_LOG_REPL, buf->buf_agmt_name,
|
|
|
c16027 |
- "session start: anchorcsn=%s\n", (char*)buf->buf_key.data );
|
|
|
c16027 |
- }
|
|
|
c16027 |
- else if ( csn_get_time(buf->buf_current_csn) == 0 ) {
|
|
|
c16027 |
- /* time == 0 means this csn has never been set */
|
|
|
c16027 |
- rc = DB_NOTFOUND;
|
|
|
c16027 |
- }
|
|
|
c16027 |
- else if ( clcache_adjust_anchorcsn ( buf ) != 0 ) {
|
|
|
c16027 |
- rc = DB_NOTFOUND;
|
|
|
c16027 |
- }
|
|
|
c16027 |
- else {
|
|
|
c16027 |
- csn_as_string ( buf->buf_current_csn, 0, (char*)buf->buf_key.data );
|
|
|
c16027 |
- slapi_log_error ( SLAPI_LOG_REPL, buf->buf_agmt_name,
|
|
|
c16027 |
- "load next: anchorcsn=%s\n", (char*)buf->buf_key.data );
|
|
|
c16027 |
+ rc = clcache_initial_anchorcsn ( buf, &flag );
|
|
|
c16027 |
+ } else {
|
|
|
c16027 |
+ rc = clcache_adjust_anchorcsn ( buf, &flag );
|
|
|
c16027 |
}
|
|
|
c16027 |
|
|
|
c16027 |
if ( rc == 0 ) {
|
|
|
c16027 |
|
|
|
c16027 |
buf->buf_state = CLC_STATE_READY;
|
|
|
c16027 |
+ if (anchorCSN) *anchorCSN = buf->buf_current_csn;
|
|
|
c16027 |
rc = clcache_load_buffer_bulk ( buf, flag );
|
|
|
c16027 |
|
|
|
c16027 |
/* Reset some flag variables */
|
|
|
c16027 |
@@ -344,21 +352,15 @@ clcache_load_buffer ( CLC_Buffer *buf, CSN *anchorcsn, int flag )
|
|
|
c16027 |
buf->buf_cscbs[i]->state = CLC_STATE_READY;
|
|
|
c16027 |
}
|
|
|
c16027 |
}
|
|
|
c16027 |
- else if ( anchorcsn ) {
|
|
|
c16027 |
- /* Report error only when the missing is persistent */
|
|
|
c16027 |
- if ( buf->buf_missing_csn && csn_compare (buf->buf_missing_csn, anchorcsn) == 0 ) {
|
|
|
c16027 |
- if (!buf->buf_prev_missing_csn || csn_compare (buf->buf_prev_missing_csn, anchorcsn)) {
|
|
|
c16027 |
- slapi_log_error ( SLAPI_LOG_FATAL, buf->buf_agmt_name,
|
|
|
c16027 |
- "Can't locate CSN %s in the changelog (DB rc=%d). If replication stops, the consumer may need to be reinitialized.\n",
|
|
|
c16027 |
- (char*)buf->buf_key.data, rc );
|
|
|
c16027 |
- csn_dup_or_init_by_csn (&buf->buf_prev_missing_csn, anchorcsn);
|
|
|
c16027 |
- }
|
|
|
c16027 |
- }
|
|
|
c16027 |
- else {
|
|
|
c16027 |
- csn_dup_or_init_by_csn (&buf->buf_missing_csn, anchorcsn);
|
|
|
c16027 |
- }
|
|
|
c16027 |
+ else {
|
|
|
c16027 |
+ slapi_log_error ( SLAPI_LOG_FATAL, buf->buf_agmt_name,
|
|
|
c16027 |
+ "Can't locate CSN %s in the changelog (DB rc=%d). If replication stops, the consumer may need to be reinitialized.\n",
|
|
|
c16027 |
+ (char*)buf->buf_key.data, rc );
|
|
|
c16027 |
}
|
|
|
c16027 |
+ } else if (rc == CLC_STATE_DONE) {
|
|
|
c16027 |
+ rc = DB_NOTFOUND;
|
|
|
c16027 |
}
|
|
|
c16027 |
+
|
|
|
c16027 |
if ( rc != 0 ) {
|
|
|
c16027 |
slapi_log_error ( SLAPI_LOG_REPL, buf->buf_agmt_name,
|
|
|
c16027 |
"clcache_load_buffer: rc=%d\n", rc );
|
|
|
c16027 |
@@ -483,7 +485,7 @@ clcache_get_next_change ( CLC_Buffer *buf, void **key, size_t *keylen, void **da
|
|
|
c16027 |
* We're done with the current buffer. Now load the next chunk.
|
|
|
c16027 |
*/
|
|
|
c16027 |
if ( NULL == *key && CLC_STATE_READY == buf->buf_state ) {
|
|
|
c16027 |
- rc = clcache_load_buffer ( buf, NULL, DB_NEXT );
|
|
|
c16027 |
+ rc = clcache_load_buffer ( buf, NULL );
|
|
|
c16027 |
if ( 0 == rc && buf->buf_record_ptr ) {
|
|
|
c16027 |
DB_MULTIPLE_KEY_NEXT ( buf->buf_record_ptr, &buf->buf_data,
|
|
|
c16027 |
*key, *keylen, *data, *datalen );
|
|
|
c16027 |
@@ -521,7 +523,6 @@ clcache_refresh_consumer_maxcsns ( CLC_Buffer *buf )
|
|
|
c16027 |
int i;
|
|
|
c16027 |
|
|
|
c16027 |
for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
|
|
|
c16027 |
- csn_free(&buf->buf_cscbs[i]->consumer_maxcsn);
|
|
|
c16027 |
ruv_get_largest_csn_for_replica (
|
|
|
c16027 |
buf->buf_consumer_ruv,
|
|
|
c16027 |
buf->buf_cscbs[i]->rid,
|
|
|
c16027 |
@@ -538,14 +539,11 @@ clcache_refresh_local_maxcsn ( const ruv_enum_data *rid_data, void *data )
|
|
|
c16027 |
int i;
|
|
|
c16027 |
|
|
|
c16027 |
rid = csn_get_replicaid ( rid_data->csn );
|
|
|
c16027 |
-
|
|
|
c16027 |
- /*
|
|
|
c16027 |
- * No need to create cscb for consumer's RID.
|
|
|
c16027 |
- * If RID==65535, the CSN is originated from a
|
|
|
c16027 |
- * legacy consumer. In this case the supplier
|
|
|
c16027 |
- * and the consumer may have the same RID.
|
|
|
c16027 |
+ /* we do not handle updates originated at the consumer if not required
|
|
|
c16027 |
+ * and we ignore RID which have been cleaned
|
|
|
c16027 |
*/
|
|
|
c16027 |
- if ( rid == buf->buf_consumer_rid && rid != MAX_REPLICA_ID )
|
|
|
c16027 |
+ if ( (rid == buf->buf_consumer_rid && buf->buf_ignoreConsumerRID) ||
|
|
|
c16027 |
+ is_cleaned_rid(rid) )
|
|
|
c16027 |
return rc;
|
|
|
c16027 |
|
|
|
c16027 |
for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
|
|
|
c16027 |
@@ -564,9 +562,20 @@ clcache_refresh_local_maxcsn ( const ruv_enum_data *rid_data, void *data )
|
|
|
c16027 |
}
|
|
|
c16027 |
buf->buf_cscbs[i]->rid = rid;
|
|
|
c16027 |
buf->buf_num_cscbs++;
|
|
|
c16027 |
+ /* this is the first time we have a local change for the RID
|
|
|
c16027 |
+ * we need to check what the consumer knows about it.
|
|
|
c16027 |
+ */
|
|
|
c16027 |
+ ruv_get_largest_csn_for_replica (
|
|
|
c16027 |
+ buf->buf_consumer_ruv,
|
|
|
c16027 |
+ buf->buf_cscbs[i]->rid,
|
|
|
c16027 |
+ &buf->buf_cscbs[i]->consumer_maxcsn );
|
|
|
c16027 |
}
|
|
|
c16027 |
|
|
|
c16027 |
+ if (buf->buf_cscbs[i]->local_maxcsn)
|
|
|
c16027 |
+ csn_dup_or_init_by_csn ( &buf->buf_cscbs[i]->prev_local_maxcsn, buf->buf_cscbs[i]->local_maxcsn );
|
|
|
c16027 |
+
|
|
|
c16027 |
csn_dup_or_init_by_csn ( &buf->buf_cscbs[i]->local_maxcsn, rid_data->csn );
|
|
|
c16027 |
+ csn_dup_or_init_by_csn ( &buf->buf_cscbs[i]->local_mincsn, rid_data->min_csn );
|
|
|
c16027 |
|
|
|
c16027 |
if ( buf->buf_cscbs[i]->consumer_maxcsn &&
|
|
|
c16027 |
csn_compare (buf->buf_cscbs[i]->consumer_maxcsn, rid_data->csn) >= 0 ) {
|
|
|
c16027 |
@@ -580,88 +589,147 @@ clcache_refresh_local_maxcsn ( const ruv_enum_data *rid_data, void *data )
|
|
|
c16027 |
static int
|
|
|
c16027 |
clcache_refresh_local_maxcsns ( CLC_Buffer *buf )
|
|
|
c16027 |
{
|
|
|
c16027 |
- int i;
|
|
|
c16027 |
|
|
|
c16027 |
- for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
|
|
|
c16027 |
- csn_dup_or_init_by_csn ( &buf->buf_cscbs[i]->prev_local_maxcsn,
|
|
|
c16027 |
- buf->buf_cscbs[i]->local_maxcsn );
|
|
|
c16027 |
- }
|
|
|
c16027 |
return ruv_enumerate_elements ( buf->buf_local_ruv, clcache_refresh_local_maxcsn, buf );
|
|
|
c16027 |
}
|
|
|
c16027 |
|
|
|
c16027 |
/*
|
|
|
c16027 |
* Algorithm:
|
|
|
c16027 |
*
|
|
|
c16027 |
- * 1. Snapshot local RUVs;
|
|
|
c16027 |
- * 2. Load buffer;
|
|
|
c16027 |
- * 3. Send to the consumer only those CSNs that are covered
|
|
|
c16027 |
- * by the RUVs snapshot taken in the first step;
|
|
|
c16027 |
- * All CSNs that are covered by the RUVs snapshot taken in the
|
|
|
c16027 |
- * first step are guaranteed in consecutive order for the respected
|
|
|
c16027 |
- * RIDs because of the the CSN pending list control;
|
|
|
c16027 |
- * A CSN that is not covered by the RUVs snapshot may be out of order
|
|
|
c16027 |
- * since it is possible that a smaller CSN might not have committed
|
|
|
c16027 |
- * yet by the time the buffer was loaded.
|
|
|
c16027 |
- * 4. Determine anchorcsn for each RID:
|
|
|
c16027 |
- *
|
|
|
c16027 |
- * Case| Local vs. Buffer | New Local | Next
|
|
|
c16027 |
- * | MaxCSN MaxCSN | MaxCSN | Anchor-CSN
|
|
|
c16027 |
- * ----+-------------------+-----------+----------------
|
|
|
c16027 |
- * 1 | Cl >= Cb | * | Cb
|
|
|
c16027 |
- * 2 | Cl < Cb | Cl | Cb
|
|
|
c16027 |
- * 3 | Cl < Cb | Cl2 | Cl
|
|
|
c16027 |
- *
|
|
|
c16027 |
- * 5. Determine anchorcsn for next load:
|
|
|
c16027 |
+ * 1. Determine anchorcsn for each RID:
|
|
|
c16027 |
+ * 2. Determine anchorcsn for next load:
|
|
|
c16027 |
* Anchor-CSN = min { all Next-Anchor-CSN, Buffer-MaxCSN }
|
|
|
c16027 |
*/
|
|
|
c16027 |
static int
|
|
|
c16027 |
-clcache_adjust_anchorcsn ( CLC_Buffer *buf )
|
|
|
c16027 |
+clcache_initial_anchorcsn ( CLC_Buffer *buf, int *flag )
|
|
|
c16027 |
{
|
|
|
c16027 |
PRBool hasChange = PR_FALSE;
|
|
|
c16027 |
struct csn_seq_ctrl_block *cscb;
|
|
|
c16027 |
int i;
|
|
|
c16027 |
+ CSN *anchorcsn = NULL;
|
|
|
c16027 |
|
|
|
c16027 |
if ( buf->buf_state == CLC_STATE_READY ) {
|
|
|
c16027 |
for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
|
|
|
c16027 |
+ CSN *rid_anchor = NULL;
|
|
|
c16027 |
+ int rid_flag = DB_NEXT;
|
|
|
c16027 |
cscb = buf->buf_cscbs[i];
|
|
|
c16027 |
|
|
|
c16027 |
- if ( cscb->state == CLC_STATE_UP_TO_DATE )
|
|
|
c16027 |
- continue;
|
|
|
c16027 |
+ if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
|
|
|
c16027 |
+ char prevmax[CSN_STRSIZE];
|
|
|
c16027 |
+ char local[CSN_STRSIZE];
|
|
|
c16027 |
+ char curr[CSN_STRSIZE];
|
|
|
c16027 |
+ char conmaxcsn[CSN_STRSIZE];
|
|
|
c16027 |
+ csn_as_string(cscb->prev_local_maxcsn, 0, prevmax);
|
|
|
c16027 |
+ csn_as_string(cscb->local_maxcsn, 0, local);
|
|
|
c16027 |
+ csn_as_string(buf->buf_current_csn, 0, curr);
|
|
|
c16027 |
+ csn_as_string(cscb->consumer_maxcsn, 0, conmaxcsn);
|
|
|
c16027 |
+ slapi_log_error(SLAPI_LOG_REPL, "clcache_initial_anchorcsn" ,
|
|
|
c16027 |
+ "%s - (cscb %d - state %d) - csnPrevMax (%s) "
|
|
|
c16027 |
+ "csnMax (%s) csnBuf (%s) csnConsumerMax (%s)\n",
|
|
|
c16027 |
+ buf->buf_agmt_name, i, cscb->state, prevmax, local,
|
|
|
c16027 |
+ curr, conmaxcsn);
|
|
|
c16027 |
+ }
|
|
|
c16027 |
|
|
|
c16027 |
- /*
|
|
|
c16027 |
- * Case 3 unsafe ruv change: next buffer load should start
|
|
|
c16027 |
- * from where the maxcsn in the old ruv was. Since each
|
|
|
c16027 |
- * cscb has remembered the maxcsn sent to the consumer,
|
|
|
c16027 |
- * CSNs that may be loaded again could easily be skipped.
|
|
|
c16027 |
- */
|
|
|
c16027 |
- if ( cscb->prev_local_maxcsn &&
|
|
|
c16027 |
- csn_compare (cscb->prev_local_maxcsn, buf->buf_current_csn) < 0 &&
|
|
|
c16027 |
- csn_compare (cscb->local_maxcsn, cscb->prev_local_maxcsn) != 0 ) {
|
|
|
c16027 |
+ if (cscb->consumer_maxcsn == NULL) {
|
|
|
c16027 |
+ /* the consumer hasn't seen changes for this RID */
|
|
|
c16027 |
+ rid_anchor = cscb->local_mincsn;
|
|
|
c16027 |
+ rid_flag = DB_SET;
|
|
|
c16027 |
+ } else if ( csn_compare (cscb->local_maxcsn, cscb->consumer_maxcsn) > 0 ) {
|
|
|
c16027 |
+ rid_anchor = cscb->consumer_maxcsn;
|
|
|
c16027 |
+ }
|
|
|
c16027 |
+
|
|
|
c16027 |
+ if (rid_anchor && (anchorcsn == NULL ||
|
|
|
c16027 |
+ ( csn_compare(rid_anchor, anchorcsn) < 0))) {
|
|
|
c16027 |
+ anchorcsn = rid_anchor;
|
|
|
c16027 |
+ *flag = rid_flag;
|
|
|
c16027 |
hasChange = PR_TRUE;
|
|
|
c16027 |
- cscb->state = CLC_STATE_READY;
|
|
|
c16027 |
- csn_init_by_csn ( buf->buf_current_csn, cscb->prev_local_maxcsn );
|
|
|
c16027 |
- csn_as_string ( cscb->prev_local_maxcsn, 0, (char*)buf->buf_key.data );
|
|
|
c16027 |
- slapi_log_error ( SLAPI_LOG_REPL, buf->buf_agmt_name,
|
|
|
c16027 |
- "adjust anchor csn upon %s\n",
|
|
|
c16027 |
- ( cscb->state == CLC_STATE_CSN_GT_RUV ? "out of sequence csn" : "unsafe ruv change") );
|
|
|
c16027 |
- continue;
|
|
|
c16027 |
}
|
|
|
c16027 |
|
|
|
c16027 |
- /*
|
|
|
c16027 |
- * check if there are still changes to send for this RID
|
|
|
c16027 |
- * Assume we had compared the local maxcsn and the consumer
|
|
|
c16027 |
- * max csn before this function was called and hence the
|
|
|
c16027 |
- * cscb->state had been set accordingly.
|
|
|
c16027 |
- */
|
|
|
c16027 |
- if ( hasChange == PR_FALSE &&
|
|
|
c16027 |
- csn_compare (cscb->local_maxcsn, buf->buf_current_csn) > 0 ) {
|
|
|
c16027 |
+
|
|
|
c16027 |
+ }
|
|
|
c16027 |
+ }
|
|
|
c16027 |
+
|
|
|
c16027 |
+ if ( !hasChange ) {
|
|
|
c16027 |
+ buf->buf_state = CLC_STATE_DONE;
|
|
|
c16027 |
+ } else {
|
|
|
c16027 |
+ csn_init_by_csn(buf->buf_current_csn, anchorcsn);
|
|
|
c16027 |
+ csn_as_string(buf->buf_current_csn, 0, (char *)buf->buf_key.data);
|
|
|
c16027 |
+ slapi_log_error(SLAPI_LOG_REPL, "clcache_initial_anchorcsn",
|
|
|
c16027 |
+ "anchor is now: %s\n", (char *)buf->buf_key.data);
|
|
|
c16027 |
+ }
|
|
|
c16027 |
+
|
|
|
c16027 |
+ return buf->buf_state;
|
|
|
c16027 |
+}
|
|
|
c16027 |
+
|
|
|
c16027 |
+static int
|
|
|
c16027 |
+clcache_adjust_anchorcsn ( CLC_Buffer *buf, int *flag )
|
|
|
c16027 |
+{
|
|
|
c16027 |
+ PRBool hasChange = PR_FALSE;
|
|
|
c16027 |
+ struct csn_seq_ctrl_block *cscb;
|
|
|
c16027 |
+ int i;
|
|
|
c16027 |
+ CSN *anchorcsn = NULL;
|
|
|
c16027 |
+
|
|
|
c16027 |
+ if ( buf->buf_state == CLC_STATE_READY ) {
|
|
|
c16027 |
+ for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
|
|
|
c16027 |
+ CSN *rid_anchor = NULL;
|
|
|
c16027 |
+ int rid_flag = DB_NEXT;
|
|
|
c16027 |
+ cscb = buf->buf_cscbs[i];
|
|
|
c16027 |
+
|
|
|
c16027 |
+ if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
|
|
|
c16027 |
+ char prevmax[CSN_STRSIZE];
|
|
|
c16027 |
+ char local[CSN_STRSIZE];
|
|
|
c16027 |
+ char curr[CSN_STRSIZE];
|
|
|
c16027 |
+ char conmaxcsn[CSN_STRSIZE];
|
|
|
c16027 |
+ csn_as_string(cscb->prev_local_maxcsn, 0, prevmax);
|
|
|
c16027 |
+ csn_as_string(cscb->local_maxcsn, 0, local);
|
|
|
c16027 |
+ csn_as_string(buf->buf_current_csn, 0, curr);
|
|
|
c16027 |
+ csn_as_string(cscb->consumer_maxcsn, 0, conmaxcsn);
|
|
|
c16027 |
+ slapi_log_error(SLAPI_LOG_REPL, "clcache_adjust_anchorcsn" ,
|
|
|
c16027 |
+ "%s - (cscb %d - state %d) - csnPrevMax (%s) "
|
|
|
c16027 |
+ "csnMax (%s) csnBuf (%s) csnConsumerMax (%s)\n",
|
|
|
c16027 |
+ buf->buf_agmt_name, i, cscb->state, prevmax, local,
|
|
|
c16027 |
+ curr, conmaxcsn);
|
|
|
c16027 |
+ }
|
|
|
c16027 |
+
|
|
|
c16027 |
+ if (csn_compare (cscb->local_maxcsn, cscb->prev_local_maxcsn) == 0 ||
|
|
|
c16027 |
+ csn_compare (cscb->prev_local_maxcsn, buf->buf_current_csn) > 0 ) {
|
|
|
c16027 |
+ if (csn_compare (cscb->local_maxcsn, cscb->consumer_maxcsn) > 0 ) {
|
|
|
c16027 |
+ rid_anchor = buf->buf_current_csn;
|
|
|
c16027 |
+ }
|
|
|
c16027 |
+ } else {
|
|
|
c16027 |
+ /* prev local max csn < csnBuffer AND different from local maxcsn */
|
|
|
c16027 |
+ if (cscb->prev_local_maxcsn == NULL) {
|
|
|
c16027 |
+ if (cscb->consumer_maxcsn == NULL) {
|
|
|
c16027 |
+ /* the consumer hasn't seen changes for this RID */
|
|
|
c16027 |
+ rid_anchor = cscb->local_mincsn;
|
|
|
c16027 |
+ rid_flag = DB_SET;
|
|
|
c16027 |
+ } else if ( csn_compare (cscb->local_maxcsn, cscb->consumer_maxcsn) > 0 ) {
|
|
|
c16027 |
+ rid_anchor = cscb->consumer_maxcsn;
|
|
|
c16027 |
+ }
|
|
|
c16027 |
+ } else {
|
|
|
c16027 |
+ /* csnPrevMaxSup > 0 */
|
|
|
c16027 |
+ rid_anchor = cscb->consumer_maxcsn;
|
|
|
c16027 |
+ }
|
|
|
c16027 |
+ }
|
|
|
c16027 |
+
|
|
|
c16027 |
+ if (rid_anchor && (anchorcsn == NULL ||
|
|
|
c16027 |
+ ( csn_compare(rid_anchor, anchorcsn) < 0))) {
|
|
|
c16027 |
+ anchorcsn = rid_anchor;
|
|
|
c16027 |
+ *flag = rid_flag;
|
|
|
c16027 |
hasChange = PR_TRUE;
|
|
|
c16027 |
}
|
|
|
c16027 |
+
|
|
|
c16027 |
+
|
|
|
c16027 |
}
|
|
|
c16027 |
}
|
|
|
c16027 |
|
|
|
c16027 |
if ( !hasChange ) {
|
|
|
c16027 |
buf->buf_state = CLC_STATE_DONE;
|
|
|
c16027 |
+ } else {
|
|
|
c16027 |
+ csn_init_by_csn(buf->buf_current_csn, anchorcsn);
|
|
|
c16027 |
+ csn_as_string(buf->buf_current_csn, 0, (char *)buf->buf_key.data);
|
|
|
c16027 |
+ slapi_log_error(SLAPI_LOG_REPL, "clcache_adjust_anchorcsn",
|
|
|
c16027 |
+ "anchor is now: %s\n", (char *)buf->buf_key.data);
|
|
|
c16027 |
}
|
|
|
c16027 |
|
|
|
c16027 |
return buf->buf_state;
|
|
|
c16027 |
@@ -675,7 +743,6 @@ clcache_skip_change ( CLC_Buffer *buf )
|
|
|
c16027 |
int skip = 1;
|
|
|
c16027 |
int i;
|
|
|
c16027 |
char buf_cur_csn_str[CSN_STRSIZE];
|
|
|
c16027 |
- char oth_csn_str[CSN_STRSIZE];
|
|
|
c16027 |
|
|
|
c16027 |
do {
|
|
|
c16027 |
|
|
|
c16027 |
@@ -688,25 +755,14 @@ clcache_skip_change ( CLC_Buffer *buf )
|
|
|
c16027 |
* legacy consumer. In this case the supplier
|
|
|
c16027 |
* and the consumer may have the same RID.
|
|
|
c16027 |
*/
|
|
|
c16027 |
- if (rid == buf->buf_consumer_rid && rid != MAX_REPLICA_ID){
|
|
|
c16027 |
- CSN *cons_maxcsn = NULL;
|
|
|
c16027 |
-
|
|
|
c16027 |
- ruv_get_max_csn(buf->buf_consumer_ruv, &cons_maxcsn);
|
|
|
c16027 |
- if ( csn_compare ( buf->buf_current_csn, cons_maxcsn) > 0 ) {
|
|
|
c16027 |
- /*
|
|
|
c16027 |
- * The consumer must have been "restored" and needs this newer update.
|
|
|
c16027 |
- */
|
|
|
c16027 |
- skip = 0;
|
|
|
c16027 |
- } else if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
|
|
|
c16027 |
+ if (rid == buf->buf_consumer_rid && buf->buf_ignoreConsumerRID){
|
|
|
c16027 |
+ if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
|
|
|
c16027 |
csn_as_string(buf->buf_current_csn, 0, buf_cur_csn_str);
|
|
|
c16027 |
- csn_as_string(cons_maxcsn, 0, oth_csn_str);
|
|
|
c16027 |
slapi_log_error(SLAPI_LOG_REPL, buf->buf_agmt_name,
|
|
|
c16027 |
- "Skipping update because the changelog buffer current csn [%s] is "
|
|
|
c16027 |
- "less than or equal to the consumer max csn [%s]\n",
|
|
|
c16027 |
- buf_cur_csn_str, oth_csn_str);
|
|
|
c16027 |
+ "Skipping update because the consumer with Rid: [%d] is "
|
|
|
c16027 |
+ "ignored\n", rid);
|
|
|
c16027 |
buf->buf_skipped_csn_gt_cons_maxcsn++;
|
|
|
c16027 |
}
|
|
|
c16027 |
- csn_free(&cons_maxcsn);
|
|
|
c16027 |
break;
|
|
|
c16027 |
}
|
|
|
c16027 |
|
|
|
c16027 |
@@ -821,6 +877,7 @@ clcache_free_cscb ( struct csn_seq_ctrl_block ** cscb )
|
|
|
c16027 |
csn_free ( & (*cscb)->consumer_maxcsn );
|
|
|
c16027 |
csn_free ( & (*cscb)->local_maxcsn );
|
|
|
c16027 |
csn_free ( & (*cscb)->prev_local_maxcsn );
|
|
|
c16027 |
+ csn_free ( & (*cscb)->local_mincsn );
|
|
|
c16027 |
slapi_ch_free ( (void **) cscb );
|
|
|
c16027 |
}
|
|
|
c16027 |
|
|
|
c16027 |
@@ -1003,6 +1060,15 @@ clcache_cursor_get ( DBC *cursor, CLC_Buffer *buf, int flag )
|
|
|
c16027 |
{
|
|
|
c16027 |
int rc;
|
|
|
c16027 |
|
|
|
c16027 |
+ if (buf->buf_data.ulen > WORK_CLC_BUFFER_PAGE_SIZE) {
|
|
|
c16027 |
+ /*
|
|
|
c16027 |
+ * The buffer size had to be increased,
|
|
|
c16027 |
+ * reset it to a smaller working size,
|
|
|
c16027 |
+ * if not sufficient it will be increased again
|
|
|
c16027 |
+ */
|
|
|
c16027 |
+ buf->buf_data.ulen = WORK_CLC_BUFFER_PAGE_SIZE;
|
|
|
c16027 |
+ }
|
|
|
c16027 |
+
|
|
|
c16027 |
rc = cursor->c_get ( cursor,
|
|
|
c16027 |
& buf->buf_key,
|
|
|
c16027 |
& buf->buf_data,
|
|
|
c16027 |
diff --git a/ldap/servers/plugins/replication/cl5_clcache.h b/ldap/servers/plugins/replication/cl5_clcache.h
|
|
|
c16027 |
index 4c459ab..75b2817 100644
|
|
|
c16027 |
--- a/ldap/servers/plugins/replication/cl5_clcache.h
|
|
|
c16027 |
+++ b/ldap/servers/plugins/replication/cl5_clcache.h
|
|
|
c16027 |
@@ -23,7 +23,7 @@ typedef struct clc_buffer CLC_Buffer;
|
|
|
c16027 |
int clcache_init ( DB_ENV **dbenv );
|
|
|
c16027 |
void clcache_set_config ();
|
|
|
c16027 |
int clcache_get_buffer ( CLC_Buffer **buf, DB *db, ReplicaId consumer_rid, const RUV *consumer_ruv, const RUV *local_ruv );
|
|
|
c16027 |
-int clcache_load_buffer ( CLC_Buffer *buf, CSN *startCSN, int flag );
|
|
|
c16027 |
+int clcache_load_buffer ( CLC_Buffer *buf, CSN **anchorCSN );
|
|
|
c16027 |
void clcache_return_buffer ( CLC_Buffer **buf );
|
|
|
c16027 |
int clcache_get_next_change ( CLC_Buffer *buf, void **key, size_t *keylen, void **data, size_t *datalen, CSN **csn );
|
|
|
c16027 |
void clcache_destroy ();
|
|
|
c16027 |
--
|
|
|
c16027 |
2.4.11
|
|
|
c16027 |
|